Skip to content

Commit

Permalink
MapR [SPARK-737] Calling poll(1000) from paranoidPoll causes batch sc…
Browse files Browse the repository at this point in the history
…heduling delay (apache#675)
  • Loading branch information
vvysotskyi authored and ekrivokonmapr committed Nov 6, 2023
1 parent 84fcd71 commit dc615b9
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
* which would throw off consumer position. Fix position if this happens.
*/
private def paranoidPoll(c: Consumer[K, V]): Unit = {
val msgs = c.poll(1000)
val msgs = c.poll(0)

val newAssignment = c.assignment()
val parts = if (currentOffsets.size < newAssignment.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
* which would throw off consumer position. Fix position if this happens.
*/
private def paranoidPoll(c: Consumer[K, V]): Unit = {
val msgs = c.poll(1000)
val msgs = c.poll(0)

val newAssignment = c.assignment()
val parts = if (currentOffsets.size < newAssignment.size()) {
Expand Down

0 comments on commit dc615b9

Please sign in to comment.