# Stock Price Analysis using Spark

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark  = SparkSession.builder.appName('Stock Price Analysis').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/28 14:15:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#reading csv file with headers
stocks = spark.read.csv('spark-data', header = True)

In [4]:
#show data
stocks.show(10)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
| BRK-B|05/23/2023|  $323.11 |4031342|$328.19 |$329.27 |$322.97 |
| BRK-B|05/22/2023|  $329.13 |2763422|$330.75 |$331.49 |$328.35 |
| BRK-B|05/19/2023|  $330.39 |4323538|$331.00 |$333.94 |$329.12 |
| BRK-B|05/18/2023|  $329.76 |2808329|$326.87 |$329.98 |$325.85 |
| BRK-B|05/17/2023|  $327.39 |3047626|$325.02 |$328.26 |$324.82 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 10 rows



In [5]:
# schema of the data
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [6]:
stocks.select(['Ticker', 'Date', 'Open']).show(10)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
| BRK-B|05/25/2023|$320.56 |
| BRK-B|05/24/2023|$322.71 |
| BRK-B|05/23/2023|$328.19 |
| BRK-B|05/22/2023|$330.75 |
| BRK-B|05/19/2023|$331.00 |
| BRK-B|05/18/2023|$326.87 |
| BRK-B|05/17/2023|$325.02 |
+------+----------+--------+
only showing top 10 rows



In [7]:
#filter some data
stocks.filter(stocks.Ticker == 'MSFT').show(10)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|  MSFT|05/30/2023|  $331.21 |29503070|$335.23 |$335.74 |$330.52 |
|  MSFT|05/26/2023|  $332.89 |36630630|$324.02 |$333.40 |$323.88 |
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|  MSFT|05/24/2023|  $313.85 |23384890|$314.73 |$316.50 |$312.61 |
|  MSFT|05/23/2023|  $315.26 |30797170|$320.03 |$322.72 |$315.25 |
|  MSFT|05/22/2023|  $321.18 |24115660|$318.60 |$322.59 |$318.01 |
|  MSFT|05/19/2023|  $318.34 |27546700|$316.74 |$318.75 |$316.37 |
|  MSFT|05/18/2023|  $318.52 |27275990|$314.53 |$319.04 |$313.72 |
|  MSFT|05/17/2023|  $314.00 |24315010|$312.29 |$314.43 |$310.74 |
+------+----------+----------+--------+--------+--------+--------+
only showing top 10 rows



In [8]:
stocks.filter((stocks.Ticker == 'MSFT') & (stocks.Date == '05/25/2023')).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
+------+----------+----------+--------+--------+--------+--------+



In [9]:
stocks.filter(((stocks.Ticker == 'V') | (stocks.Ticker == 'MSFT')) & (stocks.Date == '05/25/2023')).show(15)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|     V|05/25/2023|  $223.38 | 6827756|$222.45 |$224.69 |$220.49 |
+------+----------+----------+--------+--------+--------+--------+



In [10]:
stocks.filter((stocks.Ticker.isin(['MSFT', 'QQQ', 'SPY', 'V', 'TSLA'])) & (stocks.Date == '05/25/2023')).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|  TSLA|05/25/2023|  $184.47 |96870720|$186.54 |$186.78 |$180.58 |
|     V|05/25/2023|  $223.38 | 6827756|$222.45 |$224.69 |$220.49 |
|   SPY|05/25/2023|    414.65|90961610|  414.74|  416.16|412.4101|
|   QQQ|05/25/2023|    339.72|66862770|  339.14|  341.01| 336.665|
+------+----------+----------+--------+--------+--------+--------+



# Data Cleaning and altering data types

In [11]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

In [12]:
from datetime import datetime

In [13]:
date_parser = udf(lambda date: datetime.strptime(date,"%m/%d/%Y"), DateType())

In [14]:
stocks = stocks.withColumn("ParsedDate", date_parser(stocks.Date))

In [15]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParsedDate: date (nullable = true)



In [16]:
def num_parser(value):
    if isinstance(value, str):
        return float(value.strip('$'))
    elif isinstance(value, int) or isinstance(value,float):
        return value
    else: 
        return None

from pyspark.sql.types import FloatType
parser_number = udf(num_parser, FloatType())

In [17]:
#update the columns with chain operation
stocks = (stocks.withColumn('Open', parser_number(stocks.Open))
                .withColumn('Close', parser_number(stocks['Close/Last']))
                .withColumn('Low', parser_number(stocks.Low))
                .withColumn('High', parser_number(stocks.High))
         
         )


In [18]:
stocks.show(10)

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|ParsedDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
| BRK-B|05/23/2023|  $323.11 |4031342|328.19|329.27|322.97|2023-05-23|323.11|
| BRK-B|05/22/2023|  $329.13 |2763422|330.75|331.49|328.35|2023-05-22|329.13|
| BRK-B|05/19/2023|  $330.39 |4323538| 331.0|333.94|329.12|2023-05-19|330.39|
| BRK-B|05/18/2023|  $329.76 |2808329|326.87|329.98|325.85|2023-05-18|329.76|
| BRK-B|05/17/2023|  $327.39 |3047626|325.02|328.26|324.82|2023-

In [19]:
from pyspark.sql.types import IntegerType
parse_int = udf(lambda value: int(value), IntegerType())

In [20]:
stocks = stocks.withColumn('Volume', parse_int(stocks.Volume))

