### 📘 **Bronze_Orchestrator – RAW → BRONZE Ingestion (Synapse/Fabric)**
---

#### 🎯 **Purpose**
This notebook ingests **RAW data** into a **Bronze Lakehouse layer** using *PySpark*.  
- 📂 Supports **CSV** and **ZIP** inputs  
- 🧩 Derives schema and row counts  
- 🗄 Archives RAW files after ingestion  
- 📝 Stages metadata into **`stage.ObjectList`** for lineage/control  
- 🔑 Optional **JDBC write/read** against *Azure SQL*, with the connection string fetched from **Key Vault**

---

#### ✅ **Prerequisites**
- Synapse or Fabric workspace with **PySpark kernel** and **Lakehouse attached**  
- Access to **OneLake Files** paths (*RAW*, *Bronze*, *Archive*)  
- If JDBC logging is used:  
  - Azure SQL database  
  - A secret in **Azure Key Vault** holding the JDBC connection string (server, database, user, password)  
- Proper **Spark pool** or Fabric runtime permissions  

---

#### ⚙️ **Key Parameters** *(cells 1–3)*
- **`SRCDB_NAME`** → Logical source DB name *(e.g., `OTM`)*  
- **`PARALLELISM`** → Maximum number of driver threads that process files concurrently *(capped at 2× the driver's CPU count)*  
- **`INCLUDE_CSV` / `INCLUDE_ZIP`** → Enable or disable ingestion formats  
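
As a rough illustration of how `PARALLELISM` translates into a worker count, the sketch below mirrors the cap applied in the scan step (twice the CPU count, never below one). The function name `effective_workers` and its `cpu_count` argument are hypothetical and exist only to make the example testable.

```python
import os
from typing import Optional

def effective_workers(parallelism: int, cpu_count: Optional[int] = None) -> int:
    """Return the thread count that a requested PARALLELISM would yield."""
    # Fall back to 4 CPUs when the count cannot be determined, as the notebook does.
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 4)
    cpu_cap = cpus * 2
    return max(1, min(int(parallelism), cpu_cap))

print(effective_workers(6, cpu_count=8))   # request fits under the cap
print(effective_workers(6, cpu_count=2))   # capped at 2 * 2
print(effective_workers(0, cpu_count=8))   # never drops below one worker
```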

---

#### 📂 **Derived / Runtime Variables**
- **`TMP_EXTRACT_DIR`** → Temp folder for extracted ZIPs  
- **`DEFAULT_SCHEMA`** → *`<srcdb>silver`* (adjust if needed)  
- **`RUN_DATETIME_UTC` / `RUN_ISO`** → Execution timestamp (*watermark*)  
- **`DELTA_TABLE_NAME`** → *`stage_ObjectList`* (Lakehouse Delta table name; the Azure SQL target is `stage.ObjectList`)  

---

#### 📥 **Inputs**
- Files under **RAW** in Lakehouse  
  Example: `abfss://Dev_workspace@onelake.dfs.fabric.microsoft.com/BronzeSilver_Dev.Lakehouse/Files`

- Optional **JDBC query** from *Azure SQL*

---

#### 📤 **Outputs**
- **Bronze Files** → partitioned CSVs stored in `/bronze/<SRCDB_NAME>/<object>/<RUNDATE>/`  
- **Metadata** → rows written into **`stage.ObjectList`** (*Delta* or *SQL staging table*)  

---

#### 🛠 **Configuration Tips**
- Adjust **`DEFAULT_SCHEMA`** and **`DELTA_TABLE_NAME`** to match naming standards  
- Update **`TMP_EXTRACT_DIR`** if RAW path differs  
- Use **Key Vault** or *Managed Identity* for secrets — *avoid hardcoding*  

---

#### ⚠️ **Limitations & Notes**
- No widgets → parameters must be edited directly in the first cells  
- Not using **Autoloader/DLT** → ingestion is *batch-based Spark*  
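- **Schema drift** is detected only by column count: when files for the same object differ in column count, the most recently merged schema replaces the stored one  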
- ZIP extraction can be slow for very large files  
- Bronze output is plain CSV; write **Delta** instead if you require **ACID merges**  
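
To make the schema-drift caveat concrete, here is a minimal sketch of a name-and-type comparison over the `[{"name": ..., "type": ...}]` lists that `schema_to_json` produces. The helper `schema_diff` and the sample columns are assumptions for illustration and are not part of the notebook.

```python
from typing import Dict, List

def schema_diff(old: List[Dict[str, str]], new: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Report columns that were added, removed, or changed type between two schemas."""
    old_types = {c["name"]: c["type"] for c in old}
    new_types = {c["name"]: c["type"] for c in new}
    return {
        "added": sorted(n for n in new_types if n not in old_types),
        "removed": sorted(n for n in old_types if n not in new_types),
        "retyped": sorted(n for n in new_types
                          if n in old_types and new_types[n] != old_types[n]),
    }

# Hypothetical schemas with the SAME number of columns but different content.
old = [{"name": "id", "type": "int"}, {"name": "amount", "type": "double"}]
new = [{"name": "id", "type": "string"}, {"name": "currency", "type": "string"}]
drift = schema_diff(old, new)
print(drift)
```

Both sample schemas have two columns, so a check based on column count alone would not flag this change.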

---

#### 🔍 **Validation Checklist**
- ✅ New files appear in **Bronze path**  
- ✅ **`stage.ObjectList`** contains rows for ingested objects  
- ✅ If JDBC used → row counts align between **source** and **Lakehouse**
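
The row-count item in the checklist can be sketched as a plain dictionary comparison. The function `row_count_mismatches` and the sample counts are hypothetical. In practice the two dictionaries would be filled from the source system and from the ingested objects.

```python
from typing import Dict, List, Tuple

def row_count_mismatches(source: Dict[str, int],
                         lake: Dict[str, int]) -> List[Tuple[str, int, int]]:
    """Return (object, source_rows, lake_rows) wherever the counts differ.

    An object missing on one side is reported with a count of -1.
    """
    names = sorted(set(source) | set(lake))
    return [(n, source.get(n, -1), lake.get(n, -1))
            for n in names
            if source.get(n, -1) != lake.get(n, -1)]

# Hypothetical counts for illustration only.
source_counts = {"SHIPMENT": 1200, "INVOICE": 300, "ORDER_RELEASE": 50}
lake_counts = {"SHIPMENT": 1200, "INVOICE": 295}
mismatches = row_count_mismatches(source_counts, lake_counts)
print(mismatches)
```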


## 1) Parameters

In [None]:
SRCDB_NAME = "OTM"
PARALLELISM = 6
INCLUDE_CSV = True
INCLUDE_ZIP = True

In [None]:
from datetime import datetime, timezone

# Run timestamp used in Bronze/Archive folder names (UTC, no spaces or colons)
RUNDATE = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S-%f")

# --- Lakehouse source locations (adjust) ---

ABFSS_PATH = "abfss://Dev_workspace@onelake.dfs.fabric.microsoft.com/BronzeSilver.Lakehouse/Files" 
RAW_PATH = f"{ABFSS_PATH}/RAW/{SRCDB_NAME}"
BRONZE_PATH = f"{ABFSS_PATH}/bronze/{SRCDB_NAME}"
ARCHIVE_PATH = f"{ABFSS_PATH}/RAW/Archive/{SRCDB_NAME}/{RUNDATE}/"

# Include subdirectories recursively
RECURSIVE = False

# CSV options
CSV_HEADER = True
CSV_DELIMITER = ','
CSV_QUOTE = '"'
CSV_INFER_SCHEMA = True

# Where to drop extracted CSVs from ZIPs (within Lakehouse Files area)
TMP_EXTRACT_DIR = "Files/RAW/_tmp_zip_extract"

# Object metadata
DEFAULT_SCHEMA = f"{SRCDB_NAME.lower()}silver"
DATE_FORMAT = "yyyy-MM-dd_HH:mm:ss.S"

# Watermark
RUN_DATETIME_UTC = datetime.now(timezone.utc)   # notebook run instant
RUN_ISO = RUN_DATETIME_UTC.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# Delta names
DELTA_TABLE_NAME = "stage_ObjectList"           # lakehouse Delta table name


## 2) Imports & helpers
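
The `derive_object_name` helper in this section strips the `.csv` extension, a trailing timestamp suffix, and the `file_…-batch…` wrapper from a file name. The standalone copy below applies it to a few hypothetical file names (not taken from any real source system) to show the resulting object names.

```python
import os
import re

def derive_object_name(file_name: str) -> str:
    # Same logic as the notebook helper, repeated here so the example runs on its own.
    name = os.path.basename(file_name)
    if name.lower().endswith(".csv"):
        name = name[:-4]
        name = re.sub(r'_?\d{11,14}(?:\.\d+)?$', '', name)
        name = re.sub(r'\d{12,14}(?:\.\d+)?$', '', name)
    if name.startswith("file_") and "-batch" in name:
        name = name.replace("file_", "").split("-batch")[0]
    return name

# Hypothetical file names for illustration.
examples = [
    "RAW/OTM/SHIPMENT_20240101123045.csv",   # underscore + 14-digit timestamp
    "RAW/OTM/OTM_SHIPMENT202401011230.csv",  # 12-digit timestamp, no underscore
    "RAW/OTM/ORDER_RELEASE.csv",             # no timestamp suffix
    "RAW/OTM/file_INVOICE-batch3.csv",       # mini-batch wrapper
]
names = [derive_object_name(p) for p in examples]
print(names)
```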

In [None]:

import os, io, json, uuid, re, zipfile
from typing import List, Dict
from notebookutils import mssparkutils as msu

from pyspark.sql import functions as F, types as T

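def log(msg: str):
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated in recent Python versions)
    from datetime import datetime as _dt, timezone as _tz
    print(f"[{_dt.now(_tz.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')}Z] {msg}")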

def is_csv(path: str) -> bool:
    return path.lower().endswith(".csv")

def is_zip(path: str) -> bool:
    return path.lower().endswith(".zip")

def derive_object_name(file_name: str) -> str:
    name = os.path.basename(file_name)
    if name.lower().endswith(".csv"):
        name = name[:-4]
        name = re.sub(r'_?\d{11,14}(?:\.\d+)?$', '', name) # added by Shuo to handle mini batches
        name = re.sub(r'\d{12,14}(?:\.\d+)?$', '', name) # 2 different scenarios
    if name.startswith("file_") and "-batch" in name:
        name = name.replace("file_", "").split("-batch")[0]
    return name

def read_csv_with_spark(csv_path: str):
    reader = (spark.read
              .option("header", str(CSV_HEADER).lower())
              .option("inferSchema", str(CSV_INFER_SCHEMA).lower())
              .option("quote", CSV_QUOTE)
              .option("sep", CSV_DELIMITER))
    return reader.csv(csv_path)

def schema_to_json(df_schema) -> List[Dict[str,str]]:
    items = []
    for f in df_schema.fields:
        items.append({"name": f.name, "type": f.dataType.simpleString()})
    return items

def to_ddl(schema_json: List[Dict[str, str]]) -> str:
    cols = [f"`{c['name']}` {c['type']}" for c in schema_json]
    return ", ".join(cols)

def extract_zip_to_tmp(zip_path: str, tmp_root: str) -> List[str]:
    """
    Extract *.csv members from a ZIP stored in ABFSS to a temp ABFSS folder.
    Read ZIP via Spark (binaryFiles). Write CSVs in chunks:
      - prefer mssparkutils.fs.open('wb') if available
      - else use Hadoop FileSystem create() stream
    Returns the ABFSS paths of the extracted CSVs.
    """
    import io, os, re, zipfile

    CHUNK = 4 * 1024 * 1024  # 4MB chunks to avoid Py4J large-array issues

    # --- small helpers kept local for drop-in use ---

    def _read_zip_bytes(p: str) -> bytes:
        # Spark driver read (PortableDataStream or raw bytes)
        rdd = spark.sparkContext.binaryFiles(p, minPartitions=1)
        items = rdd.take(1)
        if not items:
            raise RuntimeError(f"Could not read ZIP: {p}")
        ds = items[0][1]
        try:
            return ds.read()          # PortableDataStream
        except AttributeError:
            return ds                 # already bytes

    def _write_bytes_stream(dest: str, src_fileobj):
        """
        Stream bytes from a Python file-like (src_fileobj) to ABFSS at dest.
        Uses mssparkutils.fs.open('wb') if present; else Hadoop FS OutputStream.
        """
        if msu is not None and hasattr(msu.fs, "open"):
            # mssparkutils binary write
            parent = dest.rsplit("/", 1)[0]
            try:
                msu.fs.mkdirs(parent)
            except Exception:
                pass
            with msu.fs.open(dest, 'wb') as out:
                while True:
                    chunk = src_fileobj.read(CHUNK)
                    if not chunk:
                        break
                    out.write(chunk)
            return

        # Hadoop FS fallback (binary-safe)
        jvm = spark.sparkContext._jvm
        jsc = spark.sparkContext._jsc
        hconf = jsc.hadoopConfiguration()
        jpath = jvm.org.apache.hadoop.fs.Path(dest)
        fs = jpath.getFileSystem(hconf)
        parent = jpath.getParent()
        if parent is not None and not fs.exists(parent):
            fs.mkdirs(parent)
        out = fs.create(jpath, True)  # overwrite
        try:
            while True:
                chunk = src_fileobj.read(CHUNK)
                if not chunk:
                    break
                # pass small chunks to Py4J to avoid NegativeArraySizeException
                out.write(bytearray(chunk))
            out.hflush()
            out.hsync()
        finally:
            out.close()

    # --- actual extraction logic ---
    tmp_root = tmp_root.rstrip("/")
    zip_bytes = _read_zip_bytes(zip_path)

    csv_paths: List[str] = []
    zip_base = os.path.splitext(os.path.basename(zip_path))[0]

    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        for info in z.infolist():
            name = str(info.filename)
            is_dir = info.is_dir() if hasattr(info, "is_dir") else name.endswith(("/", "\\"))
            if is_dir or not name.lower().endswith(".csv"):
                continue

            # sanitize inner path
            rel = name.replace("\\", "/")
            rel = re.sub(r"[\r\n]", "", rel).replace("..", "").strip("/")

            dest_path = f"{tmp_root}/{zip_base}/{rel}"

            # stream from ZIP member to ABFSS in chunks
            with z.open(info, "r") as src:
                _write_bytes_stream(dest_path, src)

            csv_paths.append(dest_path)

    return csv_paths
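
The member filtering and path sanitizing inside `extract_zip_to_tmp` can be tried without Spark or OneLake. The sketch below builds a small ZIP in memory and lists the sanitized relative paths of its CSV members. The function `csv_members` and the member names are hypothetical, and nothing is written to storage.

```python
import io
import re
import zipfile
from typing import List

def csv_members(zip_bytes: bytes) -> List[str]:
    """Return sanitized relative paths of the *.csv members of a ZIP archive."""
    out: List[str] = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        for info in z.infolist():
            name = info.filename
            if info.is_dir() or not name.lower().endswith(".csv"):
                continue  # skip folders and non-CSV members
            rel = name.replace("\\", "/")
            # Drop line breaks and ".." segments, then trim leading/trailing slashes.
            rel = re.sub(r"[\r\n]", "", rel).replace("..", "").strip("/")
            out.append(rel)
    return out

# Build a tiny archive in memory (hypothetical member names).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr("data/orders.CSV", "id\n1\n")
    z.writestr("readme.txt", "ignore me")
    z.writestr("../escape.csv", "id\n2\n")

members = csv_members(buf.getvalue())
print(members)
```

The `../escape.csv` member shows why the inner path is cleaned before it is appended to the temp folder.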

## 3) Discover source files

In [None]:
# 1) Load every file under RAW_PATH (no wildcard; recursion is controlled by RECURSIVE)
df_all = (spark.read.format("binaryFile")
          .option("recursiveFileLookup", str(RECURSIVE).lower())
          .load(RAW_PATH)                      # <-- loads every file in RAW/<SRCDB_NAME>
          .select("path", "modificationTime", "length"))

# 2) Case-insensitive filter for CSV/ZIP
lower_path = F.lower(F.col("path"))
df_files = (df_all
            .where(lower_path.rlike(r"\.(csv|zip)$"))
            .withColumn("ftype", F.when(lower_path.endswith(".csv"), F.lit("csv"))
                                   .when(lower_path.endswith(".zip"), F.lit("zip"))
                                   .otherwise(F.lit("other"))))

count_candidates = df_files.count()
print(f"Discovered {count_candidates} CSV/ZIP files in {RAW_PATH}")
display(df_files.orderBy("path").limit(200))

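# 3) Collect (path, type) pairs for the scan step, honoring INCLUDE_CSV / INCLUDE_ZIP
enabled_types = {t for t, on in (("csv", INCLUDE_CSV), ("zip", INCLUDE_ZIP)) if on}
all_files = [(r["path"], r["ftype"])
             for r in df_files.select("path", "ftype").collect()
             if r["ftype"] in enabled_types]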
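
The discovery logic (a case-insensitive extension match, plus the `INCLUDE_CSV` / `INCLUDE_ZIP` switches described in the parameters) can be sketched in plain Python. The helpers `file_type` and `select_files` and the sample paths are hypothetical.

```python
import re
from typing import List, Optional, Tuple

_EXT = re.compile(r"\.(csv|zip)$", re.IGNORECASE)

def file_type(path: str) -> Optional[str]:
    """Return 'csv' or 'zip' for supported files, otherwise None."""
    m = _EXT.search(path)
    return m.group(1).lower() if m else None

def select_files(paths: List[str], include_csv: bool = True,
                 include_zip: bool = True) -> List[Tuple[str, str]]:
    """Keep only the paths whose type is supported and enabled."""
    enabled = {t for t, on in (("csv", include_csv), ("zip", include_zip)) if on}
    typed = [(p, file_type(p)) for p in paths]
    return [(p, t) for p, t in typed if t in enabled]

# Hypothetical paths for illustration.
paths = ["RAW/OTM/A.CSV", "RAW/OTM/b.zip", "RAW/OTM/c.csv.bak"]
print(select_files(paths))
print(select_files(paths, include_zip=False))
```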

## 4) Scan rows and schemas

In [None]:
# --- threaded ingestion of CSVs & ZIPs ---
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading, os

objects: Dict[str, Dict] = {}
objects_lock = threading.Lock()

# If you want *true* overlap of Spark jobs, enable FAIR scheduling on your cluster.
# Example (works if the cluster is configured for FAIR):
# spark.conf.set("spark.scheduler.mode", "FAIR")
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "ingest")

def _merge_object(obj: str, rec: Dict, extra_source: str | None = None):
    rec = dict(rec)  # shallow copy
    sp = rec.get('source_paths')
    if isinstance(sp, str):
        sp = [sp]
    rec['source_paths'] = list(sp or [])
    if extra_source:
        rec['source_paths'].append(extra_source)   # add here once

    with objects_lock:
        if obj not in objects:
            objects[obj] = rec
            return

        old = objects[obj]
        if isinstance(old.get('source_paths'), str):
            old['source_paths'] = [old['source_paths']]

        old['row_count'] = max(old.get('row_count', 0), rec.get('row_count', 0))

        if len(rec['schema_json']) != len(old['schema_json']):
            old['schema_json'] = rec['schema_json']
            old['schema_ddl']  = rec['schema_ddl']

        # dedupe via the loop only — no extra append afterwards
        for p in rec['source_paths']:
            if p not in old['source_paths']:
                old['source_paths'].append(p)
                
def _handle_csv(path: str):
    try:
        df = read_csv_with_spark(path)
        cnt = df.count()
        sch_json = schema_to_json(df.schema)
        obj = derive_object_name(path)
        out_dir = f"{BRONZE_PATH}/{obj}/{RUNDATE}/"

        rec = {
            'source_paths': [out_dir],  # Bronze output folder, stored as a list
            'row_count': cnt,
            'schema_json': sch_json,
            'schema_ddl': to_ddl(sch_json),
        }

        # Copy csv to bronze
        (df.write
            .mode("append")
            .option("header", "true")
            .option("delimiter", ",")
            .option("quote", '"')
            .option("escape", '"')
            .option("nullValue", "")
            .csv(out_dir)
        )

        _merge_object(obj, rec)
        log(f"🧾 CSV processed ✅ \n {path} → rows={cnt} • cols={len(sch_json)}")
    except Exception as e:
        log(f"❌ ERROR reading CSV | {path} | {e}")

def _handle_zip(path: str):
    try:
        csv_paths = extract_zip_to_tmp(path, TMP_EXTRACT_DIR)
        log(f"🗜️ ZIP extracted ✅ | {path} → {len(csv_paths)} file(s)")
    except Exception as e:
        log(f"💥 ERROR extracting ZIP | {path} | {e}")
        return

    # Process the inner CSVs (still on this worker thread).
    # If you want *each* inner CSV parallelized too, you can submit them to the same executor.
    for cpath in csv_paths:
        try:
            df = read_csv_with_spark(cpath)
            cnt = df.count()
            sch_json = schema_to_json(df.schema)
            obj = derive_object_name(cpath)
            out_dir = f"{BRONZE_PATH}/{obj}/{RUNDATE}/"

            rec = {
                'source_paths': [out_dir],
                'row_count': cnt,
                'schema_json': sch_json,
                'schema_ddl': to_ddl(sch_json),
            }

            (df.write
                .mode("append")
                .option("header", "true")
                .option("delimiter", ",")
                .option("quote", '"')
                .option("escape", '"')
                .option("nullValue", "")
                .csv(out_dir)
            )

            # Also record the extracted inner CSV path in source_paths.
            _merge_object(obj, rec, extra_source=cpath)
            log(f"📦 ZIP member processed ✅ \n {cpath} → rows={cnt} • cols={len(sch_json)}")
        except Exception as ie:
            log(f"⚠️ ERROR reading inner CSV | {cpath} | {ie}")

def _handle_item(item):
    path, ftype = item
    if ftype == 'csv':
        _handle_csv(path)
    elif ftype == 'zip':
        _handle_zip(path)
    else:
        log(f"ℹ️ Skipping unsupported type: {ftype} | {path}")

# --- fan out work across threads ---
cpu_cap = (os.cpu_count() or 4) * 2
max_workers = max(1, min(int(PARALLELISM), cpu_cap))  # cap by CPU to be safe
with ThreadPoolExecutor(max_workers=max_workers) as pool:
    futures = {pool.submit(_handle_item, it): it for it in all_files}
    for fut in as_completed(futures):
        try:
            fut.result()  # re-raises anything a handler did not catch itself
        except Exception as e:
            log(f"❌ Unhandled error | {futures[fut][0]} | {e}")

log(f"📊 Total objects discovered: {len(objects)}")
print(objects)
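
The fan-out above relies on each handler catching its own errors. As a sketch of how anything that still escapes a worker thread could be collected, the example below calls `result()` on every future and records the failures. `run_all` and `demo_handler` are hypothetical names, and the handler only simulates a failure.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, List, Tuple

def run_all(items: Iterable[str], handler: Callable[[str], None],
            max_workers: int = 4) -> List[Tuple[str, str]]:
    """Run handler(item) on a thread pool and return (item, error) for each failure."""
    failures: List[Tuple[str, str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(handler, it): it for it in items}
        for fut in as_completed(futures):
            try:
                fut.result()  # re-raises the exception from the worker, if any
            except Exception as exc:
                failures.append((futures[fut], repr(exc)))
    return failures

def demo_handler(path: str) -> None:
    # Simulated failure for illustration only.
    if path.endswith(".bad"):
        raise ValueError(f"cannot parse {path}")

failed = run_all(["a.csv", "b.bad", "c.zip"], demo_handler, max_workers=2)
print(failed)
```

Without the call to `result()`, an exception raised inside a worker stays stored on its future and is never surfaced.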

In [None]:
# Drop zip extract temp dir
try:
    mssparkutils.fs.rm(TMP_EXTRACT_DIR.rstrip("/"), True)
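except Exception as e:
    # Cleanup is best-effort; report the failure instead of silently swallowing it
    log(f"Could not remove {TMP_EXTRACT_DIR}: {e}")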

### Archive batch to /Archive

In [None]:
# Copy the RAW batch to ARCHIVE_PATH, then delete the archived files from RAW
mssparkutils.fs.cp(RAW_PATH.rstrip('/')+'/', ARCHIVE_PATH.rstrip('/')+'/', True)

for item in mssparkutils.fs.ls(RAW_PATH):
    if not item.isDir:                # files only
        mssparkutils.fs.rm(item.path, False)

## 5) Build `stage.ObjectList` rows (with watermark RunDateUtc)

In [None]:
from pyspark.sql import types as T

# DatabaseName is NOT NULL in SQL; it is populated from SRCDB_NAME below

stage_rows = []
for obj, meta in objects.items():
    stage_rows.append({
        "ObjectSchema":          DEFAULT_SCHEMA,               # nvarchar(255) NULL
        "ObjectName":            obj,                          # nvarchar(255) NOT NULL
        "EstimatedRowCount":     int(meta['row_count']),       # bigint NULL
        "WaterMarkColumn":       "",                           # nvarchar(255) NULL
        "WaterMarkColumnValue":  RUN_ISO,                      # nvarchar(255) NULL
        "WaterMarkType":         "datetime",                   # varchar(50)  NULL
        "TableName":             obj,                          # nvarchar(255) NOT NULL
        "CandidateKey":          None,                         # nvarchar(1000) NULL
        "DatabaseName":          SRCDB_NAME,                      # nvarchar(255) NOT NULL
    })

stage_schema = T.StructType([
    T.StructField("ObjectSchema",         T.StringType(),    True),
    T.StructField("ObjectName",           T.StringType(),    False),
    T.StructField("EstimatedRowCount",    T.LongType(),      True),
    T.StructField("WaterMarkColumn",      T.StringType(),    True),
    T.StructField("WaterMarkColumnValue", T.StringType(),    True),
    T.StructField("WaterMarkType",        T.StringType(),    True),
    T.StructField("TableName",            T.StringType(),    False),
    T.StructField("CandidateKey",         T.StringType(),    True),
    T.StructField("DatabaseName",         T.StringType(),    False),
])

stage_df = spark.createDataFrame(stage_rows, schema=stage_schema)


display(stage_df)
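
Because three of the staged columns are declared non-nullable, a quick pre-check of the row dictionaries can catch empty values before they reach Spark or SQL. This is a minimal sketch. `rows_missing_required` and the sample rows are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple

REQUIRED = ("ObjectName", "TableName", "DatabaseName")  # NOT NULL columns

def rows_missing_required(rows: List[Dict[str, object]],
                          required: Sequence[str] = REQUIRED) -> List[Tuple[int, str]]:
    """Return (row_index, column) for every required value that is None or empty."""
    problems: List[Tuple[int, str]] = []
    for i, row in enumerate(rows):
        for col in required:
            if row.get(col) in (None, ""):
                problems.append((i, col))
    return problems

# Hypothetical rows for illustration.
sample_rows = [
    {"ObjectName": "SHIPMENT", "TableName": "SHIPMENT", "DatabaseName": "OTM"},
    {"ObjectName": "INVOICE", "TableName": "", "DatabaseName": None},
]
issues = rows_missing_required(sample_rows)
print(issues)
```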

## 6) Log to ControlDb


In [None]:
# --- Azure Key Vault Configuration ---
key_vault_uri = "https://dg-fabric-kv-dev.vault.azure.net/"
secret_name = "controlDBConnection"

In [None]:
from pyspark.sql import SparkSession
from urllib.parse import urlparse, parse_qs


# --- Function to parse JDBC URL ---
def parse_jdbc_url(jdbc_url_string):
    """
    Parses a JDBC URL string and returns a dictionary of its components.
    
    Args:
        jdbc_url_string (str): The full JDBC connection string.
        
    Returns:
        dict: A dictionary containing the parsed key-value pairs.
    """
    # The first part is the protocol and the server/port.
    # The remaining parts are key-value pairs separated by semicolons.
    parts = jdbc_url_string.split(';')
    
    # Extract the server and port from the first part
    protocol_and_host = parts[0].split("://")[1]
    server_port_part = protocol_and_host.split(":")
    
    params = {}
    params['server'] = server_port_part[0]
    # Check if a port is present, otherwise use default
    if len(server_port_part) > 1:
        params['port'] = server_port_part[1]
    else:
        params['port'] = '1433' # Default SQL Server port
    
    # Parse the remaining key-value pairs
    for part in parts[1:]:
        if '=' in part:
            key, value = part.split('=', 1)
            params[key.strip()] = value.strip()
    return params
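
Assuming the Key Vault secret holds a semicolon-delimited SQL Server JDBC string, the sketch below shows what the parser returns and how the parsed port could be carried into a rebuilt URL. The connection string is a made-up placeholder, `build_jdbc_url` is a hypothetical helper, and `parse_jdbc_url` is a compact copy of the function above so the block runs on its own.

```python
from typing import Dict

def parse_jdbc_url(jdbc_url_string: str) -> Dict[str, str]:
    """Compact copy of the notebook parser: server, port, then key=value pairs."""
    parts = jdbc_url_string.split(';')
    server_port = parts[0].split("://")[1].split(":")
    params = {
        'server': server_port[0],
        'port': server_port[1] if len(server_port) > 1 else '1433',
    }
    for part in parts[1:]:
        if '=' in part:
            key, value = part.split('=', 1)
            params[key.strip()] = value.strip()
    return params

def build_jdbc_url(params: Dict[str, str]) -> str:
    """Rebuild a URL without credentials, keeping the parsed port."""
    return (
        f"jdbc:sqlserver://{params['server']}:{params['port']};"
        f"database={params['database']};"
        "encrypt=true;trustServerCertificate=false;"
        "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    )

# Placeholder connection string; not a real server or credential.
secret = ("jdbc:sqlserver://example-server.database.windows.net:1433;"
          "database=ControlDb;user=loader;password=not-a-real-password;")
parsed = parse_jdbc_url(secret)
url = build_jdbc_url(parsed)
print(parsed["server"], parsed["port"], parsed["database"])
print(url)
```

Keeping user and password out of the rebuilt URL matches the notebook, which passes them as separate JDBC options.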


In [None]:
# --- Retrieve the secret from Azure Key Vault ---
print(f"Retrieving secret '{secret_name}' from Azure Key Vault...")
try:  
    jdbc_connection_string_param = notebookutils.credentials.getSecret(key_vault_uri, secret_name)
    print("Secret retrieved successfully.")
except Exception as e:
    print(f"An error occurred while retrieving the secret from Key Vault: {e}")
    jdbc_connection_string_param = None
    
if jdbc_connection_string_param:

    connection_params = parse_jdbc_url(jdbc_connection_string_param)

    jdbc_server = connection_params.get('server')
    jdbc_port = connection_params.get('port', '1433')   # honor the port parsed from the secret
    jdbc_database = connection_params.get('database')
    jdbc_user = connection_params.get('user')
    jdbc_password = connection_params.get('password')

    jdbc_url = (
        f"jdbc:sqlserver://{jdbc_server}:{jdbc_port};"
        f"database={jdbc_database};"
        "encrypt=true;"
        "trustServerCertificate=false;"
        "hostNameInCertificate=*.database.windows.net;"
        "loginTimeout=30;"
    )


    try:    
        # Append rows to stage.ObjectList (table definition, PKs and indexes are untouched)
        (stage_df.write
                .format("jdbc")
                .mode("append")
                #.option("truncate", "true")   # only takes effect with mode("overwrite"): TRUNCATE + INSERT
                .option("url", jdbc_url)
                .option("dbtable", "stage.ObjectList")   # or "[stage].[ObjectList]"
                .option("user", jdbc_user)
                .option("password", jdbc_password)
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .save())

    except Exception as e:
        print(f"An error occurred during Spark operation: {e}")
        
else:
    print("JDBC connection string could not be retrieved. Skipping the write to stage.ObjectList.")






In [None]:


# --- Retrieve the secret from Azure Key Vault ---
print(f"Retrieving secret '{secret_name}' from Azure Key Vault...")
try:  
    jdbc_connection_string_param = notebookutils.credentials.getSecret(key_vault_uri, secret_name)
    print("Secret retrieved successfully.")
except Exception as e:
    print(f"An error occurred while retrieving the secret from Key Vault: {e}")
    jdbc_connection_string_param = None
    
if jdbc_connection_string_param:

    connection_params = parse_jdbc_url(jdbc_connection_string_param)

    jdbc_server = connection_params.get('server')
    jdbc_port = connection_params.get('port', '1433')   # honor the port parsed from the secret
    jdbc_database = connection_params.get('database')
    jdbc_user = connection_params.get('user')
    jdbc_password = connection_params.get('password')

    jdbc_url = (
        f"jdbc:sqlserver://{jdbc_server}:{jdbc_port};"
        f"database={jdbc_database};"
        "encrypt=true;"
        "trustServerCertificate=false;"
        "hostNameInCertificate=*.database.windows.net;"
        "loginTimeout=30;"
    )

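    # The SQL query to be executed.
    # NOTE: the write cell above targets stage.ObjectList, while this query reads [dbo].[ObjectList];
    # adjust the table name if the goal is to verify the rows that were just appended.
    query_to_run = "SELECT TOP (1000) * FROM [dbo].[ObjectList]"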


    try:
        df = spark.read \
          .format("jdbc") \
          .option("url", jdbc_url) \
          .option("dbtable", f"({query_to_run}) AS T") \
          .option("user", jdbc_user) \
          .option("password", jdbc_password) \
          .load()

        # Show the resulting DataFrame
        print("\nDataFrame successfully loaded from Azure SQL database:")
        display(df)
        

    except Exception as e:
        print(f"An error occurred during Spark operation: {e}")
        
else:
    print("JDBC connection string could not be retrieved. Skipping the JDBC read.")


