use kafka-python to connect to Kafka
jadbin committed Nov 25, 2016
1 parent ba22bee commit 5d119c4
Showing 8 changed files with 49 additions and 105 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
*~
.DS_Store
*.pyc
.cache
*egg-info
4 changes: 2 additions & 2 deletions README.rst
@@ -26,15 +26,15 @@ Requirements
- `aiohttp`_
- `pyyaml`_
- `pymongo`_
- `pykafka`_
- `kafka-python`_
- `lxml`_

.. _MangoDB: https://www.mongodb.com/
.. _Kafka: http://kafka.apache.org/
.. _aiohttp: https://pypi.python.org/pypi/aiohttp
.. _pyyaml: https://pypi.python.org/pypi/pyyaml
.. _pymongo: https://pypi.python.org/pypi/pymongo
.. _pykafka: https://pypi.python.org/pypi/pykafka
.. _kafka-python: https://github.com/dpkp/kafka-python
.. _lxml: https://pypi.python.org/pypi/lxml


2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,6 +1,6 @@
aiohttp
async-timeout
pykafka
kafka-python
pymongo
pyyaml
lxml
2 changes: 1 addition & 1 deletion setup.py
@@ -31,7 +31,7 @@ def main():
raise RuntimeError("Python 3.5+ is required")
install_requires = [
"aiohttp",
"pykafka",
"kafka-python",
"pymongo",
"pyyaml",
"lxml",
70 changes: 23 additions & 47 deletions tests/test_unikafka.py
@@ -12,63 +12,39 @@
from .helpers import wait_server_start


class TopicClient:
class TopicDict:
class Topic:
class Consumer:
class Message:
def __init__(self, value):
self.value = value
class Consumer:
class Message:
def __init__(self, value):
self.value = value

def __init__(self, topic, *args, **kw):
self._topic = topic
def __init__(self, topic, *args, **kw):
self._topic = topic

def consume(self, block=True):
if self._topic == b"None":
return None
if self._topic:
return self.Message(self._topic)
def consume(self):
if self._topic == "None":
return None
if self._topic:
return self.Message(self._topic.encode("utf-8"))

def stop(self):
pass
def __iter__(self):
return self

class Producer:
def __init__(self, topic, *args, **kw):
self._topic = topic
def __next__(self):
return self.consume()

def produce(self, msg):
assert msg == self._topic

def stop(self):
pass

def __init__(self, topic):
self._topic = topic

def get_balanced_consumer(self, *args, **kw):
return self.Consumer(self._topic, *args, **kw)

def get_producer(self, *args, **kw):
return self.Producer(self._topic, *args, **kw)

def __getitem__(self, topic):
assert isinstance(topic, bytes)
return self.Topic(topic)

class Producer:
def __init__(self, *args, **kw):
self._topic_dict = self.TopicDict()

@property
def topics(self):
return self._topic_dict

def update_cluster(self):
pass

def produce(self, topic, msg):
assert topic == msg


@pytest.fixture(scope="function")
def topic_consumer(request, monkeypatch):
monkeypatch.setattr(unikafka, "KafkaClient", TopicClient)
def kafka_env(request, monkeypatch):
monkeypatch.setattr(unikafka, "KafkaConsumer", Consumer)
monkeypatch.setattr(unikafka, "KafkaProducer", Producer)
request.addfinalizer(lambda: monkeypatch.undo())

def handle_error(loop, context):
@@ -99,7 +75,7 @@ def stop_loop():
return server


def test_subscribe_and_poll(topic_consumer):
def test_subscribe_and_poll(kafka_env):
async def _test():
assert await client.poll() == (None, None)
await client.subscribe(["None"])
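For context, the fixture above replaces the kafka-python classes with in-memory fakes so the tests never need a running broker. Below is a minimal, illustrative sketch of that pattern, assuming xpaw.unikafka imports KafkaConsumer and KafkaProducer at module level; the fake class bodies and the helper names (FakeMessage, FakeConsumer, FakeProducer) are simplified stand-ins, not the project's actual test doubles.

import pytest

import xpaw.unikafka as unikafka


class FakeMessage:
    def __init__(self, value):
        self.value = value


class FakeConsumer:
    # Stands in for kafka.KafkaConsumer: yields the topic name once as a
    # message, then raises StopIteration, mimicking consumer_timeout_ms expiring.
    def __init__(self, topic, *args, **kw):
        self._messages = iter([FakeMessage(topic.encode("utf-8"))])

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._messages)


class FakeProducer:
    # Stands in for kafka.KafkaProducer: records sent messages instead of
    # talking to a broker.
    def __init__(self, *args, **kw):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


@pytest.fixture
def kafka_env(monkeypatch):
    # Patch the names unikafka imported at module level; pytest's monkeypatch
    # restores the real classes when the test finishes.
    monkeypatch.setattr(unikafka, "KafkaConsumer", FakeConsumer)
    monkeypatch.setattr(unikafka, "KafkaProducer", FakeProducer)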
26 changes: 3 additions & 23 deletions xpaw/fetcher.py
@@ -9,7 +9,7 @@
import threading

import yaml
from pykafka import KafkaClient
from kafka import KafkaProducer
from pymongo import MongoClient
from bson.objectid import ObjectId

@@ -170,7 +170,6 @@ def _handle_task_gc(self, data):
if task_set is not None:
log.debug("Task GC, keep the following tasks: {0}".format(task_set))
self._task_config.gc(task_set)
self._producer.gc(task_set)

async def send_heartbeat(self, data):
return await self._master_rpc_client.handle_heartbeat(self._pid, data)
@@ -287,35 +286,16 @@ def _unzip_task(self, task_id, code_dir):

class RequestProducer:
def __init__(self, kafka_addr):
self._set = set()
self._producers = {}
self._kafka_client = KafkaClient(hosts=kafka_addr)
self._lock = threading.Lock()
self._producer = KafkaProducer(bootstrap_servers=kafka_addr)

@classmethod
def from_config(cls, config):
return cls(config.get("kafka_addr"))

def gc(self, task_set):
with self._lock:
del_task = []
for t in self._set:
if t not in task_set:
del_task.append(t)
for t in del_task:
self._set.remove(t)
self._producers[t].stop()
del self._producers[t]

def push_request(self, topic, req):
log.debug("Push request (url={0}) into the topic '{1}'".format(req.url, topic))
r = pickle.dumps(req)
with self._lock:
if topic not in self._set:
self._set.add(topic)
self._kafka_client.update_cluster()
self._producers[topic] = self._kafka_client.topics[topic.encode("utf-8")].get_producer()
self._producers[topic].produce(r)
self._producer.send(topic, r)


class HeartbeatSender:
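The rewritten RequestProducer no longer keeps a per-topic producer table behind a lock: a single kafka-python KafkaProducer can write to any topic, so the old _set/_producers bookkeeping and its gc hook are gone. A rough sketch of the new send path (illustrative only; the broker address, topic name, and request object below are placeholders, not project code):

import pickle

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder address

request = {"url": "http://example.com"}    # stand-in for the real request object
payload = pickle.dumps(request)            # push_request pickles the request the same way
producer.send("example_topic", payload)    # send() buffers asynchronously
producer.flush()                           # block until buffered messages are delivered
producer.close()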
46 changes: 16 additions & 30 deletions xpaw/unikafka.py
@@ -7,26 +7,21 @@

import aiohttp
from aiohttp import web
from pykafka import KafkaClient
from pykafka.exceptions import ConsumerStoppedException
from kafka import KafkaConsumer, KafkaProducer

log = logging.getLogger(__name__)

TOPIC_HEADER = "x-unikafka-topic"


class Unikafka:
def __init__(self, server_listen, kafka_addr, zookeeper_addr, *, group=__name__,
queue_size=100, sleep_time=1,
loop=None):
def __init__(self, server_listen, kafka_addr, *,
group=__name__, queue_size=100, sleep_time=1, loop=None):
self._server_listen = server_listen
self._kafka_addr = kafka_addr
self._zookeeper_addr = zookeeper_addr
self._group = group
self._queue_size = queue_size
self._sleep_time = sleep_time
self._kafka_client = KafkaClient(hosts=self._kafka_addr,
broker_version="0.9.2")
self._mq, self._consumers = {}, {}
self._topics = []
self._index = 0
@@ -47,7 +42,6 @@ def from_config(cls, config):
kw["loop"] = config["unikafka_loop"]
return cls(config.get("unikafka_listen"),
config.get("kafka_addr"),
config.get("zookeeper_addr"),
**kw)

def start(self):
@@ -133,13 +127,9 @@ async def _poll_forever(self):
if n < m:
msg = None
try:
msg = self._consumers[t].consume(block=True)
except ConsumerStoppedException:
log.warn("Consumer of topic '{0}' stopped".format(t))
self._remove_consumer(t)
self._create_consumer(t)
log.info("Reset consumer of topic '{0}'".format(t))
msg = self._consumers[t].consume(block=True)
msg = next(self._consumers[t])
except StopIteration:
pass
finally:
if msg:
q.append(msg.value)
@@ -151,35 +141,31 @@ async def _poll_forever(self):
finally:
self._topic_lock.release()
if no_work:
# sleep when there is no work
await asyncio.sleep(self._sleep_time, loop=self._loop)

def _create_consumer(self, topic):
q = deque(maxlen=self._queue_size)
self._mq[topic] = q
self._kafka_client.update_cluster()
self._consumers[topic] = self._kafka_client.topics[topic.encode("utf-8")].get_balanced_consumer(
consumer_group=self._group.encode("utf-8"),
auto_commit_enable=True,
zookeeper_connect=self._zookeeper_addr,
consumer_timeout_ms=10)
self._consumers[topic] = KafkaConsumer(topic,
group_id=self._group,
bootstrap_servers=self._kafka_addr,
consumer_timeout_ms=10,
auto_offset_reset="earliest")

def _remove_consumer(self, topic):
q = self._mq[topic]
if len(q) > 0:
producer = None
try:
self._kafka_client.update_cluster()
producer = self._kafka_client.topics[topic.encode("utf-8")].get_producer(linger_ms=100)
producer = KafkaProducer(bootstrap_servers=self._kafka_addr)
while len(q) > 0:
b = q.popleft()
producer.produce(b)
producer.send(topic, b)
producer.flush()
del producer
except Exception:
log.warning("Unexpected error occurred when save the cache of topic '{0}'".format(topic), exc_info=True)
finally:
if producer:
producer.stop()
del self._mq[topic]
self._consumers[topic].stop()
del self._consumers[topic]


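The polling loop now relies on kafka-python's iterator behaviour: with consumer_timeout_ms set, iterating a KafkaConsumer raises StopIteration when no message arrives within the timeout, which _poll_forever treats as "no work" before sleeping. When a consumer is removed, any messages still cached in its deque are pushed back to Kafka through a short-lived KafkaProducer so they are not lost. A small illustrative sketch of the consumer side (broker address, topic, and group id are placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer("example_topic",              # placeholder topic
                         group_id="unikafka",          # placeholder group id
                         bootstrap_servers="localhost:9092",
                         consumer_timeout_ms=10,
                         auto_offset_reset="earliest")
try:
    msg = next(consumer)       # same pattern as next(self._consumers[t]) above
    print(msg.value)           # ConsumerRecord.value holds the raw message bytes
except StopIteration:
    pass                       # nothing arrived within 10 ms
finally:
    consumer.close()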
2 changes: 1 addition & 1 deletion xpaw/version.py
@@ -1,3 +1,3 @@
# coding=utf-8

__version__ = "0.5.5a8"
__version__ = "0.5.5a9"
