### Installing the required Python library (python-dotenv)

In [0]:
%pip install python-dotenv
%restart_python

### Importing libraries and functions

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from datetime import datetime, timezone
import os
import dotenv
from dotenv import load_dotenv
import re
from delta.tables import DeltaTable

### Loading essential data from .env

In [0]:
# Populate os.environ from the project's .env file. override=True lets values
# from the file replace variables that are already set in the environment.
dotenv.load_dotenv(override=True)

In [0]:
# Runtime configuration, read from environment variables (populated from .env above).
catalog = os.getenv("catalog")
schema = os.getenv("bronze_schema")
container = os.getenv("container")
storage_account = os.getenv("storage_account")

# Fail fast: a missing value would otherwise surface much later as a literal
# "None" inside the abfss:// path or the CREATE SCHEMA statement.
_missing_config = [
    env_key
    for env_key, env_value in {
        "catalog": catalog,
        "bronze_schema": schema,
        "container": container,
        "storage_account": storage_account,
    }.items()
    if not env_value
]
if _missing_config:
    raise ValueError(
        f"Missing required environment variable(s): {', '.join(_missing_config)}. "
        "Make sure the .env file is present and loaded."
    )
print(schema)

### Bronze Path Creation

In [0]:
# Root ADLS Gen2 folder of the Bronze layer. The trailing "/" matters: child
# locations such as "delta/<table>/" and "ingestion_log/" are appended to it
# by plain string concatenation.
bronze_path = "abfss://{}@{}.dfs.core.windows.net/Bronze/".format(container, storage_account)

### Ensuring schema exists
- `DataFrame[]` in the cell output is expected: `spark.sql` always returns a DataFrame, and for DDL statements such as `CREATE SCHEMA` that DataFrame has no columns

In [0]:
# IF NOT EXISTS makes this a no-op when the schema is already there; the
# returned (column-less) DataFrame is what the cell displays as DataFrame[].
spark.sql("CREATE SCHEMA IF NOT EXISTS {}.{}".format(catalog, schema))

### Creating a helper function for ingestion (name sanitization)

In [0]:
RESERVED_KEYWORDS = {"case", "select", "from", "table"}  # expand as needed

# Translation table mapping every character this notebook treats as invalid in a
# column name (space , ; { } ( ) newline tab =) to an underscore.
_INVALID_NAME_CHARS = str.maketrans({ch: "_" for ch in " ,;{}()\n\t="})


def sanitize_name(name):
    """
    Return a version of ``name`` that is safe to use as a column name.

    Each space, comma, semicolon, brace, parenthesis, newline, tab or '='
    is replaced with '_'. If the cleaned name, compared case-insensitively,
    is listed in RESERVED_KEYWORDS, the suffix "_table" is appended.
    """
    cleaned = name.translate(_INVALID_NAME_CHARS)
    is_reserved = cleaned.lower() in RESERVED_KEYWORDS
    return f"{cleaned}_table" if is_reserved else cleaned

### Running Ingestion Function
- bronze layer in medallion architecture is used for raw data ingestion
- catalog and schema are passed for final delta table creation from raw csv files in bronze layer
#### For is_file_already_ingested:
- DeltaTable.isDeltaTable checks the folder path for _delta_log/ metadata. If present, it identifies the path as a valid Delta table.
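- ingestion metadata is stored in a Delta table because it provides ACID guarantees, and checking it before loading prevents the same file from being ingested (duplicated) twice
- .limit(1).count() is slightly more efficient than counting all matching rows because it can stop scanning after the first match - it simply checks whether a log row exists for that file (the active version matches on table name, file name and file modification time, so a re-uploaded file with a new modification time is ingested again)
- if no Delta table exists yet, the function assumes no files have been ingested and returns False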

#### For log_ingestion:
- creates a row which holds the file's log values: table name, file name, file modification time and the UTC time at which it was logged
- this row is then converted into a DataFrame with the columns table_name, file_name, file_mod_ts and logged_at
- if a Delta table already exists at ingestion_log_path, the row is appended to it; otherwise the ingestion log table is created at that path

#### Main code
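- checks for CSV files in each subfolder; each subfolder becomes one table named after the folder (lower-cased)
- the first (commented-out) version below handled patientinfo as a separate case, appending only new files so the same file is not appended multiple times, and used a plain overwrite for the remaining tables
- the active version applies the append-only-new-files approach to every table and registers each table in the catalog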

In [0]:
# def ingest_latest_files(bronze_path, catalog, schema):
#     def is_file_already_ingested(table_name, file_name):
#         if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#             log_df = spark.read.format("delta").load(ingestion_log_path)
#             return log_df.filter(
#                 (F.col("table_name") == table_name) & (F.col("file_name") == file_name)
#             ).limit(1).count() > 0
#         return False

#     def log_ingestion(table_name, file_name, file_mod_iso):
#         row = [(table_name, file_name, file_mod_iso, datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"))]
#         log_df = spark.createDataFrame(row, schema=["table_name","file_name","file_mod_ts","logged_at"])
#         if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#             log_df.write.format("delta").mode("append").save(ingestion_log_path)
#         else:
#             log_df.write.format("delta").mode("overwrite").save(ingestion_log_path)

#     # List all subfolders inside Bronze
#     subfolders = [f.path for f in dbutils.fs.ls(bronze_path) if f.isDir()]

#     for folder in subfolders:
#         table_name = os.path.basename(folder.rstrip("/")).lower()
#         output_path = f"{bronze_path}delta/{table_name}/"

#         # List files in the subfolder
#         files = [f for f in dbutils.fs.ls(folder) if f.name.endswith(".csv")]
#         if not files:
#             print(f"No files found in {folder}")
#             continue

#         if table_name == "patientinfo":
#             #For PatientInfo → append new files only
#             for file in files:
#                 file_mod_iso = datetime.utcfromtimestamp(file.modificationTime / 1000.0).strftime("%Y-%m-%d %H:%M:%S")

#                 if is_file_already_ingested(table_name, file.name):
#                     print(f"Skipping {file.name} — already ingested.")
#                     continue

#                 df_new = (
#                     spark.read.option("header","true").option("inferSchema","true").csv(file.path)
#                     .withColumn("_source_file", F.lit(file.name))
#                     .withColumn("_ingested_at", F.to_timestamp(F.lit(file_mod_iso)))
#                 )

#                 df_new.write.format("delta").mode("append").option("mergeSchema","true").save(output_path)
#                 log_ingestion(table_name, file.name, file_mod_iso)

#                 spark.sql(f"""
#                   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#                   USING DELTA
#                   LOCATION '{output_path}'
#                 """)
#                 spark.sql(f"ALTER TABLE {catalog}.{schema}.{table_name} SET LOCATION '{output_path}'")
#                 print(f"Appended {file.name} into {table_name}")

#         else:
#             # For all other tables → full overwrite
#             latest_file = sorted(files, key=lambda f: f.modificationTime)[-1]
#             df = (
#                 spark.read.option("header","true").option("inferSchema","true").csv(latest_file.path)
#                 .withColumn("_source_file", F.lit(latest_file.name))
#                 .withColumn("_ingested_at", F.current_timestamp())
#             )
#             for c in df.columns:
#                 new_col = sanitize_name(c)
#                 if new_col != c:
#                     df = df.withColumnRenamed(c, new_col)

#             df.write.format("delta").mode("overwrite").option("overwriteSchema","true").save(output_path)

#             spark.sql(f"""
#               CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#               USING DELTA
#               LOCATION '{output_path}'
#             """)

#             print(f"Overwrote {table_name} with {latest_file.name}")

#     print("Bronze ingestion run complete.")


In [0]:
# def ingest_latest_files(bronze_path, catalog, schema):
#     def is_file_already_ingested(table_name, file_name):
#         if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#             log_df = spark.read.format("delta").load(ingestion_log_path)
#             return log_df.filter(
#                 (F.col("table_name") == table_name) & (F.col("file_name") == file_name)
#             ).limit(1).count() > 0
#         return False

#     def log_ingestion(table_name, file_name, file_mod_iso):
#         row = [(table_name, file_name, file_mod_iso, datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"))]
#         log_df = spark.createDataFrame(row, schema=["table_name","file_name","file_mod_ts","logged_at"])
#         if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#             log_df.write.format("delta").mode("append").save(ingestion_log_path)
#         else:
#             log_df.write.format("delta").mode("overwrite").save(ingestion_log_path)

#     # List all subfolders inside Bronze
#     subfolders = [f.path for f in dbutils.fs.ls(bronze_path) if f.isDir()]

#     for folder in subfolders:
#         table_name = os.path.basename(folder.rstrip("/")).lower()
#         output_path = f"{bronze_path}delta/{table_name}/"

#         # List files in the subfolder
#         files = [f for f in dbutils.fs.ls(folder) if f.name.endswith(".csv")]
#         if not files:
#             print(f"No files found in {folder}")
#             continue

#         if table_name == "patientinfo":
#             #For PatientInfo → append new files only
#             for file in files:
#                 file_mod_iso = datetime.utcfromtimestamp(file.modificationTime / 1000.0).strftime("%Y-%m-%d %H:%M:%S")

#                 if is_file_already_ingested(table_name, file.name):
#                     print(f"Skipping {file.name} — already ingested.")
#                     continue

#                 df_new = (
#                     spark.read.option("header","true").option("inferSchema","true").csv(file.path)
#                     .withColumn("_source_file", F.lit(file.name))
#                     .withColumn("_ingested_at", F.to_timestamp(F.lit(file_mod_iso)))
#                 )

#                 df_new.write.format("delta").mode("append").option("mergeSchema","true").save(output_path)
#                 log_ingestion(table_name, file.name, file_mod_iso)

#                 spark.sql(f"""
#                   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#                   USING DELTA
#                   LOCATION '{output_path}'
#                 """)
#                 spark.sql(f"ALTER TABLE {catalog}.{schema}.{table_name} SET LOCATION '{output_path}'")
#                 print(f"Appended {file.name} into {table_name}")

#         else:
#             # For all other tables → full overwrite
#             #latest_file = sorted(files, key=lambda f: f.modificationTime)[-1]
#             for file in files:
#                 file_mod_iso = datetime.utcfromtimestamp(file.modificationTime / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
#                 if is_file_already_ingested(table_name,file.name):
#                     print(f"Already ingested, skipping {file.name}")
#                     continue
                
#                 df = (
#                     spark.read.option("header","true").option("inferSchema","true").csv(file.path)
#                     .withColumn("_source_file", F.lit(file.name))
#                     .withColumn("_ingested_at", F.current_timestamp())
#                 )
#                 for c in df.columns:
#                     new_col = sanitize_name(c)
#                     if new_col != c:
#                         df = df.withColumnRenamed(c, new_col)

#                 df.write.format("delta").mode("append").option("mergeSchema","true").save(output_path)
#                 log_ingestion(table_name, file.name, file_mod_iso)

#                 spark.sql(f"""
#                 CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#                 USING DELTA
#                 LOCATION '{output_path}'
#                 """)

#                 print(f"Overwrote {table_name} with {file.name}")

#     print("Bronze ingestion run complete.")


In [0]:
from delta.tables import DeltaTable
from datetime import datetime, timezone
import pyspark.sql.functions as F
import os

def ingest_latest_files(bronze_path: str, catalog: str, schema: str, ingestion_log_path=None):
    """
    Incrementally load the CSV files under ``bronze_path`` into Delta tables.

    Every immediate subfolder of ``bronze_path`` is treated as one table, named
    after the lower-cased folder name. The function's own output folders
    (``delta/`` and the ingestion-log folder) are skipped. Each ``.csv`` file in
    a folder that is not yet in the ingestion log is processed. A file counts as
    already ingested when the log has a row with the same table name, file name
    and modification time. Each new file is:

    * read with a header row and an inferred schema;
    * tagged with ``_source_file`` (file name) and ``_ingested_at`` columns;
    * given sanitized column names via ``sanitize_name``;
    * appended to ``{bronze_path}delta/{table_name}/`` with ``mergeSchema``;
    * recorded in the ingestion log.

    Afterwards the folder's Delta location is registered as
    ``catalog.schema.table_name`` (if it is not registered already).

    Args:
        bronze_path: Root folder of the raw files. It must end with "/" because
            child paths are built by string concatenation.
        catalog: Catalog in which the tables are registered.
        schema: Schema in which the tables are registered.
        ingestion_log_path: Delta location of the ingestion log. Defaults to
            ``f"{bronze_path}ingestion_log/"``.

    Notes:
        * ``_ingested_at`` holds the file's modification time (UTC), not the
          wall-clock time of this run.
        * The data append and the log append are two separate writes. If the
          log write fails after the data write succeeded, the file will be
          appended again on the next run.
    """
    log_path = ingestion_log_path or f"{bronze_path}ingestion_log/"
    output_root = f"{bronze_path}delta/"

    def is_file_already_ingested(table_name: str, file_name: str, file_mod_iso: str) -> bool:
        """Return True if the log already holds this (table, file, mod-time) entry."""
        if not DeltaTable.isDeltaTable(spark, log_path):
            # No log table yet, so nothing has been ingested.
            return False
        log_df = spark.read.format("delta").load(log_path)
        return (
            log_df.filter(
                (F.col("table_name") == table_name)
                & (F.col("file_name") == file_name)
                & (F.col("file_mod_ts") == file_mod_iso)
            )
            .limit(1)  # existence check only: stop after the first match
            .count() > 0
        )

    def log_ingestion(table_name: str, file_name: str, file_mod_iso: str) -> None:
        """Append one row to the ingestion log, creating the log table on first use."""
        logged_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        log_df = spark.createDataFrame(
            [(table_name, file_name, file_mod_iso, logged_at)],
            schema=["table_name", "file_name", "file_mod_ts", "logged_at"],
        )
        write_mode = "append" if DeltaTable.isDeltaTable(spark, log_path) else "overwrite"
        log_df.write.format("delta").mode(write_mode).save(log_path)

    def quote_ident(name: str) -> str:
        """Backtick-quote a SQL identifier so folder names with spaces/hyphens stay valid."""
        return "`" + name.replace("`", "``") + "`"

    # Folders this function itself writes into under bronze_path; listing them as
    # source tables would only produce spurious "No files found" runs.
    managed_dirs = {"delta"}
    if log_path.startswith(bronze_path):
        managed_dirs.add(log_path[len(bronze_path):].strip("/").split("/")[0])

    subfolders = [
        entry.path
        for entry in dbutils.fs.ls(bronze_path)
        if entry.isDir() and entry.name.rstrip("/") not in managed_dirs
    ]

    for folder in subfolders:
        table_name = os.path.basename(folder.rstrip("/")).lower()
        output_path = f"{output_root}{table_name}/"

        files = [f for f in dbutils.fs.ls(folder) if f.name.endswith(".csv")]
        if not files:
            print(f"No files found in {folder}")
            continue

        for file in files:
            # modificationTime is in epoch milliseconds; render it as a UTC string.
            file_mod_iso = datetime.fromtimestamp(
                file.modificationTime / 1000.0, tz=timezone.utc
            ).strftime("%Y-%m-%d %H:%M:%S")

            if is_file_already_ingested(table_name, file.name, file_mod_iso):
                print(f"Skipping {file.name} — already ingested.")
                continue

            df = (
                spark.read.option("header", "true")
                .option("inferSchema", "true")
                .csv(file.path)
                .withColumn("_source_file", F.lit(file.name))
                .withColumn("_ingested_at", F.to_timestamp(F.lit(file_mod_iso)))
            )
            # Rename all columns in a single positional projection.
            df = df.toDF(*[sanitize_name(c) for c in df.columns])

            df.write.format("delta").mode("append").option("mergeSchema", "true").save(output_path)
            log_ingestion(table_name, file.name, file_mod_iso)
            print(f"Appended {file.name} into {table_name}")

        # Register once per folder, and only when data actually exists at the
        # location (CREATE TABLE on an empty location has no schema to use).
        if DeltaTable.isDeltaTable(spark, output_path):
            table_fqn = ".".join(quote_ident(part) for part in (catalog, schema, table_name))
            spark.sql(f"""
              CREATE TABLE IF NOT EXISTS {table_fqn}
              USING DELTA
              LOCATION '{output_path}'
            """)

    print("Bronze ingestion run complete.")


In [0]:
# Running the ingestion funtion
# Delta location of the ingestion audit log, stored next to the per-table delta/ folders.
ingestion_log_path = bronze_path + "ingestion_log/"
# Run the Bronze ingestion for every source subfolder.
ingest_latest_files(bronze_path=bronze_path, catalog=catalog, schema=schema)

In [0]:
# df = spark.table("veersadatabricks.bronze.patientinfo")
# print(df.count())
# df.filter(col("_source_file") == "PatientInfo - Copy.csv").display()

In [0]:
# from datetime import datetime, timezone
# from delta.tables import DeltaTable
# import pyspark.sql.functions as F
# from pyspark.sql.functions import col

# def ingest_latest_files(bronze_path, catalog, schema):
#     # Ensure schema is set
#     if schema is None:
#         raise ValueError("Schema is None. Make sure your environment variable is loaded correctly.")

#     # Config: where to keep the ingestion log (Delta)
#     ingestion_log_path = f"{bronze_path}ingestion_log/"   # delta path for ingestion audit/log
#     patient_table_name = "patientinfo"                    # sanitized name you use for patient table

#     # helper to check if a file already logged for a table
#     def is_file_already_ingested(table_name, file_name):
#         if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#             log_df = spark.read.format("delta").load(ingestion_log_path)
#             return log_df.filter((col("table_name") == table_name) & (col("file_name") == file_name)).limit(1).count() > 0
#         return False

#     # helper to append a log row
#     def log_ingestion(table_name, file_name, file_mod_iso):
#         row = [(table_name, file_name, file_mod_iso, datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"))]
#         log_df = spark.createDataFrame(row, schema=["table_name","file_name","file_mod_ts","logged_at"])
#         if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#             log_df.write.format("delta").mode("append").save(ingestion_log_path)
#         else:
#             log_df.write.format("delta").mode("overwrite").save(ingestion_log_path)

#     # List all subfolders in Bronze (same as your original)
#     subfolders = [f.path for f in dbutils.fs.ls(bronze_path) if f.isDir() and not f.name.startswith("_") and f.name != "delta/"]

#     for folder in subfolders:
#         table_name = sanitize_name(os.path.basename(folder.rstrip("/")))

#         # List files in subfolder
#         files = [f for f in dbutils.fs.ls(folder) if f.name.endswith(".csv")]
#         if not files:
#             print(f"No files found in {folder}")
#             continue

#         # Pick file with latest modification time
#         latest_file = sorted(files, key=lambda f: f.modificationTime)[-1]
#         input_path = latest_file.path
#         output_path = f"{bronze_path}delta/{table_name}/"

#         print(f"Processing file {latest_file.name} for table {schema}.{table_name}")

#         # Convert file mod time (ms since epoch) -> ISO timestamp string
#         file_mod_iso = datetime.utcfromtimestamp(latest_file.modificationTime / 1000.0).strftime("%Y-%m-%d %H:%M:%S")

#         # Read CSV
#         df = (spark.read
#               .option("header", "true")
#               .option("inferSchema", "true")
#               .csv(input_path)
#         )

#         # Add deterministic metadata columns (crucial for idempotency / SCD2)
#         df = df.withColumn("_source_file", F.lit(latest_file.name))\
#                .withColumn("ingested_at", F.to_timestamp(F.lit(file_mod_iso)))  # deterministic timestamp
#         df =  df.withColumn("_source_file_mod_ts", F.to_timestamp(F.lit(file_mod_iso)))
#         # Sanitize column names (preserve newly added metadata columns)
#         for c in df.columns:
#             new_col = sanitize_name(c)
#             if new_col != c:
#                 df = df.withColumnRenamed(c, new_col)

#         # --- Special behavior for patientinfo: idempotent append + ingestion log ---
#         if table_name == patient_table_name:
#             if is_file_already_ingested(table_name, latest_file.name):
#                 print(f"Skipping {latest_file.name} for {table_name} — already ingested.")
#                 continue

#             # Append the new snapshot (will create table if not exists)
#             df.write.format("delta").mode("append").option("mergeSchema", "true").save(output_path)
#             # record ingestion
#             log_ingestion(table_name, latest_file.name, file_mod_iso)

#             # Register table in catalog if desired (safe even if already exists)
#             try:
#                 spark.sql(f"""
#                   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#                   USING DELTA
#                   LOCATION '{output_path}'
#                 """)
#             except Exception as e:
#                 print("Could not register table in catalog (maybe catalog not configured):", e)

#             print(f"Ingested {latest_file.name} -> {output_path} (append)")

#         else:
#             # Default/legacy behavior: full overwrite (unchanged)
#             df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(output_path)

#             # Register as external table in catalog
#             try:
#                 spark.sql(f"""
#                   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#                   USING DELTA
#                   LOCATION '{output_path}'
#                 """)
#             except Exception as e:
#                 print("Could not register table in catalog (maybe catalog not configured):", e)

#             print(f"Overwrote {output_path} with {latest_file.name}")

#     print("Ingestion run complete.")


In [0]:
# def ingest_csv_folder(bronze_path, catalog, schema):
#     # List all files in Bronze folder
#     files = dbutils.fs.ls(bronze_path)
    
#     for file in files:
#         if file.name.endswith(".csv"):  # process only CSV files
#             table_name = os.path.splitext(file.name)[0]  # filename without .csv
#             input_path = bronze_path + file.name
#             output_path = bronze_path + f"delta/{table_name}/"

#             print(f"Processing {file.name} → table {schema}.{table_name}")

#             # Read CSV
#             df = (spark.read
#                   .option("header", "true")
#                   .option("inferSchema", "true")
#                   .csv(input_path)
#                   .withColumn("_ingestion_date", current_timestamp())
#                   .withColumn("_source_file", input_file_name()))

#             # Write to Delta
#             df.write.format("delta").mode("overwrite").save(output_path)

#             # Register as external Delta table
#             spark.sql(f"""
#               CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#               USING DELTA
#               LOCATION '{output_path}'
#             """)


In [0]:
# def ingest_latest_files(bronze_path, catalog, schema):
#     # Ensure schema is set
#     if schema is None:
#         raise ValueError("Schema is None. Make sure your environment variable is loaded correctly.")

#     # List all subfolders in Bronze
#     subfolders = [f.path for f in dbutils.fs.ls(bronze_path) if f.isDir() and not f.name.startswith("_") and f.name != "delta/"]

#     for folder in subfolders:
#         table_name = sanitize_name(os.path.basename(folder.rstrip("/")))

#         # List files in subfolder
#         files = [f for f in dbutils.fs.ls(folder) if f.name.endswith(".csv")]
#         if not files:
#             print(f"No files found in {folder}")
#             continue

#         # Pick file with latest modification time
#         latest_file = sorted(files, key=lambda f: f.modificationTime)[-1]
#         input_path = latest_file.path
#         output_path = f"{bronze_path}delta/{table_name}/"

#         print(f"Loading {latest_file.name} into table {schema}.{table_name}")

#         # Read CSV
#         df = (spark.read
#               .option("header", "true")
#               .option("inferSchema", "true")
#               .csv(input_path)
#               .withColumn("ingested_at", F.current_timestamp())
#               #.withColumn("_source_file", F.col("_metadata.file_path"))
#         )

#         # Sanitize column names
#         for col in df.columns:
#             new_col = sanitize_name(col)
#             if new_col != col:
#                 df = df.withColumnRenamed(col, new_col)

#         # Write to Delta
#         df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(output_path)

#         # Register as external table in catalog
#         spark.sql(f"""
#           CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name}
#           USING DELTA
#           LOCATION '{output_path}'
#         """)


In [0]:
# # Databricks notebook source
# # MAGIC %pip install python-dotenv

# # COMMAND ----------

# import os
# import requests
# from dotenv import load_dotenv
# from pyspark.sql import functions as F

# # Load environment variables
# load_dotenv()

# # COMMAND ----------

# # Env variables
# DATABRICKS_INSTANCE = os.getenv("DATABRICKS_INSTANCE")
# CONFIG_JOB_ID = os.getenv("CONFIG_JOB_ID")
# TOKEN = os.getenv("DATABRICKS_TOKEN")

# # Your schema namespace (change if needed)
# SCHEMA_NAME = "agent_productivity.config"

# # COMMAND ----------

# # Run Databricks Job
# def run_job(job_id: str):
#     url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/run-now"
#     headers = {"Authorization": f"Bearer {TOKEN}"}
#     payload = {"job_id": job_id}

#     response = requests.post(url, headers=headers, json=payload)
#     response.raise_for_status()
#     return response.json()

# # Guard clause → ensure schema exists
# if not spark.catalog.databaseExists(SCHEMA_NAME):
#     print(f"Schema {SCHEMA_NAME} missing → running job {CONFIG_JOB_ID}")
#     run_output = run_job(CONFIG_JOB_ID)
#     print("Job triggered:", run_output)

# # COMMAND ----------

# # Retry wrapper for notebook runs
# def run_with_retry(notebook, timeout, max_retries=3):
#     num_retries = 0
#     while True:
#         try:
#             return dbutils.notebook.run(notebook, timeout)
#         except Exception as e:
#             if num_retries >= max_retries:
#                 raise e
#             else:
#                 print(f"Retrying ({num_retries+1}/{max_retries}) due to error: {e}")
#                 num_retries += 1

# # Example usage
# update_config = run_with_retry(
#     "/Users/your_email/ingestion/update_config", 
#     0, 
#     max_retries=3
# )

# # COMMAND ----------

# # Utility: clean header rows (avoid duplicate headers in CSVs)
# def clean_header_rows(df, schema):
#     first_col = schema.fields[0].name
#     return df.filter(F.col(first_col) != first_col)

# # Utility: add metadata column
# def with_metadata(df):
#     return df.withColumn("ingested_at", F.current_timestamp())

# # COMMAND ----------

# # Example ingestion function
# def reader(src):
#     # Infer schema once from latest file
#     schema = (spark.read
#               .format("csv")
#               .option("header", "true")
#               .load(src["latest_file_path"]).schema)

#     # Read batch (not streaming for now)
#     df = (spark.read
#           .format("csv")
#           .option("header", "true")
#           .schema(schema)
#           .load(src["folder_path"]))

#     df = clean_header_rows(df, schema)
#     df = with_metadata(df)
#     return df

# # COMMAND ----------

# # Load config
# CONFIG_PATH = "agent_productivity.config.ingestion_config"
# sources = spark.table(CONFIG_PATH)

# # Process sources
# for src in sources.collect():
#     df = reader(src)
#     target = f"agent_productivity.ingestion.{src['table_name']}"
#     spark.sql("CREATE SCHEMA IF NOT EXISTS agent_productivity.ingestion")

#     # Write batch
#     df.write.mode("append").saveAsTable(target)
#     print(f"Ingested {src['table_name']} into Bronze")

# # Update config table with ingestion timestamp
# sources.withColumn("last_ingestion_time", F.current_timestamp()) \
#        .write.mode("overwrite").saveAsTable(CONFIG_PATH)


In [0]:
# # Ingestion log path
# ingestion_log_path = f"{bronze_path}ingestion_log/"

#     # --- Clear log at the beginning ---
# if DeltaTable.isDeltaTable(spark, ingestion_log_path):
#     spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.bronze_ingestion_log")
#     dbutils.fs.rm(ingestion_log_path, recurse=True)
# print("✅ Cleared ingestion log. Fresh run starting.")