Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
version = "3.5.3"
runner.dialect = scala213
maxColumn = 120
fileOverride {
"glob:**/fs2/src/**" {
runner.dialect = scala213source3
}
"glob:**/fs2/test/src/**" {
runner.dialect = scala213source3
}
"glob:**/core/test/src-jvm-native/**" {
runner.dialect = scala213source3
}
"glob:**/core/src/**" {
runner.dialect = scala213source3
}
}
40 changes: 29 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
[![CI](https://github.com/neandertech/jsonrpclib/actions/workflows/ci.yml/badge.svg)](https://github.com/neandertech/jsonrpclib/actions/workflows/ci.yml)

[![jsonrpclib-fs2 Scala version support](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2/latest-by-scala-version.svg?platform=jvm)](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2)

[![jsonrpclib-fs2 Scala version support](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2/latest-by-scala-version.svg?platform=sjs1)](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2)


# jsonrpclib

This is a cross-platform, cross-scala-version [jsonrpc](https://www.jsonrpc.org/) library that provides construct for bidirectional communication using
the jsonrpc protocol.
This is a cross-platform, cross-scala-version library that provides construct for bidirectional communication using the [jsonrpc](https://www.jsonrpc.org/) protocol. It is built on top of [fs2](https://fs2.io/#/) and [jsoniter-scala](https://github.com/plokhotnyuk/jsoniter-scala)

This library does not enforce any transport, and can work on top of stdin/stdout or other channels.

## Installation

The dependencies below are following [cross-platform semantics](http://youforgotapercentagesignoracolon.com/).
Adapt according to your needs

### SBT

This library does not enforce any transport, and works as long as you can provide input/output byte streams.
```scala
libraryDependencies += "tech.neander" %%% "jsonrpclib-fs2" % version
```

### Mill

## Dev Notes
```scala
override def ivyDeps = super.ivyDeps() ++ Agg(ivy"tech.neander::jsonrpclib-fs2::$version")
```

### Scala-native
### Scala-cli

See
* https://github.com/scala-native/scala-native/blob/63d07093f6d0a6e9de28cd8f9fb6bc1d6596c6ec/test-interface/src/main/scala/scala/scalanative/testinterface/NativeRPC.scala
```scala
//> using lib "tech.neander::jsonrpclib-fs2:<VERSION>"
```

## Usage

### Scala-js
**/!\ Please be aware that this library is in its early days and offers strictly no guarantee with regards to backward compatibility**

See
* https://github.com/scala-js/scala-js-js-envs/blob/main/nodejs-env/src/main/scala/org/scalajs/jsenv/nodejs/ComSupport.scala#L245
* https://github.com/scala-js/scala-js/blob/0708917912938714d52be1426364f78a3d1fd269/test-bridge/src/main/scala/org/scalajs/testing/bridge/JSRPC.scala
See the examples folder
25 changes: 24 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import mill.define.Target
import mill.util.Jvm
import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION`
import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.1`
import $ivy.`de.tototec::de.tobiasroeser.mill.vcs.version::0.1.4`
Expand All @@ -20,6 +22,7 @@ object versions {
val scalaNativeVersion = "0.4.4"
val munitVersion = "0.7.29"
val munitNativeVersion = "1.0.0-M6"
val fs2 = "3.2.11"

val scala213 = "2.13"
val scala212 = "2.12"
Expand Down Expand Up @@ -59,7 +62,7 @@ object fs2 extends RPCCrossPlatformModule { cross =>

override def crossPlatformModuleDeps = Seq(core)
def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
ivy"co.fs2::fs2-core::3.2.8"
ivy"co.fs2::fs2-core::${versions.fs2}"
)

object jvm extends mill.Cross[JvmModule](scala213, scala3)
Expand All @@ -74,6 +77,26 @@ object fs2 extends RPCCrossPlatformModule { cross =>

}

object examples extends mill.define.Module {

object server extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213))
def scalaVersion = versions.scala213Version
}

object client extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213))
def scalaVersion = versions.scala213Version
def forkEnv: Target[Map[String, String]] = T {
val assembledServer = server.assembly()
super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString())
}
}

}

// #############################################################################
// COMMON SETUP
// #############################################################################
Expand Down
15 changes: 15 additions & 0 deletions devnotes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Dev Notes

In case somebody wants to implement future-based channels, here are some source of inspiration :

### Scala-native

See
* https://github.com/scala-native/scala-native/blob/63d07093f6d0a6e9de28cd8f9fb6bc1d6596c6ec/test-interface/src/main/scala/scala/scalanative/testinterface/NativeRPC.scala


### Scala-js

See
* https://github.com/scala-js/scala-js-js-envs/blob/main/nodejs-env/src/main/scala/org/scalajs/jsenv/nodejs/ComSupport.scala#L245
* https://github.com/scala-js/scala-js/blob/0708917912938714d52be1426364f78a3d1fd269/test-bridge/src/main/scala/org/scalajs/testing/bridge/JSRPC.scala
68 changes: 68 additions & 0 deletions examples/client/src/examples/client/ChildProcess.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package examples.client

