From 2a24091f55c14120f79292230fc3d00b61730779 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 4 Dec 2023 12:56:17 +0000 Subject: [PATCH] pubsub transformer: Increase default value of minDurationPerAckExtension (close #1326) --- .../transformer/gcp/transformer.pubsub.config.reference.hocon | 4 ++++ .../snowplow/rdbloader/transformer/stream/common/Config.scala | 3 ++- .../transformer-pubsub/src/main/resources/application.conf | 1 + .../snowplow/rdbloader/transformer/stream/pubsub/Main.scala | 2 ++ .../rdbloader/transformer/stream/pubsub/ConfigSpec.scala | 3 ++- project/Dependencies.scala | 3 +++ 6 files changed, 14 insertions(+), 2 deletions(-) diff --git a/config/transformer/gcp/transformer.pubsub.config.reference.hocon b/config/transformer/gcp/transformer.pubsub.config.reference.hocon index 3819e532f..97943b574 100644 --- a/config/transformer/gcp/transformer.pubsub.config.reference.hocon +++ b/config/transformer/gcp/transformer.pubsub.config.reference.hocon @@ -12,6 +12,10 @@ # Optional. Default value '1 hour'. The maximum period a message ack deadline will be extended. "maxAckExtensionPeriod": "1 hour" + + # Optional. Default value '60 seconds'. Sets a lower bound on the duration by which the pubsub Subscriber extends ack + # deadlines. Most relevant after the app first starts up, until the underlying Subscriber has metrics on how long we take to process an event. 
+ "minDurationPerAckExtension": "60 seconds" } # Path to transformed archive diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala index d8120595c..4e9c86725 100644 --- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala +++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala @@ -69,7 +69,8 @@ object Config { parallelPullCount: Int, bufferSize: Int, maxAckExtensionPeriod: FiniteDuration, - maxOutstandingMessagesSize: Option[Long] + maxOutstandingMessagesSize: Option[Long], + minDurationPerAckExtension: FiniteDuration ) extends StreamInput { val (projectId, subscriptionId) = subscription.split("/").toList match { diff --git a/modules/transformer-pubsub/src/main/resources/application.conf b/modules/transformer-pubsub/src/main/resources/application.conf index 94e1b653d..0d67e34c6 100644 --- a/modules/transformer-pubsub/src/main/resources/application.conf +++ b/modules/transformer-pubsub/src/main/resources/application.conf @@ -5,6 +5,7 @@ "parallelPullCount": 1 "bufferSize": 500 "maxAckExtensionPeriod": "1 hours" + "minDurationPerAckExtension": "60 seconds" } "output": { diff --git a/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala b/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala index 5273431e6..620a62ca8 100644 --- a/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala +++ 
b/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala @@ -15,6 +15,7 @@ import org.typelevel.log4cats.Logger import com.google.api.gax.batching.FlowControlSettings import com.google.api.gax.core.ExecutorProvider import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors} +import org.threeten.bp.{Duration => ThreetenDuration} import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} @@ -80,6 +81,7 @@ object Main extends IOApp { def getExecutor: ScheduledExecutorService = scheduledExecutorService } } + s.setMinDurationPerAckExtension(ThreetenDuration.ofMillis(conf.minDurationPerAckExtension.toMillis)) } ) case _ => diff --git a/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala b/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala index 7cfef07b3..67115d118 100644 --- a/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala +++ b/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala @@ -71,7 +71,8 @@ object ConfigSpec { parallelPullCount = 1, bufferSize = 500, maxAckExtensionPeriod = 1.hour, - maxOutstandingMessagesSize = None + maxOutstandingMessagesSize = None, + minDurationPerAckExtension = 60.seconds ) val exampleWindowPeriod = 5.minutes val exampleOutput = Config.Output.GCS( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ff2577c93..baefe306d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -62,6 +62,7 @@ object Dependencies { val enumeratum = "1.7.0" val aws = "1.12.261" val aws2 
= "2.21.33" + val pubsub = "1.125.13" val jSch = "0.2.1" val sentry = "1.7.30" val protobuf = "3.21.7" // Fix CVE @@ -193,6 +194,7 @@ object Dependencies { val aws2kinesis = "software.amazon.awssdk" % "kinesis" % V.aws2 val aws2regions = "software.amazon.awssdk" % "regions" % V.aws2 val aws2sts = "software.amazon.awssdk" % "sts" % V.aws2 % Runtime + val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf val nettyCodec = "io.netty" % "netty-codec" % V.nettyCodec val zookeeper = "org.apache.zookeeper" % "zookeeper" % V.zookeeper @@ -227,6 +229,7 @@ object Dependencies { val gcpDependencies = Seq( fs2BlobstoreGCS, fs2PubSub, + pubsub, secretManager, gcpStorage )