In [21]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [22]:
stocks = stocks.drop('Date', 'Close/Last')

stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [23]:
stocks.show(10)

+------+-------+------+------+------+----------+------+
|Ticker| Volume|  Open|  High|   Low|ParsedDate| Close|
+------+-------+------+------+------+----------+------+
| BRK-B|6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
| BRK-B|4031342|328.19|329.27|322.97|2023-05-23|323.11|
| BRK-B|2763422|330.75|331.49|328.35|2023-05-22|329.13|
| BRK-B|4323538| 331.0|333.94|329.12|2023-05-19|330.39|
| BRK-B|2808329|326.87|329.98|325.85|2023-05-18|329.76|
| BRK-B|3047626|325.02|328.26|324.82|2023-05-17|327.39|
+------+-------+------+------+------+----------+------+
only showing top 10 rows



 Statistics about dataset


In [24]:
stocks.describe(['Volume', 'Open', 'Low', 'High', 'Close']).show()

25/01/28 14:15:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+--------------------+------------------+------------------+------------------+------------------+
|summary|              Volume|              Open|               Low|              High|             Close|
+-------+--------------------+------------------+------------------+------------------+------------------+
|  count|               15108|             15108|             15108|             15108|             15108|
|   mean|5.1868408793685466E7|180.09656566181036| 177.9982781513109| 182.1253348687101| 180.1256089860054|
| stddev| 5.496484129953463E7|101.16125813324396|100.26590135955209|101.96625521621728|101.14891782168517|
|    min|              961133|             12.07|              11.8|             12.45|             11.93|
|    max|           914080943|            479.22|            476.06|            479.98|            477.71|
+-------+--------------------+------------------+------------------+------------------+------------------+



In [25]:
# average high price per ticker
stocks.groupBy('Ticker').agg({'High': 'avg'}).show()



+------+------------------+
|Ticker|         avg(High)|
+------+------------------+
| BRK-B|249.72235103785945|
|  MSFT|210.81611595729498|
|  META| 222.3329786462761|
|  TSLA| 147.9295392964358|
|  AAPL|108.49219235093751|
|  AMZN|123.62966649886442|
| GOOGL| 90.59803017820036|
|  NVDA|134.14990470104505|
|   TSM| 77.55591741717932|
|     V|194.85096124432408|
|   QQQ|267.93787476749435|
|   SPY| 357.4884863286098|
+------+------------------+



In [26]:
# maximum volume per ticker
stocks.groupBy('Ticker').agg({'Volume': 'max'}).show()

+------+-----------+
|Ticker|max(Volume)|
+------+-----------+
| BRK-B|   22303500|
|  MSFT|  110945000|
|  META|  232316600|
|  TSLA|  914080943|
|  AAPL|  426884800|
|  AMZN|  311345600|
| GOOGL|  132345540|
|  NVDA|  250920160|
|   TSM|   60793170|
|     V|   38379570|
|   QQQ|  199448100|
|   SPY|  392220700|
+------+-----------+



Stock Analysis

In [27]:
#max stock price for various stocks
stocks.groupBy('Ticker').max('Open').withColumnRenamed('max(Open)', 'max_stock_price').show(15)

+------+---------------+
|Ticker|max_stock_price|
+------+---------------+
| BRK-B|         361.39|
|  MSFT|         344.62|
|  META|         381.68|
|  TSLA|         411.47|
|  AAPL|         182.63|
|  AMZN|          187.2|
| GOOGL|         151.25|
|  NVDA|         405.95|
|   TSM|         141.61|
|     V|         250.05|
|   QQQ|         405.57|
|   SPY|         479.22|
+------+---------------+



In [28]:
#max stock price and total volume

import pyspark.sql.functions as func
stocks.groupBy('Ticker').agg(
    func.max('Open').alias('max_stock_price'),
    func.sum('Volume').alias('total_volume')

).show(15)

+------+---------------+------------+
|Ticker|max_stock_price|total_volume|
+------+---------------+------------+
| BRK-B|         361.39|  5862401321|
|  MSFT|         344.62| 37976660472|
|  META|         381.68| 30148848043|
|  TSLA|         411.47|171802975076|
|  AAPL|         182.63|139310061360|
|  AMZN|          187.2|104503287430|
| GOOGL|         151.25| 43956560981|
|  NVDA|         405.95| 58787218324|
|   TSM|         141.61| 12506470104|
|     V|         250.05| 10410997871|
|   QQQ|         405.57| 60437153773|
|   SPY|         479.22|107925285300|
+------+---------------+------------+



In [29]:
#parse dates
stocks = (stocks.withColumn('year', func.year(stocks.ParsedDate))
                .withColumn('month', func.month(stocks.ParsedDate))
                .withColumn('day', func.dayofmonth(stocks.ParsedDate))
                .withColumn('week', func.weekofyear(stocks.ParsedDate))
         )

In [30]:
stocks.show(10)

