In [17]:
# !pip install pyspark

In [18]:
# Timestamp	Open	High	Low	Close	Volume_(BTC)	Volume_(Currency)	Weighted_Price

# Import Libraries

In [59]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

from pyspark.sql.functions import dayofweek, hour
from pyspark.sql.functions import lag
from pyspark.sql import functions as F

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor

from pyspark.sql.window import Window

import numpy as np
import pandas as pd

# Spark Context

In [20]:
spark = SparkSession.builder.appName("BitcoinDataProcessing").getOrCreate()

# Data Import

In [21]:
bitcoin_data_full = spark.read.csv("./bitcoin_historic_data.csv", header=True, inferSchema=True)

In [22]:
bitcoin_data = bitcoin_data_full

In [23]:
bitcoin_data_full.toPandas()[['Close']].isnull().sum()

Close    1243608
dtype: int64

In [24]:
bitcoin_data_full.count()

4857377

In [25]:
type(bitcoin_data)

pyspark.sql.dataframe.DataFrame

# Data Cleaning - Time stamp correction, Missing value treatment

In [26]:
# Converting the timestamp from unixtimestamp to a readable format
bitcoin_data = bitcoin_data.withColumn("Timestamp", bitcoin_data["Timestamp"].cast("timestamp"))

# Extracting the date part from the timestamp
bitcoin_data = bitcoin_data.withColumn("Date", F.to_date("Timestamp"))

# Define a window specification partitioned by date and ordered by timestamp
window_spec = Window.partitionBy("Date").orderBy("Timestamp")

# Getting the first record of each day
bitcoin_data = bitcoin_data.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")


In [27]:
bitcoin_data.toPandas()[['Close']].isnull().sum(), bitcoin_data.toPandas()[['Close']].count()

(Close    675
 dtype: int64,
 Close    2700
 dtype: int64)

In [28]:
bitcoin_data.show(10)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----------+
|          Timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|      Date|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----------+
|2011-12-31 13:22:00|4.39|4.39|4.39| 4.39|  0.45558087|     2.0000000193|          4.39|2011-12-31|
|2012-01-01 00:00:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2012-01-01|
|2012-01-02 00:00:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2012-01-02|
|2012-01-03 00:00:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2012-01-03|
|2012-01-04 00:00:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2012-01-04|
|2012-01-05 00:00:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2012-01-05|
|2012-01-06 00:00:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2012-01-06|


In [29]:
bitcoin_data.count()

3375

In [30]:
from pyspark.sql.functions import col, when, last
from pyspark.sql.window import Window

other_columns = ["Weighted_Price", "Volume_(BTC)", "Volume_(Currency)"]
for col_name in other_columns:
#     bitcoin_data = bitcoin_data.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))
    bitcoin_data = bitcoin_data.na.fill(value=0,subset=[col_name])

# Fill forward for OHLC data
# ohlc_columns = ["Open", "High", "Low", "Close"]
# for col_name in ohlc_columns:
#     # Create a window specification to fill forward based on timestamp
#     window_spec = Window.orderBy("Timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)

#     # Fill forward using the last non-null value
#     bitcoin_data = bitcoin_data.withColumn(col_name, last(col_name, ignorenulls=True).over(window_spec))

# Fill forward for OHLC data    
ohlc_columns = ["Open", "High", "Low", "Close"]
bitcoin_data_pd = bitcoin_data.toPandas() # Converting to pandas df since Spark function not working as expected 
for col_name in ohlc_columns:
    bitcoin_data_pd[col_name].fillna(method='ffill', inplace=True)

bitcoin_data = spark.createDataFrame(bitcoin_data_pd)

In [31]:
# Shape (Number of Rows, Number of Columns)
num_rows = bitcoin_data.count()
num_columns = len(bitcoin_data.columns)

print("Shape: ({}, {})".format(num_rows, num_columns))

# Columns
columns = bitcoin_data.columns
print("Columns: {}".format(columns))

# Check for 'NaN' values
has_nan = bitcoin_data.select([col(c).isNull().alias(c) for c in bitcoin_data.columns]).rdd.flatMap(lambda x: x).collect()
any_nan = any(has_nan)

print("Is There any 'NaN' value: {}".format(any_nan))

# Check for duplicate values
has_duplicates = bitcoin_data.groupBy(bitcoin_data.columns).count().filter("count > 1").count() > 0

print("Is there any duplicate value: {}".format(has_duplicates))

Shape: (3375, 9)
Columns: ['Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume_(BTC)', 'Volume_(Currency)', 'Weighted_Price', 'Date']
Is There any 'NaN' value: False
Is there any duplicate value: False


# Feature Engineering - Day of Week, 7 day prior information features

In [32]:
# Feature Engineering
# You can add features like day of the week, hour of the day, etc.
bitcoin_data = bitcoin_data.withColumn("DayOfWeek", dayofweek("Timestamp"))
bitcoin_data = bitcoin_data.withColumn("HourOfDay", hour("Timestamp"))

# Getting more features, last 7 days information to help predict next day closing (can increase more)

# Define the window specification
window_spec = Window().orderBy("Timestamp")

lag_cols = ["Open", "High", "Low", "Close", "Volume_(BTC)", "Volume_(Currency)"]

# Create lagged features for the last 7 days
for i in lag_cols:
    for j in range(1,8):
        bitcoin_data = bitcoin_data.withColumn(col_name + "_b_" + str(j), lag(col_name, j).over(window_spec))

bitcoin_data = bitcoin_data.dropna() # drop the first rows. They don't have previous information 
print("Updated Data Shape: ", bitcoin_data.count())

Updated Data Shape:  3368


