linux-china/rsocket-deno

RSocket Deno module

🦕 Deno library to create and consume async RSocket services.

What is RSocket?

RSocket is a binary protocol for use on byte stream transports such as TCP and WebSocket. It enables the following symmetric interaction models via async message passing over a single connection:

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • channel (bi-directional streams)
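
The four interaction models above can be pictured as four method signatures on one connection object. The sketch below is illustrative only: `InteractionModels`, `EchoResponder`, and `collect` are hypothetical names, not types exported by this module, and `AsyncIterable` stands in for a reactive stream.

```typescript
// Hypothetical types for illustration; not this module's API.
interface InteractionModels {
  // request/response: one request, exactly one reply
  requestResponse(data: string): Promise<string>;
  // request/stream: one request, a finite stream of replies
  requestStream(data: string): AsyncIterable<string>;
  // fire-and-forget: one request, no reply
  fireAndForget(data: string): void;
  // channel: a stream of requests answered by a stream of replies
  requestChannel(data: AsyncIterable<string>): AsyncIterable<string>;
}

// In-memory responder so the sketch runs without a network connection.
class EchoResponder implements InteractionModels {
  readonly received: string[] = [];

  async requestResponse(data: string): Promise<string> {
    return `echo:${data}`;
  }

  async *requestStream(data: string): AsyncIterable<string> {
    // Emit three numbered items, then complete.
    for (let i = 1; i <= 3; i++) yield `${data}#${i}`;
  }

  fireAndForget(data: string): void {
    // Nothing is returned to the caller; the data is only recorded.
    this.received.push(data);
  }

  async *requestChannel(data: AsyncIterable<string>): AsyncIterable<string> {
    // Reply to every inbound item with its upper-cased form.
    for await (const item of data) yield item.toUpperCase();
  }
}

// Helper: drain an async stream into an array.
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of stream) items.push(item);
  return items;
}
```

Because the models are symmetric, either side of a connection could expose an object of this shape once the connection is established.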

RSocket is designed for asynchronous, reactive communication between services.

How to use?

RSocket Deno is under active development. Run the following command to refresh the cached module to the latest version:

deno run --reload https://deno.land/x/rsocket/mod.ts

Start RSocket Server with Deno

$ deno run --allow-net https://deno.land/x/rsocket/rsocket_server.ts

The server-side code is as follows:

import {
    RSocketServer,
    forRequestResponse,
    Payload
} from "https://deno.land/x/rsocket/mod.ts"

await RSocketServer.create(forRequestResponse(
    async (payload: Payload): Promise<Payload> => {
        console.log(`Received: ${payload.getDataUtf8()} `)
        return Payload.fromText("Hello, this is Deno Server!", "");
    })
).bind("tcp://0.0.0.0:42252");

console.log("RSocket Server started on 0.0.0.0:42252")

Start an RSocket requester to test an async RPC call

$ deno run --allow-net https://deno.land/x/rsocket/rsocket_client.ts

The client-side code is as follows:

import {
    RSocketConnector,
    Payload
} from "https://deno.land/x/rsocket/mod.ts"

const rsocket = await RSocketConnector.create().connect("tcp://127.0.0.1:42252");

const result = await rsocket.requestResponse(Payload.fromText("Hello, I'm requester!", ""));
console.log(result.getDataUtf8());

Service router and stub

Service router for the RSocket server side

import {
    RSocketServer,
    RSocket,
    ConnectionSetupPayload,
    RSocketRouteHandler
} from "https://deno.land/x/rsocket/mod.ts"

//RSocket Service
class UserService {

    async findNickById(id: number): Promise<string> {
        return "DenoServer";
    }
}

const server = await RSocketServer.create({
    accept(setup: ConnectionSetupPayload, sendingSocket: RSocket) {
        return RSocketRouteHandler.fromHandler("com.example.UserService", new UserService());
    }
}).bind("tcp://127.0.0.1:42252");
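
To make the routing idea concrete, here is a minimal sketch of how a service object could be turned into a table of named routes. It is an assumption about the general technique, not the implementation of `RSocketRouteHandler`: `routesFor`, `Handler`, and the `service.method` route format are hypothetical.

```typescript
// Hypothetical handler type: every route resolves to an async call.
type Handler = (...args: unknown[]) => Promise<unknown>;

// Build a route table mapping "serviceName.methodName" to the bound method.
function routesFor(serviceName: string, service: object): Map<string, Handler> {
  const routes = new Map<string, Handler>();
  const proto = Object.getPrototypeOf(service);
  // Class methods live on the prototype and are not enumerable,
  // so getOwnPropertyNames is used instead of Object.keys.
  for (const name of Object.getOwnPropertyNames(proto)) {
    const member = (service as Record<string, unknown>)[name];
    if (name !== "constructor" && typeof member === "function") {
      routes.set(
        `${serviceName}.${name}`,
        (...args) => Promise.resolve(member.apply(service, args)),
      );
    }
  }
  return routes;
}

class UserService {
  async findNickById(id: number): Promise<string> {
    return `DenoServer#${id}`;
  }
}

const routes = routesFor("com.example.UserService", new UserService());
routes.get("com.example.UserService.findNickById")?.(1)
  .then((nick) => console.log(nick));
```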

Service stub for requester side

import {RSocketConnector, buildServiceStub} from "https://deno.land/x/rsocket/mod.ts"

const rsocket = await RSocketConnector.create().connect("tcp://127.0.0.1:42252");

interface UserService {
    findNickById(id: number): Promise<string>;
}

const userService = buildServiceStub<UserService>(rsocket, "com.example.UserService")

const nick = await userService.findNickById(1);
console.log(nick)
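
One way a stub like this can work is with a JavaScript `Proxy` that turns each method call into a request/response carrying a route name. The sketch below is an assumption about the general technique, not this module's implementation of `buildServiceStub`: `Requester`, `makeStub`, `fakeRequester`, and the `service.method` route format are hypothetical, and JSON is used as the argument encoding purely for illustration.

```typescript
// Hypothetical minimal requester: sends a route plus a text body, resolves with the reply.
interface Requester {
  requestResponse(route: string, body: string): Promise<string>;
}

// Build a typed stub whose method calls become request/response calls.
function makeStub<T extends object>(requester: Requester, serviceName: string): T {
  return new Proxy({} as T, {
    get(_target, property) {
      // Symbol keys carry no route; "then" is skipped so the stub
      // is never mistaken for a thenable.
      if (typeof property !== "string" || property === "then") return undefined;
      return async (...args: unknown[]) => {
        const reply = await requester.requestResponse(
          `${serviceName}.${property}`,
          JSON.stringify(args),
        );
        return JSON.parse(reply);
      };
    },
  });
}

// In-memory stand-in for a remote responder, so the sketch runs without a network.
const fakeRequester: Requester = {
  async requestResponse(route, body) {
    const [id] = JSON.parse(body) as [number];
    return JSON.stringify(`${route} called with id ${id}`);
  },
};

interface UserService {
  findNickById(id: number): Promise<string>;
}

const stub = makeStub<UserService>(fakeRequester, "com.example.UserService");
stub.findNickById(1).then((nick) => console.log(nick));
```

The interface exists only at compile time, which is why the stub needs the service name as a string: the proxy learns the method name at call time and combines the two into a route.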

WebSocket support

Use a "ws://" URL in place of "tcp://", for example "ws://127.0.0.1:42252".
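
Since the transport is chosen by the URL alone, the selection can be pictured as a switch on the URL scheme. This is a hedged sketch of that idea using the standard `URL` class; `Transport` and `parseTransport` are hypothetical names, and the module's real connection logic may differ.

```typescript
// Hypothetical description of a resolved transport.
type Transport = { kind: "tcp" | "websocket"; host: string; port: number };

// Derive the transport kind, host, and port from a connection URL.
// An explicit port in the URL is assumed.
function parseTransport(url: string): Transport {
  const parsed = new URL(url);
  switch (parsed.protocol) {
    case "tcp:":
      return { kind: "tcp", host: parsed.hostname, port: Number(parsed.port) };
    case "ws:":
    case "wss:":
      return { kind: "websocket", host: parsed.hostname, port: Number(parsed.port) };
    default:
      throw new Error(`Unsupported scheme: ${parsed.protocol}`);
  }
}

console.log(parseTransport("ws://127.0.0.1:42252").kind);
console.log(parseTransport("tcp://127.0.0.1:42252").kind);
```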

Interoperate with Spring Boot RSocket

Reactive streams interoperation with RxJS

The Reactive Streams module interoperates with RxJS: a Publisher can be converted to an Observable, and an Observable to a Publisher.

// @deno-types="https://deno.land/x/types/rxjs/v6.5.5/rxjs.d.ts"
import {Observable, range, of} from "https://cdn.pika.dev/rxjs@6.5.5";
// @deno-types="https://deno.land/x/types/rxjs/v6.5.5/operators.d.ts"
import operators from 'https://dev.jspm.io/rxjs@6.5.5/operators';
const {map, filter} = operators;

import { publisherToObservable, observableToPublisher } from "https://deno.land/x/rsocket/reactivestreams/rxjs.ts"

Alternatively, use https://deno.land/x/rxjs:

import {Observable} from "https://deno.land/x/rxjs/mod.ts";
import {map, last} from "https://deno.land/x/rxjs/src/operators/index.ts";
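
To show what a Publisher-to-Observable conversion involves, here is a self-contained sketch that adapts a Reactive Streams style `Publisher` to an RxJS style observer. It is not the code behind `publisherToObservable`: the interfaces are minimal hand-written versions, `subscribeAsObservable` and `fromArray` are hypothetical helpers, and requesting unbounded demand up front is a simplification that gives up backpressure.

```typescript
// Minimal Reactive Streams style interfaces (hand-written for this sketch).
interface Subscription { request(n: number): void; cancel(): void; }
interface Subscriber<T> {
  onSubscribe(s: Subscription): void;
  onNext(value: T): void;
  onError(error: unknown): void;
  onComplete(): void;
}
interface Publisher<T> { subscribe(subscriber: Subscriber<T>): void; }

// RxJS style observer: push callbacks without explicit demand.
interface Observer<T> {
  next(value: T): void;
  error(error: unknown): void;
  complete(): void;
}

// Bridge a Publisher to an Observer; returns an unsubscribe function.
function subscribeAsObservable<T>(publisher: Publisher<T>, observer: Observer<T>): () => void {
  let subscription: Subscription | undefined;
  publisher.subscribe({
    onSubscribe(s) {
      subscription = s;
      // Observables have no demand signal, so ask for everything.
      s.request(Number.MAX_SAFE_INTEGER);
    },
    onNext: (value) => observer.next(value),
    onError: (error) => observer.error(error),
    onComplete: () => observer.complete(),
  });
  return () => subscription?.cancel();
}

// Array-backed Publisher that emits only as many items as were requested.
function fromArray<T>(items: T[]): Publisher<T> {
  return {
    subscribe(subscriber) {
      let index = 0;
      let done = false;
      subscriber.onSubscribe({
        request(n) {
          while (!done && n > 0 && index < items.length) {
            subscriber.onNext(items[index++]);
            n--;
          }
          if (!done && index >= items.length) {
            done = true;
            subscriber.onComplete();
          }
        },
        cancel() { done = true; },
      });
    },
  };
}

subscribeAsObservable(fromArray(["a", "b"]), {
  next: (value) => console.log("next", value),
  error: (error) => console.error(error),
  complete: () => console.log("complete"),
});
```

The reverse direction has to do more work, because a Publisher must respect `request(n)` while an Observable pushes freely, so some buffering or dropping strategy is needed there.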

TODO

RSocket

  • Operations
    • REQUEST_FNF
    • REQUEST_RESPONSE
    • REQUEST_STREAM
    • REQUEST_CHANNEL
    • METADATA_PUSH
  • More Operations
    • Error
    • Cancel
    • Keepalive
  • QoS
    • RequestN
    • Lease
  • Transport
    • TCP
    • WebSocket
  • High Level APIs
    • Client
    • Server
  • Misc
    • RxJS

References