Issues scaling kafka consumers with Dask #240

@skmatti

Description

I have run two simple pipelines (non-Dask and Dask) to read a stream of messages from Kafka using Streamz's from_kafka_batched method. I seem to get similar throughput for both pipelines.

Non-Dask pipeline:

from distributed import Client
from streamz import Stream
import time
import json

def preprocess(messages):
    # Record the processing window for this batch, approximate its size in bytes
    # (row count * length of the first message), and grab the Kafka timestamp
    # embedded in the first message.
    start_time = int(round(time.time()))
    no_of_rows = len(messages)
    if no_of_rows > 0:
        size = no_of_rows * len(messages[0])
        kafka_timestamp = json.loads(messages[0].decode('utf-8'))['timestamp']
    else:
        size = 0
        kafka_timestamp = start_time
    end_time = int(round(time.time()))
    return "{},{},{},{},{}".format(no_of_rows, kafka_timestamp, start_time, end_time, size)


topic = "topic-1"
bootstrap_servers = 'localhost:9092'
consumer_conf = {'bootstrap.servers': bootstrap_servers,
        'group.id': 'group_1', 'session.timeout.ms': 60000}


stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='10s',
                                   npartitions=5, asynchronous=True, dask=False)

kafka_out = stream.map(preprocess).to_kafka('test-out', consumer_conf)
kafka_out.flush()

The pipeline gives a maximum throughput of ~43 MBps (topic partitions = 5).
Throughput computation = (Σ size) / (max(end_time) - min(start_time))
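For reference, this is how I turn the CSV lines emitted by preprocess into that number (a minimal sketch; it assumes the output lines have been collected into a Python list called records, and that MBps means 10^6 bytes per second):

def compute_throughput(records):
    # Each record is "no_of_rows,kafka_timestamp,start_time,end_time,size".
    rows = [line.split(',') for line in records]
    total_size = sum(int(r[4]) for r in rows)      # total bytes processed
    start = min(int(r[2]) for r in rows)           # earliest start_time
    end = max(int(r[3]) for r in rows)             # latest end_time
    return total_size / (end - start) / 1e6        # MBps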

Setting dask=True runs the same pipeline on a local Dask cluster (the Dask variant is sketched after the cluster setup below).

Starting Dask Cluster:

# dask imports 
from distributed import Client, LocalCluster

# dask client 
cluster = LocalCluster(ip="0.0.0.0", scheduler_port=8786, diagnostics_port=8787,
                       processes=False, threads_per_worker=10, n_workers=1)
client = Client(cluster)
client.get_versions(check=True)
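
With the cluster running, the Dask variant of the pipeline looks roughly like this (a sketch reusing topic, consumer_conf and preprocess from above; the gather() call before the sink is my assumption about how results come back from the workers to the local stream):

stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='10s',
                                   npartitions=5, asynchronous=True, dask=True)

# map() now runs on the Dask workers; gather() collects the results locally
# before they are written back to Kafka.
kafka_out = stream.map(preprocess).gather().to_kafka('test-out', consumer_conf)
kafka_out.flush()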

The pipeline with Dask gives a maximum throughput of ~44.8 MBps when the number of partitions in the topic is 1. Increasing the number of partitions in the topic decreases performance.

My understanding from these experiments is that confluent-kafka performs as expected with a single Dask thread (when the number of partitions is 1) but performs poorly with multiple Dask threads or multiple partitions. This may be due to the underlying Tornado async library. It would be great if experts could help me understand this better, as I am fairly new to async programming.

Hardware and Software Specs:
10 CPU cores, 30 GB RAM
Ubuntu 16.04, Streamz (latest build off the current code base)
Kafka 2.2.0, confluent-kafka-python 1.0.0
