# 7. Async Document Processing with Celery

*Offload document processing to Celery workers for scalable, non-blocking handling*

## What you will do

By the end of this notebook you will:
- Upload a document to the DMS
- Trigger async processing using Celery
- Monitor processing status by polling until the pipeline finishes
- See how async processing enables scalable document handling

## 1. Prerequisites

- Completed notebooks 1–6
- Docker services running (`docker compose up -d`)
- Celery worker running (`docker compose up celery-worker`)
- `data/loan_application.pdf` available
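Before running the cells, it can help to confirm that the containers are actually listening. The sketch below is a hypothetical preflight check (`is_port_open` and `preflight` are not part of the project) that only attempts TCP connections using the standard library `socket` module. The ports are the Postgres and Azurite ports configured later in this notebook; the Celery broker address is not shown here, so it is left out rather than guessed.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution errors
        return False


# Ports taken from the configuration cell of this notebook
REQUIRED_SERVICES = {
    "Postgres": ("localhost", 5432),
    "Azurite Blob": ("localhost", 10000),
}


def preflight(services: dict[str, tuple[str, int]]) -> list[str]:
    """Return the names of services whose ports are not reachable."""
    return [name for name, (host, port) in services.items() if not is_port_open(host, port)]


if __name__ == "__main__":
    missing = preflight(REQUIRED_SERVICES)
    if missing:
        print("Not reachable:", ", ".join(missing))
    else:
        print("All required ports reachable")
```

An open port only shows that something is listening; it does not prove the service is healthy, so the connection errors raised in the next cells remain the authoritative check.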

## 2. Environment configuration

### 2.1 Imports and project root detection


In [1]:
import sys
import os
from pathlib import Path
import time
import uuid
from datetime import datetime

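# Detect project root dynamically
current_directory = Path.cwd().resolve()
project_root_directory = None

# Look for the project root by finding pyproject.toml, starting with the
# current directory itself (Path.parents does not include it)
for candidate in (current_directory, *current_directory.parents):
    if (candidate / "pyproject.toml").exists():
        project_root_directory = candidate
        break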

if project_root_directory is None:
    raise RuntimeError("Could not find project root directory")

# Add project root to Python path for imports
sys.path.insert(0, str(project_root_directory))

# Change to project root for relative paths
os.chdir(project_root_directory)
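If other notebooks in this series need the same lookup, it can be packaged as a small function. A minimal sketch; `find_project_root` is a hypothetical helper, not part of the project's `src` package. It checks the starting directory itself before its ancestors, because `Path.parents` excludes the path it is called on.

```python
from pathlib import Path


def find_project_root(start: Path, marker: str = "pyproject.toml") -> Path:
    """Walk up from `start` (inclusive) and return the first directory containing `marker`."""
    start = start.resolve()
    # (start, *start.parents) visits start first, then each ancestor up to the filesystem root
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    raise RuntimeError(f"Could not find {marker} in {start} or any parent directory")
```

In the cell above, this would reduce to `project_root_directory = find_project_root(Path.cwd())`.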

### 2.2 Service clients

We connect to the running services and initialize the async processor.


In [2]:
import importlib

import psycopg2
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient

# Reload the adapters module *before* importing names from it, so the classes
# bound below come from the latest code (reloading after `from ... import`
# would leave the old class objects in this namespace)
import src.dms.adapters as dms_adapters
importlib.reload(dms_adapters)

# Import DMS and async processing modules
from src.dms.service import DmsService
from src.dms.adapters import AzureBlobStorageClient, PostgresMetadataRepository
from src.async_processing import AsyncDocumentProcessor

# Config for compose-based services
POSTGRES_HOST: str = "localhost"
POSTGRES_PORT: int = 5432
POSTGRES_DBNAME: str = "dms_meta"
POSTGRES_USER: str = "dms"
POSTGRES_PASSWORD: str = "dms"

AZURITE_BLOB_PORT: int = 10000
AZURITE_ACCOUNT_NAME: str = "devstoreaccount1"
AZURITE_ACCOUNT_KEY: str = (
    "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/"
    "K1SZFPTOtr/KBHBeksoGMGw=="
)
CONTAINER_NAME: str = "documents"

# Initialize clients
connection_string = (
    "DefaultEndpointsProtocol=http;"
    f"AccountName={AZURITE_ACCOUNT_NAME};"
    f"AccountKey={AZURITE_ACCOUNT_KEY};"
    f"BlobEndpoint=http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1;"
)

blob_service_client: BlobServiceClient = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(CONTAINER_NAME)
try:
    container_client.create_container()
except ResourceExistsError:
    pass  # container already exists from a previous run

pg_conn = psycopg2.connect(
    host=POSTGRES_HOST,
    port=POSTGRES_PORT,
    database=POSTGRES_DBNAME,
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
)

storage_client = AzureBlobStorageClient(blob_service_client)
metadata_repo = PostgresMetadataRepository(pg_conn)
dms_service = DmsService(storage_client=storage_client, metadata_repository=metadata_repo)

# Initialize async processor
async_processor = AsyncDocumentProcessor()

print("Environment ready: connected to Postgres, Azurite, and Celery")


Environment ready: connected to Postgres, Azurite, and Celery
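The connection settings above are hard-coded to match the local Docker Compose services. If the notebook needs to run against another environment, one option is to read them from environment variables and keep the compose values as defaults. A minimal sketch, assuming hypothetical variable names (`DMS_PG_HOST`, `DMS_PG_PORT`, and so on) that the project does not necessarily define:

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class PostgresSettings:
    # Defaults mirror the compose-based values used in this notebook
    host: str = "localhost"
    port: int = 5432
    dbname: str = "dms_meta"
    user: str = "dms"
    password: str = "dms"

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "PostgresSettings":
        """Read hypothetical DMS_PG_* variables, falling back to the defaults above."""
        defaults = cls()
        return cls(
            host=env.get("DMS_PG_HOST", defaults.host),
            port=int(env.get("DMS_PG_PORT", defaults.port)),
            dbname=env.get("DMS_PG_DBNAME", defaults.dbname),
            user=env.get("DMS_PG_USER", defaults.user),
            password=env.get("DMS_PG_PASSWORD", defaults.password),
        )

    def connect_kwargs(self) -> dict:
        """Keyword arguments in the form psycopg2.connect() accepts."""
        return {
            "host": self.host,
            "port": self.port,
            "dbname": self.dbname,
            "user": self.user,
            "password": self.password,
        }
```

`psycopg2.connect(**PostgresSettings.from_env().connect_kwargs())` would then replace the explicit keyword arguments; the same pattern could cover the Azurite settings.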


## 3. Database schema

Apply the schema to ensure all tables exist:


In [3]:
# Apply database schema
schema_path = project_root_directory / "database" / "schemas" / "schema.sql"
schema_sql = schema_path.read_text()

with pg_conn.cursor() as cursor:
    cursor.execute(schema_sql)
    pg_conn.commit()

print("Database schema applied successfully")
print("Tables ensured:")
print("- documents (text_extraction_status, processing_status)")
print("- extraction_jobs")
print("- ocr_results")


Database schema applied successfully
Tables ensured:
- documents (text_extraction_status, processing_status)
- extraction_jobs
- ocr_results
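Because this cell re-executes `schema.sql` every time the notebook runs, it only succeeds on re-runs if the script tolerates tables that already exist, for example by using `CREATE TABLE IF NOT EXISTS` (supported by Postgres). The real `schema.sql` is not shown here, so the sketch below is illustrative only: it uses the standard library `sqlite3` module as a stand-in for Postgres and a simplified, hypothetical DDL to show that applying an idempotent schema twice neither fails nor touches existing rows.

```python
import sqlite3

# Hypothetical, simplified DDL; IF NOT EXISTS turns each statement into a no-op
# when the table is already present
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS documents (
    id TEXT PRIMARY KEY,
    text_extraction_status TEXT,
    processing_status TEXT
);
CREATE TABLE IF NOT EXISTS extraction_jobs (
    id TEXT PRIMARY KEY,
    document_id TEXT REFERENCES documents(id),
    status TEXT
);
"""


def apply_schema(conn: sqlite3.Connection, schema_sql: str) -> None:
    # executescript runs several statements; sqlite3 first commits any pending transaction
    conn.executescript(schema_sql)
    conn.commit()


conn = sqlite3.connect(":memory:")
apply_schema(conn, SCHEMA_SQL)
conn.execute("INSERT INTO documents VALUES ('doc-1', 'ready', 'pending extraction')")
conn.commit()
apply_schema(conn, SCHEMA_SQL)  # second run: no error, existing row untouched
```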


## 4. Upload document

Upload a document to the DMS for async processing:


In [4]:
# Upload document
test_file_path = project_root_directory / "data" / "loan_application.pdf"

if not test_file_path.exists():
    # Fail fast: the following cells depend on document_id being defined
    raise FileNotFoundError(f"Test file not found: {test_file_path}")

document_id = dms_service.upload_document(
    file_path=test_file_path,
    document_type="loan_application",
    source_filename="loan_application.pdf",
)
print(f"Document uploaded with ID: {document_id}")

# Check initial status
document = dms_service.get_document(document_id)
print(f"Initial text extraction status: {document.get('text_extraction_status')}")
print(f"Initial processing status: {document.get('processing_status')}")


Document uploaded with ID: 6b08be4b-8f81-48db-934e-552306074161
Initial text extraction status: ready
Initial processing status: pending extraction
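The `processing_status` values that appear in this notebook (`pending extraction` here, `ocr running` and `llm running` in the monitoring output, and `done` and `failed` as the terminal values the monitoring loop checks for) suggest a linear pipeline. The sketch below encodes that reading as an enum plus a check on a sequence of polled values. The ordering rules are an assumption inferred from the output, not taken from the project's code; stages are allowed to appear skipped because a 7-second poll can miss a short stage.

```python
from enum import Enum


class ProcessingStatus(str, Enum):
    # Values as they appear in this notebook's output and monitoring loop
    PENDING_EXTRACTION = "pending extraction"
    OCR_RUNNING = "ocr running"
    LLM_RUNNING = "llm running"
    DONE = "done"
    FAILED = "failed"


# Assumed order of the happy path; FAILED may follow any non-terminal stage
PIPELINE_ORDER = [
    ProcessingStatus.PENDING_EXTRACTION,
    ProcessingStatus.OCR_RUNNING,
    ProcessingStatus.LLM_RUNNING,
    ProcessingStatus.DONE,
]
TERMINAL = {ProcessingStatus.DONE, ProcessingStatus.FAILED}


def is_valid_history(observed: list[str]) -> bool:
    """Check polled status strings: stages may repeat or be skipped, but never go backwards."""
    states = [ProcessingStatus(value) for value in observed]  # ValueError on unknown strings
    for previous, current in zip(states, states[1:]):
        if previous in TERMINAL and current != previous:
            return False  # nothing should leave done/failed
        if current is ProcessingStatus.FAILED:
            continue  # any stage may fail
        if PIPELINE_ORDER.index(current) < PIPELINE_ORDER.index(previous):
            return False  # a stage went backwards
    return True
```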


## 5. Trigger async processing

Start the async processing pipeline:


In [5]:
# Trigger async processing
print("Triggering async processing...")
task_id = async_processor.trigger_processing(document_id)

if task_id:
    print(f"Async processing started with task ID: {task_id}")
    print("Processing is now running in the background Celery worker")
else:
    print("Failed to start async processing")


Triggering async processing...
Async processing started with task ID: a8b29ece-3446-4071-b3ed-4cf2e1607f00
Processing is now running in the background Celery worker
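With Celery, calling a task asynchronously publishes a message to a broker and immediately returns an `AsyncResult` whose `id` is the task ID, while a separate worker process executes the task. The internals of `AsyncDocumentProcessor.trigger_processing` are not shown in this notebook, so the following is only a conceptual stand-in built on the standard library: `InProcessTaskQueue` is hypothetical and uses threads inside one process, whereas real Celery workers are separate processes connected through a broker. The state names mirror Celery's built-in `PENDING`, `STARTED`, `SUCCESS`, and `FAILURE`.

```python
import queue
import threading
import time
import uuid


class InProcessTaskQueue:
    """Toy stand-in for a broker plus a pool of workers."""

    def __init__(self, handler, num_workers: int = 2):
        self._handler = handler
        self._tasks = queue.Queue()
        self._lock = threading.Lock()
        self.states: dict[str, str] = {}
        self._workers = [threading.Thread(target=self._work, daemon=True) for _ in range(num_workers)]
        for worker in self._workers:
            worker.start()

    def enqueue(self, document_id: str) -> str:
        task_id = str(uuid.uuid4())
        with self._lock:
            self.states[task_id] = "PENDING"
        self._tasks.put((task_id, document_id))
        return task_id  # returns at once; processing happens on a worker thread

    def _work(self) -> None:
        while True:
            item = self._tasks.get()
            if item is None:  # sentinel: stop this worker
                self._tasks.task_done()
                return
            task_id, document_id = item
            with self._lock:
                self.states[task_id] = "STARTED"
            try:
                self._handler(document_id)
                outcome = "SUCCESS"
            except Exception:
                outcome = "FAILURE"
            with self._lock:
                self.states[task_id] = outcome
            self._tasks.task_done()

    def shutdown(self) -> None:
        self._tasks.join()  # wait until every queued task has been processed
        for _ in self._workers:
            self._tasks.put(None)
        for worker in self._workers:
            worker.join()


if __name__ == "__main__":
    task_queue = InProcessTaskQueue(handler=lambda doc_id: time.sleep(0.1), num_workers=2)
    tid = task_queue.enqueue("doc-1")
    print(tid, task_queue.states[tid])  # typically PENDING or STARTED: enqueue did not wait
    task_queue.shutdown()
    print(task_queue.states[tid])  # SUCCESS
```

Adding workers increases throughput without changing the caller, which is the scalability property the summary below refers to.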


## 6. Monitor processing status

Poll the processing status until the pipeline reaches `done` or `failed`:

In [6]:
# Monitor processing status
print("Monitoring processing status...")
print("This may take a few minutes as the Celery worker processes the document")

for i in range(20):  # Poll up to 20 times, 7 s apart (roughly 140 s in total)
    status = async_processor.get_processing_status(document_id)
    
    print(f"\n--- Status Check {i+1} ---")
    print(f"Text extraction status: {status.get('text_extraction_status')}")
    print(f"Processing status: {status.get('processing_status')}")
    
    # Check extraction jobs
    jobs = status.get('extraction_jobs', [])
    if jobs:
        latest_job = jobs[0]
        print(f"Latest job status: {latest_job.get('status')}")
        if latest_job.get('error_message'):
            print(f"Error: {latest_job.get('error_message')}")
    
    # Check if processing is complete
    if status.get('processing_status') == 'done':
        print("\n✓ Processing completed successfully!")
        break
    elif status.get('processing_status') == 'failed':
        print("\n✗ Processing failed")
        break
    
    # Wait before next check
    time.sleep(7)
else:
    print("\nMonitoring timeout - processing may still be running")


Monitoring processing status...
This may take a few minutes as the Celery worker processes the document

--- Status Check 1 ---
Text extraction status: ready
Processing status: pending extraction
Latest job status: pending extraction

--- Status Check 2 ---
Text extraction status: ready
Processing status: ocr running
Latest job status: pending extraction

--- Status Check 3 ---
Text extraction status: ready
Processing status: ocr running
Latest job status: pending extraction

--- Status Check 4 ---
Text extraction status: ready
Processing status: ocr running
Latest job status: pending extraction

--- Status Check 5 ---
Text extraction status: ready
Processing status: llm running
Latest job status: pending extraction

--- Status Check 6 ---
Text extraction status: ready
Processing status: llm running
Latest job status: pending extraction

--- Status Check 7 ---
Text extraction status: ready
Processing status: llm running
Latest job status: pending extraction

--- Status Check 8 ---
...
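The monitoring cell polls at a fixed 7-second interval for a fixed 20 iterations. A common refinement is to poll against a deadline with exponential backoff, so a short job is noticed quickly while a long job causes fewer database queries. A minimal sketch; `poll_until` is a hypothetical helper, and its `sleep` and `clock` parameters are injectable so it can be tested without waiting.

```python
import time
from typing import Callable, Iterable


def poll_until(
    fetch_status: Callable[[], str],
    terminal: Iterable[str] = ("done", "failed"),
    timeout_s: float = 300.0,
    initial_interval_s: float = 2.0,
    max_interval_s: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> tuple[str, bool]:
    """Poll fetch_status() until a terminal status or the timeout.

    Returns (last_status, reached_terminal). The interval doubles after each
    poll, capped at max_interval_s.
    """
    terminal_set = set(terminal)
    deadline = clock() + timeout_s
    interval = initial_interval_s
    while True:
        status = fetch_status()
        if status in terminal_set:
            return status, True
        remaining = deadline - clock()
        if remaining <= 0:
            return status, False
        sleep(min(interval, remaining))  # never sleep past the deadline
        interval = min(interval * 2, max_interval_s)
```

In this notebook it could be called as `poll_until(lambda: async_processor.get_processing_status(document_id).get("processing_status"))`.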

## 7. Cleanup

Close the database connection:

In [7]:
pg_conn.close()
print("Database connection closed")

Database connection closed
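This cleanup cell only runs if every earlier cell succeeded; if a cell raises first, the connection is left open. One way to make cleanup unconditional is `contextlib.closing`, which calls `close()` on exit even when an exception propagates. Note that psycopg2's own `with conn:` block only commits or rolls back the transaction and does not close the connection. A minimal sketch; `FakeConnection` and `run_with_connection` are hypothetical stand-ins so the example runs without a database.

```python
from contextlib import closing


class FakeConnection:
    """Hypothetical stand-in exposing the close() method psycopg2 connections provide."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def run_with_connection(connect, work):
    # closing() calls conn.close() when the block exits, even if work() raises
    with closing(connect()) as conn:
        return work(conn)
```

With a real connection the same idea reads `with closing(psycopg2.connect(...)) as pg_conn: ...`.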


## Summary

- **Async Processing**: Documents are processed in the background using Celery workers
- **Scalability**: Multiple workers can process documents concurrently
- **Status Tracking**: The notebook polls processing status and extraction-job progress until the pipeline finishes
- **Error Handling**: Failures surface as a `failed` processing status, with details in the extraction job's `error_message`
- **Non-blocking**: The main application doesn't wait for processing to complete

For a deeper dive (theory, benefits, scaling patterns), see the README in this folder.
