## Setup PySpark

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, unix_timestamp, lag
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.functions import vector_to_array
#from pyspark.sql.functions import col


## Init Spark

In [12]:
spark = SparkSession.builder.appName("BitcoinPricePrediction").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

## Load Data

In [79]:
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"])

response = es.search(index="historical", body={
    "query": {
        "match_all": {}
    },
    "size": 1000  
})

data = [hit["_source"] for hit in response["hits"]["hits"]]

spark = SparkSession.builder.appName("BitcoinPricePrediction").getOrCreate()

training_df = spark.createDataFrame(data)

## Feature Engineering lag 


In [80]:
window_spec = Window.orderBy("time_numeric")

training_df = training_df.withColumn("BTC_lag_1", lag("BTC_close", 1).over(window_spec))
training_df = training_df.withColumn("BTC_lag_3", lag("BTC_close", 3).over(window_spec))
training_df = training_df.withColumn("ETH_lag_1", lag("ETH_close", 1).over(window_spec))
training_df = training_df.withColumn("ETH_lag_3", lag("ETH_close", 3).over(window_spec))

training_df = training_df.na.drop()

## Split Data

##### For time series models (such as Bitcoin predictions), randomSplit() is not ideal because future values are not randomly distributed. Instead, a rolling split should be used so that the model learns only from past data.

In [81]:
training_df = training_df.orderBy("time_numeric")
split_index = int(training_df.count() * 0.8)
train_data = training_df.limit(split_index)
test_data = training_df.subtract(train_data)

print(f"Train: {train_data.count()} rows, Test: {test_data.count()} rows")

Train: 797 rows, Test: 200 rows


## seperate features and label

In [67]:
#train_data_features = train_data.drop("BTC_close")
#test_data_features = test_data.drop("BTC_close")

#train_data_features.show(2)
#test_data_features.show(2)

#train_data_label = train_data.select("time_numeric", "BTC_close")

#train_data_label.show(5)

#test_data_label = test_data.select("time_numeric", "BTC_close")

#test_data_label.show(5)

## Vectorize features

In [82]:
feature_cols = ["BTC_ETH_ratio",  "BTC_price_change" , "BTC_volatility", "BTC_volume", "ETH_close", "ETH_price_change", "ETH_volatility", "ETH_volume", "BTC_lag_1", "BTC_lag_3", "ETH_lag_1", "ETH_lag_3"]
featureassembler = VectorAssembler(inputCols=feature_cols,outputCol="independent_features")
train_data = featureassembler.transform(train_data)
test_data = featureassembler.transform(test_data)
print(train_data.show(5))

