Skip to content

Commit

Permalink
temp pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 30, 2022
1 parent c560a5c commit cf106a7
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 11 deletions.
3 changes: 0 additions & 3 deletions config/config.pubsub.reference.hocon
Expand Up @@ -60,9 +60,6 @@
# Max size of the batch in bytes before emitting
# Default is 5MB
"maxBatchBytes": 5000000
# The number of threads used internally by library to process the callback after message delivery
# Default is 1
"numCallbackExecutors": 1

# Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded.
"reportPeriod": 10 seconds
Expand Down
1 change: 0 additions & 1 deletion modules/loader/src/main/resources/application.conf
Expand Up @@ -26,7 +26,6 @@
"delayThreshold": 200 milliseconds
"maxBatchSize": 500
"maxBatchBytes": 5000000
"numCallbackExecutors": 1
}
}

Expand Down
Expand Up @@ -248,8 +248,7 @@ object LoaderConfig {
topicId: String,
delayThreshold: FiniteDuration,
maxBatchSize: Long,
maxBatchBytes: Long,
numCallbackExecutors: Int) extends StreamSink
maxBatchBytes: Long) extends StreamSink

implicit def sinkConfigDecoder: Decoder[StreamSink] =
deriveConfiguredDecoder[StreamSink]
Expand Down
Expand Up @@ -50,9 +50,6 @@ object PubSubSink {
// Set the delay threshold to use for batching. After this amount of time has elapsed (counting
// from the first element added), the elements will be wrapped up in a batch and sent.
delayThreshold = config.delayThreshold,
// The number of threads used internally by permutive library to process the callback after message delivery.
// The callback does very little "work" so it is best to use minimum number of threads.
callbackExecutors = config.numCallbackExecutors,
onFailedTerminate = err => Async[F].delay(logger.error(err)("PubSub sink termination error"))
)

Expand Down
Expand Up @@ -176,8 +176,7 @@ class CliSpec extends Specification {
"my-topic",
200.milliseconds,
500L,
5000000L,
1
5000000L
),
),
Purpose.Enriched,
Expand Down

0 comments on commit cf106a7

Please sign in to comment.