In [1]:
# Gold-layer stock analytics: builds the gold_* tables from silver_stock_analytics.
# Shared PySpark imports used by the cells below.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Load the silver-layer table that the later cells read from.
df_silver = spark.table("silver_stock_analytics")

# Quick sanity check: total row count, then the distinct symbols present.
silver_row_count = df_silver.count()
print(f"Processing {silver_row_count} records")

distinct_symbols = df_silver.select("symbol").distinct()
distinct_symbols.show()

StatementMeta(, fa630d98-7adc-4aa5-b01a-063b8a37020d, 3, Finished, Available, Finished)

Processing 25 records
+------------+
|      symbol|
+------------+
|RELIANCE.BSE|
+------------+



In [2]:
# Derive per-row trading signals from the technical indicators in the silver data.
# Each signal column carries a label when its rule fires and NULL otherwise.
rsi_col = F.col("rsi")
bb_col = F.col("bb_position")

rsi_oversold_signal = F.when(rsi_col < 30, "BUY").otherwise(None)
rsi_overbought_signal = F.when(rsi_col > 70, "SELL").otherwise(None)
bb_breakout_signal = (
    F.when(bb_col > 0.95, "OVERBOUGHT")
    .when(bb_col < 0.05, "OVERSOLD")
    .otherwise(None)
)
volume_spike_signal = F.when(F.col("unusual_volume") == True, "HIGH_VOLUME").otherwise(None)

df_signals = (
    df_silver
    .withColumn("signal_rsi_oversold", rsi_oversold_signal)
    .withColumn("signal_rsi_overbought", rsi_overbought_signal)
    .withColumn("signal_bb_breakout", bb_breakout_signal)
    .withColumn("signal_volume_spike", volume_spike_signal)
)

# Fold the four signal columns into a single comma-separated summary column.
signal_column_names = [
    "signal_rsi_oversold",
    "signal_rsi_overbought",
    "signal_bb_breakout",
    "signal_volume_spike",
]
df_signals = df_signals.withColumn(
    "combined_signal",
    F.concat_ws(", ", *[F.col(name) for name in signal_column_names]),
)

# Persist the result, replacing any previous contents of the table.
signals_writer = df_signals.write.mode("overwrite")
signals_writer.saveAsTable("gold_trading_signals")

print("✅ Trading signals generated")

StatementMeta(, fa630d98-7adc-4aa5-b01a-063b8a37020d, 4, Finished, Available, Finished)

✅ Trading signals generated


In [3]:
# Score and rank stocks using each symbol's most recent silver row.

# Latest row per symbol: number rows newest-first within a symbol and keep row 1.
window_latest = Window.partitionBy("symbol").orderBy(F.desc("timestamp"))

df_latest = (
    df_silver
    .withColumn("rank", F.row_number().over(window_latest))
    .filter(F.col("rank") == 1)
    .drop("rank")
)

# Component scores. Each when-chain is evaluated top-down; the first match wins.
momentum_score = (
    F.when(F.col("price_change_pct") > 2, 100)
    .when(F.col("price_change_pct") > 1, 75)
    .when(F.col("price_change_pct") > 0, 50)
    .when(F.col("price_change_pct") > -1, 25)
    .otherwise(0)
)
volume_score = (
    F.when(F.col("volume_ratio") > 3, 100)
    .when(F.col("volume_ratio") > 2, 75)
    .when(F.col("volume_ratio") > 1.5, 50)
    .otherwise(25)
)
# NOTE(review): a NULL rsi matches neither range and lands in otherwise(50);
# confirm that is the intended score when RSI is missing.
technical_score = (
    F.when((F.col("rsi") > 40) & (F.col("rsi") < 60), 100)
    .when((F.col("rsi") > 30) & (F.col("rsi") < 70), 75)
    .otherwise(50)
)

# total_score is the plain average of the three component scores.
df_ranked = (
    df_latest
    .withColumn("momentum_score", momentum_score)
    .withColumn("volume_score", volume_score)
    .withColumn("technical_score", technical_score)
    .withColumn(
        "total_score",
        (F.col("momentum_score") + F.col("volume_score") + F.col("technical_score")) / 3,
    )
)

# Rank stocks, best total_score first. symbol is a secondary sort key so that
# equal scores are ordered the same way on every run; row_number() on
# total_score alone would break ties arbitrarily.
window_rank = Window.orderBy(F.desc("total_score"), F.asc("symbol"))
df_ranked = df_ranked.withColumn("rank", F.row_number().over(window_rank))

df_ranked.select("rank", "symbol", "close", "price_change_pct", "rsi", "volume_ratio", "total_score") \
    .orderBy("rank") \
    .show(10, truncate=False)

# Save rankings
df_ranked.write.mode("overwrite").saveAsTable("gold_stock_rankings")

StatementMeta(, fa630d98-7adc-4aa5-b01a-063b8a37020d, 5, Finished, Available, Finished)

+----+------------+------+----------------+----+------------------+------------------+
|rank|symbol      |close |price_change_pct|rsi |volume_ratio      |total_score       |
+----+------------+------+----------------+----+------------------+------------------+
|1   |RELIANCE.BSE|2502.0|0.08            |NULL|1.2490392006149116|41.666666666666664|
+----+------------+------+----------------+----+------------------+------------------+



