## Import Libraries 

In [1]:
import os

import findspark

# findspark has to run before the first pyspark import: it sets SPARK_HOME and
# puts Spark's bundled Python libraries on sys.path.
findspark.init()

from pyspark.sql import SparkSession
# The wildcard imports are kept so existing cells keep working. Be aware that
# they shadow Python builtins such as round() and max() with Spark column
# functions; prefer the explicit `F.` namespace imported below.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

# These imports intentionally stay after the wildcard imports so that nothing
# exported by a wildcard can rebind them.
from datetime import datetime, timedelta

import pandas as pd
import yfinance as yf
from delta import configure_spark_with_delta_pip
from pyspark.sql import Window

# Last expression of the cell: shows the Spark installation findspark located.
findspark.find()

'/opt/homebrew/Cellar/apache-spark/4.0.0/libexec'

In [2]:
# Local Spark session wired for Delta Lake: the Delta SQL extension plus the
# Delta catalog registered as the session's `spark_catalog`.
builder = (
    SparkSession.builder
    .appName("DeltaLake4App")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip decorates the builder before the session is
# created (or an existing one is reused).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/30 19:13:48 WARN Utils: Your hostname, Solomons-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 172.16.1.16 instead (on interface en0)
25/07/30 19:13:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/4.0.0/libexec/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/solomon.balogun/.ivy2.5.2/cache
The jars for the packages stored in: /Users/solomon.balogun/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-159954af-07f6-44f9-8be9-dd1fad41dcf9;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 89ms :: artifacts dl 3ms
	:

In [3]:
# Bare expression: shows the SparkSession's repr as the cell output.
spark

## Get and Process Stock Market Data

In [4]:
# Download window: roughly three years of daily bars ending yesterday.
# NOTE: yfinance treats `end` as exclusive, so the last row returned is the
# session before `end_date`.
yesterday = datetime.now() - timedelta(days=1)
end_date = yesterday.strftime("%Y-%m-%d")

# Start date: 3 * 365 days before yesterday (leap days are not accounted for).
start_date = (yesterday - timedelta(days=3 * 365)).strftime("%Y-%m-%d")

# Symbols to download, in the order they are later flattened.
tickers = [
    "AAPL", "MSFT", "NVDA", "GOOGL", "AMZN", "META", "AVGO", "BRK-B", "TSLA", "TSM",
    "JPM", "WMT", "LLY", "ORCL", "V", "MA", "NFLX", "XOM", "COST", "JNJ",
    "ABBV", "SAP", "BABA", "GS", "HD", "VRTX", "UNH", "MRK", "PEP", "TMO"
]

# Download stock data, grouped so that raw[ticker] yields one ticker's columns.
# auto_adjust is passed explicitly instead of relying on the library default:
# this pins the adjusted Open/High/Low/Close columns the rest of the notebook
# works with and avoids the changed-default warning yfinance emits otherwise.
raw = yf.download(
    tickers,
    start=start_date,
    end=end_date,
    group_by='ticker',
    auto_adjust=True,
)

  raw = yf.download(tickers, start=start_date, end=end_date, group_by='ticker')
[*********************100%***********************]  30 of 30 completed


In [5]:
# Flatten the ticker-grouped columns of `raw` into one long frame: take each
# ticker's slice, turn its index into a regular column, and tag every row
# with the symbol it belongs to.
df_list = [
    raw[symbol].reset_index().assign(Ticker=symbol)
    for symbol in tickers
]
flat_df = pd.concat(df_list, ignore_index=True)

# Identifying columns go first; everything else keeps its existing order.
leading = ['Ticker', 'Date']
cols = leading + [name for name in flat_df.columns if name not in leading]
flat_df = flat_df[cols]

# Random ten-row peek at the result.
flat_df.sample(10)

Price,Ticker,Date,Open,High,Low,Close,Volume
9312,LLY,2023-10-26,573.012208,574.00016,559.338925,561.255554,2846500
12801,XOM,2022-10-12,88.82309,90.107612,88.577122,89.670326,12635800
14746,JNJ,2024-07-23,148.47247,148.656477,146.787384,147.54277,6261900
8804,WMT,2024-10-14,79.462181,79.918802,79.17431,79.700417,9914500
13907,COST,2024-03-14,733.355267,735.689409,724.078172,727.01825,1826100
5521,BRK-B,2023-08-29,355.040009,358.589996,354.01001,358.290009,2285600
19796,UNH,2023-10-04,494.294171,496.554694,491.500063,495.390472,2801700
16093,SAP,2023-12-11,154.570297,157.207205,154.570297,157.157837,1078700
16576,BABA,2022-11-16,77.568774,77.694319,74.758455,75.482765,26637300
18246,HD,2023-07-25,309.284957,311.026081,308.10515,309.342041,2890800


In [6]:
# Hand the flattened pandas frame to Spark, then print the schema Spark inferred.
spark_df = spark.createDataFrame(data=flat_df)
spark_df.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)



### Feature Engineering

