Skip to content

Commit

Permalink
Enrich-kinesis: improve sink retry policy for when throttled by kines…
Browse files Browse the repository at this point in the history
…is (close #697)
  • Loading branch information
istreeter committed Oct 6, 2022
1 parent 5c30da0 commit 27b15a7
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 131 deletions.
33 changes: 21 additions & 12 deletions config/config.kinesis.extended.hocon
Expand Up @@ -89,16 +89,19 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy to retry if writing to kinesis fails.
# This policy is used in 2 places:
# - When the PutRecords request errors
# - When the request succeeds but some records couldn't get inserted
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}

# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
Expand Down Expand Up @@ -136,16 +139,19 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy to retry if writing to kinesis fails.
# This policy is used in 2 places:
# - When the PutRecords request errors
# - When the request succeeds but some records couldn't get inserted
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}

# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
Expand Down Expand Up @@ -175,16 +181,19 @@
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy to retry if writing to kinesis fails.
# This policy is used in 2 places:
# - When the PutRecords request errors
# - When the request succeeds but some records couldn't get inserted
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}

# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
Expand Down
Expand Up @@ -17,7 +17,6 @@ import cats.implicits._

import fs2.Stream

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, Clock, Concurrent, ConcurrentEffect, ContextShift, ExitCode, Resource, Sync, Timer}
Expand All @@ -26,13 +25,12 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import retry.syntax.all._
import retry.RetryPolicies._

import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Input, Monitoring, Output, RetryCheckpointing}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Input, Monitoring, Output, RetryCheckpointing}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