In [4]:
# Intraday patterns: average return, average volume and row count
# for every (symbol, hour-of-day) pair.
hour_of_day = F.hour("timestamp")
hourly_aggregates = [
    F.avg("price_change_pct").alias("avg_return"),
    F.avg("volume").alias("avg_volume"),
    F.count("*").alias("sample_count"),
]

df_hourly = (
    df_silver
    .withColumn("hour", hour_of_day)
    .groupBy("symbol", "hour")
    .agg(*hourly_aggregates)
)

df_hourly.write.mode("overwrite").saveAsTable("gold_hourly_patterns")

# Average the per-symbol hourly returns across symbols, best hours first.
print("\n📊 Best performing hours across all stocks:")
market_by_hour = df_hourly.groupBy("hour").agg(
    F.avg("avg_return").alias("market_avg_return")
)
market_by_hour.orderBy(F.desc("market_avg_return")).show()

StatementMeta(, fa630d98-7adc-4aa5-b01a-063b8a37020d, 6, Finished, Available, Finished)


📊 Best performing hours across all stocks:
+----+--------------------+
|hour|   market_avg_return|
+----+--------------------+
|  15|0.003333333333333...|
+----+--------------------+



In [5]:
# Calculate volatility metrics per (symbol, date).
#
# day_open / day_close are picked by timestamp rather than with F.first / F.last:
# row order inside a groupBy aggregation is not guaranteed after a shuffle, so
# first/last can return the open/close of an arbitrary row in the group.
# min/max over a (timestamp, value) struct compares timestamp first, which
# selects the value from the earliest / latest row deterministically.
# NOTE(review): assumes timestamp is non-null for every row — confirm.
day_open_expr = F.min(F.struct("timestamp", "open")).getField("open")
day_close_expr = F.max(F.struct("timestamp", "close")).getField("close")

df_volatility = df_silver.groupBy("symbol", "date") \
    .agg(
        F.stddev("price_change_pct").alias("intraday_volatility"),
        F.max("high").alias("day_high"),
        F.min("low").alias("day_low"),
        day_open_expr.alias("day_open"),
        day_close_expr.alias("day_close"),
        # High-low range of the day as a percentage of the day's opening price.
        ((F.max("high") - F.min("low")) / day_open_expr * 100).alias("day_range_pct")
    )

df_volatility.write.mode("overwrite").saveAsTable("gold_volatility_metrics")

# Average the daily volatility per symbol and list the most volatile first.
print("\n📈 Volatility Rankings:")
df_volatility.groupBy("symbol") \
    .agg(F.avg("intraday_volatility").alias("avg_volatility")) \
    .orderBy(F.desc("avg_volatility")) \
    .show()

StatementMeta(, fa630d98-7adc-4aa5-b01a-063b8a37020d, 7, Finished, Available, Finished)


📈 Volatility Rankings:
+------------+--------------------+
|      symbol|      avg_volatility|
+------------+--------------------+
|RELIANCE.BSE|0.016329931618554522|
+------------+--------------------+



In [6]:
# Flag rows with unusual volume, an absolute price_change_pct above 2,
# or an RSI below 25 / above 75.
is_volume_spike = F.col("unusual_volume") == True
is_large_move = F.abs(F.col("price_change_pct")) > 2
is_rsi_extreme = (F.col("rsi") < 25) | (F.col("rsi") > 75)

anomaly_columns = [
    "symbol", "timestamp", "close", "price_change_pct",
    "volume_ratio", "rsi", "bb_position",
]

df_anomalies = (
    df_silver
    .filter(is_volume_spike | is_large_move | is_rsi_extreme)
    .select(*anomaly_columns)
    .orderBy(F.desc("timestamp"))
)

# Persist the flagged rows, then report how many there are and preview them.
anomalies_writer = df_anomalies.write.mode("overwrite")
anomalies_writer.saveAsTable("gold_anomalies")

print(f"\n⚠️ Found {df_anomalies.count()} anomalous events")
df_anomalies.show(10, truncate=False)

StatementMeta(, fa630d98-7adc-4aa5-b01a-063b8a37020d, 8, Finished, Available, Finished)


⚠️ Found 0 anomalous events
+------+---------+-----+----------------+------------+---+-----------+
|symbol|timestamp|close|price_change_pct|volume_ratio|rsi|bb_position|
+------+---------+-----+----------------+------------+---+-----------+
+------+---------+-----+----------------+------------+---+-----------+



In [1]:
# Rebuild gold_stock_rankings from scratch in a fresh session.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Start fresh from silver layer
df_silver = spark.table("silver_stock_analytics")

print(f"Silver layer has {df_silver.count()} records")

# Latest row per symbol: number rows newest-first within a symbol and keep row 1.
window_latest = Window.partitionBy("symbol").orderBy(F.desc("timestamp"))

df_latest = (
    df_silver
    .withColumn("rank_temp", F.row_number().over(window_latest))
    .filter(F.col("rank_temp") == 1)
    .drop("rank_temp")
)

