In [1]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import threading
import multiprocessing
import time
from IPython.display import clear_output
from message_service import MessageProcessor
from movies_pb2 import ConsumedMessage, PublishedMessage

import setup

In [5]:
class PerformanceMessageProcessor(MessageProcessor):
	def __init__(self, topics, lock):
		super().__init__(
			'performance_message_processor', 
			lambda t: PublishedMessage() if t == 'published-messages' else ConsumedMessage(), 
			topics,
			publish_metrics=False
		)
		self.new_data = []
		self.lock = lock # mutex for self.new_data

	def on_message(self, message, topic, kafka_message):
		type = 'published' if topic == 'published-messages' else 'consumed'
		client = message.publisher if topic == 'published-messages' else message.consumer

		data = pd.Series({
			'type': type, 
			'client': client, 
			'timestamp': pd.to_datetime(message.timestamp, unit='s', utc=True), 
			'hostname': message.hostname
		})

		self.lock.acquire()
		self.new_data.append(data)
		self.lock.release()

lock = multiprocessing.Lock()
# processor = PerformanceMessageProcessor(['published-messages', 'consumed-messages'], lock)
processor = PerformanceMessageProcessor(['consumed-messages'], lock)
messaging_thread = threading.Thread(target=processor.run)
messaging_thread.start()

freq = '30s'
data = None

main_thread = threading.main_thread()
while main_thread.is_alive():
	time.sleep(10)

	lock.acquire()
	new_data = processor.new_data.copy()
	processor.new_data = []
	lock.release()

	if len(new_data) == 0:
		print('no new data')
		continue

	clear_output(wait=True)
	plt.figure(figsize = (10, 5))

	new_data = pd.DataFrame(new_data)
	new_data = new_data.set_index('timestamp')
	new_data = new_data.groupby([pd.Grouper(freq=freq), 'type']).size()
	new_data = new_data.rename('n')
	new_data = new_data.reset_index()

	new_data = new_data if data is None else pd.concat([data, new_data])
	new_data = new_data.set_index('timestamp').groupby([pd.Grouper(freq=freq), 'type'])['n'].sum().reset_index().set_index('timestamp')

	data_list = []
	for type in new_data['type'].unique():
		per_type = new_data[new_data['type'] == type].asfreq(freq)
		per_type['type'] = type
		data_list.append(per_type)

	data = pd.concat(data_list).fillna(0).reset_index()

	sns.lineplot(data=data, x='timestamp', y='n', hue='type')
	plt.title('Verarbeitete Messages')
	plt.xlabel('Zeit')
	plt.ylabel('Anzahl')
	plt.show()

	print(data)


print('stopping messaging thread')
messaging_thread.stop()
messaging_thread.join()


no new data


KeyboardInterrupt: 