## Stock price analysis using Spark

In [1]:
#pip install pandas numpy matplotlib 

In [1]:
import pyspark
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DateType, FloatType

### 1. Extracting and Filtering Stock Data

In [2]:
# Creating a SparkSession
spark = SparkSession.builder.appName("Stock Price Analysis using Spark").getOrCreate()

24/09/14 18:19:52 WARN Utils: Your hostname, Rakeshs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on interface en0)
24/09/14 18:19:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/14 18:19:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#Reading the Stocks data 
stocks = spark.read.csv("StockData", header=True)


In [4]:
# Displaiyng the header of the data
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [5]:
# Printing the Schema of the Data
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [6]:
# Select Data from selected Columns
stocks.select("Ticker").show(5)

+------+
|Ticker|
+------+
| BRK-B|
| BRK-B|
| BRK-B|
| BRK-B|
| BRK-B|
+------+
only showing top 5 rows



In [7]:
stocks.select(["Ticker", "Date"]).show(5)

+------+----------+
|Ticker|      Date|
+------+----------+
| BRK-B|05/31/2023|
| BRK-B|05/30/2023|
| BRK-B|05/26/2023|
| BRK-B|05/25/2023|
| BRK-B|05/24/2023|
+------+----------+
only showing top 5 rows



In [8]:
# Fetching all unique values from Ticker Column
stocks.select("Ticker").distinct().show()

+------+
|Ticker|
+------+
| BRK-B|
|  MSFT|
|  META|
|  TSLA|
|  AAPL|
|  AMZN|
| GOOGL|
|  NVDA|
|   TSM|
|     V|
|   QQQ|
|   SPY|
+------+



In [9]:
# Filtering Data => Select rows containing Microsoft Stocks in the last one month
from pyspark.sql import functions as F

# microsoftStocks = stocks.filter(stocks.Ticker == 'MSFT')
# microsoftStocksLastMonth = microsoftStocks.filter(microsoftStocks["Date"] == '05/23/2023').show()
#### OR ####

microsoftStocksLastMonth = stocks.filter(
    ((stocks.Ticker == 'MSFT') | (stocks.Ticker == 'V')) &
    (stocks["Date"] == '05/31/2023')
).show(15)

### OR ###

# stocksIn = stocks.filter(
#     (stocks.Ticker.isin(["MSFT", "AMZN", 'V']))  &
#     (stocks["Date"] == '05/30/2023')).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



### 2.Creating custom UDFs

In [10]:
## Create an UDF to change StringType of Date column to DateType
date_parser = udf(lambda date: datetime.strptime(date, "%m/%d/%Y"), DateType())

In [11]:
stocks = stocks.withColumn("ParsedDate", date_parser(stocks.Date))

In [12]:
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+----------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|ParsedDate|
+------+----------+----------+-------+--------+--------+--------+----------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |2023-05-31|
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |2023-05-30|
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |2023-05-26|
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |2023-05-25|
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |2023-05-24|
+------+----------+----------+-------+--------+--------+--------+----------+
only showing top 5 rows



In [13]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParsedDate: date (nullable = true)



In [14]:
## remove the dollar symbol and convert it into float if it is string, leave float and int as is and return None else

def num_parser(value):
    if isinstance(value, str):
        return float(value.strip("$"))
    elif isinstance(value, float) or isinstance(value, int):
        return value
    else:
        None
print(num_parser('$222.2'))

222.2


In [15]:
number_parser = udf(num_parser, FloatType())

In [16]:
stocks = (stocks.withColumn("Close", number_parser(stocks["Close/Last"]))
                 .withColumn("Open", number_parser(stocks["Open"]))
                 .withColumn("High", number_parser(stocks["High"]))
                 .withColumn("Low", number_parser(stocks["Low"])))

In [17]:
stocks.show(5)

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|ParsedDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
+------+----------+----------+-------+------+------+------+----------+------+
only showing top 5 rows