+------------------+---------+--------------------+------------------+----------+---------+--------------------+------------------+----------+------------+-------------------+---------+---------+---------+---------+--------------------+
|     BTC_ETH_ratio|BTC_close|    BTC_price_change|    BTC_volatility|BTC_volume|ETH_close|    ETH_price_change|    ETH_volatility|ETH_volume|time_numeric|          timestamp|BTC_lag_1|BTC_lag_3|ETH_lag_1|ETH_lag_3|independent_features|
+------------------+---------+--------------------+------------------+----------+---------+--------------------+------------------+----------+------------+-------------------+---------+---------+---------+---------+--------------------+
|30.423582894388584|104601.45|                 0.0| 258.5500000000029|   0.07967|  3438.17|                 0.0|               0.0|    0.2055|  1737306840|2025-01-19T17:14:00|104601.45| 104780.0|  3438.17|  3450.22|[30.4235828943885...|
|30.274098463286403| 104452.3|-0.00142588845565...|1

## Normalize Features

In [86]:
scaler = MinMaxScaler(inputCol="independent_features", outputCol="scaled_features")
scaler_model = scaler.fit(train_data)

train_data = scaler_model.transform(train_data)
test_data = scaler_model.transform(test_data)

train_data.show(truncate=False)
test_data.show(truncate=False)

IllegalArgumentException: requirement failed: Output column scaled_features already exists.

In [61]:
#finalized_train_data_output = train_data_feature.join(train_data_label, on="time_numeric")
#finalized_test_data_output = train_data_feature.join(train_data_label, on="time_numeric")

In [62]:
#finalized_train_data_output.show(2, truncate=False)
#finalized_test_data_output.show(2, truncate=False)

In [84]:
finalized_train_data_output = train_data.select("scaled_features","BTC_close")
print(finalized_train_data_output.show(5))

+--------------------+---------+
|     scaled_features|BTC_close|
+--------------------+---------+
|[0.76877820614125...|104601.45|
|[0.76362530491760...| 104452.3|
|[0.76362530491760...| 104452.3|
|[0.76620773548049...|104345.07|
|[0.76511546293558...|104601.45|
+--------------------+---------+
only showing top 5 rows

None


In [None]:
test_data

## Train Model

In [85]:
from pyspark.ml.regression import LinearRegression

regressor = LinearRegression(featuresCol="scaled_features", labelCol="BTC_close")
regressor = regressor.fit(finalized_train_data_output)


                                                                                

In [87]:
predictions = regressor.transform(test_data)

In [88]:
predictions.show(10)

+------------------+---------+--------------------+------------------+----------+---------+--------------------+------------------+----------+------------+-------------------+---------+---------+---------+---------+--------------------+--------------------+------------------+
|     BTC_ETH_ratio|BTC_close|    BTC_price_change|    BTC_volatility|BTC_volume|ETH_close|    ETH_price_change|    ETH_volatility|ETH_volume|time_numeric|          timestamp|BTC_lag_1|BTC_lag_3|ETH_lag_1|ETH_lag_3|independent_features|     scaled_features|        prediction|
+------------------+---------+--------------------+------------------+----------+---------+--------------------+------------------+----------+------------+-------------------+---------+---------+---------+---------+--------------------+--------------------+------------------+
| 31.20574822646554|102712.16|                 0.0|12.839999999996508|     0.001|  3291.45|-0.00733764001230...|25.550000000000182|    0.0824|  1737354660|2025-01-20T06:

In [89]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean Absolute Error (MAE)
evaluator_mae = RegressionEvaluator(labelCol="BTC_close", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)

# Root Mean Squared Error (RMSE)
evaluator_rmse = RegressionEvaluator(labelCol="BTC_close", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)

# R-squared (R²)
evaluator_r2 = RegressionEvaluator(labelCol="BTC_close", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

# Display results
print(f"MAE: {mae}")
print(f"RMSE: {rmse}")
print(f"R²: {r2}")

MAE: 540.9364086765158
RMSE: 696.6081724404379
R²: 0.9025110387098105


In [30]:
import matplotlib.pyplot as plt

predictions = regressor.transform(train_data_output)

pdf = predictions.select("timestamp", "BTC_close", "prediction").toPandas().head(700)

print(pdf.head(2))

#pdf["time"] = pdf["independent_features"].apply(lambda x: x[0]) 
plt.figure(figsize=(10, 5))
plt.plot(pdf["timestamp"], pdf["BTC_close"], label="Actual Price", marker="o", linestyle="dashed")
plt.plot(pdf["timestamp"], pdf["prediction"], label="Predicted Price", marker="s", linestyle="solid")

plt.xlabel("Time (Unix Timestamp)")
plt.ylabel("Bitcoin Price")
plt.title("Bitcoin Price Prediction")
plt.legend()
plt.grid()
plt.show()

NameError: name 'train_data_output' is not defined

In [24]:
pdf["signal"] = pdf["prediction"] > pdf["BTC_close"]

pdf["profit"] = pdf["BTC_close"].diff() * pdf["signal"].shift(1).fillna(0)

print(pdf.head(5))

total_earnings = pdf["profit"].sum()

print(total_earnings)

             timestamp  BTC_close     prediction  signal profit
0  2025-01-19T17:14:00  104601.45  104299.210540   False    NaN
1  2025-01-19T17:15:00  104452.30  104164.944952   False   -0.0
2  2025-01-19T17:16:00  104452.30  104070.486469   False    0.0
3  2025-01-19T17:17:00  104345.07  104081.462301   False   -0.0
4  2025-01-19T17:18:00  104601.45  104080.022204   False    0.0
29330.129999999932


In [85]:
#regressor.save("regression_model version 1")

In [None]:
spark.stop()