
Conversation

@martindurant
Member

Inspired by #226, but in this case, consumers are thread-local,
because they should not be called simultaneously from separate threads,
and this should also work across processes.

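A minimal sketch of the thread-local caching idea described above (an illustration only, not the PR's actual code; the config-key scheme and the confluent_kafka usage are assumptions):

import threading
import confluent_kafka as ck

_local = threading.local()

def get_consumer(conf):
    # each thread keeps its own consumer per config, so no consumer is ever
    # shared between threads; separate processes get separate caches anyway
    if not hasattr(_local, "consumers"):
        _local.consumers = {}
    key = str(sorted(conf.items()))
    if key not in _local.consumers:
        _local.consumers[key] = ck.Consumer(conf)
    return _local.consumers[key]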
@martindurant
Member Author

@skmatti , it would be good to see your benchmarking code, so we can see how well this performs in various thread/process mixes.

@martindurant
Member Author

(this fails on Travis in a way I was not seeing - perhaps the consumers need to be explicitly cleaned up? I am not sure how to make that happen, so may not come back to this for a little while)

@codecov-io

codecov-io commented Apr 28, 2019

Codecov Report

Merging #233 into master will decrease coverage by 0.02%.
The diff coverage is 93.33%.


@@            Coverage Diff             @@
##           master     #233      +/-   ##
==========================================
- Coverage   93.86%   93.84%   -0.03%     
==========================================
  Files          13       13              
  Lines        1566     1592      +26     
==========================================
+ Hits         1470     1494      +24     
- Misses         96       98       +2
Impacted Files             Coverage          Δ
streamz/compatibility.py   100% <100%>       (ø) ⬆️
streamz/sources.py         95.45% <93.18%>   (-0.32%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6445fa9...5bdd358.

@martindurant
Member Author

Fixes #225
@skmatti , @jsmaupin : can you review and benchmark, please?

@martindurant
Member Author

(NB: this is failing on a previously good test for TCP input under py2. py2 support will be dropped in a bit, but I ought to figure out why this is happening now anyway)

To guard against previous server taking time to quit
@martindurant
Member Author

@skmatti, @jsmaupin, I would still very much like to know whether this does the right thing for you.

@jsmaupin
Contributor

jsmaupin commented May 1, 2019

My main issue with this implementation is that the standard pattern with Kafka is one thread for each partition. This implementation reuses consumers to read from a random assortment of partitions and locations within those partitions. I have to wonder if this method is fighting the standard caching behavior, where it is assumed that if one item is read, the following item will be read next. In this case, I'm referring to the disk cache, which Kafka relies on heavily for performance.

Is it not possible to grant each thread a partition and have that thread read the partition from start to end? It seems like this method would have far less overhead. What is the overhead of creating new TopicPartition objects and repeatedly reassigning consumers to read from different partitions?
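For concreteness, a sketch of this one-consumer-per-partition model (an illustration only, not streamz code; the broker address, topic name, and batch size are placeholders):

from concurrent.futures import ThreadPoolExecutor
import confluent_kafka as ck

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'per-partition-readers'}

def read_partition(topic, part_no, low, high):
    # one dedicated consumer per partition, reading sequentially from low to high
    c = ck.Consumer(conf)
    c.assign([ck.TopicPartition(topic, part_no, low)])
    remaining = high - low
    while remaining > 0:
        msgs = c.consume(min(10000, remaining), timeout=5)
        remaining -= len(msgs)
    c.close()
    return high - low

# with ThreadPoolExecutor(max_workers=n_partitions) as ex:
#     futures = [ex.submit(read_partition, 'topic-1', p, lo, hi)
#                for p, (lo, hi) in enumerate(watermarks)]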

Also, assuming that this method isn't creating extra overhead, would it be more concise to express it as a generator?

@martindurant
Member Author

We are making tasks that fetch messages by (partition, low, high); whether this is executed in Dask or some other thread pool, there would be no way to deterministically ensure that the same partition is always handled on the same thread. Furthermore, "read the partition from start to end" - the end is a moving target, which is the point of this in the first place. I agree that, if we simply wanted to get the contents of a number of kafka partitions, it would make sense to have one thread read each; but we want real-time operations in streamz.

What is the overhead of creating new TopicPartition objects

Making them is cheap; applying them may not be, which is why I'd like to see further benchmarks.
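For example, a micro-benchmark along these lines (a rough sketch; the broker address, topic name, and counts are assumptions) would separate the cost of constructing TopicPartition objects from the cost of re-assigning a consumer:

from time import perf_counter
import confluent_kafka as ck

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'assign-bench'}
c = ck.Consumer(conf)

t0 = perf_counter()
tps = [ck.TopicPartition('topic-1', 0, offset) for offset in range(10000)]
t1 = perf_counter()
for tp in tps[:100]:
    c.assign([tp])  # re-point the same consumer at a new start offset
t2 = perf_counter()

print('create 10k TopicPartitions:', t1 - t0, 's')
print('100 re-assignments:        ', t2 - t1, 's')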

express it as a generator

I don't know how you would do that!

@jsmaupin
Contributor

jsmaupin commented May 1, 2019

What do we get from instructing the threads to read the data in chunks instead of allowing a thread to continuously read from a given partition?

@martindurant
Member Author

Perhaps we have lost the thread here...

What do we get from instructing the threads to read the data in chunks

You get batches. from_kafka also exists; you could make one for each kafka partition if you liked, but that will not be as performant, or so I thought. It doesn't need to worry about creating/destroying consumers.

This batch-based approach is what you would need for a dask distributed workflow.

@martindurant
Member Author

...but the previous benchmarking showed that the batched approach was much faster than message-by-message, and that was before reusing consumers (which is why this PR exists in the first place). This is because of the overhead of passing single messages around in the kafka library and streamz, both of which are presumably slow compared to any cache/random-access effects in kafka itself.
This should not be surprising; but then again, the non-batched consumer is still an option. I'd like to see numbers, if possible.

@skmatti
Contributor

skmatti commented May 6, 2019

Yeah, reading in batches definitely helps, but every batch involves a partition assignment and a seek to a random offset within the partition.
I have tried to get some numbers with the current ConsumerFactory implementation on a ThreadPoolExecutor:

from concurrent.futures.thread import ThreadPoolExecutor
import concurrent.futures
import threading
from time import time
import random
import logging

import confluent_kafka as ck

class ConsumerFactory:
    # per-thread cache of consumers, keyed by the (sorted) consumer config
    consumers = threading.local()

    def __init__(self, topic, batch_size, consumer_conf, cache):
        self.topic = topic
        self.consume_batch_size = batch_size
        self.conf = consumer_conf
        self.key = str(sorted(self.conf.items()))
        self.cache = cache
        
    def read_batch(self, v):
        # v is a (partition number, low offset, high offset) tuple
        part_no, low, high = v
        
        c = None
        if self.cache:
            if not hasattr(self.consumers, "map"):
                # logging.debug('created new dict')
                self.consumers.map = {}
            consumers = self.consumers.map
            if self.key not in consumers:
                # logging.debug('created new consumer')
                c = ck.Consumer(self.conf)
                consumers[self.key] = c
            else:
                # logging.debug('reusing consumer')
                c = consumers[self.key]
                
            tp = ck.TopicPartition(self.topic, part_no, low)
            c.assign([tp])
        else:
            c = ck.Consumer(self.conf)
            tp = ck.TopicPartition(self.topic, part_no, low)
            c.assign([tp])
        
        start_offset = low
        end_offset = low
        
        while high > low:
            curr_high = min(low + self.consume_batch_size, high)
            for m in c.consume(curr_high-low):
                end_offset = max(end_offset, m.offset())
                m.value()
            low = curr_high
        
        return "{}-{}".format(start_offset, end_offset)


def main():
    logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')
    topic = 'topic-1'
    batch_size = 100000
    consume_batch_size = 10000
    use_consumer_caching = True
    shuffle = False
    interleave = True
    if interleave:
        shuffle = False

    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group-1'
    }

    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

    total_offset = [0]
    parts = []
    for part_no in range(len(parts_metadata)):
        low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
        index = part_no
        while high > low:
            curr_high = min(low + batch_size, high)
            if interleave:
                parts.insert(index, (part_no, low, curr_high))
                index += (part_no+1)
            else:
                parts.append((part_no, low, curr_high))
            low = curr_high
            
    if shuffle:
        random.shuffle(parts)
    print('Partition set count:', len(parts))
    
    with ThreadPoolExecutor(max_workers=len(parts_metadata)) as e:
        consumer_factory = ConsumerFactory(topic, consume_batch_size, conf, use_consumer_caching)
        
        futures = {e.submit(consumer_factory.read_batch, p):p for p in parts}
 
        for future in concurrent.futures.as_completed(futures):
            part = futures[future]
            try:
                data = future.result()
            except Exception as exc:
                print('part: %r generated an exception: %s' % (part, exc))
            else:
                print('part: %r, consumed offsets: %s' % (part, data))
            
    took = time() - start
    count = sum(x[2]-x[1] for x in parts)
    rate = (count / 1024) / took
    print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')

if __name__ == '__main__':
    main()

We can enable consumer caching by setting use_consumer_caching to True.

Results:
Without caching: read: 22806000 msgs 23353344000 b in: 59.54931640625 s rate: 374.00067236812976 mb/s

With caching: read: 22806000 msgs 23353344000 b in: 61.087982416152954 s rate: 364.5804541927538 mb/s

There seems to be no performance improvement with caching consumers.

I tried to run this on a dask cluster, but it throws the following error:
TypeError: can't pickle _thread._local objects

Looks like I need to implement a generator for batches?

Also, would it make more sense to have worker-level caching, where a consumer is created for each partition? But how do we block a thread from using a consumer if it is being used by another thread? (Here, blocking a thread is desirable, as we cannot have two threads reading from the same partition in parallel.)
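One possible way around the pickling error (a sketch based on an assumption about the cause: dask has to serialize whatever object holds the threading.local) is to key a plain dict on the thread id instead, so nothing unpicklable is shipped:

import threading
import confluent_kafka as ck

_consumers = {}  # (thread id, config key) -> Consumer, built lazily on the worker

def consumer_for(conf):
    # a plain dict pickles fine while empty; consumers are only ever created on
    # the worker threads that use them, so no consumer crosses a thread boundary
    key = (threading.get_ident(), str(sorted(conf.items())))
    if key not in _consumers:
        _consumers[key] = ck.Consumer(conf)
    return _consumers[key]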

@skmatti
Contributor

skmatti commented May 7, 2019

So, I used the get_worker method to cache consumers at the worker level, with one consumer per partition, as below:

from time import time
import confluent_kafka as ck
from dask.distributed import Client, LocalCluster, get_worker, as_completed
import random


def main():
    topic = 'topic-1'
    batch_size = 100000
    consume_batch_size = 100000
    use_consumer_caching = True
    
    # define job order: in order of creation, interleaved partitions, or random order
    interleave = True
    shuffle = False
    if interleave:
        shuffle = False

    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group'
    }

    cluster = LocalCluster(ip="0.0.0.0", scheduler_port = 8786, dashboard_address = "0.0.0.0:8787", 
                           processes=True, threads_per_worker = 20, n_workers=1)
    client = Client('0.0.0.0:8786')

    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

    def r(v):
        part_no, low, high = v
        
        c = None
        if use_consumer_caching:
            worker_state = get_worker()
            if not hasattr(worker_state, 'consumers'):
                worker_state.consumers = {}
                
            consumers = worker_state.consumers
            if part_no not in consumers:
                c = ck.Consumer(conf)
                tp = ck.TopicPartition(topic, part_no, low)
                c.assign([tp])
                consumers[part_no] = c
            else:
                c = consumers[part_no]
        else:
            c = ck.Consumer(conf)
            tp = ck.TopicPartition(topic, part_no, low)
            c.assign([tp])
            
        start_offset = low
        end_offset = low
        
        while high > low:
            curr_high = min(low + consume_batch_size, high)
            for m in c.consume(curr_high-low):
                end_offset = max(end_offset, m.offset())
                m.value()
            low = curr_high
        
        return "partition:{}, {}-{}".format(part_no, start_offset, end_offset)

    print('Creating partition sets')
    parts = []
    for part_no in range(len(parts_metadata)):
        low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
        index = part_no
        while high > low:
            curr_high = min(low + batch_size, high)
            if interleave:
                parts.insert(index, (part_no, low, curr_high))
                index += (part_no+1)
            else:
                parts.append((part_no, low, curr_high))
            low = curr_high
    print('Partition set count:', len(parts))
    
    if shuffle:
        random.shuffle(parts)
        
    futures = client.map(r, parts)

    for future, result in as_completed(futures, with_results=True):
        print(result)

    took = time() - start
    count = sum(x[2]-x[1] for x in parts)
    rate = (count / 1024) / took
    print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')
    client.close()
    cluster.close()

if __name__ == '__main__':
    main()

Results:
Without caching: read: 22806000 msgs 23353344000 b in: 240.3 s rate: 92.67824051874699 mb/s

With caching: read: 22806000 msgs 23353344000 b in: 82.45 s rate: 270.09103702223587 mb/s

I see a 3x performance improvement with caching enabled.

@martindurant
Member Author

A few questions:

  • Am I right in thinking that, in this model, each worker will end up with a set of consumers, one for each partition that has ever been processed on that worker? Is there then a chance of too many consumers?
  • If a partition is first assigned to one worker and then to another, does kafka send data to both?
  • If a partition is first assigned to one worker, then to another, and then back again, it seems to me that the consumer's low/high offsets and its TopicPartition assignment will not have been updated.

To be sure, the workflow that we are after is as follows (a rough code sketch follows the list):

  • the main process polls kafka periodically and finds out the current state of the given topic's partitions
  • it creates tasks to load sets of the corresponding events, possibly subject to a minimum event-count per batch
  • each of these tasks is given to Dask, which will execute it somewhere in the cluster, giving back a future. Although it's possible to prefer or require which worker each goes to, we wouldn't normally do that, keeping in mind that workers can come and go.
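A rough sketch of that polling loop (not the streamz source; read_batch, the topic name, and the poll interval are placeholders):

import time
import confluent_kafka as ck
from dask.distributed import Client

def poll_and_submit(client, topic, conf, read_batch, poll_interval=1.0):
    admin = ck.Consumer(conf)
    n_parts = len(admin.list_topics(topic).topics[topic].partitions)
    positions = {p: None for p in range(n_parts)}  # last high-water mark submitted
    while True:
        futures = []
        for p in range(n_parts):
            low, high = admin.get_watermark_offsets(ck.TopicPartition(topic, p))
            start = positions[p] if positions[p] is not None else low
            if high > start:
                # one task per (partition, start, high) slice of new events;
                # dask decides which worker runs it
                futures.append(client.submit(read_batch, (p, start, high)))
                positions[p] = high
        if futures:
            client.gather(futures)  # downstream would emit these into the stream
        time.sleep(poll_interval)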

Nevertheless, I think there's enough in your example to show that my PR is not useful. We can keep the conversation here for now, maybe.

@skmatti
Contributor

skmatti commented May 8, 2019

About too many consumers

Assuming we are reading messages from only one topic, there will be as many consumers as there are partitions in that topic. It's always desirable to have as many threads as partitions for maximum performance with Kafka, so all of the consumers will be used by threads in parallel, assuming we have enough threads. Hence we won't have consumers sitting idle in a worker.

Multiple workers reading from the same partition

You mean Kafka sends the same data to both workers? Maybe. If the question is whether they receive data at all, then the two workers may read from different replicas of the partition, so each of them receives data. But I think this is resolved by the changes I made in the next step.

About reading messages in a specific range of offsets

This seems to be a problem with the current implementation, as a consumer for the same partition on another worker does not seek to the current position. So I made the following changes to the above script to seek/assign to the latest offset. There seems to be no drop in performance, or a negligible one if any (270 MB/s vs 255 MB/s).

c = None
if use_consumer_caching:
    worker_state = get_worker()
    if not hasattr(worker_state, 'consumers'):
        worker_state.consumers = {}

    consumers = worker_state.consumers
    tp = ck.TopicPartition(topic, part_no, low)
    if tp not in consumers:
        c = ck.Consumer(conf)
        c.assign([tp])
        consumers[tp] = c
    else:
        c = consumers[tp]
        c.seek(tp)
else:
    c = ck.Consumer(conf)
    tp = ck.TopicPartition(topic, part_no, low)
    c.assign([tp])

Instead of the seek in the above script, I can use c.assign([tp]) for similar performance.

We can follow the same workflow as before, as partition assignment is sorted out with the above changes. I still have to figure out a way to access get_worker for DaskStream users; currently I am not sure of a clean way to do that.
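One possible pattern for the get_worker question (an assumption on my part, not something settled in this thread) is to fall back to a process-local object when the code is not running inside a dask worker:

import types
from dask.distributed import get_worker

_fallback_state = types.SimpleNamespace()  # used when not running on a dask worker

def consumer_cache():
    try:
        state = get_worker()   # raises ValueError outside a dask worker thread
    except ValueError:
        state = _fallback_state
    if not hasattr(state, 'consumers'):
        state.consumers = {}
    return state.consumers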

@skmatti
Contributor

skmatti commented May 9, 2019

Raised a GitHub issue about Kafka Consumer performance inside a process: confluentinc/confluent-kafka-python#597

@martindurant
Member Author

The following situation worries me:
A dask cluster has two workers (maybe on the same machine, or even two threads of one process). There is a single kafka topic with two partitions.
The streamz batched kafka node starts up, and sees that each partition has data waiting, so it creates two tasks, one for each partition, and submits them to the cluster. Worker 1 gets a task for partition 1, and Worker 2 gets a task for partition 2. They do their work, and cache a Consumer each.

In the next poll cycle, the streamz node sees there is data available again, and again makes two tasks. This time they get assigned in the reverse order. Now, after running the tasks, both workers have two consumers, and, conversely, each partition has been subscribed to by two consumers of the same kafka group. While Worker 2 was preparing to read events from partition 1, did any of those events get sent to the waiting thread of the lingering consumer on Worker 1? Is it a problem that, eventually, we will have Nworkers * Npartitions consumers? Does each of these consumers have threads running?

@skmatti
Contributor

skmatti commented May 16, 2019

I think two consumers from different workers subscribing to the same partition isn't an issue, as I am resetting the offset for every batch. This makes sure that the consumer reads messages in the specified offset range.

But there are some issues with the above implementation. Consider a case where there are more threads in a worker than the number of partitions. There will be two threads (thread 1 and thread 2) using the same consumer to read from a partition (partition 1). Before thread 1 finishes reading all of its messages, thread 2 may grab the consumer corresponding to partition 1 and reset its offset to the start of thread 2's offset range.

This can be solved by having an additional dictionary for maintaining the current status of the consumers.

class ConsumerFactory:
    def __init__(self, topic, batch_size, consumer_conf, cache):
        self.topic = topic
        self.consume_batch_size = batch_size
        self.conf = consumer_conf
        self.cache = cache
        
    def read_batch(self, v):
        part_no, low, high = v
        
        c = None
        used_caching = True
        tp = ck.TopicPartition(self.topic, part_no, low)
        if self.cache:
            worker_state = get_worker()
            if not hasattr(worker_state, "consumers"):
                worker_state.consumers = {}   # tp -> cached Consumer
                worker_state.status = {}      # tp -> True while its consumer is in use
            consumers = worker_state.consumers
            started = worker_state.status
            if tp not in consumers:
                # first time this worker sees the partition: create and cache
                c = ck.Consumer(self.conf)
                c.assign([tp])
                consumers[tp] = c
                started[tp] = True
            else:
                if started[tp]:
                    # the cached consumer is busy in another thread: fall back
                    # to a throwaway consumer for this batch
                    used_caching = False
                    c = ck.Consumer(self.conf)
                else:
                    # mark the cached consumer as in use and re-seek it
                    started[tp] = True
                    c = consumers[tp]
                c.assign([tp])
        else:
            c = ck.Consumer(self.conf)
            c.assign([tp])
        
        start_offset = low
        end_offset = low
        
        while high > low:
            curr_high = min(low + self.consume_batch_size, high)
            for m in c.consume(curr_high-low):
                end_offset = max(end_offset, m.offset())
                m.value()
            low = curr_high
        
        if self.cache:
            if used_caching:
                started[tp] = False   # hand the cached consumer back
            else:
                c.close()             # discard the throwaway consumer
        
        return "{}-{}".format(start_offset, end_offset)

I have tested this with multiple configurations of workers and threads, and it seems to work just fine, with a 3x performance improvement over the non-caching implementation.

I am still not completely sure whether this method is thread safe. Happy to hear if there is a better way. Also, the Nworkers*Npartitions consumer issue remains with this method.

@martindurant
Member Author

I wonder if this whole conversation should be postponed until there is a resolution on the confluent-kafka issue - ideally, we should work out an ideal batched event-processing workflow between us. For example, I simply don't know what the implications of leaving many consumers lying around might be.

On this specific latest implementation, I daresay you could accomplish this with locks; it seems to me that assigning into dicts on the worker is fragile to race conditions (e.g., a consumer is assigned between the `if tp not in consumers` check and the assignment of the new consumer, causing a clobber).
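A minimal sketch of the lock-guarded checkout I have in mind (an illustration only; the worker attribute names are assumptions, and it assumes the code lives in a module the workers can import, since a Lock defined in __main__ would not pickle):

import threading
import confluent_kafka as ck
from dask.distributed import get_worker

_lock = threading.Lock()  # one lock per worker process

def checkout_consumer(conf, tp):
    # the whole check-then-create runs under the lock, so two threads cannot
    # both decide the cached consumer is free and clobber each other
    worker = get_worker()
    with _lock:
        if not hasattr(worker, 'consumers'):
            worker.consumers = {}   # tp -> cached Consumer
            worker.busy = set()     # tps whose cached consumer is in use
        if tp not in worker.consumers:
            worker.consumers[tp] = ck.Consumer(conf)
            worker.busy.add(tp)
            return worker.consumers[tp], True
        if tp not in worker.busy:
            worker.busy.add(tp)
            return worker.consumers[tp], True
    # the cached consumer is busy in another thread: use a throwaway one
    return ck.Consumer(conf), False

def release_consumer(tp, consumer, cached):
    with _lock:
        if cached:
            get_worker().busy.discard(tp)
        else:
            consumer.close()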

@martindurant
Member Author

aiokafka seems like an excellent alternative I was not aware of before, providing all the methods we would really want. How its raw read performance compares to confluent's, I don't know, but it may be worth someone's time to look into it.

@martindurant martindurant deleted the consumer_factory branch December 10, 2020 14:11