Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fs2-core, fs2-io to 3.5.0 #970

Merged
merged 5 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ organization := "io.laserdisc"
name := "fs2-aws"

lazy val scala213 = "2.13.10"
lazy val scala3 = "3.1.1"
lazy val scala3 = "3.2.1"

lazy val supportedScalaVersions = List(scala213, scala3)

Expand Down Expand Up @@ -288,7 +288,7 @@ lazy val commonSettings = Def.settings(
},
Test / console / scalacOptions := (Compile / console / scalacOptions).value,
Test / scalacOptions := (Compile / scalacOptions).value,
Test / scalacOptions += "-Wconf:msg=is not declared `infix`:s,msg=is declared 'open':s",
Test / scalacOptions += "-Wconf:msg=is not declared infix:s,msg=is declared 'open':s",
libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1",
libraryDependencies ++= Seq(
compilerPlugin(("org.typelevel" %% "kind-projector" % "0.13.2").cross(CrossVersion.full)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ trait DynamoDB[F[_]] {
/** Initialize a worker and start streaming records from a DynamoDB stream
* On stream finish (due to error or other), worker will be shutdown
*
* @tparam F effect type of the fs2 stream
* @param appName name of the DynamoDB application. Used by KCL when resharding
* @param streamName name of the DynamoDB stream to consume from
* @return an infinite fs2 Stream that emits Kinesis Records
Expand All @@ -35,8 +34,7 @@ trait DynamoDB[F[_]] {
/** Initialize a worker and start streaming records from a DynamoDB stream
* On stream finish (due to error or other), worker will be shutdown
*
* @tparam F effect type of the fs2 stream
* @param consumerConfig configuration parameters for the KCL
* @param workerConfiguration configuration parameters for the KCL
* @return an infinite fs2 Stream that emits DynamoDB Records
*/
def readFromDynamoDBStream(
Expand All @@ -48,7 +46,6 @@ trait DynamoDB[F[_]] {
* After accumulating maxBatchSize or reaching maxBatchWait for a respective shard, the latest record is checkpointed
* By design, all records prior to the checkpointed record are also checkpointed in Kinesis
*
* @tparam F effect type of the fs2 stream
* @param checkpointSettings configure maxBatchSize and maxBatchWait time before triggering a checkpoint
* @return a stream of Record types representing checkpointed messages
*/
Expand Down Expand Up @@ -79,7 +76,7 @@ object DynamoDB {
// Instantiate a new bounded queue and concurrently run the queue populator
// Expose the elements by dequeuing the internal buffer
for {
dispatcher <- Stream.resource(Dispatcher[F])
dispatcher <- Stream.resource(Dispatcher.parallel[F])
buffer <- Stream.eval(Queue.unbounded[F, CommittableRecord])
interruptSignal <- Stream.eval(SignallingRef[F, Boolean](false))
_ <- instantiateScheduler(dispatcher, buffer, interruptSignal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object StreamScan {
pageSize: Int
): Stream[F, Chunk[JMap[String, AttributeValue]]] =
for {
dispatcher <- Stream.resource(Dispatcher[F])
dispatcher <- Stream.resource(Dispatcher.parallel[F])
queue <- Stream.eval(Queue.bounded[F, Option[Chunk[JMap[String, AttributeValue]]]](1))
sub <- Stream.eval(Ref[F].of[Option[Subscription]](None))
_ <- Stream.eval(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker}
import com.amazonaws.services.kinesis.clientlibrary.types.{
ExtendedSequenceNumber,
InitializationInput,
ProcessRecordsInput,
ShutdownInput
}
import com.amazonaws.services.kinesis.clientlibrary.types.{ExtendedSequenceNumber, InitializationInput, ProcessRecordsInput, ShutdownInput}
import com.amazonaws.services.kinesis.model.Record
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.*
Expand All @@ -27,7 +22,7 @@ import org.slf4j.LoggerFactory

import java.util.Date
import java.util.concurrent.{CountDownLatch, Phaser}
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

Expand Down Expand Up @@ -118,7 +113,7 @@ class NewDynamoDBConsumerSpec
recordProcessor.processRecords(recordsInput.withRecords(List(record).asJava))
}
}
).parMapN { case (msgs, _, _) => msgs }.unsafeToFuture().futureValue
).parMapN { case (msgs, _, _) => msgs }.unsafeToFuture().onError(e => Future(e.printStackTrace())).futureValue

res should have size 60
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object Kinesis {
// Instantiate a new bounded queue and concurrently run the queue populator
// Expose the elements by dequeuing the internal buffer
for {
dispatcher <- Stream.resource(Dispatcher[F])
dispatcher <- Stream.resource(Dispatcher.parallel[F])
buffer <- Stream.eval(Queue.bounded[F, Chunk[CommittableRecord]](streamConfig.bufferSize))
interruptSignal <- Stream.eval(SignallingRef[F, Boolean](false))
_ <- instantiateScheduler(dispatcher, buffer, interruptSignal)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val AwsSdk = "2.18.7"
val Circe = "0.14.3"
val Munit = "0.7.29"
val Fs2 = "3.3.0"
val Fs2 = "3.5.0"
val Refined = "0.10.1"
val ScalaTest = "3.2.14"
val MockitoScalaTest = "1.17.5"
Expand Down