+------+-------+------+------+------+----------+------+----+-----+---+----+
|Ticker| Volume|  Open|  High|   Low|ParsedDate| Close|year|month|day|week|
+------+-------+------+------+------+----------+------+----+-----+---+----+
| BRK-B|6175417|321.12|322.41|319.39|2023-05-31|321.08|2023|    5| 31|  22|
| BRK-B|3232461|321.86|322.47| 319.0|2023-05-30|322.19|2023|    5| 30|  22|
| BRK-B|3229873|320.44|322.63|319.67|2023-05-26| 320.6|2023|    5| 26|  21|
| BRK-B|4251935|320.56|320.56|317.71|2023-05-25|319.02|2023|    5| 25|  21|
| BRK-B|3075393|322.71| 323.0|319.56|2023-05-24| 320.2|2023|    5| 24|  21|
| BRK-B|4031342|328.19|329.27|322.97|2023-05-23|323.11|2023|    5| 23|  21|
| BRK-B|2763422|330.75|331.49|328.35|2023-05-22|329.13|2023|    5| 22|  21|
| BRK-B|4323538| 331.0|333.94|329.12|2023-05-19|330.39|2023|    5| 19|  20|
| BRK-B|2808329|326.87|329.98|325.85|2023-05-18|329.76|2023|    5| 18|  20|
| BRK-B|3047626|325.02|328.26|324.82|2023-05-17|327.39|2023|    5| 17|  20|
+------+----

In [31]:
# yearly high and low stock prices
yearly = stocks.groupBy(['Ticker', 'year']).agg(func.max('Open').alias('yearly_high'), func.min('Open').alias('yearly_low'))

In [32]:
yearly.show()

+------+----+-----------+----------+
|Ticker|year|yearly_high|yearly_low|
+------+----+-----------+----------+
| BRK-B|2023|      331.0|    294.68|
|  MSFT|2019|     159.45|     99.55|
|  MSFT|2021|     344.62|    212.17|
| BRK-B|2018|      224.0|    185.43|
|  MSFT|2020|     229.27|    137.01|
| BRK-B|2021|     300.88|    228.21|
|  MSFT|2018|     115.42|     95.14|
| BRK-B|2020|     233.92|     165.3|
|  MSFT|2023|     335.23|     223.0|
| BRK-B|2019|     227.27|    194.78|
|  MSFT|2022|     335.35|    217.55|
| BRK-B|2022|     361.39|    260.58|
|  META|2020|     300.16|    139.75|
|  META|2021|     381.68|     247.9|
|  TSLA|2019|       29.0|     12.07|
|  META|2018|     215.72|     123.1|
|  TSLA|2021|     411.47|    184.18|
|  TSLA|2018|       25.0|     17.02|
|  META|2022|     339.95|     90.08|
|  META|2019|     208.67|    128.99|
+------+----+-----------+----------+
only showing top 20 rows



In [33]:
#calculate average stock price for each month
## Calculate average stock price for stock each month
monthly = stocks.groupBy(['Ticker', 'Year', 'Month']).agg(func.max("Open").alias("month_high"), func.min("Open").alias("month_low"))


In [34]:
monthly.show(10)

+------+----+-----+----------+---------+
|Ticker|Year|Month|month_high|month_low|
+------+----+-----+----------+---------+
| BRK-B|2022|   10|    297.98|   260.58|
| BRK-B|2018|    9|    222.13|   209.21|
|  MSFT|2022|    6|     275.2|   243.86|
|  MSFT|2021|    2|    245.03|   230.01|
|  MSFT|2020|    1|    174.05|   157.08|
| BRK-B|2021|   10|    290.85|   273.02|
| BRK-B|2020|   10|    216.74|   200.03|
|  MSFT|2019|    6|    137.45|   121.28|
|  MSFT|2022|   10|    247.93|   219.85|
| BRK-B|2019|    9|    212.24|   201.19|
+------+----+-----+----------+---------+
only showing top 10 rows



In [35]:
#calculate average stock price for each week
weekly = stocks.groupBy(['Ticker', 'Year', 'Week']).agg(func.max("Open").alias("week_high"), func.min("Open").alias("week_low"))

In [36]:
weekly.show(15)

+------+----+----+---------+--------+
|Ticker|Year|Week|week_high|week_low|
+------+----+----+---------+--------+
| BRK-B|2022|  14|    352.0|  341.17|
| BRK-B|2022|  10|   326.59|  322.49|
| BRK-B|2021|  14|   264.22|  260.02|
| BRK-B|2018|  48|   217.23|   209.3|
|  MSFT|2022|   6|   309.87|  301.25|
|  MSFT|2021|   2|   218.47|  213.52|
| BRK-B|2020|  19|   180.05|   173.4|
|  MSFT|2020|   1|   158.78|  158.32|
| BRK-B|2021|  32|   291.81|  287.01|
| BRK-B|2021|  10|   264.22|   255.6|
| BRK-B|2020|  10|   217.39|  203.48|
| BRK-B|2020|  28|    183.1|  178.26|
|  MSFT|2021|  41|   302.34|  292.92|
|  MSFT|2020|  19|   184.98|  174.49|
| BRK-B|2019|  30|   208.37|  205.34|
+------+----+----+---------+--------+
only showing top 15 rows



In [37]:
# difference between high and low stock prices wekkly 
from pyspark.sql.functions import round
weekly.withColumn('spread', round(weekly['week_high'] - weekly['week_low'], 3)).show()


