In [1]:
# Data class for a trade order (validated on construction, serializable to/from JSON)

from dataclasses import dataclass, asdict, field
from datetime import datetime
import json
import uuid

@dataclass
class TradeOrder:
    """A single buy/sell order for one stock, validated on construction.

    ``__post_init__`` checks every field and raises ``ValueError`` on the first
    invalid one. ``valid_till`` and ``timestamp`` are strings in
    ``'%Y-%m-%d %H:%M:%S'`` format; ``valid_till`` must lie in the future at
    construction time.
    """

    stock_name: str   # ticker symbol; non-empty string
    price: float      # positive number (bool rejected)
    quantity: int     # positive integer (bool rejected)
    name: str         # party placing the order; non-empty string
    type: str         # 'buy' or 'sell'
    order_id: str     # not validated here
    valid_till: str   # expiry, 'YYYY-MM-DD HH:MM:SS', must be in the future
    timestamp: str    # creation time, 'YYYY-MM-DD HH:MM:SS'

    @staticmethod
    def _parse_datetime(value, field_name):
        """Parse ``value`` as 'YYYY-MM-DD HH:MM:SS'.

        Raises:
            ValueError: naming ``field_name`` if ``value`` is not a string in
                that format (non-strings would otherwise raise TypeError).
        """
        try:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError) as err:
            raise ValueError(
                f"Invalid '{field_name}' format; expected format is 'YYYY-MM-DD HH:MM:SS'."
            ) from err

    def __post_init__(self):
        """Validate all fields; raise ValueError describing the first bad one."""
        if not isinstance(self.stock_name, str) or len(self.stock_name) == 0:
            raise ValueError("Invalid stock name; it must be a non-empty string.")

        # bool is a subclass of int, so it must be excluded explicitly,
        # otherwise True/False would be accepted as 1/0.
        if (isinstance(self.price, bool)
                or not isinstance(self.price, (float, int))
                or self.price <= 0):
            raise ValueError("Invalid price; it must be a positive number.")

        if (isinstance(self.quantity, bool)
                or not isinstance(self.quantity, int)
                or self.quantity <= 0):
            raise ValueError("Invalid quantity; it must be a positive integer.")

        if not isinstance(self.name, str) or len(self.name) == 0:
            raise ValueError("Invalid name; it must be a non-empty string.")

        # Validate type field as either 'buy' or 'sell'
        if self.type not in ("buy", "sell"):
            raise ValueError("Invalid type; it must be 'buy' or 'sell'.")

        # Parse first and check the value afterwards, outside any try block,
        # so the "must be in the future" error is not swallowed and
        # re-reported as a format error.
        valid_till_dt = self._parse_datetime(self.valid_till, 'valid_till')
        if valid_till_dt <= datetime.now():
            raise ValueError("Invalid 'valid_till' datetime; it must be in the future.")

        self._parse_datetime(self.timestamp, 'timestamp')

    def to_json(self):
        """Serialize all fields to a JSON object string."""
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(data):
        """Build a TradeOrder from a JSON string produced by ``to_json``.

        Raises ValueError if the decoded fields fail validation.
        """
        data = json.loads(data)
        return TradeOrder(**data)



In [2]:
# The 25 tickers that random orders are drawn from, kept next to their company
# names for readability. Order matters: send_trade_order routes the first 10
# tickers to partition 0 and the remaining ones to partition 1.
symbols = [
    ticker
    for ticker, _company in (
        ("AAPL", "Apple"),
        ("MSFT", "Microsoft"),
        ("GOOGL", "Alphabet (Google)"),
        ("AMZN", "Amazon"),
        ("TSLA", "Tesla"),
        ("NVDA", "NVIDIA"),
        ("META", "Meta Platforms (formerly Facebook)"),
        ("BRK-B", "Berkshire Hathaway"),
        ("JPM", "JPMorgan Chase"),
        ("V", "Visa"),
        ("PG", "Procter & Gamble"),
        ("UNH", "UnitedHealth Group"),
        ("JNJ", "Johnson & Johnson"),
        ("WMT", "Walmart"),
        ("MA", "Mastercard"),
        ("DIS", "Walt Disney"),
        ("PYPL", "PayPal"),
        ("HD", "Home Depot"),
        ("BAC", "Bank of America"),
        ("XOM", "ExxonMobil"),
        ("CVX", "Chevron"),
        ("LLY", "Eli Lilly"),
        ("ABT", "Abbott Laboratories"),
        ("CRM", "Salesforce"),
        ("KO", "Coca-Cola"),
    )
]

