In [0]:
def _load_broker_holdings(table_name, quantity_col, price_col):
    """Load one broker's holding statement in a common three-column shape.

    Every broker table is read the same way; only the table name and the
    names of the quantity / price columns differ.

    Args:
        table_name: Fully qualified name of the broker's holding-statement table.
        quantity_col: Source column that holds the quantity held.
        price_col: Source column that holds the average buy price.

    Returns:
        A DataFrame with columns ``ISIN``, ``quantity`` (DOUBLE) and
        ``buy_price`` (DOUBLE). Rows are kept only when ISIN is not NULL,
        the source quantity is greater than zero and the source price is
        not NULL; the filter runs on the source columns, before the cast.
    """
    return (
        spark.table(table_name)
        .where(f"ISIN IS NOT NULL AND {quantity_col} > 0 AND {price_col} IS NOT NULL")
        .selectExpr(
            "ISIN",
            f"CAST({quantity_col} AS DOUBLE) AS quantity",
            f"CAST({price_col} AS DOUBLE) AS buy_price",
        )
    )


# Load Kite holdings
kite_df = _load_broker_holdings(
    "personal_finance.bronze.kite_holding_statement",
    quantity_col="Quantity_Available",
    price_col="Average_Price",
)

# Load Groww holdings
groww_df = _load_broker_holdings(
    "personal_finance.bronze.groww_holding_statement",
    quantity_col="Quantity",
    price_col="Average_buy_price",
)

# Load Angel One holdings
angel_df = _load_broker_holdings(
    "personal_finance.bronze.angel_one_holding_statement",
    quantity_col="Total_Quantity",
    price_col="Avg_Trading_Price",
)

# Union all broker holdings (matched by column name, not position)
all_holdings_df = kite_df.unionByName(groww_df).unionByName(angel_df)

# Re-apply the validity filter to the cast columns; presumably a safeguard for
# values that became NULL in the DOUBLE cast.
filtered_holdings_df = all_holdings_df.where("quantity > 0 AND buy_price IS NOT NULL")

In [0]:
# Consolidate holdings across brokers into one row per ISIN
from pyspark.sql import functions as F

# Quantity-weighted average buy price = sum(quantity * buy_price) / sum(quantity).
# nullif maps a zero total quantity to NULL, so the division yields NULL
# instead of dividing by zero.
quantity_sum = F.sum("quantity")
invested_value_sum = F.sum(F.col("quantity") * F.col("buy_price"))
weighted_avg_price = invested_value_sum / F.nullif(quantity_sum, F.lit(0))

consolidated_df = filtered_holdings_df.groupBy("ISIN").agg(
    quantity_sum.alias("total_quantity"),
    weighted_avg_price.alias("weighted_avg_buy_price"),
)

# Company reference table that supplies Symbol and NameOfCompany per ISIN
company_info_df = spark.table("personal_finance.silver.company_listing_info")

# Left join: a holding whose ISIN has no match in the company table is kept,
# with NULL Symbol / NameOfCompany.
# NOTE(review): if company_listing_info can hold more than one row per
# ISINNumber, this join repeats the holding once per match — confirm uniqueness.
isin_join_condition = F.col("h.ISIN") == F.col("c.ISINNumber")
enriched_df = consolidated_df.alias("h").join(
    company_info_df.alias("c"), isin_join_condition, "left"
)

final_df = enriched_df.select(
    F.col("c.Symbol"),
    F.col("c.NameOfCompany"),
    F.col("h.ISIN"),
    F.col("h.total_quantity").alias("TotalQuantity"),
    F.col("h.weighted_avg_buy_price").alias("WeightedAverageBuyPrice"),
).orderBy(F.desc("TotalQuantity"))  # largest positions first

display(final_df)

In [0]:
# Persist the consolidated holdings as a Delta table, overwriting existing data.
final_df.write.saveAsTable(
    "personal_finance.silver.consolidated_equity_holdings",
    format="delta",
    mode="overwrite",
)