Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fs2 client constructor for single node connection #136

Merged
merged 7 commits into from Sep 11, 2019

Conversation

@barambani
Copy link
Member

commented Sep 7, 2019

I open this so you can have a look and see if you agree. I still have a problem with the blocking thread pool and the IOApp. When I run

import java.util.concurrent.Executors.newWorkStealingPool

import _root_.fs2.Stream
import cats.effect.{ExitCode, IO, IOApp, Resource, SyncIO}
import cats.syntax.flatMap._
import cats.syntax.functor._
import laserdisc.auto._
import laserdisc.fs2.{MkResource, RedisClient}
import laserdisc.{OK, PosInt, strings}
import log.effect.fs2.Fs2LogWriter

import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.fromExecutorService

object Main extends IOApp.WithContext {

  override final protected val executionContextResource: Resource[SyncIO, ExecutionContext] =
    MkResource.of(SyncIO(fromExecutorService(newWorkStealingPool())))

  private def redisTest: Stream[IO, Unit] =
    Fs2LogWriter.consoleLogStream[IO] >>= { implicit log =>
      RedisClient.toNode[IO]("localhost", 6379) evalMap { client =>
        client.send2(strings.set("a", 23), strings.get[PosInt]("a")) >>= {
          case (Right(OK), Right(Some(getResponse))) if getResponse.value == 23 =>
            log info "yay!"
          case other =>
            log.error(s"something went terribly wrong $other") >>
              IO.raiseError(new RuntimeException("boom"))
        }
      }
    }

  override final def run(args: List[String]): IO[ExitCode] =
    redisTest.compile.drain.as(ExitCode.Success)
}

I get this

[info] running Main
[info] - [ForkJoinPool-9-worker-2] Starting connection
[info] - [ForkJoinPool-9-worker-1] Server available for publishing: localhost:6379
[debug] - [ForkJoinPool-9-worker-1] sending Arr(Bulk(SET),Bulk(a),Bulk(23))
[debug] - [ForkJoinPool-9-worker-1] receiving Str(OK)
[debug] - [ForkJoinPool-9-worker-1] sending Arr(Bulk(GET),Bulk(a))
[debug] - [ForkJoinPool-9-worker-0] receiving Bulk(23)
[info] - [ForkJoinPool-9-worker-3] yay!
[info] - [ForkJoinPool-9-worker-3] Shutting down connection
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:2328)
	at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2419)
	at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2648)
	at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:23)
	at cats.effect.internals.IOShift$$anon$1.apply(IOShift.scala:28)
	at cats.effect.internals.IOShift$$anon$1.apply(IOShift.scala:26)
	at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:341)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:119)
	at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
	at cats.effect.internals.IOStart$.$anonfun$apply$1(IOStart.scala:36)
	at cats.effect.internals.IOStart$.$anonfun$apply$1$adapted(IOStart.scala:28)
	at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:341)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:119)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
	at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
	at cats.effect.internals.ForwardCancelable$$anon$1.run(ForwardCancelable.scala:48)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:137)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:124)
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1(Deferred.scala:205)
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1$adapted(Deferred.scala:205)
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$notifyReadersLoop$1(Deferred.scala:241)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[success] Total time: 3 s, completed 07-Sep-2019 14:58:01

@barambani barambani requested a review from sirocchj Sep 7, 2019

@codecov-io

This comment has been minimized.

Copy link

commented Sep 7, 2019

Codecov Report

Merging #136 into master will increase coverage by 0.03%.
The diff coverage is 50%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #136      +/-   ##
==========================================
+ Coverage   61.19%   61.23%   +0.03%     
==========================================
  Files          36       36              
  Lines        1201     1202       +1     
  Branches       12        4       -8     
==========================================
+ Hits          735      736       +1     
  Misses        466      466
Impacted Files Coverage Δ
cli/src/main/scala/laserdisc/cli/CLI.scala 0% <0%> (ø) ⬆️
fs2/src/main/scala/laserdisc/fs2/RedisClient.scala 67.56% <100%> (-0.93%) ⬇️
core/src/main/scala/laserdisc/protocol/KeyP.scala 92.68% <0%> (+1.21%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update bccc5a7...04c4d50. Read the comment docs.

barambani added 2 commits Sep 7, 2019
README.md Outdated Show resolved Hide resolved
README.md Show resolved Hide resolved
@sirocchj
Copy link
Member

left a comment

LGTM, just minor comments on the README.md that can be addressed after. And a couple more ideas to further improve the ergonomics for single node Redis instances.

IIUC master's IOApp is broken too so this can go in as is and we'll address the issue separately

@barambani barambani changed the title [WIP] Added fs2 client constructor for single node connection Added fs2 client constructor for single node connection Sep 11, 2019

@sirocchj sirocchj merged commit 5b149ac into laserdisc-io:master Sep 11, 2019

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

@barambani barambani deleted the barambani:ergonomics-and-readme branch Sep 13, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.