diff --git a/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala b/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala index 59c98ce..17dbb73 100644 --- a/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala +++ b/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala @@ -111,13 +111,11 @@ class KinesisS3Emitter(client: AmazonS3, * @param bucket where data will be written * @param stream stream of rows with filename * @param now connection attempt start time - * @param callback a procedure to execute after successful emit, e.g. reporting * @return success status of sending to S3 */ def attemptEmit( stream: ISerializer.NamedStream, - now: Long, - callback: () => Unit + now: Long ): Unit = { def logAndSleep(attempt: Int, e: Throwable): Unit = { logger.error( @@ -138,7 +136,7 @@ class KinesisS3Emitter(client: AmazonS3, else { val request = getRequest(output.s3.bucketName, stream, now) Either.catchNonFatal(client.putObject(request)) match { - case Right(_) => callback() + case Right(_) => () case Left(error) => logAndSleep(attempt, error) go(attempt + 1) @@ -177,8 +175,9 @@ class KinesisS3Emitter(client: AmazonS3, if (successSize > 0) serializationResults.namedStreams.foreach { stream => - attemptEmit(stream, System.currentTimeMillis(), callback) + attemptEmit(stream, System.currentTimeMillis()) } + callback() failures }