Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
80efa35
added to_kafka stream
jsmaupin Jan 9, 2019
b635167
remove call to _global_sinks.add(self) as it is wrong and not needed
jsmaupin Jan 9, 2019
893300f
Locally import Confluent Kafka library
jsmaupin Jan 10, 2019
3b546f3
need to wait for subscription to happen before producing messages. se…
jsmaupin Jan 14, 2019
2802078
from_kafka_batched does not return the Kafka stream directly. The sto…
jsmaupin Jan 14, 2019
ae7a6f2
Removed atexit use. Replaced it with a provided .flush() method. Adde…
jsmaupin Jan 14, 2019
3a808ae
move blocking subscribe to another function
jsmaupin Jan 14, 2019
a1f0b09
fixed documentation
jsmaupin Jan 14, 2019
a1104b6
remove unusued parameters
jsmaupin Jan 14, 2019
c6aea02
add sleep to wait for the subscribe() operation to complete
jsmaupin Jan 14, 2019
fd7ed40
add future to to_kafka update() to allow client ot feel backpressure
jsmaupin Jan 28, 2019
7e0efc1
Rely on Confluent lib's limited queue size for back-pressure
jsmaupin Feb 14, 2019
bf68b79
added to_kafka_batched(). Docstrings and tests still incoming.
jsmaupin Feb 14, 2019
d206a69
apply patch to fix flake8 errors
jsmaupin Mar 13, 2019
04dbf77
Merge branch 'master' into master
jsmaupin Mar 13, 2019
1ae904d
some E811 noqa's shoudl be F811
jsmaupin Mar 13, 2019
62d523b
Fix test to be Py2 compatible. Use tornado gen/yield pattern.
jsmaupin Mar 13, 2019
02ed90e
More attempts at kafka security
Mar 16, 2019
6b3cbcb
flake
Mar 17, 2019
fc26b5d
set future in callback
Mar 17, 2019
38f1255
Hold on to futures, and only set upon delivery
Mar 17, 2019
c7d133c
remote to_kafka_batched and flake
Mar 17, 2019
5213c46
use polltime
Mar 17, 2019
b7d7c5b
Suggested changes
Mar 19, 2019
cd94560
Close kafka consumer when source cleaned up
Mar 19, 2019
a60238f
flake again
Mar 19, 2019
7fe9651
Revert wekref finalize of from_kafka
Mar 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions streamz/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,88 @@ def cb(self):
yield self._emit(x)


@Stream.register_api()
class to_kafka(Stream):
""" Writes data in the stream to Kafka

This stream accepts a string or bytes object. Call ``flush`` to ensure all
messages are pushed. Responses from Kafka are pushed downstream.

Parameters
----------
topic : string
The topic which to write
producer_config : dict
Settings to set up the stream, see
https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Examples:
bootstrap.servers: Connection string (host:port) to Kafka

Examples
--------
>>> from streamz import Stream
>>> ARGS = {'bootstrap.servers': 'localhost:9092'}
>>> source = Stream()
>>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
<to_kafka>
>>> for i in range(10):
... source.emit(i)
>>> kafka.flush()
"""
def __init__(self, upstream, topic, producer_config, **kwargs):
import confluent_kafka as ck

self.topic = topic
self.producer = ck.Producer(producer_config)

Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
self.stopped = False
self.polltime = 0.2
self.loop.add_callback(self.poll)
self.futures = []

@gen.coroutine
def poll(self):
while not self.stopped:
# executes callbacks for any delivered data, in this thread
# if no messages were sent, nothing happens
self.producer.poll(0)
yield gen.sleep(self.polltime)

def update(self, x, who=None):
future = gen.Future()
self.futures.append(future)

@gen.coroutine
def _():
while True:
try:
# this runs asynchronously, in C-K's thread
self.producer.produce(self.topic, x, callback=self.cb)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you verify that this doesn't block if the producer is busy?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does the callback get run on the event loop's thread, or C-K's thread?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it gets run in C-K's thread then you may have to use self.loop.add_callback, which is the only threadsafe mechanism to create coroutines from other threads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have been more verbose here. The doc says that the method call is always async (returns immediately), and I have found no case where it isn't.

The actual marshalling of the message to kafka happens in C-K's own thread, a C-level place. But the python callbacks for delivery are triggered upon poll(), and so that call and the coroutine they refer to must happen in the same event loop - so poll() is also in a coroutine.

return
except BufferError:
yield gen.sleep(self.polltime)
except Exception as e:
future.set_exception(e)
return

self.loop.add_callback(_)
return future

@gen.coroutine
def cb(self, err, msg):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does confluent_kafka run the callback in a separate thread? If so then we can't make this a coroutine.

That might be OK. to_kafka is likely to be used as a sink, and so we don't expect to get a future from downstream that we're supposed to wait on. It might be enough here to just set the result and be done.

If cb is called in the thread where update is called, then it might be enough to just set the result and be done

def cb(self, err, msg):
    if good:
        future.set_result(...)
    else:
        future.set_exception(...)

However, if it's going to be called within a separate thread, then we'll need to use loop.add_callback, which is the only threadsafe method in torando

def cb(...):
    def _():
        future.set_result(...)
        ...
    self.loop.add_callback(_)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does confluent_kafka run the callback in a separate thread?

No, it runs in the thread which calls producer.poll(), which is here being called in a coroutine on the current loop. Only the actual low-level delivery runs in another thread.

it might be enough to just set the result and be done

There is no Sink type, though, right? I mean, it may be reasonable to expect events to be passed down a chain still.

future = self.futures.pop(0)
if msg is not None and msg.value() is not None:
future.set_result(None)
yield self._emit(msg.value())
else:
future.set_exception(err or msg.error())