# Component scores. Each when-chain is evaluated top-down; the first match wins.
momentum_score = (
    F.when(F.col("price_change_pct") > 2, 100)
    .when(F.col("price_change_pct") > 1, 75)
    .when(F.col("price_change_pct") > 0, 50)
    .when(F.col("price_change_pct") > -1, 25)
    .otherwise(0)
)
volume_score = (
    F.when(F.col("volume_ratio") > 3, 100)
    .when(F.col("volume_ratio") > 2, 75)
    .when(F.col("volume_ratio") > 1.5, 50)
    .otherwise(25)
)
# NOTE(review): a NULL rsi matches neither range and lands in otherwise(50);
# confirm that is the intended score when RSI is missing.
technical_score = (
    F.when((F.col("rsi") > 40) & (F.col("rsi") < 60), 100)
    .when((F.col("rsi") > 30) & (F.col("rsi") < 70), 75)
    .otherwise(50)
)

# Rank by total_score, best first. symbol is a secondary sort key so that
# equal scores are ordered the same way on every run; row_number() on
# total_score alone would break ties arbitrarily.
window_rank = Window.orderBy(F.desc("total_score"), F.asc("symbol"))

df_ranked = (
    df_latest
    .withColumn("momentum_score", momentum_score)
    .withColumn("volume_score", volume_score)
    .withColumn("technical_score", technical_score)
    .withColumn(
        "total_score",
        (F.col("momentum_score") + F.col("volume_score") + F.col("technical_score")) / 3,
    )
    .withColumn("rank", F.row_number().over(window_rank))
)

# Drop old table completely
spark.sql("DROP TABLE IF EXISTS gold_stock_rankings")

# Create fresh; overwriteSchema lets the new schema replace any previous one.
df_ranked.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("gold_stock_rankings")

print("✅ gold_stock_rankings recreated")

StatementMeta(, 8dd6ba0d-8654-45ad-a58b-32083d76fbc3, 3, Finished, Available, Finished)

Silver layer has 25 records
✅ gold_stock_rankings recreated


In [2]:
def _recreate_delta_table(df, table_name):
    """Drop ``table_name`` if it exists, then save ``df`` under that name as a Delta table."""
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )


# --- Trading signals -------------------------------------------------------
# Each signal column carries a label when its rule fires and NULL otherwise;
# combined_signal joins the four signal columns with ", ".
rsi_col = F.col("rsi")
bb_col = F.col("bb_position")
signal_column_names = [
    "signal_rsi_oversold",
    "signal_rsi_overbought",
    "signal_bb_breakout",
    "signal_volume_spike",
]

df_signals = (
    df_silver
    .withColumn("signal_rsi_oversold", F.when(rsi_col < 30, "BUY").otherwise(None))
    .withColumn("signal_rsi_overbought", F.when(rsi_col > 70, "SELL").otherwise(None))
    .withColumn(
        "signal_bb_breakout",
        F.when(bb_col > 0.95, "OVERBOUGHT").when(bb_col < 0.05, "OVERSOLD").otherwise(None),
    )
    .withColumn(
        "signal_volume_spike",
        F.when(F.col("unusual_volume") == True, "HIGH_VOLUME").otherwise(None),
    )
    .withColumn(
        "combined_signal",
        F.concat_ws(", ", *[F.col(name) for name in signal_column_names]),
    )
)

_recreate_delta_table(df_signals, "gold_trading_signals")
print("✅ gold_trading_signals recreated")

# --- Anomalies -------------------------------------------------------------
# Rows with unusual volume, an absolute price_change_pct above 2,
# or an RSI below 25 / above 75.
is_volume_spike = F.col("unusual_volume") == True
is_large_move = F.abs(F.col("price_change_pct")) > 2
is_rsi_extreme = (F.col("rsi") < 25) | (F.col("rsi") > 75)

df_anomalies = (
    df_silver
    .filter(is_volume_spike | is_large_move | is_rsi_extreme)
    .select("symbol", "timestamp", "close", "price_change_pct", "volume_ratio", "rsi", "bb_position")
)

_recreate_delta_table(df_anomalies, "gold_anomalies")
print("✅ gold_anomalies recreated")

# --- Hourly patterns -------------------------------------------------------
# Average return, average volume and row count per (symbol, hour-of-day).
hourly_aggregates = [
    F.avg("price_change_pct").alias("avg_return"),
    F.avg("volume").alias("avg_volume"),
    F.count("*").alias("sample_count"),
]

df_hourly = (
    df_silver
    .withColumn("hour", F.hour("timestamp"))
    .groupBy("symbol", "hour")
    .agg(*hourly_aggregates)
)

_recreate_delta_table(df_hourly, "gold_hourly_patterns")
print("✅ gold_hourly_patterns recreated")

print("\n🎉 All gold tables recreated successfully!")

StatementMeta(, 8dd6ba0d-8654-45ad-a58b-32083d76fbc3, 4, Finished, Available, Finished)

✅ gold_trading_signals recreated
✅ gold_anomalies recreated
✅ gold_hourly_patterns recreated

🎉 All gold tables recreated successfully!
