In [1]:
import re

StatementMeta(demospark, 4, 2, Finished, Available, Finished)

In [4]:
INPUT_PATH = "abfss://ds-india@demodsindia.dfs.core.windows.net/sample_invoices.xml"

StatementMeta(demospark, 4, 5, Finished, Available, Finished)

In [5]:
# ---- USER INPUTS ----
ROW_TAG = "LineItem"   # anchor at the deep repeating node
INPUT_PATH = "abfss://ds-india@demodsindia.dfs.core.windows.net/sample_invoices.xml"
CATALOG_DB = "demo_xml"
BRONZE_TABLE = "lineitem_bronze"
GOLD_PREFIX  = "norm_"
PARTITION_COLS = []  # e.g. ["invoice_year","invoice_month","invoice_day"]


StatementMeta(demospark, 4, 6, Finished, Available, Finished)

In [6]:
from pyspark.sql.types import *

line_item_schema = StructType([
    StructField("_id", StringType()),              # attribute id -> _id (attributePrefix="_")
    StructField("ItemId", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", StringType()),
    StructField("Price", StringType()),
    StructField("Meta", StructType([
        StructField("CreatedAt", StringType()),
        StructField("UpdatedAt", StringType()),
        StructField("Flags", StructType([
            StructField("Flag", ArrayType(StructType([
                StructField("_name", StringType()),
                StructField("_value", StringType())
            ])))
        ]))
    ])),
    StructField("Taxes", StructType([
        StructField("Tax", ArrayType(StructType([
            StructField("Code", StringType()),
            StructField("Amount", StringType()),
            StructField("TaxBreakup", StructType([
                StructField("Break", ArrayType(StructType([
                    StructField("Type", StringType()),
                    StructField("Rate", StringType())
                ])))
            ]))
        ])))
    ])),
    StructField("Attributes", StructType([
        StructField("Attribute", ArrayType(StructType([
            StructField("_name", StringType()),
            StructField("_code", StringType()),
            StructField("_", StringType())   # element text captured via valueTag -> "_"
        ])))
    ])),
    StructField("Packaging", StructType([
        StructField("Boxes", StructType([
            StructField("Box", ArrayType(StructType([
                StructField("_id", StringType()),
                StructField("Weight", StringType()),
                StructField("Dimensions", StructType([
                    StructField("Length", StringType()),
                    StructField("Width", StringType()),
                    StructField("Height", StringType()),
                ])),
                StructField("Labels", StructType([
                    StructField("Label", ArrayType(StringType()))
                ]))
            ])))
        ]))
    ])),
    StructField("Serials", StructType([
        StructField("Serial", ArrayType(StringType()))
    ]))
])


StatementMeta(demospark, 4, 7, Finished, Available, Finished)

In [8]:
from pyspark.sql import functions as F

df_raw = (
    spark.read
    .format("xml")
    .option("rowTag", ROW_TAG)
    .option("attributePrefix", "_")       # keep attributes as _attr
    .option("valueTag", "_VALUE")         # text node column name (must differ)
    .option("treatEmptyValuesAsNulls", "true")
    .schema(line_item_schema)             # ensure schema matches names below
    .load(INPUT_PATH)
    .withColumn("_ingest_file", F.input_file_name())
    .withColumn("_ingest_ts", F.current_timestamp())
)

df_raw.printSchema()
display(df_raw.limit(10))                 # <- you had df, use df_raw


StatementMeta(demospark, 4, 9, Finished, Available, Finished)

root
 |-- _id: string (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Meta: struct (nullable = true)
 |    |-- CreatedAt: string (nullable = true)
 |    |-- UpdatedAt: string (nullable = true)
 |    |-- Flags: struct (nullable = true)
 |    |    |-- Flag: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _name: string (nullable = true)
 |    |    |    |    |-- _value: string (nullable = true)
 |-- Taxes: struct (nullable = true)
 |    |-- Tax: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Code: string (nullable = true)
 |    |    |    |-- Amount: string (nullable = true)
 |    |    |    |-- TaxBreakup: struct (nullable = true)
 |    |    |    |    |-- Break: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)


SynapseWidget(Synapse.DataFrame, d8b23dca-1c7e-4beb-926c-d7e2a30635a9)

In [9]:
def flatten_structs(df):
    """Recursively flattens struct columns; leaves arrays/maps for explicit handling."""
    while True:
        struct_cols = [ (c, t) for c,t in df.dtypes if t.startswith("struct") ]
        if not struct_cols:
            return df
        select_exprs = []
        for c in df.columns:
            if df.schema[c].dataType.__class__ == StructType:
                for f in df.schema[c].dataType.fields:
                    select_exprs.append(F.col(f"{c}.{f.name}").alias(f"{c}__{f.name}"))
            else:
                select_exprs.append(F.col(c))
        df = df.select(*select_exprs)


StatementMeta(demospark, 4, 10, Finished, Available, Finished)

In [10]:
# Keep keys to join back (LineItem scope)
PARENT_KEYS = ["ItemId", "_id"]  # item id + lineitem attribute id

# Map array paths (relative to LineItem) to output suffixes
NORMALIZE_PLAN = {
    "Taxes.Tax"                      : "item_taxes",
    "Taxes.Tax.TaxBreakup.Break"     : "item_tax_break",
    "Attributes.Attribute"           : "item_attributes",
    "Packaging.Boxes.Box"            : "item_boxes",
    "Packaging.Boxes.Box.Labels.Label": "item_box_labels",
    "Serials.Serial"                 : "item_serials"
}


StatementMeta(demospark, 4, 11, Finished, Available, Finished)

In [17]:
bronze = spark.table(f"{CATALOG_DB}.{BRONZE_TABLE}")

def explode_path(df, path, alias):
    parts = path.split(".")
    cur = df
    parent_alias = None
    for i, p in enumerate(parts):
        current = ".".join(parts[:i+1])
        a = f"{alias}_{i}"
        cur = cur.withColumn(a, F.explode_outer(F.col(current)))
        parent_alias = a
    return cur, parent_alias

# Base (scalar) table for the LineItem itself (without arrays)
base_scalar = flatten_structs(bronze.drop(*[c for c,t in bronze.dtypes if t.startswith("array") or t.startswith("struct<array")]))
base_name = f"{CATALOG_DB}.{GOLD_PREFIX}lineitem"
base_scalar.write.mode("overwrite").format("delta").saveAsTable(base_name)

# Child tables
for path, suffix in NORMALIZE_PLAN.items():
    df = bronze
    parts = path.split(".")
    cur = df
    alias_chain = []
    ref = "ROOT"

    # Walk and explode at each array level
    running_col = parts[0]
    for i, p in enumerate(parts):
        running_col = ".".join(parts[:i+1])
        alias = f"elem{i}"
        cur = cur.withColumn(alias, F.explode_outer(F.col(running_col)))
        alias_chain.append(alias)

    leaf = alias_chain[-1]

    # Select parent keys + leaf columns + lineage
    leaf_fields = [F.col(f"{leaf}.{f.name}").alias(f.name) for f in cur.select(leaf).schema[0].dataType.fields] \
                  if hasattr(cur.select(leaf).schema[0].dataType, "fields") else [F.col(leaf).alias("value")]

    out = (
        cur.select(*[F.col(k) for k in PARENT_KEYS], *leaf_fields, "_ingest_file", "_ingest_ts")
    )
    out = flatten_structs(out)

    name = f"{CATALOG_DB}.{GOLD_PREFIX}{suffix}"
    writer = out.write.mode("overwrite").format("delta")
    if PARTITION_COLS:
        writer = writer.partitionBy(*PARTITION_COLS)
    writer.saveAsTable(name)


StatementMeta(demospark, 4, 18, Finished, Available, Finished)

AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "explode(Taxes)" due to data type mismatch: Parameter 1 requires the ("ARRAY" or "MAP") type, however "Taxes" has the type "STRUCT<Tax: ARRAY<STRUCT<Code: STRING, Amount: STRING, TaxBreakup: STRUCT<Break: ARRAY<STRUCT<Type: STRING, Rate: STRING>>>>>>".;
'Project [_id#637, ItemId#638, Description#639, Quantity#640, Price#641, Meta#642, Taxes#643, Attributes#644, Packaging#645, Serials#646, _ingest_file#647, _ingest_ts#648, generatorouter(explode(Taxes#643)) AS elem0#1542]
+- SubqueryAlias spark_catalog.demo_xml.lineitem_bronze
   +- Relation spark_catalog.demo_xml.lineitem_bronze[_id#637,ItemId#638,Description#639,Quantity#640,Price#641,Meta#642,Taxes#643,Attributes#644,Packaging#645,Serials#646,_ingest_file#647,_ingest_ts#648] parquet


In [15]:
spark.sql("SHOW DATABASES").show(truncate=False)

spark.sql("SELECT current_database()").show()

StatementMeta(demospark, 4, 16, Finished, Available, Finished)

+---------+
|namespace|
+---------+
|default  |
+---------+

+------------------+
|current_database()|
+------------------+
|           default|
+------------------+



In [16]:
# Set names (adjust if you already have them)
CATALOG_DB = "demo_xml"
BRONZE_TABLE = "lineitem_bronze"

# 1) Make sure the DB exists
spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_DB}")

# 2) Write bronze
df_raw.write.mode("overwrite").format("delta").saveAsTable(f"{CATALOG_DB}.{BRONZE_TABLE}")

# 3) Read it back
bronze = spark.table(f"{CATALOG_DB}.{BRONZE_TABLE}")
bronze.limit(5).display()


StatementMeta(demospark, 4, 17, Finished, Available, Finished)

AttributeError: 'DataFrame' object has no attribute 'display'

In [19]:
bronze.printSchema()

# Example: verify specific nodes
for p in ["Taxes", "Taxes.Tax", "Taxes.Tax.TaxBreakup", "Taxes.Tax.TaxBreakup.Break"]:
    print(p, "->", dtype_at(bronze.schema, p))


StatementMeta(demospark, 4, 20, Finished, Available, Finished)

root
 |-- _id: string (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Meta: struct (nullable = true)
 |    |-- CreatedAt: string (nullable = true)
 |    |-- UpdatedAt: string (nullable = true)
 |    |-- Flags: struct (nullable = true)
 |    |    |-- Flag: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _name: string (nullable = true)
 |    |    |    |    |-- _value: string (nullable = true)
 |-- Taxes: struct (nullable = true)
 |    |-- Tax: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Code: string (nullable = true)
 |    |    |    |-- Amount: string (nullable = true)
 |    |    |    |-- TaxBreakup: struct (nullable = true)
 |    |    |    |    |-- Break: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)


NameError: name 'dtype_at' is not defined

In [2]:
from functools import reduce
from pyspark.sql.types import StructType, ArrayType, MapType
from pyspark.sql import functions as F

def dtype_at(schema: StructType, path: str):
    """
    Return the DataType at a dotted path in a (nested) schema, or None if not found.
    """
    tokens = path.split(".")
    cur = schema
    for tk in tokens:
        if isinstance(cur, StructType):
            fld = next((f for f in cur.fields if f.name == tk), None)
            if fld is None:
                return None
            cur = fld.dataType
        elif isinstance(cur, ArrayType):
            # step into the element type; tk should address a field inside it (if struct)
            cur = cur.elementType
            if isinstance(cur, StructType):
                fld = next((f for f in cur.fields if f.name == tk), None)
                if fld is None:
                    return None
                cur = fld.dataType
            else:
                # primitive array – tk shouldn’t exist here
                return None
        else:
            return None
    return cur

def explode_arrays_along(df, path: str, base_alias="elem"):
    """
    Walk a dotted path and explode only where the prefix is an ARRAY.
    Supports multiple arrays along the path, e.g. A.B[] . C . D[] .
    Returns (df_with_explodes, leaf_colref, alias_chain)
    """
    parts = path.split(".")
    orig_schema = df.schema
    cur = df
    alias_chain = []
    last_array_i = -1
    current_alias = None

    for i in range(len(parts)):
        prefix = ".".join(parts[:i+1])
        dt = dtype_at(orig_schema, prefix)

        # Build the *effective* column reference for this level:
        if current_alias is None:
            effective = prefix         # still referencing original nested column
        else:
            # continue inside the struct produced by last explode
            rest = ".".join(parts[last_array_i+1:i+1])
            effective = f"{current_alias}.{rest}" if rest else current_alias

        # Only explode when this prefix is an array
        if isinstance(dt, ArrayType):
            current_alias = f"{base_alias}{len(alias_chain)}"
            cur = cur.withColumn(current_alias, F.explode_outer(F.col(effective)))
            alias_chain.append(current_alias)
            last_array_i = i

    # Determine the leaf column reference
    if current_alias is None:
        # No arrays on the path → leaf is the original (possibly struct) path
        leaf = ".".join(parts)
    else:
        # Leaf is the last alias produced by explode
        leaf = current_alias

    return cur, leaf, alias_chain


StatementMeta(demospark, 6, 3, Finished, Available, Finished)

In [3]:
from pyspark.sql.types import StructType
from pyspark.sql import functions as F

for path, suffix in NORMALIZE_PLAN.items():
    # explode only arrays along the path
    cur, leaf, alias_chain = explode_arrays_along(bronze, path, base_alias="elem")

    # determine leaf dtype
    leaf_dtype = cur.select(F.col(leaf).alias(leaf)).schema[0].dataType

    # prefix child columns to avoid collisions with parent keys (e.g., _id)
    if isinstance(leaf_dtype, StructType):
        leaf_fields = [
            F.col(f"{leaf}.{f.name}").alias(f"{suffix}_{f.name}") for f in leaf_dtype.fields
        ]
    else:
        leaf_fields = [F.col(leaf).alias(f"{suffix}_value")]

    # build the row
    out = cur.select(
        *[F.col(k) for k in PARENT_KEYS],   # keep parent keys as-is
        *leaf_fields,
        F.col("_ingest_file"),
        F.col("_ingest_ts")
    )

    # now flatten safely; names are unique at top-level
    out = flatten_structs(out)

    name = f"{CATALOG_DB}.{GOLD_PREFIX}{suffix}"
    w = out.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true")    # <- reset schema (and partitioning if changed)
# only specify partitionBy on first creation; otherwise you can omit it if unchanged
    if PARTITION_COLS:
        w = w.partitionBy(*PARTITION_COLS)
    w.saveAsTable(name)



StatementMeta(demospark, 6, 4, Finished, Available, Finished)

NameError: name 'NORMALIZE_PLAN' is not defined

In [29]:
bronze = spark.table(f"{CATALOG_DB}.{BRONZE_TABLE}")
bronze.printSchema()
display(bronze.limit(50))          # nice grid in Synapse
# or: bronze.show(50, truncate=False)


StatementMeta(demospark, 5, 2, Finished, Available, Finished)

NameError: name 'CATALOG_DB' is not defined

In [27]:
# example for one child table
tbl = f"{CATALOG_DB}.{GOLD_PREFIX}item_taxes"   # replace with your actual suffix
spark.table(tbl).printSchema()
display(spark.table(tbl).limit(50))


StatementMeta(demospark, 4, 28, Finished, Available, Finished)

root
 |-- ItemId: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- item_taxes_Code: string (nullable = true)
 |-- item_taxes_Amount: string (nullable = true)
 |-- item_taxes_TaxBreakup__Break: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Type: string (nullable = true)
 |    |    |-- Rate: string (nullable = true)
 |-- _ingest_file: string (nullable = true)
 |-- _ingest_ts: timestamp (nullable = true)



SynapseWidget(Synapse.DataFrame, 251166c6-0105-4e77-84dd-ec6b5892021e)

In [23]:
bronze.printSchema()

# Example: verify specific nodes
for p in ["Taxes", "Taxes.Tax", "Taxes.Tax.TaxBreakup", "Taxes.Tax.TaxBreakup.Break"]:
    print(p, "->", dtype_at(bronze.schema, p))


StatementMeta(demospark, 4, 24, Finished, Available, Finished)

root
 |-- _id: string (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Meta: struct (nullable = true)
 |    |-- CreatedAt: string (nullable = true)
 |    |-- UpdatedAt: string (nullable = true)
 |    |-- Flags: struct (nullable = true)
 |    |    |-- Flag: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _name: string (nullable = true)
 |    |    |    |    |-- _value: string (nullable = true)
 |-- Taxes: struct (nullable = true)
 |    |-- Tax: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Code: string (nullable = true)
 |    |    |    |-- Amount: string (nullable = true)
 |    |    |    |-- TaxBreakup: struct (nullable = true)
 |    |    |    |    |-- Break: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)


Sample

In [None]:
abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml

In [8]:
# Import required libraries
from pyspark.sql.functions import lit, current_timestamp, input_file_name, explode, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, TimestampType
from pyspark.sql.dataframe import DataFrame
import logging

# Configure logging for Synapse monitoring
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",  # Replace with actual source (e.g., "hr_system")
    "year": "2025",
    "month": "09",
    "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_raw_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_raw/source={source_name}/",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "xsd_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xsd",
    "spark_partitions": 16,
    "row_tag": "record"
}