In [3]:
# Pool of 46 distinct first names used as the party placing an order.
# Each name appears exactly once so random.choice(names) picks uniformly.
names = [
    "Alice", "Bob", "Charlie", "David", "Emily", "Frank", "Grace", "Henry", "Isabella", "Jack",
    "Kelly", "Larry", "Michelle", "Nathan", "Olivia", "Patrick", "Quinn", "Rachel", "Samuel", "Thomas",
    "Ursula", "Victor", "Wendy", "Xavier", "Yasmin", "Zachary", "Abigail", "Benjamin", "Chloe", "Daniel",
    "Eleanor", "Finn", "Gabriella", "Harry", "Iris", "James", "Katherine", "Leo", "Maya", "Noah",
    "Paul", "Riley", "Sarah", "Uma", "Vincent", "Willow",
]

In [4]:
# The two sides an order can take; these are exactly the values that
# TradeOrder.__post_init__ accepts for its `type` field.
order_types = "buy sell".split()

In [5]:
# Persist generated trade orders to CSV

import os
import csv


def write_order_to_csv(order, file_path='trade_orders.csv'):
    """Append ``order`` as one row to the CSV file at ``file_path``.

    A header row is written first whenever the file is missing *or empty*, so
    a file that was created or truncated elsewhere still gets its header. The
    file is written as UTF-8 so the output does not depend on the platform's
    default encoding.

    Args:
        order: object exposing the TradeOrder fields as attributes.
        file_path: destination CSV path; created if it does not exist.
    """
    # Single source of truth for both the header and the row layout.
    columns = ('stock_name', 'price', 'quantity', 'name', 'type',
               'order_id', 'valid_till', 'timestamp')

    # Decide before opening: opening in append mode creates the file.
    needs_header = not os.path.isfile(file_path) or os.path.getsize(file_path) == 0

    with open(file_path, mode='a', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        if needs_header:
            writer.writerow(columns)
        writer.writerow([getattr(order, column) for column in columns])
    print(f"Trade order saved to CSV: {order}")

In [6]:
# kafka_stock_producer_with_matching.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
import random
from datetime import datetime, timedelta
import time
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic


# Chance that generate_trade_order also emits an opposite-side matching order.
MATCH_PROBABILITY = 0.7
# Orders generated without a counterpart are collected here.
unmatched_orders = []

# Kafka naming: destination topic, plus a consumer group id that the producer
# code visible in this notebook does not reference.
topic_name = 'orders'
group_id = 'stock_trade_consumer_group'

def create_kafka_topic(topic_name, num_partitions, replication_factor=1,
                       bootstrap_servers='localhost:9092', api_version=(0, 11, 5)):
    """Create ``topic_name`` on the Kafka cluster, best-effort.

    Outcomes are reported with ``print`` rather than raised, so the notebook
    cell can be re-run. A topic that already exists is reported as such rather
    than as a generic failure.

    Args:
        topic_name: name of the topic to create.
        num_partitions: number of partitions for the new topic.
        replication_factor: replicas per partition (default 1).
        bootstrap_servers: broker address(es) for the admin client.
        api_version: Kafka protocol version passed to the admin client.
    """
    # Local import: only this helper needs the specific error class.
    from kafka.errors import TopicAlreadyExistsError

    # Build the topic spec before opening the client so a bad argument cannot
    # leave an admin connection open.
    topic = NewTopic(name=topic_name, num_partitions=num_partitions,
                     replication_factor=replication_factor)
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers,
                                    api_version=api_version)
    try:
        admin_client.create_topics([topic])
        print(f"Topic '{topic_name}' created with {num_partitions} partitions.")
    except TopicAlreadyExistsError:
        print(f"Topic '{topic_name}' already exists; leaving it unchanged.")
    except Exception as e:
        # Deliberately best-effort: report and continue.
        print(f"Failed to create topic: {e}")
    finally:
        admin_client.close()

