# Batch Parse with LlamaCloud Directories

This notebook demonstrates how to use LlamaCloud's batch processing API to parse multiple files in a directory. The workflow includes:

1. **Creating a Directory** - Set up a directory to organize your files
2. **Uploading Files** - Upload multiple files to the directory
3. **Starting a Batch Parse Job** - Kick off batch processing on all files
4. **Monitoring Progress** - Check the status and view results

This is useful when you need to parse many documents at once, as the batch API handles the orchestration and provides progress tracking.

## Setup and Installation

In [None]:
%pip install llama-cloud python-dotenv httpx requests

In [None]:
import os
from dotenv import load_dotenv
import httpx

# Load environment variables
load_dotenv()

# Set your API key
LLAMA_CLOUD_API_KEY = os.environ.get("LLAMA_CLOUD_API_KEY", "llx-...")

# Optional: Set base URL (defaults to https://api.cloud.llamaindex.ai if not set)
LLAMA_CLOUD_BASE_URL = os.environ.get(
    "LLAMA_CLOUD_BASE_URL", "https://api.cloud.llamaindex.ai"
)

# Optional: Set project_id if you have one, otherwise it will use your default project
PROJECT_ID = os.environ.get("LLAMA_CLOUD_PROJECT_ID", None)

print("✅ API key configured")
print(f"   Base URL: {LLAMA_CLOUD_BASE_URL}")
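
Because the configuration cell falls back to the placeholder `llx-...` when `LLAMA_CLOUD_API_KEY` is unset, a missing key would only surface later as an authentication error. The following is a minimal sketch of an early check, assuming the placeholder is the only invalid value worth catching. The helper name `require_api_key` is hypothetical and not part of any SDK.

```python
import os


def require_api_key(env=None, var_name="LLAMA_CLOUD_API_KEY"):
    """Return the API key from the environment, or raise if it is missing.

    Hypothetical helper: treats an unset variable, an empty string, and the
    notebook's "llx-..." placeholder as "not configured".
    """
    env = os.environ if env is None else env
    key = (env.get(var_name) or "").strip()
    if not key or key == "llx-...":
        raise RuntimeError(f"{var_name} is not set; add it to your .env file")
    return key
```

Calling `LLAMA_CLOUD_API_KEY = require_api_key()` right after `load_dotenv()` would make the notebook fail fast instead of at the first request.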

## Setup HTTP Client

Since the current version of the llama-cloud SDK has some issues with the beta endpoints, we'll use direct HTTP requests with httpx for reliability.

In [None]:
# Authentication headers shared by the requests below
headers = {
    "Authorization": f"Bearer {LLAMA_CLOUD_API_KEY}",
}

print("✅ HTTP client configured")
print(f"   Using base URL: {LLAMA_CLOUD_BASE_URL}")

## Step 1: Create a Directory

First, we'll create a directory to organize our files. Directories help you group related files together for batch processing.

In [None]:
from datetime import datetime

# Create a directory with a timestamp in the name
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
directory_name = f"batch-parse-demo-{timestamp}"

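# Create directory using HTTP request.
# Send project_id only when one is configured: httpx encodes a None value
# as an empty query parameter rather than omitting it.
response = httpx.post(
    f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/directories",
    headers=headers,
    params={"project_id": PROJECT_ID} if PROJECT_ID else None,
    json={
        "name": directory_name,
        "description": "Demo directory for batch parse example",
    },
    timeout=60.0,
)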

if response.status_code in [200, 201]:
    directory = response.json()
    directory_id = directory["id"]
    project_id = directory["project_id"]

    print(f"✅ Created directory: {directory['name']}")
    print(f"   Directory ID: {directory_id}")
    print(f"   Project ID: {project_id}")
else:
    raise Exception(
        f"Failed to create directory: {response.status_code} - {response.text}"
    )

## Step 2: Upload Files to the Directory

Now we'll upload some files to our directory. For this demo, we'll download some sample PDFs and upload them.

You can replace these with your own files.

In [None]:
# Create a directory for sample files
import requests

os.makedirs("sample_files", exist_ok=True)

# Sample documents to download
sample_docs = {
    "attention.pdf": "https://arxiv.org/pdf/1706.03762.pdf",
    "bert.pdf": "https://arxiv.org/pdf/1810.04805.pdf",
}

# Download sample documents
for filename, url in sample_docs.items():
    filepath = f"sample_files/{filename}"
    if not os.path.exists(filepath):
        print(f"📥 Downloading {filename}...")
        response = requests.get(url)
        if response.status_code == 200:
            with open(filepath, "wb") as f:
                f.write(response.content)
            print(f"   ✅ Downloaded {filename}")
        else:
            print(f"   ❌ Failed to download {filename}")
    else:
        print(f"📁 {filename} already exists")

print("\n✅ Sample files ready!")

### Upload Files to Directory

Now let's upload the files to our directory using the `upload_file_to_directory` endpoint.

In [None]:
uploaded_files = []

# Workaround: use direct HTTP requests instead of the SDK due to an SDK bug
# (httpx was already imported in the setup cell)

for filename in os.listdir("sample_files"):
    if filename.endswith(".pdf"):
        filepath = f"sample_files/{filename}"

        print(f"📤 Uploading {filename}...")

        # Upload file using direct HTTP request (SDK has a bug with file uploads)
        with open(filepath, "rb") as f:
            # Prepare the multipart form data correctly
            files = {"upload_file": (filename, f, "application/pdf")}

            # Make the request directly
            response = httpx.post(
                f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/directories/{directory_id}/files/upload",
                params={"project_id": project_id},
                files=files,
                headers={"Authorization": f"Bearer {LLAMA_CLOUD_API_KEY}"},
                timeout=60.0,
            )

            if response.status_code in [200, 201]:
                directory_file = response.json()
                uploaded_files.append(directory_file)
                print(f"   ✅ Uploaded: {directory_file.get('display_name')}")
                print(f"      File ID: {directory_file.get('id')}")
            else:
                print(f"   ❌ Upload failed: {response.status_code}")
                print(f"      Error: {response.text[:200]}")

print(f"\n✅ Uploaded {len(uploaded_files)} files to directory")
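
The upload cell hard-codes `application/pdf` as the content type. If you swap in your own files, the type can be guessed from the file name instead. This is a sketch using the standard library. Whether the upload endpoint accepts non-PDF types is an assumption to verify against the API documentation, and `build_upload_field` is a hypothetical helper name.

```python
import mimetypes


def build_upload_field(filename, fileobj):
    """Build the tuple for the `upload_file` multipart field.

    Falls back to a generic binary type when the extension is unknown.
    """
    content_type, _ = mimetypes.guess_type(filename)
    return (filename, fileobj, content_type or "application/octet-stream")
```

Inside the upload loop this would replace the literal tuple: `files = {"upload_file": build_upload_field(filename, f)}`.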

## Step 3: Create a Batch Parse Job

Now that we have files in our directory, let's create a batch parse job to process them all at once.

The batch processing API uses the same configuration as LlamaParse.

In [None]:
# Configure the parse job
# This configuration will apply to all files in the directory
job_config = {
    "job_name": "parse_raw_file_job",  # Must match the JobNames enum value
    "partitions": {},
    "parameters": {
        "type": "parse",
        "lang": "en",
        "fast_mode": True,
    },
}

print("✅ Job configuration created")
print(f"   Language: {job_config['parameters']['lang']}")
print(f"   Fast mode: {job_config['parameters']['fast_mode']}")
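
If you want to run the same batch with different settings, the configuration can be produced by a small function. This sketch only parameterizes the two options shown in the cell above (`lang` and `fast_mode`); it does not assume any other LlamaParse option, and `build_parse_job_config` is a hypothetical name.

```python
def build_parse_job_config(lang="en", fast_mode=True):
    """Return a batch job configuration with the same shape as `job_config`."""
    return {
        "job_name": "parse_raw_file_job",  # same enum value as in the cell above
        "partitions": {},
        "parameters": {
            "type": "parse",
            "lang": lang,
            "fast_mode": fast_mode,
        },
    }
```

Calling `build_parse_job_config()` with no arguments reproduces the `job_config` dictionary defined above.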

### Submit the Batch Job

Now let's submit the batch job to process all files in the directory.

In [None]:
print(f"🚀 Submitting batch parse job for directory: {directory_id}")
print(f"   Processing {len(uploaded_files)} files...\n")

# Submit batch job using HTTP request
response = httpx.post(
    f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/batch-processing",
    headers=headers,
    params={"project_id": project_id},
    json={
        "directory_id": directory_id,
        "job_config": job_config,
        "page_size": 100,  # Number of files to fetch per batch
        "continue_as_new_threshold": 10,  # Workflow continuation threshold
    },
    timeout=60.0,
)

if response.status_code in [200, 201]:
    batch_job = response.json()
    batch_job_id = batch_job["id"]

    print("✅ Batch job submitted successfully!")
    print(f"   Batch Job ID: {batch_job_id}")
    print(f"   Workflow ID: {batch_job.get('workflow_id')}")
    print(f"   Status: {batch_job.get('status')}")
    print(f"   Total Items: {batch_job.get('total_items')}")
else:
    raise Exception(
        f"Failed to create batch job: {response.status_code} - {response.text}"
    )

## Step 4: Monitor Job Progress

Now let's monitor the batch job progress. We'll poll the status endpoint to see how the job is progressing.

In [None]:
import time


def print_job_status(status_data):
    """Helper function to print job status in a readable format."""
    job = status_data["job"]
    progress_pct = status_data["progress_percentage"]

    print(f"\n{'='*60}")
    print(f"Job Status: {job['status']}")
    print(f"{'='*60}")
    print(f"Total Items: {job['total_items']}")
    print(f"Completed: {job['processed_items']}")
    print(f"Failed: {job['failed_items']}")
    print(f"Skipped: {job['skipped_items']}")
    print(f"Progress: {progress_pct:.1f}%")

    if job.get("completed_at"):
        print(f"Completed At: {job['completed_at']}")
    elif job.get("started_at"):
        print(f"Started At: {job['started_at']}")

    print(f"{'='*60}")


# Poll for status updates
print("🔄 Monitoring batch job progress...")
print(
    "Note: It may take a few seconds for the workflow to initialize and count files.\n"
)

max_polls = 60  # Maximum number of status checks (increased for longer jobs)
poll_interval = 10  # Seconds between checks

for i in range(max_polls):
    response = httpx.get(
        f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/batch-processing/{batch_job_id}",
        headers=headers,
        params={"project_id": project_id},
        timeout=60.0,
    )

    if response.status_code == 200:
        status_data = response.json()
        print_job_status(status_data)

        # Check if job is complete
        job_status = status_data["job"]["status"]
        if job_status in ["completed", "failed", "cancelled"]:
            print(f"\n✅ Job finished with status: {job_status}")
            break

        if i < max_polls - 1:
            print(f"\n⏳ Waiting {poll_interval} seconds before next check...")
            time.sleep(poll_interval)
    else:
        print(f"Error getting status: {response.status_code} - {response.text}")
        break
else:
    print("\n⚠️  Reached maximum polling attempts. Job may still be running.")
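
The loop above polls at a fixed interval. For jobs whose duration is hard to predict, the same idea can be written as a reusable helper with a growing delay. This is a sketch, not part of the LlamaCloud API: `poll_until_done` is a hypothetical name, the backoff values are arbitrary, and `fetch_status` is any callable you supply that returns the current status string.

```python
import time


def poll_until_done(
    fetch_status,
    terminal=("completed", "failed", "cancelled"),
    initial_delay=2.0,
    max_delay=30.0,
    max_attempts=60,
    sleep=time.sleep,
):
    """Call `fetch_status()` until it returns a terminal status.

    The delay doubles after each attempt, capped at `max_delay`.
    Returns the terminal status, or None if the attempts ran out.
    """
    delay = initial_delay
    for attempt in range(max_attempts):
        status = fetch_status()
        if status in terminal:
            return status
        # Do not sleep after the final attempt
        if attempt < max_attempts - 1:
            sleep(delay)
            delay = min(delay * 2, max_delay)
    return None
```

In this notebook, `fetch_status` could wrap the status request from the cell above and return `response.json()["job"]["status"]`. The `sleep` parameter exists so the helper can be exercised without real waiting.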

## Step 5: View Job Items

Let's look at the individual items in the batch job to see which files were processed successfully.

In [None]:
# Get all items in the batch job
response = httpx.get(
    f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/batch-processing/{batch_job_id}/items",
    headers=headers,
    params={"project_id": project_id, "limit": 100},
    timeout=60.0,
)

if response.status_code == 200:
    items_response = response.json()

    print(f"\n📋 Batch Job Items ({items_response['total_size']} total)")
    print(f"{'='*80}\n")

    for item in items_response["items"]:
        status_emoji = (
            "✅"
            if item["status"] == "completed"
            else "❌"
            if item["status"] == "failed"
            else "⏳"
        )
        print(f"{status_emoji} {item['item_name']}")
        print(f"   Status: {item['status']}")
        print(f"   Item ID: {item['item_id']}")

        if item.get("error_message"):
            print(f"   Error: {item['error_message']}")

        print()
else:
    print(f"Error listing items: {response.status_code} - {response.text}")
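
For larger batches, a per-status tally is often easier to scan than the full item listing. The sketch below counts items by their `status` field and collects the names of failed ones. The sample payload is illustrative only and mirrors the `item_name` and `status` keys used above; `summarize_items` is a hypothetical helper.

```python
from collections import Counter


def summarize_items(items):
    """Count items per status and collect the names of failed items."""
    counts = Counter(item["status"] for item in items)
    failed = [item["item_name"] for item in items if item["status"] == "failed"]
    return counts, failed


# Illustrative payload only; real values come from items_response["items"]
sample_items = [
    {"item_name": "attention.pdf", "status": "completed"},
    {"item_name": "bert.pdf", "status": "failed"},
]
counts, failed = summarize_items(sample_items)
print(dict(counts), failed)
```

With real data you would call `summarize_items(items_response["items"])` after the listing request succeeds.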

## Step 6: Retrieve Processing Results

For each completed file, we can retrieve the processing results to see where the parsed output is stored.

In [None]:
# Get processing results for a specific item
if items_response["items"]:
    first_item = items_response["items"][0]

    print(f"\n🔍 Processing results for: {first_item['item_name']}")
    print(f"{'='*80}\n")

    response = httpx.get(
        f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/batch-processing/items/{first_item['item_id']}/processing-results",
        headers=headers,
        params={"project_id": project_id},
        timeout=60.0,
    )

    if response.status_code == 200:
        results = response.json()

        print(f"Item: {results['item_name']}")
        print(f"Total processing runs: {len(results['processing_results'])}\n")

        for i, result in enumerate(results["processing_results"], 1):
            print(f"Run {i}:")
            print(f"  Job Type: {result['job_type']}")
            print(f"  Processed At: {result['processed_at']}")
            print(f"  Parameters Hash: {result['parameters_hash']}")

            if result.get("output_s3_path"):
                print(f"  Output S3 Path: {result['output_s3_path']}")

            if result.get("output_metadata"):
                print(f"  Output Metadata: {result['output_metadata']}")

            print()
    else:
        print(f"Error getting results: {response.status_code} - {response.text}")

## Optional: List All Batch Jobs

You can also list all batch jobs in your project to see the history of batch processing operations.

In [None]:
# List all parse jobs in the project
response = httpx.get(
    f"{LLAMA_CLOUD_BASE_URL}/api/v1/beta/batch-processing",
    headers=headers,
    params={"project_id": project_id, "job_type": "parse", "limit": 10},
    timeout=60.0,
)

if response.status_code == 200:
    jobs_response = response.json()

    print(f"\n📊 Recent Batch Parse Jobs ({jobs_response['total_size']} total)")
    print(f"{'='*80}\n")

    for job in jobs_response["items"]:
        status_emoji = (
            "✅"
            if job["status"] == "completed"
            else "❌"
            if job["status"] == "failed"
            else "⏳"
        )
        print(f"{status_emoji} Job ID: {job['id']}")
        print(f"   Status: {job['status']}")
        print(f"   Directory: {job['directory_id']}")
        print(f"   Total Items: {job['total_items']}")
        print(f"   Completed: {job['processed_items']}")
        print(f"   Created: {job['created_at']}")
        print()
else:
    print(f"Error listing jobs: {response.status_code} - {response.text}")

## Step 7: Retrieve Parsed Text Results

Once the batch job is complete, each BatchJobItem has a `job_id` field that maps to a parse job ID. We can pass this ID to the standard parsing result endpoints to fetch the actual parsed text.

In [None]:
# Get all completed items and their job IDs
completed_items = [
    item for item in items_response["items"] if item["status"] == "completed"
]

print(f"📄 Found {len(completed_items)} completed items\n")
print(f"{'='*80}\n")

# Display the job_id for each completed item
for item in completed_items:
    print(f"📝 {item['item_name']}")
    print(f"   Item ID: {item['item_id']}")
    print(f"   Parse Job ID: {item['job_id']}")
    print()

### Fetch Parsed Text for a Specific Document

Now let's use the `job_id` to retrieve the parsed text from the parsing job's text result endpoint.

In [None]:
# Get the parsed text for the first completed item
if completed_items:
    first_completed = completed_items[0]

    print(f"📖 Retrieving parsed text for: {first_completed['item_name']}")
    print(f"   Using Parse Job ID: {first_completed['job_id']}\n")
    print(f"{'='*80}\n")

    # Use the job_id to fetch the parse result
    response = httpx.get(
        f"{LLAMA_CLOUD_BASE_URL}/api/v1/parsing/job/{first_completed['job_id']}/result/text",
        headers=headers,
        params={"project_id": project_id},
        timeout=60.0,
    )

    if response.status_code == 200:
        parse_result = response.text

        print(f"✅ Retrieved parsed text ({len(parse_result)} characters)\n")

        # Display first 1000 characters as a preview
        print("Preview (first 1000 characters):")
        print("-" * 80)
        print(parse_result[:1000])
        print("-" * 80)

        if len(parse_result) > 1000:
            print(f"\n... and {len(parse_result) - 1000} more characters")
    else:
        print(
            f"Error retrieving parse result: {response.status_code} - {response.text}"
        )
else:
    print("⚠️  No completed items found to retrieve results from")

### Retrieve Parsed Results in Other Formats

You can also retrieve the parsed results in JSON or Markdown format by requesting a different result endpoint.

In [None]:
if completed_items:
    first_completed = completed_items[0]

    print(
        f"📋 Retrieving parse results in different formats for: {first_completed['item_name']}\n"
    )

    # Get as JSON (includes structured data with pages, images, etc.)
    print("1️⃣ Retrieving as JSON...")
    response = httpx.get(
        f"{LLAMA_CLOUD_BASE_URL}/api/v1/parsing/job/{first_completed['job_id']}/result/json",
        headers=headers,
        params={"project_id": project_id},
        timeout=60.0,
    )

    if response.status_code == 200:
        json_result = response.json()
        print(f"   ✅ JSON result with {len(json_result['pages'])} pages")
        print(f"      Keys: {list(json_result.keys())}\n")
    else:
        print(f"   Error: {response.status_code}\n")

    # Get as Markdown
    print("2️⃣ Retrieving as Markdown...")
    response = httpx.get(
        f"{LLAMA_CLOUD_BASE_URL}/api/v1/parsing/job/{first_completed['job_id']}/result/markdown",
        headers=headers,
        params={"project_id": project_id},
        timeout=60.0,
    )

    if response.status_code == 200:
        markdown_result = response.text
        print(f"   ✅ Markdown result ({len(markdown_result)} characters)\n")

        # Display markdown preview
        print("Markdown Preview (first 500 characters):")
        print("-" * 80)
        print(markdown_result[:500])
        print("-" * 80)

        if len(markdown_result) > 500:
            print(f"\n... and {len(markdown_result) - 500} more characters")
    else:
        print(f"   Error: {response.status_code}")
else:
    print("⚠️  No completed items found to retrieve results from")

### Batch Process All Parsed Results

You can also loop through all completed items to retrieve and process all the parsed results.

In [None]:
# Process all completed items
print(f"🔄 Processing all {len(completed_items)} completed items...\n")
print(f"{'='*80}\n")

all_results = {}

for item in completed_items:
    print(f"📄 Processing: {item['item_name']}")
    print(f"   Parse Job ID: {item['job_id']}")

    try:
        # Retrieve the parsed text for this item
        response = httpx.get(
            f"{LLAMA_CLOUD_BASE_URL}/api/v1/parsing/job/{item['job_id']}/result/text",
            headers=headers,
            params={"project_id": project_id},
            timeout=60.0,
        )

        if response.status_code == 200:
            parsed_text = response.text

            all_results[item["item_name"]] = {
                "job_id": item["job_id"],
                "text": parsed_text,
                "length": len(parsed_text),
            }

            print(f"   ✅ Retrieved {len(parsed_text)} characters")
        else:
            all_results[item["item_name"]] = {
                "job_id": item["job_id"],
                "error": f"HTTP {response.status_code}",
            }
            print(f"   ❌ Error: HTTP {response.status_code}")

    except Exception as e:
        print(f"   ❌ Error: {str(e)}")
        all_results[item["item_name"]] = {"job_id": item["job_id"], "error": str(e)}

    print()

print(f"{'='*80}")
print(f"\n✅ Processed {len(all_results)} items")
print("\nSummary:")
for name, result in all_results.items():
    if "error" in result:
        print(f"  ❌ {name}: Error - {result['error']}")
    else:
        print(f"  ✅ {name}: {result['length']:,} characters")