In [None]:
from pyspark import SparkContext
from confluent_kafka import Consumer
import pandas as pd
import numpy as np
import os
import datetime
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dense, Dropout, LSTM, BatchNormalization
import pandas as pd
from keras import regularizers
from io import StringIO


In [None]:

# Create the SparkContext in local mode (master "local[4]").
sc = SparkContext(master="local[4]")
# Configure the Kafka consumer.
conf = {
    # Broker the consumer bootstraps from.
    'bootstrap.servers': 'localhost:9092',
    # Consumer group under which offsets are tracked.
    'group.id': 'simple-sentiment-group',
    # With no committed offset for the group, start from the earliest message.
    'auto.offset.reset': 'earliest',
    # Allow up to 600000 ms (10 minutes) between poll() calls; a model may be
    # trained between two polls in process_messages().
    'max.poll.interval.ms': '600000'
}
consumer = Consumer(conf)

# Subscribe to the topic on which the per-coin CSV payloads are published.
consumer.subscribe(['crypto_topic'])

25/03/31 19:51:29 WARN Utils: Your hostname, Ubuntu-GPU-84 resolves to a loopback address: 127.0.1.1; using 192.168.90.60 instead (on interface ens160)
25/03/31 19:51:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/31 19:51:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
#builds the LSTM model we will use
def build_lstm_model(input_data, output_size, neurons=32, activ_func='linear',
                     dropout=0.4, loss='mse', optimizer='adam'):
    """Assemble and compile a single-layer LSTM regression network.

    Args:
        input_data: 3-D array; its second and third dimensions
            (``shape[1]``, ``shape[2]``) define the LSTM input shape.
        output_size: number of units in the final Dense layer.
        neurons: number of LSTM units.
        activ_func: activation applied after the Dense layer.
        dropout: dropout rate applied to the LSTM output.
        loss: loss passed to ``model.compile``.
        optimizer: optimizer passed to ``model.compile``.

    Returns:
        The compiled ``Sequential`` model.

    Note:
        The L2 kernel-regularisation strength is read from the module-level
        ``l2_lambda`` variable, which must be defined before this is called.
    """
    steps_per_sample = input_data.shape[1]
    features_per_step = input_data.shape[2]

    # Layers are created in the same order in which they are stacked.
    stack = [
        LSTM(
            neurons,
            input_shape=(steps_per_sample, features_per_step),
            kernel_regularizer=regularizers.l2(l2_lambda),
        ),
        Dropout(dropout),
        BatchNormalization(),
        Dense(units=output_size),
        Activation(activ_func),
    ]

    model = Sequential()
    for layer in stack:
        model.add(layer)

    model.compile(loss=loss, optimizer=optimizer)
    return model

In [None]:
#normalizing the DataFrame
def normalize_zero_base(df):
    """Express every row relative to the first row: ``value / first - 1``.

    The first row therefore becomes all zeros, and each column is scaled by
    its own first value.
    """
    first_row = df.iloc[0]
    relative = df / first_row
    return relative - 1

In [None]:
#create window data
def extract_window_data(df, window_len=5, zero_base=True):
    """Cut ``df`` into overlapping windows of ``window_len`` consecutive rows.

    One window is produced for every start position in
    ``range(len(df) - window_len)``, so the final ``window_len`` rows never
    start a window. When ``zero_base`` is true each window is passed through
    ``normalize_zero_base`` before being stored.

    Returns:
        ``np.array`` built from the list of per-window ``.values`` arrays.
    """
    def _window_values(start):
        # Copy so that normalisation never touches the caller's frame.
        chunk = df[start: (start + window_len)].copy()
        if zero_base:
            chunk = normalize_zero_base(chunk)
        return chunk.values

    n_windows = len(df) - window_len
    return np.array([_window_values(start) for start in range(n_windows)])

In [None]:
#splits the data 80% training and 20% for testing
def train_test_split(df, test_size=0.2):
    """Split ``df`` positionally into a leading train part and a trailing test part.

    The test part holds the last ``int(test_size * len(df))`` rows; no
    shuffling is done, so row order is preserved.

    Returns:
        ``(train_data, test_data)``.
    """
    total_rows = len(df)
    test_rows = int(test_size * total_rows)
    cut = total_rows - test_rows
    return df.iloc[:cut], df.iloc[cut:]

