<span style="font-size:30px"> __Stock Price Analysis__

<span style="font-size:25px"> __1. Reading and Cleaning Data__

In [1]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, regexp_replace
from pyspark.sql.types import FloatType, IntegerType, StringType
from pyspark.sql.window import Window 

In [3]:
spark = SparkSession.builder.appName("StockMarketAnalysis").getOrCreate()

24/10/10 15:52:10 WARN Utils: Your hostname, Marians-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.23 instead (on interface en0)
24/10/10 15:52:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/10 15:52:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Reading CSV data
stocks = spark.read.csv("/Users/marianzabka/Documents/Education/spark_local_-_stock_market_analysis/sample_data", header=True)

In [5]:
# Seeing data => Dataframe
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [6]:
# Seeing schema in data => Data types in Dataframe
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [7]:
# Convert columns to appropriate data types
cleaned_stocks = (
    stocks.withColumn("Ticker", col("Ticker").cast(StringType()))
          .withColumn("Date", to_date(col("Date"), 'MM/dd/yyyy'))  # Adjusted the date format
          .withColumn("Year", func.year(col("Date")))
          .withColumn("Month", func.month(col("Date")))
          .withColumn("Day", func.dayofmonth(col("Date")))
          .withColumn("Week", func.weekofyear(col("Date")))
          .withColumn("Close", regexp_replace(col("Close/Last"), '[^0-9.]', '').cast(FloatType()))  # Remove non-numeric characters and cast to Float
          .withColumn("Volume", regexp_replace(col("Volume"), '[^0-9]', '').cast(IntegerType()))  # Remove non-numeric characters and cast to Integer
          .withColumn("Open", regexp_replace(col("Open"), '[^0-9.]', '').cast(FloatType()))  # Remove non-numeric characters and cast to Float
          .withColumn("High", regexp_replace(col("High"), '[^0-9.]', '').cast(FloatType())) # Remove non-numeric characters and cast to Float
          .withColumn("Low", regexp_replace(col("Low"), '[^0-9.]', '').cast(FloatType()))  # Remove non-numeric characters and cast to Float
          .drop("Close/Last")
)

In [8]:
# Verify updated schema
cleaned_stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- Close: float (nullable = true)



In [9]:
# Verify data type conversion
cleaned_stocks.show(5)

+------+----------+-------+------+------+------+----+-----+---+----+------+
|Ticker|      Date| Volume|  Open|  High|   Low|Year|Month|Day|Week| Close|
+------+----------+-------+------+------+------+----+-----+---+----+------+
| BRK-B|2023-05-31|6175417|321.12|322.41|319.39|2023|    5| 31|  22|321.08|
| BRK-B|2023-05-30|3232461|321.86|322.47| 319.0|2023|    5| 30|  22|322.19|
| BRK-B|2023-05-26|3229873|320.44|322.63|319.67|2023|    5| 26|  21| 320.6|
| BRK-B|2023-05-25|4251935|320.56|320.56|317.71|2023|    5| 25|  21|319.02|
| BRK-B|2023-05-24|3075393|322.71| 323.0|319.56|2023|    5| 24|  21| 320.2|
+------+----------+-------+------+------+------+----+-----+---+----+------+
only showing top 5 rows



In [10]:
# Filterint Data => Explore Microsoft and Tesla stocks on a particular day
cleaned_stocks.filter((cleaned_stocks.Ticker.isin(["MSFT", "TSLA"])) & (cleaned_stocks.Date == "2023-05-31")).show(15)


+------+----------+---------+------+------+------+----+-----+---+----+------+
|Ticker|      Date|   Volume|  Open|  High|   Low|Year|Month|Day|Week| Close|
+------+----------+---------+------+------+------+----+-----+---+----+------+
|  MSFT|2023-05-31| 45950550|332.29|335.94|327.33|2023|    5| 31|  22|328.39|
|  TSLA|2023-05-31|150711700|199.78|203.95|195.12|2023|    5| 31|  22|203.93|
+------+----------+---------+------+------+------+----+-----+---+----+------+



In [11]:
# Calculating basic statistics about data
cleaned_stocks.describe("Volume", "Open", "Low", "High", "Close").show()

