**Setup & Load Data**

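We use the "Pandas Bridge" method (read the CSV with pandas, then convert it to a Spark DataFrame) because of the limitations of Community Edition (CE).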


In [0]:
import os
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Locate the data file relative to the repo. The notebook runs from
# '<repo>/notebooks', so the repo root is one directory up
# (e.g. '.../notebooks' -> '.../indian-stock-market').
current_folder = os.getcwd()
repo_root = os.path.dirname(current_folder)
csv_path = f"{repo_root}/data/nifty50_sectors.csv"

# Fail fast if the CSV is missing: every later cell depends on `df`, so
# carrying on would only surface as a confusing NameError further down.
if not os.path.exists(csv_path):
    print("❌ File still NOT found.")
    print("👉 ACTION REQUIRED: Go to the 'Repos' menu on the left, click your repo, and hit the 'Pull' button to sync the data.")
    raise FileNotFoundError(f"Data file not found: {csv_path}")

print("✅ File found! Loading...")

# "Pandas Bridge": read the CSV with pandas, then convert it to Spark.
pdf = pd.read_csv(csv_path)
df = (
    spark.createDataFrame(pdf)
    # pd.read_csv leaves Date as text; cast it to a real Spark DateType.
    .withColumn("Date", F.to_date(F.col("Date")))
    # Later cells refer to the closing price as 'Price'.
    .withColumnRenamed("Close", "Price")
)

df.printSchema()
display(df.limit(5))

**Calculate Daily Returns (The "Growth")**

We use a window function to look at the previous day's price and measure how much we gained or lost: $r_t = \frac{P_t - P_{t-1}}{P_{t-1}}$.

**Calculate Volatility (The "Risk")** 

Standard Deviation of returns tells us how "scary" a stock is.

High Volatility = Stock jumps up and down wildly (Riskier).

Low Volatility = Stock is stable (Safe).

In [0]:
from pyspark.sql import functions as F

from pyspark.sql.window import Window

# --- Daily returns (the "Growth") ---
# Simple return per ticker in date order: (P_t - P_{t-1}) / P_{t-1}.
# The first row of each ticker has no previous price, so its Daily_Return is
# NULL; the stddev/avg aggregations below skip NULLs.
window_daily = Window.partitionBy("Ticker").orderBy("Date")
prev_price = F.lag("Price", 1).over(window_daily)
df_returns = df.withColumn("Daily_Return", (F.col("Price") - prev_price) / prev_price)

# --- Volatility (the "Risk") ---
TRADING_DAYS_PER_YEAR = 252
LOW_RISK_RATIO = 0.8   # below 80% of the sector standard -> "Low"
HIGH_RISK_RATIO = 1.2  # above 120% of the sector standard -> "High"; in between -> "Standard"

# Reference annualised volatility (%) per sector, used as the Indian benchmark.
indian_sector_risk = spark.createDataFrame(
    [
        ("IT", 18.0),
        ("Banking", 22.0),
        ("Pharma", 16.0),
        ("Energy", 20.0),
        ("FMCG", 14.0),
        ("Metals", 28.0),
        ("Auto", 24.0)
    ],
    ["Sector", "Indian_Std_Volatility_Pct"]
)

# Annualise: volatility = stddev(daily) * sqrt(252); return = mean(daily) * 252.
risk_profile = (
    df_returns.groupBy("Ticker", "Sector")
    .agg(
        F.round(F.stddev("Daily_Return") * (TRADING_DAYS_PER_YEAR ** 0.5) * 100, 2).alias("Volatility_Annual_Pct"),
        F.round(F.avg("Daily_Return") * TRADING_DAYS_PER_YEAR * 100, 2).alias("Return_Annual_Pct"),
    )
    # Left join keeps stocks whose sector has no benchmark row (benchmark = NULL).
    .join(indian_sector_risk, on="Sector", how="left")
)

volatility = F.col("Volatility_Annual_Pct")
benchmark = F.col("Indian_Std_Volatility_Pct")

# Classify against the sector benchmark. A comparison with NULL is never true,
# so without the explicit NULL branches a stock with no volatility data or no
# benchmark would silently be labelled "Standard".
risk_profile = (
    risk_profile
    .withColumn(
        "Risk_Comparison",
        F.when(volatility.isNull(), "Insufficient Data")
        .when(benchmark.isNull(), "No Benchmark")
        .when(volatility < benchmark * LOW_RISK_RATIO, "Low")
        .when(volatility > benchmark * HIGH_RISK_RATIO, "High")
        .otherwise("Standard"),
    )
    # Percentage-point gap between the stock's volatility and its sector benchmark.
    .withColumn("Volatility_vs_India_Pct", F.round(volatility - benchmark, 2))
    .orderBy(volatility.desc())
)

# Volatility is NULL when a ticker has too few returns (insufficient data).
# Such rows carry no risk information, so drop them for display.
risk_profile_clean = risk_profile.dropna(subset=["Volatility_Annual_Pct"]).orderBy(volatility.desc())

display(risk_profile_clean)


**Technical Indicators (Moving Averages)**

Traders use the 50-Day Moving Average (MA50) to see trends.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

MA_WINDOW = 50          # moving-average length in trading rows
STRONG_TREND_PCT = 2    # |price - MA| above this % counts as a "Strong" trend

# 1. Window = current row + the previous 49 rows of the same ticker.
window_50 = Window.partitionBy("Ticker").orderBy("Date").rowsBetween(-(MA_WINDOW - 1), 0)

# 2. MA50 only where a full window of prices exists; early rows would otherwise
#    average fewer than 50 prices and still be presented as MA50.
df_technical = (
    df.withColumn("MA_Count", F.count("Price").over(window_50))
    .withColumn(
        "MA50",
        F.when(F.col("MA_Count") == MA_WINDOW, F.avg("Price").over(window_50)),
    )
    .drop("MA_Count")
)

# 3. Latest date in the data (dynamic, so the cell keeps working as data is refreshed).
latest_date = df.select(F.max("Date")).collect()[0][0]

# 4. Signals: % distance of price from MA50, bucketed into trend strength.
ma_diff = F.col("MA_Diff_Pct")
latest_signals = (
    df_technical.filter(F.col("Date") == latest_date)
    .select("Ticker", "Price", "MA50")
    .withColumn("MA_Diff_Pct", F.round((F.col("Price") - F.col("MA50")) / F.col("MA50") * 100, 2))
    .withColumn(
        "Trend",
        F.when(ma_diff.isNull(), "Insufficient Data")                    # No full MA50 window
        .when(ma_diff > STRONG_TREND_PCT, "Strong Bullish 🚀")           # Price is >2% above MA
        .when(ma_diff > 0, "Weak Bullish ↗️")                            # Price is slightly above MA
        .when(ma_diff < -STRONG_TREND_PCT, "Strong Bearish 🩸")          # Price is >2% below MA
        .otherwise("Weak Bearish ↘️"),                                   # Price is at or slightly below MA
    )
)

display(latest_signals.orderBy("MA_Diff_Pct", ascending=False))

**Correlation Analysis and Cumulative Returns.**

These answer the "Big Picture" questions:

Correlation: "If IT stocks crash, will Banking stocks crash too?" (Hedging).

Cumulative Return: "If I had invested ₹1 Lakh 5 years ago, which stock would have made me the most money?" (Performance).

**Correlation Matrix (Sector Rotation)**

This calculates how closely stocks move together, using one daily-return series per ticker.

+1.0: They move identically (High Risk).

-1.0: They move oppositely (Great for Hedging).

In [0]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# 1. Pivot: one row per Date, one Daily_Return column per ticker.
df_pivot = df_returns.groupBy("Date").pivot("Ticker").mean("Daily_Return")

# 2. Correlation needs complete rows, so drop dates where any ticker is missing.
df_pivot = df_pivot.na.drop()

# 3. Spark column resolution treats a dot as struct-field access, so a name like
#    'HDFCBANK.NS' trips up VectorAssembler. Rename the columns to dot-free
#    aliases (e.g. 'HDFCBANK_NS') for the Spark step only. Keep the real tickers
#    so the saved matrix uses the same labels as the other output tables.
tickers = [c for c in df_pivot.columns if c != "Date"]
df_pivot = df_pivot.toDF(*[c.replace(".", "_") for c in df_pivot.columns])
stock_columns = [c for c in df_pivot.columns if c != "Date"]

# 4. Pearson correlation over the assembled feature vector.
assembler = VectorAssembler(inputCols=stock_columns, outputCol="corr_features")
df_vector = assembler.transform(df_pivot)
pearson_matrix = Correlation.corr(df_vector, "corr_features").head()[0]

print("✅ Correlation Matrix Calculated!")

# 5. Convert to pandas and label rows/columns with the original tickers.
#    The matrix follows the inputCols order, which matches `tickers` position by position.
corr_df = pd.DataFrame(pearson_matrix.toArray(), columns=tickers, index=tickers)
display(corr_df)

# 6. Save for the dashboard. The index holds the ticker labels, so it is written too.
output_path_corr = f"{repo_root}/data/stock_correlation.csv"
corr_df.to_csv(output_path_corr)

print(f"✅ Correlation Matrix saved to: {output_path_corr}")

**Cumulative Return (The "Wealth Chart")**

This shows the growth of an investment over time. We start everyone at 1.0 (or 100%) on Day 1.

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 1. Define the Indian Format Function
def indian_format(num):
    """Format a number with Indian digit grouping (lakh/crore style).

    The last three digits form one group, and every group to the left of it
    has two digits, e.g. 12345678 -> "1,23,45,678".

    Args:
        num: An int/float, or None. Floats are truncated toward zero with
            ``int()`` before formatting.

    Returns:
        The grouped digits as a string, with a leading "-" for negative
        values. ``None`` is rendered as "0".
    """
    if num is None:
        return "0"
    value = int(num)
    # Group the magnitude only, so the sign never becomes its own group.
    sign = "-" if value < 0 else ""
    digits = str(abs(value))
    if len(digits) <= 3:
        return sign + digits
    head, last_three = digits[:-3], digits[-3:]
    groups = []
    while len(head) > 2:
        groups.append(head[-2:])
        head = head[:-2]
    if head:
        groups.append(head)
    return sign + ",".join(reversed(groups)) + "," + last_three

from pyspark.sql.window import Window

# Spark UDF wrapper so the Python formatter can run on a DataFrame column.
indian_format_udf = udf(indian_format, StringType())

INITIAL_INVESTMENT_INR = 100000  # ₹1 Lakh

# Running window per ticker: every row from the first date up to the current one.
window_cum = (
    Window.partitionBy("Ticker")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# 2. Calculate Growth (keep it numeric!). Compound via logs:
#    prod(1 + r) == exp(sum(log(1 + r))).
#    A missing return (e.g. each ticker's first day, which has no previous price)
#    is treated as 0. Growth then starts at exactly 1.0 instead of NULL on Day 1.
df_growth = (
    df_returns
    .withColumn("Log_Return", F.log(F.coalesce(F.col("Daily_Return"), F.lit(0.0)) + 1))
    .withColumn("Sum_Log_Return", F.sum("Log_Return").over(window_cum))
    .withColumn("Cumulative_Growth", F.exp("Sum_Log_Return"))
    .withColumn("Investment_Value_Numeric", F.round(F.col("Cumulative_Growth") * INITIAL_INVESTMENT_INR, 0))
)

# 3. Latest date in the data (dynamic, so refreshed data needs no edits).
latest_date = df_growth.select(F.max("Date")).collect()[0][0]

# 4. Filter, sort on the numeric value, and only then format for display
#    (sorting the formatted strings would order them lexically).
leaderboard = (
    df_growth.filter(F.col("Date") == latest_date)
    .orderBy("Investment_Value_Numeric", ascending=False)
    .withColumn("Current_Value_INR", indian_format_udf(F.col("Investment_Value_Numeric")))
    .select("Ticker", "Current_Value_INR")
)

display(leaderboard)

In [0]:
import os

# 1. Setup Path: '<repo>/data', resolved from the notebook's working directory.
current_folder = os.getcwd()
repo_root = os.path.dirname(current_folder)
data_folder = f"{repo_root}/data"

print(f"📂 Saving all analysis files to: {data_folder}")

# 2. Every analysis table to export, as
#    (notebook variable, output file, label for the message, cell that creates it).
outputs = [
    ("risk_profile", "stock_risk.csv", "Risk Profile", "Volatility"),
    ("latest_signals", "stock_signals.csv", "Signals", "Indicator"),
    ("df_growth", "stock_growth.csv", "Growth Data", "Cumulative Return"),
]

# 3. Save each table that exists. A missing table is reported, not fatal,
#    so the remaining tables are still exported.
for var_name, file_name, label, source_cell in outputs:
    frame = globals().get(var_name)
    if frame is None:
        print(f"⚠️ '{var_name}' table not found. (Did you run the {source_cell} cell?)")
        continue
    out_path = f"{data_folder}/{file_name}"
    frame.toPandas().to_csv(out_path, index=False)
    print(f"✅ Saved {label} to: {out_path}")

print("\n🚀 DONE! Go to Git Menu -> Commit & Push now.")