In [0]:
# Load the raw stock data from correct location
df = spark.table("workspace.default.stock_prices_raw")

# Sort by symbol and date
from pyspark.sql.functions import col
df = df.orderBy("Symbol", "date")

print(f"✅ Data loaded: {df.count():,} records")
display(df.limit(10))

✅ Data loaded: 3,765 records


date,Symbol,open_price,high_price,low_price,close_price,volume
2025-02-13T05:00:00.000Z,AAPL,235.8819260401158,241.2883551100076,234.5477446540096,240.48187255859372,53614100
2025-02-14T05:00:00.000Z,AAPL,240.2031053319298,244.4844486934593,239.9442390607952,243.53857421875,40896200
2025-02-18T05:00:00.000Z,AAPL,243.0905000064508,244.11602908498716,240.7905267294893,243.40911865234372,48822500
2025-02-19T05:00:00.000Z,AAPL,243.5982928571997,244.9424253685456,242.1048021606556,243.807373046875,32204200
2025-02-20T05:00:00.000Z,AAPL,243.8770617009929,245.70907319749276,243.2298733230768,244.76319885253903,32316900
2025-02-21T05:00:00.000Z,AAPL,244.88267886995575,247.6107939080121,244.155851012499,244.4844207763672,53197400
2025-02-24T05:00:00.000Z,AAPL,243.86711335623195,247.7800669312537,243.3593319822528,246.0277099609375,51326400
2025-02-25T05:00:00.000Z,AAPL,246.92381259977984,248.91513366913293,243.84722519384715,245.9679718017578,48013300
2025-02-26T05:00:00.000Z,AAPL,243.26972892021308,243.91690216055417,238.0922974197643,239.31695556640625,44433600
2025-02-27T05:00:00.000Z,AAPL,238.3710927533652,241.40786042901627,236.0312844149354,236.27024841308597,41153600


In [0]:
#Calculate Simple Moving Averages (SMA)

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

print("=" * 70)
print("📊 CALCULATING MOVING AVERAGES (SMA)")
print("=" * 70)

# Define window specifications for each stock
window_20 = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-19, 0)
window_50 = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-49, 0)

# Calculate SMAs
df_with_sma = df.withColumn("sma_20", avg("close_price").over(window_20)) \
                .withColumn("sma_50", avg("close_price").over(window_50))

print("✅ SMA 20 and SMA 50 calculated")

# Show sample
display(df_with_sma.select("Symbol", "date", "close_price", "sma_20", "sma_50").limit(50))

📊 CALCULATING MOVING AVERAGES (SMA)
✅ SMA 20 and SMA 50 calculated


Symbol,date,close_price,sma_20,sma_50
AAPL,2025-02-13T05:00:00.000Z,240.48187255859372,240.48187255859372,240.48187255859372
AAPL,2025-02-14T05:00:00.000Z,243.53857421875,242.01022338867188,242.01022338867188
AAPL,2025-02-18T05:00:00.000Z,243.40911865234372,242.47652180989584,242.47652180989584
AAPL,2025-02-19T05:00:00.000Z,243.807373046875,242.80923461914065,242.80923461914065
AAPL,2025-02-20T05:00:00.000Z,244.76319885253903,243.2000274658203,243.2000274658203
AAPL,2025-02-21T05:00:00.000Z,244.4844207763672,243.41409301757807,243.41409301757807
AAPL,2025-02-24T05:00:00.000Z,246.0277099609375,243.78746686662947,243.78746686662947
AAPL,2025-02-25T05:00:00.000Z,245.9679718017578,244.0600299835205,244.0600299835205
AAPL,2025-02-26T05:00:00.000Z,239.31695556640625,243.53302171495227,243.53302171495227
AAPL,2025-02-27T05:00:00.000Z,236.27024841308597,242.80674438476564,242.80674438476564


In [0]:
#Calculate Exponential Moving Average (EMA)

from pyspark.sql.functions import lit, when

print("=" * 70)
print("📊 CALCULATING EXPONENTIAL MOVING AVERAGE (EMA)")
print("=" * 70)

# EMA formula: EMA = (Close - Previous EMA) * multiplier + Previous EMA
# Multiplier = 2 / (period + 1)

# Calculate 12-day EMA (commonly used for MACD)
ema_12_multiplier = 2.0 / (12 + 1)