In [18]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [19]:
## Conveting Volume datatype form String to IntegerType
from pyspark.sql.types import IntegerType
int_parser = udf(lambda x: int(x), IntegerType())

In [20]:
stocks = stocks.withColumn("Volume", int_parser(stocks.Volume))

In [21]:
stocks.show(5)

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|ParsedDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
+------+----------+----------+-------+------+------+------+----------+------+
only showing top 5 rows



In [22]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [23]:
## Selecting only required Columns

cleaned_stocks = stocks[["Ticker", "ParsedDate","Close", "Volume", "Open", "High", "Low"]]

In [24]:
cleaned_stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)



In [25]:
## Calculating basic statistics of the cleaned Stocks Data

cleaned_stocks[["Close", "Open", "Volume", "High", "Low"]].describe().show()

24/09/14 18:20:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+--------------------+------------------+------------------+
|summary|             Close|              Open|              Volume|              High|               Low|
+-------+------------------+------------------+--------------------+------------------+------------------+
|  count|             15108|             15108|               15108|             15108|             15108|
|   mean| 180.1256089860054|180.09656566181036|5.1868408793685466E7| 182.1253348687101| 177.9982781513109|
| stddev|101.14891782168517|101.16125813324396| 5.496484129953463E7|101.96625521621728|100.26590135955209|
|    min|             11.93|             12.07|              961133|             12.45|              11.8|
|    max|            477.71|            479.22|           914080943|            479.98|            476.06|
+-------+------------------+------------------+--------------------+------------------+------------------+



### 3.Basic Analysis

In [26]:
# Calculate the maximum stock price for various stocks
stocks.groupby(["Ticker"]).max().withColumnRenamed("max(Open)", "MaxOpenValue").show(15)

+------+-----------+------------+---------+--------+----------+
|Ticker|max(Volume)|MaxOpenValue|max(High)|max(Low)|max(Close)|
+------+-----------+------------+---------+--------+----------+
| BRK-B|   22303500|      361.39|    362.1|  355.53|    359.57|
|  MSFT|  110945000|      344.62|   349.67|   342.2|    343.11|
|  META|  232316600|      381.68|   384.33|  378.81|    382.18|
|  TSLA|  914080943|      411.47|    414.5|  405.67|    409.97|
|  AAPL|  426884800|      182.63|   182.94|  179.12|    182.01|
|  AMZN|  311345600|       187.2|   188.65|  184.84|    186.57|
| GOOGL|  132345540|      151.25|   151.55|   148.9|    149.84|
|  NVDA|  250920160|      405.95|   419.38|  399.49|    401.11|
|   TSM|   60793170|      141.61|    145.0|  139.42|    140.66|
|     V|   38379570|      250.05|   252.67|  248.22|    250.93|
|   QQQ|  199448100|      405.57|   408.71|  402.58|    403.99|
|   SPY|  392220700|      479.22|   479.98|  476.06|    477.71|
+------+-----------+------------+-------

In [31]:
## Using inbuilt pyspark functions
import pyspark.sql.functions as func
cleaned_stocks.groupBy("Ticker").agg(func.max("Close").alias("MaxCloseValue")).show()

+------+-------------+
|Ticker|MaxCloseValue|
+------+-------------+
| BRK-B|       359.57|
|  MSFT|       343.11|
|  META|       382.18|
|  TSLA|       409.97|
|  AAPL|       182.01|
|  AMZN|       186.57|
| GOOGL|       149.84|
|  NVDA|       401.11|
|   TSM|       140.66|
|     V|       250.93|
|   QQQ|       403.99|
|   SPY|       477.71|
+------+-------------+



In [32]:
cleaned_stocks.groupBy("Ticker").agg(
    func.max("Close").alias("MaxCloseValue"),
    func.sum("Volume").alias("SumOfVolumes")
).show(15)