In [None]:
#data preprocessing
def prepare_data(df, target_col, window_len=10, zero_base=True, test_size=0.2):
    """Split ``df`` and build windowed inputs and targets for each split.

    For each split, the inputs are the windows from ``extract_window_data``
    and the target of a window is the ``target_col`` value on the row that
    follows it. With ``zero_base`` the target is expressed relative to the
    ``target_col`` value on the window's first row (``y / first - 1``).

    Returns:
        ``(train_data, test_data, X_train, X_test, y_train, y_test)``.
    """
    train_data, test_data = train_test_split(df, test_size=test_size)

    def _windows_and_targets(part):
        features = extract_window_data(part, window_len, zero_base)
        target_series = part[target_col]
        targets = target_series[window_len:].values
        if zero_base:
            # Row i of this slice is the first row of window i.
            window_starts = target_series[:-window_len].values
            targets = targets / window_starts - 1
        return features, targets

    X_train, y_train = _windows_and_targets(train_data)
    X_test, y_test = _windows_and_targets(test_data)

    return train_data, test_data, X_train, X_test, y_train, y_test

In [None]:
#variables to use
# Seed every RNG involved in training. np.random.seed alone does not cover
# Keras weight initialisation, dropout or shuffling, so the Python, NumPy and
# TensorFlow generators are seeded together.
np.random.seed(42)
tf.keras.utils.set_random_seed(42)
# Number of consecutive rows fed to the model per sample.
window_len = 36
# Fraction of rows (taken from the end of the data) held out for validation.
test_size = 0.2
# Normalise each window relative to its first row.
zero_base = True
# Model / optimisation hyper-parameters.
lstm_neurons = 128
epochs = 50
batch_size = 32
loss = 'mse'
dropout = 0.2
optimizer = 'adam'
# L2 kernel-regularisation strength read by build_lstm_model.
l2_lambda = 0.001

In [None]:
# training the model and saving it for future use
def train_model(df, coin_name, target_col='close'):
    """Train an LSTM on ``df`` and save it as ``"{coin_name}_model.keras"``.

    Window length, split size, normalisation flag and every model and
    training hyper-parameter are taken from the module-level configuration
    variables. The held-out split is used as validation data during fitting.

    Returns:
        The fitted model.
    """
    _, _, X_train, X_test, y_train, y_test = prepare_data(
        df,
        target_col,
        window_len=window_len,
        zero_base=zero_base,
        test_size=test_size,
    )

    model = build_lstm_model(
        X_train,
        output_size=1,
        neurons=lstm_neurons,
        dropout=dropout,
        loss=loss,
        optimizer=optimizer,
    )

    model.fit(
        X_train,
        y_train,
        validation_data=(X_test, y_test),
        epochs=epochs,
        batch_size=batch_size,
        verbose=1,
        shuffle=True,
    )

    # Persist under the file name that process_messages() looks up.
    model.save(f"{coin_name}_model.keras")
    return model

In [None]:
# checking if a model does not exist already or the model is old
def check_model_status(model_name):
    """Report whether a model has to be (re)trained.

    Returns:
        ``True`` when ``model_name`` does not exist on disk, or when its
        modification time is more than 90 whole days in the past;
        ``False`` when an up-to-date model file is present.
    """
    if os.path.exists(model_name):
        modified_at = datetime.datetime.fromtimestamp(os.path.getmtime(model_name))
        age = datetime.datetime.now() - modified_at
        # Older than ~3 months counts as stale.
        return age.days > 90

    # No model file at all.
    return True

In [None]:
#loading a model that exists 
def load_model(model_name):
    """Load and return the saved Keras model stored at ``model_name``."""
    keras_models = tf.keras.models
    return keras_models.load_model(model_name)

