In [1]:
!pip install kafka-python
!pip install snakeviz
#!pip install memray # not for notebooks?
!pip install memory_profiler



In [2]:
from kafka import KafkaConsumer, KafkaProducer
import json
import uuid
import pandas as pd
import csv
import time
import cProfile
import timeit

%load_ext snakeviz
%load_ext memory_profiler

# Data Producer 2

In [3]:
def connect_kafka_producer(servers):
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print(f'Message published successfully to topic: {topic_name}.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

def produce_xy(producer, topic_name, sleep_hz):
    with open('gen2_Heizungsdaten.csv') as f:
        next(f)
        for i, line in enumerate(f):
            print(f'index {i}')
            message = json.dumps({'data': str(line)})
            print(message)
            publish_message(producer, topic_name, str(uuid.uuid4()), message)
            time.sleep(sleep_hz)
            

In [4]:
#server1 = 'broker1:9093'
#server2 = 'broker2:9095'
#server3 = 'broker3:9097'
#topic = "data_gen2"

#producer2 = connect_kafka_producer(server2)

#hz = 2
#produce_xy(producer2, topic, hz)

## Data Producer 2 Zeit Messungen

In [5]:
def connect_kafka_producer(servers):
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        #print(f'Message published successfully to topic: {topic_name}.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

def produce_xy(producer, topic_name, sleep_hz, stop_by=5):
    with open('gen2_Heizungsdaten.csv') as f:
        next(f)
        for i, line in enumerate(f):
            #print(f'index {i}')
            message = json.dumps({'data': str(line)})
            #print(message)
            publish_message(producer, topic_name, str(uuid.uuid4()), message)
            time.sleep(sleep_hz)            
            
            # stop while loop for time measurement
            if i > stop_by:
                break
            

**A. Laufzeit Messungen mit Timeit**

In [6]:
server1 = 'broker1:9093'
server2 = 'broker2:9095'
server3 = 'broker3:9097'
topic = "data_gen2"

#%snakeviz producer2 = connect_kafka_producer(server2)
producer2 = connect_kafka_producer(server2)

hz = 0
%timeit produce_xy(producer2, topic, hz, stop_by=20)

25.6 ms ± 1.93 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


**B. Messungen mit SnakeViz**  
Analyse Verbindungsaufbau Kafka

In [7]:
server1 = 'broker1:9093'
server2 = 'broker2:9095'
server3 = 'broker3:9097'
topic = "data_gen2"

#%snakeviz producer2 = connect_kafka_producer(server2)
producer2 = connect_kafka_producer(server2)

hz = 0
produce_xy(producer2, topic, hz)

Analyse Producer2 (data_generator)  

In [8]:
server1 = 'broker1:9093'
server2 = 'broker2:9095'
server3 = 'broker3:9097'
topic = "data_gen2"

producer2 = connect_kafka_producer(server2)

hz = 0
%prun produce_xy(producer2, topic, hz, stop_by=100)

 

         16385 function calls in 0.146 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      412    0.090    0.000    0.090    0.000 {method 'acquire' of '_thread.lock' objects}
      206    0.024    0.000    0.024    0.000 {method 'sendall' of '_socket.socket' objects}
      102    0.002    0.000    0.002    0.000 default.py:36(murmur2)
      102    0.002    0.000    0.027    0.000 kafka.py:538(send)
        1    0.002    0.002    0.146    0.146 3180120891.py:22(produce_xy)
        1    0.001    0.001    0.001    0.001 {built-in method io.open}
      102    0.001    0.000    0.010    0.000 record_accumulator.py:200(append)
      102    0.001    0.000    0.001    0.000 encoder.py:204(iterencode)
      102    0.001    0.000    0.088    0.001 record_accumulator.py:520(await_flush_completion)
      102    0.001    0.000    0.002    0.000 legacy_records.py:391(_encode_msg)
      103    0.001    0.000    0.091    0.001 threading

In [9]:
server1 = 'broker1:9093'
server2 = 'broker2:9095'
server3 = 'broker3:9097'
topic = "data_gen2"

producer2 = connect_kafka_producer(server2)

hz = 0
%snakeviz produce_xy(producer2, topic, hz, stop_by=100)

 
*** Profile stats marshalled to file '/tmp/tmp8c93py5w'.
Embedding SnakeViz in this document...


Analyse Memory Nutzung ([memory_profiler](https://ipython-books.github.io/44-profiling-the-memory-usage-of-your-code-with-memory_profiler/))

In [10]:
# function memory measurement,function uses ca. 1
#%memit?
%memit  produce_xy(producer2, topic, hz, stop_by=10)

peak memory: 112.49 MiB, increment: -7.91 MiB


In [11]:
%%file mprun_memory_producer2.py 
from memory_profiler import profile
from kafka import KafkaConsumer, KafkaProducer
import json
import uuid
import time

@profile
def connect_kafka_producer(servers):
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer
    
@profile
def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        #print(f'Message published successfully to topic: {topic_name}.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

@profile
def produce_xy(producer, topic_name, sleep_hz, stop_by=5):
    with open('gen2_Heizungsdaten.csv') as f:
        next(f)
        for i, line in enumerate(f):
            #print(f'index {i}')
            message = json.dumps({'data': str(line)})
            #print(message)
            publish_message(producer, topic_name, str(uuid.uuid4()), message)
            time.sleep(sleep_hz)            
            
            # stop while loop for time measurement
            if i > stop_by:
                break

Overwriting mprun_memory_producer2.py


In [12]:
from mprun_memory_producer2 import *
from kafka import KafkaConsumer, KafkaProducer
import json

server1 = 'broker1:9093'
server2 = 'broker2:9095'
server3 = 'broker3:9097'
topic = "data_gen2"

producer2 = connect_kafka_producer(server2)

hz = 0
%mprun -f produce_xy produce_xy(producer2, topic, hz, stop_by=0)

Filename: /home/jovyan/data/mprun_memory_producer2.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     7    112.9 MiB    112.9 MiB           1   @profile
     8                                         def connect_kafka_producer(servers):
     9    112.9 MiB      0.0 MiB           1       _producer = None
    10    112.9 MiB      0.0 MiB           1       try:
    11    113.1 MiB      0.1 MiB           1           _producer = KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
    12                                             except Exception as ex:
    13                                                 print('Exception while connecting Kafka')
    14                                                 print(str(ex))
    15                                             finally:
    16    113.1 MiB      0.0 MiB           1           return _producer


Filename: /home/jovyan/data/mprun_memory_producer2.py

Line #    Mem usage    Increment  Occurrences   Line Conten

Filename: /opt/conda/lib/python3.10/site-packages/memory_profiler.py

Line #    Mem usage    Increment  Occurrences   Line Contents
  1183    113.1 MiB    113.1 MiB           3               @wraps(wrapped=func)
  1184                                                     def wrapper(*args, **kwargs):
  1185    113.1 MiB      0.0 MiB           3                   prof = get_prof()
  1186    113.1 MiB      0.0 MiB           3                   val = prof(func)(*args, **kwargs)
  1187    113.1 MiB      0.0 MiB           3                   show_results_bound(prof)
  1188    113.1 MiB      0.0 MiB           3                   return val