In [0]:


%pip install pdfplumber pandas
# NOTE(review): no versions are pinned above, so a fresh run may pick up
# different releases of pdfplumber/pandas — consider `pkg==x.y.z`.

# NOTE(review): the restartPython() call at the end of this cell presumably
# resets the interpreter state, so these imports would not carry over to later
# cells — the extraction cell re-imports what it uses. `col` is not referenced
# anywhere in the visible notebook. Confirm before removing.
import pdfplumber
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col
 
# Restart the Python process after the %pip install above.
dbutils.library.restartPython()


In [0]:
%restart_python
# NOTE(review): the previous cell already ends with
# dbutils.library.restartPython(); this second restart looks redundant —
# confirm and keep only one of the two.

In [0]:
import os
import pdfplumber
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import regexp_extract

# Reuse the active Spark session, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Directory that holds the invoice PDFs to ingest.
pdf_dir = "/Volumes/databricks_catalog/invoice_schema/pdf/"

# Keep only directory entries whose name ends in ".pdf" (case-insensitive).
# os.listdir returns names only (no directory prefix), in arbitrary order.
pdf_files = list(
    filter(lambda name: name.lower().endswith(".pdf"), os.listdir(pdf_dir))
)

# Extract the text of every PDF into one Row per file:
#   file_name - the bare file name inside pdf_dir
#   raw_text  - text of all pages, one page after another
data = []
for file_name in pdf_files:
    file_path = os.path.join(pdf_dir, file_name)
    with pdfplumber.open(file_path) as pdf:
        # Join pages with a newline. Plain concatenation would fuse the last
        # line of one page with the first line of the next (e.g. "Paid" +
        # "Invoice No: ..." -> "PaidInvoice No: ..."), which corrupts the
        # regex-based field extraction further down.
        # extract_text() may return None for a page; treat that as "".
        text = "\n".join(page.extract_text() or "" for page in pdf.pages)
    data.append(Row(file_name=file_name, raw_text=text))

# Create a Spark DataFrame from the extracted data.
# The schema is spelled out explicitly so that an empty directory produces an
# empty DataFrame instead of failing: with no rows there is nothing for Spark
# to infer column types from.
pdf_df = spark.createDataFrame(data, schema="file_name string, raw_text string")

# View the raw data
display(pdf_df)

# Extract structured fields from the raw invoice text with regular expressions.
#
# Each pattern has exactly one capture group (group 1) holding the field value.
# regexp_extract returns an empty string, not NULL, when a pattern does not
# match, so missing fields show up as "" in parsed_df.
#
# The vendor and bill_to values are matched as words separated by spaces/tabs
# only. The previous character classes contained `\s`, which also matches
# newlines, so the capture ran onto the following line and swallowed the next
# label word (e.g. "Acme Corp\nDate"). It also avoids trailing whitespace.
# NOTE(review): this assumes each of these values sits on a single line; a
# bill-to address that wraps across lines will now be cut at the first line.
field_patterns = {
    "invoice_no": r"Invoice No:\s*([A-Z0-9-]+)",
    "vendor": r"Vendor:\s*([A-Za-z&]+(?:[ \t]+[A-Za-z&]+)*)",
    "invoice_date": r"Date:\s*([\d-]+)",
    "bill_to": r"Bill To:\s*([A-Za-z0-9,]+(?:[ \t]+[A-Za-z0-9,]+)*)",
    "subtotal": r"Subtotal:\s*([\d,.]+)",
    "tax": r"Tax \(5%\):\s*([\d,.]+)",
    "total": r"Total Amount:\s*([\d,.]+)",
    "status": r"Payment Status:\s*([A-Za-z]+)",
}

# Keep the existing columns, append one extracted column per field (in the
# order listed above), then drop the bulky raw text.
parsed_df = (
    pdf_df
    .select(
        "*",
        *[
            regexp_extract("raw_text", pattern, 1).alias(field_name)
            for field_name, pattern in field_patterns.items()
        ],
    )
    .drop("raw_text")
)

# Persist the structured invoices twice: once as Delta files at a path, and
# once as a catalog table. Both writes replace whatever was there before.
delta_output_path = "/Volumes/databricks_catalog/invoice_schema/output"
# NOTE(review): "inovice" looks like a typo of "invoice". It is left unchanged
# because renaming would change the table that downstream queries read.
invoice_table_name = "databricks_catalog.invoice_schema.inovice_table"

# Path-based Delta output
(
    parsed_df.write
    .format("delta")
    .mode("overwrite")
    .save(delta_output_path)
)

# Catalog table output
(
    parsed_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(invoice_table_name)
)

# Read the table back through SQL and show the stored rows
saved_invoices = spark.sql("SELECT * FROM databricks_catalog.invoice_schema.inovice_table")
display(saved_invoices)