In [0]:
# Set up Azure Blob Storage Credentials
# The storage account key is fetched from a Databricks secret scope at run time
# instead of being embedded in the notebook source, where anyone with read
# access to the notebook (or its exports / version history) could copy it.
# NOTE(review): the scope and key names below are placeholders — point them at
# the secret scope provisioned for this workspace. The key that was previously
# committed in this cell should be treated as leaked and rotated.
# Relies on the `spark` and `dbutils` objects that the notebook runtime provides.
spark.conf.set(
    "fs.azure.account.key.mmixstorage.blob.core.windows.net",
    dbutils.secrets.get(scope="mmix-storage", key="mmixstorage-account-key"),
)

import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col


# Initialize Spark Session (reuses the active session when one already exists)
spark = SparkSession.builder.getOrCreate()

# Retrieve widget inputs
# Every widget is a text widget with an empty default, so a parameter that the
# caller does not supply arrives here as "".
dbutils.widgets.text("file_urls", "")
dbutils.widgets.text("merge_strategy", "")
dbutils.widgets.text("column_mappings", "")
dbutils.widgets.text("join_keys", "")
dbutils.widgets.text("join_type", "")
dbutils.widgets.text("output_filename", "")
#dbutils.widgets.text("drop_duplicates", "")

# Parse widget parameters and clean strings
# file_urls and column_mappings are JSON documents; plain-string parameters may
# arrive wrapped in double quotes, which are stripped.
file_urls = json.loads(dbutils.widgets.get("file_urls"))
merge_strategy = dbutils.widgets.get("merge_strategy").strip('"').lower()
column_mappings = json.loads(dbutils.widgets.get("column_mappings"))

# join_keys is optional: vertical merges do not need it, and the null-filter
# step further down explicitly handles the "no join keys" case. A blank widget
# (or JSON null) therefore becomes an empty list instead of a parse error.
raw_join_keys = dbutils.widgets.get("join_keys").strip()
if not raw_join_keys:
    join_keys = []
else:
    try:
        join_keys = json.loads(raw_join_keys)
    except json.JSONDecodeError as err:
        # Keep the original ValueError contract, but preserve the parse error as the cause.
        raise ValueError("Join key must be a valid JSON list of column names.") from err
    if join_keys is None:
        join_keys = []
    elif not isinstance(join_keys, list):
        # A single key given as a bare JSON scalar is promoted to a one-element list.
        join_keys = [join_keys]

# A blank join_type falls back to "inner", which is DataFrame.join's own default;
# passing "" through would be rejected by Spark as an unsupported join type.
join_type = dbutils.widgets.get("join_type").strip('"') or "inner"
output_filename = dbutils.widgets.get("output_filename").strip('"')

# Define base paths for the merged output and its sample
# A blank output name must be rejected here: with an empty name, output_csv_dir
# would resolve to the shared "correct_csv_outputs/" folder itself, and the
# cleanup step below removes output_csv_dir recursively before writing.
if not output_filename.strip().strip("/"):
    raise ValueError("output_filename must be provided.")

base_blob_url = "wasbs://pre-processing@mmixstorage.blob.core.windows.net"
# No timestamp is appended, so a rerun with the same output_filename replaces
# the previous output.
output_csv_dir = f"{base_blob_url}/correct_csv_outputs/{output_filename}"
sample_csv_dir = f"{base_blob_url}/correct_csv_outputs/{output_filename}_sample"

# Convert HTTPS URLs to WASBS Paths
def convert_to_wasbs(url):
    """Rewrite an HTTPS blob URL for the pre-processing container as a wasbs:// path.

    Each occurrence of the container's HTTPS prefix in ``url`` is replaced with
    ``base_blob_url`` followed by a slash. A URL that does not contain the
    prefix is returned unchanged.
    """
    wasbs_root = f"{base_blob_url}/"
    return url.replace(
        "https://mmixstorage.blob.core.windows.net/pre-processing/",
        wasbs_root,
    )

# Files and column mappings are paired by position, so the two lists must line
# up one-to-one. zip() stops at the shorter list, which would silently drop the
# unmatched files (or mappings) from the merge.
if len(file_urls) != len(column_mappings):
    raise ValueError(
        f"Expected one column mapping per file, got {len(file_urls)} file(s) "
        f"and {len(column_mappings)} mapping(s)."
    )

file_urls = [convert_to_wasbs(url) for url in file_urls]

# Read CSV Files and Apply Column Renaming
dfs = []
for url, mapping in zip(file_urls, column_mappings):
    # The first row is treated as the header and column types are inferred from the data.
    df = spark.read.option("header", True).option("inferSchema", True).csv(url)
    # mapping is {original column name: new column name}.
    for old_col, new_col in mapping.items():
        df = df.withColumnRenamed(old_col, new_col)

    # Filter to only keep renamed columns (values from mapping); unmapped columns are dropped.
    df = df.select(list(mapping.values()))

    dfs.append(df)

# Merge DataFrames
# Fail with a clear message when there is nothing to merge, instead of the
# bare IndexError that dfs[0] would raise.
if not dfs:
    raise ValueError("No input files were provided to merge.")

if merge_strategy in ["vertical", "vertical stack"]:
    # Stack rows; columns are matched by name and columns absent from one side
    # are filled with nulls (allowMissingColumns=True).
    merged_df = dfs[0]
    for df in dfs[1:]:
        merged_df = merged_df.unionByName(df, allowMissingColumns=True)
elif merge_strategy in ["horizontal", "horizontal join"]:
    if not join_keys:
        raise ValueError("Join key must be provided for horizontal joins.")
    # Join each subsequent frame onto the running result using the shared key columns.
    merged_df = dfs[0]
    for df in dfs[1:]:
        merged_df = merged_df.join(df, on=join_keys, how=join_type)
else:
    raise ValueError("Invalid merge strategy. Choose 'vertical' or 'horizontal'.")

# Remove rows with nulls in identifier columns and drop duplicates
if not join_keys:
    print("⚠️ No join_keys provided — skipping null identifier filtering.")
else:
    # Only keys that actually exist in the merged frame are filtered on;
    # keys missing from the result are ignored.
    present_keys = [key for key in join_keys if key in merged_df.columns]
    for key in present_keys:
        merged_df = merged_df.filter(col(key).isNotNull())

# Exact duplicate rows are removed regardless of whether join keys were given.
merged_df = merged_df.dropDuplicates()

# Clean Existing Output Folders If Any
for stale_dir in (output_csv_dir, sample_csv_dir):
    dbutils.fs.rm(stale_dir, recurse=True)

# Write Full Merged Output (coalesced to a single partition, with a header row)
full_output = merged_df.coalesce(1)
full_output.write.option("header", True).mode("overwrite").csv(output_csv_dir)

# Write Sample Output (First 100 Rows)
sample_output = merged_df.limit(100).coalesce(1)
sample_output.write.option("header", True).mode("overwrite").csv(sample_csv_dir)

print("✅ Processing and file writes completed successfully!")
