In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, monotonically_increasing_id, year, month, dayofweek, avg, lag
from pyspark.sql.window import Window


In [2]:
# Initialize Spark session
spark = SparkSession.builder.appName("BitcoinAnalysis").enableHiveSupport().getOrCreate()


In [5]:
# Load data from Hive
df = spark.sql("SELECT * FROM project2024.bitcoi2025 LIMIT 1000")

# Show initial data
df.show(5)


+------------+-------+-------+-------+-------+----------+-------------------+-----------+-----------+-----------+------------+---------------+-----------------+
|   timestamp|   open|   high|    low|  close|    volume|           datetime|price_range|ma_close_10|ma_close_30|daily_return|close_increased|cumulative_volume|
+------------+-------+-------+-------+-------+----------+-------------------+-----------+-----------+-----------+------------+---------------+-----------------+
| 1.7115414E9|69891.0|69917.0|69891.0|69917.0|0.03676174|2024-03-27 12:10:00|       26.0|    69855.4|  69809.164| 0.034338202|              1|      3.6380708E7|
|1.71154146E9|69911.0|69933.0|69891.0|69898.0| 1.4020227|2024-03-27 12:11:00|       42.0|    69864.2|    69810.5|-0.027175078|              0|      3.6380712E7|
|1.71154152E9|69889.0|69923.0|69889.0|69918.0| 0.6444195|2024-03-27 12:12:00|       34.0|    69878.3|    69812.1| 0.028613122|              1|      3.6380712E7|
|1.71154158E9|69914.0|69943.0|6991

In [6]:
# Add an index column for easier manipulation (optional, if needed)
df = df.withColumn("index", monotonically_increasing_id())

# Remove the first row (index 0), if needed (optional)
df = df.filter("index != 0").drop("index")


In [7]:
# Fill missing values
df = df.fillna({'open': 0, 'high': 0, 'low': 0, 'close': 0, 'volume': 0})


In [8]:
# Convert 'close_increased' to binary (1 if close increased, 0 otherwise)
df = df.withColumn("close_increased", when(col("close") > col("open"), 1).otherwise(0))

# Extract year, month, and day of the week from 'datetime' (timestamp)
df = df.withColumn("Year", year(col("datetime")))
df = df.withColumn("Month", month(col("datetime")))
df = df.withColumn("DayOfWeek", dayofweek(col("datetime")))


In [9]:
# Calculate additional features for the model
df = df.withColumn("price_range", col("high") - col("low"))

# Calculate moving averages (10-period and 30-period) for closing price
window_10 = Window.orderBy("datetime").rowsBetween(-9, 0)  # 10-period window
window_30 = Window.orderBy("datetime").rowsBetween(-29, 0)  # 30-period window

df = df.withColumn("ma_close_10", avg(col("close")).over(window_10))
df = df.withColumn("ma_close_30", avg(col("close")).over(window_30))

# Calculate daily return (percentage change)
df = df.withColumn("prev_close", lag("close", 1).over(Window.orderBy("datetime")))
df = df.withColumn("daily_return", (col("close") - col("prev_close")) / col("prev_close") * 100)

# Show the transformed data
df.show(5)


+------------+-------+-------+-------+-------+----------+-------------------+-----------+-----------------+-----------------+--------------------+---------------+-----------------+----+-----+---------+----------+
|   timestamp|   open|   high|    low|  close|    volume|           datetime|price_range|      ma_close_10|      ma_close_30|        daily_return|close_increased|cumulative_volume|Year|Month|DayOfWeek|prev_close|
+------------+-------+-------+-------+-------+----------+-------------------+-----------+-----------------+-----------------+--------------------+---------------+-----------------+----+-----+---------+----------+
|1.71154146E9|69911.0|69933.0|69891.0|69898.0| 1.4020227|2024-03-27 12:11:00|       42.0|          69898.0|          69898.0|                null|              0|      3.6380712E7|2024|    3|        4|      null|
|1.71154152E9|69889.0|69923.0|69889.0|69918.0| 0.6444195|2024-03-27 12:12:00|       34.0|          69908.0|          69908.0|0.028613121977738992|  

In [12]:
# Select the features for the model
feature_cols = ["price_range", "volume", "ma_close_10", "ma_close_30", "daily_return", "close_increased"]

# Using VectorAssembler to combine the selected features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")

# Transform the data by adding a 'features' column
prepared_df = assembler.transform(df)


In [13]:
# Sample a fraction of the data (for example, 20% of the dataset)
sampled_df = prepared_df.sample(withReplacement=False, fraction=0.2, seed=42)

# Split sampled data into training and testing sets
print("📊 Splitting data into training and testing sets...")
train_df, test_df = sampled_df.randomSplit([0.8, 0.2], seed=42)
print(f"✅ Training Set: {train_df.count()} rows, Testing Set: {test_df.count()} rows")


📊 Splitting data into training and testing sets...
✅ Training Set: 161 rows, Testing Set: 27 rows


In [14]:
# Define Linear Regression Model (or Logistic Regression if you want classification)
lr = LinearRegression(featuresCol='features', labelCol='close')

# Train the model on the training data
print("🤖 Training the model...")
model = lr.fit(train_df)


🤖 Training the model...


In [15]:
# Make predictions on the testing data
print("📈 Making predictions...")
predictions = model.transform(test_df)


📈 Making predictions...


In [16]:
# Evaluate Model
evaluator = RegressionEvaluator(labelCol='close', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"✅ Model RMSE: {rmse}")


✅ Model RMSE: 57.82594405320109


In [17]:
# Display Predictions
print("📊 Displaying Predictions...")
predictions.select('datetime', 'prediction', 'close').show(10)


📊 Displaying Predictions...
+-------------------+-----------------+-------+
|           datetime|       prediction|  close|
+-------------------+-----------------+-------+
|2024-03-27 12:19:00|69919.55808086986|69941.0|
|2024-03-27 12:22:00|69865.88634802615|69861.0|
|2024-03-27 12:23:00|69931.54228656853|69893.0|
|2024-03-27 12:59:00|69618.10952126201|69554.0|
|2024-03-27 13:10:00|69620.39568577247|69751.0|
|2024-03-27 14:51:00| 69977.5944097219|69987.0|
|2024-03-27 16:09:00|70228.28465003888|70246.0|
|2024-03-27 16:10:00|70252.31678618865|70235.0|
|2024-03-27 16:14:00|70199.26607518281|70230.0|
|2024-03-27 16:35:00|70144.28900380731|70209.0|
+-------------------+-----------------+-------+
only showing top 10 rows

