In [0]:
# Load the exported stock analytics table and preview its first rows.
df = spark.table(
    "workspace.default.stock_analytics_export"
)
df.show()

+----------+-------------+------------------+--------------------+------------------+------------------+--------------------+
|      date|       ticker|             close|             returns|             ma_20|             ma_50|          volatility|
+----------+-------------+------------------+--------------------+------------------+------------------+--------------------+
|2023-03-14|ADANIPOWER.NS|40.869998931884766|-0.04997678993494...| 33.43700017929077| 42.92120021820068| 0.04867389494905907|
|2023-03-15|ADANIPOWER.NS| 40.36000061035156|-0.01247854991097952| 33.97300024032593| 42.53580020904541|0.046732342729651884|
|2023-03-16|ADANIPOWER.NS|  39.7400016784668|-0.01536171760427...| 34.55200033187866| 42.15160022735596| 0.04473880813159996|
|2023-03-17|ADANIPOWER.NS|  39.9900016784668|0.006290890524432546| 35.07350044250488| 41.80500026702881| 0.04418559436066782|
|2023-03-20|ADANIPOWER.NS|38.040000915527344|-0.04876220757923744| 35.42400045394898|41.410800285339356| 0.04570907423

In [0]:
# Average closing price per ticker, computed with Spark SQL.
avg_price_by_ticker = spark.sql("""
SELECT ticker, AVG(close) as avg_price
FROM workspace.default.stock_analytics_export
GROUP BY ticker
""")
avg_price_by_ticker.show()

+-------------+------------------+
|       ticker|         avg_price|
+-------------+------------------+
|ADANIPOWER.NS|106.44109254190568|
|      SBIN.NS| 736.9507135984277|
|SHRIRAMFIN.NS| 539.7894426233628|
+-------------+------------------+



In [0]:
from pyspark.sql import functions as F

df = spark.table("workspace.default.stock_analytics_export")

# Average daily return per ticker, highest first.
# The aggregate is aliased directly instead of renaming the auto-generated
# "avg(returns)" column: withColumnRenamed is a silent no-op when the given
# name is not in the schema, so a mismatch would only surface later as a
# failure in orderBy.
avg_daily_return = df.groupBy("ticker").agg(
    F.avg("returns").alias("avg_daily_return")
)

avg_daily_return.orderBy("avg_daily_return", ascending=False).show(20)

+-------------+--------------------+
|       ticker|    avg_daily_return|
+-------------+--------------------+
|SHRIRAMFIN.NS|0.002232917400882778|
|ADANIPOWER.NS|0.002109297889458...|
|      SBIN.NS|0.001159238528002...|
+-------------+--------------------+



In [0]:
from pyspark.sql import functions as F

# Highest value of the volatility column per ticker, largest first.
# Spark's max is reached through the F namespace: importing it by bare name
# would replace Python's built-in max() for every later cell in the kernel.
max_volatility = df.groupBy("ticker").agg(
    F.max("volatility").alias("max_volatility")
)

max_volatility.orderBy("max_volatility", ascending=False).show(20)

+-------------+-------------------+
|       ticker|     max_volatility|
+-------------+-------------------+
|ADANIPOWER.NS|0.06363373610370532|
|      SBIN.NS|0.04091374990180397|
|SHRIRAMFIN.NS|0.03545160868875566|
+-------------+-------------------+



In [0]:
# Rows with the lowest returns: the default ascending sort lists the
# most negative values first.
worst_days = (
    df
    .orderBy("returns")
    .select("date", "ticker", "returns", "close")
)
worst_days.show(20)

+----------+-------------+--------------------+------------------+
|      date|       ticker|             returns|             close|
+----------+-------------+--------------------+------------------+
|2024-06-04|ADANIPOWER.NS| -0.1732990201047857|144.58999633789062|
|2024-06-04|      SBIN.NS| -0.1440402098405682| 759.9407348632812|
|2025-09-24|ADANIPOWER.NS| -0.1099476774403958|             144.5|
|2024-06-04|SHRIRAMFIN.NS|-0.09901763059495083| 439.8961181640625|
|2024-11-21|ADANIPOWER.NS|-0.09149013896022895|  95.2300033569336|
|2023-08-23|ADANIPOWER.NS| -0.0706678673702078| 64.56999969482422|
|2025-01-13|ADANIPOWER.NS| -0.0671780612589925|  89.9800033569336|
|2023-10-23|ADANIPOWER.NS|-0.06458768120092928|63.290000915527344|
|2023-10-09|ADANIPOWER.NS|-0.06121327267514465|  68.4000015258789|
|2025-04-07|SHRIRAMFIN.NS| -0.0605874489958087|    607.5556640625|
|2025-04-25|SHRIRAMFIN.NS|-0.05949902945046526|    648.3232421875|
|2026-02-02|SHRIRAMFIN.NS|-0.05676472981770...| 962.0999755859

