Skip to content

Commit

Permalink
Fix duplicate statsd metrics when loading lzo files (close #229)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 18, 2021
1 parent 144fc88 commit 7bdb376
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,11 @@ class KinesisS3Emitter(client: AmazonS3,
* @param bucket where data will be written
* @param stream stream of rows with filename
* @param now connection attempt start time
* @param callback a procedure to execute after successful emit, e.g. reporting
* @return success status of sending to S3
*/
def attemptEmit(
stream: ISerializer.NamedStream,
now: Long,
callback: () => Unit
now: Long
): Unit = {
def logAndSleep(attempt: Int, e: Throwable): Unit = {
logger.error(
Expand All @@ -138,7 +136,7 @@ class KinesisS3Emitter(client: AmazonS3,
else {
val request = getRequest(output.s3.bucketName, stream, now)
Either.catchNonFatal(client.putObject(request)) match {
case Right(_) => callback()
case Right(_) => ()
case Left(error) =>
logAndSleep(attempt, error)
go(attempt + 1)
Expand Down Expand Up @@ -177,8 +175,9 @@ class KinesisS3Emitter(client: AmazonS3,

if (successSize > 0)
serializationResults.namedStreams.foreach { stream =>
attemptEmit(stream, System.currentTimeMillis(), callback)
attemptEmit(stream, System.currentTimeMillis())
}
callback()

failures
}
Expand Down

0 comments on commit 7bdb376

Please sign in to comment.