# Define explicit schema for XML parsing
SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True),
    StructField("_corrupt_record", StringType(), True)
])

def set_spark_config(partitions: int = CONFIG["spark_partitions"]) -> None:
    """Configure Spark settings for Synapse notebook."""
    try:
        spark.conf.set("spark.sql.adaptive.enabled", "true")
        spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
        spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
        logger.info("Spark configurations set successfully.")
    except Exception as e:
        logger.error(f"Failed to set Spark configurations: {str(e)}")
        raise

def copy_raw_xml(landing_path: str, xml_raw_path: str) -> None:
    """Copy raw XML from Landing to History/xml_raw in Synapse."""
    try:
        df_raw = spark.read.text(landing_path)
        df_raw = df_raw.withColumn("source", lit(CONFIG["source_name"])) \
                       .withColumn("year", lit(CONFIG["year"])) \
                       .withColumn("month", lit(CONFIG["month"]))
        df_raw.write.mode("append") \
            .partitionBy("source", "year", "month") \
            .save(xml_raw_path)
        logger.info(f"Raw XML copied to {xml_raw_path}")
    except Exception as e:
        logger.error(f"Failed to copy raw XML: {str(e)}")
        raise

def parse_and_flatten_xml(landing_path: str, xsd_path: str, schema: StructType) -> tuple:
    """Parse XML, apply XSD validation, explode/flatten, and return structured and corrupt DataFrames."""
    try:
        df_xml = spark.read.format("com.databricks.spark.xml") \
            .option("rowTag", CONFIG["row_tag"]) \
            .option("mode", "PERMISSIVE") \
            .option("rowValidationXSDPath", xsd_path) \
            .schema(schema) \
            .load(landing_path)

        df_structured = df_xml.select(
            col("id"),
            col("details.name").alias("name"),
            col("details.department").alias("department"),
            explode(col("details.roles.role")).alias("role"),
            col("_corrupt_record")
        ).withColumn("source", lit(CONFIG["source_name"])) \
         .withColumn("ingestion_timestamp", current_timestamp()) \
         .withColumn("file_path", input_file_name())

        corrupt_df = df_structured.filter(df_structured["_corrupt_record"].isNotNull())
        valid_structured_df = df_structured.filter(df_structured["_corrupt_record"].isNull()) \
            .select("id", "name", "department", "role", "source", "ingestion_timestamp", "file_path")

        logger.info("XML parsed and flattened successfully.")
        return valid_structured_df, corrupt_df
    except Exception as e:
        logger.error(f"Failed to parse and flatten XML: {str(e)}")
        raise

def save_structured_data(df: DataFrame, output_path: str) -> None:
    """Save structured DataFrame as Parquet in Synapse."""
    try:
        df.write.mode("append") \
            .partitionBy("source", "year", "month") \
            .parquet(output_path)
        logger.info(f"Structured data saved to {output_path}")
    except Exception as e:
        logger.error(f"Failed to save structured data: {str(e)}")
        raise

def save_corrupt_records(df: DataFrame, corrupt_path: str) -> None:
    """Save corrupt records as Parquet in Synapse."""
    try:
        df.write.mode("overwrite") \
            .parquet(corrupt_path)
        logger.info(f"Corrupt records saved to {corrupt_path}")
    except Exception as e:
        logger.error(f"Failed to save corrupt records: {str(e)}")
        raise

def main():
    """Main function to process Bronze layer in Synapse notebook."""
    try:
        # Format paths
        landing_path = CONFIG["landing_path"].format(**CONFIG)
        xml_raw_path = CONFIG["xml_raw_path"].format(**CONFIG)
        xml_structured_path = CONFIG["xml_structured_path"].format(**CONFIG)
        corrupt_records_path = CONFIG["corrupt_records_path"].format(**CONFIG)
        xsd_path = CONFIG["xsd_path"].format(**CONFIG)

        # Configure Spark settings
        set_spark_config()

        # Copy raw XML
        copy_raw_xml(landing_path, xml_raw_path)

        # Parse and flatten XML
        valid_structured_df, corrupt_df = parse_and_flatten_xml(landing_path, xsd_path, SCHEMA)

        # Save structured data and corrupt records
        save_structured_data(valid_structured_df, xml_structured_path)
        save_corrupt_records(corrupt_df, corrupt_records_path)

        # Display sample for verification in Synapse notebook
        valid_structured_df.show(5, truncate=False)
        corrupt_df.show(5, truncate=False)

        logger.info("Bronze layer processing completed successfully.")
    except Exception as e:
        logger.error(f"Bronze layer processing failed: {str(e)}")
        raise

# Execute in Synapse notebook
main()

StatementMeta(demospark, 6, 9, Finished, Available, Finished)

Failed to copy raw XML: 
Bronze layer processing failed: 


AssertionError: 

In [3]:
from pyspark.sql.functions import lit, current_timestamp, input_file_name, explode, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.dataframe import DataFrame
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025",
    "month": "09",
    "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_raw_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_raw/source={source_name}/",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "xsd_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xsd",
    "spark_partitions": 16,
    "row_tag": "record"
}

SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True)
    # NOTE: do NOT include _corrupt_record in the schema; let spark-xml add it
])

def set_spark_config(partitions: int = CONFIG["spark_partitions"]) -> None:
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")  # harmless if you keep Parquet
    logger.info("Spark configurations set.")

def copy_raw_xml_file(landing_path: str, xml_raw_path: str) -> None:
    # Byte-for-byte copy; keeps filename. Works in Synapse.
    from notebookutils import mssparkutils
    target_dir = xml_raw_path.format(**CONFIG)
    mssparkutils.fs.mkdirs(target_dir)
    # Add date partitions to the path to match your strategy
    dated = f"{target_dir}year={CONFIG['year']}/month={CONFIG['month']}/day={CONFIG['day']}/"
    mssparkutils.fs.mkdirs(dated)
    file_name = landing_path.split("/")[-1]
    dest = dated + file_name
    ok = mssparkutils.fs.cp(landing_path, dest, recurse=False)
    if not ok:
        raise RuntimeError(f"Failed to copy {landing_path} to {dest}")
    logger.info(f"Raw XML copied to {dest}")

def parse_and_flatten_xml(landing_path: str, xsd_path: str, schema: StructType):
    df_xml = (spark.read.format("com.databricks.spark.xml")
              .option("rowTag", CONFIG["row_tag"])
              .option("mode", "PERMISSIVE")
              .option("columnNameOfCorruptRecord", "_corrupt_record")
              .option("rowValidationXSDPath", xsd_path)
              .schema(schema)
              .load(landing_path))

    df_structured = (df_xml.select(
            col("id"),
            col("details.name").alias("name"),
            col("details.department").alias("department"),
            explode(col("details.roles.role")).alias("role"),
            col("_corrupt_record")
        )
        .withColumn("source", lit(CONFIG["source_name"]))
        .withColumn("year", lit(CONFIG["year"]))
        .withColumn("month", lit(CONFIG["month"]))
        .withColumn("day", lit(CONFIG["day"]))
        .withColumn("ingestion_timestamp", current_timestamp())
        .withColumn("file_path", input_file_name())
    )

    corrupt_df = df_structured.filter(col("_corrupt_record").isNotNull())
    valid_structured_df = (df_structured
        .filter(col("_corrupt_record").isNull())
        .drop("_corrupt_record"))

    logger.info("XML parsed and flattened.")
    return valid_structured_df, corrupt_df

def save_structured_data(df: DataFrame, output_path: str) -> None:
    (df.write.mode("append")
       .partitionBy("source", "year", "month", "day")
       .parquet(output_path))
    logger.info(f"Structured data saved to {output_path}")

def save_corrupt_records(df: DataFrame, corrupt_path: str) -> None:
    if df.rdd.isEmpty():
        logger.info("No corrupt records to save.")
        return
    # Keep history; don’t overwrite the entire folder.
    (df.write.mode("append")
       .partitionBy("source", "year", "month", "day")
       .parquet(corrupt_path))
    logger.info(f"Corrupt records saved to {corrupt_path}")

def main():
    landing_path = CONFIG["landing_path"].format(**CONFIG)
    xml_raw_path = CONFIG["xml_raw_path"].format(**CONFIG)
    xml_structured_path = CONFIG["xml_structured_path"].format(**CONFIG)
    corrupt_records_path = CONFIG["corrupt_records_path"].format(**CONFIG)
    xsd_path = CONFIG["xsd_path"].format(**CONFIG)

    set_spark_config()
    copy_raw_xml_file(landing_path, xml_raw_path)
    valid_structured_df, corrupt_df = parse_and_flatten_xml(landing_path, xsd_path, SCHEMA)
    save_structured_data(valid_structured_df, xml_structured_path)
    save_corrupt_records(corrupt_df, corrupt_records_path)

    # Quick peek
    logger.info("Sample structured rows:")
    valid_structured_df.show(5, truncate=False)
    logger.info("Sample corrupt rows (if any):")
    corrupt_df.show(5, truncate=False)

    logger.info("Bronze layer processing completed.")

main()


StatementMeta(demospark, 7, 4, Finished, Available, Finished)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `_corrupt_record` cannot be resolved. Did you mean one of the following? [`role`, `details`, `id`].;
'Project [id#30, details#31.name AS name#34, details#31.department AS department#35, role#40, '_corrupt_record]
+- Generate explode(details#31.roles.role), false, [role#40]
   +- Relation [id#30,details#31] XmlRelation(com.databricks.spark.xml.DefaultSource$$Lambda$3742/0x000071b87719eea0@f523ee7,Some(abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml),Map(path -> abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml, rowvalidationxsdpath -> abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd, mode -> PERMISSIVE, rowtag -> record, columnnameofcorruptrecord -> _corrupt_record),StructType(StructField(id,IntegerType,true),StructField(details,StructType(StructField(name,StringType,true),StructField(department,StringType,true),StructField(roles,StructType(StructField(role,ArrayType(StringType,true),true)),true)),true)))


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, input_file_name, explode, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

spark = SparkSession.builder \
    .appName("XML to Bronze History Zone") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "16") \
    .getOrCreate()

# Define explicit schema for XML parsing
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True),
    StructField("_corrupt_record", StringType(), True)
])

# Copy raw XML to xml_raw
df_raw = spark.read.text("abfss://container@storageaccount.dfs.core.windows.net/bronze/landing/xml_data/source=source_name/year=2025/month=09/day=07/sample.xml")
df_raw.write.mode("append") \
    .partitionBy("source", "year", "month") \
    .save("abfss://container@storageaccount.dfs.core.windows.net/bronze/history/xml_raw/source=source_name/")

# Parse, explode, flatten, and store as Parquet in xml_structured
df_xml = spark.read.format("xml") \
    .option("rowTag", "record") \
    .option("mode", "PERMISSIVE") \
    .option("rowValidationXSDPath", "abfss://container@storageaccount.dfs.core.windows.net/bronze/schemas/source_name.xsd") \
    .schema(schema) \
    .load("abfss://container@storageaccount.dfs.core.windows.net/bronze/landing/xml_data/source=source_name/year=2025/month=09/day=07/sample.xml")

df_structured = df_xml.select(
    col("id"),
    col("details.name").alias("name"),
    col("details.department").alias("department"),
    explode(col("details.roles.role")).alias("role"),
    col("_corrupt_record")
).withColumn("source", lit("source_name")) \
 .withColumn("ingestion_timestamp", current_timestamp()) \
 .withColumn("file_path", input_file_name())

# Handle malformed records
corrupt_df = df_structured.filter(df_structured["_corrupt_record"].isNotNull())
corrupt_df.write.mode("overwrite") \
    .parquet("abfss://container@storageaccount.dfs.core.windows.net/bronze/corrupt_records/source=source_name/")

# Store valid structured data
valid_structured_df = df_structured.filter(df_structured["_corrupt_record"].isNull()) \
    .select("id", "name", "department", "role", "source", "ingestion_timestamp", "file_path")
valid_structured_df.write.mode("append") \
    .partitionBy("source", "year", "month") \
    .parquet("abfss://container@storageaccount.dfs.core.windows.net/bronze/history/xml_structured/")

StatementMeta(demospark, 6, 5, Finished, Available, Finished)

Py4JJavaError: An error occurred while calling o4223.text.
: org.apache.hadoop.fs.FileAlreadyExistsException: Operation failed: "This endpoint does not support BlobStorageEvents or SoftDelete. Please disable these account features if you would like to use this endpoint.", 409, HEAD, https://storageaccount.dfs.core.windows.net/container/?upn=false&action=getAccessControl&timeout=90
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1439)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:652)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:640)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1759)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.exists(AzureBlobFileSystem.java:1236)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:759)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:757)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: Operation failed: "This endpoint does not support BlobStorageEvents or SoftDelete. Please disable these account features if you would like to use this endpoint.", 409, HEAD, https://storageaccount.dfs.core.windows.net/container/?upn=false&action=getAccessControl&timeout=90
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:231)
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189)
	at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:1000)
	at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:981)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:422)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:1037)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:650)
	... 19 more


In [2]:
# --- MVP: Read XML and preview ---------------------------------------------
# Requires spark-xml on the pool (e.g. com.databricks:spark-xml_2.12:0.18.0)

from pyspark.sql.functions import col

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "row_tag": "record",   # <-- change to your top-level repeating element
    # Optional: if you have XSD, set this path; otherwise leave as None
    "xsd_path": None  # e.g. "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xsd"
}

landing_path = CONFIG["landing_path"].format(**CONFIG)

reader = (
    spark.read.format("com.databricks.spark.xml")
         .option("rowTag", CONFIG["row_tag"])
         .option("mode", "PERMISSIVE")                       # don't fail the whole file
         .option("columnNameOfCorruptRecord", "_corrupt_record")  # capture bad rows
)

# If you *do* have an XSD, uncomment the next line
# if CONFIG["xsd_path"]:
#     reader = reader.option("rowValidationXSDPath", CONFIG["xsd_path"].format(**CONFIG))

df = reader.load(landing_path)

print("=== Schema ===")
df.printSchema()

print("\n=== Sample rows ===")
df.show(10, truncate=False)

print("\n=== Corrupt rows (if any) ===")
df.filter(col("_corrupt_record").isNotNull()).show(5, truncate=False)


StatementMeta(demospark, 7, 3, Finished, Available, Finished)

=== Schema ===
root
 |-- details: struct (nullable = true)
 |    |-- department: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- roles: struct (nullable = true)
 |    |    |-- role: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)


=== Sample rows ===
+----------------------------------------+---+
|details                                 |id |
+----------------------------------------+---+
|{Sales, John Doe, {[Manager, Analyst]}} |1  |
|{Marketing, Jane Smith, {[Coordinator]}}|2  |
|{IT, Bob, {[Engineer]}}                 |abc|
+----------------------------------------+---+


=== Corrupt rows (if any) ===


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `_corrupt_record` cannot be resolved. Did you mean one of the following? [`details`, `id`].;
'Filter isnotnull('_corrupt_record)
+- Relation [details#15,id#16] XmlRelation(com.databricks.spark.xml.DefaultSource$$Lambda$3742/0x000071b87719eea0@5a8155d4,Some(abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml),Map(rowtag -> record, mode -> PERMISSIVE, columnnameofcorruptrecord -> _corrupt_record, path -> abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml),null)


In [4]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType

reader = (
    spark.read.format("com.databricks.spark.xml")
         .option("rowTag", "record")
         .option("mode", "PERMISSIVE")
         .option("columnNameOfCorruptRecord", "_corrupt_record")
)

df = reader.load("abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml")

# Ensure the column exists so downstream code is stable
if "_corrupt_record" not in df.columns:
    df = df.withColumn("_corrupt_record", lit(None).cast(StringType()))

print("=== Schema ===")
df.printSchema()

print("\n=== Sample rows ===")
df.show(10, truncate=False)

print("\n=== Corrupt rows (if any) ===")
df.filter(col("_corrupt_record").isNotNull()).show(5, truncate=False)


StatementMeta(demospark, 7, 5, Finished, Available, Finished)

=== Schema ===
root
 |-- details: struct (nullable = true)
 |    |-- department: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- roles: struct (nullable = true)
 |    |    |-- role: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)