In [7]:
# Daily return: percent change of each close versus the previous row's close
# for the same ticker, rounded to two decimals. The first row of every ticker
# has no previous close, so Prev_close and Daily_return are null there.
#
# Spark functions are referenced through the explicit `F.` namespace rather
# than the wildcard import, so `round` can never silently resolve to Python's
# builtin.
window_spec = Window.partitionBy('Ticker').orderBy('Date')

spark_df = spark_df.withColumn('Prev_close', F.lag('Close').over(window_spec))

spark_df = spark_df.withColumn(
    'Daily_return',
    F.round((F.col('Close') - F.col('Prev_close')) / F.col('Prev_close') * 100, 2)
)

In [8]:
# Daily volatility: the day's high-low range as a percentage of the low,
# rounded to two decimals. Uses the explicit `F.` namespace so `round` is
# guaranteed to be Spark's column function, not Python's builtin.
spark_df = spark_df.withColumn(
    'Volatility',
    F.round((F.col('High') - F.col('Low')) / F.col('Low') * 100, 2)
)

In [9]:
# Simple moving averages of Close over the trailing 12, 26, 50 and 200 rows
# of each ticker (current row included), rounded to two decimals.

def _trailing_rows_window(n_rows):
    """Return a per-ticker, date-ordered window spanning the current row and the n_rows - 1 rows before it."""
    return Window.partitionBy('Ticker').orderBy('Date').rowsBetween(-(n_rows - 1), 0)


ma_window_12 = _trailing_rows_window(12)
ma_window_26 = _trailing_rows_window(26)
ma_window_50 = _trailing_rows_window(50)
ma_window_200 = _trailing_rows_window(200)

# Early rows have fewer than n_rows predecessors; the average is then taken
# over whatever rows the window actually contains.
for period, ma_window in (
    (12, ma_window_12),
    (26, ma_window_26),
    (50, ma_window_50),
    (200, ma_window_200),
):
    spark_df = spark_df.withColumn(
        f'MA_{period}',
        F.round(F.avg('Close').over(ma_window), 2)
    )

In [10]:
# Cumulative return: percent change of each close versus the first close in
# the ticker's date-ordered history, rounded to two decimals.
window_cum = Window.partitionBy("Ticker").orderBy("Date").rowsBetween(Window.unboundedPreceding, 0)

# With the frame anchored at the start of the partition, first() yields the
# earliest row's Close for every row of that ticker.
spark_df = spark_df.withColumn("First_Close", F.first("Close").over(window_cum))
spark_df = spark_df.withColumn(
    "Cumulative_Return",
    F.round(((F.col("Close") - F.col("First_Close")) / F.col("First_Close")) * 100, 2)
)

In [11]:
# 7-period momentum: percent change of Close versus the close seven rows
# earlier for the same ticker, rounded to two decimals.
# lag() counts rows of the date-ordered window, not calendar days, so the
# first seven rows of each ticker are null.
# `window_spec` is the per-ticker, date-ordered window defined in the
# daily-return cell.
spark_df = spark_df.withColumn("Close_7_Days_Ago", F.lag("Close", 7).over(window_spec))
spark_df = spark_df.withColumn(
    "Momentum_7d",
    F.round(((F.col("Close") - F.col("Close_7_Days_Ago")) / F.col("Close_7_Days_Ago")) * 100, 2)
)

In [14]:
# MACD and signal line.
# MACD here is the difference between the (already rounded) 12-row and 26-row
# simple moving averages of Close.
# NOTE(review): the conventional MACD is built on exponential moving
# averages; confirm that the simple-average variant is intended.
spark_df = spark_df.withColumn(
    'MACD',
    F.col('MA_12') - F.col('MA_26')
)

# Signal line: average of MACD over the current row and the 8 rows before it
# (9 rows in total) for each ticker, rounded to two decimals.
signal_window = Window.partitionBy('Ticker').orderBy('Date').rowsBetween(-8, 0)
spark_df = spark_df.withColumn(
    'Signal_line',
    F.round(F.avg(F.col('MACD')).over(signal_window), 2)
)

In [15]:
# Drawdown: percent distance of each close below the highest close seen so
# far for the same ticker (0 at a new peak, negative otherwise), rounded to
# two decimals.
#
# F.max is used explicitly: a bare `max` only works while the wildcard import
# shadows Python's builtin, which cannot aggregate a Column.
running_peak_window = (
    Window.partitionBy('Ticker')
    .orderBy('Date')
    .rowsBetween(Window.unboundedPreceding, 0)
)
peak_close = F.max(F.col('Close')).over(running_peak_window)

spark_df = spark_df.withColumn(
    'DrawDown',
    F.round((F.col('Close') - peak_close) / peak_close * 100, 2)
)

In [None]:
# Print the first 20 rows of the engineered frame, truncating long cell values.
spark_df.show(n=20, truncate=True)

In [16]:
# Shut down the Spark session; cells that use `spark` or `spark_df` cannot be
# re-run after this without recreating the session.
spark.stop()