Skip to content

Commit

Permalink
Merge 2a24091 into ca3c660
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 4, 2023
2 parents ca3c660 + 2a24091 commit 844d272
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

# Optional. Default value '1 hour'. The maximum period a message ack deadline will be extended.
"maxAckExtensionPeriod": "1 hour"

# Optional. Sets a lower-bound on how the pubsub Subscriber extends ack deadlines. Most relevant after the app
# first starts up, until the underlying Subscriber has metrics on how long we take to process an event.
"minDurationPerAckExtension": "60 seconds"
}

# Path to transformed archive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ object Config {
parallelPullCount: Int,
bufferSize: Int,
maxAckExtensionPeriod: FiniteDuration,
maxOutstandingMessagesSize: Option[Long]
maxOutstandingMessagesSize: Option[Long],
minDurationPerAckExtension: FiniteDuration
) extends StreamInput {
val (projectId, subscriptionId) =
subscription.split("/").toList match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"parallelPullCount": 1
"bufferSize": 500
"maxAckExtensionPeriod": "1 hours"
"minDurationPerAckExtension": "60 seconds"
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.typelevel.log4cats.Logger
import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.core.ExecutorProvider
import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors}
import org.threeten.bp.{Duration => ThreetenDuration}

import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
Expand Down Expand Up @@ -80,6 +81,7 @@ object Main extends IOApp {
def getExecutor: ScheduledExecutorService = scheduledExecutorService
}
}
s.setMinDurationPerAckExtension(ThreetenDuration.ofMillis(conf.minDurationPerAckExtension.toMillis))
}
)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ object ConfigSpec {
parallelPullCount = 1,
bufferSize = 500,
maxAckExtensionPeriod = 1.hour,
maxOutstandingMessagesSize = None
maxOutstandingMessagesSize = None,
minDurationPerAckExtension = 60.seconds
)
val exampleWindowPeriod = 5.minutes
val exampleOutput = Config.Output.GCS(
Expand Down
3 changes: 3 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object Dependencies {
val enumeratum = "1.7.0"
val aws = "1.12.261"
val aws2 = "2.21.33"
val pubsub = "1.125.13"
val jSch = "0.2.1"
val sentry = "1.7.30"
val protobuf = "3.21.7" // Fix CVE
Expand Down Expand Up @@ -193,6 +194,7 @@ object Dependencies {
val aws2kinesis = "software.amazon.awssdk" % "kinesis" % V.aws2
val aws2regions = "software.amazon.awssdk" % "regions" % V.aws2
val aws2sts = "software.amazon.awssdk" % "sts" % V.aws2 % Runtime
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf
val nettyCodec = "io.netty" % "netty-codec" % V.nettyCodec
val zookeeper = "org.apache.zookeeper" % "zookeeper" % V.zookeeper
Expand Down Expand Up @@ -227,6 +229,7 @@ object Dependencies {
val gcpDependencies = Seq(
fs2BlobstoreGCS,
fs2PubSub,
pubsub,
secretManager,
gcpStorage
)
Expand Down

0 comments on commit 844d272

Please sign in to comment.