diff --git a/config/config.kinesis.extended.hocon b/config/config.kinesis.extended.hocon
index 6e4c9ee65..bc6029711 100644
--- a/config/config.kinesis.extended.hocon
+++ b/config/config.kinesis.extended.hocon
@@ -89,16 +89,19 @@
       # Otherwise, the partition key will be a random UUID.
       # "partitionKey": "user_id"
 
-      # Optional. Policy to retry if writing to kinesis fails.
-      # This policy is used in 2 places:
-      # - When the PutRecords request errors
-      # - When the requests succeeds but some records couldn't get inserted
+      # Optional. Policy to retry if writing to kinesis fails with unexpected errors
       "backoffPolicy": {
         "minBackoff": 100 milliseconds
         "maxBackoff": 10 seconds
         "maxRetries": 10
       }
 
+      # Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
+      "throttledBackoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 1 second
+      }
+
       # Optional. Limits the number of events in a single PutRecords request.
       # Several requests are made in parallel
       # Maximum allowed: 500
@@ -136,16 +139,19 @@
       # Otherwise, the partition key will be a random UUID.
       # "partitionKey": "user_id"
 
-      # Optional. Policy to retry if writing to kinesis fails.
-      # This policy is used in 2 places:
-      # - When the PutRecords request errors
-      # - When the requests succeeds but some records couldn't get inserted
+      # Optional. Policy to retry if writing to kinesis fails with unexpected errors
       "backoffPolicy": {
         "minBackoff": 100 milliseconds
         "maxBackoff": 10 seconds
         "maxRetries": 10
       }
 
+      # Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
+      "throttledBackoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 1 second
+      }
+
       # Optional. Limits the number of events in a single PutRecords request.
       # Several requests are made in parallel
       # Maximum allowed: 500
@@ -175,16 +181,19 @@
       # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
       "region": "eu-central-1"
 
-      # Optional. Policy to retry if writing to kinesis fails.
-      # This policy is used in 2 places:
-      # - When the PutRecords request errors
-      # - When the requests succeeds but some records couldn't get inserted
+      # Optional. Policy to retry if writing to kinesis fails with unexpected errors
       "backoffPolicy": {
         "minBackoff": 100 milliseconds
         "maxBackoff": 10 seconds
         "maxRetries": 10
       }
 
+      # Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
+      "throttledBackoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 1 second
+      }
+
       # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
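Note: unlike "backoffPolicy", the new "throttledBackoffPolicy" carries no "maxRetries", so throttled writes are retried indefinitely and only the delay between attempts is capped. As a rough illustration (a sketch, not part of the patch, assuming cats-retry's fibonacciBackoff scales the base delay by the Fibonacci sequence as its docs describe), the defaults above imply this delay sequence:

import scala.concurrent.duration._

val minBackoff = 100.millis
val maxBackoff = 1.second

// Fibonacci delays seeded at minBackoff and capped at maxBackoff
val delays = Iterator
  .iterate((minBackoff, minBackoff)) { case (a, b) => (b, a + b) }
  .map { case (a, _) => a min maxBackoff }

println(delays.take(8).toList)
// 100ms, 100ms, 200ms, 300ms, 500ms, 800ms, then 1s for every later attempt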
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
index ac65fb04e..b7e24b599 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
@@ -17,7 +17,6 @@ import cats.implicits._
 
 import fs2.Stream
 
-import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext
 
 import cats.effect.{Blocker, Clock, Concurrent, ConcurrentEffect, ContextShift, ExitCode, Resource, Sync, Timer}
@@ -26,13 +25,12 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import retry.syntax.all._
-import retry.RetryPolicies._
 
 import com.snowplowanalytics.snowplow.badrows.Processor
 
-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Input, Monitoring, Output, RetryCheckpointing}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Input, Monitoring, Output, RetryCheckpointing}
 import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
-import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Source}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source}
 import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
 
 import org.http4s.client.{Client => Http4sClient}
@@ -109,9 +107,7 @@ object Run {
               val checkpointing = input match {
                 case retrySettings: RetryCheckpointing =>
                   withRetries(
-                    retrySettings.checkpointBackoff.minBackoff,
-                    retrySettings.checkpointBackoff.maxBackoff,
-                    retrySettings.checkpointBackoff.maxRetries,
+                    retrySettings.checkpointBackoff,
                     "Checkpointing failed",
                     checkpoint
                   )
@@ -186,13 +182,11 @@ object Run {
   }
 
   private def withRetries[F[_]: Sync: Timer, A, B](
-    minBackOff: FiniteDuration,
-    maxBackOff: FiniteDuration,
-    maxRetries: Int,
+    config: BackoffPolicy,
     errorMessage: String,
     f: A => F[B]
   ): A => F[B] = { a =>
-    val retryPolicy = capDelay[F](maxBackOff, fullJitter[F](minBackOff)).join(limitRetries(maxRetries))
+    val retryPolicy = Retries.fullJitter[F](config)
 
     f(a)
       .retryingOnAllErrors(
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
index caa801c62..c812d1d1e 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
@@ -112,7 +112,7 @@ object ParsedConfigs {
           if (invalidAttributes.nonEmpty) NonEmptyList(invalidAttributes.head, invalidAttributes.tail.toList).invalid
           else output.valid
         }
-      case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _) if !enrichedFieldsMap.contains(key) =>
+      case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _) if !enrichedFieldsMap.contains(key) =>
         NonEmptyList.one(s"Partition key $key not valid").invalid
       case _ =>
         output.valid
@@ -126,7 +126,7 @@ object ParsedConfigs {
             attributes.contains(s)
           }
         attributesFromFields(fields)
-      case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _) =>
+      case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _) =>
         val fields =
           ParsedConfigs.enrichedFieldsMap.filter {
            case (s, _) =>
              s == key
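The withRetries refactor above is behaviour-preserving when maxRetries is defined: the policy it used to assemble inline is exactly what the new Retries.fullJitter constructor builds. A sketch of the equivalence (hypothetical values; the combinators on the left are the ones whose imports were just removed):

import scala.concurrent.duration._
import cats.effect.IO
import retry.RetryPolicies._
import retry.RetryPolicy

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Retries

val config = BackoffPolicy(100.millis, 10.seconds, Some(10))

// What Run.withRetries assembled inline before this change:
val before: RetryPolicy[IO] =
  capDelay[IO](config.maxBackoff, fullJitter[IO](config.minBackoff)).join(limitRetries(10))

// What it delegates to now; identical whenever maxRetries is Some(n):
val after: RetryPolicy[IO] = Retries.fullJitter[IO](config)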
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
index 08828bf0d..a811003b8 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
@@ -63,7 +63,7 @@ object io {
   case class BackoffPolicy(
     minBackoff: FiniteDuration,
     maxBackoff: FiniteDuration,
-    maxRetries: Int
+    maxRetries: Option[Int]
   )
   object BackoffPolicy {
     implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] =
@@ -269,6 +269,7 @@ object io {
       region: Option[String],
       partitionKey: Option[String],
       backoffPolicy: BackoffPolicy,
+      throttledBackoffPolicy: BackoffPolicy,
       recordLimit: Int,
       byteLimit: Int,
       customEndpoint: Option[URI]
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Retries.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Retries.scala
new file mode 100644
index 000000000..7538b6a97
--- /dev/null
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Retries.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.common.fs2.io
+
+import retry.{RetryPolicies, RetryPolicy}
+
+import cats.Applicative
+
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy
+
+object Retries {
+
+  def fullJitter[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] =
+    capBackoffAndRetries(config, RetryPolicies.fullJitter[F](config.minBackoff))
+
+  def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] =
+    capBackoffAndRetries(config, RetryPolicies.fibonacciBackoff[F](config.minBackoff))
+
+  private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = {
+    val capped = RetryPolicies.capDelay[F](config.maxBackoff, policy)
+    config.maxRetries.fold(capped)(max => capped.join(RetryPolicies.limitRetries(max)))
+  }
+
+}
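The point of the new Retries module is the Option[Int]: when maxRetries is None, limitRetries is never joined in, so the policy retries forever with only the delay capped. A usage sketch mirroring how the sinks configure it:

import scala.concurrent.duration._
import cats.effect.IO
import retry.RetryPolicy

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Retries

// Bounded: gives up after 10 retries (used for unexpected write errors)
val forErrors: RetryPolicy[IO] =
  Retries.fullJitter[IO](BackoffPolicy(100.millis, 10.seconds, Some(10)))

// Unbounded: no limitRetries, only capDelay (used for Kinesis throttling)
val forThrottling: RetryPolicy[IO] =
  Retries.fibonacci[IO](BackoffPolicy(100.millis, 1.second, None))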
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
index 9c6c9ab49..0602ca0f9 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
@@ -103,7 +103,7 @@ class ConfigFileSpec extends Specification with CatsIO {
           io.Input.Kinesis.InitPosition.TrimHorizon,
           io.Input.Kinesis.Retrieval.Polling(10000),
           3,
-          io.BackoffPolicy(100.milli, 10.second, 10),
+          io.BackoffPolicy(100.milli, 10.second, Some(10)),
           None,
           None,
           None
@@ -113,7 +113,8 @@ class ConfigFileSpec extends Specification with CatsIO {
           "enriched",
           Some("eu-central-1"),
           None,
-          io.BackoffPolicy(100.millis, 10.seconds, 10),
+          io.BackoffPolicy(100.millis, 10.seconds, Some(10)),
+          io.BackoffPolicy(100.millis, 1.second, None),
           500,
           5242880,
           None
@@ -123,7 +124,8 @@ class ConfigFileSpec extends Specification with CatsIO {
           "pii",
           Some("eu-central-1"),
           None,
-          io.BackoffPolicy(100.millis, 10.seconds, 10),
+          io.BackoffPolicy(100.millis, 10.seconds, Some(10)),
+          io.BackoffPolicy(100.millis, 1.second, None),
           500,
           5242880,
           None
@@ -133,7 +135,8 @@ class ConfigFileSpec extends Specification with CatsIO {
           "bad",
           Some("eu-central-1"),
           None,
-          io.BackoffPolicy(100.millis, 10.seconds, 10),
+          io.BackoffPolicy(100.millis, 10.seconds, Some(10)),
+          io.BackoffPolicy(100.millis, 1.second, None),
           500,
           5242880,
           None
@@ -208,7 +211,7 @@ class ConfigFileSpec extends Specification with CatsIO {
            true
          ),
          "raw",
-         io.BackoffPolicy(100.millis, 10.seconds, 10)
+         io.BackoffPolicy(100.millis, 10.seconds, Some(10))
        ),
        io.Outputs(
          io.Output.RabbitMQ(
@@ -227,7 +230,7 @@ class ConfigFileSpec extends Specification with CatsIO {
            ),
            "enriched",
            "enriched",
-           io.BackoffPolicy(100.millis, 10.seconds, 10)
+           io.BackoffPolicy(100.millis, 10.seconds, Some(10))
          ),
          None,
          io.Output.RabbitMQ(
@@ -246,7 +249,7 @@ class ConfigFileSpec extends Specification with CatsIO {
            ),
            "bad-1",
            "bad-1",
-           io.BackoffPolicy(100.millis, 10.seconds, 10)
+           io.BackoffPolicy(100.millis, 10.seconds, Some(10))
          )
        ),
        io.Concurrency(256, 3),
diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf
index 3a8238b69..3ae7c15b4 100644
--- a/modules/kinesis/src/main/resources/application.conf
+++ b/modules/kinesis/src/main/resources/application.conf
@@ -25,6 +25,10 @@
         "maxBackoff": 10 seconds
         "maxRetries": 10
       }
+      "throttledBackoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 1 second
+      }
       "recordLimit": 500
       "byteLimit": 5242880
     }
@@ -38,6 +42,10 @@
         "maxBackoff": 10 seconds
         "maxRetries": 10
       }
+      "throttledBackoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 1 second
+      }
       "recordLimit": 500
       "byteLimit": 5242880
     }
@@ -49,6 +57,10 @@
         "maxBackoff": 10 seconds
         "maxRetries": 10
       }
+      "throttledBackoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 1 second
+      }
       "recordLimit": 500
       "byteLimit": 5242880
     }
diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala
index da0cfc4ef..488ae085d 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2022 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved.
  *
  * This program is licensed to you under the Apache License Version 2.0,
  * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -18,16 +18,16 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
 import cats.implicits._
-import cats.{Applicative, Parallel}
+import cats.{Monoid, Parallel}
 
 import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync, Timer}
+import cats.effect.concurrent.Ref
 
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import retry.syntax.all._
-import retry.RetryPolicies._
-import retry.{PolicyDecision, RetryPolicy, RetryStatus}
+import retry.RetryPolicy
 
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
@@ -35,7 +35,8 @@
 import com.amazonaws.services.kinesis.model._
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
 
 import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Output}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
+import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Retries
 
 object Sink {
@@ -60,8 +61,7 @@ object Sink {
       case Some(region) =>
         for {
           producer <- Resource.eval[F, AmazonKinesis](mkProducer(o, region))
-          retryPolicy = getRetryPolicy[F](o.backoffPolicy)
-        } yield records => writeToKinesis(blocker, o, producer, records, retryPolicy)
+        } yield records => writeToKinesis(blocker, o, producer, toKinesisRecords(records))
      case None =>
        Resource.eval(Sync[F].raiseError(new RuntimeException(s"Region not found in the config and in the runtime")))
    }
@@ -97,19 +97,40 @@ object Sink {
       }
     } yield exists
 
-  private def getRetryPolicy[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] =
-    capDelay[F](config.maxBackoff, fullJitter[F](config.minBackoff))
-      .join(limitRetries(config.maxRetries))
-
   private def writeToKinesis[F[_]: ContextShift: Parallel: Sync: Timer](
     blocker: Blocker,
     config: Output.Kinesis,
     kinesis: AmazonKinesis,
-    records: List[AttributedData[Array[Byte]]],
-    retryPolicy: RetryPolicy[F]
-  ): F[Unit] =
-    group(toKinesisRecords(records), config.recordLimit, config.byteLimit, getRecordSize)
-      .parTraverse_(g => writeToKinesis(blocker, config, kinesis, g, retryPolicy))
+    records: List[PutRecordsRequestEntry]
+  ): F[Unit] = {
+    val policyForErrors = Retries.fullJitter[F](config.backoffPolicy)
+    val policyForThrottling = Retries.fibonacci[F](config.throttledBackoffPolicy)
+
+    def runAndCaptureFailures(ref: Ref[F, List[PutRecordsRequestEntry]]): F[List[PutRecordsRequestEntry]] =
+      for {
+        records <- ref.get
+        failures <- group(records, config.recordLimit, config.byteLimit, getRecordSize)
+                      .parTraverse(g => tryWriteToKinesis(blocker, config, kinesis, g, policyForErrors))
+        flattened = failures.flatten
+        _ <- ref.set(flattened)
+      } yield flattened
+
+    for {
+      ref <- Ref.of(records)
+      failures <- runAndCaptureFailures(ref)
+                    .retryingOnFailures(
+                      policy = policyForThrottling,
+                      wasSuccessful = _.isEmpty,
+                      onFailure = {
+                        case (result, retryDetails) =>
+                          val msg = failureMessageForThrottling(result, config.streamName)
+                          Logger[F].warn(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)")
+                      }
+                    )
+      _ <- if (failures.isEmpty) Sync[F].unit
+           else Sync[F].raiseError(new RuntimeException(failureMessageForThrottling(failures, config.streamName)))
+    } yield ()
+  }
 
   /**
    * This function takes a list of records and splits it into several lists,
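The defaults above deliberately omit a "maxRetries" key under "throttledBackoffPolicy", which decodes to BackoffPolicy(100.millis, 1.second, None), matching the updated ConfigFileSpec expectations. If an operator did bound it, say with "maxRetries": 5, the resulting policy would be equivalent to this sketch (illustrative values only):

import scala.concurrent.duration._
import cats.effect.IO
import retry.RetryPolicies._

// What Retries.fibonacci builds for BackoffPolicy(100.millis, 1.second, Some(5))
val capped = capDelay[IO](1.second, fibonacciBackoff[IO](100.millis))
val bounded = capped.join(limitRetries[IO](5))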
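The rewritten writeToKinesis is a drain loop: a Ref holds whatever still needs writing, each pass re-groups and writes only the leftovers, and retryingOnFailures keeps looping under the unbounded throttling policy until the leftover list is empty. A stripped-down model of that shape (a hypothetical attempt function stands in for the Kinesis call):

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import retry.RetryPolicies
import retry.syntax.all._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

// Pretend one record succeeds per attempt; the real code gets back the
// throttled records of each batch instead
def attempt(remaining: List[Int]): IO[List[Int]] = IO.pure(remaining.drop(1))

def drainOnce(ref: Ref[IO, List[Int]]): IO[List[Int]] =
  for {
    left  <- ref.get
    still <- attempt(left)
    _     <- ref.set(still)
  } yield still

val drainAll: IO[List[Int]] =
  Ref.of[IO, List[Int]](List(1, 2, 3)).flatMap { ref =>
    drainOnce(ref).retryingOnFailures(
      policy = RetryPolicies.constantDelay[IO](100.millis),
      wasSuccessful = _.isEmpty,
      onFailure = (_, _) => IO.unit
    )
  }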
@@ -148,67 +169,44 @@ object Sink {
   private def getRecordSize(record: PutRecordsRequestEntry) =
     record.getData.array.size + record.getPartitionKey.getBytes.size
 
-  private def writeToKinesis[F[_]: ContextShift: Sync: Timer](
+  /**
+   * Try writing a batch, and returns a list of the failures to be retried:
+   *
+   * If we are not throttled by kinesis, then the list is empty.
+   * If we are throttled by kinesis, the list contains throttled records and records that gave internal errors.
+   * If there is an exception, or if all records give internal errors, then we retry using the policy.
+   */
+  private def tryWriteToKinesis[F[_]: ContextShift: Sync: Timer](
     blocker: Blocker,
     config: Output.Kinesis,
     kinesis: AmazonKinesis,
     records: List[PutRecordsRequestEntry],
-    retryPolicy: RetryPolicy[F],
-    retryStatus: RetryStatus = RetryStatus.NoRetriesYet
-  ): F[Unit] = {
-    val withoutRetry = blocker.blockOn(Sync[F].delay(putRecords(kinesis, config.streamName, records)))
-
-    val withRetry =
-      withoutRetry.retryingOnAllErrors(
-        policy = retryPolicy,
-        onError = (exception, retryDetails) =>
-          Logger[F]
-            .error(exception)(
-              s"Writing ${records.size} records to ${config.streamName} errored (${retryDetails.retriesSoFar} retries from cats-retry)"
-            )
-      )
-
-    for {
-      _ <- retryStatus match {
-             case RetryStatus.NoRetriesYet =>
-               Logger[F].debug(s"Writing ${records.size} records to ${config.streamName}")
-             case retry =>
-               Logger[F].debug(
-                 s"Waiting for ${retry.cumulativeDelay} and retrying to write ${records.size} records to ${config.streamName}"
-               ) *>
-                 Timer[F].sleep(retry.cumulativeDelay)
-           }
-      putRecordsResult <- withRetry
-      failuresRetried <- if (putRecordsResult.getFailedRecordCount != 0) {
-                           val failurePairs = records.zip(putRecordsResult.getRecords.asScala).filter(_._2.getErrorMessage != null)
-                           val (failedRecords, failedResults) = failurePairs.unzip
-                           val logging = logErrorsSummary(getErrorsSummary(failedResults))
-                           val maybeRetrying =
-                             for {
-                               nextRetry <- retryPolicy.decideNextRetry(retryStatus)
-                               maybeRetried <- nextRetry match {
-                                                 case PolicyDecision.DelayAndRetry(delay) =>
-                                                   writeToKinesis(
-                                                     blocker,
-                                                     config,
-                                                     kinesis,
-                                                     failedRecords,
-                                                     retryPolicy,
-                                                     retryStatus.addRetry(delay)
-                                                   )
-                                                 case PolicyDecision.GiveUp =>
-                                                   Sync[F].raiseError[Unit](
-                                                     new RuntimeException(
-                                                       s"Maximum number of retries reached for ${failedRecords.size} records"
-                                                     )
-                                                   )
-                                               }
-                             } yield maybeRetried
-                           logging *> maybeRetrying
-                         } else
-                           Sync[F].unit
-    } yield failuresRetried
-  }
+    retryPolicy: RetryPolicy[F]
+  ): F[Vector[PutRecordsRequestEntry]] =
+    Logger[F].debug(s"Writing ${records.size} records to ${config.streamName}") *>
+      blocker
+        .blockOn(Sync[F].delay(putRecords(kinesis, config.streamName, records)))
+        .map(TryBatchResult.build(records, _))
+        .retryingOnFailuresAndAllErrors(
+          policy = retryPolicy,
+          wasSuccessful = r => !r.shouldRetrySameBatch,
+          onFailure = {
+            case (result, retryDetails) =>
+              val msg = failureMessageForInternalErrors(records, config.streamName, result)
+              Logger[F].error(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)")
+          },
+          onError = (exception, retryDetails) =>
+            Logger[F]
+              .error(exception)(
+                s"Writing ${records.size} records to ${config.streamName} errored (${retryDetails.retriesSoFar} retries from cats-retry)"
+              )
+        )
+        .flatMap { result =>
+          if (result.shouldRetrySameBatch)
+            Sync[F].raiseError(new RuntimeException(failureMessageForInternalErrors(records, config.streamName, result)))
+          else
+            result.nextBatchAttempt.pure[F]
+        }
 
   private def toKinesisRecords(records: List[AttributedData[Array[Byte]]]): List[PutRecordsRequestEntry] =
     records.map { r =>
@@ -224,6 +222,58 @@ object Sink {
       prre
     }
 
+  /**
+   * The result of trying to write a batch to kinesis
+   * @param nextBatchAttempt Records to re-package into another batch, either because of throttling or an internal error
+   * @param hadSuccess Whether one or more records in the batch were written successfully
+   * @param wasThrottled Whether at least one of retries is because of throttling
+   * @param exampleInternalError A message to help with logging
+   */
+  private case class TryBatchResult(
+    nextBatchAttempt: Vector[PutRecordsRequestEntry],
+    hadSuccess: Boolean,
+    wasThrottled: Boolean,
+    exampleInternalError: Option[String]
+  ) {
+    // Only retry the exact same again if no record was successfully inserted, and all the errors
+    // were not throughput exceeded exceptions
+    def shouldRetrySameBatch: Boolean =
+      !hadSuccess && !wasThrottled
+  }
+
+  private object TryBatchResult {
+
+    implicit private def tryBatchResultMonoid: Monoid[TryBatchResult] =
+      new Monoid[TryBatchResult] {
+        override val empty: TryBatchResult = TryBatchResult(Vector.empty, false, false, None)
+        override def combine(x: TryBatchResult, y: TryBatchResult): TryBatchResult =
+          TryBatchResult(
+            x.nextBatchAttempt ++ y.nextBatchAttempt,
+            x.hadSuccess || y.hadSuccess,
+            x.wasThrottled || y.wasThrottled,
+            x.exampleInternalError.orElse(y.exampleInternalError)
+          )
+      }
+
+    def build(records: List[PutRecordsRequestEntry], prr: PutRecordsResult): TryBatchResult =
+      if (prr.getFailedRecordCount.toInt =!= 0)
+        records
+          .zip(prr.getRecords.asScala)
+          .foldMap {
+            case (orig, recordResult) =>
+              Option(recordResult.getErrorCode) match {
+                case None =>
+                  TryBatchResult(Vector.empty, true, false, None)
+                case Some("ProvisionedThroughputExceededException") =>
+                  TryBatchResult(Vector(orig), false, true, None)
+                case Some(_) =>
+                  TryBatchResult(Vector(orig), false, false, Option(recordResult.getErrorMessage))
+              }
+          }
+      else
+        TryBatchResult(Vector.empty, true, false, None)
+  }
+
   private def putRecords(
     kinesis: AmazonKinesis,
     streamName: String,
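TryBatchResult.build classifies each record of the PutRecordsResult and merges the per-record verdicts through the Monoid, so a single foldMap yields the whole batch's fate. A worked example (hypothetical records rec2 and rec3, treating the private type as visible): one success, one throttled record and one internal error combine like this:

TryBatchResult(Vector.empty, true, false, None) |+|
  TryBatchResult(Vector(rec2), false, true, None) |+|
  TryBatchResult(Vector(rec3), false, false, Some("Internal failure"))
// == TryBatchResult(Vector(rec2, rec3), hadSuccess = true, wasThrottled = true, Some("Internal failure"))
// shouldRetrySameBatch == false, so rec2 and rec3 go into the next batch attempt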
@@ -238,23 +288,19 @@ object Sink {
     kinesis.putRecords(putRecordsRequest)
   }
 
-  private def getErrorsSummary(badResponses: List[PutRecordsResultEntry]): Map[String, (Long, String)] =
-    badResponses.foldLeft(Map[String, (Long, String)]())((counts, r) =>
-      if (counts.contains(r.getErrorCode))
-        counts + (r.getErrorCode -> (counts(r.getErrorCode)._1 + 1 -> r.getErrorMessage))
-      else
-        counts + (r.getErrorCode -> ((1, r.getErrorMessage)))
-    )
+  private def failureMessageForInternalErrors(
+    records: List[PutRecordsRequestEntry],
+    streamName: String,
+    result: TryBatchResult
+  ): String = {
+    val exampleMessage = result.exampleInternalError.getOrElse("none")
+    s"Writing ${records.size} records to $streamName errored with internal failures. Example error message [$exampleMessage]"
+  }
 
-  private def logErrorsSummary[F[_]: Sync](errorsSummary: Map[String, (Long, String)]): F[Unit] =
-    errorsSummary
-      .map {
-        case (errorCode, (count, sampleMessage)) =>
-          Logger[F].error(
-            s"$count records failed with error code ${errorCode}. Example error message: ${sampleMessage}"
-          )
-      }
-      .toList
-      .sequence_
+  private def failureMessageForThrottling(
+    records: List[PutRecordsRequestEntry],
+    streamName: String
+  ): String =
+    s"Exceeded Kinesis provisioned throughput: ${records.size} records failed writing to $streamName."
 
 }
diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala
index 3ef6be6af..1d7d2b8cd 100644
--- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala
+++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala
@@ -13,7 +13,7 @@
 package com.snowplowanalytics.snowplow.enrich.rabbitmq
 
 import cats.implicits._
-import cats.{Applicative, Parallel}
+import cats.Parallel
 
 import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
 
@@ -24,11 +24,10 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import retry.syntax.all._
-import retry.RetryPolicies._
-import retry.RetryPolicy
 
 import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Output}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
+import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Retries
 
 object Sink {
 
@@ -75,7 +74,7 @@ object Sink {
         .parTraverse_ { bytes =>
           publisher(new String(bytes))
             .retryingOnAllErrors(
-              policy = getRetryPolicy[F](rawConfig.backoffPolicy),
+              policy = Retries.fullJitter[F](rawConfig.backoffPolicy),
               onError = (exception, retryDetails) =>
                 Logger[F]
                   .error(exception)(
@@ -85,7 +84,4 @@ object Sink {
         }
     } yield sink
 
-  private def getRetryPolicy[F[_]: Applicative](backoffPolicy: BackoffPolicy): RetryPolicy[F] =
-    capDelay[F](backoffPolicy.maxBackoff, fullJitter[F](backoffPolicy.minBackoff))
-      .join(limitRetries(backoffPolicy.maxRetries))
 }
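Taken together, the sink now separates two failure modes: a batch where nothing succeeded and nothing was throttled is retried in place under the bounded full-jitter backoffPolicy, while throttled or partially failed batches hand their leftovers to the outer loop, which re-groups them and retries under the unbounded fibonacci throttledBackoffPolicy. A sketch (not in the patch) distilling the decision from shouldRetrySameBatch:

def nextStep(r: TryBatchResult): String =
  if (r.shouldRetrySameBatch) // no success and no throttling
    "retry the same batch under backoffPolicy (full jitter, bounded)"
  else if (r.nextBatchAttempt.nonEmpty) // throttled and/or partial failure
    "re-group the leftovers and retry under throttledBackoffPolicy (fibonacci, unbounded)"
  else
    "batch fully written"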