In [1]:
import sys, os
from pyspark.sql.functions import col, monotonically_increasing_id, expr, date_format, sha2, concat_ws
from delta.tables import DeltaTable
from datetime import date, timedelta


# Put the parent directory (one level above this notebook) at the front of
# sys.path so the local modules imported below (dev_spark_session,
# helper_modules) can be found.
sys.path.insert(0, os.path.abspath(os.pardir))

from dev_spark_session import DevSparkSession 
from helper_modules import SCDType2Handler, HelperMethods



### Initialize Spark and storage paths

In [4]:

# Obtain the SparkSession from the project's DevSparkSession wrapper; every
# cell below reads and writes Delta tables through this `spark` handle.
spark = DevSparkSession().spark


# Root of the local stand-in for the Azure storage account. It defaults to the
# original author's checkout. Set STOCK_MARKET_STORAGE_ROOT to run the notebook
# from another machine without editing every path.
STORAGE_ROOT = os.environ.get(
    "STOCK_MARKET_STORAGE_ROOT",
    "/Users/PC/Desktop/VS Code Repositories/azure-stock-market/Code/Local development/Azure storage",
)

silver_path = f"{STORAGE_ROOT}/Silver/delta-table"  # Target location for Silver Delta table
dim_symbol_path = f"{STORAGE_ROOT}/Gold/delta-tables/dim-symbol"
dim_date_path = f"{STORAGE_ROOT}/Gold/delta-tables/dim-date"
fact_daily_path = f"{STORAGE_ROOT}/Gold/delta-tables/fact-daily-summary"




## Fact Daily Summary Loader (Silver → Gold)

In [5]:
# Build the daily fact batch from the Silver layer: derive an integer DateID,
# hash the symbol business key, and resolve SymbolSID from DimSymbol.
df_silver = spark.read.format("delta").load(silver_path)

# DateID is the trading date as a yyyyMMdd integer (e.g. 2021-12-21 -> 20211221).
df_fact = (
    df_silver
    .withColumn("DateID", date_format(col("Date"), "yyyyMMdd").cast("int"))
    .drop("Date")
)

# Add Hash Key for Dim Symbol. It is matched against d.__BusinessKeyHash below,
# so the column order and the "|" separator must match however DimSymbol's hash
# was produced (NOTE(review): confirm against SCDType2Handler).
businessColumns = ["Symbol", "ExchangeName", "Currency"]
df_fact = df_fact.withColumn("DimSymbolBusinessHash", sha2(concat_ws("|", *businessColumns), 256))

# Keep only the current version of each symbol BEFORE joining. Filtering
# d.__CurrentFlag in a WHERE after a left join discards every fact row that has
# no matching symbol, which silently turns the left join into an inner join.
df_dim_symbol = spark.read.format("delta").load(dim_symbol_path)
df_dim_symbol_current = df_dim_symbol.where("__CurrentFlag = true")

# Facts with no current DimSymbol row are now kept with SymbolSID = NULL instead
# of disappearing. TODO(review): confirm whether they should instead map to an
# "unknown" member or be rejected before the merge.
df_fact = (
    df_fact.alias("f")
    .join(
        df_dim_symbol_current.alias("d"),
        on=expr("f.DimSymbolBusinessHash = d.__BusinessKeyHash"),
        how="left",
    )
    .selectExpr(
        "d.SymbolSID",
        "f.Volume",
        "f.High",
        "f.Low",
        "f.Open",
        "f.Close",
        "f.DateID",
    )
)



# Stamp the batch with audit metadata through the shared SCD handler. Per the
# cell output, this adds __DeletedFlag, __FactKeyHash, __CreatedBatchLogId and
# __CreateDateTime to the fact rows.
# NOTE(review): after the projection above, df_fact no longer has Symbol,
# ExchangeName or Currency. Confirm how SCDType2Handler uses businessColumns
# when tableType is "Fact".
parameters = dict(
    businessColumns="Symbol,ExchangeName,Currency",
    typeIColumns="",
    tableType="Fact",
)
scd2Handler = SCDType2Handler(parameters)

# Presumably resets the timestamp that add_audit_columns stamps onto this batch (confirm).
scd2Handler.refresh_timestamp()
add_audit_columns = scd2Handler.add_audit_columns
df_fact = df_fact.transform(add_audit_columns)


# Assign TransactionSID surrogate keys, continuing after the highest SID already
# stored in the Gold fact table.
max_existing_sid = (
    spark.read.format("delta").load(fact_daily_path)
    .selectExpr("max(TransactionSID)")
    .head()[0]
)
# max() returns NULL only for an empty table. Compare against None explicitly:
# a table whose max SID is 0 must resume at 1. A truthiness test would restart
# at 0 and collide with the existing row.
sid_offset = 0 if max_existing_sid is None else max_existing_sid + 1