In [0]:
# Rows with the highest returns: descending sort lists the largest values first.
best_days = (
    df
    .orderBy("returns", ascending=False)
    .select("date", "ticker", "returns", "close")
)
best_days.show(20)

+----------+-------------+-------------------+------------------+
|      date|       ticker|            returns|             close|
+----------+-------------+-------------------+------------------+
|2025-09-22|ADANIPOWER.NS| 0.1999576694448244|            170.25|
|2024-11-27|ADANIPOWER.NS| 0.1951331208643241|104.61000061035156|
|2025-01-14|ADANIPOWER.NS|0.19359854988939573| 107.4000015258789|
|2023-12-05|ADANIPOWER.NS| 0.1580589027988566|107.62999725341797|
|2024-06-03|ADANIPOWER.NS|0.15705206178376008|174.89999389648438|
|2025-09-19|ADANIPOWER.NS|0.12362404890111511| 141.8800048828125|
|2023-11-28|ADANIPOWER.NS|0.12336348339857639| 89.23999786376953|
|2023-06-21|SHRIRAMFIN.NS|0.11201243413008766| 332.7072448730469|
|2023-07-25|ADANIPOWER.NS|0.09306723628540148|52.029998779296875|
|2024-07-26|SHRIRAMFIN.NS| 0.0918254472374731| 572.1939697265625|
|2024-06-03|      SBIN.NS|0.09068469916755761| 887.8229370117188|
|2024-05-31|ADANIPOWER.NS|0.08218794092418813|151.16000366210938|
|2023-10-2

In [0]:
from pyspark.sql.functions import expr

# trend_strength is the gap between the ma_20 and ma_50 columns;
# positive values mean ma_20 is above ma_50.
trend = df.withColumn("trend_strength", expr("ma_20 - ma_50"))

# Show the rows with the widest positive gap first.
strongest_trends = (
    trend
    .orderBy("trend_strength", ascending=False)
    .select("date", "ticker", "trend_strength")
)
strongest_trends.show(20)

+----------+-------------+-----------------+
|      date|       ticker|   trend_strength|
+----------+-------------+-----------------+
|2025-12-01|SHRIRAMFIN.NS|95.48212646484376|
|2025-11-28|SHRIRAMFIN.NS|94.76432861328124|
|2025-12-02|SHRIRAMFIN.NS|93.66488281249997|
|2025-11-27|SHRIRAMFIN.NS|93.61663085937505|
|2025-11-26|SHRIRAMFIN.NS|91.88854431152345|
|2025-12-03|SHRIRAMFIN.NS|91.51139038085932|
|2025-11-25|SHRIRAMFIN.NS|89.57604125976559|
|2025-12-04|SHRIRAMFIN.NS|89.45183532714839|
|2025-11-24|SHRIRAMFIN.NS|87.86472717285153|
|2025-12-05|SHRIRAMFIN.NS|86.78116638183599|
|2025-11-21|SHRIRAMFIN.NS|86.55182495117185|
|2026-01-19|SHRIRAMFIN.NS|86.20949645996097|
|2026-01-20|SHRIRAMFIN.NS|85.51399719238282|
|2025-11-20|SHRIRAMFIN.NS|85.29249084472656|
|2026-01-16|SHRIRAMFIN.NS|84.65699829101561|
|2026-01-21|SHRIRAMFIN.NS|83.84999755859371|
|2025-11-19|SHRIRAMFIN.NS|83.18372924804692|
|2025-12-08|SHRIRAMFIN.NS|83.18094055175789|
|2026-01-15|SHRIRAMFIN.NS|82.51199829101563|
|2026-01-2

In [0]:
from pyspark.sql.functions import avg

# Per-ticker averages of the returns and volatility columns.
per_ticker = df.groupBy("ticker")
performance = per_ticker.agg(
    avg("returns").alias("avg_return"),
    avg("volatility").alias("avg_volatility"),
)

performance.orderBy("avg_return", ascending=False).show(20)


+-------------+--------------------+--------------------+
|       ticker|          avg_return|      avg_volatility|
+-------------+--------------------+--------------------+
|SHRIRAMFIN.NS|0.002232917400882778|0.020290678023399324|
|ADANIPOWER.NS|0.002109297889458...|0.026788878390754018|
|      SBIN.NS|0.001159238528002...|0.013243697522517469|
+-------------+--------------------+--------------------+



