#### Paramters - Databricks Widgets: 
For more information about widgets see [Databricks Widgets Documentation](https://docs.databricks.com/en/notebooks/widgets.html)

Exemplatory notebook: [widgets](./widgets)

In [0]:
catalogs_df = spark.sql("SHOW CATALOGS")
catalog_names: list[str] = [row.catalog for row in catalogs_df.collect()]
dbutils.widgets.dropdown(
    name="catalog_name",
    defaultValue=catalog_names[0] if catalog_names else "",
    choices=catalog_names if catalog_names else [""],
    label="Catalog Name",
)
dbutils.widgets.text(
    name="schema_name",
    defaultValue="",
    label="Schema Name",
)
dbutils.widgets.text(
    name="volume_name",
    defaultValue="",
    label="Volume Name",
)
CATALOG_NAME: str = dbutils.widgets.get("catalog_name")
SCHEMA_NAME: str = dbutils.widgets.get("schema_name")
VOLUME_NAME: str = dbutils.widgets.get("volume_name")

#### ! Make sure the output-volume exists.

For more information about creating volumes see [Databricks Volumes Documentation](https://docs.databricks.com/en/volumes/index.html)

Exemplatory notebook: [volumes](./volumes)

## Exporting Table Data to XML

Exports can be made on multiple platforms and methods, including:

- **PySpark DataFrameWriter**: Use `.write.format("xml")` to export DataFrames to XML.
- **Databricks SQL**: Use the `TO_XML()` function to convert struct or variant columns to XML strings.
- **Scala DataFrameWriter**: Similar to the PySpark DataFrameWriter.
- **R DataFrameWriter**: Use `saveDF()` with XML format in R notebooks.

Each method supports options for customizing XML output, such as `rootTag` and `rowTag`.

Exports normally take place in the final layer, like the `gold` or `product` layer.

![Export architecture diagram](../resources/export-architecture.png)

## PySpark export example

1. **Read the table into a DataFrame**  
   Use Spark to load your table:  
   ```python
   data_frame = spark.table("catalog_name.schema_name.table_name")
   ```

2. **Write the DataFrame as XML**  
   Use the XML data source:  
   ```python
   data_frame.write \
       .format("xml") \
       .options(rootTag="trips", rowTag="trip") \
       .mode("overwrite") \
       .save("/Volumes/catalog_name/schema_name/volume_name/output.xml")
   ```

- Adjust `rootTag` and `rowTag` for your schema.
- The output XML file will be saved to your specified Unity Catalog volume.

See [Spark XML Documentation](https://spark.apache.org/docs/latest/sql-data-sources-xml.html) for details.

In [0]:
import os
def get_first_xml_file(directory_path: str) -> str:
    """Return the full path of the first XML file found in the given directory."""
    for file_name in os.listdir(directory_path):
        if file_name.lower().endswith(".xml"):
            return os.path.join(directory_path, file_name)
    raise FileNotFoundError("No XML file found in the specified directory.")

def displayXML(xml_path: str) -> None:    
    """Show the content of an XML file"""
    if xml_path.lower().endswith(".xml"):
        selected_xml_file_path: str = xml_path
    else:
        selected_xml_file_path: str = get_first_xml_file(xml_path)
    with open(selected_xml_file_path, "r", encoding="utf-8") as xml_file:
        raw_xml_content: str = "".join([xml_file.readline() for _ in range(100)])
    print(raw_xml_content)

In [0]:
# This cell illustrates how a complete table can be exported to XML based on a simple structure.

from pyspark.sql import DataFrame

output_filename: str = "nyctaxi_trips"

print("Selecting data from samples.nyctaxi.trips table...")
nyctaxi_trips_df: DataFrame = spark.sql("SELECT * FROM samples.nyctaxi.trips")
print("✅ Data selected.")

volume_path: str = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}/{output_filename}"
print(f"Writing XML data to volume: {volume_path} ...")
nyctaxi_trips_df.write \
  .format("xml") \
  .options(rootTag="trips", rowTag="trip") \
  .mode("overwrite") \
  .save(volume_path)
print("✅ XML export complete.")

# Displaying the xml file to showcase the result.
displayXML(volume_path)

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import collect_list, struct, col, substring

output_foldername: str = "bakehouse_franchises_complex"
volume_path: str = (
    f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}/{output_foldername}"
)

# Base tables
transactions_df: DataFrame = (
    spark.table("samples.bakehouse.sales_transactions")
    .select(
        col("transactionID"),
        col("dateTime"),
        col("quantity"),
        col("product"),
        col("customerID"),
        col("franchiseID"),
    )
)
customers_df: DataFrame = (
    spark.table("samples.bakehouse.sales_customers")
    .select(
        col("customerID"),
        col("last_name").alias("customer_name"),
        col("email_address"),
        col("phone_number"),
    )
    .filter(col("email_address").isNotNull())
)
franchises_df: DataFrame = (
    spark.table("samples.bakehouse.sales_franchises")
    .select(
        col("franchiseID"),
        col("name").alias("franchise_name"),
        col("city"),
        col("district"),
        col("zipcode"),
        col("country"),
        col("size"),
        col("longitude"),
        col("latitude"),
        col("supplierID"),
    )
)

# Enrich transactions with nested children
tx_nested_df: DataFrame = (
    transactions_df.alias("t")
    .join(customers_df.alias("c"), col('t.customerID').substr(2, 100) == col('c.customerID').substr(2, 100), how="inner")
    .join(franchises_df.alias("f"), on="franchiseID", how="inner")
    .select(
        col("t.transactionID").alias("transactionID"),
        col("t.dateTime").alias("dateTime"),
        col("t.quantity").alias("quantity"),
        col("t.product").alias("product"),
        struct(
            col("c.customerID").alias("id"),
            col("c.customer_name").alias("name"),
            struct(
                col("c.email_address").alias("email"),
                col("c.phone_number").alias("phone"),
            ).alias("contact_info"),
        ).alias("customer"),
        struct(
            col("f.franchiseID").alias("id"),
            col("f.franchise_name").alias("name"),
            col("f.city").alias("city"),
            col("f.district").alias("district"),
            col("f.zipcode").alias("zipcode"),
            col("f.country").alias("country"),
            col("f.size").alias("size"),
            col("f.longitude").alias("longitude"),
            col("f.latitude").alias("latitude"),
            col("f.supplierID").alias("supplierID"),
        ).alias("franchise"),
    )
)

# Write nested XML
(
    tx_nested_df.write
    .format("xml")
    .options(rootTag="transactions", rowTag="transaction")
    .mode("overwrite")
    .save(volume_path)
)

print("✅ XML export complete.")

# Displaying the xml file to showcase the result.
displayXML(volume_path)

## Databricks SQL Example

1. **Read the table using SQL**  
   Use a SQL query to select your data:  

   ```sql
   SELECT * FROM :catalog_name.:schema_name.:table_name
   ```

2. **Export the data to XML using TO_XML()**  
   Use the `TO_XML()` function to convert rows to XML strings:  

   ```sql
   SELECT TO_XML(
       struct(*),
       'trips',    -- rootTag
       'trip'      -- rowTag
   ) AS xml_data
   FROM :catalog_name.:schema_name.:table_name
   ```

3. **Save the XML output to a Unity Catalog volume**  
   Use the `COPY INTO` command to write the XML data to your volume:  
   
   ```sql
   COPY INTO '/Volumes/:catalog_name/:schema_name/:volume_name/output.xml'
   FROM (
       SELECT TO_XML(
           struct(*),
           'trips',
           'trip'
       ) AS xml_data
       FROM :catalog_name.:schema_name.:table_name
   )
   FILEFORMAT = TEXT
   ```
   
- Adjust `rootTag` and `rowTag` for your schema.
- The output XML file will be saved to your specified Unity Catalog volume.

See [TO_XML() documentation](https://docs.databricks.com/en/sql/language-manual/functions/to_xml.html) for details.

In [0]:
%sql

DECLARE OR REPLACE xml_output_path STRING;
SET VAR xml_output_path = CONCAT('dbfs:/Volumes/',:catalog_name,'/',:schema_name,'/',:volume_name,'/bakehouse_franchises_complex_sql');

DECLARE OR REPLACE sql_fqn STRING;
SET VAR sql_fqn = CONCAT(:catalog_name, '.', :schema_name, '._tmp_xml_output');

-- Create temporary table to simplify parameterization.
CREATE OR REPLACE TABLE IDENTIFIER(sql_fqn)
AS 
SELECT
        TO_XML(
            named_struct(
            'transactionID', t.transactionID,
            'dateTime',      t.dateTime,
            'quantity',      t.quantity,
            'product',       t.product,
            'customer', named_struct(
                'id',           c.customerID,
                'name',         c.last_name,
                'contact_info', named_struct('email', c.email_address, 'phone', c.phone_number)
            ),
            'franchise', named_struct(
                'id',         f.franchiseID,
                'name',       f.name,
                'city',       f.city,
                'district',   f.district,
                'zipcode',    f.zipcode,
                'country',    f.country,
                'size',       f.size,
                'longitude',  f.longitude,
                'latitude',   f.latitude,
                'supplierID', f.supplierID
            )
            ),
            map('rootTag','transaction')
        ) AS xml_row
        FROM samples.bakehouse.sales_transactions t
        JOIN samples.bakehouse.sales_customers c
            ON substr(cast(t.customerID AS string), 2) = substr(cast(c.customerID AS string), 2)
        JOIN samples.bakehouse.sales_franchises f
            ON t.franchiseID = f.franchiseID
        WHERE c.email_address IS NOT NULL
        LIMIT 100
;

-- Write temp table to txt file
DECLARE OR REPLACE liststmt STRING;
SET VAR liststmt = CONCAT('
    INSERT OVERWRITE DIRECTORY "',xml_output_path,'"
    USING TEXT
    SELECT xml_row 
    FROM IDENTIFIER(sql_fqn)
    ;');
execute immediate liststmt ;

-- Drop temp table
DROP TABLE IF EXISTS IDENTIFIER(sql_fqn);

-- Verify file creation
SET VAR liststmt = CONCAT('LIST "', xml_output_path, '"');
execute immediate liststmt ;