24/10/10 15:52:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+--------------------+------------------+------------------+------------------+------------------+
|summary|              Volume|              Open|               Low|              High|             Close|
+-------+--------------------+------------------+------------------+------------------+------------------+
|  count|               15108|             15108|             15108|             15108|             15108|
|   mean|5.1868408793685466E7|180.09656566181036| 177.9982781513109| 182.1253348687101| 180.1256089860054|
| stddev| 5.496484129953464E7|101.16125813324399|100.26590135955216|101.96625521621753|101.14891782168563|
|    min|              961133|             12.07|              11.8|             12.45|             11.93|
|    max|           914080943|            479.22|            476.06|            479.98|            477.71|
+-------+--------------------+------------------+------------------+------------------+------------------+



<span style="font-size:25px"> __2. Basic Stock Analysis__

In [12]:
# Calculate Maximum Stock Price for various stocks
cleaned_stocks.groupBy("Ticker").max("Open").withColumnRenamed("max(Open)","Max Stock Price").show() 


+------+---------------+
|Ticker|Max Stock Price|
+------+---------------+
| BRK-B|         361.39|
|  MSFT|         344.62|
|  META|         381.68|
|  TSLA|         411.47|
|  AAPL|         182.63|
|  AMZN|          187.2|
| GOOGL|         151.25|
|  NVDA|         405.95|
|     V|         250.05|
|   TSM|         141.61|
|   SPY|         479.22|
|   QQQ|         405.57|
+------+---------------+



In [13]:
# Alternative way of calculating Maximum Stock Price for various stocks
cleaned_stocks.groupBy("Ticker") \
              .agg(func.max("Open").alias("Max Stock Price")) \
              .show() 

+------+---------------+
|Ticker|Max Stock Price|
+------+---------------+
| BRK-B|         361.39|
|  MSFT|         344.62|
|  META|         381.68|
|  TSLA|         411.47|
|  AAPL|         182.63|
|  AMZN|          187.2|
| GOOGL|         151.25|
|  NVDA|         405.95|
|     V|         250.05|
|   TSM|         141.61|
|   SPY|         479.22|
|   QQQ|         405.57|
+------+---------------+



In [14]:
# Max Stock price and total volume traded for various stocks
cleaned_stocks.groupBy("Ticker") \
              .agg(
                  func.max("Open").alias("Max Stock Price"),
                  func.sum("Volume").alias("Total Traded Volume")
              )\
              .show() 

+------+---------------+-------------------+
|Ticker|Max Stock Price|Total Traded Volume|
+------+---------------+-------------------+
| BRK-B|         361.39|         5862401321|
|  MSFT|         344.62|        37976660472|
|  META|         381.68|        30148848043|
|  TSLA|         411.47|       171802975076|
|  AAPL|         182.63|       139310061360|
|  AMZN|          187.2|       104503287430|
| GOOGL|         151.25|        43956560981|
|  NVDA|         405.95|        58787218324|
|     V|         250.05|        10410997871|
|   TSM|         141.61|        12506470104|
|   SPY|         479.22|       107925285300|
|   QQQ|         405.57|        60437153773|
+------+---------------+-------------------+



In [15]:
# Basic date manipulation functions => Calculate Yearly Highest and Lowest Price per Stock
cleaned_stocks.withColumn("Year", func.year(col("Date"))) \
              .groupBy("Ticker" ,"Year") \
              .agg(
                  func.max("High").alias("Yearly High"),
                  func.min("Low").alias("Yearly Low")
              )\
              .orderBy("Ticker", "Year") \
              .show()

+------+----+-----------+----------+
|Ticker|Year|Yearly High|Yearly Low|
+------+----+-----------+----------+
|  AAPL|2018|      58.37|     36.65|
|  AAPL|2019|      73.49|      35.5|
|  AAPL|2020|     138.79|     53.15|
|  AAPL|2021|     182.13|    116.21|
|  AAPL|2022|     182.94|    125.87|
|  AAPL|2023|     179.35|    124.17|
|  AMZN|2018|     102.53|     65.35|
|  AMZN|2019|     101.79|     73.05|
|  AMZN|2020|     177.61|      81.3|
|  AMZN|2021|     188.65|    144.05|
|  AMZN|2022|      171.4|     81.69|
|  AMZN|2023|     122.92|     81.43|
| BRK-B|2018|     224.07|    184.75|
| BRK-B|2019|     228.23|    191.04|
| BRK-B|2020|     234.99|     159.5|
| BRK-B|2021|     301.65|     226.1|
| BRK-B|2022|      362.1|    259.85|
| BRK-B|2023|     333.94|    292.42|
| GOOGL|2018|      64.57|     48.88|
| GOOGL|2019|      68.35|     51.12|
+------+----+-----------+----------+
only showing top 20 rows