# Start with SMA as initial EMA
window_12 = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-11, 0)
df_with_ema = df_with_sma.withColumn("ema_12_base", avg("close_price").over(window_12))

# In production, you'd iterate through rows
df_with_ema = df_with_ema.withColumn("ema_12", col("ema_12_base"))

print("✅ EMA 12 calculated (using SMA approximation)")

# Show sample
display(df_with_ema.select("Symbol", "date", "close_price", "sma_20", "ema_12").limit(50))

📊 CALCULATING EXPONENTIAL MOVING AVERAGE (EMA)
✅ EMA 12 calculated (using SMA approximation)


Symbol,date,close_price,sma_20,ema_12
AAPL,2025-02-13T05:00:00.000Z,240.48187255859372,240.48187255859372,240.48187255859372
AAPL,2025-02-14T05:00:00.000Z,243.53857421875,242.01022338867188,242.01022338867188
AAPL,2025-02-18T05:00:00.000Z,243.40911865234372,242.47652180989584,242.47652180989584
AAPL,2025-02-19T05:00:00.000Z,243.807373046875,242.80923461914065,242.80923461914065
AAPL,2025-02-20T05:00:00.000Z,244.76319885253903,243.2000274658203,243.2000274658203
AAPL,2025-02-21T05:00:00.000Z,244.4844207763672,243.41409301757807,243.41409301757807
AAPL,2025-02-24T05:00:00.000Z,246.0277099609375,243.78746686662947,243.78746686662947
AAPL,2025-02-25T05:00:00.000Z,245.9679718017578,244.0600299835205,244.0600299835205
AAPL,2025-02-26T05:00:00.000Z,239.31695556640625,243.53302171495227,243.53302171495227
AAPL,2025-02-27T05:00:00.000Z,236.27024841308597,242.80674438476564,242.80674438476564


In [0]:
#Calculate RSI (Relative Strength Index)

from pyspark.sql.functions import lag, when, avg as spark_avg

print("=" * 70)
print("📊 CALCULATING RSI (Relative Strength Index)")
print("=" * 70)

# RSI measures momentum (0-100 scale)
# RSI > 70 = Overbought, RSI < 30 = Oversold

# Step 1: Calculate price changes
window_lag = Window.partitionBy("Symbol").orderBy("date")
df_with_changes = df_with_ema.withColumn("prev_close", lag("close_price", 1).over(window_lag))
df_with_changes = df_with_changes.withColumn("price_change", col("close_price") - col("prev_close"))

# Step 2: Separate gains and losses
df_with_changes = df_with_changes.withColumn(
    "gain", 
    when(col("price_change") > 0, col("price_change")).otherwise(0)
)
df_with_changes = df_with_changes.withColumn(
    "loss", 
    when(col("price_change") < 0, -col("price_change")).otherwise(0)
)

# Step 3: Calculate average gain and loss over 14 periods
window_14 = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-13, 0)
df_with_changes = df_with_changes.withColumn("avg_gain", spark_avg("gain").over(window_14))
df_with_changes = df_with_changes.withColumn("avg_loss", spark_avg("loss").over(window_14))

# Step 4: Calculate RS and RSI
df_with_changes = df_with_changes.withColumn(
    "rs", 
    when(col("avg_loss") != 0, col("avg_gain") / col("avg_loss")).otherwise(100)
)
df_with_changes = df_with_changes.withColumn(
    "rsi", 
    100 - (100 / (1 + col("rs")))
)

print("✅ RSI calculated")

# Show sample with RSI
display(df_with_changes.select("Symbol", "date", "close_price", "rsi").limit(50))

📊 CALCULATING RSI (Relative Strength Index)
✅ RSI calculated


Symbol,date,close_price,rsi
AAPL,2025-02-13T05:00:00.000Z,240.48187255859372,99.009900990099
AAPL,2025-02-14T05:00:00.000Z,243.53857421875,99.009900990099
AAPL,2025-02-18T05:00:00.000Z,243.40911865234372,95.93693728209648
AAPL,2025-02-19T05:00:00.000Z,243.807373046875,96.38837332061914
AAPL,2025-02-20T05:00:00.000Z,244.76319885253903,97.14870491918978
AAPL,2025-02-21T05:00:00.000Z,244.4844207763672,91.5286920672917
AAPL,2025-02-24T05:00:00.000Z,246.0277099609375,93.58355717574828
AAPL,2025-02-25T05:00:00.000Z,245.9679718017578,92.71303831303833
AAPL,2025-02-26T05:00:00.000Z,239.31695556640625,45.544589124558215
AAPL,2025-02-27T05:00:00.000Z,236.27024841308597,36.93646023144095


