In [0]:
%pip install lxml

Python interpreter will be restarted.
Collecting lxml
  Downloading lxml-5.2.2-cp39-cp39-manylinux_2_28_x86_64.whl (5.0 MB)
Installing collected packages: lxml
Successfully installed lxml-5.2.2
Python interpreter will be restarted.


In [0]:
#dbutils.fs.rm("/FileStore/tables/invoices", recurse=True)

Out[94]: True

In [0]:
# Use Spark's LEGACY time-parser policy for this session.
# NOTE(review): the visible cells keep every date column as StringType and never
# parse it into a date/timestamp — confirm this setting is still required.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, DateType
from pyspark.sql.functions import collect_list, struct, lit, concat, input_file_name
import base64
import uuid
from lxml import etree
import datetime
import os
import shutil

In [0]:
# Clear the checkpoint directory before starting the stream.
# `checkpoint_dir` is defined in this cell because no earlier cell defines it;
# without this assignment a fresh kernel fails here with NameError.
checkpoint_dir = "/FileStore/tables/invoices/checkpoint"

# The checkpoint lives on DBFS, so it must be removed through dbutils.
# os.path.exists / shutil.rmtree inspect the driver's local filesystem, where this
# path (without the /dbfs mount prefix) does not exist, so nothing was ever cleared.
dbutils.fs.rm(checkpoint_dir, recurse=True)

In [0]:
# Directory layout: the input directory is the root, and the staging,
# checkpoint and processed folders sit directly underneath it.
input_directory = "/FileStore/tables/invoices"
staging_directory = f"{input_directory}/staging"
checkpoint_directory = f"{input_directory}/checkpoint"
processed_directory = f"{input_directory}/processed"

In [0]:
def create_directory_if_not_exists(path):
    """Create the directory `path` unless it can already be listed.

    Args:
        path: Directory path handed to ``dbutils.fs.ls`` and, when listing
            fails, to ``dbutils.fs.mkdirs``.

    Prints whether the directory already existed or was created.
    """
    try:
        dbutils.fs.ls(path)
    except Exception:
        # A failed listing is treated as "directory missing". Catching Exception
        # rather than using a bare `except:` lets KeyboardInterrupt / SystemExit
        # propagate instead of being mistaken for a missing directory.
        dbutils.fs.mkdirs(path)
        print(f"Directory {path} created.")
    else:
        print(f"Directory {path} already exists.")

In [0]:
# Create the input directory if it doesn't exist.
# This is the folder the streaming CSV reader watches for new invoice files.
create_directory_if_not_exists(input_directory)

Directory /FileStore/tables/invoices created.


In [0]:
# Create the staging directory if it doesn't exist.
# save_as_xml writes the generated XML files into this folder.
create_directory_if_not_exists(staging_directory)

Directory /FileStore/tables/invoices/staging created.


In [0]:
# Create the checkpoint directory (checkpoint_directory) if it doesn't exist.
create_directory_if_not_exists(checkpoint_directory)

Directory /FileStore/tables/invoices/checkpoint created.


In [0]:
# Create the processed directory if it doesn't exist.
# save_as_xml moves each consumed CSV file into this folder.
create_directory_if_not_exists(processed_directory)

Directory /FileStore/tables/invoices/processed created.


In [0]:

# Schema for incoming invoices, declared as (column name, Spark type) pairs in
# column order. Date columns are read as plain strings, amounts as doubles, and
# every column is nullable.
_INVOICE_COLUMNS = [
    ("invoice_number", StringType),
    ("account_number", StringType),
    ("invoice_date", StringType),
    ("customer_name", StringType),
    ("customer_state", StringType),
    ("customer_city", StringType),
    ("customer_pincode", StringType),
    ("company_name", StringType),
    ("company_state", StringType),
    ("company_city", StringType),
    ("company_pincode", StringType),
    ("vat_amount", DoubleType),
    ("vat_breakdown_amount", DoubleType),
    ("tax_exemption_code", StringType),
    ("tax_exemption_amount", DoubleType),
    ("invoice_line_amount", DoubleType),
    ("price", DoubleType),
    ("invoice_start_date", StringType),
    ("invoice_end_date", StringType),
    ("invoice_total_amount", DoubleType),
]

invoiceSchema = StructType(
    [StructField(column, spark_type(), True) for column, spark_type in _INVOICE_COLUMNS]
)

In [0]:

# Define file path: the stream watches the input directory declared above,
# so the location is spelled out in exactly one place.
file_path = input_directory

# Read the CSV files as a streaming DataFrame.
invoices = (
    spark.readStream
    .option("header", "true")
    # "ignoreOlderFiles" is not an option of Spark's file stream source and was
    # silently ignored. "maxFileAge" is the documented option for skipping files
    # older than a given age (measured against the newest file's timestamp).
    .option("maxFileAge", "1d")
    .schema(invoiceSchema)
    .csv(file_path)
)