+------+----+----+---------+--------+------+
|Ticker|Year|Week|week_high|week_low|spread|
+------+----+----+---------+--------+------+
| BRK-B|2022|  14|    352.0|  341.17| 10.83|
| BRK-B|2022|  10|   326.59|  322.49|   4.1|
| BRK-B|2021|  14|   264.22|  260.02|   4.2|
| BRK-B|2018|  48|   217.23|   209.3|  7.93|
|  MSFT|2022|   6|   309.87|  301.25|  8.62|
|  MSFT|2021|   2|   218.47|  213.52|  4.95|
| BRK-B|2020|  19|   180.05|   173.4|  6.65|
|  MSFT|2020|   1|   158.78|  158.32|  0.46|
| BRK-B|2021|  32|   291.81|  287.01|   4.8|
| BRK-B|2021|  10|   264.22|   255.6|  8.62|
| BRK-B|2020|  10|   217.39|  203.48| 13.91|
| BRK-B|2020|  28|    183.1|  178.26|  4.84|
|  MSFT|2021|  41|   302.34|  292.92|  9.42|
|  MSFT|2020|  19|   184.98|  174.49| 10.49|
| BRK-B|2019|  30|   208.37|  205.34|  3.03|
|  MSFT|2019|  43|   139.39|  136.88|  2.51|
|  MSFT|2021|  44|   338.51|  330.31|   8.2|
|  MSFT|2019|   6|    107.0|  102.87|  4.13|
|  MSFT|2022|  10|   288.53|   277.8| 10.73|
| BRK-B|20

# Using joins

In [38]:
# joining yearly and stocks dataframes and remove duplicated columns
historic_stocks = stocks.join(yearly, (stocks.Ticker == yearly.Ticker) & (stocks.year == yearly.year), 'inner').drop(yearly.year, yearly.Ticker)

In [39]:
historic_stocks.show()

+-------+------+------+------+----------+------+-----+---+----+------+----+-----------+----------+
| Volume|  Open|  High|   Low|ParsedDate| Close|month|day|week|Ticker|year|yearly_high|yearly_low|
+-------+------+------+------+----------+------+-----+---+----+------+----+-----------+----------+
|6175417|321.12|322.41|319.39|2023-05-31|321.08|    5| 31|  22| BRK-B|2023|      331.0|    294.68|
|3232461|321.86|322.47| 319.0|2023-05-30|322.19|    5| 30|  22| BRK-B|2023|      331.0|    294.68|
|3229873|320.44|322.63|319.67|2023-05-26| 320.6|    5| 26|  21| BRK-B|2023|      331.0|    294.68|
|4251935|320.56|320.56|317.71|2023-05-25|319.02|    5| 25|  21| BRK-B|2023|      331.0|    294.68|
|3075393|322.71| 323.0|319.56|2023-05-24| 320.2|    5| 24|  21| BRK-B|2023|      331.0|    294.68|
|4031342|328.19|329.27|322.97|2023-05-23|323.11|    5| 23|  21| BRK-B|2023|      331.0|    294.68|
|2763422|330.75|331.49|328.35|2023-05-22|329.13|    5| 22|  21| BRK-B|2023|      331.0|    294.68|
|4323538| 

In [40]:
cond = [(historic_stocks.Ticker==weekly.Ticker) & (historic_stocks.year == weekly.Year) & (historic_stocks.week == weekly.Week)]
historic_stocks = historic_stocks.join(weekly, cond, 'inner').drop(weekly.Year, historic_stocks.Ticker, weekly.Week)

In [41]:
historic_stocks.show()

+-------+------+------+------+----------+------+-----+---+----+-----------+----------+------+----+---------+--------+
| Volume|  Open|  High|   Low|ParsedDate| Close|month|day|year|yearly_high|yearly_low|Ticker|Week|week_high|week_low|
+-------+------+------+------+----------+------+-----+---+----+-----------+----------+------+----+---------+--------+
|6175417|321.12|322.41|319.39|2023-05-31|321.08|    5| 31|2023|      331.0|    294.68| BRK-B|  22|   321.86|  321.12|
|3232461|321.86|322.47| 319.0|2023-05-30|322.19|    5| 30|2023|      331.0|    294.68| BRK-B|  22|   321.86|  321.12|
|3229873|320.44|322.63|319.67|2023-05-26| 320.6|    5| 26|2023|      331.0|    294.68| BRK-B|  21|   330.75|  320.44|
|4251935|320.56|320.56|317.71|2023-05-25|319.02|    5| 25|2023|      331.0|    294.68| BRK-B|  21|   330.75|  320.44|
|3075393|322.71| 323.0|319.56|2023-05-24| 320.2|    5| 24|2023|      331.0|    294.68| BRK-B|  21|   330.75|  320.44|
|4031342|328.19|329.27|322.97|2023-05-23|323.11|    5| 2

In [42]:
historic_stocks = historic_stocks.join(monthly, ['Ticker', 'Year', 'Month'])

In [43]:
historic_stocks.columns

['Ticker',
 'year',
 'month',
 'Volume',
 'Open',
 'High',
 'Low',
 'ParsedDate',
 'Close',
 'day',
 'yearly_high',
 'yearly_low',
 'Week',
 'week_high',
 'week_low',
 'month_high',
 'month_low']

In [44]:
#correcting column mistakes and start columns with capital letter
corrected_columns = {
    'week_high': 'Weekly_High',
    'week_low': 'Weekly_Low',
    'yearly_high': 'Yearly_High',
    'yearly_low': 'Yearly_Low',
    'month_high': 'Monthly_High',
    'month_low': 'Monthly_Low',
    'day': 'Day'
}

# Apply renaming
for old_col, new_col in corrected_columns.items():
    historic_stocks = historic_stocks.withColumnRenamed(old_col, new_col)