In [33]:
bitcoin_data.show(10)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|          Timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|      Date|DayOfWeek|HourOfDay|Close_b_1|Close_b_2|Close_b_3|Close_b_4|Close_b_5|Close_b_6|Close_b_7|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|2012-01-07 00:00:00|4.39|4.39|4.39| 4.39|         0.0|              0.0|           0.0|2012-01-07|        7|        0|     4.39|     4.39|     4.39|     4.39|     4.39|     4.39|     4.39|
|2012-01-08 00:00:00|4.39|4.39|4.39| 4.39|         0.0|              0.0|           0.0|2012-01-08|        1|        0|     4.39|     4.39|     4.39|     4.39|     4.39|     4.39|     4.39|
|2012-01-09 00:00:00|4.39|4.39|4.39| 4.39|        

# Generate the Prediction label as next day close value

In [34]:
# Add next closing value as a label for prediction label
window_spec = Window().orderBy("Timestamp")

bitcoin_data = bitcoin_data.withColumn("upcoming_close", lag('Close', -1).over(window_spec))

bitcoin_data = bitcoin_data.dropna() # drop the last row. It doesn't have next information 
print("Updated record count after adding prediction close label:", bitcoin_data.count())

Updated record count after adding prediction close label: 3367


In [35]:
bitcoin_data.columns

['Timestamp',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume_(BTC)',
 'Volume_(Currency)',
 'Weighted_Price',
 'Date',
 'DayOfWeek',
 'HourOfDay',
 'Close_b_1',
 'Close_b_2',
 'Close_b_3',
 'Close_b_4',
 'Close_b_5',
 'Close_b_6',
 'Close_b_7',
 'upcoming_close']

# Generate the Feature vector

In [36]:
#Input all the features in one vector column
feature_columns = [i for i in bitcoin_data.columns if i not in ['upcoming_close', 'Date', 'Timestamp']]
assembler = VectorAssembler(inputCols= feature_columns, outputCol = 'Attributes')

output = assembler.transform(bitcoin_data) #transform the data using Vector Assembler

#Input vs Output
finalized_data = output.select("Attributes","upcoming_close")

finalized_data.show() # showing the final data

+--------------------+--------------+
|          Attributes|upcoming_close|
+--------------------+--------------+
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          4.39|
|[4.39,4.39,4.39,4...|          6.01|
|[6.01,6.01,6.01,6...|          6.01|
|[6.01,6.01,6.01,6...|          6.01|
|[6.01,6.01,6.01,6...|          6.01|
|[6.01,6.01,6.01,6...|          6.01|
|[6.01,6.01,6.01,6...|          6.01|
|[6.01,6.01,6.01,6...|           6.0|
|[6.09,6.09,6.0,6....|           6.0|
+--------------------+--------------+
only showing top 20 rows



# Feature Vector and Train-Test Split

In [37]:
# Feature Selection
feature_columns = [i for i in bitcoin_data.columns if i not in ['upcoming_close', 'Date', 'Timestamp', 'HourOfDay']]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# bitcoin_data = assembler.transform(bitcoin_data)

# Train-Test Split - since the problem is time series, we should perform the sequenctial split
prediction_days = int(np.round(bitcoin_data.count()*(10/100),0))
train_data = bitcoin_data.limit(int(bitcoin_data.count()-prediction_days))
test_data = bitcoin_data.exceptAll(train_data)

print("% test data = %", (prediction_days/int(bitcoin_data.count())) * 100)
print("Train data records:", train_data.count())
print("Test data records:", test_data.count())

% test data = % 10.008910008910009
Train data records: 3030
Test data records: 337


# Linear Regressor

In [62]:
# Linear Regression Model
lr = LinearRegression(featuresCol="features", labelCol="upcoming_close")

# Pipeline
pipeline = Pipeline(stages=[assembler, lr])

# Model Training
model_lr = pipeline.fit(train_data)

# Model Evaluation using RMSE and R2 - LR

In [63]:
# Make predictions on the test data
predictions = model_lr.transform(test_data)

evaluator_rmse = RegressionEvaluator(labelCol="upcoming_close", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

evaluator_r2 = RegressionEvaluator(labelCol="upcoming_close", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R2: {r2}")

# # View Predictions
# predictions.select("Close", "prediction", *feature_columns).show()

Root Mean Squared Error (RMSE): 1260.6665093920308
R2: 0.9934691260305369


# Random Forest Regressor

In [51]:
rf = RandomForestRegressor(featuresCol="features", labelCol="upcoming_close")

# Pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Model Training
model_rf = pipeline.fit(train_data)

# Model Evaluation using RMSE and R2 - Random Forest

In [52]:
# Make predictions on the test data
predictions = model_rf.transform(test_data)

evaluator_rmse = RegressionEvaluator(labelCol="upcoming_close", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

evaluator_r2 = RegressionEvaluator(labelCol="upcoming_close", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R2: {r2}")

# # View Predictions
# predictions.select("Close", "prediction", *feature_columns).show()

Root Mean Squared Error (RMSE): 16995.15992452752
R2: -0.1869198227710125


# Boosting Regressor

In [60]:
gbt = GBTRegressor(featuresCol="features", labelCol="upcoming_close")

# Pipeline
pipeline = Pipeline(stages=[assembler, gbt])

# Model Training
model_gbt = pipeline.fit(train_data)

# Model Evaluation using RMSE and R2 - GBT

In [61]:
# Make predictions on the test data
predictions = model_gbt.transform(test_data)

evaluator_rmse = RegressionEvaluator(labelCol="upcoming_close", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

evaluator_r2 = RegressionEvaluator(labelCol="upcoming_close", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R2: {r2}")

# # View Predictions
# predictions.select("Close", "prediction", *feature_columns).show()

Root Mean Squared Error (RMSE): 16015.088058957046
R2: -0.05397311689097095
