Skip to content

Commit

Permalink
CE3 upgrade (#649)
Browse files Browse the repository at this point in the history
* Update fs2-core, fs2-io to 3.0.1

* migrate on CE3

* clean up

* CE 3.1.0

* SBT Up

* rebase on latest master plus versions management for CE

Co-authored-by: dsemenov <dmytro.semenov@disneystreaming.com>
  • Loading branch information
scala-steward and dsemenov committed Apr 26, 2021
1 parent bd95ce9 commit beedb45
Show file tree
Hide file tree
Showing 63 changed files with 515 additions and 1,928 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def stream(deferedListener: Deferred[IO, MessageListener]) =
.toList

//create the program for testing the stream
import io.circe.syntax._
import io.circe.fs2.aws.examples.syntax._
import io.circe.generic.auto._
val quote = Quote(...)
val program : IO[List[(Quote, MessageListener)]] = for {
Expand Down
41 changes: 22 additions & 19 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ThisBuild / scalaVersion := scala213

lazy val root = (project in file("."))
.aggregate(
`fs2-aws`,
`fs2-aws-kinesis`,
`fs2-aws-s3`,
`fs2-aws-sqs`,
`fs2-aws-testkit`,
Expand Down Expand Up @@ -67,14 +67,15 @@ lazy val `fs2-aws-core` = (project in file("fs2-aws-core"))
.settings(scalacOptions ++= commonOptions(scalaVersion.value))

lazy val `fs2-aws-ciris` = (project in file("fs2-aws-ciris"))
.dependsOn(`fs2-aws`)
.settings(
name := "fs2-aws-ciris",
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" % "mockito-core" % V.MockitoCore % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"is.cir" %% "ciris" % "1.2.1"
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" % "mockito-core" % V.MockitoCore % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"is.cir" %% "ciris" % "2.0.0-RC2",
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.3.4",
"org.typelevel" %% "cats-effect" % V.CE % Test
),
coverageMinimum := 40,
coverageFailOnMinimum := true
Expand Down Expand Up @@ -110,7 +111,7 @@ lazy val `fs2-aws-examples` = (project in file("fs2-aws-examples"))
`pure-kinesis-tagless`,
`pure-dynamodb-tagless`,
`pure-cloudwatch-tagless`,
`fs2-aws`,
`fs2-aws-kinesis`,
`fs2-aws-s3`
)
.settings(
Expand All @@ -123,7 +124,7 @@ lazy val `fs2-aws-examples` = (project in file("fs2-aws-examples"))
"ch.qos.logback" % "logback-core" % "1.2.3",
"org.slf4j" % "jcl-over-slf4j" % "1.7.30",
"org.slf4j" % "jul-to-slf4j" % "1.7.30",
"org.typelevel" %% "log4cats-slf4j" % "1.2.2",
"org.typelevel" %% "log4cats-slf4j" % "2.1.0",
"io.laserdisc" %% "scanamo-circe" % "1.0.8"
)
)
Expand Down Expand Up @@ -151,15 +152,15 @@ lazy val `fs2-aws-s3` = (project in file("fs2-aws-s3"))
.settings(scalacOptions := commonOptions(scalaVersion.value))
.dependsOn(`pure-s3-tagless`)

lazy val `fs2-aws` = (project in file("fs2-aws"))
lazy val `fs2-aws-kinesis` = (project in file("fs2-aws-kinesis"))
.dependsOn(
`fs2-aws-core`,
`pure-cloudwatch-tagless`,
`pure-dynamodb-tagless`,
`pure-kinesis-tagless`
)
.settings(
name := "fs2-aws",
name := "fs2-aws-kinesis",
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % V.Fs2,
"co.fs2" %% "fs2-io" % V.Fs2,
Expand All @@ -169,7 +170,9 @@ lazy val `fs2-aws` = (project in file("fs2-aws"))
"eu.timepit" %% "refined" % V.Refined,
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"org.mockito" % "mockito-core" % V.MockitoCore % Test
"org.mockito" % "mockito-core" % V.MockitoCore % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"ch.qos.logback" % "logback-core" % "1.2.3" % Test
),
coverageMinimum := 40,
coverageFailOnMinimum := true
Expand Down Expand Up @@ -226,7 +229,7 @@ lazy val `pure-sqs-tagless` = (project in file("pure-aws/pure-sqs-tagless"))
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"eu.timepit" %% "refined" % V.Refined,
"org.typelevel" %% "cats-effect" % "2.5.0"
"org.typelevel" %% "cats-effect" % V.CE
),
taglessGenDir := (Compile / scalaSource).value / "io" / "laserdisc" / "pure" / "sqs" / "tagless",
taglessGenPackage := "io.laserdisc.pure.sqs.tagless",
Expand All @@ -250,7 +253,7 @@ lazy val `pure-s3-tagless` = (project in file("pure-aws/pure-s3-tagless"))
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"eu.timepit" %% "refined" % V.Refined,
"org.typelevel" %% "cats-effect" % "2.5.0"
"org.typelevel" %% "cats-effect" % V.CE
),
taglessGenDir := (Compile / scalaSource).value / "io" / "laserdisc" / "pure" / "s3" / "tagless",
taglessGenPackage := "io.laserdisc.pure.s3.tagless",
Expand All @@ -274,7 +277,7 @@ lazy val `pure-sns-tagless` = (project in file("pure-aws/pure-sns-tagless"))
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"eu.timepit" %% "refined" % V.Refined,
"org.typelevel" %% "cats-effect" % "2.5.0"
"org.typelevel" %% "cats-effect" % V.CE
),
taglessGenDir := (Compile / scalaSource).value / "io" / "laserdisc" / "pure" / "sns" / "tagless",
taglessGenPackage := "io.laserdisc.pure.sns.tagless",
Expand All @@ -298,7 +301,7 @@ lazy val `pure-kinesis-tagless` = (project in file("pure-aws/pure-kinesis-tagles
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"eu.timepit" %% "refined" % V.Refined,
"org.typelevel" %% "cats-effect" % "2.5.0"
"org.typelevel" %% "cats-effect" % V.CE
),
taglessGenDir := (Compile / scalaSource).value / "io" / "laserdisc" / "pure" / "kinesis" / "tagless",
taglessGenPackage := "io.laserdisc.pure.kinesis.tagless",
Expand All @@ -322,7 +325,7 @@ lazy val `pure-dynamodb-tagless` = (project in file("pure-aws/pure-dynamodb-tagl
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"eu.timepit" %% "refined" % V.Refined,
"org.typelevel" %% "cats-effect" % "2.5.0"
"org.typelevel" %% "cats-effect" % V.CE
),
taglessGenDir := (Compile / scalaSource).value / "io" / "laserdisc" / "pure" / "dynamodb" / "tagless",
taglessGenPackage := "io.laserdisc.pure.dynamodb.tagless",
Expand All @@ -346,7 +349,7 @@ lazy val `pure-cloudwatch-tagless` = (project in file("pure-aws/pure-cloudwatch-
"org.scalatest" %% "scalatest" % V.ScalaTest % Test,
"org.mockito" %% "mockito-scala-scalatest" % V.MockitoScalaTest % Test,
"eu.timepit" %% "refined" % V.Refined,
"org.typelevel" %% "cats-effect" % "2.5.0"
"org.typelevel" %% "cats-effect" % V.CE
),
taglessGenDir := (Compile / scalaSource).value / "io" / "laserdisc" / "pure" / "cloudwatch" / "tagless",
taglessGenPackage := "io.laserdisc.pure.cloudwatch.tagless",
Expand All @@ -361,7 +364,7 @@ lazy val `pure-cloudwatch-tagless` = (project in file("pure-aws/pure-cloudwatch-
.settings(scalacOptions ++= commonOptions(scalaVersion.value))

lazy val `fs2-aws-testkit` = (project in file("fs2-aws-testkit"))
.dependsOn(`fs2-aws`)
.dependsOn(`fs2-aws-kinesis`)
.settings(
name := "fs2-aws-testkit",
libraryDependencies ++= Seq(
Expand All @@ -378,7 +381,7 @@ lazy val `fs2-aws-testkit` = (project in file("fs2-aws-testkit"))
.settings(scalacOptions ++= commonOptions(scalaVersion.value))

lazy val `fs2-aws-benchmarks` = (project in file("fs2-aws-benchmarks"))
.dependsOn(`fs2-aws`)
.dependsOn(`fs2-aws-kinesis`)
.dependsOn(`fs2-aws-testkit`)
.settings(
name := "fs2-aws-benchmarks",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package fs2.aws.kinesis

import java.nio.ByteBuffer
import java.time.Instant

import cats.effect.{ ContextShift, IO, Sync, Timer }
import cats.effect.unsafe.IORuntime
import cats.effect.{ IO, Sync }
import cats.implicits._
import fs2.aws.kinesis
import fs2.aws.testkit.SchedulerFactoryTestContext
import org.mockito.MockitoSugar.mock
import org.openjdk.jmh.annotations.{ Benchmark, Scope, State }
Expand All @@ -15,13 +12,14 @@ import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.KinesisClientRecord
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import java.nio.ByteBuffer
import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters._
object KinesisFlowBenchmark {

implicit val ec: ExecutionContext = ExecutionContext.global
implicit val timer: Timer[IO] = IO.timer(ec)
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val runtime: IORuntime = IORuntime.global

@State(Scope.Thread)
class ThreadState {
Expand All @@ -41,12 +39,12 @@ object KinesisFlowBenchmark {
@Benchmark
def KinesisStream(state: ThreadState): Unit =
(for {
processorContext <- IO.delay(new SchedulerFactoryTestContext(shards = 10))
streamUnderTest = kinesis.testkit.readFromKinesisStream[IO](processorContext)
processorContext <- IO.delay(new SchedulerFactoryTestContext[IO](shards = 10))
k = Kinesis.create(processorContext)
streamUnderTest = k.readFromKinesisStream("foo", "bar")
_ <- (
streamUnderTest
// .evalMap(r => Sync[IO].pure(r.record))
.through(consumer.checkpointRecords[IO]())
.through(k.checkpointRecords(KinesisCheckpointSettings.defaultInstance))
.take(state.records.size * 10)
.compile
.drain,
Expand All @@ -73,7 +71,7 @@ object KinesisFlowBenchmark {
.build()
)
}
}.parUnorderedSequence
}.parSequence
}
).parMapN { case _ => () }
} yield ()).unsafeRunSync()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package fs2.aws.ciris;

import java.util.Date

import cats.effect.{ ContextShift, IO }
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import ciris.{ ConfigException, ConfigValue }
import org.scalatest.Assertion
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import software.amazon.kinesis.common.InitialPositionInStream

import scala.concurrent.ExecutionContext.Implicits.global;
import java.util.Date

class CirisDecoderSpec extends AnyWordSpec with Matchers {
implicit val cs: ContextShift[IO] = IO.contextShift(global)
implicit val runtime: IORuntime = IORuntime.global

"InitialPositionDecoderSpec" should {

Expand Down
15 changes: 7 additions & 8 deletions fs2-aws-core/src/main/scala/fs2/aws/core/kinesis/package.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package fs2.aws

import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.effect.{ Concurrent, Ref }
import cats.implicits._
import fs2.concurrent.Queue
import cats.effect.std.Queue
import fs2.{ Pipe, Stream }

package object core {
Expand All @@ -27,10 +26,10 @@ package object core {
)(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = { in =>
Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { queueMap =>
val cleanup = {
queueMap.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))
queueMap.get.flatMap(_.values.toList.traverse_(_.offer(None)))
}

(in ++ Stream.eval_(cleanup))
(in ++ Stream.eval(cleanup).drain)
.evalMap { elem =>
(selector(elem), queueMap.get).mapN { (key, queues) =>
queues
Expand All @@ -40,9 +39,9 @@ package object core {
newQ <- Queue.unbounded[F, Option[A]] // Create a new queue
_ <- queueMap.modify(queues => (queues + (key -> newQ), queues))
// Enqueue the element lifted into an Option to the new queue
_ <- newQ.enqueue1(elem.some)
} yield (key -> newQ.dequeue.unNoneTerminate).some
}(_.enqueue1(elem.some) as None)
_ <- newQ.offer(elem.some)
} yield (key -> Stream.fromQueueNoneTerminated(newQ)).some
}(_.offer(elem.some) as None)
}.flatten
}
.unNone
Expand Down
10 changes: 7 additions & 3 deletions fs2-aws-core/src/test/scala/fs2/aws/core/InternalSpec.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package fs2.aws.core

import cats.effect.{ ContextShift, IO }
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import fs2.Stream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.ExecutionContext

class InternalSpec extends AnyFlatSpec with Matchers {
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val runtime: IORuntime = IORuntime.global

"groupBy" should "create K substreams based on K selector outputs" in {
val k = 30
val streams = Stream
.emits(1 to 100000)
.covary[IO]
.through(groupBy(i => IO(i % k)))
.compile
.toVector
Expand All @@ -26,6 +28,7 @@ class InternalSpec extends AnyFlatSpec with Matchers {
it should "split stream elements into respective substreams" in {
val streams = Stream
.emits(1 to 10)
.covary[IO]
.through(groupBy(i => IO(i % 2)))
.compile
.toVector
Expand All @@ -38,6 +41,7 @@ class InternalSpec extends AnyFlatSpec with Matchers {
it should "fail on exception" in {
val streams = Stream
.emits(1 to 10)
.covary[IO]
.through(groupBy(i => IO(throw new Exception())))
.attempt
.compile
Expand Down
Loading

0 comments on commit beedb45

Please sign in to comment.