In [1]:
from kafka_cffi import Producer as cffi_Producer
from confluent_kafka import Producer as confluent_Producer
import time
import platform

conf = {
    "bootstrap.servers": "n1.kafka10.service.consul:9092",
    "queue.buffering.max.messages": "1000000",
    "queue.buffering.max.kbytes": "2097151",
    "acks": "1",
#     "compression.codec": "snappy",
}

cffi_prod = cffi_Producer(conf)
confluent_prod = confluent_Producer(conf)

def callback(err, msg):
    pass

def benchmark(prod, samples, per_batch, msg_length=100, on_delivery=None):
    print(type(prod))
    timings = []
    for s in range(samples):
        t = time.time()
        for i in range(per_batch):
            prod.produce("moo", "0" * msg_length, on_delivery=on_delivery)
        prod.flush(-1)
        timings.append(time.time() - t)
    
    print("%d Samples, %d msgs per batch, %d bytes/msg" % (samples, per_batch, msg_length))
    print("Mean (per batch): %0.3f msgs/sec" % (sum(per_batch / x for x in timings) / samples))
    print("Mean (overall): %0.3f msgs/sec" % (samples * per_batch / sum(timings)))
    print("Fastest batch: %0.3f msgs/sec" % (per_batch / min(timings)))
    print("Slowest batch: %0.3f msgs/sec" % (per_batch / max(timings)))
    
print(platform.python_implementation(), platform.python_version())

('PyPy', '2.7.13')


In [2]:
for i in range(1000):
    # warm cold topic up first for both clients to be fair
    cffi_prod.produce("moo", "hello world")
    confluent_prod.produce("moo", "hello world")
    
cffi_prod.flush()
confluent_prod.flush()

0L

In [3]:
benchmark(cffi_prod, samples=30, per_batch=100000, msg_length=100)
benchmark(confluent_prod, samples=30, per_batch=100000, msg_length=100)

<class 'kafka_cffi.producer.Producer'>
30 Samples, 100000 msgs per batch, 100 bytes/msg
Mean (per batch): 902266.140 msgs/sec
Mean (overall): 806266.819 msgs/sec
Fastest batch: 1190985.061 msgs/sec
Slowest batch: 231103.226 msgs/sec
<type 'cimpl.Producer'>
30 Samples, 100000 msgs per batch, 100 bytes/msg
Mean (per batch): 212746.361 msgs/sec
Mean (overall): 205116.147 msgs/sec
Fastest batch: 299744.942 msgs/sec
Slowest batch: 119480.591 msgs/sec


In [4]:
benchmark(cffi_prod, samples=60, per_batch=20000, msg_length=450)
benchmark(confluent_prod, samples=60, per_batch=20000, msg_length=450)

<class 'kafka_cffi.producer.Producer'>
60 Samples, 20000 msgs per batch, 450 bytes/msg
Mean (per batch): 421290.130 msgs/sec
Mean (overall): 392536.391 msgs/sec
Fastest batch: 591642.840 msgs/sec
Slowest batch: 148628.247 msgs/sec
<type 'cimpl.Producer'>
60 Samples, 20000 msgs per batch, 450 bytes/msg
Mean (per batch): 114376.228 msgs/sec
Mean (overall): 105116.919 msgs/sec
Fastest batch: 220068.314 msgs/sec
Slowest batch: 55570.219 msgs/sec


In [5]:
# performance with a noop callback
benchmark(cffi_prod, samples=60, per_batch=20000, msg_length=450, on_delivery=callback)
benchmark(confluent_prod, samples=60, per_batch=20000, msg_length=450, on_delivery=callback)

<class 'kafka_cffi.producer.Producer'>
60 Samples, 20000 msgs per batch, 450 bytes/msg
Mean (per batch): 317071.627 msgs/sec
Mean (overall): 277635.114 msgs/sec
Fastest batch: 478228.607 msgs/sec
Slowest batch: 62312.450 msgs/sec
<type 'cimpl.Producer'>
60 Samples, 20000 msgs per batch, 450 bytes/msg
Mean (per batch): 66119.658 msgs/sec
Mean (overall): 64036.089 msgs/sec
Fastest batch: 95028.128 msgs/sec
Slowest batch: 40538.679 msgs/sec
