Skip to content

Commit

Permalink
chore: add additional examples (#258)
Browse files Browse the repository at this point in the history
* chore: add nvmrc file
* chore: add tcp request stream example
* chore: add client-only composite metadata routing example
* chore: fix comment typos in various examples
  • Loading branch information
viglucci committed Apr 8, 2023
1 parent 8f862ad commit 48d122a
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 1 deletion.
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v18.12.1
2 changes: 2 additions & 0 deletions packages/rsocket-examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"start-client-request-channel": "ts-node -r tsconfig-paths/register src/ClientRequestChannelExample.ts",
"start-client-server-request-channel-resume": "ts-node -r tsconfig-paths/register src/ClientServerRequestChannelResumeExample.ts",
"start-client-request-fnf-with-lease": "ts-node -r tsconfig-paths/register src/ClienRequestFnfnWithLeaseExampleTcp.ts",
"start-client-composite-metadata-route": "ts-node -r tsconfig-paths/register src/ClientCompositeMetadataRouteExample.ts",
"start-client-server-request-stream-tcp": "ts-node -r tsconfig-paths/register src/ClientServerRequestStreamExampleTCP.ts",
"start-client-server-request-stream-websocket": "ts-node -r tsconfig-paths/register src/ClientServerRequestStreamExampleWebSocket.ts",
"start-client-server-request-response-tcp": "ts-node -r tsconfig-paths/register src/ClientServerRequestResponseExampleTcp.ts",
"start-client-server-request-response-websocket": "ts-node -r tsconfig-paths/register src/ClientServerRequestResponseExampleWebSocket.ts",
Expand Down
138 changes: 138 additions & 0 deletions packages/rsocket-examples/src/ClientCompositeMetadataRouteExample.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { RSocket, RSocketConnector } from "@rsocket/core";
import { TcpClientTransport } from "@rsocket/transport-tcp-client";
import {
encodeCompositeMetadata,
encodeRoute,
WellKnownMimeType,
} from "@rsocket/composite-metadata";
import Logger from "./shared/logger";
import { exit } from "process";
import MESSAGE_RSOCKET_ROUTING = WellKnownMimeType.MESSAGE_RSOCKET_ROUTING;
import MESSAGE_RSOCKET_COMPOSITE_METADATA = WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA;

/**
* This example assumes you have a RSocket server running on 127.0.0.1:9000 that will respond
* to requests at the following routes:
* - login (requestResponse)
* - message (requestResponse)
* - messages.incoming (requestStream)
*/

function makeConnector() {
const connectorConnectionOptions = {
host: "127.0.0.1",
port: 9000,
};
console.log(
`Creating connector to ${JSON.stringify(connectorConnectionOptions)}`
);
return new RSocketConnector({
setup: {
metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
},
transport: new TcpClientTransport({
connectionOptions: connectorConnectionOptions,
}),
});
}

function createRoute(route?: string) {
let compositeMetaData = undefined;
if (route) {
const encodedRoute = encodeRoute(route);

const map = new Map<WellKnownMimeType, Buffer>();
map.set(MESSAGE_RSOCKET_ROUTING, encodedRoute);
compositeMetaData = encodeCompositeMetadata(map);
}
return compositeMetaData;
}

async function requestResponse(rsocket: RSocket, route: string, data: string) {
console.log(`Executing requestResponse: ${JSON.stringify({ route, data })}`);
return new Promise((resolve, reject) => {
return rsocket.requestResponse(
{
data: Buffer.from(data),
metadata: createRoute(route),
},
{
onError: (e) => {
reject(e);
},
onNext: (payload, isComplete) => {
Logger.info(
`requestResponse onNext payload[data: ${payload.data}; metadata: ${payload.metadata}]|${isComplete}`
);
resolve(payload);
},
onComplete: () => {
Logger.info(`requestResponse onComplete`);
resolve(null);
},
onExtension: () => {},
}
);
});
}

async function main() {
const connector = makeConnector();
const rsocket = await connector.connect();

await requestResponse(rsocket, "login", "user1");

await requestResponse(
rsocket,
"message",
JSON.stringify({ user: "user1", content: "a message" })
);

await new Promise((resolve, reject) => {
let payloadsReceived = 0;
const maxPayloads = 10;
const requester = rsocket.requestStream(
{
data: Buffer.from("Hello World"),
metadata: createRoute("messages.incoming"),
},
3,
{
onError: (e) => reject(e),
onNext: (payload, isComplete) => {
Logger.info(
`[client] payload[data: ${payload.data}; metadata: ${payload.metadata}]|isComplete: ${isComplete}`
);

payloadsReceived++;

// request 5 more payloads every 5th payload, until a max total payloads received
if (payloadsReceived % 2 == 0 && payloadsReceived < maxPayloads) {
requester.request(2);
} else if (payloadsReceived >= maxPayloads) {
requester.cancel();
setTimeout(() => {
resolve(null);
});
}

if (isComplete) {
resolve(null);
}
},
onComplete: () => {
Logger.info(`requestStream onComplete`);
resolve(null);
},
onExtension: () => {},
}
);
});
}

main()
.then(() => exit())
.catch((error: Error) => {
console.error(error);
exit(1);
});
167 changes: 167 additions & 0 deletions packages/rsocket-examples/src/ClientServerRequestStreamExampleTCP.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
OnExtensionSubscriber,
OnNextSubscriber,
OnTerminalSubscriber,
Payload,
RSocketConnector,
RSocketServer,
} from "@rsocket/core";
import { WebsocketClientTransport } from "@rsocket/transport-websocket-client";
import { WebsocketServerTransport } from "@rsocket/transport-websocket-server";
import { exit } from "process";
import WebSocket from "ws";
import Logger from "./shared/logger";
import { TcpServerTransport } from "@rsocket/transport-tcp-server";
import { TcpClientTransport } from "@rsocket/transport-tcp-client";