=== Sample rows ===
+----------------------------------------+---+---------------+
|details                                 |id |_corrupt_record|
+----------------------------------------+---+---------------+
|{Sales, John Doe, {[Manager, Analyst]}} |1  |NULL           |
|{Marketing, Jane Smith, {[Coordinator]}}|2  |NULL           |
|{IT, Bob, {[Engineer]}}                 |abc|NULL           |
+----------------------------------------+---+---------------+


=== Corrupt rows (if any) ===
+-------+---+---------------+
|details|id |_corrupt_record|
+-------+---+---------------+
+-------+---+---------------+



In [2]:
# === XML → Bronze (history) — complete script =================================
# Requires spark-xml on pool: com.databricks:spark-xml_2.12:0.18.0 (or compatible)

from pyspark.sql.functions import (
    lit, current_timestamp, input_file_name, explode, col, when, array
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.dataframe import DataFrame
import logging

# ---------------- Config ----------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("xml-bronze")

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025",
    "month": "09",
    "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_raw_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_raw/source={source_name}/",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    # Set to None to skip validation
    "xsd_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xsd",
    "spark_partitions": 16,
    "row_tag": "record"
}

SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)  # may be scalar or array at runtime
        ]), True)
    ]), True)
    # Do NOT include _corrupt_record; spark-xml adds it when needed.
])

# ---------------- Helpers ----------------
def set_spark_config(partitions: int = CONFIG["spark_partitions"]) -> None:
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
    logger.info("Spark configs set.")

def _import_mssparkutils():
    try:
        from notebookutils import mssparkutils  # Synapse
        return mssparkutils
    except Exception:
        import mssparkutils  # fallback
        return mssparkutils

def copy_raw_xml_file(landing_path: str, xml_raw_path: str) -> None:
    mssparkutils = _import_mssparkutils()
    target_dir = xml_raw_path.format(**CONFIG)
    dated = f"{target_dir}year={CONFIG['year']}/month={CONFIG['month']}/day={CONFIG['day']}/"
    mssparkutils.fs.mkdirs(dated)
    file_name = landing_path.rsplit("/", 1)[-1]
    dest = dated + file_name
    if not mssparkutils.fs.cp(landing_path, dest, recurse=False):
        raise RuntimeError(f"Failed to copy {landing_path} → {dest}")
    logger.info(f"Raw XML copied to {dest}")

def parse_and_flatten_xml(landing_path: str, xsd_path: str | None, schema: StructType):
    # Build reader
    reader = (spark.read.format("com.databricks.spark.xml")
              .option("rowTag", CONFIG["row_tag"])
              .option("mode", "PERMISSIVE")
              .option("columnNameOfCorruptRecord", "_corrupt_record")
              .schema(schema))
    if xsd_path:
        reader = reader.option("rowValidationXSDPath", xsd_path)

    df_xml = reader.load(landing_path)

    # Ensure _corrupt_record exists even if none are corrupt
    if "_corrupt_record" not in df_xml.columns:
        df_xml = df_xml.withColumn("_corrupt_record", lit(None).cast(StringType()))

    # Normalize roles to array<string> without dtype introspection
    roles_col = col("details.roles.role")
    roles_as_array = roles_col.cast(ArrayType(StringType()))
    roles_norm = (when(roles_as_array.isNotNull(), roles_as_array)              # already array
                  .when(roles_col.isNull(), array().cast(ArrayType(StringType())))  # null → empty array
                  .otherwise(array(roles_col.cast(StringType()))))              # scalar → wrap

    df_structured = (
        df_xml.select(
            col("id"),
            col("details.name").alias("name"),
            col("details.department").alias("department"),
            explode(roles_norm).alias("role"),
            col("_corrupt_record")
        )
        .withColumn("source", lit(CONFIG["source_name"]))
        .withColumn("year", lit(CONFIG["year"]))
        .withColumn("month", lit(CONFIG["month"]))
        .withColumn("day", lit(CONFIG["day"]))
        .withColumn("ingestion_timestamp", current_timestamp())
        .withColumn("file_path", input_file_name())
    )

    corrupt_df = df_structured.filter(col("_corrupt_record").isNotNull())
    valid_structured_df = df_structured.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")

    logger.info("XML parsed + flattened.")
    return valid_structured_df, corrupt_df

def save_structured_data(df: DataFrame, output_path: str) -> None:
    (df.write.mode("append")
       .partitionBy("source", "year", "month", "day")
       .parquet(output_path))
    logger.info(f"Structured → {output_path}")

def save_corrupt_records(df: DataFrame, corrupt_path: str) -> None:
    if df.rdd.isEmpty():
        logger.info("No corrupt records.")
        return
    (df.write.mode("append")
       .partitionBy("source", "year", "month", "day")
       .parquet(corrupt_path))
    logger.info(f"Corrupt → {corrupt_path}")

# ---------------- Main ----------------
def main():
    set_spark_config()

    landing_path = CONFIG["landing_path"].format(**CONFIG)
    xml_raw_path = CONFIG["xml_raw_path"].format(**CONFIG)
    xml_structured_path = CONFIG["xml_structured_path"].format(**CONFIG)
    corrupt_records_path = CONFIG["corrupt_records_path"].format(**CONFIG)
    xsd_path = CONFIG["xsd_path"].format(**CONFIG) if CONFIG.get("xsd_path") else None

    copy_raw_xml_file(landing_path, xml_raw_path)
    valid_structured_df, corrupt_df = parse_and_flatten_xml(landing_path, xsd_path, SCHEMA)
    save_structured_data(valid_structured_df, xml_structured_path)
    save_corrupt_records(corrupt_df, corrupt_records_path)

    logger.info("Preview structured:")
    valid_structured_df.show(5, truncate=False)
    logger.info("Preview corrupt (if any):")
    corrupt_df.show(5, truncate=False)

    logger.info("Bronze processing complete.")

main()


StatementMeta(demospark, 9, 2, Finished, Available, Finished)

Py4JJavaError: An error occurred while calling o4313.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (vm-2b885156 executor 2): java.util.concurrent.ExecutionException: org.xml.sax.SAXParseException; schema_reference.4: Failed to read schema document 'file:/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1757237607632_0001/container_1757237607632_0001_01_000003/./abfss:/ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd', because 1) could not find the document; 2) the document could not be read; 3) the root element of the document is not <xsd:schema>.
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
	at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
	at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
	at com.databricks.spark.xml.util.ValidatorUtil$.getSchema(ValidatorUtil.scala:54)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.$anonfun$parse$2(StaxXmlParser.scala:47)
	at scala.Option.map(Option.scala:230)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.$anonfun$parse$1(StaxXmlParser.scala:47)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:865)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:865)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.xml.sax.SAXParseException; schema_reference.4: Failed to read schema document 'file:/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1757237607632_0001/container_1757237607632_0001_01_000003/./abfss:/ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd', because 1) could not find the document; 2) the document could not be read; 3) the root element of the document is not <xsd:schema>.
	at java.xml/com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.createSAXParseException(ErrorHandlerWrapper.java:204)
	at java.xml/com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.error(ErrorHandlerWrapper.java:135)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:396)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:306)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.reportSchemaErr(XSDHandler.java:4258)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.reportSchemaError(XSDHandler.java:4241)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.getSchemaDocument1(XSDHandler.java:2532)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.getSchemaDocument(XSDHandler.java:2239)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.parseSchema(XSDHandler.java:589)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaLoader.loadSchema(XMLSchemaLoader.java:618)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaLoader.loadGrammar(XMLSchemaLoader.java:577)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaLoader.loadGrammar(XMLSchemaLoader.java:543)
	at java.xml/com.sun.org.apache.xerces.internal.jaxp.validation.XMLSchemaFactory.newSchema(XMLSchemaFactory.java:281)
	at java.xml/javax.xml.validation.SchemaFactory.newSchema(SchemaFactory.java:612)
	at java.xml/javax.xml.validation.SchemaFactory.newSchema(SchemaFactory.java:628)
	at com.databricks.spark.xml.util.ValidatorUtil$$anon$1.load(ValidatorUtil.scala:44)
	at com.databricks.spark.xml.util.ValidatorUtil$$anon$1.load(ValidatorUtil.scala:35)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	... 34 more
Caused by: java.io.FileNotFoundException: /mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1757237607632_0001/container_1757237607632_0001_01_000003/./abfss:/ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:216)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:111)
	at java.base/sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:86)
	at java.base/sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:195)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.setupCurrentEntity(XMLEntityManager.java:653)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLVersionDetector.determineDocVersion(XMLVersionDetector.java:150)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.opti.SchemaParsingConfig.parse(SchemaParsingConfig.java:593)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.opti.SchemaParsingConfig.parse(SchemaParsingConfig.java:696)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.opti.SchemaDOMParser.parse(SchemaDOMParser.java:530)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.getSchemaDocument(XSDHandler.java:2227)
	... 45 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3055)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2991)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2990)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2990)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1294)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1294)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1294)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3193)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3182)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2568)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:361)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:322)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:358)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:230)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:191)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:214)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:220)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:961)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:214)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:202)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:186)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:180)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:262)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:905)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:413)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:380)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:242)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:838)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.ExecutionException: org.xml.sax.SAXParseException; schema_reference.4: Failed to read schema document 'file:/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1757237607632_0001/container_1757237607632_0001_01_000003/./abfss:/ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd', because 1) could not find the document; 2) the document could not be read; 3) the root element of the document is not <xsd:schema>.
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
	at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
	at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
	at com.databricks.spark.xml.util.ValidatorUtil$.getSchema(ValidatorUtil.scala:54)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.$anonfun$parse$2(StaxXmlParser.scala:47)
	at scala.Option.map(Option.scala:230)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.$anonfun$parse$1(StaxXmlParser.scala:47)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:865)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:865)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.xml.sax.SAXParseException; schema_reference.4: Failed to read schema document 'file:/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1757237607632_0001/container_1757237607632_0001_01_000003/./abfss:/ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd', because 1) could not find the document; 2) the document could not be read; 3) the root element of the document is not <xsd:schema>.
	at java.xml/com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.createSAXParseException(ErrorHandlerWrapper.java:204)
	at java.xml/com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.error(ErrorHandlerWrapper.java:135)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:396)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:306)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.reportSchemaErr(XSDHandler.java:4258)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.reportSchemaError(XSDHandler.java:4241)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.getSchemaDocument1(XSDHandler.java:2532)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.getSchemaDocument(XSDHandler.java:2239)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.parseSchema(XSDHandler.java:589)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaLoader.loadSchema(XMLSchemaLoader.java:618)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaLoader.loadGrammar(XMLSchemaLoader.java:577)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaLoader.loadGrammar(XMLSchemaLoader.java:543)
	at java.xml/com.sun.org.apache.xerces.internal.jaxp.validation.XMLSchemaFactory.newSchema(XMLSchemaFactory.java:281)
	at java.xml/javax.xml.validation.SchemaFactory.newSchema(SchemaFactory.java:612)
	at java.xml/javax.xml.validation.SchemaFactory.newSchema(SchemaFactory.java:628)
	at com.databricks.spark.xml.util.ValidatorUtil$$anon$1.load(ValidatorUtil.scala:44)
	at com.databricks.spark.xml.util.ValidatorUtil$$anon$1.load(ValidatorUtil.scala:35)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	... 34 more
Caused by: java.io.FileNotFoundException: /mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1757237607632_0001/container_1757237607632_0001_01_000003/./abfss:/ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xsd (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:216)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:111)
	at java.base/sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:86)
	at java.base/sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:195)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.setupCurrentEntity(XMLEntityManager.java:653)
	at java.xml/com.sun.org.apache.xerces.internal.impl.XMLVersionDetector.determineDocVersion(XMLVersionDetector.java:150)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.opti.SchemaParsingConfig.parse(SchemaParsingConfig.java:593)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.opti.SchemaParsingConfig.parse(SchemaParsingConfig.java:696)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.opti.SchemaDOMParser.parse(SchemaDOMParser.java:530)
	at java.xml/com.sun.org.apache.xerces.internal.impl.xs.traversers.XSDHandler.getSchemaDocument(XSDHandler.java:2227)
	... 45 more


In [3]:
# --- Bronze MVP: read XML -> flatten -> show -> write (Parquet) --------------
# Requires spark-xml on the pool: com.databricks:spark-xml_2.12:0.18.0 (or compatible)

from pyspark.sql.functions import lit, current_timestamp, input_file_name, explode, col, when, array
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025",
    "month": "09",
    "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "row_tag": "record"   # change to your repeating element if different
}

# Minimal schema (no _corrupt_record here)
SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)  # may arrive as scalar or array
        ]), True)
    ]), True)
])

# Optional: a couple of sane Spark settings for small jobs
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "16")

# 1) Read XML (PERMISSIVE, no XSD)
landing_path = CONFIG["landing_path"].format(**CONFIG)
df_xml = (spark.read.format("com.databricks.spark.xml")
          .option("rowTag", CONFIG["row_tag"])
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .schema(SCHEMA)
          .load(landing_path))

# Ensure _corrupt_record exists even if no corrupt rows were produced
if "_corrupt_record" not in df_xml.columns:
    from pyspark.sql.types import StringType
    df_xml = df_xml.withColumn("_corrupt_record", lit(None).cast(StringType()))

# Normalize roles.role → always array<string>
roles_col = col("details.roles.role")
roles_as_array = roles_col.cast(ArrayType(StringType()))
roles_norm = (when(roles_as_array.isNotNull(), roles_as_array)
              .when(roles_col.isNull(), array().cast(ArrayType(StringType())))
              .otherwise(array(roles_col.cast(StringType()))))

# 2) Flatten
df_structured = (
    df_xml.select(
        col("id"),
        col("details.name").alias("name"),
        col("details.department").alias("department"),
        explode(roles_norm).alias("role"),
        col("_corrupt_record")
    )
    .withColumn("source", lit(CONFIG["source_name"]))
    .withColumn("year", lit(CONFIG["year"]))
    .withColumn("month", lit(CONFIG["month"]))
    .withColumn("day", lit(CONFIG["day"]))
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("file_path", input_file_name())
)

# Split (optional for MVP): keep both to inspect quality quickly
df_valid   = df_structured.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
df_corrupt = df_structured.filter(col("_corrupt_record").isNotNull())

# 3) Preview
print("=== Structured sample ===")
df_valid.show(10, truncate=False)
print("=== Corrupt sample (if any) ===")
df_corrupt.show(5, truncate=False)

# 4) Write structured to Parquet (partitioned)
out_path = CONFIG["xml_structured_path"].format(**CONFIG)
(df_valid.write.mode("append")
   .partitionBy("source", "year", "month", "day")
   .parquet(out_path))

print(f"Wrote structured rows to: {out_path}")


StatementMeta(demospark, 9, 3, Finished, Available, Finished)

=== Structured sample ===
+----+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|id  |name      |department|role       |source     |year|month|day|ingestion_timestamp       |file_path                                                            |
+----+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|1   |John Doe  |Sales     |Manager    |source_name|2025|09   |07 |2025-09-07 09:43:08.742535|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|1   |John Doe  |Sales     |Analyst    |source_name|2025|09   |07 |2025-09-07 09:43:08.742535|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|2   |Jane Smith|Marketing |Coordinator|source_name|2025|09   |07 |2025-09-07 09:43:08.742535|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMID

In [5]:
# --- Compare spark-xml modes: PERMISSIVE vs DROPMALFORMED vs FAILFAST (fixed) --
# Requires spark-xml (e.g., com.databricks:spark-xml_2.12:0.18.0)

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col, explode, when, array, lit
import uuid

# 0) Paths & helpers
CONTAINER = "ds-india"
ACCOUNT   = "demodsindia"
BASE      = f"abfss://{CONTAINER}@{ACCOUNT}.dfs.core.windows.net/SEMIDATA/_tmp_mode_demo/"
SAMPLE_XML_PATH     = BASE + f"valid_{uuid.uuid4().hex}.xml"
MALFORMED_XML_PATH  = BASE + f"malformed_{uuid.uuid4().hex}.xml"   # optional demo

def put_text(path: str, text: str):
    try:
        from notebookutils import mssparkutils
    except Exception:
        import mssparkutils
    mssparkutils.fs.mkdirs(BASE)
    mssparkutils.fs.put(path, text, True)

