Skip to content
Merged

Rpc #60

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8e1e297
chore: 🤖 setup rpc/ folder
streamich Oct 7, 2025
e8c40da
feat: 🎸 add initial RPC implementation
streamich Oct 7, 2025
15a2d24
feat: 🎸 harden RPC implementation
streamich Oct 7, 2025
13cbabd
feat: 🎸 update RPC implementation
streamich Oct 7, 2025
daa290a
feat: 🎸 setup RM codec folder
streamich Oct 7, 2025
6a1b4b1
feat: 🎸 implement RM decoder
streamich Oct 7, 2025
675a656
feat: 🎸 update record encoder implementation
streamich Oct 7, 2025
853f336
feat: 🎸 update RPC codec
streamich Oct 7, 2025
f2e5078
test: 💍 add real trace test
streamich Oct 7, 2025
81d8140
perf: ⚡️ return `Reader` from RM decoder
streamich Oct 7, 2025
ead50de
feat: 🎸 flatten message classes and use Reader
streamich Oct 7, 2025
1610d30
feat: 🎸 upate rpc codec implementation
streamich Oct 7, 2025
025f38e
feat: 🎸 use `Reader` in `RpcOpaqueAuth` body
streamich Oct 7, 2025
f51b9c9
perf: ⚡️ improve writer, get direct reference to buffers
streamich Oct 7, 2025
9b2f8bd
docs: ✏️ add newer RFCs
streamich Oct 7, 2025
bffe22d
perf: ⚡️ update decoder code
streamich Oct 7, 2025
2f4371b
feat: 🎸 add support for newer RFCs
streamich Oct 7, 2025
15a9243
style: 💄 fix linter issues
streamich Oct 7, 2025
fd40a92
feat: 🎸 review XDR compliance with specifications
streamich Oct 7, 2025
64ad66b
feat: 🎸 cleanup XDR implementation
streamich Oct 7, 2025
cf63ac0
chore: 🤖 general cleanup
streamich Oct 7, 2025
27d38d3
test: 💍 add more RPC traces
streamich Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
},
"dependencies": {
"@jsonjoy.com/base64": "^1.1.2",
"@jsonjoy.com/buffers": "^1.0.0",
"@jsonjoy.com/buffers": "^1.2.0",
"@jsonjoy.com/codegen": "^1.0.0",
"@jsonjoy.com/json-pointer": "^1.0.1",
"@jsonjoy.com/json-pointer": "^1.0.2",
"@jsonjoy.com/util": "^1.9.0",
"hyperdyperid": "^1.2.0",
"thingies": "^2.5.0"
Expand Down
37 changes: 37 additions & 0 deletions src/rm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Record Marking (RM) Protocol

Implements rm/tcp/ip protocol Record Marking (RM) Standard as specified in RFC 1057.
The RM standard splits a byte stream into discrete messages by prefixing each
message with a 4-byte header.

Excerpt from RFC 1057, Section 10:

```
10. RECORD MARKING STANDARD

When RPC messages are passed on top of a byte stream transport
protocol (like TCP), it is necessary to delimit one message from
another in order to detect and possibly recover from protocol errors.
This is called record marking (RM). Sun uses this RM/TCP/IP
transport for passing RPC messages on TCP streams. One RPC message
fits into one RM record.

A record is composed of one or more record fragments. A record



Sun Microsystems [Page 18]

RFC 1057 Remote Procedure Call, Version 2 June 1988


fragment is a four-byte header followed by 0 to (2**31) - 1 bytes of
fragment data. The bytes encode an unsigned binary number; as with
XDR integers, the byte order is from highest to lowest. The number
encodes two values -- a boolean which indicates whether the fragment
is the last fragment of the record (bit value 1 implies the fragment
is the last fragment) and a 31-bit unsigned binary value which is the
length in bytes of the fragment's data. The boolean value is the
highest-order bit of the header; the length is the 31 low-order bits.
(Note that this record specification is NOT in XDR standard form!)
```
49 changes: 49 additions & 0 deletions src/rm/RmRecordDecoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {StreamingReader} from '@jsonjoy.com/buffers/lib/StreamingReader';
import {Reader} from '@jsonjoy.com/buffers/lib/Reader';
import {concatList} from '@jsonjoy.com/buffers/lib/concat';

export class RmRecordDecoder {
public readonly reader = new StreamingReader();
protected fragments: Uint8Array[] = [];

public push(uint8: Uint8Array): void {
this.reader.push(uint8);
}

/**
* @todo PERF: Make it return Slice instead of Uint8Array
*/
public readRecord(): Reader | undefined {
const reader = this.reader;
let size = reader.size();
if (size < 4) return undefined;
const x = reader.x;
READ_FRAGMENT: {
try {
const header = reader.u32();
size -= 4;
const fin = !!(header & 0b10000000_00000000_00000000_00000000);
const len = header & 0b01111111_11111111_11111111_11111111;
if (size < len) break READ_FRAGMENT;
reader.consume();
const fragments = this.fragments;
if (fin) {
if (!fragments.length) return reader.cut(len);
fragments.push(reader.buf(len));
const record = concatList(fragments);
this.fragments = [];
return record.length ? new Reader(record) : undefined;
} else {
fragments.push(reader.buf(len));
return undefined;
}
} catch (err) {
reader.x = x;
if (err instanceof RangeError) return undefined;
else throw err;
}
}
reader.x = x;
return undefined;
}
}
43 changes: 43 additions & 0 deletions src/rm/RmRecordEncoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {Writer} from '@jsonjoy.com/util/lib/buffers/Writer';
import type {IWriter, IWriterGrowable} from '@jsonjoy.com/util/lib/buffers';

export class RmRecordEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriterGrowable> {
constructor(public readonly writer: W = new Writer() as any) {}

public encodeHdr(fin: 0 | 1, length: number): Uint8Array {
this.writeHdr(fin, length);
return this.writer.flush();
}

public encodeRecord(record: Uint8Array): Uint8Array {
this.writeRecord(record);
return this.writer.flush();
}

public writeHdr(fin: 0 | 1, length: number): void {
this.writer.u32((fin ? 0b10000000_00000000_00000000_00000000 : 0) + length);
}

public writeRecord(record: Uint8Array): void {
const length = record.length;
if (length <= 2147483647) {
const writer = this.writer;
writer.u32(0b10000000_00000000_00000000_00000000 + length);
writer.buf(record, length);
return;
}
let offset = 0;
while (offset < length) {
const fragmentLength = Math.min(length - offset, 0x7fffffff);
const fin = fragmentLength + offset >= length ? 1 : 0;
this.writeFragment(record, offset, fragmentLength, fin);
offset += fragmentLength;
}
}

public writeFragment(record: Uint8Array, offset: number, length: number, fin: 0 | 1): void {
this.writeHdr(fin, length);
const fragment = record.subarray(offset, offset + length);
this.writer.buf(fragment, length);
}
}
131 changes: 131 additions & 0 deletions src/rm/__tests__/RmRecordDecoder.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {RmRecordDecoder} from '../RmRecordDecoder';

describe('RmRecordDecoder', () => {
describe('.readRecord()', () => {
test('returns undefined when no data available', () => {
const decoder = new RmRecordDecoder();
const result = decoder.readRecord();
expect(result).toBeUndefined();
});

test('decodes empty record', () => {
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0, 0, 0, 0]));
expect(decoder.readRecord()).toBeUndefined();
});

test('decodes empty record', () => {
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0, 0, 0, 0, 0]));
expect(decoder.readRecord()).toBeUndefined();
});

