In [0]:
%run ../initializeJobVariables

In [0]:
import uuid
import re
import os
import pandas as pd
import unicodedata

from pyspark.sql import functions as F
def clean_column_name(name: str) -> str:
    """Collapse separators in ``name`` into single underscores.

    Surrounding whitespace is trimmed first. Every run of whitespace, hyphens
    or slashes becomes one underscore, repeated underscores are merged, and
    leading/trailing underscores are removed.
    """
    separators_replaced = re.sub(r"[\s\-\/]+", "_", name.strip())
    single_underscores = re.sub(r"_+", "_", separators_replaced)
    return single_underscores.strip("_")

def dedupe_names(names):
    """Return ``names`` with duplicates made unique by a numeric suffix.

    The first occurrence of a name is kept unchanged. Later occurrences become
    ``<name>_<k>``, using the smallest k (counting up per name) whose result
    is neither one of the input names nor a name already emitted. Reserving
    the input names stops a generated suffix from colliding with a real column.
    For example, ``["a", "a", "a_1"]`` becomes ``["a", "a_2", "a_1"]`` and not
    ``["a", "a_1", "a_1"]``.

    Args:
        names: Iterable of strings (a one-shot generator is accepted).

    Returns:
        A list of the same length whose entries are pairwise distinct.
    """
    names = list(names)          # materialise so a generator can be scanned twice
    reserved = set(names)        # original names a generated suffix must not reuse
    used = set()                 # every name emitted so far
    next_suffix = {}             # per-name counter, so repeated duplicates stay cheap
    deduped = []
    for name in names:
        if name not in used:
            candidate = name
        else:
            k = next_suffix.get(name, 1)
            candidate = f"{name}_{k}"
            while candidate in reserved or candidate in used:
                k += 1
                candidate = f"{name}_{k}"
            next_suffix[name] = k + 1
        used.add(candidate)
        deduped.append(candidate)
    return deduped


def normalize_columns(df):
    """Rename every column of a Spark DataFrame to a unique, SQL-friendly name.

    Each column name goes through ``sanitize_identifier`` and then
    ``clean_column_name``. Because ``clean_column_name`` strips leading
    underscores, a digit-leading name such as ``"2025"`` comes out as
    ``"2025"`` rather than ``"_2025"``. A name that cleans down to an empty
    string (e.g. a header of only symbols) falls back to ``col_<position>``.
    Any remaining duplicates are suffixed by ``dedupe_names``.

    The rename is positional via ``toDF``. Chained ``withColumnRenamed`` calls
    can temporarily create two columns with the same name, after which a later
    rename would hit both of them.

    Args:
        df: Spark DataFrame to rename.

    Returns:
        ``df`` itself when no name changes, otherwise a renamed DataFrame.
    """
    cleaned = [
        clean_column_name(sanitize_identifier(original)) or f"col_{position}"
        for position, original in enumerate(df.columns)
    ]
    cleaned = dedupe_names(cleaned)
    if cleaned == list(df.columns):
        return df
    return df.toDF(*cleaned)

def derive_table_name(workbook_file_name: str) -> str:
    """Turn a workbook path into a table-name fragment.

    The directory part and the final extension are dropped, then the stem is
    passed through ``clean_column_name``. For example,
    ``"/in/Sales Report.xlsx"`` becomes ``"Sales_Report"``.
    """
    file_part = os.path.basename(workbook_file_name)
    stem, _extension = os.path.splitext(file_part)
    return clean_column_name(stem)


def copy_to_dbfs_tmp(source_path: str) -> str:
    """Copy ``source_path`` to a randomly named ``.xlsx`` file under ``dbfs:/tmp``.

    The file name is a fresh uuid4 hex string, so repeated runs get distinct
    copies. Returns the ``dbfs:/`` URI of the copy.
    """
    destination = f"dbfs:/tmp/{uuid.uuid4().hex}.xlsx"
    dbutils.fs.cp(source_path, destination, True)
    return destination

