In [None]:
from kafka import KafkaConsumer
import json
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime

# Storing data for plotting
dates = []
high_prices = []
low_prices = []
close_prices = []
volumes = []

def process_transaction(transaction):
    transaction_str = transaction.decode('utf-8')
    transaction_json = json.loads(transaction_str)
    
    # Extracting data
    date = datetime.strptime(transaction_json['date'], '%Y-%m-%dT%H:%M:%S')  # Adjust format as necessary
    dates.append(date)
    high_prices.append(transaction_json['high'])
    low_prices.append(transaction_json['low'])
    close_prices.append(transaction_json['close'])
    volumes.append(transaction_json['volume'])

    # print(f"Processed transaction: {transaction_json}")

def plot_data(n):
    plt.figure(figsize=(14, 10))

    # Setting up date formatter for x-axis
    locator = mdates.AutoDateLocator()
    formatter = mdates.ConciseDateFormatter(locator)
    
    # Plotting high and low prices
    ax1 = plt.subplot(2, 2, 1)
    ax1.xaxis.set_major_locator(locator)
    ax1.xaxis.set_major_formatter(formatter)
    ax1.plot(dates, high_prices, label='High Prices', color='green')
    ax1.plot(dates, low_prices, label='Low Prices', color='red')
    ax1.set_title('Daily High and Low Prices')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Price')
    ax1.legend()

    # Plotting closing prices
    ax2 = plt.subplot(2, 2, 2)
    ax2.xaxis.set_major_locator(locator)
    ax2.xaxis.set_major_formatter(formatter)
    ax2.plot(dates, close_prices, color='blue', label='Close Prices')
    ax2.set_title('Daily Closing Prices')
    ax2.set_xlabel('Date')
    ax2.set_ylabel('Price')
    ax2.legend()

    # Plotting volume
    ax3 = plt.subplot(2, 1, 2)
    ax3.xaxis.set_major_locator(locator)
    ax3.xaxis.set_major_formatter(formatter)
    ax3.bar(dates, volumes, color='purple')
    ax3.set_title('Daily Trading Volume')
    ax3.set_xlabel('Date')
    ax3.set_ylabel('Volume')
    title = f'S&P500 Overview  -  after {n} obs'
    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()

    print("""
    
    --------------------------------------------------------------------------------------------------------
    
    """)

def consume_transactions():
    consumer = KafkaConsumer('transactions-topic', bootstrap_servers='broker:9092')
    i=0
    for message in consumer:
        process_transaction(message.value)
        # print(message.value)
        i+=1
        if i == 5000:
            break
        if i%100 == 0:
            plot_data(i)

if __name__ == '__main__':
    consume_transactions()