In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=ca0b5fd65216022a6a1f71f2669ae735ff0f74f4a4b46e87f548d0ef17b7dee8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [32]:
spark = SparkSession.builder.appName('Commodity Price Forecast').getOrCreate()

In [33]:
df = spark.read.csv('sample_data/all_fuels_data.csv', header= True, inferSchema=True)

In [34]:
df.show(5)

+------+---------+----------+------------------+------------------+------------------+------------------+------+
|ticker|commodity|      date|              open|              high|               low|             close|volume|
+------+---------+----------+------------------+------------------+------------------+------------------+------+
|  CL=F|Crude Oil|2000-08-23|31.950000762939453| 32.79999923706055|31.950000762939453| 32.04999923706055| 79385|
|  CL=F|Crude Oil|2000-08-24|31.899999618530273|  32.2400016784668|31.399999618530273|  31.6299991607666| 72978|
|  CL=F|Crude Oil|2000-08-25|31.700000762939453|32.099998474121094| 31.31999969482422| 32.04999923706055| 44601|
|  CL=F|Crude Oil|2000-08-28|32.040000915527344| 32.91999816894531|31.860000610351562|32.869998931884766| 46770|
|  CL=F|Crude Oil|2000-08-29| 32.81999969482422|33.029998779296875|32.560001373291016|32.720001220703125| 49131|
+------+---------+----------+------------------+------------------+------------------+----------

In [35]:
df.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)



In [36]:
df.describe().show()

+-------+------+---------------+-----------------+------------------+------------------+-------------------+------------------+
|summary|ticker|      commodity|             open|              high|               low|              close|            volume|
+-------+------+---------------+-----------------+------------------+------------------+-------------------+------------------+
|  count| 27140|          27140|            27140|             27140|             27140|              27140|             27140|
|   mean|  NULL|           NULL|27.05823760541182|27.447667201778074|26.644733623793527| 27.057428540817586|105572.95077376565|
| stddev|  NULL|           NULL|35.97037056055982| 36.42344275610131|35.485405286849875|  35.97531448568638|149373.35091824285|
|    min|  BZ=F|Brent Crude Oil|            -14.0|0.5070000290870667|-40.31999969482422|-37.630001068115234|                 0|
|    max|  RB=F|  RBOB Gasoline|146.0800018310547|147.42999267578125|144.27000427246094|  146.0800018310

In [37]:
#check for null values
from pyspark.sql import functions as F

# Count null values in each column
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+---------+----+----+----+---+-----+------+
|ticker|commodity|date|open|high|low|close|volume|
+------+---------+----+----+----+---+-----+------+
|     0|        0|   0|   0|   0|  0|    0|     0|
+------+---------+----+----+----+---+-----+------+



In [38]:
df.describe().show()

+-------+------+---------------+-----------------+------------------+------------------+-------------------+------------------+
|summary|ticker|      commodity|             open|              high|               low|              close|            volume|
+-------+------+---------------+-----------------+------------------+------------------+-------------------+------------------+
|  count| 27140|          27140|            27140|             27140|             27140|              27140|             27140|
|   mean|  NULL|           NULL|27.05823760541182|27.447667201778074|26.644733623793527| 27.057428540817586|105572.95077376565|
| stddev|  NULL|           NULL|35.97037056055982| 36.42344275610131|35.485405286849875|  35.97531448568638|149373.35091824285|
|    min|  BZ=F|Brent Crude Oil|            -14.0|0.5070000290870667|-40.31999969482422|-37.630001068115234|                 0|
|    max|  RB=F|  RBOB Gasoline|146.0800018310547|147.42999267578125|144.27000427246094|  146.0800018310

In [39]:
# Count distinct values in the 'Commodity' column
df.select("Commodity").distinct().show()

+---------------+
|      Commodity|
+---------------+
|    Natural Gas|
|      Crude Oil|
|  RBOB Gasoline|
|    Heating Oil|
|Brent Crude Oil|
+---------------+



In [40]:
from pyspark.sql.functions import year, month, dayofmonth

# Create new columns for Year, Month, and Day
df = df.withColumn("Year", year("Date"))
df = df.withColumn("Month", month("Date"))
df = df.withColumn("Day", dayofmonth("Date"))

In [43]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# The "Commodity" column will be converted into numerical "CommodityIndex" column
indexer = StringIndexer(inputCol="commodity", outputCol="CommodityIndex")

# Fit the StringIndexer model
indexer_model = indexer.fit(df)

# Transform the DataFrame to include the "CommodityIndex" column
df = indexer_model.transform(df)