In [0]:
# Tag every row with the path of the file it was read from.
# save_as_xml reads this column to know which CSV files to move once a batch is written.
invoices = invoices.withColumn("input_file_name", input_file_name())

In [0]:
# Preview the streaming rows together with their source file name.
invoices.display()

invoice_number,account_number,invoice_date,customer_name,customer_state,customer_city,customer_pincode,company_name,company_state,company_city,company_pincode,vat_amount,vat_breakdown_amount,tax_exemption_code,tax_exemption_amount,invoice_line_amount,price,invoice_start_date,invoice_end_date,invoice_total_amount,input_file_name
123462836-SS,123-456-657,17-06-2024,John Mathew,Texas,Atlanta,10,Southern Investors,Texas,Atlanta,10,200.0,100.0,ES,50.0,150.0,300.0,17-05-2024,30-05-2024,350.0,dbfs:/FileStore/tables/invoices/invoice_data_2024061802400001.csv
123462837-SS,123-456-658,18-06-2024,Joseph Christ,Pebbas,North,9,Southern Investors,Texas,Atlanta,10,250.0,100.0,ES,50.0,200.0,400.0,18-05-2024,31-05-2024,450.0,dbfs:/FileStore/tables/invoices/invoice_data_2024061802400001.csv
123462837-SS,123-456-658,19-06-2024,Joseph Christ,Pebbas,North,9,Southern Investors,Texas,Atlanta,10,300.0,150.0,ES,50.0,200.0,400.0,19-05-2024,01-06-2024,450.0,dbfs:/FileStore/tables/invoices/invoice_data_2024061802400001.csv
123462838-SS,123-456-656,18-06-2024,Sara Mathew,Texas,Atlanta,10,Southern Investors,Texas,Atlanta,10,200.0,100.0,ES,50.0,150.0,300.0,17-05-2024,30-05-2024,350.0,dbfs:/FileStore/tables/invoices/invoice_data_2024061802440011.csv
123462839-SS,123-456-654,18-06-2024,Ali Christ,Pebbas,North,9,Southern Investors,Texas,Atlanta,10,250.0,100.0,ES,50.0,200.0,400.0,18-05-2024,31-05-2024,450.0,dbfs:/FileStore/tables/invoices/invoice_data_2024061802440011.csv
123462834-SS,123-456-653,18-06-2024,Ali Christ,Pebbas,North,9,Southern Investors,Texas,Atlanta,10,300.0,150.0,ES,50.0,200.0,400.0,19-05-2024,01-06-2024,450.0,dbfs:/FileStore/tables/invoices/invoice_data_2024061802440011.csv


In [0]:

# Generate unique identifiers and hash
# NOTE(review): str(uuid.uuid4()) is evaluated once, when this cell runs, and lit()
# turns the result into a constant column — so every row of every batch carries the
# same "unique_invoice_identifier" value.
# "invoice_hash" is the plain concatenation "<invoice_number>_<identifier>"; no
# hashing is applied here.
invoices_with_ids = invoices.withColumn("unique_invoice_identifier", lit(str(uuid.uuid4()))) \
    .withColumn("invoice_hash", concat("invoice_number", lit("_"), "unique_invoice_identifier"))

In [0]:

def _as_text(value):
    """Return `value` as XML element text; None stays None so the element is left empty."""
    return None if value is None else str(value)


def _add_children(parent, source_row, field_names):
    """Append one child element per field name to `parent`, filled from `source_row`."""
    for field_name in field_names:
        etree.SubElement(parent, field_name).text = _as_text(source_row[field_name])


def _build_invoice_element(account_number, invoice_number, unique_invoice_identifier, lines):
    """Build the <invoice> element for one invoice from its line rows.

    Header fields (dates, addresses, tax amounts) come from the first row;
    every row contributes one <invoice_line> element.
    """
    header = lines[0]
    root = etree.Element("invoice")

    etree.SubElement(root, "invoice_number").text = _as_text(invoice_number)
    etree.SubElement(root, "unique_invoice_identifier").text = unique_invoice_identifier
    etree.SubElement(root, "account_number").text = _as_text(account_number)
    # Same recipe as before: base64 of "<invoice_number>_<identifier>".
    raw_hash = f"{invoice_number}_{unique_invoice_identifier}"
    etree.SubElement(root, "invoice_hash").text = base64.b64encode(raw_hash.encode()).decode()
    # Fixed: this element used to be filled from invoice_start_date.
    etree.SubElement(root, "invoice_date").text = _as_text(header["invoice_date"])

    _add_children(
        etree.SubElement(root, "customer_address"),
        header,
        ("customer_name", "customer_state", "customer_city"),
    )
    _add_children(
        etree.SubElement(root, "company_address"),
        header,
        ("company_name", "company_state", "company_city"),
    )
    _add_children(
        etree.SubElement(root, "tax_amount"),
        header,
        ("vat_amount", "vat_breakdown_amount", "tax_exemption_code", "tax_exemption_amount"),
    )

    for line_number, line in enumerate(lines, start=1):
        invoice_line = etree.SubElement(root, "invoice_line")
        etree.SubElement(invoice_line, "invoice_line_identifier").text = str(line_number)
        # Fixed: <price> used to be filled from invoice_line_amount.
        _add_children(
            invoice_line,
            line,
            (
                "invoice_line_amount",
                "price",
                "invoice_start_date",
                "invoice_end_date",
                "invoice_total_amount",
            ),
        )

    return root


