Skip to content

Commit

Permalink
Merge d9706e3 into ca3c660
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored Dec 4, 2023
2 parents ca3c660 + d9706e3 commit 9d723b0
Show file tree
Hide file tree
Showing 14 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@

# Optional. Default value '1 hour'. The maximum period a message ack deadline will be extended.
"maxAckExtensionPeriod": "1 hour"

# Optional. Sets a lower bound on the duration by which the pubsub Subscriber extends ack deadlines. Most relevant
# after the app first starts up, until the underlying Subscriber has metrics on how long we take to process an event.
"minDurationPerAckExtension": "60 seconds"

# Optional. How long we allow the pubsub Subscriber to ack any outstanding events during a clean shutdown.
"awaitTerminatePeriod": "60 seconds"
}

# Path to transformed archive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ object Config {
parallelPullCount: Int,
bufferSize: Int,
maxAckExtensionPeriod: FiniteDuration,
maxOutstandingMessagesSize: Option[Long]
maxOutstandingMessagesSize: Option[Long],
minDurationPerAckExtension: FiniteDuration,
awaitTerminatePeriod: FiniteDuration
) extends StreamInput {
val (projectId, subscriptionId) =
subscription.split("/").toList match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ object WiderowJsonProcessingSpec {
| "parallelPullCount": 1
| "bufferSize": 500
| "maxAckExtensionPeriod": "1 hours"
| "minDurationPerAckExtension": "60 seconds"
| "awaitTerminatePeriod": "5 seconds"
| }
| "output": {
| "path": "${outputPath.toNioPath.toUri.toString}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ object ConfigSpec {
subscription = "projects/project-id/subscriptions/subscription-id",
customPubsubEndpoint = None,
parallelPullCount = 1,
awaitTerminatePeriod = 30.seconds,
bufferSize = 10
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ object Pubsub {
parallelPullCount: Int,
bufferSize: Int,
maxAckExtensionPeriod: FiniteDuration,
awaitTerminatePeriod: FiniteDuration,
customPubsubEndpoint: Option[String] = None,
customizeSubscriber: Subscriber.Builder => Subscriber.Builder = identity,
postProcess: Option[Queue.Consumer.PostProcess[F]] = None
Expand All @@ -111,6 +112,7 @@ object Pubsub {
parallelPullCount = parallelPullCount,
maxQueueSize = bufferSize,
maxAckExtensionPeriod = maxAckExtensionPeriod,
awaitTerminatePeriod = awaitTerminatePeriod,
customizeSubscriber = {
val customChannel: Subscriber.Builder => Subscriber.Builder = channelProvider
.map { c => b: Subscriber.Builder =>
Expand Down
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"cloud": "dummy"
"messageQueue": {
"parallelPullCount": 1,
"awaitTerminatePeriod": "30 seconds"
"bufferSize": 10
"consumerConf": {
"enable.auto.commit": "false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ object Config {
subscription: String,
customPubsubEndpoint: Option[String],
parallelPullCount: Int,
awaitTerminatePeriod: FiniteDuration,
bufferSize: Int
) {
val (projectId, subscriptionId) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ object Environment {
parallelPullCount = c.messageQueue.parallelPullCount,
bufferSize = c.messageQueue.bufferSize,
maxAckExtensionPeriod = config.timeouts.loading,
awaitTerminatePeriod = c.messageQueue.awaitTerminatePeriod,
customPubsubEndpoint = c.messageQueue.customPubsubEndpoint,
postProcess = Some(postProcess)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ConfigSpec extends Specification {
)
val awsConfig = exampleCloud
val gcpConfig = Config.Cloud.GCP(
messageQueue = Config.Cloud.GCP.Pubsub("projects/project-id/subscriptions/subscription-id", None, 1, 1)
messageQueue = Config.Cloud.GCP.Pubsub("projects/project-id/subscriptions/subscription-id", None, 1, 30.seconds, 1)
)
val azureConfig = Config.Cloud.Azure(
URI.create("https://test.blob.core.windows.net/test-container/"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ConfigSpec extends Specification {
subscription = "projects/project-id/subscriptions/subscription-id",
customPubsubEndpoint = None,
parallelPullCount = 1,
awaitTerminatePeriod = 30.seconds,
bufferSize = 10
)
)
Expand Down Expand Up @@ -165,6 +166,7 @@ class ConfigSpec extends Specification {
subscription = "projects/project-id/subscriptions/subscription-id",
customPubsubEndpoint = None,
parallelPullCount = 1,
awaitTerminatePeriod = 30.seconds,
bufferSize = 10
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"parallelPullCount": 1
"bufferSize": 500
"maxAckExtensionPeriod": "1 hours"
"minDurationPerAckExtension": "60 seconds"
"awaitTerminatePeriod": "60 seconds"
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.typelevel.log4cats.Logger
import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.core.ExecutorProvider
import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors}
import org.threeten.bp.{Duration => ThreetenDuration}

import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
Expand Down Expand Up @@ -59,6 +60,7 @@ object Main extends IOApp {
parallelPullCount = conf.parallelPullCount,
bufferSize = conf.bufferSize,
maxAckExtensionPeriod = conf.maxAckExtensionPeriod,
awaitTerminatePeriod = conf.awaitTerminatePeriod,
customPubsubEndpoint = conf.customPubsubEndpoint,
customizeSubscriber = { s =>
s.setFlowControlSettings {
Expand All @@ -80,6 +82,7 @@ object Main extends IOApp {
def getExecutor: ScheduledExecutorService = scheduledExecutorService
}
}
s.setMinDurationPerAckExtension(ThreetenDuration.ofMillis(conf.minDurationPerAckExtension.toMillis))
}
)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ object ConfigSpec {
parallelPullCount = 1,
bufferSize = 500,
maxAckExtensionPeriod = 1.hour,
maxOutstandingMessagesSize = None
maxOutstandingMessagesSize = None,
minDurationPerAckExtension = 60.seconds,
awaitTerminatePeriod = 60.seconds
)
val exampleWindowPeriod = 5.minutes
val exampleOutput = Config.Output.GCS(
Expand Down
3 changes: 3 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object Dependencies {
val enumeratum = "1.7.0"
val aws = "1.12.261"
val aws2 = "2.21.33"
val pubsub = "1.125.13"
val jSch = "0.2.1"
val sentry = "1.7.30"
val protobuf = "3.21.7" // Fix CVE
Expand Down Expand Up @@ -193,6 +194,7 @@ object Dependencies {
val aws2kinesis = "software.amazon.awssdk" % "kinesis" % V.aws2
val aws2regions = "software.amazon.awssdk" % "regions" % V.aws2
val aws2sts = "software.amazon.awssdk" % "sts" % V.aws2 % Runtime
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf
val nettyCodec = "io.netty" % "netty-codec" % V.nettyCodec
val zookeeper = "org.apache.zookeeper" % "zookeeper" % V.zookeeper
Expand Down Expand Up @@ -227,6 +229,7 @@ object Dependencies {
// Library modules grouped together for GCP.
// `pubsub` (google-cloud-pubsub at V.pubsub) is listed explicitly alongside fs2PubSub;
// NOTE(review): presumably this pins the Google client version — confirm.
val gcpDependencies = Seq(
fs2BlobstoreGCS,
fs2PubSub,
pubsub,
secretManager,
gcpStorage
)
Expand Down

0 comments on commit 9d723b0

Please sign in to comment.