def dbfs_to_local(dbfs_path: str) -> str:
    """Map a DBFS path to its ``/dbfs/...`` FUSE-mount form.

    Only a leading ``dbfs:`` scheme is removed; ``dbfs:/`` appearing later in
    the path is left untouched. Leading slashes are normalised so that
    ``dbfs:/tmp/x`` and ``/tmp/x`` both become ``/dbfs/tmp/x``. A path that is
    already under ``/dbfs/`` is returned unchanged.

    Args:
        dbfs_path: A ``dbfs:/...`` URI, an absolute DBFS path, or an existing
            ``/dbfs/...`` local path.

    Returns:
        The corresponding local path starting with ``/dbfs/``.
    """
    if dbfs_path.startswith("/dbfs/"):
        return dbfs_path
    relative = dbfs_path[len("dbfs:"):] if dbfs_path.startswith("dbfs:") else dbfs_path
    return "/dbfs/" + relative.lstrip("/")
    
def get_local_workbook(source_path: str):
    """Stage ``source_path`` in DBFS tmp and return ``(dbfs_uri, local_fuse_path)``.

    The first element is the ``dbfs:/tmp/...`` copy made by
    ``copy_to_dbfs_tmp``. The second is the same file addressed through
    ``/dbfs/`` so pandas can open it with ordinary file I/O.
    """
    staged_uri = copy_to_dbfs_tmp(source_path)
    local_path = dbfs_to_local(staged_uri)
    return staged_uri, local_path

def get_sheet_names(local_path: str):
    """Return the sheet names of the Excel workbook at ``local_path``.

    The ``pd.ExcelFile`` is used as a context manager so the underlying file
    handle is closed before returning, rather than left open.

    Args:
        local_path: Local filesystem path to the workbook.

    Returns:
        A list of sheet-name strings as reported by ``ExcelFile.sheet_names``.
    """
    with pd.ExcelFile(local_path) as workbook:
        return list(workbook.sheet_names)

def sanitize_identifier(name: str) -> str:
    """Reduce ``name`` to ASCII letters, digits and underscores.

    Accented characters are decomposed (NFKD) and their non-ASCII marks
    dropped, so ``"Panamá"`` becomes ``"Panama"``. Each remaining character
    outside ``[a-zA-Z0-9_]`` becomes ``_``. If the result starts with a digit,
    it is prefixed with ``_``. An empty input yields ``""``.
    """
    ascii_text = unicodedata.normalize('NFKD', name).encode('ascii', 'ignore').decode('ascii')
    safe_text = re.sub(r'[^a-zA-Z0-9_]', '_', ascii_text)
    # Slicing avoids an IndexError on the empty string.
    return "_" + safe_text if safe_text[:1].isdigit() else safe_text
    
def clean_col(c):
    """Replace every character outside ``[a-zA-Z0-9_]`` in ``c`` with ``_``.

    Unlike ``sanitize_identifier``, this does not transliterate accents or
    guard against a leading digit. Each disallowed character maps to exactly
    one underscore.
    """
    disallowed = r'[^a-zA-Z0-9_]'
    return re.sub(disallowed, '_', c)


def detect_data_sheet(local_path: str, explicit_sheet: str) -> str:
    """Choose which sheet of a workbook holds the data.

    If ``explicit_sheet`` is truthy it is returned as-is. Otherwise every sheet
    is parsed with ``header=0``, and the sheet with the most rows containing at
    least one non-null value wins; the earliest sheet wins ties. Empty sheets
    are skipped, and sheets that fail to parse are skipped but recorded.

    The workbook is opened once and reused for every sheet. Calling
    ``pd.read_excel(path, ...)`` per sheet would re-open and re-load the file
    each time. The workbook is closed when the scan finishes.

    Args:
        local_path: Local filesystem path to the workbook.
        explicit_sheet: Sheet name to use without auto-detection, or a falsy
            value to auto-detect.

    Returns:
        The chosen sheet name.

    Raises:
        RuntimeError: If the workbook cannot be opened, or no sheet qualifies.
            The latter message includes the per-sheet parse failures.
    """
    if explicit_sheet:
        return explicit_sheet

    try:
        workbook = pd.ExcelFile(local_path)
    except Exception as exc:
        raise RuntimeError(
            "Unable to auto-detect sheet names. Set data_sheet_name."
        ) from exc

    best_sheet = None
    best_count = -1
    failures = []

    with workbook:
        for sheet in workbook.sheet_names:
            try:
                df = workbook.parse(sheet, header=0)
            except Exception as exc:
                failures.append((sheet, str(exc)))
                continue
            if df.empty:
                continue

            # Count rows that carry at least one value, ignoring blank rows.
            non_null_count = df.notna().any(axis=1).sum()
            if non_null_count > best_count:
                best_count = non_null_count
                best_sheet = sheet

    if best_sheet is None:
        raise RuntimeError(
            "No suitable data sheet found. Set data_sheet_name explicitly. "
            f"Sheet read failures: {failures}"
        )

    return best_sheet