# Now, let's do one-hot encoding for the "CommodityIndex" column
# The "CommodityIndex" will be converted into one-hot encoded "CommodityVec" column
encoder = OneHotEncoder(inputCol="CommodityIndex", outputCol="CommodityVec")

# Fit the OneHotEncoder model
encoder_model = encoder.fit(df)

# Transform the DataFrame to include the "CommodityVec" column
df = encoder_model.transform(df)

# Show the DataFrame to verify the new columns
df.show()

+------+---------+----------+------------------+------------------+------------------+------------------+------+----+-----+---+--------------+-------------+
|ticker|commodity|      date|              open|              high|               low|             close|volume|Year|Month|Day|CommodityIndex| CommodityVec|
+------+---------+----------+------------------+------------------+------------------+------------------+------+----+-----+---+--------------+-------------+
|  CL=F|Crude Oil|2000-08-23|31.950000762939453| 32.79999923706055|31.950000762939453| 32.04999923706055| 79385|2000|    8| 23|           0.0|(4,[0],[1.0])|
|  CL=F|Crude Oil|2000-08-24|31.899999618530273|  32.2400016784668|31.399999618530273|  31.6299991607666| 72978|2000|    8| 24|           0.0|(4,[0],[1.0])|
|  CL=F|Crude Oil|2000-08-25|31.700000762939453|32.099998474121094| 31.31999969482422| 32.04999923706055| 44601|2000|    8| 25|           0.0|(4,[0],[1.0])|
|  CL=F|Crude Oil|2000-08-28|32.040000915527344| 32.919998

In [45]:
filtered_df = df.filter(df.commodity == 'Natural Gas')
filtered_df.show()

+------+-----------+----------+-----------------+------------------+-----------------+------------------+------+----+-----+---+--------------+-------------+
|ticker|  commodity|      date|             open|              high|              low|             close|volume|Year|Month|Day|CommodityIndex| CommodityVec|
+------+-----------+----------+-----------------+------------------+-----------------+------------------+------+----+-----+---+--------------+-------------+
|  NG=F|Natural Gas|2000-08-30|4.650000095367432| 4.815000057220459|4.630000114440918| 4.804999828338623| 34954|2000|    8| 30|           1.0|(4,[1],[1.0])|
|  NG=F|Natural Gas|2000-08-31|4.820000171661377| 4.869999885559082|4.739999771118164|  4.78000020980835| 25787|2000|    8| 31|           1.0|(4,[1],[1.0])|
|  NG=F|Natural Gas|2000-09-01|             4.75| 4.860000133514404|             4.75| 4.835000038146973|   113|2000|    9|  1|           1.0|(4,[1],[1.0])|
|  NG=F|Natural Gas|2000-09-05|4.849999904632568| 4.974999

In [46]:
from pyspark.sql import Window

# Define a window spec
windowSpec = Window.orderBy("Date").rowsBetween(-6, 0)

# Create a new column with the 7-day rolling average of Volume
df = df.withColumn("Volume_7day_avg", F.avg("Volume").over(windowSpec))

# Show the DataFrame to confirm the changes
df.select("Date", "Volume", "Volume_7day_avg").show()

+----------+------+------------------+
|      Date|Volume|   Volume_7day_avg|
+----------+------+------------------+
|2000-08-23| 79385|           79385.0|
|2000-08-24| 72978|           76181.5|
|2000-08-25| 44601| 65654.66666666667|
|2000-08-28| 46770|           60933.5|
|2000-08-29| 49131|           58573.0|
|2000-08-30| 79214|62013.166666666664|
|2000-08-30| 34954| 58147.57142857143|
|2000-08-31| 56895| 54934.71428571428|
|2000-08-31| 25787|48193.142857142855|
|2000-09-01| 45869| 48374.28571428572|
|2000-09-01| 14679|43789.857142857145|
|2000-09-01|   113| 36787.28571428572|
|2000-09-05| 55722| 33431.28571428572|
|2000-09-05| 17365|30918.571428571428|
|2000-09-05| 26096|26518.714285714286|
|2000-09-06| 74692|33505.142857142855|
|2000-09-06| 18879|29649.428571428572|
|2000-09-06| 32764|           32233.0|
|2000-09-07| 74105| 42803.28571428572|
|2000-09-07| 23851| 38250.28571428572|
+----------+------+------------------+
only showing top 20 rows



In [47]:
# Calculate daily price change
df = df.withColumn("Price_Change", F.col("Close") - F.lag("Close").over(Window.orderBy("Date")))

# Show the DataFrame to confirm the changes
df.select("Date", "Close", "Price_Change").show()

