In [0]:
from pyspark.sql.functions import col, to_date, current_timestamp, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import datetime

blob_container_name = "shellcsvblob"
blob_storage_account_name = "shellcpblob"
blob_storage_access_key = "oUB3Q/2hW5nTqq0hyWeMfLM5PFmGmZ27qp0sIXr6FoR8gO1kT3unKhvOnfEJv6lqYPIw6UUYR1VD+AStntj+2A=="

adls_container_name = "bronze"
adls_storage_account_name = "shelladlssa"
adls_storage_access_key = "O6gxvhRAlCQR67WDve7erSwbDkfvM0TCY0psdehHE+GkbKNKBoezwslKT9+XgxWu0hHkK+mpfDnC+ASt+PFIEg=="

# Configure Spark to access Blob Storage and ADLS
spark.conf.set(f"fs.azure.account.key.{blob_storage_account_name}.blob.core.windows.net", blob_storage_access_key)
spark.conf.set(f"fs.azure.account.key.{adls_storage_account_name}.dfs.core.windows.net", adls_storage_access_key)
start_date = datetime.date(2024, 8, 1)
end_date = datetime.date(2024, 8, 7)

# Initialize empty DataFrames for each country
us_df_combined = None
ind_df_combined = None
nl_df_combined = None

# Define the schema
schema = StructType([
    StructField("Country_ID", StringType(), True),
    StructField("Date", StringType(), True),  
    StructField("Site_ID", StringType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Volume_Sold", StringType(), True)  
])

# Iterate through the date range
current_date = start_date
while current_date <= end_date:
    # Format the current date as string
    current_date_str = current_date.strftime('%Y-%m-%d')

    # Construct file paths
    blob_folder_path = f"{current_date_str}"
    blob_file_paths = [f"{blob_folder_path}/{country}.csv" for country in ["US", "IN", "NL"]]

    # Read the CSV files without modifying data
    dfs = []
    for file_path in blob_file_paths:
        df = (spark.read.format("csv")
              .option("header", "true")
              .schema(schema)
              .load(f"wasbs://{blob_container_name}@{blob_storage_account_name}.blob.core.windows.net/{file_path}"))
        
        dfs.append(df)

    # Combine DataFrames
    if dfs:
        combined_df = spark.createDataFrame(
            [row for df in dfs for row in df.collect()], schema)

        combined_df = combined_df.withColumn("load_timestamp", current_timestamp())

        # Pivot the combined table to create the final tables
        if us_df_combined is None:
            us_df_combined = combined_df.filter(col("Country_ID") == "USA")
        else:
            us_df_combined = us_df_combined.union(combined_df.filter(col("Country_ID") == "USA"))

        if ind_df_combined is None:
            ind_df_combined = combined_df.filter(col("Country_ID") == "IND")
        else:
            ind_df_combined = ind_df_combined.union(combined_df.filter(col("Country_ID") == "IND"))

        if nl_df_combined is None:
            nl_df_combined = combined_df.filter(col("Country_ID") == "NL")
        else:
            nl_df_combined = nl_df_combined.union(combined_df.filter(col("Country_ID") == "NL"))

    # Increment the date
    current_date += datetime.timedelta(days=1)

# Write the final DataFrames as Delta tables
if nl_df_combined is not None:
    (nl_df_combined
     .write
     .format("delta")
     .mode("overwrite")
     .save(f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/combined/nl"))

if us_df_combined is not None and us_df_combined.count() > 0:
    (us_df_combined
     .write
     .format("delta")
     .mode("overwrite")
     .save(f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/combined/us"))

if ind_df_combined is not None and ind_df_combined.count() > 0:
    (ind_df_combined
     .write
     .format("delta")
     .mode("overwrite")
     .save(f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/combined/ind"))
    display(ind_df_combined)
    display(us_df_combined)
    display(nl_df_combined)


In [0]:
from pyspark.sql.functions import col, row_number, when, desc, to_date, regexp_replace, lit, max as max_
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

# Define paths to the Delta tables
us_delta_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/combined/us"
ind_delta_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/combined/ind"
nl_delta_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/combined/nl"

# Read the Delta tables into DataFrames
us_df = spark.read.format("delta").load(us_delta_path)
ind_df = spark.read.format("delta").load(ind_delta_path)
nl_df = spark.read.format("delta").load(nl_delta_path)


combined_df = us_df.union(ind_df).union(nl_df)
display(combined_df)


In [0]:
from pyspark.sql.functions import col

bronze_delta_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/bronze_table"

# Write the combined DataFrame to the Bronze layer as a Delta table
combined_df.write.format("delta").mode("overwrite").save(bronze_delta_path)


Reference Tables for date, country code, global product id, global product category, global site id


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

date_data = [
    ("USA", "MM/DD/YYYY", "YYYY/MM/DD"),
    ("NL", "DD/MM/YYYY", "YYYY/MM/DD"),
    ("IND", "DD/MM/YYYY", "YYYY/MM/DD"),
]
date_columns = ["Country_ID", "Original_Format", "Desired_Format"]

country_data = [
    ("INDIA","IND", 1),
    ("USA","USA", 2),
    ("Netherlands","NL", 3)
]
country_columns = ["Country_Name", "Country_Code", "Country_ID"]

product_data = [
    (89, 5001, "Regular Gasoline"),
    (91, 5002, "Premium Gasoline"),
    (94, 5003, "Diesel"),
    (98, 5004, "Premium Diesel"),
    (99, 5005, "Super Premium Diesel")
]
product_columns = ["Local_Product_ID", "Global_Product_ID", "Global_Product_Category"]

site_data = [
    # India (IN) Sites
    ("IND", 1, 1001),
    ("IND", 2, 1002),
    ("IND", 3, 1003),
    ("IND", 4, 1004),
    ("IND", 5, 1005),
    ("IND", 6, 1006),
    ("IND", 7, 1007),
    ("IND", 8, 1008),
    ("IND", 9, 1009),
    ("IND", 10, 1010),

    # USA (US) Sites
    ("USA", 1, 2001),
    ("USA", 2, 2002),
    ("USA", 3, 2003),
    ("USA", 4, 2004),
    ("USA", 5, 2005),
    ("USA", 6, 2006),
    ("USA", 7, 2007),
    ("USA", 8, 2008),
    ("USA", 9, 2009),
    ("USA", 10, 2010),

    # Netherlands (NL) Sites
    ("NL", 1, 3001),
    ("NL", 2, 3002),
    ("NL", 3, 3003),
    ("NL", 4, 3004),
    ("NL", 5, 3005),
    ("NL", 6, 3006),
    ("NL", 7, 3007),
    ("NL", 8, 3008),
    ("NL", 9, 3009),
    ("NL", 10, 3010)
]
site_columns = ["Country_Code", "Local_Site_ID", "Global_Site_ID"]

global_volume_data = [
    ("gallons", 3.78541, "liters"),
    ("liters", 1.0, "liters"),
    ("quarts", 0.946353, "liters"),
    ("pints", 0.473176, "liters"),
    ("milliliters", 0.001, "liters")
]
global_volume_columns = ["Original_Unit", "Conversion_Factor", "Standard_Unit"]
global_vol_df = spark.createDataFrame(global_volume_data, global_volume_columns)
global_vol_df = global_vol_df.withColumn(
    "Conversion_Factor", col("Conversion_Factor").cast(DoubleType())
)
global_Country_df = spark.createDataFrame(country_data, country_columns)
global_product_df = spark.createDataFrame(product_data, product_columns)
global_site_df = spark.createDataFrame(site_data, site_columns)
global_dateformat_df = spark.createDataFrame(date_data,date_columns )

# Display the Global Reference Tables
print("Global Country Reference Table:")
global_Country_df.display()

print("Global Product Reference Table:")
global_product_df.display()

print("Global Site Reference Table:")
global_site_df.display()

print("Global date fromat Reference Table:")
global_dateformat_df.display()

print("Global volume Reference Table:")
global_vol_df.display()


In [0]:
def convert_date(df):
    # Directly use the date format based on Country_ID
    df_converted_dates = df.withColumn(
        "Date",
        when(
            col("Country_ID") == "USA", to_date(col("Date"), "MM/dd/yyyy")
        ).when(
            col("Country_ID").isin("NL", "IND"), to_date(col("Date"), "dd/MM/yyyy")
        )
    )
    
    return df_converted_dates

def convert_data_types(df):
    converted_df = df \
        .withColumn("Country_ID", col("Country_ID").cast("string")) \
        .withColumn("Date", col("Date").cast("date")) \
        .withColumn("Site_ID", col("Site_ID").cast("integer")) \
        .withColumn("Product_ID", col("Product_ID").cast("integer")) \
        .withColumn("Volume_Sold", col("Volume_Sold").cast("string")) \
        .withColumn("load_timestamp", expr("CAST(load_timestamp AS TIMESTAMP)"))
    
    return converted_df

In [0]:
from pyspark.sql.functions import col, to_date, regexp_replace, round, when, trim, max as max_, lower
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

#drop nulls
null_replacement_list = ['null', 'NULL', 'NaN', 'n/a', 'N/A', 'none', 'None', 'undefined', 'missing', 'blank', ' ', 'UNKNOWN', 'Unknown', 'unknown', '-1']

quarantine_df = combined_df.filter(
    col("Volume_Sold").isin(null_replacement_list) |
    col("Date").isin(null_replacement_list) |
    col("Site_ID").isin(null_replacement_list) |
    col("Product_ID").isin(null_replacement_list)
)

quarentine_delta_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/quarentine_table"

quarantine_df.write.format("delta").mode("overwrite").save(quarentine_delta_path)

#drop nulls
null_replacement_list = ['null', 'NULL', 'NaN', 'n/a', 'N/A', 'none', 'None', 'undefined', 'missing', 'blank', ' ', 'UNKNOWN', 'Unknown', 'unknown', '-1']
cleaned_df = combined_df.replace(null_replacement_list, None)
# Trim whitespace
cleaned_df = cleaned_df.select([trim(col(c)).alias(c) for c in cleaned_df.columns])
critical_columns = ["Site_ID", "Product_ID", "Volume_Sold", "Date", "load_timestamp"]
cleaned_df = cleaned_df.dropna(subset=critical_columns)

# remove duplicate and take max volume sold
window_spec = Window.partitionBy("Country_ID", "Date", "Site_ID", "Product_ID").orderBy(desc("Volume_Sold"))
cleaned_df = cleaned_df.withColumn("row_num", row_number().over(window_spec))
cleaned_df = cleaned_df.filter(col("row_num") == 1).drop("row_num")

# Display the result
cleaned_df.display()

In [0]:
from pyspark.sql.functions import col

bronze_delta_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/cleaned_bronze_table"

cleaned_df.write.format("delta").mode("overwrite").save(bronze_delta_path)


In [0]:
#data type conversions
cleaned_df = convert_date(cleaned_df)
cleaned_df = convert_data_types(cleaned_df)
cleaned_df.printSchema()
cleaned_df.display()

In [0]:
cleaned_df = cleaned_df.withColumn(
    "Volume_Value", regexp_replace(col("Volume_Sold"), " .*", "").cast(DoubleType())
).withColumn(
    "Volume_Unit", lower(regexp_replace(col("Volume_Sold"), "^[0-9.]+ ", ""))
)

# Join with the reference table
cleaned_df = cleaned_df.join(
    global_vol_df,
    lower(cleaned_df["Volume_Unit"]) == lower(global_vol_df["Original_Unit"]),
    "left"
).withColumn(
    "Volume_Sold_Liters",
    when(col("Conversion_Factor").isNotNull(), round(col("Volume_Value") * col("Conversion_Factor"), 2))
    .otherwise(col("Volume_Value"))
).select(
    "Country_ID",
    "Date",
    "Site_ID",
    "Product_ID",
    "Volume_Sold",
    "load_timestamp",
    "Volume_Sold_Liters" 
)

# Display the resulting DataFrame
cleaned_df.printSchema()
cleaned_df.display()

In [0]:
cleaned_df = cleaned_df.join(global_Country_df, cleaned_df.Country_ID == global_Country_df.Country_Code, "left") \
                        .select(cleaned_df["*"], global_Country_df["Country_ID"].alias("Global_Country_ID"))

cleaned_df = cleaned_df.join(global_product_df, cleaned_df.Product_ID == global_product_df.Local_Product_ID, "left") \
                       .select(cleaned_df["*"], 
                               global_product_df["Global_Product_ID"], 
                               global_product_df["Global_Product_Category"])

cleaned_df.display()

In [0]:
final_df = cleaned_df.join(global_site_df, (cleaned_df.Country_ID == global_site_df.Country_Code) & 
                                           (cleaned_df.Site_ID == global_site_df.Local_Site_ID), "left") \
                       .select(cleaned_df["*"], global_site_df["Global_Site_ID"])
display(final_df)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType, LongType
schema = StructType([
    StructField("Country_ID", StringType(), True),
    StructField("Date", DateType(), True),
    StructField("Site_ID", IntegerType(), True),
    StructField("Product_ID", IntegerType(), True),
    StructField("Volume_Sold", StringType(), True),
    StructField("load_timestamp", TimestampType(), True),
    StructField("Volume_Sold_Liters", DoubleType(), True),
    StructField("Global_Country_ID", LongType(), True),
    StructField("Global_Product_ID", LongType(), True),
    StructField("Global_Product_Category", StringType(), True),
    StructField("Global_Site_ID", LongType(), True)
])

# Read CSV from blob storage
csv_path = f"wasbs://{blob_container_name}@{blob_storage_account_name}.blob.core.windows.net/silver_prev_data.csv"
silver_df = spark.read.csv(csv_path, schema=schema, header=True, inferSchema=False)

# Define Delta table path in ADLS
delta_table_path = f"abfss://{adls_container_name}@{adls_storage_account_name}.dfs.core.windows.net/delta/rbcisk"

# Write DataFrame to Delta table
silver_df.write.format("delta").mode("overwrite").save(delta_table_path)

silver_df.show(36602, truncate=False)
num_rows = silver_df.count()
print(f"Number of rows: {num_rows}")


In [0]:
from delta.tables import DeltaTable  # Import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, to_timestamp
from pyspark.sql.window import Window

# Assume spark is your SparkSession
spark = SparkSession.builder.getOrCreate()

# Ensure consistent data types
final_df = final_df.withColumn("Country_Id", col("Country_Id").cast("string")) \
                   .withColumn("Global_Country_ID", col("Global_Country_ID").cast("int")) \
                   .withColumn("Date", to_timestamp(col("Date"))) \
                   .withColumn("Site_ID", col("Site_ID").cast("int")) \
                   .withColumn("Global_Site_ID", col("Global_Site_ID").cast("int")) \
                   .withColumn("Product_ID", col("Product_ID").cast("int")) \
                   .withColumn("Global_Product_ID", col("Global_Product_ID").cast("int")) \
                   .withColumn("Global_Product_Category", col("Global_Product_Category").cast("string")) \
                   .withColumn("Volume_Sold", col("Volume_Sold").cast("string")) \
                   .withColumn("Volume_Sold_Liters", col("Volume_Sold_Liters").cast("float")) \
                   .withColumn("load_timestamp", to_timestamp(col("load_timestamp")))

# Convert both DataFrames to Delta tables
silver_df.write.format("delta").mode("overwrite").save("/tmp/silver_df")
final_df.write.format("delta").mode("overwrite").save("/tmp/final_df")

# Read them back as Delta tables
silver_table = DeltaTable.forPath(spark, "/tmp/silver_df")
final_table = DeltaTable.forPath(spark, "/tmp/final_df")

# Create a temporary view of final_table with the latest records based on load_timestamp
window_spec = Window.partitionBy("Country_Id", "Site_ID", "Product_ID").orderBy(col("load_timestamp").desc())

latest_final_df = final_df.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("rn")

latest_final_df.createOrReplaceTempView("latest_final_df")

# Perform SCD Type 1 Merge using Delta Lake
silver_table.alias("s").merge(
    latest_final_df.alias("f"),
    "s.Country_Id = f.Country_Id AND s.Site_ID = f.Site_ID AND s.Product_ID = f.Product_ID"
).whenMatchedUpdate(set={
    "Global_Country_ID": col("f.Global_Country_ID"),
    "Date": col("f.Date"),
    "Global_Site_ID": col("f.Global_Site_ID"),
    "Global_Product_ID": col("f.Global_Product_ID"),
    "Global_Product_Category": col("f.Global_Product_Category"),
    "Volume_Sold": col("f.Volume_Sold"),
    "Volume_Sold_Liters": col("f.Volume_Sold_Liters"),
    "load_timestamp": col("f.load_timestamp")
}).whenNotMatchedInsert(values={
    "Country_Id": col("f.Country_Id"),
    "Global_Country_ID": col("f.Global_Country_ID"),
    "Date": col("f.Date"),
    "Site_ID": col("f.Site_ID"),
    "Global_Site_ID": col("f.Global_Site_ID"),
    "Product_ID": col("f.Product_ID"),
    "Global_Product_ID": col("f.Global_Product_ID"),
    "Global_Product_Category": col("f.Global_Product_Category"),
    "Volume_Sold": col("f.Volume_Sold"),
    "Volume_Sold_Liters": col("f.Volume_Sold_Liters"),
    "load_timestamp": col("f.load_timestamp")
}).execute()

# Verify the records in silver_df
silver_table.toDF().orderBy("Country_Id", "Site_ID", "Product_ID").show(36602, truncate=False)
num_rows = silver_table.toDF().count()
print(f"Number of rows: {num_rows}")


In [0]:
from pyspark.sql.functions import sha2, concat_ws
primary_key_columns = ["Country_Id", "Site_ID", "Product_ID"]
# Create a hash for both old and new data (excluding the primary key columns)
old_df_with_hash = final_df.withColumn("hash", sha2(concat_ws("||", *[col for col in final_df.columns if col not in primary_key_columns]), 256))
updated_df_with_hash = silver_table.toDF().withColumn("hash", sha2(concat_ws("||", *[col for col in silver_table.toDF().columns if col not in primary_key_columns]), 256))

# Compare the hashes
changed_records = old_df_with_hash.join(updated_df_with_hash, primary_key_columns, "inner").filter(old_df_with_hash["hash"] != updated_df_with_hash["hash"])

changed_count = changed_records.count()
print(f"Number of records updated: {changed_count}")

In [0]:
# Join old and updated dataframes on the primary keys
primary_key_columns = ["Country_Id", "Site_ID", "Product_ID"]  # Replace with your actual key columns

# Anti-join to find the rows that have changed (i.e., records in old_df that are not in updated_df)
changed_records = final_df.join(silver_table.toDF(), primary_key_columns, "leftanti")

changed_count = changed_records.count()
print(f"Number of records updated: {changed_count}")

In [0]:
dbutils.fs.ls("/FileStore/Silver_table_afterSCD1.csv")


Out[4]: [FileInfo(path='dbfs:/FileStore/Silver_table_afterSCD1.csv', name='Silver_table_afterSCD1.csv', size=168821, modificationTime=1724257553000)]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as _sum, lag
from pyspark.sql.window import Window


# Load the data from CSV
file_path = "dbfs:/FileStore/Silver_table_afterSCD1.csv"  # Replace with your actual file path
data_df = spark.read.option("header", "true").csv(file_path)

# Convert the 'Date' column to DateType
data_df = data_df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

# Define the end date (today's date) and start date (one week before)
end_date = "2024-08-06"
start_date = "2024-07-30"

# Filter the data for the last week
filtered_df = data_df.filter((col("Date") > start_date) & (col("Date") <= end_date))

# Summarize the data by date to calculate total Volume Sold
weekly_summary_df = filtered_df.groupBy("Date").agg(_sum(col("Volume_Sold_Liters")).alias("Total_Volume_Sold_Liters"))

# Create a window specification to calculate the difference from the previous day
window_spec = Window.orderBy("Date")

# Calculate the difference in sales compared to the previous day
weekly_summary_df = weekly_summary_df.withColumn("Difference", col("Total_Volume_Sold_Liters") - lag(col("Total_Volume_Sold_Liters")).over(window_spec))

# Show the result
weekly_summary_df.orderBy("Date").show()


+----------+------------------------+-------------------+
|      Date|Total_Volume_Sold_Liters|         Difference|
+----------+------------------------+-------------------+
|2024-07-31|      161441.81999999998|               null|
|2024-08-01|      124797.12000000005|-36644.699999999924|
|2024-08-02|      154608.25999999995| 29811.139999999898|
|2024-08-03|      130559.22000000003| -24049.03999999992|
|2024-08-04|               138356.21|  7796.989999999962|
|2024-08-05|      120288.39999999998|-18067.810000000012|
|2024-08-06|               135196.25|  14907.85000000002|
+----------+------------------------+-------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as _sum, date_sub

# Load the data from CSV
file_path = "dbfs:/FileStore/Silver_table_afterSCD1.csv"  # Replace with your actual file path
data_df = spark.read.option("header", "true").csv(file_path)

# Convert the 'Date' column to DateType
data_df = data_df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

# Summarize the data by date to calculate total Volume Sold
daily_sales_df = data_df.groupBy("Date").agg(_sum(col("Volume_Sold_Liters")).alias("Total_Volume_Sold_Liters"))

# Create a DataFrame for today's sales
today_sales_df = daily_sales_df.filter(col("Date") == "2024-08-06").select(col("Date").alias("Today_Date"), col("Total_Volume_Sold_Liters").alias("Today_Sales"))

# Create a DataFrame for last week's same day sales
last_week_sales_df = daily_sales_df.withColumn("Last_Week_Date", date_sub(col("Date"), 7)).filter(col("Date") == "2024-08-06").select(col("Last_Week_Date").alias("Last_Week_Date"), col("Total_Volume_Sold_Liters").alias("Last_Week_Sales"))

# Join today's sales with last week's same day sales
comparison_df = today_sales_df.crossJoin(last_week_sales_df)

# Calculate the difference
comparison_df = comparison_df.withColumn("Difference", col("Today_Sales") - col("Last_Week_Sales"))

# Show the result
comparison_df.show()


+----------+-----------+--------------+---------------+----------+
|Today_Date|Today_Sales|Last_Week_Date|Last_Week_Sales|Difference|
+----------+-----------+--------------+---------------+----------+
|2024-08-06|  135196.25|    2024-07-30|      135196.25|       0.0|
+----------+-----------+--------------+---------------+----------+



In [0]:
d = {1: "Raj", 2:"Jerry"}
l = []
for i in d.keys():
    l.append(i)
print(d[max(l)])

Jerry


Gold Table(done in snowflake)

In [0]:
%sql
create database agg;

-- Switch to the appropriate database
USE DATABASE agg;

CREATE OR REPLACE STAGE my_stage2;

select * from silver_table;

-- Set the session parameter for timestamp input format
ALTER SESSION SET TIMESTAMP_INPUT_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS.FF3TZHTZM';

-- Define the file format with correct timestamp handling
CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  SKIP_HEADER = 1
  NULL_IF = ('NULL', 'null')
  DATE_FORMAT = 'YYYY-MM-DD'
  TIME_FORMAT = 'HH24:MI:SS.FF3TZHTZM';

-- Create or replace the stage
CREATE OR REPLACE STAGE my_stage2
  FILE_FORMAT = my_csv_format;

-- Create the table
CREATE OR REPLACE TABLE silver_table (
    Country_ID VARCHAR,
    Date DATE,
    Site_ID VARCHAR,
    Product_ID VARCHAR,
    Volume_Sold VARCHAR,
    load_timestamp TIMESTAMP_TZ,
    Volume_Sold_Liters DOUBLE,
    Global_Country_ID VARCHAR,
    Global_Product_ID VARCHAR,
    Global_Product_Category VARCHAR,
    Global_Site_ID VARCHAR,
    update_timestamp TIMESTAMP_TZ
);

-- List files in the stage (optional, for debugging)
LIST @my_stage2;

-- Load data from the stage into the table
COPY INTO silver_table
FROM @my_stage2/Silver_table_afterSCD1.csv
FILE_FORMAT = my_csv_format
ON_ERROR = 'CONTINUE';

-- Validate data loading and format errors
COPY INTO silver_table
FROM @my_stage2/Silver_table_afterSCD1.csv
FILE_FORMAT = my_csv_format
VALIDATION_MODE = 'RETURN_ERRORS';



select * from silver_table;



CREATE OR REPLACE TABLE gold_table (
    Country_ID STRING,
    Date DATE,
    Site_ID STRING,
    Product_ID STRING,
    Volume_Sold STRING,
    load_timestamp TIMESTAMP_TZ,
    Volume_Sold_Liters DOUBLE,
    Global_Country_ID STRING,
    Global_Product_ID STRING,
    Global_Product_Category STRING,
    Global_Site_ID STRING,
    Volume_Sold_Last_Week DOUBLE,
    Volume_Sold_Last_Month DOUBLE,
    Volume_Sold_Last_Year DOUBLE,
    Pct_Change_Last_Week DOUBLE,
    Pct_Change_Last_Month DOUBLE,
    Pct_Change_Last_Year DOUBLE,
    Rolling_7_Day_Sum DOUBLE,
    Pct_Change_Rolling_7_Day_Sum DOUBLE
);


INSERT INTO gold_table
SELECT
    s.Country_ID,
    s.Date,
    s.Site_ID,
    s.Product_ID,
    s.Volume_Sold,
    s.load_timestamp,
    s.Volume_Sold_Liters,
    s.Global_Country_ID,
    s.Global_Product_ID,
    s.Global_Product_Category,
    s.Global_Site_ID,
    COALESCE(lw.Volume_Sold_Liters, 0) AS Volume_Sold_Last_Week,
    COALESCE(lm.Volume_Sold_Liters, 0) AS Volume_Sold_Last_Month,
    COALESCE(ly.Volume_Sold_Liters, 0) AS Volume_Sold_Last_Year,
    CASE 
        WHEN lw.Volume_Sold_Liters IS NULL OR lw.Volume_Sold_Liters = 0 THEN NULL
        ELSE (s.Volume_Sold_Liters - lw.Volume_Sold_Liters) / lw.Volume_Sold_Liters * 100
    END AS Pct_Change_Last_Week,
    CASE 
        WHEN lm.Volume_Sold_Liters IS NULL OR lm.Volume_Sold_Liters = 0 THEN NULL
        ELSE (s.Volume_Sold_Liters - lm.Volume_Sold_Liters) / lm.Volume_Sold_Liters * 100
    END AS Pct_Change_Last_Month,
    CASE 
        WHEN ly.Volume_Sold_Liters IS NULL OR ly.Volume_Sold_Liters = 0 THEN NULL
        ELSE (s.Volume_Sold_Liters - ly.Volume_Sold_Liters) / ly.Volume_Sold_Liters * 100
    END AS Pct_Change_Last_Year,
    COALESCE(rs.Rolling_7_Day_Sum, 0) AS Rolling_7_Day_Sum,
    CASE 
        WHEN rs.Last_Rolling_Sum IS NULL OR rs.Last_Rolling_Sum = 0 THEN NULL
        ELSE (rs.Rolling_7_Day_Sum - rs.Last_Rolling_Sum) / rs.Last_Rolling_Sum * 100
    END AS Pct_Change_Rolling_7_Day_Sum
FROM
    silver_table s
LEFT JOIN 
    silver_table lw 
    ON s.Country_ID = lw.Country_ID 
    AND s.Site_ID = lw.Site_ID 
    AND s.Product_ID = lw.Product_ID 
    AND s.Date = DATEADD('day', 7, lw.Date)
LEFT JOIN 
    silver_table lm 
    ON s.Country_ID = lm.Country_ID 
    AND s.Site_ID = lm.Site_ID 
    AND s.Product_ID = lm.Product_ID 
    AND s.Date = DATEADD('day', 30, lm.Date)
LEFT JOIN 
    silver_table ly 
    ON s.Country_ID = ly.Country_ID 
    AND s.Site_ID = ly.Site_ID 
    AND s.Product_ID = ly.Product_ID 
    AND s.Date = DATEADD('year', 1, ly.Date)
LEFT JOIN (
    SELECT
        Country_ID,
        Site_ID,
        Product_ID,
        Date,
        SUM(Volume_Sold_Liters) AS Rolling_7_Day_Sum,
        LAG(SUM(Volume_Sold_Liters)) OVER (PARTITION BY Country_ID, Site_ID, Product_ID ORDER BY Date) AS Last_Rolling_Sum
    FROM
        (SELECT
            Country_ID,
            Site_ID,
            Product_ID,
            Date,
            SUM(Volume_Sold_Liters) OVER (PARTITION BY Country_ID, Site_ID, Product_ID ORDER BY Date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS Volume_Sold_Liters
         FROM
            silver_table
        ) AS inner_rs
    GROUP BY
        Country_ID,
        Site_ID,
        Product_ID,
        Date
) rs
ON s.Country_ID = rs.Country_ID 
AND s.Site_ID = rs.Site_ID 
AND s.Product_ID = rs.Product_ID 
AND s.Date = rs.Date;

select* from gold_table;

SELECT COUNT(*) AS row_count
FROM gold_table;

SELECT COUNT(*) AS row_count
FROM silver_table;






