BDA PROJECT

BIG DATA ANALYTICS MINI PROJECT

T.SAI NIKHIL

2211CS010561

SECTION-6

Title: Gold Price Prediction using Apache Spark.

Source: https://www.kaggle.com/datasets/sid321axn/gold-price-prediction-dataset

This project demonstrates how to build a machine learning pipeline in PySpark that predicts the gold closing price (the Close column) from historical market data in FINAL_USO.csv. The model is a Gradient Boosted Trees (GBT) regressor, an ensemble method that trains a sequence of decision trees, each one fitted to correct the errors left by the trees before it.

In [8]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("GoldPricePrediction") \
    .getOrCreate()


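Creates a Spark session named "GoldPricePrediction".
Reads the CSV file FINAL_USO.csv into a DataFrame.
option("header", True) → the first row is treated as column names.
option("inferSchema", True) → Spark scans the data and detects each column's type automatically (for example date, int, double, string).
printSchema() → prints the column names and their types.
show(5) → displays the first 5 rows.
This is the raw dataset. As the schema below shows, it contains Date, Open, High, Low, Close, Adj Close, and Volume, followed by related market series whose columns are prefixed SP_, DJ_, EG_, EU_, among others.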

In [10]:

df = spark.read.option("header", True).option("inferSchema", True).csv("FINAL_USO.csv")


df.printSchema()
df.show(5)


root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- SP_open: double (nullable = true)
 |-- SP_high: double (nullable = true)
 |-- SP_low: double (nullable = true)
 |-- SP_close: double (nullable = true)
 |-- SP_Ajclose: double (nullable = true)
 |-- SP_volume: integer (nullable = true)
 |-- DJ_open: double (nullable = true)
 |-- DJ_high: double (nullable = true)
 |-- DJ_low: double (nullable = true)
 |-- DJ_close: double (nullable = true)
 |-- DJ_Ajclose: double (nullable = true)
 |-- DJ_volume: integer (nullable = true)
 |-- EG_open: double (nullable = true)
 |-- EG_high: double (nullable = true)
 |-- EG_low: double (nullable = true)
 |-- EG_close: double (nullable = true)
 |-- EG_Ajclose: double (nullable = true)
 |-- EG_volume: integer (nullable = true)
 |-- EU_Price: do

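Casts the "Date" column to DateType using the yyyy-MM-dd pattern. inferSchema has already read it as a date (see the schema above), so this step mainly guarantees the type.
Creates a new column "date_ts" → numeric timestamp (seconds since 1970-01-01 UTC, taking midnight of each date in the Spark session's time zone).
Orders rows chronologically by "Date".
Displays the first 5 rows.
This prepares the data for modeling: VectorAssembler accepts only numeric, boolean, and vector columns, so the date has to be turned into a number before it can be used as a feature.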

In [12]:
from pyspark.sql.functions import to_date, unix_timestamp, col


df = df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
df = df.withColumn("date_ts", unix_timestamp(col("Date")).cast("double"))


df = df.orderBy("Date")
df.show(5)


(df.show(5) output: a wide table whose columns begin Date | Open | High | Low | Close | Adj Close | Volume | SP_open | SP_high | SP_low | ...)
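For intuition about what date_ts contains, the plain-Python sketch below performs the same date-to-seconds conversion with the standard datetime module. It assumes midnight UTC; Spark's unix_timestamp uses the session time zone instead, so Spark's values can be shifted by a few hours when that zone is not UTC. The helper name to_epoch_seconds is hypothetical.

```python
from datetime import date, datetime, timezone

def to_epoch_seconds(d: date) -> float:
    """Seconds between 1970-01-01 00:00 UTC and midnight UTC of the given date.

    Assumption: midnight UTC. Spark's unix_timestamp uses the session time zone.
    """
    midnight = datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
    return midnight.timestamp()

# Dates taken from the prev_close output later in this notebook
for d in [date(2011, 12, 16), date(2011, 12, 19)]:
    print(d, to_epoch_seconds(d))
```

Consecutive dates differ by multiples of 86,400 seconds (one day), so date_ts increases steadily over time and lets the trees separate earlier periods from later ones.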

Defines a window ordered by date.
Adds a new column "prev_close" = the previous trading day's closing price (lag("Close", 1)).
Drops every row that contains a NULL in any column; this includes the first row, which has no previous close.
Shows the Date, Close, and prev_close columns.
This gives the model access to recent price history, which is important for time-series prediction.

In [14]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag


w = Window.orderBy("Date")
df = df.withColumn("prev_close", lag("Close", 1).over(w))


df = df.na.drop()

df.select("Date", "Close", "prev_close").show(5)


+----------+------------------+----------+
|      Date|             Close|prev_close|
+----------+------------------+----------+
|2011-12-16|        155.229996|152.330002|
|2011-12-19|        154.869995|155.229996|
|2011-12-20|        156.979996|154.869995|
|2011-12-21|        157.160004|156.979996|
|2011-12-22|156.03999299999995|157.160004|
+----------+------------------+----------+
only showing top 5 rows
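The lag step can be mirrored in plain Python to show what happens row by row: sort by date, pair each row with the close of the row before it, and drop the first row because it has no predecessor. This only illustrates the logic, not how Spark executes it; the function name add_prev_close is hypothetical, and the sample values are copied from the output above. Note that Window.orderBy("Date") without a partitionBy moves all rows into a single partition, which is fine for a dataset of this size but does not scale to very large data.

```python
from datetime import date

def add_prev_close(rows):
    """rows: (date, close) tuples in any order.

    Returns (date, close, prev_close) tuples sorted by date, mimicking
    lag("Close", 1) over a date-ordered window followed by dropping the null first row.
    """
    ordered = sorted(rows, key=lambda r: r[0])
    return [
        (cur_date, cur_close, prev_close)
        for (_, prev_close), (cur_date, cur_close) in zip(ordered, ordered[1:])
    ]

# Unsorted on purpose; values copied from the table above
sample = [
    (date(2011, 12, 19), 154.869995),
    (date(2011, 12, 16), 155.229996),
    (date(2011, 12, 20), 156.979996),
]
for row in add_prev_close(sample):
    print(row)  # matches the 2011-12-19 and 2011-12-20 rows above
```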



In [17]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Select features
feature_cols = ["date_ts", "Open", "High", "Low", "Volume", "prev_close"]

# Assemble feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_unnormalized")

# Scale features
scaler = StandardScaler(inputCol="features_unnormalized", outputCol="features", withMean=True, withStd=True)


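feature_cols: the input features for the model.
VectorAssembler → combines the numeric columns into one vector column (features_unnormalized).
StandardScaler → standardizes each feature to zero mean and unit variance.
Standardization helps gradient-based and distance-based algorithms converge; tree-based models such as GBT split on feature thresholds and are essentially unaffected by it, so this stage is optional here.
The scaled vector is written to the features column, which the regressor reads.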
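The transformation StandardScaler applies with withMean=True and withStd=True can be written out for one feature as z = (x - mean) / std. Spark's documentation states that the standard deviation used is the corrected sample standard deviation, so the sketch below uses statistics.stdev. It also checks that standardizing leaves the ordering of the values unchanged, which is why threshold-based trees are essentially unaffected. The function name standardize is hypothetical.

```python
from statistics import mean, stdev

def standardize(values):
    """Return (x - mean) / sample_std for each value of one feature column."""
    mu = mean(values)
    sigma = stdev(values)  # corrected (n - 1) sample standard deviation
    return [(x - mu) / sigma for x in values]

# Close prices copied from the prev_close output above
closes = [155.229996, 154.869995, 156.979996, 157.160004]
scaled = standardize(closes)
print(scaled)

# A positive scale plus a shift keeps the ranking of values, so tree splits are unaffected
rank_before = sorted(range(len(closes)), key=lambda i: closes[i])
rank_after = sorted(range(len(scaled)), key=lambda i: scaled[i])
print(rank_before == rank_after)
```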

In [19]:
from pyspark.ml.regression import GBTRegressor

# Gradient Boosted Trees Regressor
gbt = GBTRegressor(featuresCol="features", labelCol="Close", predictionCol="prediction")


Uses Gradient Boosted Trees for regression.
featuresCol="features" → input vector.
labelCol="Close" → actual value we want to predict.
predictionCol="prediction" → model output.
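To show the boosting idea behind GBTRegressor, the sketch below implements a deliberately tiny version in plain Python: it starts from the mean of the labels, then repeatedly fits a one-split tree (a stump) to the current residuals and adds it with a small step size. For squared loss the residuals are the negative gradient (up to a constant factor). This is a simplification for intuition, not Spark's implementation: Spark grows deeper trees (maxDepth defaults to 5), bins continuous features, and fits its first tree directly to the labels. All function names and the toy data are hypothetical; n_rounds and step_size loosely mirror GBTRegressor's maxIter and stepSize, whose defaults are 20 and 0.1.

```python
from statistics import mean

def fit_stump(xs, residuals):
    """Least-squares one-split regression tree on a single feature.

    Requires at least two distinct x values.
    """
    best = None
    # Candidate thresholds; the largest value is skipped because it leaves the right side empty
    for t in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= t]
        right = [r for x, r in zip(xs, residuals) if x > t]
        left_val, right_val = mean(left), mean(right)
        sse = sum((r - left_val) ** 2 for r in left) + sum((r - right_val) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, left_val, right_val)
    _, t, left_val, right_val = best
    return lambda x: left_val if x <= t else right_val

def gradient_boost(xs, ys, n_rounds=20, step_size=0.1):
    """Boosting with squared loss: each stump is fitted to the current residuals."""
    base = mean(ys)
    stumps = []

    def predict(x):
        return base + sum(step_size * stump(x) for stump in stumps)

    for _ in range(n_rounds):
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        stumps.append(fit_stump(xs, residuals))
    return predict

def mse(model, xs, ys):
    return mean((y - model(x)) ** 2 for x, y in zip(xs, ys))

# Toy data with a jump between x = 3 and x = 4
xs = [1, 2, 3, 4, 5, 6]
ys = [1.0, 1.0, 1.0, 5.0, 5.0, 5.0]
model = gradient_boost(xs, ys)
print(mse(model, xs, ys), "vs constant-mean baseline", mse(lambda x: mean(ys), xs, ys))
```

Each round removes a fraction step_size of the remaining error, which is why a small step size needs more rounds (more trees) to fit the data.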

In [26]:
from pyspark.ml import Pipeline

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, gbt])


A Pipeline chains multiple steps into one process.
Stages:
Assemble features
Scale features
Apply Gradient Boosted Trees
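Calling fit() on the pipeline runs the stages in order and returns a PipelineModel; calling transform() on that model applies the same assembling and scaling to new data before predicting, so training and prediction always use identical preprocessing.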
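To make the fit/transform behaviour of a Pipeline concrete, here is a heavily simplified stand-in written in plain Python; it is not the pyspark.ml API, and all class names are hypothetical. Each stage's fit() learns something from the training data and returns a fitted transformer; the fitted pipeline then replays those transformers on new data. In this sketch every stage is an estimator, whereas Spark also accepts plain transformers such as VectorAssembler as stages.

```python
from statistics import mean, stdev

class MeanStdScaler:
    """Estimator: fit() learns statistics from training data and returns a fitted transformer."""
    def fit(self, values):
        return FittedScaler(mean(values), stdev(values))

class FittedScaler:
    """Transformer: applies the statistics learned during fit() to any data."""
    def __init__(self, mu, sigma):
        self.mu, self.sigma = mu, sigma

    def transform(self, values):
        return [(x - self.mu) / self.sigma for x in values]

class MiniPipeline:
    """Fits each stage in order, feeding it the output of the previously fitted stage."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            model = stage.fit(data)
            data = model.transform(data)
            fitted.append(model)
        return MiniPipelineModel(fitted)

class MiniPipelineModel:
    """Replays the fitted stages on new data."""
    def __init__(self, fitted_stages):
        self.fitted_stages = fitted_stages

    def transform(self, data):
        for model in self.fitted_stages:
            data = model.transform(data)
        return data

train = [1.0, 2.0, 3.0]
test = [4.0]
pipeline_model = MiniPipeline([MeanStdScaler()]).fit(train)
print(pipeline_model.transform(test))  # scaled with the training mean and std, not the test data's own
```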

In [29]:
# Time-aware split is better for time series, but here we use randomSplit
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)


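Randomly splits the dataset into roughly 80% training and 20% test rows; seed=42 makes the split reproducible. Because test days are sampled from across the whole period, most of them sit between training days, so this evaluation is likely optimistic for forecasting future prices.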
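The comment in the cell above notes that a time-aware split is better for time series. One way to do that is to sort by date and put the earliest 80% of days in the training set and the most recent 20% in the test set, so the model is always evaluated on days after everything it was trained on. The plain-Python sketch below shows the idea on (date, value) pairs with toy placeholder data; the function name chronological_split is hypothetical. In PySpark a similar effect could be obtained by filtering df on a cutoff date instead of calling randomSplit.

```python
from datetime import date, timedelta

def chronological_split(rows, train_fraction=0.8):
    """Split (date, value) rows so every training date precedes every test date."""
    ordered = sorted(rows, key=lambda r: r[0])
    cut = int(len(ordered) * train_fraction)
    return ordered[:cut], ordered[cut:]

# Toy data: ten consecutive days with placeholder values
start = date(2012, 1, 1)
rows = [(start + timedelta(days=i), float(i)) for i in range(10)]
train, test = chronological_split(rows)
print(len(train), len(test))
print(max(d for d, _ in train) < min(d for d, _ in test))
```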

In [32]:
# Fit the model on training data
model = pipeline.fit(train_df)


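fit() trains the pipeline on the training data: the assembler builds the feature vectors, the scaler learns each feature's mean and standard deviation, the GBT regressor is trained, and a fitted PipelineModel is returned.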

In [34]:

predictions = model.transform(test_df)


predictions.select("Date", "Close", "prediction").show(10)


+----------+----------+------------------+
|      Date|     Close|        prediction|
+----------+----------+------------------+
|2011-12-20|156.979996|156.29122735384558|
|2011-12-27|154.910004|156.27259409617338|
|2011-12-29|150.339996|150.55665351500136|
|2012-01-06|157.199997| 159.0582981367929|
|2012-01-17|     160.5|161.32714890954537|
|2012-01-23|163.160004| 161.7929685077807|
|2012-02-01|169.559998|168.62801236044072|
|2012-02-09|168.020004|171.32970090561616|
|2012-02-24|172.229996|170.44172246773877|
|2012-02-27|171.699997|168.22654449324068|
+----------+----------+------------------+
only showing top 10 rows


transform() applies model to test data, generating predictions.
Shows actual vs predicted gold prices for 10 rows.

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")


Root Mean Squared Error (RMSE) on test data = 0.8265727329134374


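Uses RegressionEvaluator to measure prediction error.
RMSE (Root Mean Squared Error) → the square root of the average squared difference between actual and predicted values, a standard metric for regression.
RMSE is in the same units as Close, and a lower RMSE means better predictions.
Prints the model's error on the test data.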
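RMSE is the square root of the mean of (actual - predicted)². The plain-Python sketch below computes it directly, along with mean absolute error (MAE) for comparison, using the ten Close/prediction pairs shown in the predictions table as example input. Since those rows are only a sample of the test set, the result will differ from the full-test RMSE printed above. RegressionEvaluator also supports other metrics through metricName, such as "mae", "mse", and "r2"; the helper names here are hypothetical.

```python
from math import sqrt

def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def mae(actual, predicted):
    """Mean absolute error, for comparison with RMSE."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

# The ten (Close, prediction) pairs shown in the predictions table above
close = [156.979996, 154.910004, 150.339996, 157.199997, 160.5,
         163.160004, 169.559998, 168.020004, 172.229996, 171.699997]
prediction = [156.29122735384558, 156.27259409617338, 150.55665351500136,
              159.0582981367929, 161.32714890954537, 161.7929685077807,
              168.62801236044072, 171.32970090561616, 170.44172246773877,
              168.22654449324068]
print(rmse(close, prediction), mae(close, prediction))
```

Because errors are squared before averaging, RMSE is never smaller than MAE and weighs large misses more heavily.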