In [0]:
#Calculate MACD (Moving Average Convergence Divergence)

print("=" * 70)
print("📊 CALCULATING MACD")
print("=" * 70)

# MACD = 12-day EMA - 26-day EMA
# Signal Line = 9-day EMA of MACD

# Calculate 26-day EMA (using SMA as approximation)
window_26 = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-25, 0)
df_with_macd = df_with_changes.withColumn("ema_26", avg("close_price").over(window_26))

# Calculate MACD line
df_with_macd = df_with_macd.withColumn("macd", col("ema_12") - col("ema_26"))

# Calculate signal line (9-day EMA of MACD)
window_9 = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-8, 0)
df_with_macd = df_with_macd.withColumn("macd_signal", avg("macd").over(window_9))

# Calculate MACD histogram
df_with_macd = df_with_macd.withColumn("macd_histogram", col("macd") - col("macd_signal"))

print("✅ MACD calculated")

# Show sample
display(df_with_macd.select("Symbol", "date", "close_price", "macd", "macd_signal", "macd_histogram").limit(50))

📊 CALCULATING MACD
✅ MACD calculated


Symbol,date,close_price,macd,macd_signal,macd_histogram
AAPL,2025-02-13T05:00:00.000Z,240.48187255859372,0.0,0.0,0.0
AAPL,2025-02-14T05:00:00.000Z,243.53857421875,0.0,0.0,0.0
AAPL,2025-02-18T05:00:00.000Z,243.40911865234372,0.0,0.0,0.0
AAPL,2025-02-19T05:00:00.000Z,243.807373046875,0.0,0.0,0.0
AAPL,2025-02-20T05:00:00.000Z,244.76319885253903,0.0,0.0,0.0
AAPL,2025-02-21T05:00:00.000Z,244.4844207763672,0.0,0.0,0.0
AAPL,2025-02-24T05:00:00.000Z,246.0277099609375,0.0,0.0,0.0
AAPL,2025-02-25T05:00:00.000Z,245.9679718017578,0.0,0.0,0.0
AAPL,2025-02-26T05:00:00.000Z,239.31695556640625,0.0,0.0,0.0
AAPL,2025-02-27T05:00:00.000Z,236.27024841308597,0.0,0.0,0.0


In [0]:
#Calculate Bollinger Bands

from pyspark.sql.functions import stddev

print("=" * 70)
print("📊 CALCULATING BOLLINGER BANDS")
print("=" * 70)

# Bollinger Bands = SMA ± (2 × Standard Deviation)
# Shows volatility - wide bands = high volatility

# Calculate 20-day standard deviation
window_20_bb = Window.partitionBy("Symbol").orderBy("date").rowsBetween(-19, 0)
df_with_bb = df_with_macd.withColumn("std_dev_20", stddev("close_price").over(window_20_bb))

# Calculate upper and lower bands
df_with_bb = df_with_bb.withColumn("bollinger_upper", col("sma_20") + (2 * col("std_dev_20")))
df_with_bb = df_with_bb.withColumn("bollinger_lower", col("sma_20") - (2 * col("std_dev_20")))
df_with_bb = df_with_bb.withColumn("bollinger_middle", col("sma_20"))

print("✅ Bollinger Bands calculated")

# Show sample
display(df_with_bb.select("Symbol", "date", "close_price", "bollinger_upper", "bollinger_middle", "bollinger_lower").limit(50))

📊 CALCULATING BOLLINGER BANDS
✅ Bollinger Bands calculated