In [16]:
# Prepare data for export for monthly stock statistics including spread
monthly_stock_statistics = (
    cleaned_stocks
        .withColumn("Year", func.year(col("Date")))  # Extract year
        .withColumn("Month", func.month(col("Date")))  # Extract month
        .groupBy("Ticker", "Year", "Month")  # Group by Ticker, Year, and Month
        .agg(
            func.max("High").alias("Monthly High"),  # Get max High
            func.min("Low").alias("Monthly Low")  # Get min Low
        )
        .orderBy("Ticker", "Year", "Month")  # Order by Ticker,Year and Month
)

monthly_stock_statistics = monthly_stock_statistics.withColumn("Spread", col("Monthly High") - col("Monthly Low"))

In [17]:
monthly_stock_statistics.show()

+------+----+-----+------------+-----------+----------+
|Ticker|Year|Month|Monthly High|Monthly Low|    Spread|
+------+----+-----+------------+-----------+----------+
|  AAPL|2018|    5|       47.06|      46.54|0.52000046|
|  AAPL|2018|    6|       48.55|      45.18|  3.369999|
|  AAPL|2018|    7|       48.99|      45.86|  3.130001|
|  AAPL|2018|    8|       57.22|      49.33| 7.8899994|
|  AAPL|2018|    9|       57.42|      53.83| 3.5899963|
|  AAPL|2018|   10|       58.37|      51.52| 6.8499985|
|  AAPL|2018|   11|       55.59|      42.57|     13.02|
|  AAPL|2018|   12|       46.24|      36.65|      9.59|
|  AAPL|2019|    1|       42.25|       35.5|      6.75|
|  AAPL|2019|    2|       43.97|      41.48| 2.4900017|
|  AAPL|2019|    3|       49.42|      42.38|  7.039997|
|  AAPL|2019|    4|       52.12|       47.1| 5.0200005|
|  AAPL|2019|    5|       53.83|      43.75| 10.080002|
|  AAPL|2019|    6|       50.39|      42.57| 7.8199997|
|  AAPL|2019|    7|       55.34|       49.6| 5.7

<span style="font-size:25px"> __3. Advanced Analysis__

In [18]:
cleaned_stocks.show()

+------+----------+-------+------+------+------+----+-----+---+----+------+
|Ticker|      Date| Volume|  Open|  High|   Low|Year|Month|Day|Week| Close|
+------+----------+-------+------+------+------+----+-----+---+----+------+
| BRK-B|2023-05-31|6175417|321.12|322.41|319.39|2023|    5| 31|  22|321.08|
| BRK-B|2023-05-30|3232461|321.86|322.47| 319.0|2023|    5| 30|  22|322.19|
| BRK-B|2023-05-26|3229873|320.44|322.63|319.67|2023|    5| 26|  21| 320.6|
| BRK-B|2023-05-25|4251935|320.56|320.56|317.71|2023|    5| 25|  21|319.02|
| BRK-B|2023-05-24|3075393|322.71| 323.0|319.56|2023|    5| 24|  21| 320.2|
| BRK-B|2023-05-23|4031342|328.19|329.27|322.97|2023|    5| 23|  21|323.11|
| BRK-B|2023-05-22|2763422|330.75|331.49|328.35|2023|    5| 22|  21|329.13|
| BRK-B|2023-05-19|4323538| 331.0|333.94|329.12|2023|    5| 19|  20|330.39|
| BRK-B|2023-05-18|2808329|326.87|329.98|325.85|2023|    5| 18|  20|329.76|
| BRK-B|2023-05-17|3047626|325.02|328.26|324.82|2023|    5| 17|  20|327.39|
| BRK-B|2023

In [19]:
snapshot = cleaned_stocks.select("Ticker", "Date", "Open")

In [31]:
# Preparing window condition for calculating moving average for the past 50 days
moving_avg_window = Window.partitionBy("Ticker").orderBy("Date").rowsBetween(-50,0)