# Show the updated schema
historic_stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Yearly_High: float (nullable = true)
 |-- Yearly_Low: float (nullable = true)
 |-- Week: integer (nullable = true)
 |-- Weekly_High: float (nullable = true)
 |-- Weekly_Low: float (nullable = true)
 |-- Monthly_High: float (nullable = true)
 |-- Monthly_Low: float (nullable = true)



In [45]:
historic_stocks.columns

['Ticker',
 'year',
 'month',
 'Volume',
 'Open',
 'High',
 'Low',
 'ParsedDate',
 'Close',
 'Day',
 'Yearly_High',
 'Yearly_Low',
 'Week',
 'Weekly_High',
 'Weekly_Low',
 'Monthly_High',
 'Monthly_Low']

In [46]:
historic_stocks.show(5)

+------+----+-----+-------+------+------+------+----------+------+---+-----------+----------+----+-----------+----------+------------+-----------+
|Ticker|year|month| Volume|  Open|  High|   Low|ParsedDate| Close|Day|Yearly_High|Yearly_Low|Week|Weekly_High|Weekly_Low|Monthly_High|Monthly_Low|
+------+----+-----+-------+------+------+------+----------+------+---+-----------+----------+----+-----------+----------+------------+-----------+
| BRK-B|2023|    5|6175417|321.12|322.41|319.39|2023-05-31|321.08| 31|      331.0|    294.68|  22|     321.86|    321.12|       331.0|     320.44|
| BRK-B|2023|    5|3232461|321.86|322.47| 319.0|2023-05-30|322.19| 30|      331.0|    294.68|  22|     321.86|    321.12|       331.0|     320.44|
| BRK-B|2023|    5|3229873|320.44|322.63|319.67|2023-05-26| 320.6| 26|      331.0|    294.68|  21|     330.75|    320.44|       331.0|     320.44|
| BRK-B|2023|    5|4251935|320.56|320.56|317.71|2023-05-25|319.02| 25|      331.0|    294.68|  21|     330.75|    320.

sql queries using pyspark

In [47]:
historic_stocks.createOrReplaceTempView('stockData')

In [48]:
spark.sql("SELECT * FROM stockData where Ticker='MSFT' and Year='2023'").show(5)

+------+----+-----+--------+------+------+------+----------+------+---+-----------+----------+----+-----------+----------+------------+-----------+
|Ticker|year|month|  Volume|  Open|  High|   Low|ParsedDate| Close|Day|Yearly_High|Yearly_Low|Week|Weekly_High|Weekly_Low|Monthly_High|Monthly_Low|
+------+----+-----+--------+------+------+------+----------+------+---+-----------+----------+----+-----------+----------+------------+-----------+
|  MSFT|2023|    5|45950550|332.29|335.94|327.33|2023-05-31|328.39| 31|     335.23|     223.0|  22|     335.23|    332.29|      335.23|     305.72|
|  MSFT|2023|    5|29503070|335.23|335.74|330.52|2023-05-30|331.21| 30|     335.23|     223.0|  22|     335.23|    332.29|      335.23|     305.72|
|  MSFT|2023|    5|36630630|324.02| 333.4|323.88|2023-05-26|332.89| 26|     335.23|     223.0|  21|     324.02|    314.73|      335.23|     305.72|
|  MSFT|2023|    5|43301740|323.24| 326.9| 320.0|2023-05-25|325.92| 25|     335.23|     223.0|  21|     324.02| 

In [49]:
spark.sql("SELECT * FROM stockData WHERE Ticker IN ('TSLA', 'AMZN') AND Year='2023'").show(5)

+------+----+-----+---------+------+------+------+----------+------+---+-----------+----------+----+-----------+----------+------------+-----------+
|Ticker|year|month|   Volume|  Open|  High|   Low|ParsedDate| Close|Day|Yearly_High|Yearly_Low|Week|Weekly_High|Weekly_Low|Monthly_High|Monthly_Low|
+------+----+-----+---------+------+------+------+----------+------+---+-----------+----------+----+-----------+----------+------------+-----------+
|  TSLA|2023|    5|150711700|199.78|203.95|195.12|2023-05-31|203.93| 31|     211.76|     103.0|  22|      200.1|    199.78|       200.1|     160.01|
|  TSLA|2023|    5|128818700| 200.1|204.48|197.53|2023-05-30|201.16| 30|     211.76|     103.0|  22|      200.1|    199.78|       200.1|     160.01|
|  TSLA|2023|    5|162061500|184.62| 198.6|184.53|2023-05-26|193.17| 26|     211.76|     103.0|  21|     186.54|     180.7|       200.1|     160.01|
|  TSLA|2023|    5| 96870720|186.54|186.78|180.58|2023-05-25|184.47| 25|     211.76|     103.0|  21|     1

In [72]:
#calculate percentage volatility for each day 
stocks = stocks.withColumn('Volatility', ((stocks.High - stocks.Low) / stocks.Close) * 100)
stocks.select('Ticker', 'ParsedDate', 'High', 'Low', 'Close', 'Volatility').show()

