In [0]:
%pip install fpdf
%pip install faker

In [0]:
import os
import requests
from fpdf import FPDF
from faker import Faker

# Download the Indie Flower font once; later runs reuse the local copy.
font_url = "https://github.com/google/fonts/raw/main/ofl/indieflower/IndieFlower-Regular.ttf"
font_path = "IndieFlower-Regular.ttf"

if not os.path.exists(font_path):
    print("Downloading Indie Flower font...")
    # A timeout keeps a stalled connection from hanging the cell indefinitely.
    r = requests.get(font_url, timeout=30)
    # Fail loudly on an HTTP error. Otherwise the error page would be written
    # to disk as the .ttf, and the exists() check above would prevent it from
    # ever being downloaded again.
    r.raise_for_status()
    with open(font_path, 'wb') as f:
        f.write(r.content)

# Faker instance that supplies the synthetic names, addresses and prices.
fake = Faker()

def create_invoice(filename, invoice_num):
    """Render a one-page synthetic invoice PDF and write it to ``filename``.

    The page holds a centered title, the given invoice number, a Faker date,
    customer name and single-line address, three randomly priced line items,
    a total and a closing note. "Total Due" is the sum of the three item
    prices so the document is internally consistent, and every amount is
    printed with exactly two decimals.

    Args:
        filename: Path of the PDF file to write.
        invoice_num: Text printed after "Invoice Number: ".
    """
    pdf = FPDF()
    pdf.add_page()

    # Register the downloaded TrueType font before selecting it.
    pdf.add_font('IndieFlower', '', font_path, uni=True)
    pdf.set_font("IndieFlower", size=20)
    pdf.cell(200, 12, "INVOICE", ln=True, align='C')

    pdf.set_font("IndieFlower", size=12)
    pdf.cell(200, 10, f"Invoice Number: {invoice_num}", ln=True)
    pdf.cell(200, 10, f"Date: {fake.date_this_year()}", ln=True)
    pdf.cell(200, 10, f"Billed To: {fake.name()}", ln=True)
    # chr(10) is "\n": fold the multi-line address onto a single PDF line.
    pdf.cell(200, 10, f"Address: {fake.address().replace(chr(10), ', ')}", ln=True)

    total_due = 0.0
    for i in range(3):
        item = fake.word().capitalize()
        price = fake.pyfloat(left_digits=2, right_digits=2, positive=True)
        total_due += price
        # :.2f keeps trailing zeros, e.g. "$45.10" rather than "$45.1".
        pdf.cell(200, 10, f"Item {i+1}: {item} - ${price:.2f}", ln=True)
    # The total is derived from the items instead of being a second random value.
    pdf.cell(200, 10, f"Total Due: ${total_due:.2f}", ln=True)
    pdf.cell(200, 10, "Thank you for your business!", ln=True)
    pdf.output(filename)

# Landing folder for the generated PDFs; created if it does not exist yet.
output_dir = "/Volumes/lakeflow_demo/bronze/landing/pdf/"
os.makedirs(output_dir, exist_ok=True)

# Write invoice_1.pdf .. invoice_3.pdf, numbered INV-2025-100 .. INV-2025-102.
for i in range(3):
    invoice_path = os.path.join(output_dir, f"invoice_{i+1}.pdf")
    invoice_num = f"INV-2025-{100+i}"
    create_invoice(invoice_path, invoice_num)


In [0]:
from pyspark.sql.functions import ai_parse_document, from_json, schema_of_json, col, explode

# Step 1: Run ai_parse_document and get the JSON string
df = (
    spark.read.format("binaryFile")
    .load("/Volumes/lakeflow_demo/bronze/landing/pdf/")
    .withColumn("parsed", ai_parse_document("content"))
)

# Step 2: Infer schema from a sample row (cast to string)
# first() returns None when no file was read, and the parsed value itself may
# be null; both would otherwise surface as an obscure error further down.
sample_row = df.select("parsed").first()
if sample_row is None or sample_row[0] is None:
    raise ValueError(
        "No parsed document available to infer a schema from; "
        "check that the landing folder contains readable PDF files."
    )
sample_json = str(sample_row[0])
# NOTE: the schema comes from this single sample, so fields that only appear
# in other documents are not part of it.
json_schema = schema_of_json(sample_json)

# Step 3: Parse the JSON string
df_parsed = df.withColumn(
    "parsed_json",
    from_json(col("parsed").cast("string"), json_schema)
)

# Step 4: Explode the document.elements array (one output row per element)
df_elements = (
    df_parsed
    .withColumn("element", explode(col("parsed_json.document.elements")))
    .select(
        col("path").alias("file_path"),
        col("element.id").alias("element_id"),
        col("element.page_id"),
        col("element.type"),
        col("element.content")
    )
)

display(df_elements)

In [0]:
from pyspark.sql.functions import udf, col, explode, coalesce
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
import re

# Parse invoice header (element_id == 1)
def parse_invoice_header(text):
    """Pull the labelled header fields out of an invoice's header text.

    Args:
        text: Raw header text containing "Label: value" lines.

    Returns:
        None when ``text`` is empty or None. Otherwise a dict with the keys
        ``invoice_number``, ``date``, ``billed_to`` and ``address``; each
        value is the stripped text following its label up to the end of the
        line, or None when the label is not found.
    """
    if not text:
        return None

    # (output key, pattern) pairs; group 1 captures the rest of the line.
    field_patterns = (
        ("invoice_number", r"Invoice Number: (.+)"),
        ("date", r"Date: (.+)"),
        ("billed_to", r"Billed To: (.+)"),
        ("address", r"Address: (.+)"),
    )

    header = {}
    for field, pattern in field_patterns:
        found = re.search(pattern, text)
        header[field] = None if found is None else found.group(1).strip()
    return header