# 1) Valid-structure XML (has id="abc")
sample_xml = """<?xml version="1.0" encoding="UTF-8"?>
<records>
    <record>
        <id>1</id>
        <details>
            <name>John Doe</name>
            <department>Sales</department>
            <roles>
                <role>Manager</role>
                <role>Analyst</role>
            </roles>
        </details>
    </record>
    <record>
        <id>2</id>
        <details>
            <name>Jane Smith</name>
            <department>Marketing</department>
            <roles>
                <role>Coordinator</role>
            </roles>
        </details>
    </record>
    <record>
        <id>abc</id>
        <details>
            <name>Bob</name>
            <department>IT</department>
            <roles>
                <role>Engineer</role>
            </roles>
        </details>
    </record>
</records>
"""
put_text(SAMPLE_XML_PATH, sample_xml)

# 2) Malformed XML (optional: broken closing tag to demo structural errors)
malformed_xml = """<?xml version="1.0" encoding="UTF-8"?>
<records>
  <record>
    <id>10</id>
    <details><name>Alice</name></details>
  </record>
  <record>
    <id>11  <!-- MISSING closing tag on purpose -->
    <details><name>Broken</name></details>
  </record>
</records>
"""
put_text(MALFORMED_XML_PATH, malformed_xml)

# 3) Schema: declare id as IntegerType to test type mismatch
SCHEMA = StructType([
    StructField("id", IntegerType(), True),  # "abc" -> becomes NULL (not corrupt)
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True)
])

def read_with_mode(path: str, mode: str):
    df = (spark.read.format("com.databricks.spark.xml")
            .option("rowTag", "record")
            .option("mode", mode)  # PERMISSIVE | DROPMALFORMED | FAILFAST
            .option("columnNameOfCorruptRecord", "_corrupt_record")
            .schema(SCHEMA)
            .load(path))
    # <-- Guard: add _corrupt_record if the source didn't create it
    if "_corrupt_record" not in df.columns:
        df = df.withColumn("_corrupt_record", lit(None).cast(StringType()))
    return df

def flatten(df):
    roles_col = col("details.roles.role")
    roles_as_array = roles_col.cast(ArrayType(StringType()))
    roles_norm = (when(roles_as_array.isNotNull(), roles_as_array)
                  .when(roles_col.isNull(), array().cast(ArrayType(StringType())))
                  .otherwise(array(roles_col.cast(StringType()))))
    return (df.select(
                col("id"),
                col("details.name").alias("name"),
                explode(roles_norm).alias("role"),
                col("_corrupt_record"))
            )

# 4) Read valid-structure XML in each mode
for mode in ["PERMISSIVE", "DROPMALFORMED", "FAILFAST"]:
    print(f"\n===== MODE: {mode} (valid structure, id='abc') =====")
    df = read_with_mode(SAMPLE_XML_PATH, mode)
    df_flat = flatten(df)
    df_flat.select("id","name","role","_corrupt_record").show(truncate=False)

    total = df.count()
    null_id = df.where(col("id").isNull()).count()
    corrupt_rows = df.where(col("_corrupt_record").isNotNull()).count()
    print(f"Rows total: {total} | id NULL: {null_id} | _corrupt_record rows: {corrupt_rows}")

# 5) Structural errors demo with MALFORMED XML
print("\n===== Structural errors demo with MALFORMED XML =====")
for mode in ["PERMISSIVE", "DROPMALFORMED", "FAILFAST"]:
    print(f"\n--- MODE: {mode} (malformed structure) ---")
    try:
        df_bad = read_with_mode(MALFORMED_XML_PATH, mode)
        df_bad.select("id","_corrupt_record").show(truncate=False)
        print("Read succeeded.")
    except Exception as e:
        print(f"Read failed with {mode}: {type(e).__name__}: {str(e)[:300]} ...")


StatementMeta(demospark, 10, 3, Finished, Available, Finished)


===== MODE: PERMISSIVE (valid structure, id='abc') =====
+----+----------+-----------+---------------+
|id  |name      |role       |_corrupt_record|
+----+----------+-----------+---------------+
|1   |John Doe  |Manager    |NULL           |
|1   |John Doe  |Analyst    |NULL           |
|2   |Jane Smith|Coordinator|NULL           |
|NULL|Bob       |Engineer   |NULL           |
+----+----------+-----------+---------------+

Rows total: 3 | id NULL: 1 | _corrupt_record rows: 0

===== MODE: DROPMALFORMED (valid structure, id='abc') =====
+---+----------+-----------+---------------+
|id |name      |role       |_corrupt_record|
+---+----------+-----------+---------------+
|1  |John Doe  |Manager    |NULL           |
|1  |John Doe  |Analyst    |NULL           |
|2  |Jane Smith|Coordinator|NULL           |
+---+----------+-----------+---------------+

Rows total: 3 | id NULL: 0 | _corrupt_record rows: 0

===== MODE: FAILFAST (valid structure, id='abc') =====


Py4JJavaError: An error occurred while calling o4505.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 15) (vm-94408436 executor 1): java.lang.IllegalArgumentException: Malformed line in FAILFAST mode
	at com.databricks.spark.xml.parsers.StaxXmlParser$.failedRecord(StaxXmlParser.scala:106)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.doParseColumn(StaxXmlParser.scala:88)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.$anonfun$parse$3(StaxXmlParser.scala:49)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:431)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:900)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:900)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NumberFormatException: For input string: "abc"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67)
	at java.base/java.lang.Integer.parseInt(Integer.java:668)
	at java.base/java.lang.Integer.parseInt(Integer.java:786)
	at scala.collection.immutable.StringLike.toInt(StringLike.scala:310)
	at scala.collection.immutable.StringLike.toInt$(StringLike.scala:310)
	at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
	at com.databricks.spark.xml.util.TypeCast$.castTo(TypeCast.scala:55)
	at com.databricks.spark.xml.util.TypeCast$.signSafeToInt(TypeCast.scala:303)
	at com.databricks.spark.xml.util.TypeCast$.convertTo(TypeCast.scala:192)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.convertField(StaxXmlParser.scala:192)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.convertObject(StaxXmlParser.scala:334)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.doParseColumn(StaxXmlParser.scala:82)
	... 24 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3055)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2991)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2990)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2990)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1294)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1294)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1294)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3193)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3182)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2568)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2589)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2608)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:573)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:526)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4358)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3327)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4348)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:815)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4346)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:220)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:961)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4346)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3327)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3550)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:293)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:328)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Malformed line in FAILFAST mode
	at com.databricks.spark.xml.parsers.StaxXmlParser$.failedRecord(StaxXmlParser.scala:106)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.doParseColumn(StaxXmlParser.scala:88)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.$anonfun$parse$3(StaxXmlParser.scala:49)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:431)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:900)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:900)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.lang.NumberFormatException: For input string: "abc"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67)
	at java.base/java.lang.Integer.parseInt(Integer.java:668)
	at java.base/java.lang.Integer.parseInt(Integer.java:786)
	at scala.collection.immutable.StringLike.toInt(StringLike.scala:310)
	at scala.collection.immutable.StringLike.toInt$(StringLike.scala:310)
	at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
	at com.databricks.spark.xml.util.TypeCast$.castTo(TypeCast.scala:55)
	at com.databricks.spark.xml.util.TypeCast$.signSafeToInt(TypeCast.scala:303)
	at com.databricks.spark.xml.util.TypeCast$.convertTo(TypeCast.scala:192)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.convertField(StaxXmlParser.scala:192)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.convertObject(StaxXmlParser.scala:334)
	at com.databricks.spark.xml.parsers.StaxXmlParser$.doParseColumn(StaxXmlParser.scala:82)
	... 24 more


In [6]:
# === XML Bronze MVP with switchable behavior ==================================
# Requires: com.databricks:spark-xml_2.12:0.18.0 (or compatible)

from pyspark.sql.functions import (
    col, explode, when, array, lit, current_timestamp, input_file_name
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# ---- Toggle here -------------------------------------------------------------
MODE = "PERMISSIVE_VALIDATE"  # PERMISSIVE_VALIDATE | DROPMALFORMED | FAILFAST
ROW_TAG = "record"

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025",
    "month": "09",
    "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
}

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "16")

landing_path = CONFIG["landing_path"].format(**CONFIG)
out_struct   = CONFIG["xml_structured_path"].format(**CONFIG)
out_corrupt  = CONFIG["corrupt_records_path"].format(**CONFIG)

def normalize_roles(df):
    roles = col("details.roles.role")
    roles_arr = roles.cast(ArrayType(StringType()))
    roles_norm = when(roles_arr.isNotNull(), roles_arr) \
                 .when(roles.isNull(), array().cast(ArrayType(StringType()))) \
                 .otherwise(array(roles.cast(StringType())))
    return roles_norm

def with_common_cols(df):
    return (df
        .withColumn("source", lit(CONFIG["source_name"]))
        .withColumn("year",   lit(CONFIG["year"]))
        .withColumn("month",  lit(CONFIG["month"]))
        .withColumn("day",    lit(CONFIG["day"]))
        .withColumn("ingestion_timestamp", current_timestamp())
        .withColumn("file_path", input_file_name())
    )

if MODE == "PERMISSIVE_VALIDATE":
    # Read id as STRING; keep everything
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("details", StructType([
            StructField("name", StringType(), True),
            StructField("department", StringType(), True),
            StructField("roles", StructType([
                StructField("role", ArrayType(StringType()), True)
            ]), True)
        ]), True)
    ])

    df = (spark.read.format("com.databricks.spark.xml")
            .option("rowTag", ROW_TAG)
            .option("mode", "PERMISSIVE")
            .option("columnNameOfCorruptRecord", "_corrupt_record")
            .schema(schema)
            .load(landing_path))

    # ensure _corrupt_record exists for consistent downstream logic
    if "_corrupt_record" not in df.columns:
        df = df.withColumn("_corrupt_record", lit(None).cast(StringType()))

    roles_norm = normalize_roles(df)

    base = (df.select(
                col("id").alias("id_raw"),
                col("details.name").alias("name"),
                col("details.department").alias("department"),
                explode(roles_norm).alias("role"),
                col("_corrupt_record"))
            )

    base = with_common_cols(base)

    # manual validation: id must be integer
    id_int = col("id_raw").cast(IntegerType())
    is_bad_type = id_int.isNull() & col("id_raw").isNotNull()
    # rows structurally corrupt (rare here) OR type-invalid go to corrupt bucket
    corrupt_df = (base.where(is_bad_type | col("_corrupt_record").isNotNull())
                        .withColumn("_corrupt_reason",
                                    when(col("_corrupt_record").isNotNull(), lit("STRUCTURAL_CORRUPT"))
                                    .otherwise(lit("INVALID_INT:id"))))
    valid_df = (base.where(~is_bad_type & col("_corrupt_record").isNull())
                    .withColumn("id", id_int)
                    .drop("id_raw", "_corrupt_record", "_corrupt_reason"))

    print("=== PERMISSIVE_VALIDATE ===")
    print("Valid sample:")
    valid_df.show(10, truncate=False)
    print("Corrupt sample:")
    corrupt_df.select("id_raw","name","role","_corrupt_reason").show(10, truncate=False)

    # write
    (valid_df.write.mode("append")
        .partitionBy("source","year","month","day")
        .parquet(out_struct))
    if not corrupt_df.rdd.isEmpty():
        (corrupt_df.write.mode("append")
            .partitionBy("source","year","month","day")
            .parquet(out_corrupt))

elif MODE in ("DROPMALFORMED", "FAILFAST"):
    # Read with typed schema; bad rows are dropped (DROPMALFORMED) or job fails (FAILFAST)
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("details", StructType([
            StructField("name", StringType(), True),
            StructField("department", StringType(), True),
            StructField("roles", StructType([
                StructField("role", ArrayType(StringType()), True)
            ]), True)
        ]), True)
    ])

    try:
        df = (spark.read.format("com.databricks.spark.xml")
                .option("rowTag", ROW_TAG)
                .option("mode", MODE)  # DROPMALFORMED | FAILFAST
                .option("columnNameOfCorruptRecord", "_corrupt_record")
                .schema(schema)
                .load(landing_path))
        # guard (in case PERMISSIVE sneaks in)
        if "_corrupt_record" not in df.columns:
            df = df.withColumn("_corrupt_record", lit(None).cast(StringType()))

        roles_norm = normalize_roles(df)
        flat = (df.select(
                    col("id"),
                    col("details.name").alias("name"),
                    explode(roles_norm).alias("role"),
                    col("_corrupt_record"))
                )
        flat = with_common_cols(flat).drop("_corrupt_record")

        print(f"=== {MODE} ===")
        flat.show(10, truncate=False)

        (flat.write.mode("append")
            .partitionBy("source","year","month","day")
            .parquet(out_struct))

    except Exception as e:
        # FAILFAST will land here for first malformed/type-failure
        print(f"{MODE} aborted: {type(e).__name__}: {str(e)[:300]}")

else:
    raise ValueError("MODE must be one of: PERMISSIVE_VALIDATE | DROPMALFORMED | FAILFAST")


StatementMeta(demospark, 10, 4, Finished, Available, Finished)

=== PERMISSIVE_VALIDATE ===
Valid sample:
+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+---+
|name      |department|role       |source     |year|month|day|ingestion_timestamp       |file_path                                                            |id |
+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+---+
|John Doe  |Sales     |Manager    |source_name|2025|09   |07 |2025-09-07 10:48:06.020513|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|1  |
|John Doe  |Sales     |Analyst    |source_name|2025|09   |07 |2025-09-07 10:48:06.020513|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|1  |
|Jane Smith|Marketing |Coordinator|source_name|2025|09   |07 |2025-09-07 10:48:06.020513|abfss://ds-india@demodsindia.dfs.core.windows.net

Sample 

In [8]:
# === Schema-driven XML validation — corrected order ===========================
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, ArrayType, DataType
)

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025", "month": "09", "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "row_tag": "record"
}

# 1) Target schema (typed output you want)
TARGET_SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True)
])

# 2) Shape-only (stringy) schema
def to_stringy_schema(dt: DataType) -> DataType:
    if isinstance(dt, StructType):
        return StructType([StructField(f.name, to_stringy_schema(f.dataType), True) for f in dt.fields])
    if isinstance(dt, ArrayType):
        return ArrayType(StringType(), True)
    return StringType()

RAW_SCHEMA = to_stringy_schema(TARGET_SCHEMA)

# 3) Read raw (all text), permissive
landing = CONFIG["landing_path"].format(**CONFIG)
df_raw = (spark.read.format("com.databricks.spark.xml")
          .option("rowTag", CONFIG["row_tag"])
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .schema(RAW_SCHEMA)
          .load(landing))

if "_corrupt_record" not in df_raw.columns:
    df_raw = df_raw.withColumn("_corrupt_record", F.lit(None).cast(StringType()))

# Normalize roles.role -> array<string>
roles_col = F.col("details.roles.role")
roles_arr = roles_col.cast(ArrayType(StringType()))
roles_norm = (F.when(roles_arr.isNotNull(), roles_arr)
              .when(roles_col.isNull(), F.array().cast(ArrayType(StringType())))
              .otherwise(F.array(roles_col.cast(StringType()))))
df_raw = df_raw.withColumn("_roles_norm", roles_norm)

# 4) Build validation errors on RAW (IMPORTANT: before casting/replacing columns)
def collect_errors(path: str, dt: DataType):
    if isinstance(dt, StructType):
        errs = []
        for f in dt.fields:
            errs += collect_errors(f"{path}.{f.name}", f.dataType)
        return errs
    if isinstance(dt, ArrayType):
        el = dt.elementType
        # flag if any non-null element fails cast
        invalid_elem = F.exists(F.col(path), lambda x: x.isNotNull() & x.cast(el).isNull())
        return [F.when(invalid_elem, F.lit(f"INVALID_ARRAY_ELEMENT_{el.simpleString()}:{path}"))]
    # primitive
    casted = F.col(path).cast(dt)
    invalid = F.col(path).isNotNull() & casted.isNull()
    return [F.when(invalid, F.lit(f"INVALID_{dt.simpleString()}:{path}"))]

errs = [F.when(F.col("_corrupt_record").isNotNull(), F.lit("STRUCTURAL_CORRUPT"))]
for f in TARGET_SCHEMA.fields:
    errs += collect_errors(f.name, f.dataType)

errors_array = F.array_remove(F.array(*[e for e in errs if e is not None]), None)
df_enriched = (df_raw
               .withColumn("_validation_errors", errors_array)
               .withColumn("_is_corrupt", F.size(errors_array) > 0))