+----------+------------------+--------------------+
|      Date|             Close|        Price_Change|
+----------+------------------+--------------------+
|2000-08-23| 32.04999923706055|                NULL|
|2000-08-24|  31.6299991607666| -0.4200000762939453|
|2000-08-25| 32.04999923706055|  0.4200000762939453|
|2000-08-28|32.869998931884766|  0.8199996948242188|
|2000-08-29|32.720001220703125|-0.14999771118164062|
|2000-08-30|33.400001525878906|  0.6800003051757812|
|2000-08-30| 4.804999828338623| -28.595001697540283|
|2000-08-31|33.099998474121094|   28.29499864578247|
|2000-08-31|  4.78000020980835| -28.319998264312744|
|2000-09-01|33.380001068115234|  28.600000858306885|
|2000-09-01|0.9764000177383423|  -32.40360105037689|
|2000-09-01| 4.835000038146973|  3.8586000204086304|
|2000-09-05| 33.79999923706055|  28.964999198913574|
|2000-09-05|0.9797000288963318| -32.820299208164215|
|2000-09-05| 4.960000038146973|   3.980300009250641|
|2000-09-06| 34.95000076293945|   29.990000724

In [48]:
df = df.na.fill({'Price_Change': 0})
df.show()

+------+-----------+----------+------------------+------------------+------------------+------------------+------+----+-----+---+--------------+-------------+------------------+--------------------+
|ticker|  commodity|      date|              open|              high|               low|             close|volume|Year|Month|Day|CommodityIndex| CommodityVec|   Volume_7day_avg|        Price_Change|
+------+-----------+----------+------------------+------------------+------------------+------------------+------+----+-----+---+--------------+-------------+------------------+--------------------+
|  CL=F|  Crude Oil|2000-08-23|31.950000762939453| 32.79999923706055|31.950000762939453| 32.04999923706055| 79385|2000|    8| 23|           0.0|(4,[0],[1.0])|           79385.0|                 0.0|
|  CL=F|  Crude Oil|2000-08-24|31.899999618530273|  32.2400016784668|31.399999618530273|  31.6299991607666| 72978|2000|    8| 24|           0.0|(4,[0],[1.0])|           76181.5| -0.4200000762939453|
|  CL

In [50]:
# Importing VectorAssembler
from pyspark.ml.feature import VectorAssembler

# Defining the feature columns
feature_columns = ['open', 'high', 'low', 'close', 'CommodityVec']

# Using VectorAssembler to assemble feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transforming the DataFrame
df = assembler.transform(df)

In [51]:
# Splitting the data into training and test sets (70% training, 30% test)
train_data, test_data = df.randomSplit([0.7, 0.3])

In [52]:
from pyspark.ml.regression import LinearRegression

# Initialize the linear regression model
lr = LinearRegression(featuresCol='features', labelCol='Price_Change')

# Train the model on training data
lr_model = lr.fit(train_data)

In [53]:
# Make predictions on test data
test_results = lr_model.evaluate(test_data)

# Display the root mean squared error
print("Root Mean Squared Error (RMSE) on test data:", test_results.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data: 18.65820275115738


In [54]:
# Make predictions on a new DataFrame of features
predictions = lr_model.transform(test_data)

# Show the predictions
predictions.select("prediction", "Price_Change", "features").show()

+-----------------+-----------------+--------------------+
|       prediction|     Price_Change|            features|
+-----------------+-----------------+--------------------+
|75.76929433716968|73.72380208969116|(8,[0,1,2,3],[75....|
|74.48824952225766| 68.2925033569336|(8,[0,1,2,3],[70....|
|76.65739130673619|68.53460311889648|(8,[0,1,2,3],[71....|
|77.29284729465193| 70.6381025314331|(8,[0,1,2,3],[72....|
| 76.9448360270857|71.92899811267853|(8,[0,1,2,3],[73....|
|77.46124265389798|74.18360114097595|(8,[0,1,2,3],[76....|
|77.47513785310161|74.93580341339111|(8,[0,1,2,3],[75....|
|77.98668310212156|76.95489645004272|(8,[0,1,2,3],[77....|
|77.49842409204958|77.18550300598145|(8,[0,1,2,3],[79....|
|77.13962007493987|75.58210277557373|(8,[0,1,2,3],[78....|
|75.80884518346335|75.40260028839111|(8,[0,1,2,3],[77....|
|76.04805786968436|77.10169816017151|(8,[0,1,2,3],[80....|
|74.65637894272021|74.57980179786682|(8,[0,1,2,3],[78....|
|75.63022805661105|75.46979784965515|(8,[0,1,2,3],[75...