# Struct produced by the header UDF: one string field per header label.
header_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ("invoice_number", "date", "billed_to", "address")
])

header_udf = udf(parse_invoice_header, header_schema)

# Element 1 of each document is treated as the header text; the parsed struct
# is expanded into one column per field.
df_headers = (
    df_elements
    .filter(col("element_id") == 1)
    .withColumn("header", header_udf(col("content")))
    .select("file_path", col("header.*"))
)

# Parse items and total due from element id 2
def parse_markdown_table_with_total(text):
    """Extract line items and the total from a markdown invoice table.

    The table is located by its header row, which must start with the columns
    "Item" and "Description" (case-insensitive). The price is read from the
    first column named "price" or "amount"; when no such column exists the
    prices (and the table total) are reported as None instead of raising.

    Args:
        text: Markdown text that may contain a pipe-delimited table.

    Returns:
        A tuple ``(items, total_due)``. ``items`` is a list of dicts with the
        string keys ``item`` (the digits found in the first cell, or the whole
        cell when it has none), ``description`` and ``price`` ("$" removed).
        ``total_due`` is the price cell of the row whose first cell is
        "Total Due", or None. Returns ``([], None)`` for empty input or when
        no header row is found.
    """
    if not text:
        return [], None
    lines = [line.strip() for line in text.split("\n") if line.strip()]

    header_line = next(
        (line for line in lines
         if re.match(r"\| *Item *\| *Description *\| *.*\|", line, re.I)),
        None,
    )
    if header_line is None:
        return [], None
    headers = [h.strip() for h in header_line.strip("|").split("|")]

    # Stays None when the table has no "price"/"amount" column.
    price_index = next(
        (idx for idx, h in enumerate(headers) if h.lower() in ("price", "amount")),
        None,
    )

    items = []
    total_due = None
    for line in lines:
        if line == header_line or line.startswith("|---"):
            continue
        parts = [p.strip() for p in line.strip("|").split("|")]
        # Rows with a different column count are not part of this table.
        if len(parts) != len(headers):
            continue
        # Delimiter rows may be written "| --- | :---: |"; they are not data.
        if all(re.fullmatch(r":?-+:?", p) for p in parts):
            continue

        if price_index is None:
            price_value = None
        else:
            price_value = parts[price_index].replace("$", "").strip()

        if parts[0].lower() == "total due":
            total_due = price_value
        else:
            # Keep only the digits of the item cell (drops words like "Item").
            number = re.search(r"\d+", parts[0])
            items.append({
                "item": number.group() if number else parts[0],
                "description": parts[1],
                "price": price_value
            })
    return items, total_due

# One parsed table row: item number, description and price, all as strings.
item_schema = ArrayType(
    StructType([
        StructField(field_name, StringType())
        for field_name in ("item", "description", "price")
    ])
)

# Return type of the table UDF: the item rows plus the table's total.
result_schema = StructType([
    StructField("items", item_schema),
    StructField("total_due", StringType()),
])

table_udf = udf(parse_markdown_table_with_total, result_schema)

# Element 2 of each document is treated as the items table.
df_table_parsed = (
    df_elements
    .filter(col("element_id") == 2)
    .withColumn("parsed", table_udf(col("content")))
)

# One output row per line item, with the table's total repeated on each row.
df_items = (
    df_table_parsed
    .select(
        "file_path",
        explode(col("parsed.items")).alias("item_struct"),
        col("parsed.total_due"),
    )
    .select(
        "file_path",
        col("item_struct.item").alias("item"),
        col("item_struct.description"),
        col("item_struct.price"),
        col("total_due"),
    )
)

# Element 3, when present, is expected to carry a standalone "Total Due" line.
df_total_due = (
    df_elements
    .filter(col("element_id") == 3)
    .select("file_path", col("content"))
)

def extract_total_due(text):
    """Return the amount that follows "Total Due: " in ``text``.

    An optional "$" before the amount is skipped; the amount is the run of
    digits and dots that follows. Returns None for empty/None input or when
    the label is not present.
    """
    if text:
        found = re.search(r"Total Due: \$?([\d\.]+)", text)
        if found is not None:
            return found.group(1)
    return None

extract_total_due_udf = udf(extract_total_due, StringType())

df_total_due = df_total_due.withColumn(
    "total_due_3", extract_total_due_udf(col("content"))
)

# Attach the header fields to every item row; the left join keeps item rows
# whose file has no header row.
df_item_header = df_items.join(df_headers, on="file_path", how="left")

# Prefer the total found in element 3 and fall back to the table's own total.
df_final = (
    df_item_header
    .join(df_total_due.select("file_path", "total_due_3"), on="file_path", how="left")
    .withColumn("final_total_due", coalesce(col("total_due_3"), col("total_due")))
    .drop("total_due_3", "total_due")
)

# Column order for the final per-item view.
df_final_display = df_final.select(
    "invoice_number", "date", "billed_to", "address",
    "item", "description", "price", "final_total_due",
)

display(df_final_display)