def save_as_xml(df, epoch_id):
    """foreachBatch sink: write one XML file per invoice, then archive the source CSVs.

    Rows are grouped by (account_number, invoice_number). Each group becomes one
    XML file in the staging directory, and every CSV file that contributed rows
    is moved into the processed directory afterwards.

    Args:
        df: Micro-batch DataFrame. Must contain the invoice columns plus
            "input_file_name".
        epoch_id: Batch id supplied by Structured Streaming; used in log output only.

    Raises:
        Exception: Any failure is logged and re-raised, so the batch is not
            recorded as completed after a partial or failed write.
    """
    try:
        # One collected row per invoice, carrying all of that invoice's source rows.
        invoice_groups = (
            df.groupBy("account_number", "invoice_number")
            .agg(collect_list(struct("*")).alias("rows"))
            .collect()
        )

        # Local file APIs reach DBFS through the /dbfs mount prefix.
        staging_path = f"/dbfs{staging_directory}"
        os.makedirs(staging_path, exist_ok=True)

        processed_files = set()

        for group in invoice_groups:
            account_number = group["account_number"]
            lines = group["rows"]

            # Generated per invoice so that each XML really gets its own identifier.
            unique_invoice_identifier = str(uuid.uuid4())
            root = _build_invoice_element(
                account_number, group["invoice_number"], unique_invoice_identifier, lines
            )

            current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            account_number_without_hyphen = account_number.replace("-", "")
            file_name = f"{account_number_without_hyphen}_{uuid.uuid4()}_{current_timestamp}.xml"
            output_path = f"{staging_path}/{file_name}"

            print(f"Writing XML to: {output_path}")
            with open(output_path, "wb") as f:
                etree.ElementTree(root).write(
                    f, pretty_print=True, xml_declaration=True, encoding="UTF-8"
                )

            # Source files are taken from the rows already collected, which avoids
            # running a second Spark job over the batch just to list them.
            processed_files.update(
                line["input_file_name"] for line in lines if line["input_file_name"]
            )

        # Move processed files to the processed directory.
        for source_file in sorted(processed_files):
            file_name = os.path.basename(source_file)
            src_path = source_file.replace("dbfs:", "")
            # Explicit destination file path instead of relying on move-into-directory semantics.
            dest_path = f"{processed_directory}/{file_name}"
            dbutils.fs.mv(src_path, dest_path)
            print(f"Moved file {file_name} from {src_path} to {dest_path}")

        print(f"Batch {epoch_id} processed successfully.")

    except Exception as e:
        print(f"Error in save_as_xml for batch {epoch_id}: {str(e)}")
        # Re-raise: swallowing the error would let the stream commit the batch and
        # silently drop its invoices.
        raise

In [0]:
# Write the streaming DataFrame to the staging directory as XML files.
query = (
    invoices_with_ids.writeStream
    .foreachBatch(save_as_xml)
    # Use the checkpoint path declared with the other directory constants.
    .option("checkpointLocation", checkpoint_directory)
    .start()
)

# Wait for the query to process data. This call blocks until the query
# terminates or the cell is interrupted.
query.awaitTermination()

Writing XML to: /dbfs/FileStore/tables/invoices/staging/123456658_6ff05610-bdb6-4c2e-80f2-e9ff9d57d8d3_20240618095446.xml
Writing XML to: /dbfs/FileStore/tables/invoices/staging/123456657_3d45aee6-30e9-4036-b7b3-a1493a121fc0_20240618095446.xml
Moved file invoice_data_2024061802400001.csv from /FileStore/tables/invoices/invoice_data_2024061802400001.csv to /FileStore/tables/invoices/processed
Batch 0 processed successfully.
Writing XML to: /dbfs/FileStore/tables/invoices/staging/123456653_116bb1c6-08ba-4582-8ed0-7f7a000acbf0_20240618095537.xml
Writing XML to: /dbfs/FileStore/tables/invoices/staging/123456654_f2de3322-63d3-455b-a7ec-bcade9d7c3ed_20240618095537.xml
Writing XML to: /dbfs/FileStore/tables/invoices/staging/123456656_135cf522-84f1-406a-b6eb-53cb555b6d24_20240618095537.xml
Moved file invoice_data_2024061802440011.csv from /FileStore/tables/invoices/invoice_data_2024061802440011.csv to /FileStore/tables/invoices/processed
Batch 1 processed successfully.


In [0]:
# Stop the streaming query started above.
query.stop()