+------+----------+------+------+------+------------------+
|Ticker|ParsedDate|  High|   Low| Close|        Volatility|
+------+----------+------+------+------+------------------+
| BRK-B|2023-05-31|322.41|319.39|321.08|0.9405721751493111|
| BRK-B|2023-05-30|322.47| 319.0|322.19|1.0770046228651002|
| BRK-B|2023-05-26|322.63|319.67| 320.6|0.9232661879995099|
| BRK-B|2023-05-25|320.56|317.71|319.02|0.8933628617840262|
| BRK-B|2023-05-24| 323.0|319.56| 320.2|1.0743292661656905|
| BRK-B|2023-05-23|329.27|322.97|323.11|1.9497966879958835|
| BRK-B|2023-05-22|331.49|328.35|329.13|0.9540254866697351|
| BRK-B|2023-05-19|333.94|329.12|330.39|1.4588840795771747|
| BRK-B|2023-05-18|329.98|325.85|329.76|1.2524274504200426|
| BRK-B|2023-05-17|328.26|324.82|327.39|1.0507352965850962|
| BRK-B|2023-05-16|324.69|322.36|323.75|0.7196963984073359|
| BRK-B|2023-05-15|323.83|320.13|323.53| 1.143628629003009|
| BRK-B|2023-05-12|324.24|320.54|322.49|1.1473167544716973|
| BRK-B|2023-05-11|322.96|319.81|322.64|

In [81]:
# calculate the monthly average price
stocks = stocks.withColumn('Yearly_Avg_Price', 
                           (stocks.Open + stocks.High + stocks.Low + stocks.Close) / 4)

yearly_avg = stocks.groupBy('Ticker', 'year').agg(
    func.round(func.avg('Yearly_Avg_Price'), 2).alias('Avg_Price')
)

yearly_avg.orderBy('year', ascending = False).show(35)

+------+----+---------+
|Ticker|year|Avg_Price|
+------+----+---------+
| BRK-B|2023|   313.52|
|  MSFT|2023|    273.7|
|  META|2023|   194.21|
|  TSLA|2023|   174.72|
|  AMZN|2023|   100.97|
|  AAPL|2023|    155.9|
| GOOGL|2023|   101.87|
|  NVDA|2023|    246.0|
|     V|2023|   225.32|
|   TSM|2023|    89.13|
|   SPY|2023|    404.1|
|   QQQ|2023|   306.28|
|  MSFT|2022|   268.95|
| BRK-B|2022|   304.52|
|  META|2022|   180.29|
|  TSLA|2022|   263.58|
|  AAPL|2022|   154.81|
|  AMZN|2022|   126.19|
|  NVDA|2022|   185.71|
| GOOGL|2022|   114.83|
|     V|2022|   206.92|
|   TSM|2022|    91.03|
|   SPY|2022|   408.87|
|   QQQ|2022|   310.79|
|  MSFT|2021|   275.77|
| BRK-B|2021|    272.7|
|  META|2021|   321.17|
|  TSLA|2021|   259.97|
|  AMZN|2021|   167.25|
|  AAPL|2021|   140.92|
| GOOGL|2021|   124.17|
|  NVDA|2021|    195.1|
|   TSM|2021|   118.91|
|     V|2021|   222.29|
|   SPY|2021|   426.15|
+------+----+---------+
only showing top 35 rows



In [50]:
spark.sql("""
    SELECT Ticker, Month, AVG(Close) AS Avg_Close
    FROM stockData
    WHERE Ticker='MSFT' AND Year='2023'
    GROUP BY Ticker, Month
    ORDER BY Month
""").show()

+------+-----+------------------+
|Ticker|Month|         Avg_Close|
+------+-----+------------------+
|  MSFT|    1|237.28099975585937|
|  MSFT|    2|259.69737002724094|
|  MSFT|    3| 266.7426101021145|
|  MSFT|    4|288.64105224609375|
|  MSFT|    5|314.51045365767044|
+------+-----+------------------+



25/01/28 14:15:18 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [82]:
# analyze which months had the highest trading volume
monthly_volume = stocks.groupBy('Ticker', 'year', 'month').agg(
    func.sum('Volume').alias('Total_Volume')).orderBy(func.desc('Total_Volume'))
monthly_volume.show()

+------+----+-----+------------+
|Ticker|year|month|Total_Volume|
+------+----+-----+------------+
|  TSLA|2020|    2|  7102586599|
|  TSLA|2020|    3|  6316006343|
|  AAPL|2020|    3|  6281326760|
|  TSLA|2020|    1|  6124570978|
|   SPY|2020|    3|  5926710300|
|  TSLA|2020|    4|  5723429838|
|  TSLA|2020|    7|  5680830222|
|  TSLA|2020|    9|  5210857991|
|  TSLA|2020|    8|  4673629206|
|  TSLA|2018|   10|  4271037461|
|  TSLA|2019|    5|  4238980543|
|  TSLA|2018|    8|  4125272693|
|  TSLA|2020|    5|  4092712973|
|  AAPL|2020|    8|  4070343860|
|  TSLA|2023|    1|  3897499400|
|  AAPL|2020|    9|  3886793100|
|  TSLA|2020|    6|  3838774921|
|  AAPL|2018|   11|  3808828080|
|  AMZN|2018|   10|  3637687100|
|  TSLA|2023|    2|  3625947300|
+------+----+-----+------------+
only showing top 20 rows



# Advanced Analysis

In [52]:
snapshot = historic_stocks.select(['Ticker', 'ParsedDate', 'Open'])

In [55]:
snapshot.show()