# 5) Now create the TYPED projection (safe to reuse names)
def typed_col(path: str, dt: DataType):
    if isinstance(dt, StructType):
        return F.struct(*[typed_col(f"{path}.{f.name}", f.dataType).alias(f.name) for f in dt.fields])
    if isinstance(dt, ArrayType):
        return F.transform(F.col(path), lambda x: x.cast(dt.elementType))
    return F.col(path).cast(dt)

typed_cols = [typed_col(f.name, f.dataType).alias(f.name) for f in TARGET_SCHEMA.fields]
df_typed = df_enriched.select(*typed_cols, "_roles_norm", "_validation_errors", "_is_corrupt")

# 6) Flatten, add audit cols, split valid/corrupt
df_flat = (df_typed.select(
              F.col("id"),
              F.col("details.name").alias("name"),
              F.col("details.department").alias("department"),
              F.explode(F.col("_roles_norm")).alias("role"),
              "_is_corrupt", "_validation_errors")
          .withColumn("source", F.lit(CONFIG["source_name"]))
          .withColumn("year", F.lit(CONFIG["year"]))
          .withColumn("month", F.lit(CONFIG["month"]))
          .withColumn("day", F.lit(CONFIG["day"]))
          .withColumn("ingestion_timestamp", F.current_timestamp())
          .withColumn("file_path", F.input_file_name())
)

valid_df   = df_flat.where(~F.col("_is_corrupt")).drop("_is_corrupt", "_validation_errors")
corrupt_df = df_flat.where(F.col("_is_corrupt"))

print("=== VALID sample ===")
valid_df.show(10, truncate=False)
print("=== CORRUPT sample (with reasons) ===")
corrupt_df.select("id","name","role","_validation_errors").show(10, truncate=False)

# 7) Optional writes
out_struct  = CONFIG["xml_structured_path"].format(**CONFIG)
out_corrupt = CONFIG["corrupt_records_path"].format(**CONFIG)
(valid_df.write.mode("append")
    .partitionBy("source","year","month","day")
    .parquet(out_struct))
if not corrupt_df.rdd.isEmpty():
    (corrupt_df.write.mode("append")
        .partitionBy("source","year","month","day")
        .parquet(out_corrupt))


StatementMeta(demospark, 10, 6, Finished, Available, Finished)

=== VALID sample ===
+----+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|id  |name      |department|role       |source     |year|month|day|ingestion_timestamp       |file_path                                                            |
+----+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|1   |John Doe  |Sales     |Manager    |source_name|2025|09   |07 |2025-09-07 11:01:17.904902|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|1   |John Doe  |Sales     |Analyst    |source_name|2025|09   |07 |2025-09-07 11:01:17.904902|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|2   |Jane Smith|Marketing |Coordinator|source_name|2025|09   |07 |2025-09-07 11:01:17.904902|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/s

Sample 

In [12]:
# === XML -> Bronze with schema-driven validation (no XSD) =====================
# Requires: com.databricks:spark-xml_2.12:0.18.0 (or compatible) on your Spark pool

from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, ArrayType, DataType
)

# ---------------- Config ----------------
CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025", "month": "09", "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "row_tag": "record"
}

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "16")

# ---------------- Target schema (typed output you want) ----------------
TARGET_SCHEMA = StructType([
    StructField("id", IntegerType(), True),                    # validate as INT
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True) # array<string>
        ]), True)
    ]), True)
])

# ---------------- Build "shape-only" schema (same structure, leaves as string) ----------------
def to_stringy_schema(dt: DataType) -> DataType:
    if isinstance(dt, StructType):
        return StructType([StructField(f.name, to_stringy_schema(f.dataType), True) for f in dt.fields])
    if isinstance(dt, ArrayType):
        return ArrayType(StringType(), True)
    return StringType()

RAW_SCHEMA = to_stringy_schema(TARGET_SCHEMA)

# ---------------- Read raw XML as text (PERMISSIVE) ----------------
landing = CONFIG["landing_path"].format(**CONFIG)
df_raw = (spark.read.format("com.databricks.spark.xml")
          .option("rowTag", CONFIG["row_tag"])
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .schema(RAW_SCHEMA)
          .load(landing))

# Ensure _corrupt_record exists (only created when structure is malformed)
if "_corrupt_record" not in df_raw.columns:
    df_raw = df_raw.withColumn("_corrupt_record", F.lit(None).cast(StringType()))

# Keep raw id for cast-failure detection later
df_raw = df_raw.withColumn("_id_raw", F.col("id"))

# Normalize roles: sometimes scalar, sometimes array -> always array<string>
roles_col = F.col("details.roles.role")
roles_arr = roles_col.cast(ArrayType(StringType()))
empty_arr = F.expr("array()").cast(ArrayType(StringType()))
roles_norm = (F.when(roles_arr.isNotNull(), roles_arr)
              .when(roles_col.isNull(), empty_arr)
              .otherwise(F.array(roles_col.cast(StringType()))))
df_raw = df_raw.withColumn("_roles_norm", roles_norm)

# ---------------- Build validation errors on RAW text ----------------
# datatype rule: id must be integer
id_cast = F.col("_id_raw").cast(IntegerType())
invalid_id = F.col("_id_raw").isNotNull() & id_cast.isNull()

errs = [
    F.when(F.col("_corrupt_record").isNotNull(), F.lit("STRUCTURAL_CORRUPT")),
    F.when(invalid_id, F.lit("INVALID_int:id"))
]

errors_array = F.array_remove(F.array(*[e for e in errs if e is not None]), None)
df_enriched = (df_raw
               .withColumn("_validation_errors", errors_array)
               .withColumn("_has_validation_errors", F.size(errors_array) > 0))

# ---------------- Create typed columns (use casted id) ----------------
df_typed = (df_enriched
            .withColumn("id", id_cast)  # typed id
            .select(
                "id",
                "details",
                "_roles_norm",
                "_validation_errors",
                "_has_validation_errors",
                "_id_raw"
            ))

# ---------------- Flatten, add audit columns ----------------
df_flat = (df_typed.select(
              F.col("id"),
              F.col("details.name").alias("name"),
              F.col("details.department").alias("department"),
              F.explode(F.col("_roles_norm")).alias("role"),
              "_has_validation_errors", "_validation_errors", "_id_raw")
          .withColumn("source", F.lit(CONFIG["source_name"]))
          .withColumn("year",   F.lit(CONFIG["year"]))
          .withColumn("month",  F.lit(CONFIG["month"]))
          .withColumn("day",    F.lit(CONFIG["day"]))
          .withColumn("ingestion_timestamp", F.current_timestamp())
          .withColumn("file_path", F.input_file_name())
)

# ---------------- Final split using RAW vs TYPED check ----------------
bad_id = F.col("_id_raw").isNotNull() & F.col("id").isNull()
has_other_errors = F.col("_has_validation_errors")
is_corrupt = bad_id | has_other_errors

# Build reasons first (before any renames), using concat for arrays
reasons_base = F.coalesce(F.col("_validation_errors"), empty_arr)
reasons_appended = F.when(bad_id, F.concat(reasons_base, F.array(F.lit("INVALID_int:id")))).otherwise(reasons_base)
reasons = F.array_distinct(reasons_appended)  # avoid duplicates

# Compute corrupt columns, THEN rename _id_raw -> id_raw
corrupt_pre = (
    df_flat.where(is_corrupt)
           .withColumn("_corrupt_reasons", reasons)
           .withColumn("_corrupt_reason", F.concat_ws("|", F.col("_corrupt_reasons")))
)

corrupt_df = corrupt_pre.withColumnRenamed("_id_raw", "id_raw")

valid_df = (
    df_flat.where(~is_corrupt)
           .drop("_validation_errors", "_has_validation_errors", "_id_raw")
)

# ---------------- Preview ----------------
print("=== VALID sample ===")
valid_df.show(10, truncate=False)
print("=== CORRUPT sample ===")
corrupt_df.select("id_raw","name","role","_corrupt_reasons").show(10, truncate=False)

# ---------------- Writes (Parquet; switch to .format('delta') if you prefer) ----------------
out_struct  = CONFIG["xml_structured_path"].format(**CONFIG)
out_corrupt = CONFIG["corrupt_records_path"].format(**CONFIG)

(valid_df.write.mode("append")
    .partitionBy("source","year","month","day")
    .parquet(out_struct))

if not corrupt_df.rdd.isEmpty():
    (corrupt_df.write.mode("append")
        .partitionBy("source","year","month","day")
        .parquet(out_corrupt))

print(f"Wrote valid → {out_struct}")
if not corrupt_df.rdd.isEmpty():
    print(f"Wrote corrupt → {out_corrupt}")
else:
    print("No corrupt rows this run.")


StatementMeta(demospark, 11, 5, Finished, Available, Finished)

=== VALID sample ===
+---+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|id |name      |department|role       |source     |year|month|day|ingestion_timestamp       |file_path                                                            |
+---+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|1  |John Doe  |Sales     |Manager    |source_name|2025|09   |07 |2025-09-07 11:45:41.335427|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|1  |John Doe  |Sales     |Analyst    |source_name|2025|09   |07 |2025-09-07 11:45:41.335427|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|2  |Jane Smith|Marketing |Coordinator|source_name|2025|09   |07 |2025-09-07 11:45:41.335427|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.

Sample with Config and Metadata

In [15]:
# ========================= XML Bronze Framework (no XSD) =========================
# Requires: com.databricks:spark-xml_2.12:0.18.0 (or compatible).

from pyspark.sql import functions as F
from pyspark.sql.types import *
from typing import Dict, List, Any
import json

# ---------------------------- Utilities -----------------------------------

def to_stringy_schema(dt: DataType) -> DataType:
    """Shape-only schema: same structure as target, but leaves are strings (arrays preserved)."""
    if isinstance(dt, StructType):
        return StructType([StructField(f.name, to_stringy_schema(f.dataType), True) for f in dt.fields])
    if isinstance(dt, ArrayType):
        return ArrayType(to_stringy_schema(dt.elementType), True)
    return StringType()

def _sanitize(path: str) -> str:
    """Turn a dot path into a safe column name."""
    return path.replace(".", "__").replace("[", "_").replace("]", "_")

def ensure_array(df, path: str, array_dt: ArrayType, out_col: str):
    """Normalize 'path' to always be ArrayType(array_dt.elementType)."""
    col_ref = F.col(path)
    arr_cast = col_ref.cast(array_dt)
    empty_arr = F.expr("array()").cast(array_dt)
    norm = F.when(arr_cast.isNotNull(), arr_cast) \
            .when(col_ref.isNull(), empty_arr) \
            .otherwise(F.array(col_ref.cast(array_dt.elementType)))
    return df.withColumn(out_col, norm)

def exists_invalid_primitive(col_arr, elem_type: DataType):
    """True if any non-null array element fails cast to elem_type."""
    return F.exists(col_arr, lambda x: x.isNotNull() & x.cast(elem_type).isNull())

def exists_invalid_struct_field(col_arr, field: StructField):
    """True if any struct element has a non-null child that fails cast to the child's type."""
    return F.exists(col_arr, lambda e: e[field.name].isNotNull() & e[field.name].cast(field.dataType).isNull())

def get_dt(path: str, dt: DataType) -> DataType:
    """Walk a dot path on a StructType to return the nested DataType."""
    if path == "" or path is None:
        return dt
    if not isinstance(dt, StructType):
        raise KeyError(f"Path '{path}' not found in schema.")
    parts = path.split(".", 1)
    head = parts[0]
    tail = parts[1] if len(parts) > 1 else ""
    child = next((f for f in dt.fields if f.name == head), None)
    if child is None:
        raise KeyError(f"Field '{head}' not found under path '{path}'.")
    return get_dt(tail, child.dataType)

def collect_validation_errors(df, target_schema: StructType, normalized_map: Dict[str, str]):
    """
    Build an array<string> of validation error labels by comparing RAW strings to target types.
    Uses normalized array columns when validating arrays (so it works even if source was scalar).
    """
    errs = [F.when(F.col("_corrupt_record").isNotNull(), F.lit("STRUCTURAL_CORRUPT"))]

    def walk(path: str, dt: DataType):
        nonlocal errs
        if isinstance(dt, StructType):
            for f in dt.fields:
                walk(f"{path}.{f.name}" if path else f.name, f.dataType)
        elif isinstance(dt, ArrayType):
            use_col = F.col(normalized_map.get(path, path))
            elem = dt.elementType
            if isinstance(elem, StructType):
                for ch in elem.fields:
                    errs.append(F.when(
                        exists_invalid_struct_field(use_col, ch),
                        F.lit(f"INVALID_ARRAY_ELEM_FIELD_{ch.dataType.simpleString()}:{path}.{ch.name}")
                    ))
            else:
                errs.append(F.when(
                    exists_invalid_primitive(use_col, elem),
                    F.lit(f"INVALID_ARRAY_ELEMENT_{elem.simpleString()}:{path}")
                ))
        else:
            raw_col = F.col(path)
            casted  = raw_col.cast(dt)
            errs.append(F.when(raw_col.isNotNull() & casted.isNull(),
                               F.lit(f"INVALID_{dt.simpleString()}:{path}")))
    walk("", target_schema)

    return F.array_remove(F.array(*[e for e in errs if e is not None]), None)

def typed_projection(path: str, dt: DataType, normalized_map: Dict[str, str]):
    """Produce a typed Column (recursively cast to target schema)."""
    if isinstance(dt, StructType):
        return F.struct(*[typed_projection(f"{path}.{f.name}", f.dataType, normalized_map).alias(f.name)
                          for f in dt.fields])
    if isinstance(dt, ArrayType):
        col_ref = F.col(normalized_map.get(path, path))
        elem = dt.elementType
        if isinstance(elem, StructType):
            return F.transform(col_ref, lambda e:
                               F.struct(*[(e[ch.name].cast(ch.dataType)).alias(ch.name) for ch in elem.fields]))
        else:
            return F.transform(col_ref, lambda x: x.cast(elem))
    return F.col(path).cast(dt)

def explode_paths(df, explode_plan: List[Dict[str, str]], normalized_map: Dict[str, str]):
    """Apply explode (outer/inner) for each configured array path in order."""
    for item in explode_plan or []:
        p = item["path"]
        alias = item.get("alias") or _sanitize(p)
        mode  = (item.get("explode_mode") or "outer").lower()
        norm_col = normalized_map[p]  # must exist
        if mode == "inner":
            df = df.withColumn(alias, F.explode(F.col(norm_col)))
        else:
            df = df.withColumn(alias, F.explode_outer(F.col(norm_col)))
    return df

def add_audit_cols(df, config: Dict[str, Any]):
    return (df
            .withColumn("source", F.lit(config.get("source_name", "unknown")))
            .withColumn("year",   F.lit(config["year"]))
            .withColumn("month",  F.lit(config["month"]))
            .withColumn("day",    F.lit(config["day"]))
            .withColumn("ingestion_timestamp", F.current_timestamp())
            .withColumn("file_path", F.input_file_name()))

def load_metadata(json_str_or_dict):
    """Accepts JSON string or dict; normalizes keys to a unified internal shape."""
    meta = json.loads(json_str_or_dict) if isinstance(json_str_or_dict, str) else dict(json_str_or_dict)

    # normalize 'select' entries: allow either source/alias or expr/as
    norm_select = []
    for it in meta.get("select", []):
        expr  = it.get("source") or it.get("expr")
        alias = it.get("alias") or it.get("as") or expr
        if not expr:
            raise ValueError("select item missing 'source'/'expr'")
        norm_select.append({"expr": expr, "as": alias})
    meta["select"] = norm_select

    # normalize 'explode' entries
    norm_explode = []
    for it in meta.get("explode", []):
        path = it["path"]
        alias = it.get("alias") or it.get("as") or _sanitize(path)
        norm_explode.append({
            "path": path,
            "alias": alias,
            "explode_mode": it.get("explode_mode", "outer"),
            "null_handling": it.get("null_handling", "empty_array")
        })
    meta["explode"] = norm_explode

    # raw evidence fields (optional)
    evid = meta.get("evidence", [])
    meta["evidence"] = evid if isinstance(evid, list) else []

    # optional partitions override
    parts = meta.get("partitions")
    if parts and isinstance(parts, list):
        meta["partitions"] = parts

    meta["version"] = meta.get("version", "1.0")
    return meta

# ---------------------------- Main entry point ---------------------------------