function makeServer() {
return new RSocketServer({
transport: new TcpServerTransport({
listenOptions: {
port: 9090,
host: "127.0.0.1",
},
}),
acceptor: {
accept: async () => {
return {
requestStream: (
payload: Payload,
initialRequestN,
responderStream: OnTerminalSubscriber &
OnNextSubscriber &
OnExtensionSubscriber
) => {
Logger.info(
`[server] requestStream payload[data: ${payload.data}; metadata: ${payload.metadata}]|initialRequestN: ${initialRequestN}`
);

let interval = null;
let requestedResponses = initialRequestN;
let sentResponses = 0;

// simulate async data with interval
interval = setInterval(() => {
sentResponses++;
let isComplete = sentResponses >= requestedResponses;
responderStream.onNext(
{
data: Buffer.from(new Date()),
metadata: undefined,
},
isComplete
);
if (isComplete) {
clearInterval(interval);
}
}, 750);

return {
cancel() {
Logger.info("[server] stream cancelled by client");
clearInterval(interval);
},
request(n) {
requestedResponses += n;
Logger.info(
`[server] request n: ${n}, requestedResponses: ${requestedResponses}, sentResponses: ${sentResponses}`
);
},
onExtension: () => {},
};
},
};
},
},
});
}

function makeConnector() {
const connectorConnectionOptions = {
host: "127.0.0.1",
port: 9090,
};
console.log(
`Creating connector to ${JSON.stringify(connectorConnectionOptions)}`
);
return new RSocketConnector({
transport: new TcpClientTransport({
connectionOptions: connectorConnectionOptions,
}),
});
}

let serverCloseable;

async function main() {
const server = makeServer();
const connector = makeConnector();

serverCloseable = await server.bind();
const rsocket = await connector.connect();

await new Promise((resolve, reject) => {
let payloadsReceived = 0;
const maxPayloads = 10;
const requester = rsocket.requestStream(
{
data: Buffer.from("Hello World"),
},
3,
{
onError: (e) => reject(e),
onNext: (payload, isComplete) => {
Logger.info(
`[client] payload[data: ${payload.data}; metadata: ${payload.metadata}]|isComplete: ${isComplete}`
);

payloadsReceived++;

// request 5 more payloads every 5th payload, until a max total payloads received
if (payloadsReceived % 2 == 0 && payloadsReceived < maxPayloads) {
requester.request(2);
} else if (payloadsReceived >= maxPayloads) {
requester.cancel();
setTimeout(() => {
resolve(null);
});
}

if (isComplete) {
resolve(null);
}
},
onComplete: () => {
resolve(null);
},
onExtension: () => {},
}
);
});
}

main()
.then(() => exit())
.catch((error: Error) => {
console.error(error);
exit(1);
})
.finally(() => {
serverCloseable?.close();
});
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async function main() {

payloadsReceived++;

// request 5 more payloads event 5th payload, until a max total payloads received
// request 5 more payloads every 5th payload, until a max total payloads received
if (payloadsReceived % 2 == 0 && payloadsReceived < maxPayloads) {
requester.request(2);
} else if (payloadsReceived >= maxPayloads) {
Expand Down

0 comments on commit 48d122a

Please sign in to comment.