In [0]:
%pip install PyMuPDF

# Restart the notebook's Python process after the install above so that the
# following cells import the freshly installed package.
# NOTE(review): the install is unpinned; consider pinning a PyMuPDF version so
# re-runs are reproducible.
dbutils.library.restartPython()

In [0]:
import sys

# Workspace folder that is put on the import path.
# NOTE(review): presumably this is the folder that contains the `src` package
# imported further down - confirm.
PROJECT_ROOT = "/Workspace/scribe_sumgen"

# Insert at the front so this checkout wins over any other copy on the path;
# the membership test keeps re-running the cell from adding duplicates.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

# The check mark was previously stored as mis-decoded bytes ("âœ…").
print(f"✅ Python path configured: {PROJECT_ROOT}")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import mlflow
import sys
import os
sys.path.append(os.path.abspath('src'))  # Ensure src is in the Python path

# Struct returned by the PDF-parsing UDF: one nullable field per entry below,
# in this order.
parsed_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("text", StringType()),
        ("sections", StringType()),              # JSON string
        ("tables", StringType()),                # JSON string
        ("metadata", StringType()),              # JSON string
        ("parse_quality_score", DoubleType()),
        ("parse_errors", StringType()),
    )
])

# Register parsing UDF
from src.parsers.pdf_parser import ClinicalPDFParser 

# NOTE(review): `udf` is not imported in the visible cells; presumably it is the
# name predefined by the Databricks notebook runtime - confirm, or use F.udf.
@udf(returnType=parsed_schema)
def parse_clinical_pdf(pdf_bytes):
    """Parse the bytes of one PDF and return a row shaped like ``parsed_schema``.

    The bytes are written to a temporary ``.pdf`` file because the parser is
    called with a file path. The temporary file is removed in all cases,
    including when parsing, scoring or JSON encoding fails.

    Args:
        pdf_bytes: Binary content of a single PDF document. ``None`` or empty
            content is reported as a parse error without touching the disk.

    Returns:
        dict keyed by the ``parsed_schema`` field names.

        * Success: ``text`` is the parser's text, ``sections`` / ``tables`` /
          ``metadata`` are JSON-encoded strings, ``parse_quality_score`` comes
          from ``calculate_parse_quality`` and ``parse_errors`` is ``None``.
        * Failure inside the parse step: all content fields are ``None``,
          ``parse_quality_score`` is ``0.0`` and ``parse_errors`` holds the
          exception message.
    """
    import tempfile
    import os
    import json

    def failure_row(message):
        # Single definition of the error-row shape so both failure paths agree.
        return {
            'text': None,
            'sections': None,
            'tables': None,
            'metadata': None,
            'parse_quality_score': 0.0,
            'parse_errors': message
        }

    # Nothing to parse: report it explicitly instead of creating a temp file
    # and failing on the write.
    if not pdf_bytes:
        return failure_row("PDF content is empty or missing")

    parser = ClinicalPDFParser()

    tmp_path = None
    try:
        # Write bytes to temp file; delete=False because the parser reopens it
        # by path after this handle is closed.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
            # Record the path before writing so a failed write is still cleaned up.
            tmp_path = tmp.name
            tmp.write(pdf_bytes)

        # Parse PDF
        result = parser.parse_pdf(tmp_path)

        # Calculate quality score
        quality_score = calculate_parse_quality(result)

        return {
            'text': result['text'],
            'sections': json.dumps(result['sections']),
            'tables': json.dumps(result['tables']),
            'metadata': json.dumps(result['metadata']),
            'parse_quality_score': quality_score,
            'parse_errors': None
        }

    except Exception as e:
        # Per-document failures become data (parse_errors) rather than
        # aborting the batch.
        return failure_row(str(e))

    finally:
        # Clean up on every path; previously the file leaked whenever an
        # exception was raised before the unlink.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort: a cleanup problem must not turn a successful
                # parse into an error row.
                pass

def calculate_parse_quality(parsed_data):
    """Score how complete a parsed document is, from 0.0 to 1.0.

    Four components contribute a fixed weight each:

    * 0.4 - ``text`` is longer than 100 characters
    * 0.3 - ``sections`` is non-empty
    * 0.2 - ``tables`` is non-empty
    * 0.1 - ``metadata['num_pages']`` is greater than 0

    A component that is missing, ``None`` or empty contributes nothing instead
    of raising, so an incomplete parser result still gets a score.

    Args:
        parsed_data: dict with the keys ``text``, ``sections``, ``tables`` and
            ``metadata`` (itself a dict that may carry ``num_pages``).

    Returns:
        float: the sum of the earned weights; exactly 1.0 when all four
        components are present.
    """
    text = parsed_data.get('text') or ''
    sections = parsed_data.get('sections') or []
    tables = parsed_data.get('tables') or []
    metadata = parsed_data.get('metadata') or {}
    num_pages = metadata.get('num_pages') or 0

    # Weights are accumulated as integer tenths and divided once at the end:
    # adding 0.4 + 0.3 + 0.2 + 0.1 as floats gives 0.9999999999999999, not 1.0.
    tenths = 0

    # Has text
    if len(text) > 100:
        tenths += 4

    # Has sections
    if len(sections) > 0:
        tenths += 3

    # Has tables
    if len(tables) > 0:
        tenths += 2

    # Has metadata
    if num_pages > 0:
        tenths += 1

    return tenths / 10

# AutoLoader streaming ingestion: read each file under the landing folder that
# matches the glob as a binary record, at most 5 files per trigger.
autoloader_options = {
    "cloudFiles.format": "binaryFile",
    "cloudFiles.schemaLocation": "/Volumes/healthcare_catalog/raw_zone/checkpoints/schema",
    "cloudFiles.maxFilesPerTrigger": 5,
    "pathGlobFilter": "*.pdf",
}

raw_documents = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load("/Volumes/healthcare_catalog/raw_zone/encounter_pdfs/")
)

# Parse documents in streaming fashion: tag every incoming file with a
# generated id and an ingestion time, run the parsing UDF on its bytes, then
# flatten the returned struct into top-level columns.
document_id_expr = F.expr("uuid()")
ingested_at_expr = F.current_timestamp()
parsed_struct_expr = parse_clinical_pdf(F.col("content"))

bronze_columns = [
    "document_id",
    "path",
    "ingestion_timestamp",
    "parsed_data.*",
]

parsed_documents = (
    raw_documents
    .withColumn("document_id", document_id_expr)
    .withColumn("ingestion_timestamp", ingested_at_expr)
    .withColumn("parsed_data", parsed_struct_expr)
    .select(*bronze_columns)
)

# Write to Delta Lake (Bronze layer)
# Keep the StreamingQuery handle instead of discarding it so the run can be
# waited on and its failures observed.
parsed_documents_query = (parsed_documents.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/healthcare_catalog/raw_zone/checkpoints/parsed_docs")
    .trigger(availableNow=True)
    .toTable("healthcare_catalog.raw_zone.parsed_documents")
)

# toTable() returns as soon as the query has started. With availableNow the
# query processes the files that are currently available and then stops, so
# block until it has: otherwise a later cell can read the table before the
# data is committed. awaitTermination() also re-raises a query failure here
# rather than letting it pass unnoticed.
parsed_documents_query.awaitTermination()


In [0]:
%sql
-- Inspect the rows in the bronze table that the streaming write targets.
-- NOTE(review): this returns every column, including the full extracted
-- document text; consider projecting only the columns needed and adding a
-- LIMIT before sharing the rendered output.
SELECT * FROM healthcare_catalog.raw_zone.parsed_documents