def process_xml(config: Dict[str, Any],
                target_schema: StructType,
                metadata: Dict[str, Any]):
    """
    Config:
      - landing_path (templated), xml_structured_path, corrupt_records_path
      - row_tag, read_mode ('PERMISSIVE'|'DROPMALFORMED'|'FAILFAST'), write_format ('parquet'|'delta')
      - partitions (list), source_name, year, month, day

    Metadata (JSON normalized via load_metadata):
      - explode: [{ "path": "<dot.path>", "alias": "<name>", "explode_mode": "outer|inner" }]
      - select:  [{ "expr": "<col or expr>", "as": "<alias>" }, ...]
      - evidence: ["<dot.path>", ...]  # raw fields to keep & cast-check (e.g., ["id"])
      - partitions: ["source","year","month","day"]  # optional override
    """
    # 1) Read raw as strings (shape-only schema)
    raw_schema = to_stringy_schema(target_schema)
    landing = config["landing_path"].format(**config)
    read_mode = config.get("read_mode", "PERMISSIVE")

    df_raw = (spark.read.format("com.databricks.spark.xml")
              .option("rowTag", config["row_tag"])
              .option("mode", read_mode)
              .option("columnNameOfCorruptRecord", "_corrupt_record")
              .schema(raw_schema)
              .load(landing))

    if "_corrupt_record" not in df_raw.columns:
        df_raw = df_raw.withColumn("_corrupt_record", F.lit(None).cast(StringType()))

    # 2) Normalize arrays to explode & capture evidence columns
    normalized_map = {}  # path -> normalized col name
    for item in (metadata.get("explode") or []):
        p = item["path"]
        arr_dt_target = get_dt(p, target_schema)
        if not isinstance(arr_dt_target, ArrayType):
            raise TypeError(f"explode path '{p}' is not ArrayType in target schema.")
        out_col = f"__norm__{_sanitize(p)}"
        df_raw = ensure_array(df_raw, p, ArrayType(to_stringy_schema(arr_dt_target.elementType), True), out_col)
        normalized_map[p] = out_col

    evidence_map = {}  # path -> raw col name
    for p in (metadata.get("evidence") or []):
        ev_col = f"__raw__{_sanitize(p)}"
        df_raw = df_raw.withColumn(ev_col, F.col(p))
        evidence_map[p] = ev_col

    # 3) Validation errors on RAW vs target types
    errors_col = collect_validation_errors(df_raw, target_schema, normalized_map)
    df_checked = (df_raw
                  .withColumn("_validation_errors", errors_col)
                  .withColumn("_has_validation_errors", F.size(errors_col) > 0))

    # 4) Typed projection (from RAW + normalized arrays)
    typed_cols = [typed_projection(f.name, f.dataType, normalized_map).alias(f.name)
                  for f in target_schema.fields]
    keep_cols = list(normalized_map.values()) + list(evidence_map.values()) + ["_validation_errors", "_has_validation_errors"]
    df_typed = df_checked.select(*typed_cols, *[F.col(c) for c in keep_cols])

    # 5) Generic cast-failure guard for evidence fields (raw non-null & typed null)
    empty_str_arr = F.expr("array()").cast(ArrayType(StringType()))
    bad_any = F.lit(False)
    cast_reasons = empty_str_arr
    for p, raw_col in evidence_map.items():
        cond = F.col(raw_col).isNotNull() & F.col(p).isNull()
        bad_any = bad_any | cond
        cast_reasons = F.concat(cast_reasons,
                                F.when(cond, F.array(F.lit(f"CAST_FAIL:{p}"))).otherwise(empty_str_arr))
    df_typed = (df_typed
                .withColumn("_bad_cast_any", bad_any)
                .withColumn("_cast_fail_reasons", F.array_distinct(cast_reasons)))

    # 6) Explode arrays & project final columns
    df_exp = explode_paths(df_typed, metadata.get("explode"), normalized_map)

    select_items = metadata.get("select") or [{"expr": f.name, "as": f.name} for f in target_schema.fields]
    proj_cols = [F.expr(item["expr"]).alias(item.get("as", item["expr"])) for item in select_items]
    df_out = df_exp.select(*proj_cols,
                           "_validation_errors", "_has_validation_errors",
                           "_bad_cast_any", "_cast_fail_reasons",
                           *[F.col(c) for c in evidence_map.values()])
    df_out = add_audit_cols(df_out, config)

    # 7) Final split & reasons
    is_corrupt = F.col("_has_validation_errors") | F.col("_bad_cast_any")
    reasons = F.array_distinct(F.concat(
        F.coalesce(F.col("_validation_errors"), empty_str_arr),
        F.coalesce(F.col("_cast_fail_reasons"), empty_str_arr)
    ))

    corrupt_df = (df_out.where(is_corrupt)
                  .withColumn("_corrupt_reasons", reasons)
                  .drop("_validation_errors", "_has_validation_errors", "_bad_cast_any", "_cast_fail_reasons"))

    # rename evidence columns to readable raw_<path>
    for p, raw_col in evidence_map.items():
        corrupt_df = corrupt_df.withColumn(f"raw_{_sanitize(p)}", F.col(raw_col)).drop(raw_col)

    valid_df = (df_out.where(~is_corrupt)
                .drop("_validation_errors", "_has_validation_errors", "_bad_cast_any", "_cast_fail_reasons",
                      *evidence_map.values()))

    # 8) Writes
    writer_fmt = config.get("write_format", "parquet")
    partitions = metadata.get("partitions") or config.get("partitions") or ["source","year","month","day"]
    out_struct  = config["xml_structured_path"].format(**config)
    out_corrupt = config["corrupt_records_path"].format(**config)

    if writer_fmt == "delta":
        (valid_df.write.format("delta").mode("append").partitionBy(*partitions).save(out_struct))
        if not corrupt_df.rdd.isEmpty():
            (corrupt_df.write.format("delta").mode("append").partitionBy(*partitions).save(out_corrupt))
    else:
        (valid_df.write.mode("append").partitionBy(*partitions).parquet(out_struct))
        if not corrupt_df.rdd.isEmpty():
            (corrupt_df.write.mode("append").partitionBy(*partitions).parquet(out_corrupt))

    # Preview
    print("=== VALID sample ===")
    valid_df.show(10, truncate=False)
    print("=== CORRUPT sample ===")
    corrupt_df.show(10, truncate=False)

    return valid_df, corrupt_df

# =============================== Example usage ===============================

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "16")

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025", "month": "09", "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "row_tag": "record",
    "read_mode": "PERMISSIVE",
    "write_format": "parquet",
    "partitions": ["source","year","month","day"]
}

TARGET_SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True)
])

# Professional JSON metadata
METADATA_JSON = """
{
  "version": "1.0",
  "explode": [
    { "path": "details.roles.role", "alias": "role", "explode_mode": "outer" }
  ],
  "select": [
    { "source": "id",                 "alias": "id" },
    { "source": "details.name",       "alias": "name" },
    { "source": "details.department", "alias": "department" },
    { "source": "role",               "alias": "role" }
  ],
  "evidence": ["id"],
  "partitions": ["source","year","month","day"]
}
"""
METADATA = load_metadata(METADATA_JSON)

# Run
valid_df, corrupt_df = process_xml(CONFIG, TARGET_SCHEMA, METADATA)


StatementMeta(demospark, 11, 8, Finished, Available, Finished)

=== VALID sample ===
+---+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|id |name      |department|role       |source     |year|month|day|ingestion_timestamp       |file_path                                                            |
+---+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|1  |John Doe  |Sales     |Manager    |source_name|2025|09   |07 |2025-09-07 12:10:12.730214|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|1  |John Doe  |Sales     |Analyst    |source_name|2025|09   |07 |2025-09-07 12:10:12.730214|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|2  |Jane Smith|Marketing |Coordinator|source_name|2025|09   |07 |2025-09-07 12:10:12.730214|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.

In [18]:
# ========================= XML Bronze Framework (no XSD) =========================
# Requires: com.databricks:spark-xml_2.12:0.18.0 (or compatible).

from pyspark.sql import functions as F
from pyspark.sql.types import *
from typing import Dict, List, Any
import json

# ---------------------------- Utilities -----------------------------------

def to_stringy_schema(dt: DataType) -> DataType:
    """Shape-only schema: same structure as target, but leaves are strings (arrays preserved)."""
    if isinstance(dt, StructType):
        return StructType([StructField(f.name, to_stringy_schema(f.dataType), True) for f in dt.fields])
    if isinstance(dt, ArrayType):
        return ArrayType(to_stringy_schema(dt.elementType), True)
    return StringType()

def _sanitize(path: str) -> str:
    """Turn a dot path into a safe column name."""
    return path.replace(".", "__").replace("[", "_").replace("]", "_")

def ensure_array(df, path: str, array_dt: ArrayType, out_col: str):
    """
    Normalize 'path' to ArrayType(array_dt.elementType) without scalar fallback.
    Avoids CASE branches that force Spark to type-check array->struct casts.
    """
    col_ref = F.col(path)
    empty_arr = F.expr("array()").cast(array_dt)
    norm = F.coalesce(col_ref.cast(array_dt), empty_arr)
    return df.withColumn(out_col, norm)


def exists_invalid_primitive(col_arr, elem_type: DataType):
    """True if any non-null array element fails cast to elem_type."""
    return F.exists(col_arr, lambda x: x.isNotNull() & x.cast(elem_type).isNull())

def exists_invalid_struct_field(col_arr, field: StructField):
    """True if any struct element has a non-null child that fails cast to the child's type."""
    return F.exists(col_arr, lambda e: e[field.name].isNotNull() & e[field.name].cast(field.dataType).isNull())

def get_dt(path: str, dt: DataType) -> DataType:
    """Walk a dot path on a StructType to return the nested DataType."""
    if path == "" or path is None:
        return dt
    if not isinstance(dt, StructType):
        raise KeyError(f"Path '{path}' not found in schema.")
    parts = path.split(".", 1)
    head = parts[0]
    tail = parts[1] if len(parts) > 1 else ""
    child = next((f for f in dt.fields if f.name == head), None)
    if child is None:
        raise KeyError(f"Field '{head}' not found under path '{path}'.")
    return get_dt(tail, child.dataType)

def collect_validation_errors(df, target_schema: StructType, normalized_map: Dict[str, str]):
    """
    Build an array<string> of validation error labels by comparing RAW strings to target types.
    Uses normalized array columns when validating arrays (so it works even if source was scalar).
    """
    errs = [F.when(F.col("_corrupt_record").isNotNull(), F.lit("STRUCTURAL_CORRUPT"))]

    def walk(path: str, dt: DataType):
        nonlocal errs
        if isinstance(dt, StructType):
            for f in dt.fields:
                walk(f"{path}.{f.name}" if path else f.name, f.dataType)
        elif isinstance(dt, ArrayType):
            use_col = F.col(normalized_map.get(path, path))
            elem = dt.elementType
            if isinstance(elem, StructType):
                for ch in elem.fields:
                    errs.append(F.when(
                        exists_invalid_struct_field(use_col, ch),
                        F.lit(f"INVALID_ARRAY_ELEM_FIELD_{ch.dataType.simpleString()}:{path}.{ch.name}")
                    ))
            else:
                errs.append(F.when(
                    exists_invalid_primitive(use_col, elem),
                    F.lit(f"INVALID_ARRAY_ELEMENT_{elem.simpleString()}:{path}")
                ))
        else:
            raw_col = F.col(path)
            casted  = raw_col.cast(dt)
            errs.append(F.when(raw_col.isNotNull() & casted.isNull(),
                               F.lit(f"INVALID_{dt.simpleString()}:{path}")))
    walk("", target_schema)

    return F.array_remove(F.array(*[e for e in errs if e is not None]), None)

def typed_projection(path: str, dt: DataType, normalized_map: Dict[str, str]):
    """Produce a typed Column (recursively cast to target schema)."""
    if isinstance(dt, StructType):
        return F.struct(*[typed_projection(f"{path}.{f.name}", f.dataType, normalized_map).alias(f.name)
                          for f in dt.fields])
    if isinstance(dt, ArrayType):
        col_ref = F.col(normalized_map.get(path, path))
        elem = dt.elementType
        if isinstance(elem, StructType):
            return F.transform(col_ref, lambda e:
                               F.struct(*[(e[ch.name].cast(ch.dataType)).alias(ch.name) for ch in elem.fields]))
        else:
            return F.transform(col_ref, lambda x: x.cast(elem))
    return F.col(path).cast(dt)

def explode_paths(df, explode_plan: List[Dict[str, str]], normalized_map: Dict[str, str]):
    """Apply explode (outer/inner) for each configured array path in order."""
    for item in explode_plan or []:
        p = item["path"]
        alias = item.get("alias") or _sanitize(p)
        mode  = (item.get("explode_mode") or "outer").lower()
        norm_col = normalized_map[p]  # must exist
        if mode == "inner":
            df = df.withColumn(alias, F.explode(F.col(norm_col)))
        else:
            df = df.withColumn(alias, F.explode_outer(F.col(norm_col)))
    return df

def add_audit_cols(df, config: Dict[str, Any]):
    return (df
            .withColumn("source", F.lit(config.get("source_name", "unknown")))
            .withColumn("year",   F.lit(config["year"]))
            .withColumn("month",  F.lit(config["month"]))
            .withColumn("day",    F.lit(config["day"]))
            .withColumn("ingestion_timestamp", F.current_timestamp())
            .withColumn("file_path", F.input_file_name()))

def load_metadata(json_str_or_dict):
    """Accepts JSON string or dict; normalizes keys to a unified internal shape."""
    meta = json.loads(json_str_or_dict) if isinstance(json_str_or_dict, str) else dict(json_str_or_dict)

    # normalize 'select' entries: allow either source/alias or expr/as
    norm_select = []
    for it in meta.get("select", []):
        expr  = it.get("source") or it.get("expr")
        alias = it.get("alias") or it.get("as") or expr
        if not expr:
            raise ValueError("select item missing 'source'/'expr'")
        norm_select.append({"expr": expr, "as": alias})
    meta["select"] = norm_select

    # normalize 'explode' entries
    norm_explode = []
    for it in meta.get("explode", []):
        path = it["path"]
        alias = it.get("alias") or it.get("as") or _sanitize(path)
        norm_explode.append({
            "path": path,
            "alias": alias,
            "explode_mode": it.get("explode_mode", "outer"),
            "null_handling": it.get("null_handling", "empty_array")
        })
    meta["explode"] = norm_explode

    # raw evidence fields (optional)
    evid = meta.get("evidence", [])
    meta["evidence"] = evid if isinstance(evid, list) else []

    # optional partitions override
    parts = meta.get("partitions")
    if parts and isinstance(parts, list):
        meta["partitions"] = parts

    meta["version"] = meta.get("version", "1.0")
    return meta

# ---------------------------- Main entry point ---------------------------------

