# Big Data Analysis Project 2024 (Streaming)
--------
Group: 

- Dinis Fernandes 20221848
- Dinis Gaspar 20221869
- Inês Santos 20221916
- Luis Davila 20221949
- Sara Ferrer 20221947
----------
In this notebook, we perform a simple analysis of a company's stock data, computing window averages and detecting outliers, to showcase the potential of Spark Structured Streaming. Our variables are the high, low and close prices from the stock's time series. Our ultimate goal is to acquire, stream, process and analyze the data using Spark Structured Streaming.

### Disclaimer: In this notebook the big data safe functionality was not used.
Before running this notebook, install the `yfinance` package, which is used to extract financial data from the Yahoo Finance API (https://pypi.org/project/yfinance/).

Please run this notebook on a cluster with Spark 3.0 or later (Structured Streaming ships with Spark itself, so no extra package is needed for it).

## Imports


In [0]:
%pip install yfinance


Let's import the necessary libraries and start the Spark session.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
)

import yfinance as yf

# Initialize Spark Session
spark = SparkSession.builder.appName("StockAnalysis").getOrCreate()

## Analyzing Streaming Data from Yahoo Finance API

Now we will acquire the data from the API and store it. We will perform the analysis on Microsoft (MSFT) stock, using data from the last day at a 1-minute frequency.

In [0]:
# Fetch stock data from yfinance
def fetch_stock_data(symbol):
    # Create a Ticker object for the symbol
    ticker = yf.Ticker(symbol)
    # Fetch the last day of data at a 1-minute interval
    data = ticker.history(period="1d", interval="1m")
    # Move the Datetime index into a regular column
    data.reset_index(inplace=True)
    return spark.createDataFrame(data).withColumn("symbol", lit(symbol))


# Save raw stock data as CSV files, keeping only the variables we need
def save_raw_data(stock_symbol, path):
    # Fetch data
    stock_df = fetch_stock_data(stock_symbol)
    # Keep only the necessary variables (the close price is stored as "price")
    stock_df = stock_df.selectExpr(
        "Datetime as timestamp",
        "Close as price",
        "symbol",
        "High as high",
        "Low as low",
    )
    # Overwrite any previous CSV output at `path` (written without a header row)
    stock_df.write.format("csv").mode("overwrite").save(path)
    print("Raw data saved")


stock_symbol = "MSFT"

raw_data_path = f"/{stock_symbol}_raw_Data"

save_raw_data(stock_symbol, raw_data_path)

Raw data saved


This is a sample of the data pulled from the API. We will only use the timestamp and the high, low and close prices.

In [0]:
display(fetch_stock_data("MSFT"))

Datetime,Open,High,Low,Close,Volume,Dividends,Stock Splits,symbol
2024-12-11T14:30:00.000+0000,444.0499877929688,445.739990234375,444.0499877929688,445.2699890136719,558590,0.0,0.0,MSFT
2024-12-11T14:31:00.000+0000,445.3900146484375,445.7699890136719,444.5700073242188,445.4750061035156,49652,0.0,0.0,MSFT
2024-12-11T14:32:00.000+0000,445.5,445.8299865722656,445.4299926757813,445.7300109863281,21820,0.0,0.0,MSFT
2024-12-11T14:33:00.000+0000,445.7049865722656,445.9400024414063,445.3999938964844,445.8399963378906,36167,0.0,0.0,MSFT
2024-12-11T14:34:00.000+0000,445.8500061035156,446.4400024414063,445.6600036621094,445.8500061035156,65262,0.0,0.0,MSFT
2024-12-11T14:35:00.000+0000,445.9043884277344,446.1099853515625,445.5801086425781,445.8150024414063,44615,0.0,0.0,MSFT
2024-12-11T14:36:00.000+0000,445.7948913574219,446.0999145507813,445.739990234375,445.739990234375,35145,0.0,0.0,MSFT
2024-12-11T14:37:00.000+0000,445.80999755859375,445.8465881347656,444.239990234375,444.2749938964844,69575,0.0,0.0,MSFT
2024-12-11T14:38:00.000+0000,444.2999877929688,444.5398864746094,444.0899963378906,444.4500122070313,49682,0.0,0.0,MSFT
2024-12-11T14:39:00.000+0000,444.39349365234375,445.4400024414063,444.39349365234375,445.2250061035156,45978,0.0,0.0,MSFT


Next, we read the saved CSV files back in as streaming data.

In [0]:
# Define the schema for the stock data
schema = StructType(
    [
        StructField(
            "timestamp", TimestampType()
        ),  # The datetime of the stock price observation
        StructField("price", DoubleType()),  # The closing price of the stock
        StructField("symbol", StringType()),  # The stock symbol
        StructField("high", DoubleType()),  # The highest price during the interval
        StructField("low", DoubleType()),  # The lowest price during the interval
    ]
)

# Create the streaming DataFrame
# Load the CSV files written by save_raw_data()
stock_data = (
    spark.readStream.format("csv")  # Enable streaming mode, reading CSV files
    .schema(schema)  # Streaming file sources need an explicit schema
    .option("maxFilesPerTrigger", 1)  # Process one file per trigger
    .load(raw_data_path)  # Directory containing the CSV files
)

# If save_raw_data() is run again to fetch new data, drop the rows that were
# already streamed (same timestamp and symbol).
stock_data = stock_data.dropDuplicates(["timestamp", "symbol"])

display(stock_data)

timestamp,price,symbol,high,low
2024-12-11T14:58:00.000+0000,445.6950073242188,MSFT,445.8861083984375,445.2099914550781
2024-12-11T14:32:00.000+0000,445.7300109863281,MSFT,445.8299865722656,445.4299926757813
2024-12-11T14:43:00.000+0000,444.5299987792969,MSFT,444.8999938964844,444.25
2024-12-11T14:45:00.000+0000,444.760009765625,MSFT,445.0114135742188,444.5799865722656
2024-12-11T14:47:00.000+0000,445.8800048828125,MSFT,445.989990234375,445.3800048828125
2024-12-11T14:31:00.000+0000,445.4750061035156,MSFT,445.7699890136719,444.5700073242188
2024-12-11T15:15:00.000+0000,448.0,MSFT,448.389892578125,448.0
2024-12-11T14:40:00.000+0000,445.9800109863281,MSFT,446.1734008789063,445.1900024414063
2024-12-11T14:44:00.000+0000,445.0197143554688,MSFT,445.1799926757813,444.610107421875
2024-12-11T14:37:00.000+0000,444.2749938964844,MSFT,445.8465881347656,444.239990234375
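The `dropDuplicates(["timestamp", "symbol"])` step keeps the first row it sees for each (timestamp, symbol) pair across micro-batches. The plain-Python sketch below models that logic under simplifying assumptions. `dedup_stream` is a hypothetical helper, not a Spark API. The rows are hypothetical micro-batches, with prices rounded from the MSFT sample above.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple

Row = Dict[str, Any]


def dedup_stream(batches: Iterable[List[Row]]) -> List[List[Row]]:
    """Emit each (timestamp, symbol) key only the first time it is seen.

    `seen` stands in for Spark's state store. Nothing is ever removed from it,
    so it grows without bound, as Spark's deduplication state does when no
    watermark applies to the dropDuplicates operator.
    """
    seen: Set[Tuple[Any, Any]] = set()
    output: List[List[Row]] = []
    for batch in batches:  # one list of rows per micro-batch
        emitted = []
        for row in batch:
            key = (row["timestamp"], row["symbol"])
            if key not in seen:
                seen.add(key)
                emitted.append(row)
        output.append(emitted)
    return output


# Hypothetical micro-batches (prices rounded from the MSFT sample)
batch_1 = [
    {"timestamp": "14:31", "symbol": "MSFT", "price": 445.48},
    {"timestamp": "14:32", "symbol": "MSFT", "price": 445.73},
]
batch_2 = [
    {"timestamp": "14:32", "symbol": "MSFT", "price": 445.73},  # re-fetched row
    {"timestamp": "14:33", "symbol": "MSFT", "price": 445.84},
]
result = dedup_stream([batch_1, batch_2])
print([[row["timestamp"] for row in batch] for batch in result])
# [['14:31', '14:32'], ['14:33']]
```

Spark's Structured Streaming guide recommends calling `withWatermark` before `dropDuplicates` and including the event-time column among the keys, so that old deduplication state can be discarded. In this notebook, the watermark is added after `dropDuplicates`, so it does not bound the deduplication state.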


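To make sure that data arriving late is still taken into account, we define a 10-minute watermark on the timestamp column. Events up to 10 minutes older than the latest event time seen so far are still included in the aggregations. Older events may be dropped, and Spark can discard the state of windows that are old enough.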

In [0]:
# Create a 10-minute watermark on the timestamp column
stock_data = stock_data.withWatermark("timestamp", "10 minutes")
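To illustrate the watermark rule, here is a minimal plain-Python simulation using hypothetical event times. It makes two simplifying assumptions. First, the watermark is updated after every event, whereas Spark updates it once per micro-batch. Second, late events are always dropped, whereas Spark only guarantees that it will not drop events newer than the watermark. `apply_watermark` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def apply_watermark(
    event_times: List[datetime], delay: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Split events into (accepted, dropped) under a simplified watermark.

    The watermark is (maximum event time seen so far) - delay. Events older
    than the current watermark are treated as too late and dropped.
    """
    accepted: List[datetime] = []
    dropped: List[datetime] = []
    max_seen: Optional[datetime] = None
    for t in event_times:
        if max_seen is not None and t < max_seen - delay:
            dropped.append(t)
        else:
            accepted.append(t)
            max_seen = t if max_seen is None else max(max_seen, t)
    return accepted, dropped


day = datetime(2024, 12, 11)
events = [
    day.replace(hour=14, minute=31),
    day.replace(hour=14, minute=50),
    day.replace(hour=14, minute=45),  # 5 minutes behind 14:50: still accepted
    day.replace(hour=14, minute=35),  # older than the 14:40 watermark: dropped
]
accepted, dropped = apply_watermark(events, timedelta(minutes=10))
print([t.strftime("%H:%M") for t in accepted])  # ['14:31', '14:50', '14:45']
print([t.strftime("%H:%M") for t in dropped])  # ['14:35']
```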

Now that we have the data, we will process it into more meaningful metrics. We aggregate the data into 1-hour windows and compute the following for each window: the average price, the volatility (standard deviation), and the Bollinger Bands (the window average plus and minus 2 times the standard deviation of that hour).
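The per-hour computation can be checked outside Spark with a small plain-Python sketch. It assumes 1-hour tumbling windows starting on the hour, which holds for naive UTC timestamps. It uses the sample standard deviation, which is what Spark's `stddev` computes, and rounds to 2 decimals before computing the bands, as the notebook does. `hourly_bands` is a hypothetical helper.

```python
import statistics
from datetime import datetime
from typing import Dict, List, Optional, Tuple


def hourly_bands(
    rows: List[Tuple[datetime, float]], k: float = 2.0
) -> Dict[datetime, Dict[str, Optional[float]]]:
    """Group (timestamp, price) rows into 1-hour tumbling windows and compute
    the window average, the sample standard deviation ("volatility") and the
    bands window_avg +/- k * volatility, rounded to 2 decimals."""
    by_hour: Dict[datetime, List[float]] = {}
    for ts, price in rows:
        start = ts.replace(minute=0, second=0, microsecond=0)  # window start
        by_hour.setdefault(start, []).append(price)

    result: Dict[datetime, Dict[str, Optional[float]]] = {}
    for start, prices in sorted(by_hour.items()):
        avg = round(statistics.mean(prices), 2)
        if len(prices) < 2:
            # Sample std is undefined for a single value (Spark 3 returns NULL)
            result[start] = {"window_avg": avg, "volatility": None,
                             "upper_band": None, "lower_band": None}
            continue
        vol = round(statistics.stdev(prices), 2)  # sample std, like Spark's stddev
        result[start] = {
            "window_avg": avg,
            "volatility": vol,
            "upper_band": round(avg + k * vol, 2),
            "lower_band": round(avg - k * vol, 2),
        }
    return result


rows = [
    (datetime(2024, 12, 11, 14, 30), 1.0),
    (datetime(2024, 12, 11, 14, 45), 2.0),
    (datetime(2024, 12, 11, 14, 59), 3.0),
]
print(hourly_bands(rows)[datetime(2024, 12, 11, 14)])
# {'window_avg': 2.0, 'volatility': 1.0, 'upper_band': 4.0, 'lower_band': 0.0}
```

With the notebook's values for the 14:00 to 15:00 window (average 445.43, volatility 0.48), the bands are 445.43 ± 0.96, i.e. 446.39 and 444.47, as in the `metrics` output below.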

In [0]:
# Calculate Window Average and Bollinger Bands
timestamp_agregation = "60 minutes"

metrics = (
    # Group data in hours
    stock_data.groupBy(window(col("timestamp"), timestamp_agregation))
    .agg(  # Compute metrics
        round(avg("price"), 2).alias("window_avg"),
        round(stddev("price"), 2).alias("volatility"),
    )  # Compute bands
    .withColumn("upper_band", round(col("window_avg") + 2 * col("volatility"), 2))
    .withColumn("lower_band", round(col("window_avg") - 2 * col("volatility"), 2))
)

display(metrics)

window,window_avg,volatility,upper_band,lower_band
"List(2024-12-11T15:00:00.000+0000, 2024-12-11T16:00:00.000+0000)",447.87,0.8,449.47,446.27
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",445.43,0.48,446.39,444.47
"List(2024-12-11T16:00:00.000+0000, 2024-12-11T17:00:00.000+0000)",449.47,0.33,450.13,448.81
"List(2024-12-11T18:00:00.000+0000, 2024-12-11T19:00:00.000+0000)",449.19,0.22,449.63,448.75
"List(2024-12-11T19:00:00.000+0000, 2024-12-11T20:00:00.000+0000)",448.71,0.33,449.37,448.05
"List(2024-12-11T17:00:00.000+0000, 2024-12-11T18:00:00.000+0000)",449.44,0.2,449.84,449.04
"List(2024-12-11T20:00:00.000+0000, 2024-12-11T21:00:00.000+0000)",448.98,0.21,449.4,448.56


The outliers are price points outside the bands defined above.
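Only the close price (`price`) is compared with the bands. For example, the 14:37 minute is flagged because its close (444.27) is below the lower band of 444.47, even though its high (445.85) lies inside the bands. A minimal sketch of the test, using the 14:00 to 15:00 values from the `metrics` output (`is_outlier` is a hypothetical helper):

```python
def is_outlier(price: float, lower_band: float, upper_band: float) -> bool:
    """True when the (close) price lies strictly outside the bands, mirroring
    the when(...).otherwise(False) expression used in the notebook."""
    return price > upper_band or price < lower_band


# Bands of the 14:00-15:00 window from the metrics output
lower, upper = 444.47, 446.39
print(is_outlier(444.2749938964844, lower, upper))  # 14:37 close: True
print(is_outlier(445.9800109863281, lower, upper))  # 14:40 close: False
```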

In [0]:
# Align `stock_data` timestamps with the time windows used in `metrics` so the two can be joined
aligned_stock_data = stock_data.withColumn(
    "window", window(col("timestamp"), timestamp_agregation)
)

# Detect outliers: observations more than 2 standard deviations away from the window average
# Join `aligned_stock_data` with `metrics` on the `window` column
anomalies = aligned_stock_data.join(metrics, "window").withColumn(
    "is_outlier",
    when(
        (col("price") > col("upper_band")) | (col("price") < col("lower_band")), True
    ).otherwise(False),
)
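The join works because `window()` assigns every timestamp to the same window struct that `groupBy(window(...))` produced. With no `startTime`, Spark aligns tumbling windows to the Unix epoch in UTC, so 60-minute windows start on the hour. The following is a plain-Python sketch of that alignment and of the inner join. `tumbling_window` and `join_on_window` are hypothetical helpers, and the 21:05 row is a made-up observation without a metrics row.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
Window = Tuple[datetime, datetime]


def tumbling_window(ts: datetime, size: timedelta) -> Window:
    """Return the [start, end) tumbling window that contains `ts`, with
    windows aligned to the Unix epoch in UTC."""
    start = ts - (ts - EPOCH) % size
    return start, start + size


def join_on_window(
    observations: List[Tuple[datetime, float]],
    metrics: Dict[Window, Dict[str, float]],
    size: timedelta,
) -> List[Tuple[Window, datetime, float, Dict[str, float]]]:
    """Inner join: attach each observation's window metrics; observations
    whose window has no metrics row are dropped."""
    joined = []
    for ts, price in observations:
        w = tumbling_window(ts, size)
        if w in metrics:
            joined.append((w, ts, price, metrics[w]))
    return joined


hour = timedelta(minutes=60)
ts = datetime(2024, 12, 11, 14, 37, tzinfo=timezone.utc)
start, end = tumbling_window(ts, hour)
print(start.isoformat(), end.isoformat())
# 2024-12-11T14:00:00+00:00 2024-12-11T15:00:00+00:00

metrics = {(start, end): {"window_avg": 445.43, "upper_band": 446.39, "lower_band": 444.47}}
rows = [
    (ts, 444.2749938964844),
    (datetime(2024, 12, 11, 21, 5, tzinfo=timezone.utc), 449.0),  # no metrics row
]
print(len(join_on_window(rows, metrics, hour)))  # 1
```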

Let's start the streaming query so we can inspect the results. We use the append output mode because past prices should not change once recorded and new observations are only added, never updated. Append is also the output mode Spark supports for stream-stream joins like ours. The results are stored in an in-memory table for analysis.
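In append mode, a rows written to the sink can never be changed afterwards. A windowed aggregate is therefore emitted only once, after the watermark has reached the end of its window. Until then, the window produces no output. The following plain-Python simulation models this for 1-hour windows with hypothetical event times. It simplifies Spark's behaviour: late events are always dropped, and there is only one stateful operator. `append_mode_emissions` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple


def append_mode_emissions(
    batches: List[List[datetime]], delay: timedelta
) -> List[List[Tuple[datetime, int]]]:
    """Simulate when 1-hour windowed counts are emitted in append output mode.

    After each micro-batch the watermark becomes (max event time - delay).
    Events older than the previous batch's watermark are dropped, and a window
    is emitted exactly once, when the watermark reaches its end; its state is
    then discarded. Windows that are still open produce no output.
    """
    counts: Dict[datetime, int] = {}
    max_seen: Optional[datetime] = None
    watermark: Optional[datetime] = None
    emitted: List[List[Tuple[datetime, int]]] = []
    for batch in batches:
        for t in batch:
            if watermark is not None and t < watermark:
                continue  # too late to be counted
            start = t.replace(minute=0, second=0, microsecond=0)
            counts[start] = counts.get(start, 0) + 1
            max_seen = t if max_seen is None else max(max_seen, t)
        if max_seen is not None:
            watermark = max_seen - delay
        final = []
        if watermark is not None:
            for start in sorted(counts):
                if start + timedelta(hours=1) <= watermark:
                    final.append((start, counts.pop(start)))
        emitted.append(final)
    return emitted


d = datetime(2024, 12, 11)
batches = [
    [d.replace(hour=14, minute=31), d.replace(hour=14, minute=58)],
    [d.replace(hour=15, minute=5)],   # watermark 14:55: 14:00 window still open
    [d.replace(hour=15, minute=12)],  # watermark 15:02: 14:00 window emitted
]
for i, out in enumerate(append_mode_emissions(batches, timedelta(minutes=10))):
    print(i, [(s.strftime("%H:%M"), n) for s, n in out])
# 0 []
# 1 []
# 2 [('14:00', 2)]
```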

In [0]:
# Start the streaming query, writing results to the in-memory table "anomaly_table"
anomalies_query = (
    anomalies.writeStream.outputMode("append")
    .format("memory")
    .queryName("anomaly_table")
    .start()
)
# Processing can take a few minutes

In [0]:
anomalies_query.stop()
# Stop the query once the table is populated; keep it running instead if new data keeps arriving

This is our table. It contains the window (start and end of the hour), the timestamp, the close, high and low prices, the symbol, the window average, the volatility, the bands, and whether the observation is an outlier.

In [0]:
display(spark.sql("SELECT * FROM anomaly_table"))

window,timestamp,price,symbol,high,low,window_avg,volatility,upper_band,lower_band,is_outlier
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:58:00.000+0000,445.6950073242188,MSFT,445.8861083984375,445.2099914550781,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:32:00.000+0000,445.7300109863281,MSFT,445.8299865722656,445.4299926757813,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:43:00.000+0000,444.5299987792969,MSFT,444.8999938964844,444.25,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:45:00.000+0000,444.760009765625,MSFT,445.0114135742188,444.5799865722656,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:47:00.000+0000,445.8800048828125,MSFT,445.989990234375,445.3800048828125,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:31:00.000+0000,445.4750061035156,MSFT,445.7699890136719,444.5700073242188,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:40:00.000+0000,445.9800109863281,MSFT,446.1734008789063,445.1900024414063,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:44:00.000+0000,445.0197143554688,MSFT,445.1799926757813,444.610107421875,445.43,0.48,446.39,444.47,False
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:37:00.000+0000,444.2749938964844,MSFT,445.8465881347656,444.239990234375,445.43,0.48,446.39,444.47,True
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2024-12-11T14:56:00.000+0000,445.9299926757813,MSFT,445.9993896484375,445.3299865722656,445.43,0.48,446.39,444.47,False


Now we will see only the timestamps that are outliers.

In [0]:
%sql
SELECT
  timestamp,
  price,
  is_outlier
FROM
  anomaly_table
WHERE
  is_outlier = True
ORDER BY
  timestamp;

timestamp,price,is_outlier
2024-12-11T14:37:00.000+0000,444.2749938964844,True
2024-12-11T14:38:00.000+0000,444.4500122070313,True
2024-12-11T15:00:00.000+0000,445.8999938964844,True
2024-12-11T15:03:00.000+0000,446.1600036621094,True
2024-12-11T16:00:00.000+0000,448.7550048828125,True
2024-12-11T16:03:00.000+0000,448.75,True
2024-12-11T16:04:00.000+0000,448.7449951171875,True
2024-12-11T17:05:00.000+0000,449.9400024414063,True
2024-12-11T17:06:00.000+0000,449.9400024414063,True
2024-12-11T17:07:00.000+0000,449.8800048828125,True


Count how many outliers there are per hour.

In [0]:
%sql
SELECT
  window as hour_range,
  count(is_outlier) as number_of_outliers
FROM
  anomaly_table
WHERE
  is_outlier = True
GROUP BY
  hour_range
  -- Repartition the aggregated rows so that rows with the same hour_range land in the same partition
  DISTRIBUTE BY hour_range
  -- Sort rows within each partition (ORDER BY cannot be combined with DISTRIBUTE BY / SORT BY,
  -- and only ORDER BY guarantees a global order)
  SORT BY hour_range;

hour_range,number_of_outliers
"List(2024-12-11T14:00:00.000+0000, 2024-12-11T15:00:00.000+0000)",2
"List(2024-12-11T15:00:00.000+0000, 2024-12-11T16:00:00.000+0000)",2
"List(2024-12-11T16:00:00.000+0000, 2024-12-11T17:00:00.000+0000)",3
"List(2024-12-11T17:00:00.000+0000, 2024-12-11T18:00:00.000+0000)",4
"List(2024-12-11T18:00:00.000+0000, 2024-12-11T19:00:00.000+0000)",4
"List(2024-12-11T19:00:00.000+0000, 2024-12-11T20:00:00.000+0000)",5

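`DISTRIBUTE BY` routes rows to partitions by hashing the key, and `SORT BY` then sorts each partition independently. The following plain-Python sketch mimics this with a toy hash; Spark actually uses Murmur3. It shows that the concatenated output need not be globally sorted. `distribute_and_sort` is a hypothetical helper.

```python
from typing import List


def distribute_and_sort(keys: List[str], num_partitions: int) -> List[List[str]]:
    """Mimic `DISTRIBUTE BY key SORT BY key`: route each row to a partition by
    hashing its key, then sort rows only within each partition."""
    partitions: List[List[str]] = [[] for _ in range(num_partitions)]
    for key in keys:
        # Toy deterministic hash; Spark actually uses Murmur3 hashing.
        partitions[sum(map(ord, key)) % num_partitions].append(key)
    return [sorted(p) for p in partitions]


parts = distribute_and_sort(["17:00", "14:00", "16:00", "15:00"], num_partitions=2)
print(parts)  # [['15:00', '17:00'], ['14:00', '16:00']]
print([k for p in parts for k in p])  # ['15:00', '17:00', '14:00', '16:00']
```

A small result can still look sorted, for example when it ends up in a single partition. Only `ORDER BY hour_range` guarantees a global order.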

We can do the same with other stocks and compare results.

In [0]:
stock_symbol_AAPL = "AAPL"

raw_data_path_AAPL = f"/{stock_symbol_AAPL}_raw_Data"

save_raw_data(stock_symbol_AAPL, raw_data_path_AAPL)

stock_data_AAPL = (
    spark.readStream.format("csv")
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .load(raw_data_path_AAPL)
)

stock_data_AAPL = stock_data_AAPL.dropDuplicates(["timestamp", "symbol"])

stock_data_AAPL = stock_data_AAPL.withWatermark("timestamp", "5 minutes")

timestamp_agregation = "60 minutes"

metrics_AAPL = (
    stock_data_AAPL.groupBy(window(col("timestamp"), timestamp_agregation))
    .agg(
        round(avg("price"), 2).alias("window_avg"),
        round(stddev("price"), 2).alias("volatility"),
    )
    .withColumn("upper_band", round(col("window_avg") + 2 * col("volatility"), 2))
    .withColumn("lower_band", round(col("window_avg") - 2 * col("volatility"), 2))
)

# Align `stock_data_AAPL` timestamps with the time windows used in `metrics_AAPL`
aligned_stock_data_AAPL = stock_data_AAPL.withColumn(
    "window", window(col("timestamp"), timestamp_agregation)
)

# Detect outliers: observations more than 2 standard deviations away from the window average
# Join `aligned_stock_data_AAPL` with `metrics_AAPL` on the `window` column
anomalies_AAPL = aligned_stock_data_AAPL.join(metrics_AAPL, "window").withColumn(
    "is_outlier",
    when(
        (col("price") > col("upper_band")) | (col("price") < col("lower_band")), True
    ).otherwise(False),
)

Raw data saved


Now we will start the query to save the data for analysis.

In [0]:
anomalies_query_AAPL = (
    anomalies_AAPL.writeStream.outputMode("append")
    .format("memory")
    .queryName("anomaly_table_AAPL")
    .start()
)

In [0]:
anomalies_query_AAPL.stop()

This is the number of AAPL outliers per hour.

In [0]:
%sql
SELECT
  window.start as hour_range,
  count(is_outlier) as number_of_outliers
FROM
  anomaly_table_AAPL
WHERE
  is_outlier = True
GROUP BY
  hour_range DISTRIBUTE BY hour_range SORT BY hour_range;

hour_range,number_of_outliers
2024-12-11T14:00:00.000+0000,2
2024-12-11T15:00:00.000+0000,2
2024-12-11T16:00:00.000+0000,1
2024-12-11T17:00:00.000+0000,2
2024-12-11T18:00:00.000+0000,5
2024-12-11T19:00:00.000+0000,2


Here we compare the number of outliers per hour for the two stocks, matching observations by timestamp.

In [0]:
%sql
SELECT
  a.window.start as hour_range,
  sum(
    CASE
      WHEN a.is_outlier THEN 1
    END
  ) as number_of_outliers_AAPL,
  sum(
    CASE
      WHEN m.is_outlier THEN 1
    END
  ) as number_of_outliers_MSFT
FROM
  anomaly_table m
  join anomaly_table_AAPL a on m.timestamp == a.timestamp
WHERE
  a.is_outlier = True
  or m.is_outlier == True
GROUP BY
  hour_range DISTRIBUTE BY hour_range SORT BY hour_range;

hour_range,number_of_outliers_AAPL,number_of_outliers_MSFT
2024-12-11T14:00:00.000+0000,2,2
2024-12-11T15:00:00.000+0000,2,2
2024-12-11T16:00:00.000+0000,1,3
2024-12-11T17:00:00.000+0000,2,4
2024-12-11T18:00:00.000+0000,5,4
2024-12-11T19:00:00.000+0000,2,5


# Conclusion
We have shown how to implement streaming analysis and preprocessing in Databricks using Spark Structured Streaming. This notebook can serve as a reference for future work with stock market data. As seen above, it is possible to acquire the data, compute the metrics of interest, and compare stocks. The results (tables) can then be saved for analysis or used as input for more complex tasks not implemented here, such as stock price prediction.
Depending on how the data will be used, this notebook would need some modifications, such as changing the period and frequency of the data we fetch. For example:
+ For price prediction, if data is collected every 10 minutes, only the last 10 minutes of data need to be fetched, since the rest is already loaded. Fetching everything again would give the same results but wastes work.
+ If we want to analyze the data or compare stocks when the market closes at the end of the day, the queries do not need to run continuously. We can fetch the data, save it in tables, stop the queries, and run them again the next day. This would require an external scheduler or workflows, which we could not create because we implemented the project on the Community Edition.
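The first point amounts to remembering the timestamp of the last row already ingested and appending only newer rows. The following is a minimal plain-Python sketch. It assumes each fetch returns rows as dicts with a `timestamp` key. `new_rows_only` is a hypothetical helper, and the sketch does not cover persisting the high-water mark between runs (for example, in a table). Prices are rounded from the MSFT sample.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def new_rows_only(
    rows: List[Row], last_ingested: Optional[datetime]
) -> Tuple[List[Row], Optional[datetime]]:
    """Keep only rows newer than the last ingested timestamp and return the
    updated high-water mark for the next scheduled run."""
    fresh = [r for r in rows if last_ingested is None or r["timestamp"] > last_ingested]
    if fresh:
        last_ingested = max(r["timestamp"] for r in fresh)
    return fresh, last_ingested


d = datetime(2024, 12, 11)
run_1 = [
    {"timestamp": d.replace(hour=14, minute=30), "price": 445.27},
    {"timestamp": d.replace(hour=14, minute=31), "price": 445.48},
]
fresh_1, mark = new_rows_only(run_1, None)
# The next fetch returns the same rows plus one new minute
run_2 = run_1 + [{"timestamp": d.replace(hour=14, minute=32), "price": 445.73}]
fresh_2, mark = new_rows_only(run_2, mark)
print([r["timestamp"].strftime("%H:%M") for r in fresh_2])  # ['14:32']
```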



## References

+ Yahoo Finance API: https://pypi.org/project/yfinance/