KQ: Kafka-based Job Queue for Python

KQ (Kafka Queue) is a lightweight Python library that lets you queue and execute jobs asynchronously using Apache Kafka. It uses kafka-python under the hood.

Announcements

  • Please see the releases page for the latest updates.

Requirements

Installation

To install a stable version from PyPI (recommended):

~$ pip install kq

To install the latest version directly from GitHub:

~$ pip install -e git+https://github.com/joowani/kq.git@master#egg=kq

You may need to use sudo depending on your environment.

Getting Started

First, ensure that your Kafka instance is up and running:

~$ ./kafka-server-start.sh -daemon server.properties

Define your KQ worker module:

# my_worker.py

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest'
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)
worker.start()

Start your worker:

~$ python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...

Enqueue a function call:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, 'https://www.google.com')

Sit back and watch the worker process it in the background:

~$ python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get('https://www.google.com')
[INFO] Job c7bf2359 returned: <Response [200]>
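
Conceptually, enqueueing turns a function call into bytes that are published to a Kafka topic, and the worker turns those bytes back into a call. The round trip can be sketched with the standard library alone. This is an illustration under simplifying assumptions, not KQ's actual implementation: the helper names serialize_call and execute_message are hypothetical, and plain pickle stands in for whatever serializer the library uses.

```python
import operator
import pickle


def serialize_call(func, *args, **kwargs):
    """Pack a function call into bytes (a stand-in for a Kafka message value)."""
    # pickle stores module-level functions by reference (module plus name),
    # so the worker process must be able to import the same function.
    return pickle.dumps({'func': func, 'args': args, 'kwargs': kwargs})


def execute_message(value):
    """Unpack the bytes and run the call, roughly as a worker would."""
    call = pickle.loads(value)
    return call['func'](*call['args'], **call['kwargs'])


# "Producer" side: build the message payload.
message_value = serialize_call(operator.add, 2, 3)

# "Worker" side: decode the payload and execute the job.
result = execute_message(message_value)
print(result)
```

One consequence of this design is that the function being enqueued has to be importable on the worker side, which is why the example above enqueues requests.get rather than a locally defined lambda.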

NEW in 2.0.0: You can now specify the job timeout, message key and partition:

job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://www.google.com')
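
To illustrate what a per-job timeout means in practice, here is a minimal standard-library sketch that gives up waiting on a job after a fixed number of seconds. It is not how KQ enforces timeouts internally; the helper run_with_timeout and its return values are hypothetical and only show the general idea.

```python
import concurrent.futures
import time


def run_with_timeout(func, timeout, *args, **kwargs):
    """Run func in a helper thread; return ('success', result) or ('timeout', None)."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return 'success', future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # The helper thread is not killed; we only stop waiting for it.
        return 'timeout', None
    finally:
        # Do not block here on a job that may still be running.
        executor.shutdown(wait=False)


fast = run_with_timeout(pow, 1.0, 2, 10)         # finishes well within 1 second
slow = run_with_timeout(time.sleep, 0.05, 0.3)   # sleeps longer than the limit
print(fast, slow)
```

Note that in this sketch a timed-out job keeps running in its thread until it finishes on its own; only the caller stops waiting.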

Check out the documentation for more information.

Contributing

Please have a look at this page before submitting a pull request. Thanks!

Credits

This project was inspired by RQ.
