In [7]:
!pip install ta



In [8]:
!pip install pyspark



In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import numpy as np
import pandas as pd


class SparkTripleEnsembleModel:
    """
    A class to train and predict using an ensemble of GBT, RF, and LSTM models.
    """

    def __init__(self, spark=None, num_partitions=200):
        print("Initializing Spark session...")
        self.spark = (
            spark if spark else
            SparkSession.builder
                .appName("TripleEnsembleModel")
                .config("spark.driver.memory", "16g")
                .config("spark.executor.memory", "16g")
                .config("spark.memory.offHeap.enabled", True)
                .config("spark.memory.offHeap.size", "16g")
                .config("spark.driver.maxResultSize", "8g")
                .config("spark.python.worker.memory", "8g")
                .config("spark.sql.shuffle.partitions", str(num_partitions))
                .config("spark.default.parallelism", str(num_partitions))
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
                .getOrCreate()
        )
        print("Spark session initialized successfully.")
        self.num_partitions = num_partitions
                    # Update feature columns list
        self.feature_cols = [
        'returns', 'volume_ratio', 'rsi', 'macd',
        'bb_upper', 'bb_lower', 'bb_pct'  # New features
        ]
        self.gbt_pipeline = None
        self.rf_pipeline = None
        self.lstm_model = None
        print("Model instance initialized.")

    def _calculate_features(self, df):
        print("Starting feature calculation...")
        df = df.repartition(self.num_partitions)
        print("Repartitioned data for parallel processing.")

        w = Window.orderBy('timestamp')
        w_10 = Window.orderBy('timestamp').rowsBetween(-10, -1)
        w_14 = Window.orderBy('timestamp').rowsBetween(-14, -1)
        w_20 = Window.orderBy('timestamp').rowsBetween(-20, -1) 

        print("Calculating returns and volume ratio...")
        df = (
            df.withColumn('prev_close', F.lag('close').over(w))
              .withColumn('returns', (F.col('close') - F.col('prev_close')) / F.col('prev_close'))
              .withColumn('volume_ma', F.avg('volume').over(w_10))
              .withColumn('volume_ratio', F.col('volume') / F.col('volume_ma'))
              .drop('prev_close', 'volume_ma')
        )

        print("Calculating RSI...")
        df = (
            df.withColumn('price_diff', F.col('close') - F.lag('close', 1).over(w))
              .withColumn('gain', F.when(F.col('price_diff') > 0, F.col('price_diff')).otherwise(0))
              .withColumn('loss', F.when(F.col('price_diff') < 0, -F.col('price_diff')).otherwise(0))
              .withColumn('avg_gain', F.avg('gain').over(w_14))
              .withColumn('avg_loss', F.avg('loss').over(w_14))
              .withColumn('rs', F.col('avg_gain') / F.col('avg_loss'))
              .withColumn('rsi', 100 - (100 / (1 + F.col('rs'))))
              .drop('price_diff', 'gain', 'loss', 'avg_gain', 'avg_loss', 'rs')
        )

        print("Calculating MACD...")
        df = (
            df.withColumn('ema12', F.avg('close').over(Window.orderBy('timestamp').rowsBetween(-12, -1)))
              .withColumn('ema26', F.avg('close').over(Window.orderBy('timestamp').rowsBetween(-26, -1)))
              .withColumn('macd', F.col('ema12') - F.col('ema26'))
              .drop('ema12', 'ema26')
        )

            # Add Bollinger Bands
        df = (
        df.withColumn('sma20', F.avg('close').over(w_20))
          .withColumn('stddev20', F.stddev('close').over(w_20))
          .withColumn('bb_upper', F.col('sma20') + F.col('stddev20') * 2)
          .withColumn('bb_lower', F.col('sma20') - F.col('stddev20') * 2)
          .withColumn('bb_pct', (F.col('close') - F.col('bb_lower')) / (F.col('bb_upper') - F.col('bb_lower')))
          .drop('sma20', 'stddev20') 
        )

        if 'target' not in df.columns:
            print("Calculating target column...")
            df = (
                df.withColumn('next_close', F.lead('close', 1).over(w))
                  .withColumn('target', F.when(F.col('next_close') > F.col('close'), 1).otherwise(0))
                  .drop('next_close')
            )


        keep_cols = ['timestamp'] + self.feature_cols + (['target'] if 'target' in df.columns else [])
        df = df.select(keep_cols).na.fill(0).cache()
        print(f"Feature calculation complete. Number of rows: {df.count()}")
        return df

    def _prepare_lstm_data(self, pdf, sequence_length=10):
        print(f"Preparing LSTM data with sequence length {sequence_length}...")
        data = pdf[self.feature_cols].values
        X, y = [], []
        for i in range(len(data) - sequence_length):
            X.append(data[i:(i + sequence_length)])
            if 'target' in pdf.columns:
                y.append(pdf['target'].iloc[i + sequence_length])
        print(f"LSTM data preparation complete. Samples: {len(X)}")
        return np.array(X), np.array(y) if y else None

    def _build_lstm_model(self, input_shape):
        print("Building LSTM model...")
        model = Sequential([
            LSTM(32, input_shape=input_shape, return_sequences=True),
            Dropout(0.2),
            LSTM(16),
            Dropout(0.1),
            Dense(1, activation='sigmoid')
        ])
        model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
        print("LSTM model built and compiled.")
        return model

    def train(self, train_data):
        """
        Train ensemble models, using most recent data for LSTM.
        """
        print("Starting training...")
        df = self._calculate_features(train_data).cache()
        total_rows = df.count()
        print(f"Total training rows: {total_rows}")

        print("Training GBT model...")
        assembler = VectorAssembler(inputCols=self.feature_cols, outputCol="features")
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
        gbt = GBTClassifier(featuresCol="scaled_features", labelCol="target", maxDepth=6, maxBins=32, maxIter=100)
        self.gbt_pipeline = Pipeline(stages=[assembler, scaler, gbt]).fit(df)
        print("GBT training completed.")

        print("Training RF model...")
        rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="target", numTrees=100, maxDepth=6, maxBins=32)
        self.rf_pipeline = Pipeline(stages=[assembler, scaler, rf]).fit(df)
        print("RF training completed.")

        print("Training LSTM model...")
        recent_data = df.orderBy(F.col('timestamp').desc())
        recent_data = recent_data.orderBy('timestamp')
        train_pdf = recent_data.select(['timestamp'] + self.feature_cols + ['target']).toPandas()

        X_lstm, y_lstm = self._prepare_lstm_data(train_pdf)
        self.lstm_model = self._build_lstm_model((X_lstm.shape[1], X_lstm.shape[2]))
        early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True, verbose=1)
        self.lstm_model.fit(X_lstm, y_lstm, epochs=20, batch_size=512, validation_split=0.1, callbacks=[early_stopping])

        print("Training completed.")

    def predict(self, test_data):
        """
        Predict using the ensemble with exact length matching.
        """
        print("Starting prediction...")
        df = self._calculate_features(test_data).cache()
        gbt_probs = self.gbt_pipeline.transform(df).select(
            F.udf(lambda v: float(v[1]), returnType=DoubleType())('probability').alias('probability')
        ).toPandas()['probability'].values

        rf_probs = self.rf_pipeline.transform(df).select(
            F.udf(lambda v: float(v[1]), returnType=DoubleType())('probability').alias('probability')
        ).toPandas()['probability'].values

        test_pdf = df.select(['timestamp'] + self.feature_cols).toPandas()
        X_lstm, _ = self._prepare_lstm_data(test_pdf)
        lstm_pred = self.lstm_model.predict(X_lstm).flatten()

        padded_lstm = np.zeros(df.count())
        padded_lstm[10:] = lstm_pred

        ensemble_pred = gbt_probs * 0.5 + rf_probs * 0.5
        final_predictions = (ensemble_pred > 0.5).astype(int)
        return final_predictions


