In [None]:
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Large XML ETL Workflow (Automated Flow)\n",
    "\n",
    "This notebook demonstrates:\n",
    "- Splitting a large XML file into smaller files\n",
    "- Reading split XML files with Spark\n",
    "- Flattening and pivoting the data\n",
    "- Writing the result to a Delta table\n",
    "\n",
    "> **Adjust paths and parameters as needed for your environment!**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Split Large XML File into Smaller Files\n",
    "\n",
    "This cell splits a large XML file into smaller files for parallel Spark processing.\n",
    "If you already have split files, you can skip this step."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Databricks Python cell\n",
    "# Streams the large XML file and rewrites its <order> elements into numbered\n",
    "# part files, each wrapping up to chunk_size orders in an <orders> root.\n",
    "import xml.etree.ElementTree as ET\n",
    "import os\n",
    "\n",
    "input_file = '/dbfs/FileStore/large.xml'  # Path to your large XML file\n",
    "output_dir = '/dbfs/FileStore/orders_split'  # Output dir for split files\n",
    "chunk_size = 1000  # Orders per split file\n",
    "\n",
    "os.makedirs(output_dir, exist_ok=True)\n",
    "\n",
    "\n",
    "def write_chunk(serialized_orders, part_number):\n",
    "    \"\"\"Write one batch of serialized <order> strings to orders_part_<part_number>.xml.\n",
    "\n",
    "    The batch is wrapped in a single <orders> root element so each part file\n",
    "    has exactly one top-level element.\n",
    "    \"\"\"\n",
    "    with open(f'{output_dir}/orders_part_{part_number}.xml', 'w', encoding='utf-8') as f:\n",
    "        f.write(f'<orders>\\n{\"\".join(serialized_orders)}\\n</orders>')\n",
    "\n",
    "\n",
    "# Ask for 'start' events too: the first event delivers the document root,\n",
    "# which is needed below to release elements that were already processed.\n",
    "context = ET.iterparse(input_file, events=('start', 'end'))\n",
    "_first_event, root = next(context)\n",
    "\n",
    "orders = []\n",
    "files_written = 0  # Number of part files actually written so far\n",
    "\n",
    "for event, elem in context:\n",
    "    if event == 'end' and elem.tag == 'order':\n",
    "        orders.append(ET.tostring(elem, encoding='unicode'))\n",
    "        if len(orders) == chunk_size:\n",
    "            files_written += 1\n",
    "            write_chunk(orders, files_written)\n",
    "            orders = []\n",
    "        # elem.clear() alone leaves an emptied <order> attached to its parent;\n",
    "        # clearing the root as well drops those references so memory does not\n",
    "        # grow with the number of orders already processed.\n",
    "        elem.clear()\n",
    "        root.clear()\n",
    "\n",
    "# Flush the final, partially filled batch (if any).\n",
    "if orders:\n",
    "    files_written += 1\n",
    "    write_chunk(orders, files_written)\n",
    "\n",
    "# Report the number of files really written (the running index used to be\n",
    "# one too high whenever the order count was a multiple of chunk_size).\n",
    "print(f\"Splitting complete. {files_written} files created in {output_dir}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Read XML Files with Spark\n",
    "\n",
    "This cell reads all split XML files into a Spark DataFrame."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F\n",
    "\n",
    "# Glob pattern covering every split file under the orders_split folder\n",
    "xml_path = \"dbfs:/FileStore/orders_split/*.xml\"\n",
    "\n",
    "# rowTag selects <order> as the element that maps to one DataFrame row\n",
    "df = (\n",
    "    spark.read\n",
    "    .format(\"xml\")\n",
    "    .option(\"rowTag\", \"order\")\n",
    "    .load(xml_path)\n",
    ")\n",
    "\n",
    "# Show the inferred schema and a small sample of rows\n",
    "df.printSchema()\n",
    "display(df.limit(5))"
   ]
  },
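  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Flatten and Pivot the Data\n",
    "\n",
    "This cell flattens the nested structure into one row per order item, then pivots each order's product/qty pairs into numbered columns (`Product1`, `Qty1`, ...), up to `max_products` pairs per order."
   ]
  },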
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.window import Window\n",
    "\n",
    "# Flatten: one row per (order, item).\n",
    "# posexplode_outer also returns each item's position inside items.item, and\n",
    "# still emits a row (null item, null position) when the item list is null or\n",
    "# empty, so such orders are not dropped.\n",
    "df_flat = df.select(\n",
    "    F.col(\"id\").alias(\"order_id\"),\n",
    "    F.col(\"customer.name\").alias(\"customer_name\"),\n",
    "    F.col(\"customer.email\").alias(\"customer_email\"),\n",
    "    F.col(\"notes\"),\n",
    "    F.posexplode_outer(\"items.item\").alias(\"item_pos\", \"item\"),\n",
    ").select(\n",
    "    \"order_id\", \"customer_name\", \"customer_email\",\n",
    "    F.col(\"item.product\").alias(\"product\"),\n",
    "    F.col(\"item.qty\").alias(\"qty\"),\n",
    "    \"notes\", \"item_pos\",\n",
    ")\n",
    "\n",
    "# Add row number per item in each order.\n",
    "# Ordering by the item's array position (instead of a constant) makes the\n",
    "# numbering deterministic, so Product1/Qty1 is always the first listed item.\n",
    "windowSpec = Window.partitionBy(\"order_id\").orderBy(\"item_pos\")\n",
    "df_numbered = df_flat.withColumn(\"prod_num\", F.row_number().over(windowSpec))\n",
    "\n",
    "# Pivot columns\n",
    "# NOTE: items numbered above max_products match none of the aggregates below\n",
    "# and are therefore left out of the result.\n",
    "max_products = 10  # Adjust as needed\n",
    "\n",
    "\n",
    "def nth_item(value_col, n):\n",
    "    \"\"\"Aggregate returning value_col of the order's n-th item (null if absent).\"\"\"\n",
    "    return F.max(F.when(F.col(\"prod_num\") == n, F.col(value_col)))\n",
    "\n",
    "\n",
    "cols = []\n",
    "for i in range(1, max_products + 1):\n",
    "    cols.append(nth_item(\"product\", i).alias(f\"Product{i}\"))\n",
    "    cols.append(nth_item(\"qty\", i).alias(f\"Qty{i}\"))\n",
    "\n",
    "df_pivot = df_numbered.groupBy(\"order_id\", \"customer_name\", \"customer_email\", \"notes\").agg(*cols)\n",
    "\n",
    "# Select columns in desired order: identifiers, Product/Qty pairs, then notes\n",
    "select_cols = [\"order_id\", \"customer_name\", \"customer_email\"]\n",
    "for i in range(1, max_products + 1):\n",
    "    select_cols.append(f\"Product{i}\")\n",
    "    select_cols.append(f\"Qty{i}\")\n",
    "select_cols.append(\"notes\")\n",
    "\n",
    "df_pivot = df_pivot.select(*select_cols)\n",
    "display(df_pivot.limit(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Write to Delta Table\n",
    "\n",
    "This cell writes the final DataFrame to a Delta table for analytics."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save the pivoted DataFrame as a Delta table; \"overwrite\" mode replaces\n",
    "# whatever the table held before.\n",
    "(\n",
    "    df_pivot.write\n",
    "    .format(\"delta\")\n",
    "    .mode(\"overwrite\")\n",
    "    .saveAsTable(\"orders_pivoted\")\n",
    ")\n",
    "print(\"Data written to Delta table 'orders_pivoted'\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