def process_xml(config: Dict[str, Any],
                target_schema: StructType,
                metadata: Dict[str, Any]):
    """
    Config:
      - landing_path (templated), xml_structured_path, corrupt_records_path
      - row_tag, read_mode ('PERMISSIVE'|'DROPMALFORMED'|'FAILFAST'), write_format ('parquet'|'delta')
      - partitions (list), source_name, year, month, day

    Metadata (JSON normalized via load_metadata):
      - explode: [{ "path": "<dot.path>", "alias": "<name>", "explode_mode": "outer|inner" }]
      - select:  [{ "expr": "<col or expr>", "as": "<alias>" }, ...]
      - evidence: ["<dot.path>", ...]  # raw fields to keep & cast-check (e.g., ["id"])
      - partitions: ["source","year","month","day"]  # optional override
    """
    # 1) Read raw as strings (shape-only schema)
    raw_schema = to_stringy_schema(target_schema)
    landing = config["landing_path"].format(**config)
    read_mode = config.get("read_mode", "PERMISSIVE")

    df_raw = (spark.read.format("com.databricks.spark.xml")
              .option("rowTag", config["row_tag"])
              .option("mode", read_mode)
              .option("columnNameOfCorruptRecord", "_corrupt_record")
              .schema(raw_schema)
              .load(landing))

    if "_corrupt_record" not in df_raw.columns:
        df_raw = df_raw.withColumn("_corrupt_record", F.lit(None).cast(StringType()))

    # 2) Normalize arrays to explode & capture evidence columns
    normalized_map = {}  # path -> normalized col name
    for item in (metadata.get("explode") or []):
        p = item["path"]
        arr_dt_target = get_dt(p, target_schema)
        if not isinstance(arr_dt_target, ArrayType):
            raise TypeError(f"explode path '{p}' is not ArrayType in target schema.")
        out_col = f"__norm__{_sanitize(p)}"
        df_raw = ensure_array(df_raw, p, ArrayType(to_stringy_schema(arr_dt_target.elementType), True), out_col)
        normalized_map[p] = out_col

    evidence_map = {}  # path -> raw col name
    for p in (metadata.get("evidence") or []):
        ev_col = f"__raw__{_sanitize(p)}"
        df_raw = df_raw.withColumn(ev_col, F.col(p))
        evidence_map[p] = ev_col

    # 3) Validation errors on RAW vs target types
    errors_col = collect_validation_errors(df_raw, target_schema, normalized_map)
    df_checked = (df_raw
                  .withColumn("_validation_errors", errors_col)
                  .withColumn("_has_validation_errors", F.size(errors_col) > 0))

    # 4) Typed projection (from RAW + normalized arrays)
    typed_cols = [typed_projection(f.name, f.dataType, normalized_map).alias(f.name)
                  for f in target_schema.fields]
    keep_cols = list(normalized_map.values()) + list(evidence_map.values()) + ["_validation_errors", "_has_validation_errors"]
    df_typed = df_checked.select(*typed_cols, *[F.col(c) for c in keep_cols])

    # 5) Generic cast-failure guard for evidence fields (raw non-null & typed null)
    empty_str_arr = F.expr("array()").cast(ArrayType(StringType()))
    bad_any = F.lit(False)
    cast_reasons = empty_str_arr
    for p, raw_col in evidence_map.items():
        cond = F.col(raw_col).isNotNull() & F.col(p).isNull()
        bad_any = bad_any | cond
        cast_reasons = F.concat(cast_reasons,
                                F.when(cond, F.array(F.lit(f"CAST_FAIL:{p}"))).otherwise(empty_str_arr))
    df_typed = (df_typed
                .withColumn("_bad_cast_any", bad_any)
                .withColumn("_cast_fail_reasons", F.array_distinct(cast_reasons)))

    # 6) Explode arrays & project final columns
    df_exp = explode_paths(df_typed, metadata.get("explode"), normalized_map)

    select_items = metadata.get("select") or [{"expr": f.name, "as": f.name} for f in target_schema.fields]
    proj_cols = [F.expr(item["expr"]).alias(item.get("as", item["expr"])) for item in select_items]
    df_out = df_exp.select(*proj_cols,
                           "_validation_errors", "_has_validation_errors",
                           "_bad_cast_any", "_cast_fail_reasons",
                           *[F.col(c) for c in evidence_map.values()])
    df_out = add_audit_cols(df_out, config)

    # 7) Final split & reasons
    is_corrupt = F.col("_has_validation_errors") | F.col("_bad_cast_any")
    reasons = F.array_distinct(F.concat(
        F.coalesce(F.col("_validation_errors"), empty_str_arr),
        F.coalesce(F.col("_cast_fail_reasons"), empty_str_arr)
    ))

    corrupt_df = (df_out.where(is_corrupt)
                  .withColumn("_corrupt_reasons", reasons)
                  .drop("_validation_errors", "_has_validation_errors", "_bad_cast_any", "_cast_fail_reasons"))

    # rename evidence columns to readable raw_<path>
    for p, raw_col in evidence_map.items():
        corrupt_df = corrupt_df.withColumn(f"raw_{_sanitize(p)}", F.col(raw_col)).drop(raw_col)

    valid_df = (df_out.where(~is_corrupt)
                .drop("_validation_errors", "_has_validation_errors", "_bad_cast_any", "_cast_fail_reasons",
                      *evidence_map.values()))

    # 8) Writes
    writer_fmt = config.get("write_format", "parquet")
    partitions = metadata.get("partitions") or config.get("partitions") or ["source","year","month","day"]
    out_struct  = config["xml_structured_path"].format(**config)
    out_corrupt = config["corrupt_records_path"].format(**config)

    if writer_fmt == "delta":
        (valid_df.write.format("delta").mode("append").partitionBy(*partitions).save(out_struct))
        if not corrupt_df.rdd.isEmpty():
            (corrupt_df.write.format("delta").mode("append").partitionBy(*partitions).save(out_corrupt))
    else:
        (valid_df.write.mode("append").partitionBy(*partitions).parquet(out_struct))
        if not corrupt_df.rdd.isEmpty():
            (corrupt_df.write.mode("append").partitionBy(*partitions).parquet(out_corrupt))

    # Preview
    print("=== VALID sample ===")
    valid_df.show(10, truncate=False)
    print("=== CORRUPT sample ===")
    corrupt_df.show(10, truncate=False)

    return valid_df, corrupt_df

# =============================== Example usage ===============================

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "16")

CONFIG = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "source_name",
    "year": "2025", "month": "09", "day": "07",
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/sample.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "row_tag": "record",
    "read_mode": "PERMISSIVE",
    "write_format": "parquet",
    "partitions": ["source","year","month","day"]
}

TARGET_SCHEMA = StructType([
    StructField("id", IntegerType(), True),
    StructField("details", StructType([
        StructField("name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("roles", StructType([
            StructField("role", ArrayType(StringType()), True)
        ]), True)
    ]), True)
])

# Professional JSON metadata
METADATA_JSON = """
{
  "version": "1.0",
  "explode": [
    { "path": "details.roles.role", "alias": "role", "explode_mode": "outer" }
  ],
  "select": [
    { "source": "id",                 "alias": "id" },
    { "source": "details.name",       "alias": "name" },
    { "source": "details.department", "alias": "department" },
    { "source": "role",               "alias": "role" }
  ],
  "evidence": ["id"],
  "partitions": ["source","year","month","day"]
}
"""
METADATA = load_metadata(METADATA_JSON)

# Run
valid_df, corrupt_df = process_xml(CONFIG, TARGET_SCHEMA, METADATA)


StatementMeta(demospark, 11, 11, Finished, Available, Finished)

=== VALID sample ===
+---+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|id |name      |department|role       |source     |year|month|day|ingestion_timestamp       |file_path                                                            |
+---+----------+----------+-----------+-----------+----+-----+---+--------------------------+---------------------------------------------------------------------+
|1  |John Doe  |Sales     |Manager    |source_name|2025|09   |07 |2025-09-07 12:33:56.649778|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|1  |John Doe  |Sales     |Analyst    |source_name|2025|09   |07 |2025-09-07 12:33:56.649778|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.xml|
|2  |Jane Smith|Marketing |Coordinator|source_name|2025|09   |07 |2025-09-07 12:33:56.649778|abfss://ds-india@demodsindia.dfs.core.windows.net/SEMIDATA/sample.

In [19]:
# ====================== Invoices XML: CONFIG, TARGET_SCHEMA, METADATA ======================

# 1) CONFIG — point this at your file and set rowTag to "Invoice"
CONFIG_INVOICES = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "invoices_v1",
    "year": "2025", "month": "09", "day": "07",
    # Update this path to where you saved the XML shown in your message:
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/invoices.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "row_tag": "Invoice",
    "read_mode": "PERMISSIVE",
    "write_format": "parquet",
    "partitions": ["source","year","month","day"]
}

# 2) TARGET_SCHEMA — typed, with arrays/structs and XML attributes (_*) + text nodes (_VALUE)
from pyspark.sql.types import *

TARGET_SCHEMA_INVOICES = StructType([
    StructField("Header", StructType([
        StructField("InvoiceNumber", StringType(), True),
        StructField("InvoiceDate", DateType(), True),
        StructField("Buyer", StructType([
            StructField("Name", StringType(), True),
            StructField("Address", StructType([
                StructField("Line1", StringType(), True),
                StructField("City", StringType(), True),
                StructField("State", StringType(), True),
                StructField("Postcode", StringType(), True),
                StructField("Country", StringType(), True)
            ]), True)
        ]), True)
    ]), True),

    StructField("Body", StructType([
        StructField("Items", StructType([
            StructField("LineItem", ArrayType(StructType([
                StructField("_id", StringType(), True),                 # <LineItem id="...">
                StructField("ItemId", StringType(), True),
                StructField("Description", StringType(), True),
                StructField("Quantity", IntegerType(), True),
                StructField("Price", DecimalType(18, 2), True),

                StructField("Meta", StructType([
                    StructField("CreatedAt", TimestampType(), True),
                    StructField("UpdatedAt", TimestampType(), True),
                    StructField("Flags", StructType([
                        StructField("Flag", ArrayType(StructType([      # <Flag name="..." value="..."/>
                            StructField("_name", StringType(), True),
                            StructField("_value", BooleanType(), True)   # cast "true"/"false" to boolean
                        ])), True)
                    ]), True)
                ]), True),

                StructField("Taxes", StructType([
                    StructField("Tax", ArrayType(StructType([
                        StructField("Code", StringType(), True),
                        StructField("Amount", DecimalType(18, 3), True),           # 1.999, 0.500, 4.995
                        StructField("TaxBreakup", StructType([
                            StructField("Break", ArrayType(StructType([
                                StructField("Type", StringType(), True),           # "percentage"/"fixed"
                                StructField("Rate", DecimalType(18, 3), True)      # 10, 0.5, etc.
                            ])), True)
                        ]), True)
                    ])), True)
                ]), True),

                StructField("Attributes", StructType([
                    StructField("Attribute", ArrayType(StructType([               # <Attribute name="" code="">VALUE</Attribute>
                        StructField("_name", StringType(), True),
                        StructField("_code", StringType(), True),
                        StructField("_VALUE", StringType(), True)                 # element text
                    ])), True)
                ]), True),

                StructField("Packaging", StructType([
                    StructField("Boxes", StructType([
                        StructField("Box", ArrayType(StructType([
                            StructField("_id", StringType(), True),
                            StructField("Weight", DecimalType(18, 2), True),
                            StructField("Dimensions", StructType([
                                StructField("Length", IntegerType(), True),
                                StructField("Width", IntegerType(), True),
                                StructField("Height", IntegerType(), True)
                            ]), True),
                            StructField("Labels", StructType([
                                StructField("Label", ArrayType(StringType()), True)
                            ]), True)
                        ])), True)
                    ]), True)
                ]), True),

                StructField("Serials", StructType([
                    StructField("Serial", ArrayType(StringType()), True)
                ]), True)
            ]), True))
        ]), True)
    ]), True)
])

# 3) METADATA (JSON) — explode line items + taxes; select key columns
#    (Exploding more arrays can produce a cross-product; this keeps it tidy.)
METADATA_JSON_INVOICES = """
{
  "version": "1.0",
  "explode": [
    { "path": "Body.Items.LineItem",            "alias": "li",  "explode_mode": "outer" },
    { "path": "Body.Items.LineItem.Taxes.Tax",  "alias": "tax", "explode_mode": "outer" }
  ],
  "select": [
    { "source": "Header.InvoiceNumber",          "alias": "invoice_number" },
    { "source": "Header.InvoiceDate",            "alias": "invoice_date"   },
    { "source": "Header.Buyer.Name",             "alias": "buyer_name"     },
    { "source": "Header.Buyer.Address.Line1",    "alias": "addr_line1"     },
    { "source": "Header.Buyer.Address.City",     "alias": "addr_city"      },
    { "source": "Header.Buyer.Address.State",    "alias": "addr_state"     },
    { "source": "Header.Buyer.Address.Postcode", "alias": "addr_postcode"  },
    { "source": "Header.Buyer.Address.Country",  "alias": "addr_country"   },

    { "source": "li._id",                        "alias": "line_id"        },
    { "source": "li.ItemId",                     "alias": "item_id"        },
    { "source": "li.Description",                "alias": "description"    },
    { "source": "li.Quantity",                   "alias": "qty"            },
    { "source": "li.Price",                      "alias": "price"          },

    { "source": "tax.Code",                      "alias": "tax_code"       },
    { "source": "tax.Amount",                    "alias": "tax_amount"     },

    { "source": "size(li.Serials.Serial)",       "alias": "serial_count"   },
    { "source": "size(li.Packaging.Boxes.Box)",  "alias": "box_count"      }
  ],
  "evidence": ["Header.InvoiceNumber"],
  "partitions": ["source","year","month","day"]
}
"""

# If you used my previous framework code as-is, it already normalizes ALL arrays from the target schema,
# and validates/casts according to types above (including attributes & text nodes).

# 4) Run with your existing framework helpers:
METADATA_INVOICES = load_metadata(METADATA_JSON_INVOICES)

# Small tip: for attributes/text nodes, spark-xml defaults usually match our schema,
# but if you customized the reader, ensure attribute/value options are:
#   .option("attributePrefix", "_").option("valueTag", "_VALUE")
# In my framework, you can add them in the read() call if needed.

valid_df, corrupt_df = process_xml(CONFIG_INVOICES, TARGET_SCHEMA_INVOICES, METADATA_INVOICES)


StatementMeta(demospark, 11, 12, Finished, Available, Finished)

KeyError: "Path 'Taxes.Tax' not found in schema."

In [21]:
# ========================= XML Bronze Framework (no XSD) =========================
# Requires cluster/pool with: com.databricks:spark-xml_2.12:0.18.0 (or compatible)

from pyspark.sql import functions as F
from pyspark.sql.types import *
from typing import Dict, List, Any, Tuple
import json

# ---------------------------- Utilities -----------------------------------

def to_stringy_schema(dt: DataType) -> DataType:
    """Same shape as target, but every leaf is StringType (arrays preserved)."""
    if isinstance(dt, StructType):
        return StructType([StructField(f.name, to_stringy_schema(f.dataType), True) for f in dt.fields])
    if isinstance(dt, ArrayType):
        return ArrayType(to_stringy_schema(dt.elementType), True)
    return StringType()

def _sanitize(path: str) -> str:
    return path.replace(".", "__").replace("[", "_").replace("]", "_")

def ensure_array(df, path: str, array_dt: ArrayType, out_col: str):
    """
    Branch-free normalization to ArrayType(element). Assumes source already array-shaped
    (per reader schema). Avoids CASE branches that trigger type-analysis failures.
    """
    empty_arr = F.expr("array()").cast(array_dt)
    norm = F.coalesce(F.col(path).cast(array_dt), empty_arr)
    return df.withColumn(out_col, norm)

def exists_invalid_primitive(col_arr, elem_type: DataType):
    return F.exists(col_arr, lambda x: x.isNotNull() & x.cast(elem_type).isNull())

def exists_invalid_struct_field(col_arr, field: StructField):
    return F.exists(col_arr, lambda e: e[field.name].isNotNull() & e[field.name].cast(field.dataType).isNull())

def get_dt(path: str, dt: DataType) -> DataType:
    """
    Resolve a dot path on StructType/ArrayType. If current node is ArrayType,
    step into elementType transparently and continue.
    """
    if not path:
        return dt
    if isinstance(dt, ArrayType):
        return get_dt(path, dt.elementType)
    if not isinstance(dt, StructType):
        raise KeyError(f"Path '{path}' not found in schema.")
    if "." in path:
        head, tail = path.split(".", 1)
    else:
        head, tail = path, ""
    child = next((f for f in dt.fields if f.name == head), None)
    if child is None:
        raise KeyError(f"Field '{head}' not found under path '{path}'.")
    return get_dt(tail, child.dataType)

def list_array_paths(dt: DataType, base="") -> List[Tuple[str, ArrayType]]:
    """List ALL array paths in the schema."""
    paths = []
    if isinstance(dt, StructType):
        for f in dt.fields:
            sub = f"{base}.{f.name}" if base else f.name
            paths += list_array_paths(f.dataType, sub)
    elif isinstance(dt, ArrayType):
        paths.append((base, dt))
        paths += list_array_paths(dt.elementType, base)
    return paths

def top_level_array_paths(schema: StructType) -> List[str]:
    """Return array paths that are NOT nested under another array path."""
    all_arrays = [p for p, _ in list_array_paths(schema)]
    tops = []
    for p in all_arrays:
        is_nested = any(parent != p and p.startswith(parent + ".") for parent in all_arrays)
        if not is_nested:
            tops.append(p)
    return tops