In [0]:
from pyspark.sql import functions as F
import re
import json
import uuid
from datetime import datetime, timezone
from pyspark.sql.functions import current_date, col

# Run context. getRunDetails is not defined in this notebook; presumably it
# comes from ../initializeJobVariables via %run.
dict_job_data = getRunDetails()
pipeline_name = dict_job_data["jobName"]
job_id = dict_job_data["jobId"]
job_run_id = dict_job_data["jobRunId"]
start_time = datetime.now(timezone.utc).isoformat()
environment = "development"
ADMIN_SCHEMA = f"{environment}_011_bronze_core.db_admin"


def _sql_str(value) -> str:
    """Render ``value`` as a Spark SQL string literal, or ``NULL`` when it is None.

    Backslashes and single quotes are escaped, so a value such as a workbook
    named ``O'Brien.xlsx`` cannot end the literal early or inject SQL.
    Assumes ``spark.sql.parser.escapedStringLiterals`` is false (Spark's default).
    """
    if value is None:
        return "NULL"
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _first_value(query: str, column: str):
    """Run ``query`` and return ``column`` from its first row, or None when there are no rows."""
    rows = spark.sql(query).limit(1).collect()
    return rows[0][column] if rows else None


def _require(value, description: str):
    """Return ``value`` unchanged; raise RuntimeError naming ``description`` when it is None."""
    if value is None:
        raise RuntimeError(f"Missing {description} for pipeline '{pipeline_name}'")
    return value


def _pipeline_parameter(pipeline_id, variable_name: str):
    """Return the ``variable_value`` configured for ``variable_name`` on ``pipeline_id``, or None."""
    return _first_value(
        f"SELECT variable_value FROM {ADMIN_SCHEMA}.pipeline_parameter "
        f"WHERE pipeline_id = {_sql_str(pipeline_id)} "
        f"AND variable_name = {_sql_str(variable_name)}",
        "variable_value",
    )


def _move_workbook(source_path: str, pipeline_id, folder_parameter: str, workbook_name: str) -> str:
    """Move the source workbook into the folder stored under ``folder_parameter``.

    Returns the destination path. Raises RuntimeError if that folder parameter is not configured.
    """
    folder = _pipeline_parameter(pipeline_id, folder_parameter)
    if not folder:
        raise RuntimeError(f"Pipeline parameter '{folder_parameter}' is not configured")
    dest_path = f"{folder.rstrip('/')}/{workbook_name}"
    dbutils.fs.mv(source_path, dest_path, True)
    return dest_path


def _destination_table(sheet: dict) -> str:
    """Build the ``catalog.schema.table`` target for one ``sheet_mapping`` entry.

    Only the table part is passed through ``sanitize_identifier``. Raises
    ValueError if ``destination_table`` is missing or not three-part.
    """
    destination = sheet.get("destination_table") or ""
    parts = destination.split(".")
    if len(parts) != 3:
        raise ValueError(
            f"destination_table must be catalog.schema.table, got {destination!r}"
        )
    return f"{parts[0]}.{parts[1]}.{sanitize_identifier(parts[2])}"


def _load_sheet(sheet: dict, target_table: str, local_path: str, etl_id, file_id) -> None:
    """Read one mapped sheet from the local workbook and overwrite ``target_table`` with it."""
    sheet_name = sheet.get("sheet_name") or detect_data_sheet(local_path, None)
    frame = pd.read_excel(
        local_path,
        sheet_name=sheet_name,
        skiprows=sheet.get("header_rows_to_skip"),
        # Apply the configured footer trim; pandas needs an int here, not None.
        skipfooter=int(sheet.get("footer_rows_to_skip") or 0),
    )
    frame = frame.dropna(axis=1, how="all")
    if frame.empty:
        raise RuntimeError("Selected sheet is empty")
    # NOTE(review): astype(str) stores missing cells as the literal string "nan";
    # confirm downstream consumers expect that rather than NULL.
    data_df = normalize_columns(spark.createDataFrame(frame.astype(str)))
    data_df = data_df.withColumn("etl_id", F.lit(etl_id)).withColumn("file_id", F.lit(file_id))
    data_df.write.format("delta").mode("overwrite").saveAsTable(target_table)