In [45]:
# Calculate 5 day moving average and round it to two decimal points
moving_average = (
        snapshot.withColumn("MA50", func.avg("Open").over(moving_avg_window)) \
                .withColumn("MA50", func.round("MA50", 2)) \
)

In [46]:
moving_average.show()

+------+----------+-----+-----+
|Ticker|      Date| Open| MA50|
+------+----------+-----+-----+
|  AAPL|2018-05-31|46.81|46.81|
|  AAPL|2018-06-01| 47.0|46.91|
|  AAPL|2018-06-04|47.91|47.24|
|  AAPL|2018-06-05|48.27| 47.5|
|  AAPL|2018-06-06|48.41|47.68|
|  AAPL|2018-06-07|48.54|47.82|
|  AAPL|2018-06-08|47.79|47.82|
|  AAPL|2018-06-11|47.84|47.82|
|  AAPL|2018-06-12|47.85|47.82|
|  AAPL|2018-06-13|48.11|47.85|
|  AAPL|2018-06-14|47.89|47.86|
|  AAPL|2018-06-15|47.51|47.83|
|  AAPL|2018-06-18|46.97|47.76|
|  AAPL|2018-06-19|46.29|47.66|
|  AAPL|2018-06-20|46.59|47.59|
|  AAPL|2018-06-21|46.81|47.54|
|  AAPL|2018-06-22|46.53|47.48|
|  AAPL|2018-06-25|45.85|47.39|
|  AAPL|2018-06-26|45.75| 47.3|
|  AAPL|2018-06-27|46.31|47.25|
+------+----------+-----+-----+
only showing top 20 rows



In [30]:
# Calculate top 5 highest stock prices for each stock in a year
maximum_stocks = Window.partitionBy("Ticker").orderBy(snapshot.Open.desc())

In [39]:
# Adding row_number column to rank stocks by Open price within each Ticker
ranked_stocks = snapshot.withColumn("rank",func.row_number().over(maximum_stocks))

In [40]:
# Filter to get the top 5 stocks based on their Open price
top_5_stocks = ranked_stocks.filter(col("rank") <= 5)

In [42]:
# Show the result
top_5_stocks.select("Ticker", "Date", "Open", "rank").show()

+------+----------+------+----+
|Ticker|      Date|  Open|rank|
+------+----------+------+----+
|  AAPL|2022-01-04|182.63|   1|
|  AAPL|2021-12-13|181.12|   2|
|  AAPL|2021-12-28|180.16|   3|
|  AAPL|2022-01-05|179.61|   4|
|  AAPL|2021-12-30|179.47|   5|
|  AMZN|2021-07-12| 187.2|   1|
|  AMZN|2021-07-09|186.13|   2|
|  AMZN|2021-07-07|185.87|   3|
|  AMZN|2021-11-19|185.63|   4|
|  AMZN|2021-07-14|185.44|   5|
| BRK-B|2022-03-29|361.39|   1|
| BRK-B|2022-03-28|360.59|   2|
| BRK-B|2022-03-31| 359.0|   3|
| BRK-B|2022-03-30|354.66|   4|
| BRK-B|2022-03-25| 353.9|   5|
| GOOGL|2022-02-02|151.25|   1|
| GOOGL|2021-11-19|149.98|   2|
| GOOGL|2021-11-08|149.83|   3|
| GOOGL|2021-11-22|149.33|   4|
| GOOGL|2021-11-09|149.23|   5|
+------+----------+------+----+
only showing top 20 rows



<span style="font-size:25px"> __4. Saving the Data__

<span style="font-size:15px"> __CSV vs Parquet vs Avro__

|                 |CSV              | Avro            | Parquet         |
|-----------------|-----------------|-----------------|-----------------|
|Columnar         | No              | No              | Yes             |
|Compressable     | Yes             | Yes             | Yes             |
|Human Readable   | Yes             | No              | Yes             |
|Schema           | No              | Yes             | Yes             |

In [49]:
# Parquet
(moving_average.write.option("header", True)
               .partitionBy("Ticker")
               .mode("overwrite")
               .parquet("ma50_results_parquet"))

In [50]:
# CSV
(top_5_stocks.write.option("header", True)
             .mode("overwrite")
             .csv("stock_results_csv"))