+------+----------+------+
|Ticker|ParsedDate|  Open|
+------+----------+------+
| BRK-B|2023-05-31|321.12|
| BRK-B|2023-05-30|321.86|
| BRK-B|2023-05-26|320.44|
| BRK-B|2023-05-25|320.56|
| BRK-B|2023-05-24|322.71|
| BRK-B|2023-05-23|328.19|
| BRK-B|2023-05-22|330.75|
| BRK-B|2023-05-19| 331.0|
| BRK-B|2023-05-18|326.87|
| BRK-B|2023-05-17|325.02|
| BRK-B|2023-05-16|322.46|
| BRK-B|2023-05-15|322.89|
| BRK-B|2023-05-12|323.82|
| BRK-B|2023-05-11| 321.0|
| BRK-B|2023-05-10|326.08|
| BRK-B|2023-05-09|324.87|
| BRK-B|2023-05-08|328.26|
| BRK-B|2023-05-05|323.36|
| BRK-B|2023-05-04|323.44|
| BRK-B|2023-05-03|327.13|
+------+----------+------+
only showing top 20 rows



In [56]:
# comparetion of previous open day with next day
from pyspark.sql.window import Window
lag1day  = Window.partitionBy('Ticker').orderBy('ParsedDate')


In [58]:
snapshot.withColumn('PreviousOpen', func.lag('Open', 1).over(lag1day)).show()

+------+----------+-----+------------+
|Ticker|ParsedDate| Open|PreviousOpen|
+------+----------+-----+------------+
|  AAPL|2018-05-31|46.81|        NULL|
|  AAPL|2018-06-01| 47.0|       46.81|
|  AAPL|2018-06-04|47.91|        47.0|
|  AAPL|2018-06-05|48.27|       47.91|
|  AAPL|2018-06-06|48.41|       48.27|
|  AAPL|2018-06-07|48.54|       48.41|
|  AAPL|2018-06-08|47.79|       48.54|
|  AAPL|2018-06-11|47.84|       47.79|
|  AAPL|2018-06-12|47.85|       47.84|
|  AAPL|2018-06-13|48.11|       47.85|
|  AAPL|2018-06-14|47.89|       48.11|
|  AAPL|2018-06-15|47.51|       47.89|
|  AAPL|2018-06-18|46.97|       47.51|
|  AAPL|2018-06-19|46.29|       46.97|
|  AAPL|2018-06-20|46.59|       46.29|
|  AAPL|2018-06-21|46.81|       46.59|
|  AAPL|2018-06-22|46.53|       46.81|
|  AAPL|2018-06-25|45.85|       46.53|
|  AAPL|2018-06-26|45.75|       45.85|
|  AAPL|2018-06-27|46.31|       45.75|
+------+----------+-----+------------+
only showing top 20 rows



In [62]:
# calculate 50 day moving average

movingAverage = Window.partitionBy('Ticker').orderBy('ParsedDate').rowsBetween(-50,0)

In [60]:
(snapshot.withColumn('MA50', func.avg('Open').over(movingAverage))
         .withColumn('MA50', func.round('MA50', 2))).show()

+------+----------+-----+-----+
|Ticker|ParsedDate| Open| MA50|
+------+----------+-----+-----+
|  AAPL|2018-05-31|46.81|46.81|
|  AAPL|2018-06-01| 47.0|46.91|
|  AAPL|2018-06-04|47.91|47.24|
|  AAPL|2018-06-05|48.27| 47.5|
|  AAPL|2018-06-06|48.41|47.68|
|  AAPL|2018-06-07|48.54|47.82|
|  AAPL|2018-06-08|47.79|47.82|
|  AAPL|2018-06-11|47.84|47.82|
|  AAPL|2018-06-12|47.85|47.82|
|  AAPL|2018-06-13|48.11|47.85|
|  AAPL|2018-06-14|47.89|47.86|
|  AAPL|2018-06-15|47.51|47.83|
|  AAPL|2018-06-18|46.97|47.76|
|  AAPL|2018-06-19|46.29|47.66|
|  AAPL|2018-06-20|46.59|47.59|
|  AAPL|2018-06-21|46.81|47.54|
|  AAPL|2018-06-22|46.53|47.48|
|  AAPL|2018-06-25|45.85|47.39|
|  AAPL|2018-06-26|45.75| 47.3|
|  AAPL|2018-06-27|46.31|47.25|
+------+----------+-----+-----+
only showing top 20 rows



In [63]:
maximumStock = Window.partitionBy("Ticker").orderBy(snapshot.Open.desc())

In [64]:
snapshot.withColumn("MaxOpen", func.row_number().over(maximumStock)).show()

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AAPL|2022-01-05|179.61|      4|
|  AAPL|2021-12-30|179.47|      5|
|  AAPL|2021-12-29|179.33|      6|
|  AAPL|2021-12-16|179.28|      7|
|  AAPL|2022-03-30|178.55|      8|
|  AAPL|2021-12-31|178.09|      9|
|  AAPL|2022-03-31|177.84|     10|
|  AAPL|2022-01-03|177.83|     11|
|  AAPL|2022-04-05| 177.5|     12|
|  AAPL|2023-05-31|177.33|     13|
|  AAPL|2021-12-27|177.09|     14|
|  AAPL|2023-05-30|176.96|     15|
|  AAPL|2022-03-29|176.69|     16|
|  AAPL|2023-05-19|176.39|     17|
|  AAPL|2022-01-12|176.12|     18|
|  AAPL|2022-02-09|176.05|     19|
|  AAPL|2021-12-23|175.85|     20|
+------+----------+------+-------+
only showing top 20 rows