+------+-------------+------------+
|Ticker|MaxCloseValue|SumOfVolumes|
+------+-------------+------------+
| BRK-B|       359.57|  5862401321|
|  MSFT|       343.11| 37976660472|
|  META|       382.18| 30148848043|
|  TSLA|       409.97|171802975076|
|  AAPL|       182.01|139310061360|
|  AMZN|       186.57|104503287430|
| GOOGL|       149.84| 43956560981|
|  NVDA|       401.11| 58787218324|
|   TSM|       140.66| 12506470104|
|     V|       250.93| 10410997871|
|   QQQ|       403.99| 60437153773|
|   SPY|       477.71|107925285300|
+------+-------------+------------+



In [34]:
## extracting year, month, day of month, week of year and storing in the cleaned_stocks data

cleaned_stocks = (cleaned_stocks.withColumn("Year", func.year(cleaned_stocks.ParsedDate)))

In [35]:
cleaned_stocks.show(5)

+------+----------+------+-------+------+------+------+----+
|Ticker|ParsedDate| Close| Volume|  Open|  High|   Low|Year|
+------+----------+------+-------+------+------+------+----+
| BRK-B|2023-05-31|321.08|6175417|321.12|322.41|319.39|2023|
| BRK-B|2023-05-30|322.19|3232461|321.86|322.47| 319.0|2023|
| BRK-B|2023-05-26| 320.6|3229873|320.44|322.63|319.67|2023|
| BRK-B|2023-05-25|319.02|4251935|320.56|320.56|317.71|2023|
| BRK-B|2023-05-24| 320.2|3075393|322.71| 323.0|319.56|2023|
+------+----------+------+-------+------+------+------+----+
only showing top 5 rows



In [36]:
cleaned_stocks = (cleaned_stocks.withColumn("Year", func.year(cleaned_stocks.ParsedDate))
                                .withColumn("Month", func.month(cleaned_stocks.ParsedDate))
                                .withColumn("Day", func.dayofmonth(cleaned_stocks.ParsedDate))
                                .withColumn("Week", func.weekofyear(cleaned_stocks.ParsedDate))
                 )

In [37]:
cleaned_stocks.show(5)

+------+----------+------+-------+------+------+------+----+-----+---+----+
|Ticker|ParsedDate| Close| Volume|  Open|  High|   Low|Year|Month|Day|Week|
+------+----------+------+-------+------+------+------+----+-----+---+----+
| BRK-B|2023-05-31|321.08|6175417|321.12|322.41|319.39|2023|    5| 31|  22|
| BRK-B|2023-05-30|322.19|3232461|321.86|322.47| 319.0|2023|    5| 30|  22|
| BRK-B|2023-05-26| 320.6|3229873|320.44|322.63|319.67|2023|    5| 26|  21|
| BRK-B|2023-05-25|319.02|4251935|320.56|320.56|317.71|2023|    5| 25|  21|
| BRK-B|2023-05-24| 320.2|3075393|322.71| 323.0|319.56|2023|    5| 24|  21|
+------+----------+------+-------+------+------+------+----+-----+---+----+
only showing top 5 rows



In [58]:
## Maximum and Minimum opening price of different stocks in every year

yearlyOpening = cleaned_stocks.groupBy(["Ticker", "Year"]).agg(
    func.max("Open").alias("MaxOpeningYear"),
    func.min("Open").alias("MinOpeningYear"),
)

In [59]:
## Maximum and Minimum opening price of different stocks in every month of an year

monthlyOpening = cleaned_stocks.groupBy(["Ticker", "Year", "Month"]).agg(
    func.max("Open").alias("MaxOpeningMonth"),
    func.min("Open").alias("MinOpeningMonth"),
)

In [60]:
## Maximum and Minimum opening price of different stocks in every week of a month

weeklyOpening = cleaned_stocks.groupBy(["Ticker", "Year", "Week"]).agg(
    func.max("Open").alias("MaxOpeningWeek"),
    func.min("Open").alias("MinOpeningWeek"),
)