def main():
    try:
        spark = SparkSession.builder.appName("CryptoForecasting").getOrCreate()
        train_df = spark.read.csv('/kaggle/input/directional-forecasting-cryptocurrencies/train.csv', header=True, inferSchema=True).select('timestamp', 'close', 'high', 'low', 'volume')
        test_df = spark.read.csv('/kaggle/input/directional-forecasting-cryptocurrencies/test.csv', header=True, inferSchema=True).select('timestamp', 'close', 'high', 'low', 'volume')

        model = SparkTripleEnsembleModel(spark)
        model.train(train_df)
        predictions = model.predict(test_df)
        pd.DataFrame({'row_id': range(len(predictions)), 'target': predictions}).to_csv('submission.csv', index=False)

        spark.stop()
    except Exception as e:
        print(f"Error encountered: {str(e)}")
        raise


if __name__ == "__main__":
    main()


Initializing Spark session...
Spark session initialized successfully.
Model instance initialized.
Starting training...
Starting feature calculation...
Repartitioned data for parallel processing.
Calculating returns and volume ratio...
Calculating RSI...
Calculating MACD...
Calculating target column...
Feature calculation complete. Number of rows: 2122438
Total training rows: 2122438
Training GBT model...
GBT training completed.
Training RF model...
RF training completed.
Training LSTM model...
Preparing LSTM data with sequence length 10...
LSTM data preparation complete. Samples: 2122428
Building LSTM model...
LSTM model built and compiled.


  super().__init__(**kwargs)


Epoch 1/20
[1m3731/3731[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 23ms/step - accuracy: 0.5215 - loss: 0.6924 - val_accuracy: 0.5392 - val_loss: 0.6906
Epoch 2/20
[1m3731/3731[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m85s[0m 23ms/step - accuracy: 0.5221 - loss: 0.6921 - val_accuracy: 0.5392 - val_loss: 0.6902
Epoch 3/20
[1m3731/3731[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m85s[0m 23ms/step - accuracy: 0.5224 - loss: 0.6920 - val_accuracy: 0.5393 - val_loss: 0.6907
Epoch 4/20
[1m3731/3731[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m85s[0m 23ms/step - accuracy: 0.5230 - loss: 0.6919 - val_accuracy: 0.5392 - val_loss: 0.6903
Epoch 5/20
[1m3731/3731[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m85s[0m 23ms/step - accuracy: 0.5226 - loss: 0.6919 - val_accuracy: 0.5389 - val_loss: 0.6903
Epoch 5: early stopping
Restoring model weights from the end of the best epoch: 2.
Training completed.
Starting prediction...
Starting feature calculation...
Repartitione

In [None]:
0.49556