Skip to content

Commit

Permalink
Upgrade fs2-kafka to 3.5.1
Browse files Browse the repository at this point in the history
sink max 8 chunk at once by default for kafka and kinesis

Update default concurrency.sink values for kafka and kinesis

Remove warning for concurrency.sink (close #896)

Produce Kafka records at once

Fix config tests
  • Loading branch information
oguzhanunlu committed Jun 3, 2024
1 parent 6ff6a9c commit 0c51795
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 24 deletions.
3 changes: 1 addition & 2 deletions config/config.kafka.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@
# Number of events that can get enriched at the same time within a chunk
"enrich": 256
# Number of chunks that can get sunk at the same time
# WARNING: if greater than 1, records can get checkpointed before they are sunk
"sink": 1
"sink": 8
}

# Optional, period after which enrich assets should be checked for updates
Expand Down
3 changes: 1 addition & 2 deletions config/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@
# Number of events that can get enriched at the same time within a chunk
"enrich": 256
# Number of chunks that can get sunk at the same time
# WARNING: if greater than 1, records can get checkpointed before they are sunk
"sink": 1
"sink": 8
}

# Optional, period after which enrich assets should be checked for updates
Expand Down
2 changes: 1 addition & 1 deletion modules/kafka/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

"concurrency" : {
"enrich": 256
"sink": 1
"sink": 8
}

"remoteAdapters" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
package com.snowplowanalytics.snowplow.enrich.kafka

import java.util.UUID

import cats.Parallel
import cats.implicits._

import cats.effect.kernel.{Async, Resource, Sync}

import fs2.kafka._

import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
import fs2.Chunk

object Sink {

Expand All @@ -40,12 +37,10 @@ object Sink {
output match {
case k: Output.Kafka =>
mkProducer(k, authCallbackClass).map { producer => records =>
records.parTraverse_ { record =>
producer
.produceOne_(toProducerRecord(k.topicName, record))
.flatten
.void
}
producer
.produce(Chunk.from(records.map(toProducerRecord(k.topicName, _))))
.flatten
.void
}
case o => Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"Output $o is not Kafka")))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ConfigSpec extends Specification with CatsEffect {
)
)
),
io.Concurrency(256, 1),
io.Concurrency(256, 8),
Some(7.days),
io.RemoteAdapterConfigs(
10.seconds,
Expand Down Expand Up @@ -202,7 +202,7 @@ class ConfigSpec extends Specification with CatsEffect {
)
)
),
io.Concurrency(256, 1),
io.Concurrency(256, 8),
None,
io.RemoteAdapterConfigs(
10.seconds,
Expand Down
2 changes: 1 addition & 1 deletion modules/kinesis/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@

"concurrency" : {
"enrich": 256
"sink": 1
"sink": 8
}

"remoteAdapters" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class ConfigSpec extends Specification with CatsEffect {
None
)
),
io.Concurrency(256, 1),
io.Concurrency(256, 8),
Some(7.days),
io.RemoteAdapterConfigs(
10.seconds,
Expand Down Expand Up @@ -175,7 +175,7 @@ class ConfigSpec extends Specification with CatsEffect {
None
)
),
io.Concurrency(256, 1),
io.Concurrency(256, 8),
None,
io.RemoteAdapterConfigs(
10.seconds,
Expand Down
8 changes: 4 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,18 @@ object Dependencies {
val gcpSdk = "2.36.1"
val awsSdk2 = "2.25.24"
val kinesisClient2 = "2.4.3"
val kafka = "2.8.2"
val kafka = "3.7.0"
val mskAuth = "2.0.3"
val nsqClient = "1.3.0"
val jackson = "2.16.1"
val config = "1.3.4"

val decline = "1.0.0"
val fs2 = "3.9.3"
val catsEffect = "3.5.2"
val fs2 = "3.10.2"
val catsEffect = "3.5.4"
val fs2PubSub = "0.22.0"
val fs2Aws = "4.1.0"
val fs2Kafka = "3.2.0"
val fs2Kafka = "3.5.1"
val fs2BlobStorage = "0.9.12"
val azureIdentity = "1.11.1"
val nimbusJoseJwt = "9.37.2"
Expand Down

0 comments on commit 0c51795

Please sign in to comment.