# monotonically_increasing_id() is unique and increasing, but it is NOT
# consecutive across partitions (the partition id is stored in the upper bits).
# New SIDs are therefore always >= sid_offset, but they may contain gaps.
df_fact = df_fact.withColumn("TransactionSID", monotonically_increasing_id() + sid_offset)

# Preview the batch, then merge it into the Gold fact table through the SCD handler.
df_fact.show()
deltaTable = DeltaTable.forPath(spark, fact_daily_path)
scd2Handler.delta_merge_typeII(deltaTable, df_fact)




25/04/27 12:43:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---------+--------+----+---+----+-----+--------+-------------+--------------------+--------------------+--------------------+--------------+
|SymbolSID|  Volume|High|Low|Open|Close|  DateID|__DeletedFlag|       __FactKeyHash| __CreatedBatchLogId|    __CreateDateTime|TransactionSID|
+---------+--------+----+---+----+-----+--------+-------------+--------------------+--------------------+--------------------+--------------+
|        1|48899500| 390|380| 384|  389|20211221|        false|442a292c8e7230d8b...|aa893b8c-eba7-415...|2025-04-27 12:43:...|           502|
|        1|52545800| 398|393| 397|  398|20211210|        false|4ef81c23fa262901f...|aa893b8c-eba7-415...|2025-04-27 12:43:...|           503|
|        1|36664900| 337|333| 334|  337|20210409|        false|2c5399bcd95ab6984...|aa893b8c-eba7-415...|2025-04-27 12:43:...|           504|
|        1|33187200| 342|339| 342|  340|20210427|        false|10fe862d9b23ad3e7...|aa893b8c-eba7-415...|2025-04-27 12:43:...|           505|
|     

In [6]:
# Sanity check on DimDate: number of day rows per calendar year. Complete years
# show 365, or 366 in leap years.
history_query = f" select   year , count(*)  from  delta.`{dim_date_path}` group by year   order by year asc "
df_history = spark.sql(history_query)
df_history.show(n=100, truncate=False)

+----+--------+
|year|count(1)|
+----+--------+
|2000|366     |
|2001|365     |
|2002|365     |
|2003|365     |
|2004|366     |
|2005|365     |
|2006|365     |
|2007|365     |
|2008|366     |
|2009|365     |
|2010|365     |
|2011|365     |
|2012|366     |
|2013|365     |
|2014|365     |
|2015|365     |
|2016|366     |
|2017|365     |
|2018|365     |
|2019|365     |
|2020|366     |
|2021|365     |
+----+--------+



## Extend DimDate to cover the fact table's dates (if needed)

In [None]:


# Extend DimDate so that every date present in the fact table has a DimDate row.
hm = HelperMethods(spark=spark)
df_dimdate = spark.read.format("delta").load(dim_date_path)
df_fact_daily = spark.read.format("delta").load(fact_daily_path)

# Date coverage of each table. Each value is None when its table is empty.
max_date_dimDate = df_dimdate.selectExpr("cast(max(date) as date) as max_date").head()[0]
min_date_fact = df_fact_daily.selectExpr("to_date(cast(min(DateID) as string), 'yyyyMMdd') as min_date").head()[0]
max_date_fact = df_fact_daily.selectExpr("to_date(cast(max(DateID) as string), 'yyyyMMdd') as max_date").head()[0]

print(max_date_dimDate, min_date_fact, max_date_fact)

# Extend DimDate whenever the fact table has dates beyond DimDate's last day.
# Checking only the earliest fact date would miss fact ranges that start inside
# DimDate but end after it.
if max_date_fact is None:
    print("Fact table is empty; DimDate not updated.")
elif max_date_dimDate is None or max_date_fact > max_date_dimDate:
    # Start the day after DimDate ends (or on Jan 1 of the earliest fact year if
    # DimDate is empty). End on Dec 31 of the latest fact year, so that whole
    # calendar years are loaded.
    if max_date_dimDate is None:
        start_date = date(min_date_fact.year, 1, 1)
    else:
        start_date = max_date_dimDate + timedelta(days=1)
    end_date = date(max_date_fact.year, 12, 31)

    hm.update_DimDate_fromRange(start_date, end_date)
    print(f"Dim Table Updated with date range [ {start_date} : {end_date} ] ")
else:
    print("DimDate already covers the fact table's dates; nothing to do.")
    
    


