Skip to content

Commit

Permalink
Duc and Graphex pointed out that there is an error in callback implem…
Browse files Browse the repository at this point in the history
…entation, now it's fixed. Thanks!
  • Loading branch information
Marcin Kuthan committed Nov 6, 2017
1 parent f616e8d commit 1d2b23f
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/main/scala/org/mkuthan/spark/KafkaDStreamSink.scala
Expand Up @@ -66,7 +66,8 @@ class KafkaDStreamSinkExceptionHandler extends Callback {

private val lastException = new AtomicReference[Option[Exception]](None)

override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = lastException.set(Option(exception))
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
Option(exception).foreach{ ex => lastException.set(Some(ex))}

def throwExceptionIfAny(): Unit = lastException.getAndSet(None).foreach(ex => throw ex)

Expand Down

0 comments on commit 1d2b23f

Please sign in to comment.