Skip to content

Commit

Permalink
fix channel's payload.data deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
palamccc committed Mar 31, 2024
1 parent 0f3b333 commit f13ddeb
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
14 changes: 7 additions & 7 deletions packages/rsocket-core/src/RSocketMachine.js
Original file line number Diff line number Diff line change
Expand Up @@ -784,13 +784,14 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
}

_handleRequestChannel(streamId: number, frame: RequestChannelFrame): void {
const payload = this._deserializePayload(frame);
const existingSubscription = this._subscriptions.get(streamId);
if (existingSubscription) {
//Likely a duplicate REQUEST_CHANNEL frame, ignore per spec
return;
}

const payloads = new Flowable(subscriber => {
const payloads = new Flowable<Payload<D, M>>(subscriber => {
let firstRequest = true;

subscriber.onSubscribe({
Expand Down Expand Up @@ -823,15 +824,13 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
//critically, if n is 0 now, that's okay because we eagerly decremented it
if (firstRequest && n >= 0) {
firstRequest = false;
//release the initial frame we received in frame form due to map operator
subscriber.onNext(frame);
//release the initial payload we received in frame form due to map operator
subscriber.onNext(payload);
}
},
});
}, MAX_REQUEST_N);
const framesToPayloads = new FlowableProcessor(payloads, frame =>
this._deserializePayload(frame),
);
const framesToPayloads = new FlowableProcessor(payloads);
this._receivers.set(streamId, framesToPayloads);

this._requestHandler.requestChannel(framesToPayloads).subscribe({
Expand Down Expand Up @@ -892,13 +891,14 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
flags |= FLAGS.COMPLETE;
this._subscriptions.delete(streamId);
}
let metadata;
if (payload.metadata !== undefined &&
payload.metadata !== null) {
// eslint-disable-next-line no-bitwise
flags |= FLAGS.METADATA;
metadata = this._serializers.metadata.serialize(payload.metadata);
}
const data = this._serializers.data.serialize(payload.data);
const metadata = this._serializers.metadata.serialize(payload.metadata);
this._connection.sendOne({
data,
flags,
Expand Down
63 changes: 63 additions & 0 deletions packages/rsocket-core/src/__tests__/RSocketServer-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,69 @@ describe('RSocketServer', () => {
});

describe('RequestHandler', () => {
it('deserializes and serializes the channel\'s payload.data', () => {
console.error = jest.fn();
const transport = genMockTransportServer();

const makePayload = (data) => ({
type: FRAME_TYPES.PAYLOAD,
streamId: 1,
flags: 32, // next bit - invoke onNext
data,
});

const server = new RSocketServer({
getRequestHandler: () => {
return {
requestChannel: (incomingFlowable) => {
// If the payload.data has 'name', reply with custom response
return incomingFlowable.map(payload => {
if (payload?.data?.name) {
return { data: { say: 'hello ' + payload.data.name } };
} else {
return payload;
}
});
},
};
},
serializers:JsonSerializers,
transport,
});

server.start();
transport.mock.connect();
connection.receive.mock.publisher.onNext({
type: FRAME_TYPES.SETUP,
data: undefined,
dataMimeType: '<dataMimeType>',
flags: 0,
keepAlive: 42,
lifetime: 2017,
metadata: undefined,
metadataMimeType: '<metadataMimeType>',
resumeToken: null,
streamId: 0,
majorVersion: 1,
minorVersion: 0,
});
connection.receive.mock.publisher.onNext({
type: FRAME_TYPES.REQUEST_CHANNEL,
flags: 0,
requestN: 100,
streamId: 1,
// data along with first REQUEST_CHANNEL frame
data: JSON.stringify({ name: 'Alex' }),
});
// data as separate PAYLOAD frame
connection.receive.mock.publisher.onNext(makePayload(JSON.stringify({ name: 'Bob' })));
jest.runOnlyPendingTimers();

expect(connection.sendOne.mock.calls.length).toBe(3);
expect(connection.sendOne.mock.calls[1][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Alex' })));
expect(connection.sendOne.mock.calls[2][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Bob' })));
});

it('sends error if getRequestHandler throws', () => {
const transport = genMockTransportServer();
const server = new RSocketServer({
Expand Down

0 comments on commit f13ddeb

Please sign in to comment.