test('decodes empty record - 2', () => {
const decoder = new RmRecordDecoder();
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0]));
expect(decoder.readRecord()).toBeUndefined();
});

test('decodes two records streamed one byte at a time', () => {
const decoder = new RmRecordDecoder();
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0b10000000]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([1]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([42]));
expect(decoder.readRecord()?.buf()).toEqual(new Uint8Array([42]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0b10000000, 0, 0]));
expect(decoder.readRecord()).toBeUndefined();
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([1, 43]));
expect(decoder.readRecord()?.buf()).toEqual(new Uint8Array([43]));
expect(decoder.readRecord()).toBeUndefined();
});

test('decodes single-byte record', () => {
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0b10000000, 0, 0, 1, 42]));
const result = decoder.readRecord()?.buf();
expect(result).toBeInstanceOf(Uint8Array);
expect(result!.length).toBe(1);
expect(result![0]).toBe(42);
});

test('decodes multi-byte record', () => {
const decoder = new RmRecordDecoder();
const data = new Uint8Array([1, 2, 3, 4, 5]);
decoder.push(new Uint8Array([0b10000000, 0, 0, data.length, ...data]));
const result = decoder.readRecord()?.buf();
expect(result).toBeInstanceOf(Uint8Array);
expect(result!.length).toBe(data.length);
expect(result).toEqual(data);
});

test('decodes ASCII string data', () => {
const text = 'hello world';
const data = new TextEncoder().encode(text);
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0b10000000, 0, 0, data.length, ...data]));
const result = decoder.readRecord()?.buf();
expect(result).toBeInstanceOf(Uint8Array);
expect(result!.length).toBe(data.length);
expect(result).toEqual(data);
});

test('decodes large record', () => {
const size = 10000;
const data = new Uint8Array(size);
for (let i = 0; i < size; i++) data[i] = i % 256;
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0b10000000, (size >> 16) & 0xff, (size >> 8) & 0xff, size & 0xff, ...data]));
const result = decoder.readRecord()?.buf();
expect(result).toBeInstanceOf(Uint8Array);
expect(result!.length).toBe(data.length);
expect(result).toEqual(data);
});
});

describe('fragmented records', () => {
test('decodes record with two fragments', () => {
const part1 = new Uint8Array([1, 2, 3]);
const part2 = new Uint8Array([4, 5, 6]);
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0b00000000, 0, 0, part1.length, ...part1]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0b10000000, 0, 0, part2.length, ...part2]));
const result = decoder.readRecord()?.buf();
expect(result).toBeInstanceOf(Uint8Array);
expect(result!.length).toBe(part1.length + part2.length);
expect(result).toEqual(new Uint8Array([...part1, ...part2]));
});

test('decodes record with three fragments', () => {
const part1 = new Uint8Array([1, 2]);
const part2 = new Uint8Array([3, 4]);
const part3 = new Uint8Array([5, 6]);
const decoder = new RmRecordDecoder();
decoder.push(new Uint8Array([0b00000000, 0, 0, part1.length, ...part1]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0b00000000, 0, 0, part2.length, ...part2]));
expect(decoder.readRecord()).toBeUndefined();
decoder.push(new Uint8Array([0b10000000, 0, 0, part3.length, ...part3]));
const result = decoder.readRecord()?.buf();
expect(result).toBeInstanceOf(Uint8Array);
expect(result!.length).toBe(part1.length + part2.length + part3.length);
expect(result).toEqual(new Uint8Array([...part1, ...part2, ...part3]));
});
});
});
Loading