In [0]:
%pip install --quiet databricks-sdk httpx PyMuPDF openai
dbutils.library.restartPython()
# Restart the Python process so the packages installed above are importable in later cells.

In [0]:
import base64
import glob
import os
import random
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

import fitz  # PyMuPDF
import pandas as pd
from openai import OpenAI
from pyspark.sql.functions import col, concat, lit, regexp_replace, split
from tqdm import tqdm


In [0]:
# Input: Unity Catalog volume that holds the PDFs to parse.
dbutils.widgets.text(
    "volume_path",
    "/Volumes/tsfrt/gsa/performance",
    label="Path to volume containing documents",
)

# Output location: all tables are written under {output_catalog}.{output_schema}.
# Catalog and schema are separate widgets; the config cell joins them with a ".".
dbutils.widgets.text(
    "output_catalog",
    "tsfrt",
    label="Catalog for output tables",
)

dbutils.widgets.text(
    "output_schema",
    "gsa",
    label="Schema for output tables",
)

dbutils.widgets.text(
    "output_table",
    "document_base",
    label="table for final output with embeddings",
)

dbutils.widgets.text(
    "embedding_model",
    "databricks-gte-large-en",
    label="embedding model to use",
)

dbutils.widgets.text(
    "foundation_model",
    "databricks-llama-4-maverick",
    label="foundation model used for doc parsing",
)



In [0]:
# UPDATE THESE PATHS FOR YOUR SETUP
OUTPUT_CTLG = dbutils.widgets.get("output_catalog")
OUTPUT_SCHEMA = dbutils.widgets.get("output_schema")
OUTPUT_TABLE = dbutils.widgets.get("output_table")
PDF_DIRECTORY = dbutils.widgets.get("volume_path")

# "{catalog}.{schema}" prefix shared by every output table.
OUTPUT_CATALOG = f"{OUTPUT_CTLG}.{OUTPUT_SCHEMA}"

# Processing mode:
#   "combined" - all PDFs go into one table, distinguished by doc_id (recommended)
#   "separate" - each PDF gets its own tables, named after the file
PROCESSING_MODE = "combined"  # or "separate"

if PROCESSING_MODE == "combined":
    INTERMEDIATE_TABLE = f"{OUTPUT_CATALOG}.all_pdfs_parsed_intermediate"
    FINAL_TABLE = f"{OUTPUT_CATALOG}.all_pdfs_parsed"
# In "separate" mode table names are derived per PDF during processing,
# so no module-level table names are defined.

# Workspace URL and API token from the current notebook context; the token is
# used to call the workspace's model-serving endpoints.
context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = context.apiUrl().get()
DATABRICKS_TOKEN = context.apiToken().get()
DATABRICKS_BASE_URL = f"{workspace_url}/serving-endpoints/"

print(f"📂 PDF Directory: {PDF_DIRECTORY}")
print(f"📊 Output Catalog: {OUTPUT_CATALOG}")
print(f"🔧 Processing Mode: {PROCESSING_MODE}")
if PROCESSING_MODE == "combined":
    print(f"💾 Final Table: {FINAL_TABLE}")

In [0]:
def count_pdf_pages_fitz(directory=".", show_details=True):
    """
    Count pages in all PDF files in a directory using PyMuPDF (fitz).

    Only the top level of ``directory`` is scanned. The ``.pdf`` extension is
    matched case-insensitively, so files such as ``REPORT.PDF`` are counted.
    This matches the case-insensitive check ``get_pdf_files`` uses when picking
    files to process. Files are visited in sorted order so output is stable.

    Args:
        directory (str): Directory path to scan for PDFs. A missing directory is
            treated as containing no PDFs.
        show_details (bool): Whether to show individual file counts

    Returns:
        tuple: (total_pages, file_count, errors), where ``file_count`` is the
        number of PDFs that opened successfully and ``errors`` is the number
        that could not be opened.
    """
    root = Path(directory)
    candidates = root.iterdir() if root.is_dir() else []
    pdf_files = sorted(
        p for p in candidates
        if p.is_file() and p.suffix.lower() == ".pdf"
    )

    if not pdf_files:
        print(f"No PDF files found in '{directory}'")
        return 0, 0, 0

    total_pages = 0
    file_count = 0
    errors = 0

    print(f"Scanning {len(pdf_files)} PDF files in '{directory}'...\n")

    for pdf_file in pdf_files:
        try:
            # Context manager guarantees the document is closed even on error.
            with fitz.open(pdf_file) as doc:
                pages = doc.page_count

            if show_details:
                print(f"{pdf_file.name:<50} {pages:>6} pages")

            total_pages += pages
            file_count += 1

        except Exception as e:
            print(f"ERROR - {pdf_file.name}: {e}")
            errors += 1
    print("\n")
    return total_pages, file_count, errors

In [0]:

total_pages, num_documents, errors = (count_pdf_pages_fitz(PDF_DIRECTORY, show_details=True))

print(f"Total {num_documents} PDFs found with a total of {total_pages} pages. {errors} errors.")

In [0]:
def get_pdf_files(directory_path):
    """
    List the PDF files directly inside a Unity Catalog volume directory.

    Listing uses ``dbutils.fs.ls`` (not recursive). Entries whose path ends in
    ``.pdf`` (any letter case) are kept. Paths starting with ``dbfs:`` have that
    scheme removed so PyMuPDF can open them. If listing fails, the error is
    printed and an empty list is returned.

    Args:
        directory_path: Path to directory containing PDFs

    Returns:
        List of PDF file paths (possibly empty).
    """
    try:
        entries = dbutils.fs.ls(directory_path)
        pdf_paths = [
            entry.path.replace('dbfs:', '') if entry.path.startswith('dbfs:') else entry.path
            for entry in entries
            if entry.path.lower().endswith('.pdf')
        ]

        print(f"Found {len(pdf_paths)} PDF files in {directory_path}")
        for path in pdf_paths:
            print(f"  - {os.path.basename(path)}")
            print(f"    Path: {path}")

        return pdf_paths

    except Exception as e:
        print(f"Error accessing directory {directory_path}: {str(e)}")
        return []

def get_clean_doc_name(pdf_path):
    """
    Derive a lowercase, table-name-friendly document name from a PDF path.

    Only a trailing ``.pdf`` extension (any letter case) is removed. Every other
    non-alphanumeric character becomes ``_``, runs of underscores collapse to
    one, and leading/trailing underscores are dropped.

    Example: ``/Volumes/c/s/v/My Report (v2).pdf`` -> ``my_report_v2``.
    """
    file_name = os.path.basename(pdf_path)
    stem, ext = os.path.splitext(file_name)
    # Strip only a real .pdf extension. A plain str.replace would also remove
    # ".pdf" from the middle of names such as "my.pdfs.pdf".
    if ext.lower() != ".pdf":
        stem = file_name
    # Replace special characters with underscores
    clean_name = ''.join(c if c.isalnum() else '_' for c in stem)
    # Remove consecutive underscores and strip
    clean_name = '_'.join(filter(None, clean_name.split('_')))
    return clean_name.lower()

In [0]:
def convert_pdf_to_base64(pdf_path, dpi=300):
    """
    Render every page of a PDF to a base64-encoded PNG, with per-page metadata.

    Args:
        pdf_path: Path to PDF file (a path PyMuPDF can open directly)
        dpi: Render resolution. PDF user space has 72 units per inch, so pages
            are scaled by ``dpi / 72``.

    Returns:
        tuple: ``(df, True, None)`` on success. ``df`` is a pandas DataFrame with
        one row per page containing:
          - doc/page identifiers and page size,
          - length of the page's text layer,
          - the ``base64_img`` PNG payload,
          - render timestamp, dpi, and document metadata.
        On failure, returns ``(None, False, error_message)``.
    """
    zoom = dpi / 72
    zoom_matrix = fitz.Matrix(zoom, zoom)

    doc = None
    try:
        doc = fitz.open(pdf_path)
        num_pages = len(doc)

        # PyMuPDF documents `metadata` as None for encrypted documents that
        # still need a password; fall back to an empty dict so .get() works.
        metadata = doc.metadata or {}
        file_name = os.path.basename(pdf_path)
        clean_doc_name = get_clean_doc_name(pdf_path)

        print(f"Converting {file_name} to base64: {num_pages} pages at {dpi} DPI...")

        df_data = []
        start_time = time.time()

        for page_num in range(num_pages):
            if page_num % 25 == 0:  # Progress update every 25 pages
                print(f"  Converting page {page_num + 1}/{num_pages} to base64")

            page = doc.load_page(page_num)

            # Page dimensions and size of the embedded text layer, stored as metadata
            page_rect = page.rect
            page_text_length = len(page.get_text())

            pix = page.get_pixmap(matrix=zoom_matrix, alpha=False)
            img_bytes = pix.tobytes("png")
            img_base64 = base64.b64encode(img_bytes).decode('utf-8')

            df_data.append({
                'doc_id': pdf_path,
                'doc_name': clean_doc_name,
                'file_name': file_name,
                'page_num': page_num + 1,
                'total_pages': num_pages,
                'page_width': page_rect.width,
                'page_height': page_rect.height,
                'page_text_length': page_text_length,
                'base64_img': img_base64,
                'processed_timestamp': datetime.now(),
                'dpi': dpi,
                'doc_title': metadata.get('title', ''),
                'doc_author': metadata.get('author', ''),
                'doc_subject': metadata.get('subject', ''),
                'doc_creator': metadata.get('creator', '')
            })

        processing_time = time.time() - start_time

        print(f"  Conversion complete: {len(df_data)} pages in {processing_time:.1f}s")

        return pd.DataFrame(df_data), True, None

    except Exception as e:
        error_msg = f"Error processing {pdf_path}: {str(e)}"
        print(f"❌ {error_msg}")
        return None, False, error_msg

    finally:
        # Always release the file handle, including when a page fails to render.
        if doc is not None:
            doc.close()

In [0]:
def save_to_unity_catalog(df, table_path, mode="append"):
    """
    Persist a pandas DataFrame to a Delta table in Unity Catalog.

    Args:
        df: pandas DataFrame to write.
        table_path: Fully qualified table name (``catalog.schema.table``).
        mode: ``"overwrite"`` replaces the table's data and schema; any other
            value appends.

    Returns:
        True if the write succeeded; otherwise prints the error and returns False.
    """
    overwrite = mode == "overwrite"
    try:
        writer = (
            spark.createDataFrame(df)
            .write
            .format("delta")
            .mode("overwrite" if overwrite else "append")
        )
        if overwrite:
            # Let the table schema follow the DataFrame when columns changed.
            writer = writer.option("overwriteSchema", "true")
        writer.saveAsTable(table_path)

        print(f"✅ Saved {len(df)} records to: {table_path}")
        return True

    except Exception as e:
        print(f"❌ Error saving to {table_path}: {str(e)}")
        return False

In [0]:
# Lower-cased substrings that mark an API error as transient. Matching errors are
# retried and reported to RateLimitTracker.record_rate_limit().
# NOTE(review): matching is by plain substring, so "rate" also matches words such
# as "generate" or "separate" -- consider tightening.
RETRYABLE_ERROR_SUBSTRINGS = [
    "retry",
    "got empty embedding result",
    "request_limit_exceeded",
    "rate limit",
    "insufficient_quota",
    "expecting value",
    "rate",
    "overloaded",
    "429",
    "bad gateway",
    "502",
]


class RateLimitTracker:
    """
    Thread-safe bookkeeping that nudges the desired concurrency up or down.

    ``current_workers`` is the number of requests callers should keep in flight.
    - It drops by one when 3 or more rate-limit events fall within the last
      2 minutes.
    - It rises by one on every 20th recorded success, provided no rate-limit
      event occurred in the last 5 minutes.
    - Adjustments never push it below ``min_workers`` or above ``max_workers``.

    Only the 20 most recent rate-limit timestamps are retained.
    """

    def __init__(self, initial_workers=5, min_workers=1, max_workers=10):
        self.lock = threading.Lock()
        self.success_count = 0
        self.rate_limit_events = deque(maxlen=20)  # timestamps of the most recent rate limits
        self.max_workers = max_workers
        self.min_workers = min_workers
        self.current_workers = initial_workers

    def _events_since(self, window):
        """Count retained rate-limit events newer than ``window`` ago (caller holds the lock)."""
        cutoff = datetime.now() - window
        return len([stamp for stamp in self.rate_limit_events if stamp > cutoff])

    def record_rate_limit(self):
        """Log a rate-limit event; shed one worker once 3+ events fall in a 2-minute window."""
        with self.lock:
            self.rate_limit_events.append(datetime.now())

            if self.current_workers <= self.min_workers:
                return
            if self._events_since(timedelta(minutes=2)) < 3:
                return

            previous = self.current_workers
            self.current_workers = max(self.min_workers, previous - 1)
            print(f"🔽 Rate limits detected! Reducing workers: {previous} → {self.current_workers}")

    def record_success(self):
        """Log a success; add one worker every 20th success if the last 5 minutes were clean."""
        with self.lock:
            self.success_count += 1

            if self.success_count % 20 != 0 or self.current_workers >= self.max_workers:
                return
            if self._events_since(timedelta(minutes=5)) != 0:
                return

            previous = self.current_workers
            self.current_workers = min(self.max_workers, previous + 1)
            print(f"🔼 Performance good! Increasing workers: {previous} → {self.current_workers}")

In [0]:
def process_single_image(prompt, image_data, image_index, databricks_token, databricks_url, model, rate_tracker,
                         max_attempts=3, mime_type="image/png"):
    """
    Transcribe one base64-encoded image with a vision chat model, retrying transient errors.

    Args:
        prompt: Instruction text sent alongside the image.
        image_data: Base64 string of the image. NaN/None/"" is rejected without
            calling the API.
        image_index: Label echoed back in the result so callers can map it to a row.
        databricks_token: Token used as the API key for the OpenAI-compatible client.
        databricks_url: Base URL of the serving endpoints.
        model: Serving endpoint / model name.
        rate_tracker: RateLimitTracker notified of successes and retryable failures.
        max_attempts: Total API attempts before giving up on retryable errors.
        mime_type: MIME type used in the data URL. Defaults to ``image/png``
            because ``convert_pdf_to_base64`` renders pages as PNG.

    Returns:
        tuple: ``(image_index, text)``, where ``text`` is the transcription or a
        string starting with ``"ERROR:"``.
    """
    # Skip empty images before building a client or touching the network.
    if pd.isna(image_data) or image_data == "":
        return (image_index, "ERROR: Empty image")

    client = OpenAI(api_key=databricks_token, base_url=databricks_url)

    # Retry logic with exponential backoff
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:{mime_type};base64,{image_data}"}
                        }
                    ]
                }]
            )

            content = response.choices[0].message.content
            if content is None:
                # No text came back; report it explicitly instead of crashing on .strip().
                return (image_index, "ERROR: Empty response from model")

            result = content.strip()
            rate_tracker.record_success()

            # Print success message if this was a retry attempt
            if attempt > 0:
                print(f"✅ SUCCESS: Image {image_index} processed successfully after {attempt + 1} attempts")

            return (image_index, result)

        except Exception as e:
            error_str = str(e).lower()
            is_retryable = any(substring in error_str for substring in RETRYABLE_ERROR_SUBSTRINGS)

            if not is_retryable:
                print(f"❌ ERROR: Image {image_index} failed with non-retryable error: {str(e)}")
                return (image_index, f"ERROR: {str(e)}")

            rate_tracker.record_rate_limit()

            if attempt < max_attempts - 1:  # Only retry if we have attempts left
                # Exponential backoff with jitter
                wait_time = (2 ** attempt) + random.uniform(1, 3)
                print(f"⚠️  RATE LIMIT: Image {image_index}, attempt {attempt + 1}/{max_attempts}. Retrying in {wait_time:.1f}s...")
                time.sleep(wait_time)
                continue

            print(f"❌ FAILED: Image {image_index} failed after {max_attempts} attempts due to rate limiting")
            return (image_index, f"ERROR: Rate limited after {max_attempts} attempts - {str(e)}")

    return (image_index, "ERROR: Max retries exceeded")

In [0]:
def process_images_adaptive(prompt, images, databricks_token, databricks_url, 
                           model="databricks-llama-4-maverick", 
                           initial_workers=5, min_workers=1, max_workers=10):
    """
    Transcribe a collection of images, adjusting concurrency based on rate limits.

    Images are submitted in successive batches of ``rate_tracker.current_workers``
    items. Each batch is fully drained before the next is submitted, so the batch
    size can shrink or grow between batches as the tracker reacts to rate limits.

    Args:
        prompt: Instruction text sent with every image
        images: pandas Series (or anything ``pd.Series`` accepts) of base64 encoded image strings
        databricks_token: Token for Databricks API  
        databricks_url: Base URL for Databricks API
        model: Model name to use
        initial_workers: Starting number of concurrent workers
        min_workers: Minimum workers (fallback during heavy rate limiting)
        max_workers: Maximum workers (cap for scaling up)

    Returns:
        pandas Series: Results with the same index as the input. Failed items hold
        a string starting with "ERROR:".
    """

    # Convert to pandas Series if needed
    if not isinstance(images, pd.Series):
        images = pd.Series(images)

    results = pd.Series(index=images.index, dtype='object')
    rate_tracker = RateLimitTracker(
        initial_workers=initial_workers, 
        min_workers=min_workers, 
        max_workers=max_workers
    )

    print(f"🚀 Starting transcription of {len(images)} images...")
    print(f"📊 Model: {model}")
    print(f"⚙️  Workers: {initial_workers} (range: {min_workers}-{max_workers})")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        with tqdm(total=len(images), desc="Processing images", unit="img") as pbar:

            remaining_items = list(images.items())

            while remaining_items:
                # Submit batch based on current worker count
                batch_size = min(rate_tracker.current_workers, len(remaining_items))
                current_batch = remaining_items[:batch_size]
                remaining_items = remaining_items[batch_size:]

                # Submit current batch
                futures = {
                    executor.submit(process_single_image, prompt, img_data, idx, 
                                  databricks_token, databricks_url, model, rate_tracker): idx
                    for idx, img_data in current_batch
                }

                # Process batch results
                for future in as_completed(futures):
                    try:
                        image_index, result = future.result()
                        results[image_index] = result

                        # Update progress bar with status and current worker count
                        status = "❌" if result.startswith("ERROR:") else "✅"
                        pbar.set_postfix({
                            "Last": f"{status} {image_index}", 
                            "Workers": rate_tracker.current_workers,
                            "Rate Limits": len(rate_tracker.rate_limit_events)
                        })

                    except Exception as e:
                        idx = futures[future]
                        results[idx] = f"ERROR: Exception - {str(e)}"
                        pbar.set_postfix({
                            "Last": f"❌ {idx} (Exception)", 
                            "Workers": rate_tracker.current_workers
                        })
                        print(f"❌ EXCEPTION: Image {idx} failed with exception: {str(e)}")

                    pbar.update(1)

                # Small delay between batches if we have more to process
                if remaining_items:
                    time.sleep(0.2)  # Small delay to prevent overwhelming

    # Final summary statistics (guarded so an empty input does not divide by zero)
    total_count = len(results)
    error_count = sum(1 for result in results if str(result).startswith("ERROR:"))
    success_count = total_count - error_count
    success_rate = (success_count / total_count * 100) if total_count else 0.0

    print(f"\n📈 Transcription Summary ({model}):")
    print(f"   ✅ Successful: {success_count}/{total_count}")
    print(f"   ❌ Failed: {error_count}/{total_count}")
    print(f"   📊 Success rate: {success_rate:.1f}%")
    print(f"   🔧 Final worker count: {rate_tracker.current_workers}")
    print(f"   ⚠️  Total rate limit events: {len(rate_tracker.rate_limit_events)}")

    return results

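Tweak the prompt below to match your documents' content as needed. Its domain-specific rules (e.g. drug names, dosages, and clinical criteria) are examples; replace them with terminology relevant to your documents.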

In [0]:

    # Define the prompt
# Instruction text sent with every page image. The key-value example uses valid
# markdown bold (no space before the closing "**").
PROMPT = """
Instructions: Transcribe only the visible text from this PDF page. 
Rules:
- Use markdown formatting only for text that appears formatted in the original
- Do not add document titles, page headers, or section headings unless explicitly visible
- Do not add introductory text like 'This page contains...' or 'The document shows...' or '# Transcription of PDF Page'
- Preserve exact wording and technical terminology
- For images/diagrams: describe content within <figure></figure> tags
- For tables: use markdown table format if present
- Start transcription immediately without preamble
For visual elements, follow these rules:
**TABLES**: If the content is clearly a structured table, provide BOTH:
1. A detailed caption in <figure></figure> tags describing the table structure and content
2. The actual table recreated in markdown format with proper alignment
**FLOWCHARTS/DECISION TREES**: Provide detailed caption in <figure></figure> tags including:
- Starting point and decision criteria
- All pathways and decision branches
- Specific thresholds, values, and conditions
- Final outcomes and recommendations
- Flow direction and logical connections
**CHARTS/DIAGRAMS**: Provide detailed caption in <figure></figure> tags including:
- Chart type and title
- All categories, sections, and color coding
- Specific values, ranges, and criteria
- Evidence levels and recommendations
- Visual organization and groupings
**FORMS/CHECKLISTS**: Transcribe structure using markdown formatting, preserving:
- Section headers and numbering
- Checkbox options and rating scales
- Please bold the Key in Key-Value Pairs in the form, e.g. **Name**: John Doe.
Preserve exact technical terminology, drug names, dosages, and clinical criteria for diagnostic accuracy.
This transcription will be used for technical diagnosis, so accuracy is critical.
"""

In [0]:
def process_multiple_pdfs(pdf_directory, output_catalog, prompt=PROMPT, processing_mode="combined", 
                         dpi=300, model="databricks-llama-4-maverick", initial_workers=5, 
                         min_workers=1, max_workers=10):
    """
    Render, transcribe and persist every PDF in a directory.

    For each PDF:
      1. Render pages to base64 PNGs (``convert_pdf_to_base64``).
      2. Save the rendered rows to an intermediate table.
      3. Transcribe the pages with ``model`` using adaptive concurrency
         (``process_images_adaptive``).
      4. Save the rows plus a ``transcription`` column to a final table.

    Args:
        pdf_directory: Directory containing PDF files
        output_catalog: ``catalog.schema`` for output tables
        prompt: Instruction text sent with every page image
        processing_mode: Where results are written:
            - ``"combined"``: all PDFs go to
              ``{output_catalog}.all_pdfs_parsed[_intermediate]``.
            - ``"separate"``: each PDF gets
              ``{output_catalog}.{doc_name}_parsed[_intermediate]``.
        dpi: Image resolution
        model: LLM serving endpoint to use
        initial_workers, min_workers, max_workers: Concurrency bounds passed to
            ``process_images_adaptive``.

    Returns:
        tuple: ``(processing_log, all_results)`` -- one log dict per PDF and the
        list of DataFrames that were saved successfully. Both are empty lists
        when no PDFs are found.
    """

    # Discover PDF files
    pdf_files = get_pdf_files(pdf_directory)

    if not pdf_files:
        print("No PDF files found. Exiting.")
        # Return empty results (not None) so callers can always unpack two values.
        return [], []

    print(f"\n🚀 Starting batch processing of {len(pdf_files)} PDFs")
    print(f"📊 Processing mode: {processing_mode}")
    print(f"🎯 Output catalog: {output_catalog}")

    # Initialize tracking variables
    total_files = len(pdf_files)
    successful_files = 0
    failed_files = 0
    total_pages_processed = 0
    all_results = []
    processing_log = []

    # In "combined" mode every PDF shares the same two tables. The first
    # successful write of this run overwrites them (discarding rows from earlier
    # runs) and later writes append. Tracking this explicitly, rather than keying
    # off file_idx == 1, stays correct when the first PDF(s) fail.
    intermediate_created = False
    final_created = False

    # Process each PDF
    for file_idx, pdf_path in enumerate(pdf_files, 1):
        file_name = os.path.basename(pdf_path)
        clean_doc_name = get_clean_doc_name(pdf_path)

        print(f"\n{'='*60}")
        print(f"📄 Processing file {file_idx}/{total_files}: {file_name}")
        print(f"{'='*60}")

        file_start_time = time.time()

        try:
            # Convert PDF to base64 images
            df, success, error = convert_pdf_to_base64(pdf_path, dpi=dpi)

            if not success:
                failed_files += 1
                processing_log.append({
                    'file_name': file_name,
                    'status': 'FAILED_CONVERSION',
                    'error': error,
                    'pages_processed': 0,
                    'processing_time': time.time() - file_start_time
                })
                continue

            # Save intermediate results
            if processing_mode == "combined":
                intermediate_table = f"{output_catalog}.all_pdfs_parsed_intermediate"
                save_mode = "append" if intermediate_created else "overwrite"
            else:
                intermediate_table = f"{output_catalog}.{clean_doc_name}_parsed_intermediate"
                save_mode = "overwrite"

            if save_to_unity_catalog(df, intermediate_table, mode=save_mode):
                intermediate_created = True

            # Process images with LLM
            print(f"🤖 Starting LLM processing for {len(df)} pages...")

            # Process with adaptive rate limiting
            results_series = process_images_adaptive(
                prompt=prompt,
                images=df['base64_img'],
                databricks_token=DATABRICKS_TOKEN,
                databricks_url=DATABRICKS_BASE_URL,
                model=model,
                initial_workers=initial_workers,
                min_workers=min_workers,
                max_workers=max_workers
            )

            # Add transcription results to dataframe (aligned on the shared index)
            df['transcription'] = results_series

            # Count successful transcriptions
            error_count = sum(1 for result in results_series if str(result).startswith("ERROR:"))
            success_count = len(results_series) - error_count

            # Save final results
            if processing_mode == "combined":
                final_table = f"{output_catalog}.all_pdfs_parsed"
                save_mode = "append" if final_created else "overwrite"
            else:
                final_table = f"{output_catalog}.{clean_doc_name}_parsed"
                save_mode = "overwrite"

            save_success = save_to_unity_catalog(df, final_table, mode=save_mode)

            if save_success:
                final_created = True
                successful_files += 1
                total_pages_processed += len(df)
                all_results.append(df)

                file_processing_time = time.time() - file_start_time

                processing_log.append({
                    'file_name': file_name,
                    'status': 'SUCCESS',
                    'pages_processed': len(df),
                    'successful_transcriptions': success_count,
                    'failed_transcriptions': error_count,
                    'processing_time': file_processing_time,
                    'final_table': final_table
                })

                print(f"✅ File completed successfully:")
                print(f"   📊 Pages: {len(df)}")
                print(f"   ✅ Successful transcriptions: {success_count}")
                print(f"   ❌ Failed transcriptions: {error_count}")
                print(f"   ⏱️  Processing time: {file_processing_time:.1f}s")
                print(f"   💾 Saved to: {final_table}")
            else:
                failed_files += 1
                processing_log.append({
                    'file_name': file_name,
                    'status': 'FAILED_SAVE',
                    'pages_processed': len(df),
                    'processing_time': time.time() - file_start_time
                })

        except Exception as e:
            failed_files += 1
            file_processing_time = time.time() - file_start_time
            error_msg = str(e)

            processing_log.append({
                'file_name': file_name,
                'status': 'FAILED_EXCEPTION',
                'error': error_msg,
                'pages_processed': 0,
                'processing_time': file_processing_time
            })

            print(f"❌ Failed to process {file_name}: {error_msg}")

    # Final summary
    print(f"\n{'='*80}")
    print(f"🎊 BATCH PROCESSING COMPLETE")
    print(f"{'='*80}")
    print(f"📊 Files processed: {successful_files}/{total_files}")
    print(f"📄 Total pages processed: {total_pages_processed}")
    print(f"✅ Successful files: {successful_files}")
    print(f"❌ Failed files: {failed_files}")

    if processing_mode == "combined" and successful_files > 0:
        print(f"💾 All results combined in: {output_catalog}.all_pdfs_parsed")

    # Show processing log
    print(f"\n📋 PROCESSING LOG:")
    for log_entry in processing_log:
        status_emoji = "✅" if log_entry['status'] == 'SUCCESS' else "❌"
        print(f"   {status_emoji} {log_entry['file_name']}: {log_entry['status']} "
              f"({log_entry['pages_processed']} pages, {log_entry['processing_time']:.1f}s)")

        if 'error' in log_entry:
            print(f"      Error: {log_entry['error']}")

    return processing_log, all_results

In [0]:
# Run the batch processing. The parsing model comes from the "foundation_model"
# widget (default databricks-llama-4-maverick). Point it at your own provisioned
# throughput endpoint for more speed, and raise the worker counts to match.
processing_log, all_results = process_multiple_pdfs(
    pdf_directory=PDF_DIRECTORY,
    output_catalog=OUTPUT_CATALOG,
    prompt=PROMPT,
    processing_mode=PROCESSING_MODE,
    dpi=150,
    model=dbutils.widgets.get("foundation_model"),
    initial_workers=3,  # update if you have a provisioned throughput endpoint
    min_workers=1,      # default 1
    max_workers=3,      # update if you have a provisioned throughput endpoint
)

In [0]:
# Per-file summary statistics. Only available in combined mode, where every PDF
# lands in the single FINAL_TABLE (FINAL_TABLE is not defined in separate mode).
if PROCESSING_MODE == "combined":
    summary_df = spark.sql(f"""
        SELECT 
            file_name,
            doc_name,
            COUNT(*) as total_pages,
            SUM(CASE WHEN transcription NOT LIKE 'ERROR:%' THEN 1 ELSE 0 END) as successful_pages,
            SUM(CASE WHEN transcription LIKE 'ERROR:%' THEN 1 ELSE 0 END) as failed_pages,
            AVG(page_text_length) as avg_page_text_length,
            MIN(processed_timestamp) as first_processed,
            MAX(processed_timestamp) as last_processed
        FROM {FINAL_TABLE}
        GROUP BY file_name, doc_name
        ORDER BY file_name
    """)

    print("📊 PROCESSING SUMMARY BY FILE:")
    # display() must stay inside this branch: summary_df only exists in combined mode.
    display(summary_df)
else:
    print("ℹ️ Per-file summary is only generated in combined mode.")

In [0]:
# Drop any previous embeddings table so the CREATE TABLE in the next cell starts fresh.
spark.sql(f"DROP TABLE IF EXISTS {OUTPUT_CTLG}.{OUTPUT_SCHEMA}.{OUTPUT_TABLE}")

In [0]:
# Build the final table: one row per successfully transcribed page, plus its embedding.
# Pages are excluded if their transcription is:
#   - NULL,
#   - blank, or
#   - an "ERROR:..." marker from the transcription step,
# so error text is never embedded as if it were document content.
# Ids follow (doc_id, page_num), which is unique per page, so they are deterministic
# across runs.
embedding_model = dbutils.widgets.get("embedding_model")

spark.sql(f"""
CREATE TABLE {OUTPUT_CTLG}.{OUTPUT_SCHEMA}.{OUTPUT_TABLE} as SELECT
  ROW_NUMBER() OVER (ORDER BY doc_id, page_num) as id,
  doc_id,
  transcription,
  ai_query("{embedding_model}", subquery.transcription) as embedding
FROM
  (
    SELECT
      doc_id,
      page_num,
      transcription
    FROM
      {OUTPUT_CTLG}.{OUTPUT_SCHEMA}.all_pdfs_parsed
    WHERE
      transcription IS NOT NULL
      AND trim(transcription) <> ''
      AND transcription NOT LIKE 'ERROR:%'
  ) AS subquery
""")