In [None]:
def predict_next_close(model, recent_data, target_col='close', window_len=24, zero_base=True):
    """Predict the next ``target_col`` value from the latest ``window_len`` rows.

    Args:
        model: object exposing ``predict(array)``; it receives an array of
            shape ``(1, window_len, n_columns)`` and its ``[0][0]`` element is
            taken as the prediction.
        recent_data: DataFrame whose last ``window_len`` rows form the input
            window. Its columns must be the ones the model was trained on.
        target_col: column being predicted.
        window_len: number of trailing rows to feed to the model; must match
            the window length the model was trained with.
        zero_base: when true, normalise the window relative to its first row
            and convert the model output back to an absolute value.

    Returns:
        The predicted value of ``target_col`` for the next step (also
        printed, together with the predicted direction).

    Raises:
        ValueError: if ``recent_data`` has fewer than ``window_len`` rows.
    """
    # Check that we have enough data
    if len(recent_data) < window_len:
        raise ValueError(f"Need at least {window_len} rows of recent data")

    # Slice the latest window_len rows
    window_data = recent_data.iloc[-window_len:].copy()

    # Normalize if needed
    base_value = None
    if zero_base:
        # Reference value used to turn the model output back into a price.
        base_value = window_data[target_col].iloc[0]
        # Scale EACH column by its own first value, exactly as the training
        # windows are scaled (df / df.iloc[0] - 1). Dividing every column by
        # the target's first value would feed the model features on a
        # different scale from the one it was trained on.
        window_data = window_data / window_data.iloc[0] - 1

    # Convert to proper shape (1 sample, window_len steps, num_features)
    input_array = np.expand_dims(window_data.values, axis=0)

    # Predict
    normalized_prediction = model.predict(input_array)[0][0]

    # Denormalize prediction if needed
    if zero_base:
        predicted_close = (normalized_prediction + 1) * base_value
    else:
        predicted_close = normalized_prediction

    last_value_in_column = recent_data[target_col].iloc[-1]
    direction = "drop" if predicted_close < last_value_in_column else "rise"
    print(f"value will {direction}\nPredicted close price: {predicted_close}, Last close price: {last_value_in_column}")

    # Hand the prediction back so callers can use it programmatically.
    return predicted_close

In [None]:
# kafka DataFrame processing
def process_messages():
    """Consume Kafka messages and print a prediction for each one.

    Polls the module-level ``consumer`` in a loop. For every message the key
    is the coin name and the value is CSV text that is parsed into a
    DataFrame. A model is trained when ``check_model_status`` reports that
    none exists or that the saved one is stale; otherwise the saved model is
    loaded. The loop ends when a message whose key is ``"exit"`` arrives.

    Messages without a key or without a value are reported and skipped
    instead of crashing the loop on ``None.decode``.
    """
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Nothing arrived within the poll timeout.
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        raw_key = msg.key()
        if raw_key is None:
            # Unkeyed messages carry no coin name, so there is no model to pick.
            print("Skipping message without a key")
            continue
        coin_name = raw_key.decode('utf-8')
        if coin_name == "exit":
            break

        raw_value = msg.value()
        if raw_value is None:
            # A null payload cannot be parsed as CSV.
            print(f"Skipping message for {coin_name}: empty payload")
            continue
        received_data = raw_value.decode('utf-8')

        df = pd.read_csv(StringIO(received_data))
        print(f"Recieved data for {coin_name}")
        print(df.head())

        # NOTE(review): coin_name comes straight from the message key and is
        # interpolated into a file path; validate it if producers are untrusted.
        model_path = f"{coin_name}_model.keras"
        # (Re)train when no model exists or the saved one is stale.
        if check_model_status(model_path):
            model = train_model(df, coin_name, target_col='close')
        else:
            model = load_model(model_path)

        # writes the next predicted value to the console
        if model is not None:
            predict_next_close(model, df, target_col='close', window_len=window_len, zero_base=zero_base)

In [None]:
# running the consumer program
try:
    process_messages()
except KeyboardInterrupt:
    # Interrupting the kernel is the normal way to stop the loop manually.
    print("Stopping Kafka Consumer...")
finally:
    # Release both resources: the nested finally guarantees the SparkContext
    # is stopped even if closing the Kafka consumer raises.
    try:
        consumer.close()
    finally:
        sc.stop()

%6|1743442127.240|FAIL|rdkafka#consumer-1| [thrd:Ubuntu-GPU-84.academic.management.afeka.local:9092/0]: Ubuntu-GPU-84.academic.management.afeka.local:9092/0: Disconnected (after 2172510ms in state UP)
%3|1743442127.240|FAIL|rdkafka#consumer-1| [thrd:Ubuntu-GPU-84.academic.management.afeka.local:9092/0]: Ubuntu-GPU-84.academic.management.afeka.local:9092/0: Connect to ipv4#127.0.1.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%6|1743442127.240|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Ubuntu-GPU-84.academic.management.afeka.local:9092: Disconnected (after 2235060ms in state UP)
%3|1743442127.240|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Ubuntu-GPU-84.academic.management.afeka.local:9092: Connect to ipv4#127.0.1.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1743442127.241|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connect