In [0]:
from pyspark.sql.functions import col

# Score each ticker by its average return divided by its average volatility.
return_per_volatility = col("avg_return") / col("avg_volatility")
score = performance.withColumn("score", return_per_volatility)

score.orderBy("score", ascending=False).show(20)

+-------------+--------------------+--------------------+-------------------+
|       ticker|          avg_return|      avg_volatility|              score|
+-------------+--------------------+--------------------+-------------------+
|SHRIRAMFIN.NS|0.002232917400882778|0.020290678023399324|0.11004646558916194|
|      SBIN.NS|0.001159238528002...|0.013243697522517469|0.08753133526579426|
|ADANIPOWER.NS|0.002109297889458...|0.026788878390754018|0.07873782017638375|
+-------------+--------------------+--------------------+-------------------+



In [0]:
# Render the full analytics DataFrame as a table.
# NOTE(review): display() is not defined in this notebook — presumably the
# Databricks notebook built-in; confirm before running elsewhere.
display(df)

date,ticker,close,returns,ma_20,ma_50,volatility
2023-03-14,ADANIPOWER.NS,40.869998931884766,-0.0499767899349453,33.43700017929077,42.92120021820068,0.048673894949059
2023-03-15,ADANIPOWER.NS,40.36000061035156,-0.0124785499109795,33.97300024032593,42.53580020904541,0.0467323427296518
2023-03-16,ADANIPOWER.NS,39.7400016784668,-0.0153617176042794,34.55200033187866,42.15160022735596,0.0447388081315999
2023-03-17,ADANIPOWER.NS,39.9900016784668,0.0062908905244325,35.07350044250488,41.80500026702881,0.0441855943606678
2023-03-20,ADANIPOWER.NS,38.040000915527344,-0.0487622075792374,35.42400045394898,41.41080028533936,0.0457090742386519
2023-03-21,ADANIPOWER.NS,39.93999862670898,0.0499473623936241,35.79200029373169,41.07600025177002,0.045708881059858
2023-03-22,ADANIPOWER.NS,40.810001373291016,0.0217827435276933,36.12250032424927,40.741400260925296,0.0448901237638345
2023-03-23,ADANIPOWER.NS,40.27999877929688,-0.0129870761126955,36.51200017929077,40.43020023345947,0.0430174719441501
2023-03-24,ADANIPOWER.NS,38.52000045776367,-0.0436941006670985,36.89450016021728,40.09580020904541,0.0425728667338288
2023-03-27,ADANIPOWER.NS,36.59999847412109,-0.0498442876642178,37.25800008773804,39.7332001876831,0.0425696269849896


In [0]:
from pyspark.sql.functions import avg, stddev, col

df = spark.table("workspace.default.stock_analytics_export")

# Per-ticker mean and standard deviation of returns, plus their ratio.
# The ratio is labelled "sharpe_ratio"; no risk-free rate is subtracted
# and no annualisation is applied.
return_stats = [
    avg("returns").alias("mean_return"),
    stddev("returns").alias("std_return"),
]
sharpe = (
    df.groupBy("ticker")
    .agg(*return_stats)
    .withColumn("sharpe_ratio", col("mean_return") / col("std_return"))
)

sharpe.orderBy("sharpe_ratio", ascending=False).show(20)

+-------------+--------------------+--------------------+-------------------+
|       ticker|         mean_return|          std_return|       sharpe_ratio|
+-------------+--------------------+--------------------+-------------------+
|SHRIRAMFIN.NS|0.002232917400882778|0.021036680162737704|0.10614400103101568|
|      SBIN.NS|0.001159238528002...|0.014377440239914859|0.08062899296802156|
|ADANIPOWER.NS|0.002109297889458...| 0.02940733317647528|0.07172693548239044|
+-------------+--------------------+--------------------+-------------------+



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, stddev

# A return is flagged when its magnitude exceeds this many standard deviations.
ANOMALY_STD_MULTIPLIER = 2

# Standard deviation of returns computed over the whole table (all tickers
# pooled, no groupBy), pulled back to the driver as a plain Python float.
std = df.select(stddev("returns")).collect()[0][0]

# F.abs is Spark's column-wise absolute value. It is reached through the F
# namespace so that Python's built-in abs() is not shadowed for later cells.
# col is imported in this cell so it does not depend on an earlier cell's import.
anomalies = df.filter(F.abs(col("returns")) > ANOMALY_STD_MULTIPLIER * std)

anomalies.select("date", "ticker", "returns").show(20)