In [61]:
yearlyOpening.show(20)

+------+----+--------------+--------------+
|Ticker|Year|MaxOpeningYear|MinOpeningYear|
+------+----+--------------+--------------+
| BRK-B|2023|         331.0|        294.68|
|  MSFT|2019|        159.45|         99.55|
|  MSFT|2021|        344.62|        212.17|
| BRK-B|2018|         224.0|        185.43|
|  MSFT|2020|        229.27|        137.01|
| BRK-B|2021|        300.88|        228.21|
|  MSFT|2018|        115.42|         95.14|
| BRK-B|2020|        233.92|         165.3|
|  MSFT|2023|        335.23|         223.0|
| BRK-B|2019|        227.27|        194.78|
|  MSFT|2022|        335.35|        217.55|
| BRK-B|2022|        361.39|        260.58|
|  META|2020|        300.16|        139.75|
|  META|2021|        381.68|         247.9|
|  TSLA|2019|          29.0|         12.07|
|  META|2018|        215.72|         123.1|
|  TSLA|2021|        411.47|        184.18|
|  TSLA|2018|          25.0|         17.02|
|  META|2022|        339.95|         90.08|
|  META|2019|        208.67|    

In [62]:
monthlyOpening.show(20)

+------+----+-----+---------------+---------------+
|Ticker|Year|Month|MaxOpeningMonth|MinOpeningMonth|
+------+----+-----+---------------+---------------+
| BRK-B|2022|   10|         297.98|         260.58|
| BRK-B|2018|    9|         222.13|         209.21|
|  MSFT|2022|    6|          275.2|         243.86|
|  MSFT|2021|    2|         245.03|         230.01|
|  MSFT|2020|    1|         174.05|         157.08|
| BRK-B|2021|   10|         290.85|         273.02|
| BRK-B|2020|   10|         216.74|         200.03|
|  MSFT|2019|    6|         137.45|         121.28|
|  MSFT|2022|   10|         247.93|         219.85|
| BRK-B|2019|    9|         212.24|         201.19|
| BRK-B|2021|    6|         292.91|          275.0|
|  MSFT|2020|   12|         226.31|         210.05|
|  MSFT|2019|   10|          144.9|         134.95|
| BRK-B|2022|   11|         317.52|         286.02|
| BRK-B|2022|    7|         297.42|          272.5|
|  MSFT|2023|    2|          273.8|          248.0|
|  MSFT|2021

In [63]:
weeklyOpening.show(20)

+------+----+----+--------------+--------------+
|Ticker|Year|Week|MaxOpeningWeek|MinOpeningWeek|
+------+----+----+--------------+--------------+
| BRK-B|2022|  14|         352.0|        341.17|
| BRK-B|2022|  10|        326.59|        322.49|
| BRK-B|2021|  14|        264.22|        260.02|
| BRK-B|2018|  48|        217.23|         209.3|
|  MSFT|2022|   6|        309.87|        301.25|
|  MSFT|2021|   2|        218.47|        213.52|
| BRK-B|2020|  19|        180.05|         173.4|
|  MSFT|2020|   1|        158.78|        158.32|
| BRK-B|2021|  32|        291.81|        287.01|
| BRK-B|2021|  10|        264.22|         255.6|
| BRK-B|2020|  10|        217.39|        203.48|
| BRK-B|2020|  28|         183.1|        178.26|
|  MSFT|2021|  41|        302.34|        292.92|
|  MSFT|2020|  19|        184.98|        174.49|
| BRK-B|2019|  30|        208.37|        205.34|
|  MSFT|2019|  43|        139.39|        136.88|
|  MSFT|2021|  44|        338.51|        330.31|
|  MSFT|2019|   6|  

In [71]:
weeklyOpening.withColumn('Spread', round(weeklyOpening['MaxOpeningWeek'] - weeklyOpening['MinOpeningWeek'], 2)).show()

TypeError: type Column doesn't define __round__ method