Skip to content

Commit

Permalink
Change to while loop
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Nov 12, 2014
1 parent ea873e4 commit e5e21c1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ class KafkaReceiver[
def run() {
logInfo("Starting MessageHandler.")
try {
for (msgAndMetadata <- stream) {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
store((msgAndMetadata.key, msgAndMetadata.message))
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ class ReliableKafkaReceiver[
override def run(): Unit = {
logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
for (msgAndMetadata <- stream) {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
val topicAndPartition = TopicAndPartition(
msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator.synchronized {
Expand Down

0 comments on commit e5e21c1

Please sign in to comment.