In [71]:
# calculate top 5 highest close price for each stock in a year
snapshot.withColumn("MaxOpen", func.row_number().over(maximumStock)).filter("MaxOpen<=3").show(35)

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AMZN|2021-07-12| 187.2|      1|
|  AMZN|2021-07-09|186.13|      2|
|  AMZN|2021-07-07|185.87|      3|
| BRK-B|2022-03-29|361.39|      1|
| BRK-B|2022-03-28|360.59|      2|
| BRK-B|2022-03-31| 359.0|      3|
| GOOGL|2022-02-02|151.25|      1|
| GOOGL|2021-11-19|149.98|      2|
| GOOGL|2021-11-08|149.83|      3|
|  META|2021-09-13|381.68|      1|
|  META|2021-09-02| 381.5|      2|
|  META|2021-09-10|381.36|      3|
|  MSFT|2021-11-22|344.62|      1|
|  MSFT|2021-12-28|343.15|      2|
|  MSFT|2021-11-19|342.64|      3|
|  NVDA|2023-05-30|405.95|      1|
|  NVDA|2023-05-31|394.88|      2|
|  NVDA|2023-05-25|385.23|      3|
|   QQQ|2021-11-22|405.57|      1|
|   QQQ|2021-12-28| 404.4|      2|
|   QQQ|2021-11-19|403.43|      3|
|   SPY|2022-01-04|479.22|      1|
|   SPY|2021-12-30|4

In [73]:
# calculate the cumulative volume for each stock ticker
from pyspark.sql import Window
volume_window = Window.partitionBy('Ticker').orderBy('ParsedDate').rowsBetween(Window.unboundedPreceding, 0)
stocks = stocks.withColumn('Cumulative_Volume', func.sum('Volume').over(volume_window))
stocks.select('Ticker', 'ParsedDate', 'Volume', 'Cumulative_Volume').show()

+------+----------+---------+-----------------+
|Ticker|ParsedDate|   Volume|Cumulative_Volume|
+------+----------+---------+-----------------+
|  AAPL|2018-05-31|109874800|        109874800|
|  AAPL|2018-06-01| 93510760|        203385560|
|  AAPL|2018-06-04|104762480|        308148040|
|  AAPL|2018-06-05| 86142280|        394290320|
|  AAPL|2018-06-06| 83651280|        477941600|
|  AAPL|2018-06-07| 85316680|        563258280|
|  AAPL|2018-06-08|106322840|        669581120|
|  AAPL|2018-06-11| 72279720|        741860840|
|  AAPL|2018-06-12| 67436080|        809296920|
|  AAPL|2018-06-13| 85920480|        895217400|
|  AAPL|2018-06-14| 86338880|        981556280|
|  AAPL|2018-06-15|245753160|       1227309440|
|  AAPL|2018-06-18| 73833640|       1301143080|
|  AAPL|2018-06-19|134154200|       1435297280|
|  AAPL|2018-06-20| 81960360|       1517257640|
|  AAPL|2018-06-21|102596960|       1619854600|
|  AAPL|2018-06-22|108740520|       1728595120|
|  AAPL|2018-06-25|126529280|       1855

In [91]:
# find the top 5 gaining and losing stocks each year
from pyspark.sql import functions as func

# grouping by ticker and year 
yearly_change = stocks.groupBy('Ticker', 'year').agg(
    func.first('Close').alias('First_Close'),
    func.last('Close').alias('Last_Close')
).withColumn(
    'Yearly_Change',
    func.round(((func.col('Last_Close') - func.col('First_Close')) / func.col('First_Close')) * 100, 3) 
)

# getting top 5 yearly gainers and losers
top_gainers = yearly_change.orderBy(func.desc('Yearly_Change')).limit(5)
top_losers = yearly_change.orderBy('Yearly_Change').limit(5)


print("top 5 yearly gainers:")
top_gainers.show()

print("top 5 yearly losers:")
top_losers.show()

top 5 yearly gainers:
+------+----+-----------+----------+-------------+
|Ticker|year|First_Close|Last_Close|Yearly_Change|
+------+----+-----------+----------+-------------+
|  TSLA|2022|     123.18|    399.93|      224.671|
|  META|2022|     120.34|    338.54|       181.32|
|  NVDA|2022|     146.14|    301.21|      106.111|
|  AMZN|2022|       84.0|     170.4|      102.857|
|  NVDA|2018|      33.38|     63.05|       88.886|
+------+----+-----------+----------+-------------+

top 5 yearly losers:
+------+----+-----------+----------+-------------+
|Ticker|year|First_Close|Last_Close|Yearly_Change|
+------+----+-----------+----------+-------------+
|  TSLA|2020|     235.22|     28.68|      -87.807|
|  NVDA|2023|     378.34|    143.15|      -62.164|
|  NVDA|2021|     294.11|    131.14|      -55.411|
|  NVDA|2020|     130.55|     59.98|      -54.056|
|  META|2023|     264.72|    124.74|      -52.879|
+------+----+-----------+----------+-------------+



# Saving Data

In [95]:
# parquet format
(moving_avg.write.option("header",True)
             .partitionBy("Ticker", "ParsedDate")
             .mode("overwrite")
             .parquet("movingAverage_parquet"))

                                                                                