From cf106a7c347398a566e275facb5470a889d8080d Mon Sep 17 00:00:00 2001 From: spenes Date: Fri, 30 Sep 2022 14:49:46 +0300 Subject: [PATCH] temp pubsub --- config/config.pubsub.reference.hocon | 3 --- modules/loader/src/main/resources/application.conf | 1 - .../snowplow/postgres/config/LoaderConfig.scala | 3 +-- .../snowplow/postgres/env/pubsub/PubSubSink.scala | 3 --- .../config/CliSpec.scala | 3 +-- 5 files changed, 2 insertions(+), 11 deletions(-) diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index 1cdf90e..0d579f9 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -60,9 +60,6 @@ # Max size of the batch in bytes before emitting # Default is 5MB "maxBatchBytes": 5000000 - # The number of threads used internally by library to process the callback after message delivery - # Default is 1 - "numCallbackExecutors": 1 # Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded. "reportPeriod": 10 seconds diff --git a/modules/loader/src/main/resources/application.conf b/modules/loader/src/main/resources/application.conf index 1a684c6..c9ebc9a 100644 --- a/modules/loader/src/main/resources/application.conf +++ b/modules/loader/src/main/resources/application.conf @@ -26,7 +26,6 @@ "delayThreshold": 200 milliseconds "maxBatchSize": 500 "maxBatchBytes": 5000000 - "numCallbackExecutors": 1 } } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala index a7a5644..645cf99 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala @@ -248,8 +248,7 @@ object LoaderConfig { topicId: String, delayThreshold: FiniteDuration, maxBatchSize: Long, - maxBatchBytes: Long, - numCallbackExecutors: Int) extends StreamSink + maxBatchBytes: Long) extends StreamSink implicit def sinkConfigDecoder: Decoder[StreamSink] = deriveConfiguredDecoder[StreamSink] diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/pubsub/PubSubSink.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/pubsub/PubSubSink.scala index af319cf..eaa6bbf 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/pubsub/PubSubSink.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/pubsub/PubSubSink.scala @@ -50,9 +50,6 @@ object PubSubSink { // Set the delay threshold to use for batching. After this amount of time has elapsed (counting // from the first element added), the elements will be wrapped up in a batch and sent. delayThreshold = config.delayThreshold, - // The number of threads used internally by permutive library to process the callback after message delivery. - // The callback does very little "work" so it is best to use minimum number of threads. - callbackExecutors = config.numCallbackExecutors, onFailedTerminate = err => Async[F].delay(logger.error(err)("PubSub sink termination error")) ) diff --git a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala index 120a1c4..367652a 100644 --- a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala +++ b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala @@ -176,8 +176,7 @@ class CliSpec extends Specification { "my-topic", 200.milliseconds, 500L, - 5000000L, - 1 + 5000000L ), ), Purpose.Enriched,