def collect_validation_errors(df, target_schema: StructType, normalized_map: Dict[str, str]):
    """
    Build array<string> of validation errors by comparing RAW strings to target types.
    Uses normalized_map for pre-normalized arrays; falls back to raw columns otherwise.
    """
    errs = [F.when(F.col("_corrupt_record").isNotNull(), F.lit("STRUCTURAL_CORRUPT"))]

    def walk(path: str, dt: DataType):
        nonlocal errs
        if isinstance(dt, StructType):
            for f in dt.fields:
                walk(f"{path}.{f.name}" if path else f.name, f.dataType)
        elif isinstance(dt, ArrayType):
            use_col = F.col(normalized_map.get(path, path))
            elem = dt.elementType
            if isinstance(elem, StructType):
                for ch in elem.fields:
                    errs.append(F.when(
                        exists_invalid_struct_field(use_col, ch),
                        F.lit(f"INVALID_ARRAY_ELEM_FIELD_{ch.dataType.simpleString()}:{path}.{ch.name}")
                    ))
            else:
                errs.append(F.when(
                    exists_invalid_primitive(use_col, elem),
                    F.lit(f"INVALID_ARRAY_ELEMENT_{elem.simpleString()}:{path}")
                ))
        else:
            raw_col = F.col(path)
            casted  = raw_col.cast(dt)
            errs.append(F.when(raw_col.isNotNull() & casted.isNull(),
                               F.lit(f"INVALID_{dt.simpleString()}:{path}")))
    walk("", target_schema)

    return F.array_remove(F.array(*[e for e in errs if e is not None]), None)

def typed_projection(path: str, dt: DataType, normalized_map: Dict[str, str]):
    """
    Produce a typed Column (recursively cast). Uses normalized arrays when available,
    otherwise raw arrays from the reader schema (which are already array-shaped).
    """
    if isinstance(dt, StructType):
        return F.struct(*[typed_projection(f"{path}.{f.name}", f.dataType, normalized_map).alias(f.name)
                          for f in dt.fields])
    if isinstance(dt, ArrayType):
        col_ref = F.col(normalized_map.get(path, path))
        elem = dt.elementType
        if isinstance(elem, StructType):
            return F.transform(col_ref, lambda e:
                               F.struct(*[(e[ch.name].cast(ch.dataType)).alias(ch.name) for ch in elem.fields]))
        else:
            return F.transform(col_ref, lambda x: x.cast(elem))
    return F.col(path).cast(dt)

def explode_paths(df, explode_plan, normalized_map):
    """
    Explode each configured array in order.
    - If base path was pre-normalized (top-level), explode the normalized column.
    - If the path is nested under a previously exploded alias, rewrite to that alias.
    """
    alias_map = {}  # base path -> alias column name
    for item in (explode_plan or []):
        base_path = item["path"]                      # e.g. Body.Items.LineItem.Taxes.Tax
        alias     = item.get("alias") or base_path.replace(".", "__")
        mode      = (item.get("explode_mode") or "outer").lower()

        # Rewrite to use prior aliases when inside an already-exploded one
        effective = base_path
        for parent_path, parent_alias in alias_map.items():
            if effective.startswith(parent_path + "."):
                effective = parent_alias + effective[len(parent_path):]

        # Pick the column to explode
        if base_path in normalized_map:
            col_to_explode = F.col(normalized_map[base_path])
        else:
            col_to_explode = F.col(effective)

        df = df.withColumn(alias,
                           F.explode(col_to_explode) if mode == "inner" else F.explode_outer(col_to_explode))

        alias_map[base_path] = alias
    return df

def add_audit_cols(df, config: Dict[str, Any]):
    return (df
            .withColumn("source", F.lit(config.get("source_name", "unknown")))
            .withColumn("year",   F.lit(config["year"]))
            .withColumn("month",  F.lit(config["month"]))
            .withColumn("day",    F.lit(config["day"]))
            .withColumn("ingestion_timestamp", F.current_timestamp())
            .withColumn("file_path", F.input_file_name()))

def load_metadata(json_str_or_dict):
    meta = json.loads(json_str_or_dict) if isinstance(json_str_or_dict, str) else dict(json_str_or_dict)
    # select
    norm_select = []
    for it in meta.get("select", []):
        expr  = it.get("source") or it.get("expr")
        alias = it.get("alias") or it.get("as") or expr
        if not expr:
            raise ValueError("select item missing 'source'/'expr'")
        norm_select.append({"expr": expr, "as": alias})
    meta["select"] = norm_select
    # explode
    norm_explode = []
    for it in meta.get("explode", []):
        path = it["path"]
        alias = it.get("alias") or it.get("as") or _sanitize(path)
        norm_explode.append({"path": path, "alias": alias, "explode_mode": it.get("explode_mode", "outer")})
    meta["explode"] = norm_explode
    # evidence
    evid = meta.get("evidence", [])
    meta["evidence"] = evid if isinstance(evid, list) else []
    # partitions
    if "partitions" in meta and isinstance(meta["partitions"], list):
        meta["partitions"] = meta["partitions"]
    meta["version"] = meta.get("version", "1.0")
    return meta

# ---------------------------- Main processing ---------------------------------

def process_xml(config: Dict[str, Any],
                target_schema: StructType,
                metadata: Dict[str, Any]):

    raw_schema = to_stringy_schema(target_schema)
    landing = config["landing_path"].format(**config)
    read_mode = config.get("read_mode", "PERMISSIVE")

    # Reader: be explicit about attributes/text conventions
    df_raw = (spark.read.format("com.databricks.spark.xml")
              .option("rowTag", config["row_tag"])
              .option("mode", read_mode)
              .option("columnNameOfCorruptRecord", "_corrupt_record")
              .option("attributePrefix", "_")
              .option("valueTag", "_VALUE")
              .schema(raw_schema)
              .load(landing))

    if "_corrupt_record" not in df_raw.columns:
        df_raw = df_raw.withColumn("_corrupt_record", F.lit(None).cast(StringType()))

    # Pre-normalize ONLY top-level arrays to avoid array-of-array cast issues
    normalized_map = {}
    for p in top_level_array_paths(target_schema):           # e.g., "Body.Items.LineItem"
        arr_dt = get_dt(p, target_schema)                    # ArrayType(...)
        stringy_arr_dt = ArrayType(to_stringy_schema(arr_dt.elementType), True)
        out_col = f"__norm__{_sanitize(p)}"
        df_raw = ensure_array(df_raw, p, stringy_arr_dt, out_col)
        normalized_map[p] = out_col

    # Evidence (raw) fields to carry into corrupt output
    evidence_map = {}
    for p in (metadata.get("evidence") or []):
        ev_col = f"__raw__{_sanitize(p)}"
        df_raw = df_raw.withColumn(ev_col, F.col(p))
        evidence_map[p] = ev_col

    # Validation on RAW vs target types
    errors_col = collect_validation_errors(df_raw, target_schema, normalized_map)
    df_checked = (df_raw
                  .withColumn("_validation_errors", errors_col)
                  .withColumn("_has_validation_errors", F.size(errors_col) > 0))

    # Typed projection (from RAW + normalized arrays)
    typed_cols = [typed_projection(f.name, f.dataType, normalized_map).alias(f.name)
                  for f in target_schema.fields]
    keep_cols = list(normalized_map.values()) + list(evidence_map.values()) + ["_validation_errors", "_has_validation_errors"]
    df_typed = df_checked.select(*typed_cols, *[F.col(c) for c in keep_cols])

    # Explode per metadata (alias-aware)
    df_exp = explode_paths(df_typed, metadata.get("explode"), normalized_map)

    # Final projection
    select_items = metadata.get("select") or [{"expr": f.name, "as": f.name} for f in target_schema.fields]
    proj_cols = [F.expr(item["expr"]).alias(item.get("as", item["expr"])) for item in select_items]
    df_out = df_exp.select(*proj_cols, "_validation_errors", "_has_validation_errors",
                           *[F.col(c) for c in evidence_map.values()])
    df_out = add_audit_cols(df_out, config)

    # Split valid vs corrupt
    is_corrupt = F.col("_has_validation_errors")
    corrupt_df = (df_out.where(is_corrupt)
                  .withColumn("_corrupt_reasons",
                              F.array_distinct(F.coalesce(F.col("_validation_errors"), F.array())))
                  .drop("_validation_errors", "_has_validation_errors"))
    for p, raw_col in evidence_map.items():
        corrupt_df = corrupt_df.withColumn(f"raw_{_sanitize(p)}", F.col(raw_col)).drop(raw_col)

    valid_df = df_out.where(~is_corrupt).drop("_validation_errors", "_has_validation_errors", *evidence_map.values())

    # Writes
    writer_fmt = config.get("write_format", "parquet")
    partitions = metadata.get("partitions") or config.get("partitions") or ["source","year","month","day"]
    out_struct  = config["xml_structured_path"].format(**config)
    out_corrupt = config["corrupt_records_path"].format(**config)

    if writer_fmt == "delta":
        (valid_df.write.format("delta").mode("append").partitionBy(*partitions).save(out_struct))
        if not corrupt_df.rdd.isEmpty():
            (corrupt_df.write.format("delta").mode("append").partitionBy(*partitions).save(out_corrupt))
    else:
        (valid_df.write.mode("append").partitionBy(*partitions).parquet(out_struct))
        if not corrupt_df.rdd.isEmpty():
            (corrupt_df.write.mode("append").partitionBy(*partitions).parquet(out_corrupt))

    # Preview
    print("=== VALID sample ===")
    valid_df.show(10, truncate=False)
    display(valid_df)
    print("=== CORRUPT sample ===")
    corrupt_df.show(10, truncate=False)

    return valid_df, corrupt_df

# ====================== Invoices XML: CONFIG, TARGET_SCHEMA, METADATA ======================

CONFIG_INVOICES = {
    "container": "ds-india",
    "storage_account": "demodsindia",
    "source_name": "invoices_v1",
    "year": "2025", "month": "09", "day": "07",
    # Point to your XML file location:
    "landing_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/invoices.xml",
    "xml_structured_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/history/xml_structured/",
    "corrupt_records_path": "abfss://{container}@{storage_account}.dfs.core.windows.net/SEMIDATA/bronze/corrupt_records/source={source_name}/",
    "row_tag": "Invoice",
    "read_mode": "PERMISSIVE",
    "write_format": "parquet",
    "partitions": ["source","year","month","day"]
}

TARGET_SCHEMA_INVOICES = StructType([
    StructField("Header", StructType([
        StructField("InvoiceNumber", StringType(), True),
        StructField("InvoiceDate", DateType(), True),
        StructField("Buyer", StructType([
            StructField("Name", StringType(), True),
            StructField("Address", StructType([
                StructField("Line1", StringType(), True),
                StructField("City", StringType(), True),
                StructField("State", StringType(), True),
                StructField("Postcode", StringType(), True),
                StructField("Country", StringType(), True)
            ]), True)
        ]), True)
    ]), True),

    StructField("Body", StructType([
        StructField("Items", StructType([
            StructField("LineItem", ArrayType(StructType([
                StructField("_id", StringType(), True),                 # <LineItem id="...">
                StructField("ItemId", StringType(), True),
                StructField("Description", StringType(), True),
                StructField("Quantity", IntegerType(), True),
                StructField("Price", DecimalType(18, 2), True),

                StructField("Meta", StructType([
                    StructField("CreatedAt", TimestampType(), True),
                    StructField("UpdatedAt", TimestampType(), True),
                    StructField("Flags", StructType([
                        StructField("Flag", ArrayType(StructType([      # <Flag name="..." value="..."/>
                            StructField("_name", StringType(), True),
                            StructField("_value", BooleanType(), True)
                        ])), True)
                    ]), True)
                ]), True),

                StructField("Taxes", StructType([
                    StructField("Tax", ArrayType(StructType([
                        StructField("Code", StringType(), True),
                        StructField("Amount", DecimalType(18, 3), True),
                        StructField("TaxBreakup", StructType([
                            StructField("Break", ArrayType(StructType([
                                StructField("Type", StringType(), True),
                                StructField("Rate", DecimalType(18, 3), True)
                            ])), True)
                        ]), True)
                    ])), True)
                ]), True),

                StructField("Attributes", StructType([
                    StructField("Attribute", ArrayType(StructType([
                        StructField("_name", StringType(), True),
                        StructField("_code", StringType(), True),
                        StructField("_VALUE", StringType(), True)
                    ])), True)
                ]), True),

                StructField("Packaging", StructType([
                    StructField("Boxes", StructType([
                        StructField("Box", ArrayType(StructType([
                            StructField("_id", StringType(), True),
                            StructField("Weight", DecimalType(18, 2), True),
                            StructField("Dimensions", StructType([
                                StructField("Length", IntegerType(), True),
                                StructField("Width", IntegerType(), True),
                                StructField("Height", IntegerType(), True)
                            ]), True),
                            StructField("Labels", StructType([
                                StructField("Label", ArrayType(StringType()), True)
                            ]), True)
                        ])), True)
                    ]), True)
                ]), True),

                StructField("Serials", StructType([
                    StructField("Serial", ArrayType(StringType()), True)
                ]), True)
            ]), True))
        ]), True)
    ]), True)
])

METADATA_JSON_INVOICES = """
{
  "version": "1.0",
  "explode": [
    { "path": "Body.Items.LineItem",            "alias": "li",  "explode_mode": "outer" },
    { "path": "Body.Items.LineItem.Taxes.Tax",  "alias": "tax", "explode_mode": "outer" }
  ],
  "select": [
    { "source": "Header.InvoiceNumber",          "alias": "invoice_number" },
    { "source": "Header.InvoiceDate",            "alias": "invoice_date"   },
    { "source": "Header.Buyer.Name",             "alias": "buyer_name"     },
    { "source": "Header.Buyer.Address.Line1",    "alias": "addr_line1"     },
    { "source": "Header.Buyer.Address.City",     "alias": "addr_city"      },
    { "source": "Header.Buyer.Address.State",    "alias": "addr_state"     },
    { "source": "Header.Buyer.Address.Postcode", "alias": "addr_postcode"  },
    { "source": "Header.Buyer.Address.Country",  "alias": "addr_country"   },

    { "source": "li._id",                        "alias": "line_id"        },
    { "source": "li.ItemId",                     "alias": "item_id"        },
    { "source": "li.Description",                "alias": "description"    },
    { "source": "li.Quantity",                   "alias": "qty"            },
    { "source": "li.Price",                      "alias": "price"          },

    { "source": "tax.Code",                      "alias": "tax_code"       },
    { "source": "tax.Amount",                    "alias": "tax_amount"     },

    { "source": "size(li.Serials.Serial)",       "alias": "serial_count"   },
    { "source": "size(li.Packaging.Boxes.Box)",  "alias": "box_count"      }
  ],
  "evidence": ["Header.InvoiceNumber"],
  "partitions": ["source","year","month","day"]
}
"""

METADATA_INVOICES = load_metadata(METADATA_JSON_INVOICES)

# (Optional) Spark tuning
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "16")

# Run
valid_df, corrupt_df = process_xml(CONFIG_INVOICES, TARGET_SCHEMA_INVOICES, METADATA_INVOICES)


StatementMeta(demospark, 11, 14, Finished, Available, Finished)

=== VALID sample ===
+--------------+------------+------------+-------------+---------+----------+-------------+------------+--------+---------+------------+---+-----+--------+----------+------------+---------+-----------+----+-----+---+--------------------------+-----------------------------------------------------------------------+
|invoice_number|invoice_date|buyer_name  |addr_line1   |addr_city|addr_state|addr_postcode|addr_country|line_id |item_id  |description |qty|price|tax_code|tax_amount|serial_count|box_count|source     |year|month|day|ingestion_timestamp       |file_path                                                              |
+--------------+------------+------------+-------------+---------+----------+-------------+------------+--------+---------+------------+---+-----+--------+----------+------------+---------+-----------+----+-----+---+--------------------------+-----------------------------------------------------------------------+
|INV-1001      |2025-08-30  |Ac

SynapseWidget(Synapse.DataFrame, 1d2e2246-07e0-43b7-8ff9-a13e5ebe6a93)

=== CORRUPT sample ===
+--------------+------------+----------+----------+---------+----------+-------------+------------+-------+-------+-----------+---+-----+--------+----------+------------+---------+------+----+-----+---+-------------------+---------+----------------+-------------------------+
|invoice_number|invoice_date|buyer_name|addr_line1|addr_city|addr_state|addr_postcode|addr_country|line_id|item_id|description|qty|price|tax_code|tax_amount|serial_count|box_count|source|year|month|day|ingestion_timestamp|file_path|_corrupt_reasons|raw_Header__InvoiceNumber|
+--------------+------------+----------+----------+---------+----------+-------------+------------+-------+-------+-----------+---+-----+--------+----------+------------+---------+------+----+-----+---+-------------------+---------+----------------+-------------------------+
+--------------+------------+----------+----------+---------+----------+-------------+------------+-------+-------+-----------+---+-----+--------+---