def flush(self, timeout=-1):
self.producer.flush(timeout)


def sync(loop, func, *args, **kwargs):
"""
Run coroutine in loop running in separate thread.
Expand Down
52 changes: 25 additions & 27 deletions streamz/sources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from glob import glob
import os
import weakref

import time
import tornado.ioloop
Expand Down Expand Up @@ -180,8 +179,7 @@ class from_kafka(Source):
... {'bootstrap.servers': 'localhost:9092',
... 'group.id': 'streamz'}) # doctest: +SKIP
"""
def __init__(self, topics, consumer_params, poll_interval=0.1, start=False,
**kwargs):
def __init__(self, topics, consumer_params, poll_interval=0.1, start=False, **kwargs):
self.cpars = consumer_params
self.consumer = None
self.topics = topics
Expand All @@ -194,7 +192,7 @@ def __init__(self, topics, consumer_params, poll_interval=0.1, start=False,
def do_poll(self):
if self.consumer is not None:
msg = self.consumer.poll(0)
if msg and msg.value():
if msg and msg.value() and msg.error() is None:
return msg.value()

@gen.coroutine
Expand All @@ -207,26 +205,19 @@ def poll_kafka(self):
yield gen.sleep(self.poll_interval)
if self.stopped:
break
self._close_consumer()

def start(self):
import confluent_kafka as ck
import distributed
if self.stopped:
finalize = distributed.compatibility.finalize
self.stopped = False
self.loop.add_callback(self.poll_kafka)
self.consumer = ck.Consumer(self.cpars)
self.consumer.subscribe(self.topics)
tp = ck.TopicPartition(self.topics[0], 0, 0)

def close(ref):
ob = ref()
if ob is not None and ob.consumer is not None:
consumer = ob.consumer
ob.consumer = None
consumer.unsubscribe()
consumer.close() # may raise with latest ck, that's OK

finalize(self, close, weakref.ref(self))
# blocks for consumer thread to come up
self.consumer.get_watermark_offsets(tp)
self.loop.add_callback(self.poll_kafka)

def _close_consumer(self):
if self.consumer is not None:
Expand All @@ -253,36 +244,43 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
@gen.coroutine
def poll_kafka(self):
import confluent_kafka as ck
consumer = ck.Consumer(self.consumer_params)

try:
while not self.stopped:
out = []

for partition in range(self.npartitions):
tp = ck.TopicPartition(self.topic, partition, 0)
try:
low, high = consumer.get_watermark_offsets(tp,
timeout=0.1)
low, high = self.consumer.get_watermark_offsets(
tp, timeout=0.1)
except (RuntimeError, ck.KafkaException):
continue
current_position = self.positions[partition]
lowest = max(current_position, low)
out.append((self.consumer_params, self.topic, partition,
lowest, high - 1))
self.positions[partition] = high
if high > lowest:
out.append((self.consumer_params, self.topic, partition,
lowest, high - 1))
self.positions[partition] = high

for part in out:
yield self._emit(part)

else:
yield gen.sleep(self.poll_interval)
finally:
consumer.close()
self.consumer.unsubscribe()
self.consumer.close()

def start(self):
self.stopped = False
self.loop.add_callback(self.poll_kafka)
import confluent_kafka as ck
if self.stopped:
self.consumer = ck.Consumer(self.consumer_params)
self.stopped = False
tp = ck.TopicPartition(self.topic, 0, 0)

# blocks for consumer thread to come up
self.consumer.get_watermark_offsets(tp)
self.loop.add_callback(self.poll_kafka)


@Stream.register_api(staticmethod)
Expand Down Expand Up @@ -352,7 +350,7 @@ def get_message_batch(kafka_params, topic, partition, low, high, timeout=None):
try:
while True:
msg = consumer.poll(0)
if msg and msg.value():
if msg and msg.value() and msg.error() is None:
if high >= msg.offset():
out.append(msg.value())
if high <= msg.offset():
Expand Down
4 changes: 2 additions & 2 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,8 @@ def dont_test_stream_kwargs(clean): # noqa: F811
sin.emit(1)


@pytest.fixture # noqa: F811
def thread(loop):
@pytest.fixture
def thread(loop): # noqa: F811
from threading import Thread, Event
thread = Thread(target=loop.start)
thread.daemon = True
Expand Down
16 changes: 8 additions & 8 deletions streamz/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def test_zip(c, s, a, b):
assert L == [(1, 'a'), (2, 'b')]


@pytest.mark.slow # noqa: F811
def test_sync(loop):
@pytest.mark.slow
def test_sync(loop): # noqa: F811
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop) as client: # noqa: F841
source = Stream()
Expand All @@ -90,8 +90,8 @@ def f():
assert L == list(map(inc, range(10)))


@pytest.mark.slow # noqa: F811
def test_sync_2(loop):
@pytest.mark.slow
def test_sync_2(loop): # noqa: F811
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop): # noqa: F841
source = Stream()
Expand Down Expand Up @@ -131,8 +131,8 @@ def test_buffer(c, s, a, b):
assert source.loop == c.loop


@pytest.mark.slow # noqa: F811
def test_buffer_sync(loop):
@pytest.mark.slow
def test_buffer_sync(loop): # noqa: F811
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop) as c: # noqa: F841
source = Stream()
Expand All @@ -155,9 +155,9 @@ def test_buffer_sync(loop):
assert L == list(map(inc, range(10)))


@pytest.mark.xfail(reason='') # noqa: F811
@pytest.mark.xfail(reason='')
@pytest.mark.slow
def test_stream_shares_client_loop(loop):
def test_stream_shares_client_loop(loop): # noqa: F811
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop) as client: # noqa: F841
source = Stream()
Expand Down
Loading