+----------+-------------+--------------------+
|      date|       ticker|             returns|
+----------+-------------+--------------------+
|2023-03-14|ADANIPOWER.NS|-0.04997678993494...|
|2023-03-20|ADANIPOWER.NS|-0.04876220757923744|
|2023-03-21|ADANIPOWER.NS|0.049947362393624184|
|2023-03-27|ADANIPOWER.NS|-0.04984428766421789|
|2023-03-28|ADANIPOWER.NS|-0.04999994788664719|
|2023-03-29|ADANIPOWER.NS| 0.04975552256140525|
|2023-03-31|ADANIPOWER.NS| 0.04986300533764987|
|2023-04-20|ADANIPOWER.NS| 0.04538176199913768|
|2023-04-28|ADANIPOWER.NS|  0.0499649614326374|
|2023-05-02|ADANIPOWER.NS| 0.04936618513996893|
|2023-05-19|ADANIPOWER.NS|  0.0491002661982467|
|2023-05-22|ADANIPOWER.NS|0.049978834166502395|
|2023-05-23|ADANIPOWER.NS| 0.04981842103238421|
|2023-06-07|ADANIPOWER.NS|0.045057087743986424|
|2023-06-23|ADANIPOWER.NS|-0.05533905240600212|
|2023-07-25|ADANIPOWER.NS| 0.09306723628540148|
|2023-07-31|ADANIPOWER.NS|0.057048942470882213|
|2023-08-18|ADANIPOWER.NS| 0.06299069462

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# One window per ticker, ordered by date, so lag() reads the preceding row
# of the same ticker.
window = Window.partitionBy("ticker").orderBy("date")

# volatility_spike is the change in volatility relative to the preceding row.
prev_volatility = lag("volatility").over(window)
vol_spike = (
    df
    .withColumn("prev_volatility", prev_volatility)
    .withColumn("volatility_spike", col("volatility") - col("prev_volatility"))
)

vol_spike.orderBy("volatility_spike", ascending=False).show(20)

+----------+-------------+------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+
|      date|       ticker|             close|             returns|             ma_20|             ma_50|          volatility|     prev_volatility|    volatility_spike|
+----------+-------------+------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+
|2024-11-27|ADANIPOWER.NS|104.61000061035156|  0.1951331208643241|109.95700073242188| 120.4102001953125|0.053807499269733716| 0.02647008330117052|0.027337415968563195|
|2025-01-14|ADANIPOWER.NS| 107.4000015258789| 0.19359854988939573| 101.8395004272461| 105.6436003112793| 0.05244393552439809|0.026734690390398805|0.025709245133999286|
|2025-09-22|ADANIPOWER.NS|            170.25|  0.1999576694448244|127.25399971008301|121.68820022583007|0.052757080455884536|0.031371685775545215| 0.02138539468

In [0]:
# Per-ticker average return and average volatility, with their ratio stored
# as "performance_score".
performance = (
    df.groupBy("ticker")
    .agg(
        avg("returns").alias("avg_return"),
        avg("volatility").alias("avg_volatility"),
    )
    .withColumn("performance_score", col("avg_return") / col("avg_volatility"))
)

performance.orderBy("performance_score", ascending=False).show(20)

+-------------+--------------------+--------------------+-------------------+
|       ticker|          avg_return|      avg_volatility|  performance_score|
+-------------+--------------------+--------------------+-------------------+
|SHRIRAMFIN.NS|0.002232917400882778|0.020290678023399324|0.11004646558916194|
|      SBIN.NS|0.001159238528002...|0.013243697522517469|0.08753133526579426|
|ADANIPOWER.NS|0.002109297889458...|0.026788878390754018|0.07873782017638375|
+-------------+--------------------+--------------------+-------------------+



In [0]:
# Per-ticker summary: average return, standard deviation of returns ("risk"),
# average of the volatility column, and avg_return / risk as "sharpe_ratio".
summary_metrics = (
    avg("returns").alias("avg_return"),
    stddev("returns").alias("risk"),
    avg("volatility").alias("volatility"),
)
final = (
    df.groupBy("ticker")
    .agg(*summary_metrics)
    .withColumn("sharpe_ratio", col("avg_return") / col("risk"))
)

display(final.orderBy("sharpe_ratio", ascending=False))

ticker,avg_return,risk,volatility,sharpe_ratio
SHRIRAMFIN.NS,0.0022329174008827,0.0210366801627377,0.0202906780233993,0.1061440010310156
SBIN.NS,0.0011592385280022,0.0143774402399148,0.0132436975225174,0.0806289929680215
ADANIPOWER.NS,0.0021092978894582,0.0294073331764752,0.026788878390754,0.0717269354823904
