In [0]:
# Path to raw data in ADLS
raw_data_path = "/mnt/financial-data/raw/stock_data.csv"

# Load the CSV file into a Spark DataFrame
df = spark.read.csv(raw_data_path, header=True, inferSchema=True)

df = df.withColumnRenamed("_c0", "Date")
# Display the first few rows
df.show()

# Print the schema of the DataFrame
df.printSchema()


+----------+-----+-----+-----+-----+---------+
|      Date| Open| High|  Low|Close|   Volume|
+----------+-----+-----+-----+-----+---------+
|1999-11-01|93.25|94.19|92.12|92.37| 26630600|
|1999-11-02|92.75| 94.5|91.94|92.56| 23174500|
|1999-11-03|92.94| 93.5| 91.5| 92.0| 22258500|
|1999-11-04|92.31|92.75|90.31|91.75| 27119700|
|1999-11-05|91.81|92.87| 90.5|91.56| 35083700|
|1999-11-08|84.81|90.75|84.37|89.94|121909600|
|1999-11-09|89.75|89.87|86.44|88.87| 54884900|
|1999-11-10|88.12|89.12|86.44|87.12| 34692700|
|1999-11-11|88.25|90.44|88.25|89.62| 34637300|
|1999-11-12|89.75| 90.0|87.06|89.19| 24707100|
|1999-11-15|88.25| 88.5|86.94| 87.0| 23540200|
|1999-11-16|86.94|87.75|85.87|87.31| 29582600|
|1999-11-17|86.44|87.06| 85.0| 85.0| 33409500|
|1999-11-18|84.94|85.81| 84.5|84.94| 32246600|
|1999-11-19|84.44|86.56|84.37| 86.0| 29113000|
|1999-11-22|89.62|90.37|88.44|89.81| 45298300|
|1999-11-23|89.25|91.37|88.37|89.62| 35393700|
|1999-11-24|89.56|92.25| 89.5|91.69| 26885500|
|1999-11-26|9

In [0]:
# Summary statistics for numerical columns
df.describe().show()



+-------+------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|              Open|              High|               Low|             Close|             Volume|      Daily_Change|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+
|  count|              6305|              6305|              6305|              6305|               6305|              6305|
|   mean| 95.21055770023762| 96.19492713719255|  94.1983865662172| 95.22719563838204|4.509381306962728E7| 1.996574147501986|
| stddev|105.86132667247786|106.79821086240484|104.82617888104824|105.86565952789142|2.732674267325438E7|2.4697987251266835|
|    min|              15.2|             15.62|             14.87|             15.15|            5850800|              0.13|
|    max|             467.0|            468.35|            464.46|            467.56|          591052200|             24.61|


In [0]:
from pyspark.sql.functions import col

# Count nulls in each column
df.select([col(c).isNull().alias(c) for c in df.columns]).groupBy().sum().show()

++
||
++
||
++



In [0]:
from pyspark.sql.functions import month, year, avg, sum

# Extract year and month
df = df.withColumn("Year", year(col("Date"))).withColumn("Month", month(col("Date")))

# Monthly average close price and total volume
monthly_trends = df.groupBy("Year", "Month").agg(
    avg("Close").alias("Avg_Close"),
    sum("Volume").alias("Total_Volume")
)
monthly_trends.orderBy("Year", "Month").show()


+----+-----+------------------+------------+
|Year|Month|         Avg_Close|Total_Volume|
+----+-----+------------------+------------+
|1999|   11| 89.46238095238094|   732628000|
|1999|   12|106.18954545454545|   630488900|
|2000|    1|107.11399999999999|   637437600|
|2000|    2| 99.29799999999997|   667243800|
|2000|    3| 99.81260869565216|  1014093800|
|2000|    4| 79.00736842105262|  1129073300|
|2000|    5| 67.07045454545455|   672215400|
|2000|    6| 73.04409090909093|   733525100|
|2000|    7| 75.61800000000001|   617092900|
|2000|    8| 71.07173913043479|   609699900|
|2000|    9|            65.509|   712766900|
|2000|   10|58.603636363636355|  1234707800|
|2000|   11| 68.21095238095238|   991731300|
|2000|   12| 51.04249999999999|  1028334100|
|2001|    1|55.780952380952385|  1002765600|
|2001|    2| 59.32684210526316|   768447800|
|2001|    3| 55.70454545454545|   947674900|
|2001|    4| 62.36749999999999|  1037903500|
|2001|    5| 69.77272727272728|   888652900|
|2001|    

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Calculate yearly average close prices
yearly_trends = df.groupBy("Year").agg(avg("Close").alias("Avg_Close"))

# Define a window to calculate the lag (previous year's Avg_Close)
window_spec = Window.orderBy("Year")

# Add a column for the previous year's Avg_Close
yearly_trends = yearly_trends.withColumn("Prev_Year_Avg", lag("Avg_Close").over(window_spec))

# Calculate Growth Rate
yearly_trends = yearly_trends.withColumn(
    "Growth_Rate",
    ((col("Avg_Close") - col("Prev_Year_Avg")) / col("Prev_Year_Avg") * 100)
)

# Show the results
yearly_trends.show()


