In [0]:
import requests
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *

def fetch_stock_data(symbol):
    api_key = "2KVHFEMJ7G9JU3PY"
    url = f"https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol={symbol}&interval=5min&apikey={api_key}"
    response = requests.get(url)
    data = json.loads(response.text)
    return data["Time Series (5min)"]


symbols = ["MSFT", "AAPL", "GOOGL", "AMZN"]
all_data = []

for symbol in symbols:
    raw_data = fetch_stock_data(symbol)
    for timestamp, values in raw_data.items():
        all_data.append((symbol, timestamp, values["1. open"], values["2. high"], values["3. low"], values["4. close"], values["5. volume"]))


schema = StructType([
    StructField("symbol", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("open", StringType(), False),
    StructField("high", StringType(), False),
    StructField("low", StringType(), False),
    StructField("close", StringType(), False),
    StructField("volume", StringType(), False)
])

df = spark.createDataFrame(all_data, schema)


df = df.withColumn("open", col("open").cast(DoubleType())) \
       .withColumn("high", col("high").cast(DoubleType())) \
       .withColumn("low", col("low").cast(DoubleType())) \
       .withColumn("close", col("close").cast(DoubleType())) \
       .withColumn("volume", col("volume").cast(IntegerType()))


display(df)


symbol,timestamp,open,high,low,close,volume
MSFT,2025-03-10 19:55:00,377.99,377.99,377.0,377.45,4975
MSFT,2025-03-10 19:50:00,377.395,378.0,377.0,377.78,3750
MSFT,2025-03-10 19:45:00,377.8,378.0,376.81,377.88,5008
MSFT,2025-03-10 19:40:00,377.47,378.0,376.76,377.3,7511
MSFT,2025-03-10 19:35:00,377.345,377.47,376.76,377.11,14875
MSFT,2025-03-10 19:30:00,377.68,378.0,377.22,377.345,4141
MSFT,2025-03-10 19:25:00,377.46,378.74,377.35,378.0,3426
MSFT,2025-03-10 19:20:00,378.09,378.68,377.25,377.76,6262
MSFT,2025-03-10 19:15:00,378.3,378.75,377.5,378.36,3801
MSFT,2025-03-10 19:10:00,378.86,379.0,378.02,378.61,1553


In [0]:
from pyspark.sql.functions import isnan, when, count, col


def null_value_count(df):
    return df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()


print("Null counts before cleaning:")
null_value_count(df)


df_cleaned = df.dropna()


df_cleaned = df_cleaned.dropDuplicates()


df_cleaned = df_cleaned.filter(
    (col("open") > 0) & 
    (col("high") > 0) & 
    (col("low") > 0) & 
    (col("close") > 0) & 
    (col("volume") > 0) &
    (col("high") >= col("low"))
)

print("Null counts after cleaning:")
null_value_count(df_cleaned)


print(f"Rows before cleaning: {df.count()}")
print(f"Rows after cleaning: {df_cleaned.count()}")


display(df_cleaned)


Null counts before cleaning:
+------+---------+----+----+---+-----+------+
|symbol|timestamp|open|high|low|close|volume|
+------+---------+----+----+---+-----+------+
|     0|        0|   0|   0|  0|    0|     0|
+------+---------+----+----+---+-----+------+

Null counts after cleaning:
+------+---------+----+----+---+-----+------+
|symbol|timestamp|open|high|low|close|volume|
+------+---------+----+----+---+-----+------+
|     0|        0|   0|   0|  0|    0|     0|
+------+---------+----+----+---+-----+------+

Rows before cleaning: 400
Rows after cleaning: 400


symbol,timestamp,open,high,low,close,volume
MSFT,2025-03-10 12:00:00,380.81,381.36,380.75,381.12,266419
MSFT,2025-03-10 18:25:00,379.73,380.06,379.4,379.5382,2893
MSFT,2025-03-10 18:05:00,381.0,381.19,379.27,379.27,2908
MSFT,2025-03-10 15:15:00,378.18,378.82,377.9434,378.59,436764
MSFT,2025-03-10 17:05:00,381.37,381.5,380.82,380.82,2974
MSFT,2025-03-10 17:45:00,380.25,380.83,380.0,380.2,3581
MSFT,2025-03-10 15:25:00,378.88,379.4,378.4,378.445,402869
MSFT,2025-03-10 15:45:00,378.66,380.22,378.4009,380.0,441326
MSFT,2025-03-10 19:20:00,378.09,378.68,377.25,377.76,6262
MSFT,2025-03-10 11:45:00,381.49,382.2,381.013,381.93,241680


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, avg, col, date_format, when


window_spec = Window.partitionBy("symbol").orderBy("timestamp")


df_transformed = df_cleaned.withColumn("prev_close", lag("close").over(window_spec))
df_transformed = df_transformed.withColumn(
    "percent_change", 
    when(df_transformed.prev_close.isNotNull(), 
         (df_transformed.close - df_transformed.prev_close) / df_transformed.prev_close * 100)
    .otherwise(None)
)


df_transformed = df_transformed.withColumn(
    "ma5", 
    avg("close").over(window_spec.rowsBetween(-4, 0))
)


df_transformed = df_transformed.withColumn("trading_day", date_format("timestamp", "EEEE"))


df_transformed = df_transformed.drop("prev_close")


display(df_transformed)


symbol,timestamp,open,high,low,close,volume,percent_change,ma5,trading_day
AAPL,2025-03-10 11:40:00,226.935,227.29,226.36,226.63,621967,,226.63,Monday
AAPL,2025-03-10 11:45:00,226.64,227.0,226.33,226.92,463780,0.1279618761858501,226.775,Monday
AAPL,2025-03-10 11:50:00,226.9,227.01,226.575,226.79,467993,-0.0572889123920304,226.78,Monday
AAPL,2025-03-10 11:55:00,226.8,227.02,226.59,226.59,527865,-0.0881873098461081,226.7325,Monday
AAPL,2025-03-10 12:00:00,226.58,226.97,226.36,226.69,381139,0.0441325742530536,226.724,Monday
AAPL,2025-03-10 12:05:00,226.692,226.98,225.95,225.9899,487889,-0.3088358551325563,226.59598,Monday
AAPL,2025-03-10 12:10:00,225.99,226.31,225.52,226.29,502719,0.1327935451982527,226.46998,Monday
AAPL,2025-03-10 12:15:00,226.285,226.3699,225.17,225.195,504645,-0.4838923505236638,226.15098,Monday
AAPL,2025-03-10 12:20:00,225.22,225.34,224.22,224.79,1081927,-0.1798441350829286,225.79098,Monday
AAPL,2025-03-10 12:25:00,224.76,225.52,224.54,225.48,545405,0.3069531562791929,225.54898,Monday


In [0]:

table_name = "stock_market_data"
df_transformed.write.format("delta").mode("overwrite").saveAsTable(table_name)

print(f"Data saved as Delta table: {table_name}")


df_saved = spark.table(table_name)
display(df_saved.limit(5))


Data saved as Delta table: stock_market_data


symbol,timestamp,open,high,low,close,volume,percent_change,ma5,trading_day
AAPL,2025-03-10 11:40:00,226.935,227.29,226.36,226.63,621967,,226.63,Monday
AAPL,2025-03-10 11:45:00,226.64,227.0,226.33,226.92,463780,0.1279618761858501,226.775,Monday
AAPL,2025-03-10 11:50:00,226.9,227.01,226.575,226.79,467993,-0.0572889123920304,226.78,Monday
AAPL,2025-03-10 11:55:00,226.8,227.02,226.59,226.59,527865,-0.0881873098461081,226.7325,Monday
AAPL,2025-03-10 12:00:00,226.58,226.97,226.36,226.69,381139,0.0441325742530536,226.724,Monday