Symbol,date,close_price,bollinger_upper,bollinger_middle,bollinger_lower
AAPL,2025-02-13T05:00:00.000Z,240.48187255859372,,240.48187255859372,
AAPL,2025-02-14T05:00:00.000Z,243.53857421875,246.3330523325932,242.01022338867188,237.6873944447505
AAPL,2025-02-18T05:00:00.000Z,243.40911865234372,245.9337802108305,242.47652180989584,239.0192634089612
AAPL,2025-02-19T05:00:00.000Z,243.807373046875,245.9300660439682,242.80923461914065,239.68840319431305
AAPL,2025-02-20T05:00:00.000Z,244.76319885253903,246.4185787807699,243.2000274658203,239.98147615087072
AAPL,2025-02-21T05:00:00.000Z,244.4844207763672,246.47791995530747,243.41409301757807,240.3502660798488
AAPL,2025-02-24T05:00:00.000Z,246.0277099609375,247.21178496799135,243.78746686662947,240.3631487652676
AAPL,2025-02-25T05:00:00.000Z,245.9679718017578,247.58538372379164,244.0600299835205,240.53467624324935
AAPL,2025-02-26T05:00:00.000Z,239.31695556640625,248.10173727458204,243.53302171495227,238.9643061553225
AAPL,2025-02-27T05:00:00.000Z,236.27024841308597,249.10381291475036,242.80674438476564,236.5096758547809


In [0]:
#Create Final Clean Dataset

print("=" * 70)
print("📊 CREATING FINAL DATASET WITH ALL INDICATORS")
print("=" * 70)

# Select only the columns we need
final_df = df_with_bb.select(
    "date",
    "Symbol",
    "open_price",
    "high_price",
    "low_price",
    "close_price",
    "volume",
    "sma_20",
    "sma_50",
    "ema_12",
    "rsi",
    "macd",
    "macd_signal",
    "macd_histogram",
    "bollinger_upper",
    "bollinger_middle",
    "bollinger_lower"
)

# Show summary
print(f"✅ Final dataset created")
print(f"📊 Total records: {final_df.count():,}")
print(f"📋 Total columns: {len(final_df.columns)}")
print(f"\nColumns: {final_df.columns}")

# Display sample
display(final_df.limit(20))

📊 CREATING FINAL DATASET WITH ALL INDICATORS
✅ Final dataset created
📊 Total records: 3,765
📋 Total columns: 17

Columns: ['date', 'Symbol', 'open_price', 'high_price', 'low_price', 'close_price', 'volume', 'sma_20', 'sma_50', 'ema_12', 'rsi', 'macd', 'macd_signal', 'macd_histogram', 'bollinger_upper', 'bollinger_middle', 'bollinger_lower']


date,Symbol,open_price,high_price,low_price,close_price,volume,sma_20,sma_50,ema_12,rsi,macd,macd_signal,macd_histogram,bollinger_upper,bollinger_middle,bollinger_lower
2025-02-13T05:00:00.000Z,AAPL,235.8819260401158,241.2883551100076,234.5477446540096,240.48187255859372,53614100,240.48187255859372,240.48187255859372,240.48187255859372,99.009900990099,0.0,0.0,0.0,,240.48187255859372,
2025-02-14T05:00:00.000Z,AAPL,240.2031053319298,244.4844486934593,239.9442390607952,243.53857421875,40896200,242.01022338867188,242.01022338867188,242.01022338867188,99.009900990099,0.0,0.0,0.0,246.3330523325932,242.01022338867188,237.6873944447505
2025-02-18T05:00:00.000Z,AAPL,243.0905000064508,244.11602908498716,240.7905267294893,243.40911865234372,48822500,242.47652180989584,242.47652180989584,242.47652180989584,95.93693728209648,0.0,0.0,0.0,245.9337802108305,242.47652180989584,239.0192634089612
2025-02-19T05:00:00.000Z,AAPL,243.5982928571997,244.9424253685456,242.1048021606556,243.807373046875,32204200,242.80923461914065,242.80923461914065,242.80923461914065,96.38837332061914,0.0,0.0,0.0,245.9300660439682,242.80923461914065,239.68840319431305
2025-02-20T05:00:00.000Z,AAPL,243.8770617009929,245.70907319749276,243.2298733230768,244.76319885253903,32316900,243.2000274658203,243.2000274658203,243.2000274658203,97.14870491918978,0.0,0.0,0.0,246.4185787807699,243.2000274658203,239.98147615087072
2025-02-21T05:00:00.000Z,AAPL,244.88267886995575,247.6107939080121,244.155851012499,244.4844207763672,53197400,243.41409301757807,243.41409301757807,243.41409301757807,91.5286920672917,0.0,0.0,0.0,246.47791995530747,243.41409301757807,240.3502660798488
2025-02-24T05:00:00.000Z,AAPL,243.86711335623195,247.7800669312537,243.3593319822528,246.0277099609375,51326400,243.78746686662947,243.78746686662947,243.78746686662947,93.58355717574828,0.0,0.0,0.0,247.21178496799135,243.78746686662947,240.3631487652676
2025-02-25T05:00:00.000Z,AAPL,246.92381259977984,248.91513366913293,243.84722519384715,245.9679718017578,48013300,244.0600299835205,244.0600299835205,244.0600299835205,92.71303831303833,0.0,0.0,0.0,247.58538372379164,244.0600299835205,240.53467624324935
2025-02-26T05:00:00.000Z,AAPL,243.26972892021308,243.91690216055417,238.0922974197643,239.31695556640625,44433600,243.53302171495227,243.53302171495227,243.53302171495227,45.544589124558215,0.0,0.0,0.0,248.10173727458204,243.53302171495227,238.9643061553225
2025-02-27T05:00:00.000Z,AAPL,238.3710927533652,241.40786042901627,236.0312844149354,236.27024841308597,41153600,242.80674438476564,242.80674438476564,242.80674438476564,36.93646023144095,0.0,0.0,0.0,249.10381291475036,242.80674438476564,236.5096758547809


