Description
I would like to propose caching consumers in the FromKafkaBatched class and reusing them to fetch subsequent batches.
The current FromKafkaBatched class creates a new Kafka consumer for every new batch, which may introduce a lot of overhead. I have looked into Spark's integration with Kafka and want to reference the following text from there:
The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.
This means FromKafkaBatched would hold references to the consumers created initially and reuse them to poll the next batch. This also makes sense from Kafka's point of view, since a topic can have as many concurrent consumers as it has partitions.
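A minimal sketch of the cached-consumer idea, keyed by `(topic, partition)`. This is not the FromKafkaBatched implementation; the `StubConsumer` class and `get_consumer` helper are hypothetical names, and the stub stands in for a `confluent_kafka.Consumer` so the example runs without a broker:

```python
class StubConsumer:
    """Stand-in for a real Kafka consumer; counts how many are created."""
    created = 0

    def __init__(self, config):
        StubConsumer.created += 1
        self.config = config

    def poll_batch(self, low, high):
        # A real consumer would assign the partition and poll
        # messages with offsets in [low, high); here we fake it.
        return list(range(low, high))


# One long-lived consumer per (topic, partition), created lazily.
_consumer_cache = {}


def get_consumer(topic, partition, config, factory=StubConsumer):
    """Return a cached consumer for (topic, partition), creating it on first use."""
    key = (topic, partition)
    if key not in _consumer_cache:
        _consumer_cache[key] = factory(config)
    return _consumer_cache[key]
```

With this pattern, requesting a batch for the same partition twice reuses the same consumer object, so the construction (and Kafka's pre-fetch buffering) happens only once per partition rather than once per batch.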
But I am not sure how this can be handled when the stream is running in dask mode. The Dask scheduler would need to route requests for new batches to the appropriate consumer on the dask workers.