In [0]:
#functions(F)->id used for math, aggregations, transformations
#Window->time series calculation
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [0]:
# retrieving the silver tables and storing them in the form of dataframes
stock_df=spark.table("stock_prices_clean")
investor_df=spark.table("investor_master_clean")
txn_df=spark.table("portfolio_transactions_clean")

## **GOLD ANALYTICS – STOCK RETURNS**

#### **Daily Returns**

In [0]:
# grouping the data stock-wise and ordering them by date 
window_spec = Window.partitionBy("symbol").orderBy("trade_date")

stock_returns_df = stock_df.withColumn(
    "daily_return",
    (F.col("close_price") - F.lag("close_price").over(window_spec)) /
    F.lag("close_price").over(window_spec)
)


#### **Volatility (Risk Metric)**

In [0]:
#groups the stocks by symbol and calculates the standard deviation of daily returns 
stock_volatility_df = stock_returns_df.groupBy("symbol") \
    .agg(
        F.stddev("daily_return").alias("volatility")
    )


In [0]:
stock_volatility_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_stock_volatility")

In [0]:
%sql
SELECT * FROM gold_portfolio_value LIMIT 10;

trade_date,portfolio_value
2022-01-03,57686.4
2022-01-04,8623.86
2022-01-06,31358.7
2022-01-10,52921.95999999999
2022-01-11,134541.01
2022-01-13,7300.599999999999
2022-01-17,60824.4
2022-01-18,
2022-01-20,60152.4
2022-01-21,53858.88


## **GOLD ANALYTICS – PORTFOLIO PERFORMANCE**

#### **Join Transactions with Prices**

In [0]:
portfolio_base_df = txn_df.join(
    stock_df.select("symbol", "trade_date", "close_price"),
    ["symbol", "trade_date"],
    "inner"
)

#### **Position Value**

In [0]:
portfolio_positions_df = portfolio_base_df.withColumn(
    "position_value",
    F.col("quantity") * F.col("close_price")
)

#### **Daily Portfolio Value**

In [0]:
portfolio_value_df = portfolio_positions_df.groupBy("trade_date") \
    .agg(
        F.sum("position_value").alias("portfolio_value")
    ) \
    .orderBy("trade_date")

In [0]:
portfolio_value_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_portfolio_value")


In [0]:
%sql
SELECT * FROM gold_portfolio_value LIMIT 10;


trade_date,portfolio_value
2022-01-03,57686.4
2022-01-04,8623.86
2022-01-06,31358.7
2022-01-10,52921.95999999999
2022-01-11,134541.01
2022-01-13,7300.599999999999
2022-01-17,60824.4
2022-01-18,
2022-01-20,60152.4
2022-01-21,53858.88


## **GOLD ANALYTICS – SECTOR PERFORMANCE**

In [0]:
sector_kpi_df = stock_df.groupBy("sector") \
    .agg(
        F.sum("volume").alias("total_volume"),
        F.avg("close_price").alias("avg_close_price")
    )


In [0]:
sector_kpi_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_sector_kpis")

In [0]:
%sql
select * from gold_sector_kpis LIMIT 10;

sector,total_volume,avg_close_price
Telecom,258381647.0,297.6717113402059
Retail,248908944.0,162.84385714285705
Energy,250345650.0,309.9992900608519
Manufacturing,242555106.0,417.95985743380834
Technology,254453136.0,403.14660569105695
Automobile,247723840.0,290.5534888438135
Finance,247695748.0,324.8683265306119
Pharma,236183233.0,453.6865082644629
Consumer,250812774.0,437.3993951612903
Healthcare,246842115.0,149.01075203252023


## **Gold Layer: Risk-Adjusted Return**

#### **Average Return per Stock**

In [0]:
avg_return_df = stock_returns_df.groupBy("symbol") \
    .agg(
        F.avg("daily_return").alias("avg_daily_return")
    )


#### **Join with Volatility**

In [0]:
risk_adjusted_df = avg_return_df.join(
    stock_volatility_df,
    "symbol"
)


#### **Compute Risk-Adjusted Score**

In [0]:
risk_adjusted_df = risk_adjusted_df.withColumn(
    "risk_adjusted_return",
    F.col("avg_daily_return") / F.col("volatility")
)


In [0]:
risk_adjusted_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_risk_adjusted_returns")

In [0]:
%sql
select * from gold_risk_adjusted_returns LIMIT 10;

symbol,avg_daily_return,volatility,risk_adjusted_return
ALPH,0.0009011752172119902,0.0113034574781612,0.0797256254516014
AUTO,-0.0001411095631312413,0.0161420699031618,-0.008741726679278
BNKX,0.00011591826569761947,0.0145496212647212,0.0079670984961435
CONS,4.505973587685943e-05,0.0098376134995914,0.0045803523261745
ENGY,-0.0005303693509360594,0.0155769358112945,-0.0340483749410778
HLTH,-0.0012723423563044,0.03070161651666,-0.0414421942771016
METL,0.0008564105000399447,0.0113257498929175,0.0756162292243001
PHRM,0.0001967982041191458,0.0101613656851022,0.019367298670067
RETL,0.0014327459207634,0.0290412814991279,0.0493348036589396
TELC,-0.0004176669224933199,0.0150490538190333,-0.0277536998349407


## **Investor Trading Behavior Analysis**

In [0]:
from pyspark.sql.functions import count, when, col
# Calculate trading frequency
investor_trade_count = (
    txn_df
    .groupBy("investor_id")
    .agg(count("*").alias("trade_count"))
)



In [0]:

# Classify investor behavior
gold_investor_trading_behavior = (
    investor_trade_count
    .withColumn(
        "trading_behavior",
        when(col("trade_count") <= 3, "Long-Term Investor")
        .when(col("trade_count") <= 10, "Swing Trader")
        .otherwise("Frequent Trader")
    )
)


In [0]:
investor_master_df = spark.table("investor_master_clean")
gold_investor_trading_behavior = (
    gold_investor_trading_behavior
    .join(
        investor_master_df.select("investor_id", "region"),
        on="investor_id",
        how="left"
    )
)


In [0]:
gold_investor_trading_behavior.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("gold_investor_trading_behavior")


In [0]:
%sql
SELECT * from gold_investor_trading_behavior LIMIT 15;

investor_id,trade_count,trading_behavior,region
INV009,1,Long-Term Investor,US
INV155,2,Long-Term Investor,Europe
INV061,3,Long-Term Investor,Europe
INV052,2,Long-Term Investor,Asia
INV135,1,Long-Term Investor,Asia
INV289,5,Swing Trader,US
INV115,2,Long-Term Investor,Europe
INV147,4,Swing Trader,US
INV133,1,Long-Term Investor,US
INV007,3,Long-Term Investor,US


In [0]:
%sql
SHOW TABLES;


database,tableName,isTemporary
default,gold_investor_trading_behavior,False
default,gold_portfolio_value,False
default,gold_risk_adjusted_returns,False
default,gold_sector_kpis,False
default,gold_stock_volatility,False
default,investor_master_clean,False
default,portfolio_transactions_clean,False
default,stock_prices_clean,False


## Inserting a record to test manual refresh behaviour 

In [0]:
%sql
INSERT INTO gold_investor_trading_behavior
VALUES (
    9999,
    15,
    'Frequent Trader',
    'North America'
);


num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
SELECT count(*) from gold_investor_trading_behavior


count(*)
274
