Kotlin implementation of RSocket
Clone or download
mostroverkhov Cleanup gradle files (#49)
* update dependencies

* dependency management

* extract maven publishing and artifactory/bintray configurations to separate files

* ci script in proper directory
Latest commit 5237e0a Oct 19, 2018



Join the chat at https://gitter.im/RSocket/reactivesocket-java

R(eactive)Socket: Reactive Streams over network boundary (tcp, websockets, etc) using Kotlin/Rxjava

RSocket is binary application protocol which models all communication as multiplexed streams of messages over a single network connection, and never synchronously blocks while waiting for a response.

It enables following symmetric interaction models:

  • fire-and-forget (no response)
  • request/response (stream of 1)
  • request/stream (finite/infinite stream of many)
  • channel (bi-directional streams)
  • per-stream and per-RSocket metadata

Build and Binaries

Snapshots are available on Bintray

 repositories {
     maven { url 'https://oss.jfrog.org/libs-snapshot' }
    dependencies {
        compile 'io.rsocket.kotlin:rsocket-core:0.9-SNAPSHOT'


Netty based Websockets and TCP transport (Client and Server)
OkHttp based Websockets transport (Client only)

 dependencies {
                compile 'io.rsocket.kotlin:rsocket-transport-netty:0.9-SNAPSHOT'
                compile 'io.rsocket.kotlin:rsocket-transport-okhttp:0.9-SNAPSHOT'


Each side of connection (Client and Server) has Requester RSocket for making requests to peer, and Responder RSocket to handle requests from peer.

Messages for all interactions are represented as Payload of binary (NIO ByteBuffer) data and metadata.

UTF-8 text payloads can be constructed as follows

val request = DefaultPayload.text("data", "metadata")

Stream Metadata is optional

val request = DefaultPayload.text("data")


  • Fire and Forget
    RSocket.fireAndForget(payload: Payload): Completable

  • Request-Response
    RSocket.requestResponse(payload: Payload): Single<Payload>

  • Request-Stream
    RSocket.requestStream(payload: Payload): Flowable<Payload>

  • Request-Channel
    RSocket.requestChannel(payload: Publisher<Payload>): Flowable<Payload>

  • Metadata-Push
    fun metadataPush(payload: Payload): Completable


Client is initiator of Connections

val rSocket: Single<RSocket> = RSocketFactory               // Requester RSocket
            .acceptor { { requesterRSocket -> handler(requesterRSocket) } }  // Optional handler RSocket
            .transport(OkhttpWebsocketClientTransport       // WebSockets transport
                  .create(protocol, host, port))

            private fun handler(requester:RSocket): RSocket {
                    return object : AbstractRSocket() {
                        override fun requestStream(payload: Payload): Flowable<Payload> {
                            return Flowable.just(DefaultPayload.text("client handler response"))


Server is acceptor of Connections from Clients

val closeable: Single<Closeable> = RSocketFactory
                .acceptor { { setup, rSocket -> handler(setup, rSocket) } } // server handler RSocket
                .transport(WebsocketServerTransport.create(port))  // Netty websocket transport

private fun handler(setup: Setup, rSocket: RSocket): Single<RSocket> {
        return Single.just(object : AbstractRSocket() {
            override fun requestStream(payload: Payload): Flowable<Payload> {
                return Flowable.just(DefaultPayload.text("server handler response"))


Copyright 2015-2018 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at


Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.