def _remove_staged_copy(path: str) -> None:
    """Best-effort delete of the temporary DBFS copy; a failure is printed, not raised."""
    try:
        dbutils.fs.rm(path)
    except Exception as exc:
        print(f"Warning: could not remove temporary file {path}: {exc}")


try:
    pipeline_id = _require(
        _first_value(
            f"SELECT pipeline_id FROM {ADMIN_SCHEMA}.pipeline "
            f"WHERE pipeline_name = {_sql_str(pipeline_name)}",
            "pipeline_id",
        ),
        "pipeline_id",
    )
    master_path = _require(_pipeline_parameter(pipeline_id, "workbook_path"), "'workbook_path' parameter")
    db_schema_name = _require(_pipeline_parameter(pipeline_id, "db_catalog"), "'db_catalog' parameter")
    new_id = _require(
        _first_value(
            f"SELECT etl_id FROM {ADMIN_SCHEMA}.pipeline_history "
            f"WHERE job_run_id = {_sql_str(job_run_id)}",
            "etl_id",
        ),
        f"pipeline_history.etl_id for job_run_id {job_run_id}",
    )

    file_name = master_path.split("/")[-1]
    table_name = db_schema_name + "." + clean_column_name(file_name.replace(".xlsx", "").replace(".xls", ""))

    # A re-import of the same file name first retires earlier registrations,
    # then registers this run as the single Active row.
    file_filter = f"file_name = {_sql_str(file_name)}"
    already_registered = _first_value(
        f"SELECT file_name FROM {ADMIN_SCHEMA}.pipeline_file WHERE {file_filter}",
        "file_name",
    ) is not None
    if already_registered:
        spark.sql(f"UPDATE {ADMIN_SCHEMA}.pipeline_file SET file_status = 'Deleted' WHERE {file_filter}")
    spark.sql(f"""
        INSERT INTO {ADMIN_SCHEMA}.pipeline_file
        (etl_id, file_name, file_path, file_date, file_import, file_status, created_date)
        VALUES (
        try_cast({_sql_str(new_id)} as bigint),
        {_sql_str(file_name)},
        {_sql_str(master_path)},
        current_date(),
        TRUE,
        'Active',
        current_timestamp()
        )
        """)
    # Retired rows share this file name, so pick the newest Active row
    # instead of an arbitrary match.
    file_id = _require(
        _first_value(
            f"SELECT file_id FROM {ADMIN_SCHEMA}.pipeline_file "
            f"WHERE {file_filter} AND file_status = 'Active' "
            f"ORDER BY created_date DESC",
            "file_id",
        ),
        f"pipeline_file.file_id for {file_name}",
    )

    print(master_path)
    dbfs_tmp, local_path = get_local_workbook(master_path)
    try:
        sheet_mapping = _pipeline_parameter(pipeline_id, "sheet_mapping")
        try:
            if not sheet_mapping:
                print("missing_sheet_name_details")
                raise Exception("missing_sheet_name_details")
            sheet_mapping = json.loads(sheet_mapping)
            for sheet in sheet_mapping:
                # Assigned before loading so a failure message names the table being loaded.
                table_name = _destination_table(sheet)
                _load_sheet(sheet, table_name, local_path, new_id, file_id)
                print(f"Loaded {file_name} -> {table_name}")
            _move_workbook(master_path, pipeline_id, "archive_folder", file_name)
        except Exception as exc:
            print(f"Failed {file_name} -> {table_name}: {exc}")
            # Keep the original error as the one that fails the job, even if this move fails too.
            try:
                _move_workbook(master_path, pipeline_id, "failure_folder", file_name)
            except Exception as move_exc:
                print(f"Could not move {file_name} to the failure folder: {move_exc}")
            raise
    finally:
        _remove_staged_copy(dbfs_tmp)
except Exception as e:
    print(f"Pipeline execution failed: {e}")
    raise

In [0]:
# fileName = "/Volumes/development_011_bronze_core/atlas/atlas/loading/December 2025 Action Poker Unit Count Data.xlsx"
# print(fileName)


/Volumes/development_011_bronze_core/atlas/atlas/loading/December 2025 Action Poker Unit Count Data.xlsx


In [0]:
# dbutils.fs.mv('s3://cluster-private-bucket-481980074735-us-east-1/intake/atlas/failure/excel/Panam√° Seguimiento IGT-Sierra 27 Octubre 2025.xlsx/', 's3://cluster-private-bucket-481980074735-us-east-1/intake/atlas/manual/', True)

True