import fs2.Stream
import cats.effect._
import cats.syntax.all._
import scala.jdk.CollectionConverters._
import java.io.OutputStream

trait ChildProcess[F[_]] {
def stdin: fs2.Pipe[F, Byte, Unit]
def stdout: Stream[F, Byte]
def stderr: Stream[F, Byte]
}

object ChildProcess {

def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] =
Stream.bracket(start[F](command))(_._2).map(_._1)

val readBufferSize = 512
private def start[F[_]: Async](command: Seq[String]) = Async[F].interruptible {
val p =
new java.lang.ProcessBuilder(command.asJava)
.start() // .directory(new java.io.File(wd)).start()
val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit()))

val terminate: F[Unit] = Sync[F].interruptible(p.destroy())

import cats._
val onGlobal = new (F ~> F) {
def apply[A](fa: F[A]): F[A] = Async[F].evalOn(fa, scala.concurrent.ExecutionContext.global)
}

val cp = new ChildProcess[F] {
def stdin: fs2.Pipe[F, Byte, Unit] =
writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream()))

def stdout: fs2.Stream[F, Byte] = fs2.io
.readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize)
.translate(onGlobal)

def stderr: fs2.Stream[F, Byte] = fs2.io
.readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize)
.translate(onGlobal)
// Avoids broken pipe - we cut off when the program ends.
// Users can decide what to do with the error logs using the exitCode value
.interruptWhen(done.void.attempt)
}
(cp, terminate)
}

/** Adds a flush after each chunk
*/
def writeOutputStreamFlushingChunks[F[_]](
fos: F[OutputStream],
closeAfterUse: Boolean = true
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
s => {
def useOs(os: OutputStream): Stream[F, Nothing] =
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))

val os =
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
else Stream.eval(fos)
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
}

}
58 changes: 58 additions & 0 deletions examples/client/src/examples/client/ClientMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package examples.server

import jsonrpclib.CallId
import jsonrpclib.fs2._
import cats.effect._
import fs2.io._
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import jsonrpclib.Endpoint
import cats.syntax.all._
import fs2.Stream
import jsonrpclib.StubTemplate
import cats.effect.std.Dispatcher
import cats.effect.implicits._
import java.io.OutputStream
import java.io.InputStream
import examples.client.ChildProcess

object ClientMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

// Creating a datatype that'll serve as a request (and response) of an endpoint
case class IntWrapper(value: Int)
object IntWrapper {
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
}

type IOStream[A] = fs2.Stream[IO, A]
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str))

def run: IO[Unit] = {
import scala.concurrent.duration._
// Using errorln as stdout is used by the RPC channel
val run = for {
_ <- log("Starting client")
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
// Starting the server
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
// Creating a channel that will be used to communicate to the server
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
_ <- Stream(())
.concurrently(fs2Channel.output.through(lsp.encodePayloads).through(rp.stdin))
.concurrently(rp.stdout.through(lsp.decodePayloads).through(fs2Channel.input))
.concurrently(rp.stderr.through(fs2.io.stderr[IO]))
// Creating a `IntWrapper => IO[IntWrapper]` stub that can call the server
increment = fs2Channel.simpleStub[IntWrapper, IntWrapper]("increment")
result1 <- Stream.eval(increment(IntWrapper(0)))
_ <- log(s"Client received $result1")
result2 <- Stream.eval(increment(result1))
_ <- log(s"Client received $result2")
_ <- log("Terminating client")
} yield ()
run.compile.drain
}

}
45 changes: 45 additions & 0 deletions examples/server/src/examples/server/ServerMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package examples.server

import jsonrpclib.CallId
import jsonrpclib.fs2._
import cats.effect._
import fs2.io._
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import jsonrpclib.Endpoint
import cats.syntax.all._

object ServerMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

// Creating a datatype that'll serve as a request (and response) of an endpoint
case class IntWrapper(value: Int)
object IntWrapper {
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
}

// Implementing an incrementation endpoint
val increment = Endpoint[IO]("increment").simple { in: IntWrapper =>
IO.consoleForIO.errorln(s"Server received $in") >>
IO.pure(in.copy(value = in.value + 1))
}

def run: IO[Unit] = {
// Using errorln as stdout is used by the RPC channel
IO.consoleForIO.errorln("Starting server") >>
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
.flatMap(channel =>
fs2.Stream
.eval(IO.never) // running the server forever
.concurrently(stdin[IO](512).through(lsp.decodePayloads).through(channel.input))
.concurrently(channel.output.through(lsp.encodePayloads).through(stdout[IO]))
)
.compile
.drain
.guarantee(IO.consoleForIO.errorln("Terminating server"))
}

}
Loading