Skip to content
Permalink
Browse files

Added fs2 client constructor for single node connection (#136)

* Added fs2 client constructor for single node connection

* Changed client constructor in CLI

* Kttens 2.0.0

* fs2 2.0.0

* Bump circe and scodec-stream

* Scala 2.12.10

* Fixed readme
  • Loading branch information...
barambani authored and sirocchj committed Sep 11, 2019
1 parent bccc5a7 commit 5b149acfa8dee20c41e6c881b6dc75633f2dd4b1
@@ -3,7 +3,7 @@ sudo: false

language: scala
scala:
- 2.12.9
- 2.12.10
- 2.13.0

jdk:
@@ -86,42 +86,39 @@ of the JSON data structure, i.e. no spacing is used
### Example usage
With a running Redis instance on `localhost:6379` try running the following:
```scala
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors
import java.util.concurrent.Executors.newWorkStealingPool
import cats.effect._
import cats.syntax.apply._
import cats.syntax.functor._
import fs2.Stream
import laserdisc._
import laserdisc.auto._
import laserdisc.fs2._
import laserdisc.auto._
import cats.effect.{ExitCode, IO, IOApp, Resource, SyncIO}
import cats.syntax.flatMap._
import cats.syntax.functor._
import log.effect.fs2.Fs2LogWriter
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.fromExecutorService
import java.util.concurrent.Executors.newWorkStealingPool
object Main extends IOApp.WithContext {
override final protected val executionContextResource: Resource[SyncIO, ExecutionContext] =
MkResource.of(SyncIO(fromExecutorService(newWorkStealingPool())))
override final def run(args: List[String]): IO[ExitCode] =
Fs2LogWriter.consoleLogStream[IO].flatMap { implicit logger =>
RedisClient[IO](Set(RedisAddress("localhost", 6379))).evalMap { client =>
client.send2(strings.set("a", 23), strings.get[PosInt]("a")).flatMap {
private def redisTest: Stream[IO, Unit] =
Fs2LogWriter.consoleLogStream[IO] >>= { implicit log =>
RedisClient.toNode[IO]("localhost", 6379) evalMap { client =>
client.send2(strings.set("a", 23), strings.get[PosInt]("a")) >>= {
case (Right(OK), Right(Some(getResponse))) if getResponse.value == 23 =>
logger.info("yay!")
log info "yay!"
case other =>
logger.error(s"something went terribly wrong $other") *>
log.error(s"something went terribly wrong $other") >>
IO.raiseError(new RuntimeException("boom"))
}
}
}
.compile
.drain
.as(ExitCode.Success)
override final def run(args: List[String]): IO[ExitCode] =
redisTest.compile.drain.as(ExitCode.Success)
}
```

@@ -1,14 +1,14 @@
// shadow sbt-scalajs' crossProject and CrossType from Scala.js 0.6.x
import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType}

val `scala 2.12` = "2.12.9"
val `scala 2.12` = "2.12.10"
val `scala 2.13` = "2.13.0"

val V = new {
val circe = "0.12.0-RC4"
val fs2 = "1.1.0-M2"
val circe = "0.12.1"
val fs2 = "2.0.0"
val `kind-projector` = "0.10.3"
val kittens = "2.0.0-M1"
val kittens = "2.0.0"
val `log-effect-fs2` = "0.9.0"
val parallelCollections = "0.2.0"
val refined = "0.9.9"
@@ -17,7 +17,7 @@ val V = new {
val scalatest = "3.0.8"
val `scodec-bits` = "1.1.12"
val `scodec-core` = "1.11.4"
val `scodec-stream` = "2.0.0-SNAPSHOT"
val `scodec-stream` = "2.0.0"
val shapeless = "2.3.3"
}

@@ -83,7 +83,7 @@ object CLI extends IOApp.WithContext { self =>
def mkStream(host: Host, port: Port): Stream[IO, ExitCode] =
Stream.resource(Blocker.fromExecutorService[IO](IO(fromExecutorService(newSingleThreadExecutor())))) >>= { replBlockingEC =>
Fs2LogWriter.consoleLogStream[IO] >>= { implicit log =>
RedisClient[IO](Set(RedisAddress(host, port))).evalMap { redisClient =>
RedisClient.toNode[IO](host, port) evalMap { redisClient =>
val promptStream: Stream[IO, String] = Stream.emit(s"$host:$port> ").repeat

val emptyPrompt: IO[Unit] =
@@ -77,6 +77,19 @@ object RedisClient {
): Stream[F, RedisClient[F]] =
blockingOn(Blocker[F])(addresses, writeTimeout, readMaxBytes)

/**
* Creates a redis client for a single redis node
* that will handle the blocking network connection's
* operations on a cached thread pool.
*/
@inline final def toNode[F[_]: ConcurrentEffect: ContextShift: Timer: LogWriter](
host: Host,
port: Port,
writeTimeout: Option[FiniteDuration] = Some(10.seconds),
readMaxBytes: Int = 256 * 1024
): Stream[F, RedisClient[F]] =
blockingOn(Blocker[F])(Set(RedisAddress(host, port)), writeTimeout, readMaxBytes)

/**
* Creates a redis client allowing to specify what blocking
* thread pool will be used to handle the blocking network
@@ -32,7 +32,7 @@ final class RedisClientSpec extends WordSpecLike with Matchers with BeforeAndAft

def clientUnderTest[F[_]: ContextShift: Timer: ConcurrentEffect]: Stream[F, RedisClient[F]] =
noOpLogStreamF >>= { implicit l =>
RedisClient[F](Set(RedisAddress("127.0.0.1", 6379)))
RedisClient.toNode("127.0.0.1", 6379)
}

"an fs2 redis client" should {

0 comments on commit 5b149ac

Please sign in to comment.
You can’t perform that action at this time.