In [4]:
from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict, deque
import time

# Kafka configuration
kafka_broker = 'localhost:9094'  # Change to your Kafka broker address
input_topic = 'binance_tickers'   # The topic to consume from
output_topic = 'buy_signals'       # The topic to produce buy signals to

# List of ticker pairs to monitor
tickers = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "XRPUSDT", "ADAUSDT", "PEPEUSDT"]

# Create a Kafka consumer
consumer = KafkaConsumer(
    input_topic,
    bootstrap_servers=kafka_broker,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='ticker_processor_group'
)

# Create a Kafka producer
producer = KafkaProducer(
    bootstrap_servers=kafka_broker,
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Initialize a defaultdict to hold price history for each ticker
price_history = defaultdict(lambda: deque(maxlen=3))

print(f"Listening for messages on topic '{input_topic}'...")

# Consume messages
try:
    for message in consumer:
        data = message.value
        
        if isinstance(data, dict) and 's' in data and 'c' in data:  # Ensure data is valid
            symbol = data['s']
            current_price = float(data['c'])
            
            # Check if the symbol is in our list of tickers
            if symbol in tickers:
                price_history[symbol].append(current_price)

                # Check if we have enough data to evaluate
                if len(price_history[symbol]) == 3:
                    # Check if the last three prices are increasing
                    if price_history[symbol][0] < price_history[symbol][1] < price_history[symbol][2]:
                        buy_signal = {
                            'symbol': symbol,
                            'signal': 'buy',
                            'price': current_price,
                            'timestamp': time.time()
                        }
                        producer.send(output_topic, value=buy_signal)
                        print(f"Sent buy signal: {buy_signal}")
except KeyboardInterrupt:
    print("Consumer stopped.")
finally:
    # Close the consumer and producer connections
    consumer.close()
    producer.close()

Listening for messages on topic 'binance_tickers'...
Sent buy signal: {'symbol': 'BTCUSDT', 'signal': 'buy', 'price': 104380.01, 'timestamp': 1738300398.9594998}
Sent buy signal: {'symbol': 'ETHUSDT', 'signal': 'buy', 'price': 3236.11, 'timestamp': 1738300399.2121897}
Sent buy signal: {'symbol': 'ETHUSDT', 'signal': 'buy', 'price': 3236.75, 'timestamp': 1738300418.221347}
Sent buy signal: {'symbol': 'ETHUSDT', 'signal': 'buy', 'price': 3236.76, 'timestamp': 1738300419.2245302}
Sent buy signal: {'symbol': 'BNBUSDT', 'signal': 'buy', 'price': 677.39, 'timestamp': 1738300423.496255}
Sent buy signal: {'symbol': 'XRPUSDT', 'signal': 'buy', 'price': 3.0866, 'timestamp': 1738300479.0757077}
Sent buy signal: {'symbol': 'ETHUSDT', 'signal': 'buy', 'price': 3234.41, 'timestamp': 1738300502.2635715}
Sent buy signal: {'symbol': 'ETHUSDT', 'signal': 'buy', 'price': 3235.4, 'timestamp': 1738300512.2694638}
Sent buy signal: {'symbol': 'ADAUSDT', 'signal': 'buy', 'price': 0.9429, 'timestamp': 17383005