# ETL Pipeline Overview

**Last Updated:** January 2026

This notebook provides a comprehensive overview of the Chinese Archives ETL pipeline, including:

1. **Pipeline Architecture** - 4-step workflow from PDF upload to structured parsing
2. **Job Tracking System** - How jobs are tracked and linked
3. **Standardized Interface** - Common patterns across all steps
4. **Run Modes** - Understanding regular runs, resume, and force rerun
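5. **Advanced Features** - Wildcards and dry-run previews
6. **Complete Example Workflow** - An end-to-end run from upload to parsing
7. **Best Practices** - Recommended habits and common pitfalls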

## 1. Pipeline Architecture

The ETL pipeline consists of 4 sequential steps:

```
Step 0: upload_pdfs       → Upload scanned PDFs to B2 cloud storage
         ↓ (generates job_id)

Step 2: run_ocr           → Send PDFs to Runpod API for OCR processing
         ↓ (references upload job_id)

Step 3: sync_ocr          → Download OCR results from Runpod to B2
         ↓ (references ocr job_id)

Step 4: parse_structure   → Detect pagination and extract table of contents
         ↓ (references sync job_id)
```

**Key Principle:** Each step references only the **immediate prior step's job_id** (via `--source-job-id`), creating a clear lineage chain.
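Because each job points only at its immediate predecessor, the full history of a result can be recovered by following those references backwards. The sketch below illustrates the idea with an in-memory registry; the `step` and `source_job_id` field names are assumptions for illustration, since the actual registry schema is not shown in this notebook.

```python
# Hypothetical registry contents: job_id -> entry.
# The field names "step" and "source_job_id" are assumptions, not the real schema.
REGISTRY = {
    "2026-01-02_00-28-28": {"step": "upload_pdfs", "source_job_id": None},
    "2026-01-02_00-33-11": {"step": "run_ocr", "source_job_id": "2026-01-02_00-28-28"},
    "2026-01-02_11-15-30": {"step": "sync_ocr", "source_job_id": "2026-01-02_00-33-11"},
    "2026-01-02_11-32-58": {"step": "parse_structure", "source_job_id": "2026-01-02_11-15-30"},
}


def trace_lineage(job_id, registry):
    """Return [(step, job_id), ...] from the given job back to the upload job."""
    chain = []
    seen = set()
    current = job_id
    while current is not None:
        if current in seen:
            raise ValueError(f"Cycle detected in lineage at {current}")
        seen.add(current)
        entry = registry[current]  # KeyError here means the chain is broken
        chain.append((entry["step"], current))
        current = entry["source_job_id"]
    return chain


for step, jid in trace_lineage("2026-01-02_11-32-58", REGISTRY):
    print(f"{step:<16} {jid}")
```

A missing registry entry surfaces as a `KeyError`, which is how a broken lineage chain would show up in this sketch.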

## 2. Job Tracking System

Each new job receives a unique **job_id**, a timestamp in the format `YYYY-MM-DD_HH-MM-SS`. The one exception is `--resume-from`, which reuses the ID of the job being resumed.
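The timestamp format maps directly onto Python's `strftime` codes. The following is a minimal sketch of how such IDs could be generated and validated; the helper names are hypothetical, and whether the pipeline uses local time or UTC is not stated here.

```python
from datetime import datetime

# strftime/strptime equivalent of YYYY-MM-DD_HH-MM-SS
JOB_ID_FORMAT = "%Y-%m-%d_%H-%M-%S"


def make_job_id(now=None):
    """Format a datetime (default: current local time) as a job ID."""
    return (now or datetime.now()).strftime(JOB_ID_FORMAT)


def parse_job_id(job_id):
    """Parse a job ID back into a datetime; raises ValueError if malformed."""
    return datetime.strptime(job_id, JOB_ID_FORMAT)


print(make_job_id(datetime(2026, 1, 2, 0, 28, 28)))
print(parse_job_id("2026-01-02_00-33-11"))
```

Since every field is zero-padded and ordered from most to least significant, sorting job IDs as plain strings also sorts them chronologically.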

### Data Storage Structure

```
data/
├── sources/                          # Upload metadata only
│   └── job_metadata/
│       └── 2026-01-02_00-28-28.json
│
└── analytics/                        # All analytics steps
    ├── job_registry/                 # Central tracking
    │   ├── 2026-01-02_00-33-11.json  # OCR job
    │   └── 2026-01-02_11-32-58.json  # Parse job
    │
    ├── ocr/                          # Step-specific results
    │   ├── job_metadata/
    │   │   └── 2026-01-02_00-33-11.json
    │   └── dagz_v01/
    │       ├── latest -> 2026-01-02_00-33-11/
    │       └── 2026-01-02_00-33-11/
    │           └── dagz_v01.json
    │
    └── parse_structure/
        └── ... (same pattern)
```

### Metadata Tracking

Each job maintains **dual metadata**:

1. **Task-specific metadata** (`data/analytics/{step}/job_metadata/{job_id}.json`)
   - Detailed per-citekey results
   - Processing statistics
   - Error messages

2. **Central registry** (`data/analytics/job_registry/{job_id}.json`)
   - High-level job info
   - Step name and timestamp
   - Source job linkage
   - Overall status
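The two metadata locations follow directly from the path templates above. As an illustration, a small hypothetical helper (not part of the pipeline scripts) could resolve both paths for an analytics job:

```python
from pathlib import Path


def metadata_paths(data_root, step, job_id):
    """Return the task-specific and central-registry metadata paths for a job.

    `step` is the directory name under data/analytics (e.g. "ocr").
    Upload metadata lives under data/sources and is not covered here.
    """
    analytics = Path(data_root) / "analytics"
    return {
        "task": analytics / step / "job_metadata" / f"{job_id}.json",
        "registry": analytics / "job_registry" / f"{job_id}.json",
    }


paths = metadata_paths("data", "ocr", "2026-01-02_00-33-11")
print(paths["task"].as_posix())
print(paths["registry"].as_posix())
```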

## 3. Standardized Interface

All pipeline steps (except `upload_pdfs`) follow the same input pattern:

### Input Selection (mutually exclusive)

```bash
--input FILE          # Read citekeys from YAML file
--citekeys CK1 CK2... # Explicit list via command line
--resume-from JOB_ID  # Resume failed items from previous job
```

### Required Arguments

```bash
--source-job-id JOB_ID  # Explicit reference to prior step's job
```

**Important:** Never use `"latest"` as a job ID. Always use explicit timestamps.

### Optional Flags

```bash
--force-rerun    # Reprocess all citekeys (ignore existing results)
--dry-run        # Preview what would be processed without execution
```
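For readers who want to see how such an interface could be declared, here is a sketch using `argparse`. It is an illustration only, not the scripts' actual parser: making one input mode required and rejecting `latest` at parse time are both assumptions.

```python
import argparse


def explicit_job_id(value):
    """Reject the symbolic name 'latest'; accept anything else unchanged."""
    if value == "latest":
        raise argparse.ArgumentTypeError(
            "use an explicit timestamp job ID, not 'latest'"
        )
    return value


def build_parser():
    parser = argparse.ArgumentParser(description="Pipeline step (sketch)")
    parser.add_argument("--source-job-id", required=True, type=explicit_job_id,
                        help="Job ID of the immediately preceding step")

    # Exactly one input mode may be given (assumed required in this sketch).
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--input", metavar="FILE",
                        help="YAML file listing citekeys")
    source.add_argument("--citekeys", nargs="+", metavar="CK",
                        help="Explicit list of citekeys")
    source.add_argument("--resume-from", metavar="JOB_ID", type=explicit_job_id,
                        help="Resume failed items from a previous job")

    parser.add_argument("--force-rerun", action="store_true",
                        help="Reprocess all citekeys")
    parser.add_argument("--dry-run", action="store_true",
                        help="Preview without executing")
    return parser


args = build_parser().parse_args(
    ["--source-job-id", "2026-01-02_00-28-28", "--citekeys", "dagz_v01", "dagz_v02"]
)
print(args.source_job_id, args.citekeys, args.dry_run)
```

With a mutually exclusive group, passing both `--input` and `--citekeys` makes `argparse` exit with a usage error before any processing starts.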

### Example Commands

```bash
# Step 2: OCR (after upload)
python scripts/run_ocr.py \
    --source-job-id 2026-01-02_00-28-28 \
    --citekeys dagz_v01 dagz_v02

# Step 3: Sync (after OCR)
python scripts/sync_ocr.py \
    --source-job-id 2026-01-02_00-33-11 \
    --input config/my_citekeys.yaml

# Step 4: Parse (after sync)
python scripts/parse_structure.py \
    --source-job-id 2026-01-02_11-15-30 \
    --resume-from 2026-01-02_11-32-58
```

## 4. Run Modes: Understanding Runs and Reruns

### The Data Worker Scenario

Imagine you're a data worker processing 100 documents through OCR:

**Monday Morning (First Run)**
```bash
python scripts/run_ocr.py \
    --source-job-id 2026-01-02_00-28-28 \
    --input config/all_docs.yaml
```
- Creates job: `2026-01-02_09-00-00`
- Processes: All 100 documents
- Result: 95 succeed, 5 fail due to API timeouts

**Tuesday Morning (Resume Failed)**
```bash
python scripts/run_ocr.py \
    --source-job-id 2026-01-02_00-28-28 \
    --resume-from 2026-01-02_09-00-00
```
- **Reuses same job ID:** `2026-01-02_09-00-00`
- Processes: Only the 5 failed documents
- Result: All 5 succeed
- Total: All 100 documents now complete in job `2026-01-02_09-00-00`

**Wednesday (Change OCR Parameters - Force Rerun)**

You realize the OCR confidence threshold was too low. You need to reprocess with better settings.

```bash
python scripts/run_ocr.py \
    --source-job-id 2026-01-02_00-28-28 \
    --input config/all_docs.yaml \
    --force-rerun
```
- **Creates NEW job ID:** `2026-01-04_10-30-00`
- Processes: All 100 documents again (even though they already have results)
- Result: New results with improved OCR parameters
- History: Old results still available in `2026-01-02_09-00-00`

### Key Differences

| Mode | Job ID | Processes | Use Case |
|------|--------|-----------|----------|
| **First Run** | Creates new | All citekeys | Initial processing |
| **Regular Rerun** | Creates new if needed | Only missing results | Continue from where you left off |
| **Resume** | Reuses existing | Only failed items | Fix failures without new job |
| **Force Rerun** | Always creates new | ALL citekeys | Changed parameters, need fresh results |
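The "Processes" column can be read as a selection rule over citekeys. The sketch below encodes that rule under simplifying assumptions: the mode names are labels chosen for this example, the `completed` and `failed` sets are supplied by the caller, and job ID handling is not modeled.

```python
def select_citekeys(requested, completed, failed, mode):
    """Return the citekeys a run would process under the given mode.

    requested: citekeys named for this run (for "resume", the previous job's citekeys)
    completed: citekeys that already have results
    failed:    citekeys that failed in the previous job
    mode:      "regular", "resume", or "force-rerun" (labels assumed for this sketch)
    """
    if mode == "force-rerun":
        return list(requested)                                # everything, regardless of state
    if mode == "resume":
        return [ck for ck in requested if ck in failed]       # only previous failures
    if mode == "regular":
        return [ck for ck in requested if ck not in completed]  # only missing results
    raise ValueError(f"Unknown mode: {mode}")


requested = ["dagz_v01", "dagz_v02", "dagz_v03", "dagz_v04", "dagz_v05"]
completed = {"dagz_v01", "dagz_v02", "dagz_v03"}
failed = {"dagz_v04"}

for mode in ("regular", "resume", "force-rerun"):
    print(mode, select_citekeys(requested, completed, failed, mode))
```

A first run is simply the "regular" case with an empty `completed` set, so every requested citekey is selected.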

### Decision Tree: Which Mode to Use?

```
Do you have results from a previous run?
│
├─ NO → Use regular run (--input or --citekeys)
│        Creates new job, processes all citekeys
│
└─ YES → Did the previous run have failures?
         │
         ├─ NO → Did you change parameters/config?
         │        │
         │        ├─ NO → Nothing to do! Results already exist.
         │        │
         │        └─ YES → Use --force-rerun
         │                 Creates new job, reprocesses everything
         │
         └─ YES → Do you want to create a new job?
                  │
                  ├─ NO → Use --resume-from
                  │        Reuses job ID, processes only failures
                  │
                  └─ YES → Use regular run (--input or --citekeys)
                           Creates new job, processes missing results
```
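The same decision tree can be written as a small function, which is handy for checking a choice before launching a large batch. This is a direct transcription of the tree; the function and parameter names are hypothetical.

```python
def recommend_mode(has_previous_results, had_failures=False,
                   config_changed=False, want_new_job=False):
    """Walk the decision tree and return the recommended way to run a step."""
    if not has_previous_results:
        return "regular run (--input or --citekeys)"
    if not had_failures:
        # Previous run was clean: only a parameter change justifies more work.
        return "--force-rerun" if config_changed else "nothing to do"
    # Previous run had failures: choose between a new job and reusing the old one.
    if want_new_job:
        return "regular run (--input or --citekeys)"
    return "--resume-from"


print(recommend_mode(has_previous_results=False))
print(recommend_mode(True, had_failures=True))
print(recommend_mode(True, had_failures=False, config_changed=True))
```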

## 5. Advanced Features

### Wildcard Patterns

Use `_v*` and `_y*` wildcards to match multiple citekeys:

```bash
# Process all volumes of DAGZ
python scripts/parse_structure.py \
    --source-job-id 2026-01-02_11-15-30 \
    --citekeys "dagz_v*"
    
# Matches: dagz_v01, dagz_v02, dagz_v03, ...
```

```bash
# Process all years 2000-2009 for MZDNP
python scripts/parse_structure.py \
    --source-job-id 2026-01-02_11-15-30 \
    --citekeys "mzdnp_y200*"
    
# Matches: mzdnp_y2000, mzdnp_y2001, ..., mzdnp_y2009
```

**Why only `_v*` and `_y*`?**
- Prevents overly broad matches (e.g., `dagz*` matching everything)
- Matches common naming patterns in our corpus
- Reduces risk of accidental batch processing
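To make the restriction concrete, the sketch below expands patterns against a list of known citekeys and rejects anything that is not a `_v*` or `_y*` wildcard. The validation regex and the source of the known citekeys are assumptions; the pipeline's own matching logic may differ.

```python
import fnmatch
import re

# Assumed rule: "*" is allowed only as the final character, directly after
# "_v" or "_y" plus optional digits (e.g. "dagz_v*", "mzdnp_y200*").
ALLOWED_WILDCARD = re.compile(r"^\w+_[vy]\d*\*$")


def expand_citekeys(patterns, known_citekeys):
    """Expand wildcard patterns against known citekeys, preserving order."""
    expanded = []
    for pattern in patterns:
        if "*" not in pattern:
            matches = [pattern]                      # literal citekey, passed through
        elif ALLOWED_WILDCARD.match(pattern):
            matches = sorted(
                ck for ck in known_citekeys
                if fnmatch.fnmatchcase(ck, pattern)  # case-sensitive on every platform
            )
        else:
            raise ValueError(f"Wildcard not allowed: {pattern!r}")
        for ck in matches:
            if ck not in expanded:                   # avoid duplicates across patterns
                expanded.append(ck)
    return expanded


known = ["dagz_v01", "dagz_v02", "mzdnp_y1999", "mzdnp_y2000", "mzdnp_y2005"]
print(expand_citekeys(["dagz_v*"], known))
print(expand_citekeys(["mzdnp_y200*"], known))
```

Passing `dagz*` or a bare `*` raises `ValueError` in this sketch instead of silently matching the whole corpus.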

### Dry Run Preview

Preview what would be processed before committing:

```bash
python scripts/parse_structure.py \
    --source-job-id 2026-01-02_11-15-30 \
    --citekeys "dagz_v*" \
    --dry-run
```

Output:
```
======================================================================
🔍 DRY RUN PREVIEW: parse_structure
======================================================================

📋 Configuration:
  Source Job ID:    2026-01-02_11-15-30
  Force Rerun:      False
  Total Citekeys:   5

✅ Citekeys to Process (2):
    • dagz_v04
    • dagz_v05

⊘ Citekeys to Skip (3):
    • dagz_v01: Result exists in job 2026-01-02_11-32-58
    • dagz_v02: Result exists in job 2026-01-02_11-32-58
    • dagz_v03: Result exists in job 2026-01-02_11-32-58

🆔 Estimated Job ID:
  2026-01-02_15-30-00

✅ Will create new job and process 2 citekeys

======================================================================
💡 To execute: Remove --dry-run flag
======================================================================
```

## 6. Complete Example Workflow

Let's walk through a complete pipeline run:

### Step 0: Upload PDFs

```bash
python scripts/upload_pdfs.py \
    --input config/my_docs.yaml
```
**Result:** Job `2026-01-02_00-28-28` created with 10 documents uploaded

```python
# Set up environment: make the pipeline scripts importable from this notebook
import sys
sys.path.insert(0, '../../scripts')

from pathlib import Path

# Show directory structure
analytics_root = Path('../../data/analytics')
print("📁 Data Structure:")
print(f"Analytics Root: {analytics_root}")
print(f"Exists: {analytics_root.exists()}")
```

### Step 2: Run OCR

```bash
# Preview first
python scripts/run_ocr.py \
    --source-job-id 2026-01-02_00-28-28 \
    --citekeys "dagz_v*" \
    --dry-run

# Execute
python scripts/run_ocr.py \
    --source-job-id 2026-01-02_00-28-28 \
    --citekeys dagz_v01 dagz_v02 dagz_v03
```
**Result:** Job `2026-01-02_00-33-11` created, 3 documents processed

### Step 3: Sync OCR Results

```bash
python scripts/sync_ocr.py \
    --source-job-id 2026-01-02_00-33-11 \
    --citekeys dagz_v01 dagz_v02 dagz_v03
```
**Result:** Job `2026-01-02_11-15-30` created, 3 documents synced

### Step 4: Parse Structure

```bash
python scripts/parse_structure.py \
    --source-job-id 2026-01-02_11-15-30 \
    --citekeys dagz_v01 dagz_v02 dagz_v03
```
**Result:** Job `2026-01-02_11-32-58` created, pagination detected for 3 documents

### Handling Failures

Suppose `dagz_v03` failed. Resume it:

```bash
python scripts/parse_structure.py \
    --source-job-id 2026-01-02_11-15-30 \
    --resume-from 2026-01-02_11-32-58
```
**Result:** Same job `2026-01-02_11-32-58` updated, `dagz_v03` now complete
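Conceptually, a resume finds the failed citekeys in the existing job record, retries them, and writes the outcomes back under the same job ID. The sketch below shows that bookkeeping on a plain dictionary; the `results` and `status` fields and their values are hypothetical, since the real metadata schema is not shown here.

```python
import copy


def failed_citekeys(job_meta):
    """List citekeys whose recorded status is 'failed' (field names assumed)."""
    return sorted(
        ck for ck, result in job_meta["results"].items()
        if result["status"] == "failed"
    )


def apply_retry(job_meta, retry_results):
    """Return a copy of the job record with retried citekeys overwritten."""
    updated = copy.deepcopy(job_meta)          # leave the caller's record untouched
    updated["results"].update(retry_results)
    updated["status"] = "failed" if failed_citekeys(updated) else "completed"
    return updated


job = {
    "job_id": "2026-01-02_11-32-58",
    "status": "failed",
    "results": {
        "dagz_v01": {"status": "success"},
        "dagz_v02": {"status": "success"},
        "dagz_v03": {"status": "failed", "error": "timeout"},
    },
}

print(failed_citekeys(job))
resumed = apply_retry(job, {"dagz_v03": {"status": "success"}})
print(resumed["job_id"], resumed["status"])
```

The job ID never changes in this flow, which is what distinguishes a resume from a regular rerun.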

## 7. Best Practices

### ✅ DO

- **Use explicit job IDs:** Always specify `--source-job-id` with exact timestamp
- **Preview first:** Use `--dry-run` for large batches to verify what will be processed
- **Use wildcards wisely:** Leverage `_v*` and `_y*` for volume and year ranges
- **Resume failures:** Use `--resume-from` to fix issues in the same job
- **Force rerun when needed:** Use `--force-rerun` when parameters change

### ❌ DON'T

- **Use "latest":** Never use "latest" as job ID (breaks reproducibility)
- **Skip source tracking:** Always provide `--source-job-id` to maintain lineage
- **Ignore dry run:** Don't skip `--dry-run` for large or complex operations
- **Use broad wildcards:** Don't use patterns like `*` or `dagz*` (only `_v*` and `_y*`)
- **Mix input modes:** Don't combine `--input`, `--citekeys`, and `--resume-from`

### Common Pitfalls

1. **Lost lineage:** Running a step without `--source-job-id` breaks the chain
2. **Accidental reprocessing:** Not checking existing results wastes compute time
3. **Wildcard explosions:** Using overly broad patterns processes unintended files
4. **Resume confusion:** Using `--resume-from` with the wrong `--source-job-id` creates mismatched data