In [0]:
%sql
-- Use workspace catalog
USE CATALOG workspace;

-- Create schema for processed data
CREATE SCHEMA IF NOT EXISTS processed_data;

-- Verify
SHOW SCHEMAS IN workspace;

databaseName
default
information_schema
processed_data


In [0]:
print("=" * 70)
print("💾 SAVING PROCESSED DATA")
print("=" * 70)

# Save to workspace.processed_data schema
final_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.processed_data.stock_prices_with_indicators")

print("✅ Data saved as Delta table: workspace.processed_data.stock_prices_with_indicators")
print("📌 This table contains all technical indicators!")
print("📌 Ready for Power BI!")

💾 SAVING PROCESSED DATA
✅ Data saved as Delta table: workspace.processed_data.stock_prices_with_indicators
📌 This table contains all technical indicators!
📌 Ready for Power BI!


In [0]:
print("=" * 70)
print("📊 SAMPLE TECHNICAL ANALYSIS - LATEST DATA")
print("=" * 70)

# Load the processed table
processed_df = spark.table("workspace.processed_data.stock_prices_with_indicators")

# Get latest data for each stock with indicators
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, round as spark_round

window_latest = Window.partitionBy("Symbol").orderBy(desc("date"))

latest_analysis = processed_df.withColumn("rank", row_number().over(window_latest)) \
    .filter(col("rank") == 1) \
    .select(
        "Symbol",
        "date",
        spark_round("close_price", 2).alias("close"),
        spark_round("sma_20", 2).alias("sma_20"),
        spark_round("sma_50", 2).alias("sma_50"),
        spark_round("rsi", 2).alias("rsi"),
        spark_round("macd", 2).alias("macd")
    ) \
    .orderBy("Symbol")

print("Latest indicators for all stocks:")
display(latest_analysis)


📊 SAMPLE TECHNICAL ANALYSIS - LATEST DATA
Latest indicators for all stocks:


Symbol,date,close,sma_20,sma_50,rsi,macd
AAPL,2026-02-12T05:00:00.000Z,261.73,262.2,267.85,62.5,7.34
AMZN,2026-02-12T05:00:00.000Z,199.6,229.36,231.17,16.7,-8.37
COST,2026-02-12T05:00:00.000Z,998.86,973.82,920.3,55.29,13.42
DIS,2026-02-12T05:00:00.000Z,102.38,109.26,110.43,37.54,-2.71
GOOGL,2026-02-12T05:00:00.000Z,309.0,329.26,321.41,32.43,-0.61
JNJ,2026-02-12T05:00:00.000Z,244.55,228.99,215.98,92.63,10.47
JPM,2026-02-12T05:00:00.000Z,302.64,308.45,314.57,54.02,0.18
META,2026-02-12T05:00:00.000Z,649.81,664.93,658.24,47.87,23.36
MSFT,2026-02-12T05:00:00.000Z,401.84,435.26,462.72,27.31,-25.85
NFLX,2026-02-12T05:00:00.000Z,75.86,83.3,89.97,19.68,-3.44
