In [0]:
from pyspark.sql.functions import current_timestamp
# import os

# Define paths
source_path = "dbfs:/Volumes/workspace/default/purchase_csv/"
log_table = "purchase_bronze.load_log"

# Ensure schema exists
spark.sql("CREATE SCHEMA IF NOT EXISTS purchase_bronze")

# Ensure log table exists
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {log_table} (
  filename STRING,
  tablename STRING,
  status STRING,
  message STRING,
  timestamp TIMESTAMP
)
USING DELTA
""")

# List CSV files
files = dbutils.fs.ls(source_path)

for f in files:
    if not f.name.endswith(".csv"):
        continue

    filename = f.name
    filepath = f.path
    tablename = filename.replace(".csv", "")

    try:
        if f.size == 0:
            status, message = "SKIPPED", "File is empty"
        elif spark.catalog.tableExists(f"purchase_bronze.{tablename}"):
            status, message = "SKIPPED", "Table already exists"
        else:
            # Load CSV into bronze table
            df = spark.read.option("header", "true").option("inferSchema", "true").csv(filepath)
            df.write.mode("overwrite").saveAsTable(f"purchase_bronze.{tablename}")

            status, message = "SUCCESS", "Table created successfully"
    except Exception as e:
        status, message = "FAILED", str(e)

    # Log result

    from datetime import datetime

    log_df = spark.createDataFrame([
        (filename, tablename, status, message, datetime.now())
    ], ["filename", "tablename", "status", "message", "timestamp"])

    log_df.write.mode("append").insertInto(log_table)




In [0]:
%sql
select * from purchase_bronze.purchaseorder limit 5;

In [0]:
%sql
select * from purchase_bronze.load_log where filename in ('PurchCategory.csv','VendTable.csv');

In [0]:
%sql
select * from purchase_bronze.purchcategory limit 2;

In [0]:
%sql
select * from purchase_bronze.vendtable limit 2;