+----+------------------+------------------+-------------------+
|Year|         Avg_Close|     Prev_Year_Avg|        Growth_Rate|
+----+------------------+------------------+-------------------+
|1999| 98.02046511627907|              NULL|               NULL|
|2000| 76.21960317460321| 98.02046511627907|-22.241132926490483|
|2001|62.542459677419366| 76.21960317460321|-17.944390848968812|
|2002|54.549047619047634|62.542459677419366|-12.780776610961645|
|2003|29.238214285714268|54.549047619047634| -46.40013792742229|
|2004| 27.12471825396824|29.238214285714268|-7.2285400575187575|
|2005|25.871030555555564| 27.12471825396824| -4.621938140239546|
|2006| 26.29079163346613|25.871030555555564|  1.622513942802438|
|2007| 30.44587609561754| 26.29079163346613| 15.804333776173968|
|2008|26.647508300395227| 30.44587609561754|-12.475803893089674|
|2009|22.976555555555564|26.647508300395227| -13.77596998359962|
|2010|27.058353174603162|22.976555555555564| 17.765054510360017|
|2011|26.052156746031752|

In [0]:
# Most volatile days
volatile_days = df.orderBy(col("Daily_Change").desc()).limit(10)
volatile_days.show()


+----------+--------+------+------+------+--------+------------+----+-----+
|      Date|    Open|  High|   Low| Close|  Volume|Daily_Change|Year|Month|
+----------+--------+------+------+------+--------+------------+----+-----+
|2023-07-18|  345.83|366.78|342.17|359.49|64872705|       24.61|2023|    7|
|2022-02-24|  272.51|295.16|271.52|294.59|56989686|       23.64|2022|    2|
|2020-03-13|   147.5|161.91|140.73|158.83|92727446|       21.18|2020|    3|
|2022-01-24|   292.2|297.11|276.05|296.37|86035393|       21.06|2022|    1|
|2024-01-31|  406.96|415.32|397.21|397.58|47871097|       18.11|2024|    1|
|2022-10-13|  219.85| 236.1|219.13|234.24|42551818|       16.97|2022|   10|
|2022-01-13|320.4658|320.88| 304.0| 304.8|45365979|       16.88|2022|    1|
|2022-01-26| 307.985| 308.5|293.03|296.71|90428853|       15.47|2022|    1|
|2022-11-30|  240.57|255.33|239.86|255.14|47594239|       15.47|2022|   11|
|2024-08-05|  389.17|401.04|385.58|395.15|40709238|       15.46|2024|    8|
+----------+

In [0]:
# Average closing price by month across all years
seasonality = df.groupBy("Month").agg(avg("Close").alias("Avg_Close"))
seasonality.orderBy("Month").show()


+-----+-----------------+
|Month|        Avg_Close|
+-----+-----------------+
|    1|91.11360749506906|
|    2|92.56489416666669|
|    3|91.20604168190128|
|    4|93.22244941860464|
|    5|94.25067627118644|
|    6|95.10028662900191|
|    7|99.46647628083488|
|    8|99.18476276978426|
|    9|98.07370475247524|
|   10|98.11604710144925|
|   11|99.28311878557875|
|   12|90.62514581749056|
+-----+-----------------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Define the window specification for 7-day moving average
window_spec = Window.orderBy("Date").rowsBetween(-6, 0)

# Add a 7-Day Moving Average column
df_filtered = df_filtered.withColumn("7_Day_Avg_Close", round(avg(col("Close")).over(window_spec), 2))
df_filtered.show(5)


+----------+-------+--------+-------+------+--------+------------+---------------+
|      Date|   Open|    High|    Low| Close|  Volume|Daily_Change|7_Day_Avg_Close|
+----------+-------+--------+-------+------+--------+------------+---------------+
|2024-01-02| 373.86|   375.9|366.771|370.87|25258633|        9.13|         370.87|
|2024-01-03| 369.01|373.2562| 368.51| 370.6|23083465|        4.75|         370.74|
|2024-01-04|370.665|   373.1| 367.17|367.94|20901502|        5.93|          369.8|
|2024-01-05| 368.97|  372.06|  366.5|367.75|20074451|        5.56|         369.29|
|2024-01-08|  369.3|   375.2| 369.01|374.69|23133967|        6.19|         370.37|
+----------+-------+--------+-------+------+--------+------------+---------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col, round

# Add Daily_Change column (High - Low)
df = df.withColumn("Daily_Change", round(col("High") - col("Low"), 2))
df.show(5)



+----------+-----+-----+-----+-----+--------+------------+----+-----+
|      Date| Open| High|  Low|Close|  Volume|Daily_Change|Year|Month|
+----------+-----+-----+-----+-----+--------+------------+----+-----+
|1999-11-01|93.25|94.19|92.12|92.37|26630600|        2.07|1999|   11|
|1999-11-02|92.75| 94.5|91.94|92.56|23174500|        2.56|1999|   11|
|1999-11-03|92.94| 93.5| 91.5| 92.0|22258500|         2.0|1999|   11|
|1999-11-04|92.31|92.75|90.31|91.75|27119700|        2.44|1999|   11|
|1999-11-05|91.81|92.87| 90.5|91.56|35083700|        2.37|1999|   11|
+----------+-----+-----+-----+-----+--------+------------+----+-----+
only showing top 5 rows



In [0]:
# Define the processed data path
processed_data_path = "/mnt/financial-data/processed/stock_data_with_daily_change.csv"

# Save the DataFrame as a CSV file
df.write.mode("overwrite").csv(processed_data_path, header=True)

print(f"Processed data with 'Daily_Change' saved to {processed_data_path}")


Processed data with 'Daily_Change' saved to /mnt/financial-data/processed/stock_data_with_daily_change.csv