import org.http4s.client.{Client => Http4sClient}
Expand Down Expand Up @@ -109,9 +107,7 @@ object Run {
val checkpointing = input match {
case retrySettings: RetryCheckpointing =>
withRetries(
retrySettings.checkpointBackoff.minBackoff,
retrySettings.checkpointBackoff.maxBackoff,
retrySettings.checkpointBackoff.maxRetries,
retrySettings.checkpointBackoff,
"Checkpointing failed",
checkpoint
)
Expand Down Expand Up @@ -186,13 +182,11 @@ object Run {
}

private def withRetries[F[_]: Sync: Timer, A, B](
minBackOff: FiniteDuration,
maxBackOff: FiniteDuration,
maxRetries: Int,
config: BackoffPolicy,
errorMessage: String,
f: A => F[B]
): A => F[B] = { a =>
val retryPolicy = capDelay[F](maxBackOff, fullJitter[F](minBackOff)).join(limitRetries(maxRetries))
val retryPolicy = Retries.fullJitter[F](config)

f(a)
.retryingOnAllErrors(
Expand Down
Expand Up @@ -112,7 +112,7 @@ object ParsedConfigs {
if (invalidAttributes.nonEmpty) NonEmptyList(invalidAttributes.head, invalidAttributes.tail.toList).invalid
else output.valid
}
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _) if !enrichedFieldsMap.contains(key) =>
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _) if !enrichedFieldsMap.contains(key) =>
NonEmptyList.one(s"Partition key $key not valid").invalid
case _ =>
output.valid
Expand All @@ -126,7 +126,7 @@ object ParsedConfigs {
attributes.contains(s)
}
attributesFromFields(fields)
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _) =>
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _) =>
val fields = ParsedConfigs.enrichedFieldsMap.filter {
case (s, _) =>
s == key
Expand Down
Expand Up @@ -63,7 +63,7 @@ object io {
case class BackoffPolicy(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
maxRetries: Int
maxRetries: Option[Int]
)
object BackoffPolicy {
implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] =
Expand Down Expand Up @@ -269,6 +269,7 @@ object io {
region: Option[String],
partitionKey: Option[String],
backoffPolicy: BackoffPolicy,
throttledBackoffPolicy: BackoffPolicy,
recordLimit: Int,
byteLimit: Int,
customEndpoint: Option[URI]
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2.io

import retry.{RetryPolicies, RetryPolicy}

import cats.Applicative

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy

/**
 * Builders that turn a [[BackoffPolicy]] configuration into a cats-retry `RetryPolicy`.
 *
 * Each public builder picks a base backoff strategy seeded with `config.minBackoff`,
 * then hands it to a shared helper that applies `config.maxBackoff` and, when present,
 * `config.maxRetries`.
 */
object Retries {

  /**
   * Retry policy based on `RetryPolicies.fullJitter`.
   *
   * @param config backoff settings; `minBackoff` seeds the jitter strategy
   * @return the jitter policy with the configured delay cap and optional retry limit applied
   */
  def fullJitter[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = {
    val jittered = RetryPolicies.fullJitter[F](config.minBackoff)
    capBackoffAndRetries(config, jittered)
  }

  /**
   * Retry policy based on `RetryPolicies.fibonacciBackoff`.
   *
   * @param config backoff settings; `minBackoff` seeds the fibonacci strategy
   * @return the fibonacci policy with the configured delay cap and optional retry limit applied
   */
  def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = {
    val fib = RetryPolicies.fibonacciBackoff[F](config.minBackoff)
    capBackoffAndRetries(config, fib)
  }

  /**
   * Wraps `policy` with `capDelay(config.maxBackoff, ...)` and, only when
   * `config.maxRetries` is defined, joins the result with `limitRetries`.
   * When `maxRetries` is `None` no retry limit is added by this helper.
   */
  private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = {
    val delayCapped = RetryPolicies.capDelay[F](config.maxBackoff, policy)
    config.maxRetries match {
      case Some(limit) => delayCapped.join(RetryPolicies.limitRetries[F](limit))
      case None => delayCapped
    }
  }

}
Expand Up @@ -103,7 +103,7 @@ class ConfigFileSpec extends Specification with CatsIO {
io.Input.Kinesis.InitPosition.TrimHorizon,
io.Input.Kinesis.Retrieval.Polling(10000),
3,
io.BackoffPolicy(100.milli, 10.second, 10),
io.BackoffPolicy(100.milli, 10.second, Some(10)),
None,
None,
None
Expand All @@ -113,7 +113,8 @@ class ConfigFileSpec extends Specification with CatsIO {
"enriched",
Some("eu-central-1"),
None,
io.BackoffPolicy(100.millis, 10.seconds, 10),
io.BackoffPolicy(100.millis, 10.seconds, Some(10)),
io.BackoffPolicy(100.millis, 1.second, None),
500,
5242880,
None
Expand All @@ -123,7 +124,8 @@ class ConfigFileSpec extends Specification with CatsIO {
"pii",
Some("eu-central-1"),
None,
io.BackoffPolicy(100.millis, 10.seconds, 10),
io.BackoffPolicy(100.millis, 10.seconds, Some(10)),
io.BackoffPolicy(100.millis, 1.second, None),
500,
5242880,
None
Expand All @@ -133,7 +135,8 @@ class ConfigFileSpec extends Specification with CatsIO {
"bad",
Some("eu-central-1"),
None,
io.BackoffPolicy(100.millis, 10.seconds, 10),
io.BackoffPolicy(100.millis, 10.seconds, Some(10)),
io.BackoffPolicy(100.millis, 1.second, None),
500,
5242880,
None
Expand Down Expand Up @@ -208,7 +211,7 @@ class ConfigFileSpec extends Specification with CatsIO {
true
),
"raw",
io.BackoffPolicy(100.millis, 10.seconds, 10)
io.BackoffPolicy(100.millis, 10.seconds, Some(10))
),
io.Outputs(
io.Output.RabbitMQ(
Expand All @@ -227,7 +230,7 @@ class ConfigFileSpec extends Specification with CatsIO {
),
"enriched",
"enriched",
io.BackoffPolicy(100.millis, 10.seconds, 10)
io.BackoffPolicy(100.millis, 10.seconds, Some(10))
),
None,
io.Output.RabbitMQ(
Expand All @@ -246,7 +249,7 @@ class ConfigFileSpec extends Specification with CatsIO {
),
"bad-1",
"bad-1",
io.BackoffPolicy(100.millis, 10.seconds, 10)
io.BackoffPolicy(100.millis, 10.seconds, Some(10))
)
),
io.Concurrency(256, 3),
Expand Down
12 changes: 12 additions & 0 deletions modules/kinesis/src/main/resources/application.conf
Expand Up @@ -25,6 +25,10 @@
"maxBackoff": 10 seconds
"maxRetries": 10
}
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}
"recordLimit": 500
"byteLimit": 5242880
}
Expand All @@ -38,6 +42,10 @@
"maxBackoff": 10 seconds
"maxRetries": 10
}
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}
"recordLimit": 500
"byteLimit": 5242880
}
Expand All @@ -49,6 +57,10 @@
"maxBackoff": 10 seconds
"maxRetries": 10
}
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}
"recordLimit": 500
"byteLimit": 5242880
}
Expand Down

0 comments on commit 27b15a7

Please sign in to comment.