# Create the single 'orders' topic. send_trade_order writes to partitions 0 and
# 1, so the topic needs at least 2 partitions.
create_kafka_topic(topic_name, num_partitions=2)


Topic 'orders' created with 2 partitions.


In [7]:
def json_serializer(data):
    """Turn ``data`` into UTF-8 bytes suitable for a Kafka payload.

    A ``str`` is encoded directly; anything else is first rendered with
    ``json.dumps`` and then encoded.
    """
    payload = data if isinstance(data, str) else json.dumps(data)
    return payload.encode('utf-8')

In [8]:
def create_kafka_producer(bootstrap_servers=None):
    """Build a KafkaProducer that publishes TradeOrder values.

    Serialization:
    - Values are serialized with ``value.to_json()`` encoded as UTF-8.
    - ``str`` keys are UTF-8 encoded; other keys are JSON-encoded.
    - ``None`` keys and values pass through as ``None``. kafka-python invokes
      the serializer even for ``None``, so otherwise a missing key would become
      the literal key ``b'null'`` and a ``None`` value would crash on ``.to_json()``.

    Args:
        bootstrap_servers: broker list; defaults to ``['localhost:9092']``.

    Returns:
        A configured ``KafkaProducer`` (api_version 0.11.5, 5 retries).
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']

    def serialize_value(value):
        # Assumes value is a TradeOrder (or anything exposing to_json()).
        return None if value is None else value.to_json().encode('utf-8')

    def serialize_key(key):
        if key is None:
            return None
        if isinstance(key, str):
            return key.encode('utf-8')
        return json.dumps(key).encode('utf-8')

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        api_version=(0, 11, 5),
        value_serializer=serialize_value,
        key_serializer=serialize_key,
        retries=5,
    )

In [9]:
def generate_random_order(rng=random):
    """Build a random, valid TradeOrder.

    The order's fields are drawn as follows:
    - symbol, party name and side are picked from the module-level pools;
    - price is uniform in [100, 1500], rounded to 2 decimals;
    - quantity is in [1, 100];
    - ``valid_till`` is 1-5 whole days after ``timestamp``.

    Args:
        rng: randomness source with ``choice``/``uniform``/``randint``
            (the ``random`` module by default). Pass a seeded
            ``random.Random`` for reproducible orders.

    Returns:
        A new TradeOrder with a fresh UUID4 ``order_id``.
    """
    stock_name = rng.choice(symbols)
    price = round(rng.uniform(100, 1500), 2)
    quantity = rng.randint(1, 100)
    name = rng.choice(names)
    order_type = rng.choice(order_types)
    # Kept independent of rng so a seeded rng never repeats order ids.
    order_id = str(uuid.uuid4())
    # Read the clock once so valid_till is exactly N days after timestamp
    # (two separate now() calls could straddle a second boundary).
    now = datetime.now()
    valid_till = (now + timedelta(days=rng.randint(1, 5))).strftime('%Y-%m-%d %H:%M:%S')
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')

    return TradeOrder(stock_name, price, quantity, name, order_type, order_id, valid_till, timestamp)


In [10]:
def generate_matching_order(order):
    """Return the counter-order for ``order``.

    The counter-order copies stock, price, quantity, order_id, expiry and
    timestamp from ``order``, takes the opposite side, and gets a randomly
    chosen party name.
    """
    # Anything that is not 'buy' is answered with a 'buy'.
    counterpart_side = 'buy' if order.type != 'buy' else 'sell'
    counterparty = random.choice(names)
    # NOTE(review): order_id is reused, so both orders share one id (and Kafka
    # key). Confirm downstream matching relies on this rather than on unique ids.
    return TradeOrder(
        stock_name=order.stock_name,
        price=order.price,
        quantity=order.quantity,
        name=counterparty,
        type=counterpart_side,
        order_id=order.order_id,
        valid_till=order.valid_till,
        timestamp=order.timestamp,
    )

In [11]:
def generate_trade_order():
    """Generate one random order, sometimes paired with its counter-order.

    Returns:
        ``[order, counter_order]`` with probability MATCH_PROBABILITY.
        Otherwise ``[order]``; in that case the order is also appended to the
        module-level ``unmatched_orders`` list.
    """
    order = generate_random_order()
    matched = random.random() < MATCH_PROBABILITY

    if not matched:
        # NOTE(review): in the code shown, unmatched_orders is only appended
        # to, never drained, so it grows for as long as the producer loop runs.
        unmatched_orders.append(order)
        return [order]

    return [order, generate_matching_order(order)]

In [12]:
def send_trade_order(producer, topic, order, partition_split=10):
    """Publish ``order`` to ``topic`` synchronously and print the outcome.

    The order id is the message key. Tickers at index < ``partition_split`` in
    ``symbols`` go to partition 0; all others go to partition 1. Failures are
    printed rather than raised, so a long-running producer loop keeps going.
    This covers Kafka errors and stocks that are not in ``symbols``. The
    producer is flushed on every exit path.

    Args:
        producer: KafkaProducer whose value serializer accepts a TradeOrder.
        topic: destination topic name.
        order: the TradeOrder to send.
        partition_split: index in ``symbols`` where partition 1 begins.
    """
    try:
        if order.stock_name not in symbols:
            # symbols.index() would raise an uncaught ValueError here.
            print(f"Failed to send trade order: {order}, Error: unknown stock symbol {order.stock_name!r}")
            return

        partition = 0 if symbols.index(order.stock_name) < partition_split else 1
        future = producer.send(topic, key=order.order_id, value=order, partition=partition)
        # Block until the broker acknowledges; a failed or timed-out send
        # raises from .get().
        future.get(timeout=10)
        print(f"Trade order sent to {topic}: {order}")
        print(f"Processing trade order: {order}")
    except KafkaError as e:
        print(f"Failed to send trade order: {order}, Error: {e}")
    finally:
        producer.flush()

In [13]:
if __name__ == "__main__":
    # Demo loop, which runs when this cell executes (in a notebook kernel,
    # __name__ is "__main__"). Every 10 seconds it publishes one order,
    # sometimes followed by its counter-order. It stops on KeyboardInterrupt
    # (kernel interrupt / Ctrl-C), and the producer is always closed on exit.
    producer = create_kafka_producer()

    try:
        while True:
            for order in generate_trade_order():
                print(order.stock_name)
                send_trade_order(producer, topic_name, order)
            time.sleep(10)
    except KeyboardInterrupt:
        print("Stopping producer.")
    finally:
        producer.close()

AAPL
Trade order sent to orders: TradeOrder(stock_name='AAPL', price=1262.01, quantity=34, name='Uma', type='sell', order_id='ddf72781-c6be-4138-a6ad-a7a221058c01', valid_till='2024-11-20 23:55:04', timestamp='2024-11-17 23:55:04')
Processing trade order: TradeOrder(stock_name='AAPL', price=1262.01, quantity=34, name='Uma', type='sell', order_id='ddf72781-c6be-4138-a6ad-a7a221058c01', valid_till='2024-11-20 23:55:04', timestamp='2024-11-17 23:55:04')
V
Trade order sent to orders: TradeOrder(stock_name='V', price=947.97, quantity=14, name='Benjamin', type='sell', order_id='9bc0aaf1-e8ec-45bc-ad3e-7d7ac948ab07', valid_till='2024-11-19 23:55:14', timestamp='2024-11-17 23:55:14')
Processing trade order: TradeOrder(stock_name='V', price=947.97, quantity=14, name='Benjamin', type='sell', order_id='9bc0aaf1-e8ec-45bc-ad3e-7d7ac948ab07', valid_till='2024-11-19 23:55:14', timestamp='2024-11-17 23:55:14')
V
Trade order sent to orders: TradeOrder(stock_name='V', price=978.6, quantity=8, name='Wil