-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade to cats-effect 2.x, preliminary scala 2.13 support #63
Conversation
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
@@ -44,7 +47,7 @@ trait Converter { | |||
* Converts an Akka Stream [[Graph]] of [[SinkShape]] to an FS2 [[Sink]]. The [[Graph]] is materialized when | |||
* the [[Sink]]'s [[F]] in run. The materialized value can be obtained with the `onMaterialization` callback. | |||
*/ | |||
def akkaSinkToFs2Sink[F[_], A, M](sink: Graph[SinkShape[A], M])(onMaterialization: M => Unit)(implicit materializer: Materializer, F: Concurrent[F]): Sink[F, A] = { s => | |||
def akkaSinkToFs2Pipe[F[_]: ContextShift, A, M](sink: Graph[SinkShape[A], M])(onMaterialization: M => Unit)(implicit materializer: Materializer, F: Concurrent[F]): Pipe[F, A, Unit] = { s => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fs2.Sink
is deprecated
* The stream returned by this will emit the Future's value one time at the end, | ||
* then terminate. | ||
*/ | ||
def akkaSinkToFs2PipeMat[F[_]: ConcurrentEffect: ContextShift, A, M](akkaSink: Graph[SinkShape[A], Future[M]])( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Provides an out of the box workaround for #54 and makes the unit test a bit more honest
.toMat(AkkaSink.head)(Keep.right) | ||
.mapMaterializedValue(ffd => IO.fromFuture(IO.pure(ffd)).flatMap(fd => IO.fromFuture(IO.pure(fd))).unsafeToFuture()) | ||
.mapMaterializedValue(ffd => ffd.flatten) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoids a whole bunch of thread pool hops
val sink2 = AkkaSink.foreach[(SourceQueueWithComplete[B], SinkQueueWithCancel[A])](ps => F.toIO(transformerStream(ps._2, ps._1, pipe).compile.drain).unsafeToFuture()) | ||
val sink2 = AkkaSink.foreach[(SourceQueueWithComplete[B], SinkQueueWithCancel[A])] { ps => | ||
// Fire and forget Future so it runs in the background | ||
F.toIO(transformerStream(ps._2, ps._1, pipe).compile.drain).unsafeToFuture() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be good to hook in cancellation support...
streamz-converter/src/main/scala/streamz/converter/package.scala
Outdated
Show resolved
Hide resolved
@@ -71,15 +71,15 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with | |||
|
|||
expectError(stream.compile.drain.unsafeRunSync()) | |||
} | |||
"propagate cancellation from stream to source (on stream completion)" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wording change - cancellation
is a specific thing in cats-effect, and this test doesn't cover it
- Upgrade to cats-effect 2.x, fs2 1.1.x - Add api to avoid dropping Future[Mat] values - Deprecate apis hardcoding `IO` - Fix race condition in akka-fs2 `publisherStream` between interruption and publish when using the fromFuture+ContextShift.shift behavior - Test changes to make wording match assertions better - Add scala-collection-compat for camel converter crossbuilding - Bump version for milestone release - Include sbt-tpolecat plugin to prevent lintable bugs
IO
publisherStream
between interruptionand publish when using the fromFuture+ContextShift.shift behavior
Ref #61
Close #62