# Bridge L3.M5.2 → L3.M5.3 Readiness Validation

## Purpose

This bridge validates the transition from **Data Pipelines** (M5.2 - production automation) to **Data Quality** (M5.3 - quality scoring and drift detection).

**What Shifts:** You move from running pipelines reliably at scale to ensuring the data flowing through them meets quality standards. M5.2 gave you throughput and reliability; M5.3 adds visibility into what you're actually indexing.

**Why It Matters:** A pipeline that processes 5,000 documents in 8 minutes is worthless if those documents contain corrupted text, duplicates, or degraded data. Quality checks prevent the garbage-in, garbage-out scenarios that undermine RAG system accuracy.

## Concepts Covered

This bridge introduces **validation gates** as a prerequisite pattern:

- **Infrastructure Readiness Checks** - Verifying production services are operational before layering new capabilities
- **Minimum Viable Dataset** - Establishing thresholds for meaningful quality analysis
- **Monitoring Foundation** - Ensuring metrics infrastructure exists to extend with quality signals

**Delta from M5.2:** No new pipeline features; only validation that existing automation is production-ready for quality instrumentation.
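
The gate pattern can be sketched in a few lines of Python. This is a minimal illustration, not code from M5.2 or M5.3: `run_gate` and the check names are hypothetical, and each check is assumed to return `True` (pass), `False` (fail), or `None` (skipped because the service isn't configured locally). Treating a skip as "not passed" keeps the gate closed until someone verifies that item on its dashboard.

```python
from typing import Callable, Dict, Optional

# Hypothetical check signature: True = pass, False = fail,
# None = skipped (service not configured; verify on its dashboard).
Check = Callable[[], Optional[bool]]

LABELS = {True: "PASS", False: "FAIL", None: "SKIP (verify manually)"}


def run_gate(checks: Dict[str, Check]) -> bool:
    """Run every check, print one status line each, and return True
    only if all checks returned True. Skipped checks keep the gate closed."""
    gate_open = True
    for name, check in checks.items():
        result = check()
        print(f"{LABELS[result]:<24} {name}")
        if result is not True:
            gate_open = False
    return gate_open


if __name__ == "__main__":
    # Placeholder results standing in for the four readiness checks
    ready = run_gate({
        "Airflow: 3 days of green DAG runs": lambda: True,
        "Celery: 4-8 workers responding": lambda: None,
        "Prometheus: documents_processed_total present": lambda: True,
        "Pinecone: 1,000+ vectors indexed": lambda: False,
    })
    print("Ready for M5.3:", ready)
```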

## After Completing This Bridge

You will be able to:

- Verify Airflow DAGs have run successfully for 3+ consecutive days
- Confirm parallel processing workers (Celery) are active and handling load
- Validate Prometheus metrics are collecting and visible in Grafana
- Check vector database contains minimum dataset size (1,000+ documents) for quality analysis
- Understand why each readiness check blocks M5.3 progress if failed

## Context in Track

**Track:** Level 3 - Production RAG Systems

**Bridge Position:** M5.2 → M5.3

**Previous Module:** L3.M5.2 Data Pipelines (Airflow scheduling, Celery parallelization, error handling, Prometheus monitoring)

**Next Module:** L3.M5.3 Data Quality (chunk quality scoring, duplicate detection, data drift monitoring)

**Estimated Time:** 15-20 minutes (validation checks only; no new implementation)

## Run Locally

**Windows (PowerShell):**
```powershell
$env:PYTHONPATH = "$PWD"; jupyter notebook
```

**macOS/Linux:**
```bash
PYTHONPATH=$PWD jupyter notebook
```

**Dependencies:** Python 3.8+, Jupyter Notebook

**Optional Service SDKs:** `apache-airflow`, `celery`, `prometheus-client`, `pinecone-client` (only needed for automated checks; manual verification via dashboards works without these)
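
To see which of these optional SDKs are importable in the notebook's environment, a small check built on the standard library's `importlib.util.find_spec` works without importing the packages themselves. The pip-name to module-name mapping below (`apache-airflow` to `airflow`, `prometheus-client` to `prometheus_client`, `pinecone-client` to `pinecone`) reflects the usual import names; adjust it if your environment differs. `available_sdks` is a hypothetical helper.

```python
import importlib.util

# Distribution name (pip) -> top-level module name (import)
OPTIONAL_SDKS = {
    "apache-airflow": "airflow",
    "celery": "celery",
    "prometheus-client": "prometheus_client",
    "pinecone-client": "pinecone",
}


def available_sdks(mapping=OPTIONAL_SDKS):
    """Return {pip_name: True/False} without importing the packages."""
    return {pip_name: importlib.util.find_spec(module) is not None
            for pip_name, module in mapping.items()}


for name, ok in available_sdks().items():
    print(f"{'✓' if ok else '✗'} {name}")
```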

---

## 1. Recap: What M5.2 Delivered

Module 5.2 (Data Pipelines) shipped four production-grade capabilities:

### 1.1 Automated Scheduling
- **Achievement:** DAGs run daily at 2 AM with zero manual intervention
- **Technology:** Airflow scheduling

### 1.2 Parallel Processing
- **Achievement:** 5,000 documents process in 8 minutes instead of 40 (5x speedup)
- **Technology:** 4-8 Celery workers

### 1.3 Error Handling
- **Achievement:** Single document failures don't crash pipeline
- **Technology:** Automatic retries with exponential backoff

### 1.4 Production Monitoring
- **Achievement:** Real-time metrics tracking
- **Metrics:** Success rate, P95 latency, error types
- **Technology:** Prometheus + Grafana dashboards

---

## 2. Readiness Check #1: Airflow DAGs Running Successfully

**Requirement:** Verify green runs for last 3 days in Airflow UI

**Why Critical:** Quality instrumentation assumes the base pipeline executes reliably. Confirming stable runs first prevents spending 3+ hours debugging quality checks that are failing only because the underlying pipeline is broken.

**Pass Criteria:** All scheduled DAG runs show SUCCESS status for 72+ hours

**What this code does:** Checks for `AIRFLOW_HOME` environment variable to confirm Airflow is configured. Skips gracefully if not present (offline-friendly), allowing manual dashboard verification.

```python
import os

# Check if Airflow is configured
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')

if not AIRFLOW_HOME:
    print("⚠️ Skipping (no Airflow configured)")
    print("To validate: Check Airflow UI for DAG runs in last 3 days")
else:
    # Automated check (not implemented in this cell):
    #   1. Query DAG runs via the Airflow REST API, e.g.
    #      dag_runs = [{"state": "success", "date": "2025-11-05"}, ...]
    #   2. Pass only if all(r["state"] == "success" for r in dag_runs)
    print(f"✓ Airflow home: {AIRFLOW_HOME}")
    print("Manual verification required: Check Airflow UI for green runs")
```

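The pass criterion itself (every day in the window has a run, and every run succeeded) can be written as a pure function and tested offline. This is a sketch under stated assumptions: one scheduled run per day; each run carries a `state` plus a date taken from `logical_date` or `execution_date` (field names from the `dag_runs` list returned by the Airflow 2.x stable REST API, `GET /api/v1/dags/{dag_id}/dagRuns`; check your version) or the `date` key used in the cell above; and the window is the three calendar days (UTC) before `today`, which matches daily DAGs whose logical date marks the start of the previous day's interval. `run_date` and `dag_runs_green` are hypothetical helpers.

```python
from datetime import date, timedelta


def run_date(run):
    """Calendar date of a run from an ISO timestamp like '2025-11-05T02:00:00+00:00'."""
    stamp = run.get("logical_date") or run.get("execution_date") or run["date"]
    return date.fromisoformat(stamp[:10])


def dag_runs_green(dag_runs, days=3, today=None):
    """True if each of the `days` days before `today` has at least one run
    and every run in that window finished with state 'success'."""
    today = today or date.today()
    window = {today - timedelta(days=d) for d in range(1, days + 1)}
    in_window = [r for r in dag_runs if run_date(r) in window]
    covered = {run_date(r) for r in in_window}
    return covered == window and all(r["state"] == "success" for r in in_window)
```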
---

## 3. Readiness Check #2: Parallel Processing Active (4-8 Workers)

**Requirement:** Verify Flower dashboard shows all workers active during runs

**Why Critical:** Quality checks add roughly 20% overhead to processing time. With parallel workers, the 8-minute run grows to about 9.6 minutes; without them, the 40-minute sequential run would grow to about 48 minutes, degrading throughput.
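
A quick check of the runtime arithmetic, using the figures from this notebook (40 minutes sequential, 8 minutes parallel, about 20% quality-check overhead). The overhead is assumed to scale proportionally with processing time; `runtime_with_overhead` is a hypothetical helper.

```python
def runtime_with_overhead(minutes, overhead=0.20):
    """Runtime after adding a proportional overhead (0.20 = 20%)."""
    return minutes * (1 + overhead)


sequential, parallel = 40, 8
print(f"Speedup from parallelization: {sequential / parallel:.0f}x")
print(f"Parallel + quality checks:   {runtime_with_overhead(parallel):.1f} min")
print(f"Sequential + quality checks: {runtime_with_overhead(sequential):.1f} min")
```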

**Pass Criteria:** 4-8 Celery workers registered and actively processing tasks

**What this code does:** Detects Celery broker configuration via environment variables. Prints skip message if unconfigured, enabling offline notebook execution without external dependencies.

```python
import os

# Check for Celery/Flower configuration
CELERY_BROKER = os.getenv('CELERY_BROKER_URL')
FLOWER_URL = os.getenv('FLOWER_URL', 'http://localhost:5555')

if not CELERY_BROKER:
    print("⚠️ Skipping (no Celery configured)")
    print("To validate: Check Flower dashboard for 4-8 active workers")
else:
    # Automated check (not implemented in this cell):
    #   1. Query the Flower API at {FLOWER_URL}/api/workers, which is keyed by
    #      worker name, e.g. workers = {"worker1": {...}, "worker2": {...}, ...}
    #   2. Pass if 4 <= len(workers) <= 8
    print(f"✓ Celery broker: {CELERY_BROKER}")
    print(f"  Flower URL: {FLOWER_URL}")
    print("Manual verification required: Check Flower for active workers")
```

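Instead of reading worker status from Flower, the same check can use Celery's own remote-control API: `app.control.inspect().ping()` returns a dict keyed by worker name (each reply looks like `{"ok": "pong"}`) or `None` when no worker answers. This sketch swaps in that API for the Flower query. `responsive_workers` and `worker_check` are hypothetical helpers, and a ping only confirms a worker is alive, not that it is busy (`inspect().active()` lists the tasks each worker is currently executing).

```python
import os


def responsive_workers(ping_reply):
    """Sorted names of workers that answered a ping.

    `ping_reply` has the shape returned by app.control.inspect().ping():
    {"celery@host1": {"ok": "pong"}, ...}, or None if nobody replied.
    """
    return sorted(name for name, reply in (ping_reply or {}).items()
                  if reply.get("ok") == "pong")


def worker_check(ping_reply, min_workers=4, max_workers=8):
    """True if the number of responsive workers is within [min_workers, max_workers]."""
    return min_workers <= len(responsive_workers(ping_reply)) <= max_workers


broker = os.getenv("CELERY_BROKER_URL")
if broker:
    from celery import Celery  # optional dependency, only needed for the live check
    app = Celery(broker=broker)
    reply = app.control.inspect(timeout=2.0).ping()
    print("Responsive workers:", responsive_workers(reply))
    print("Pass:", worker_check(reply))
```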
---

## 4. Readiness Check #3: Prometheus Metrics Collecting

**Requirement:** Verify Grafana dashboard displays `documents_processed_total` metric

**Why Critical:** Quality metrics (corruption rate, duplicate %, drift scores) will extend the existing Prometheus setup. If base metrics aren't collecting, quality metrics can't piggyback on the infrastructure.

**Pass Criteria:** Prometheus scraping pipeline metrics; Grafana shows recent data points (last 5 minutes)

**What this code does:** Looks for Prometheus URL in environment. Falls back to offline mode if absent, prompting manual Grafana dashboard checks instead of automated queries.

```python
import os

# Check for Prometheus/Grafana configuration
PROMETHEUS_URL = os.getenv('PROMETHEUS_URL', 'http://localhost:9090')
GRAFANA_URL = os.getenv('GRAFANA_URL', 'http://localhost:3000')

if not os.getenv('PROMETHEUS_URL'):
    print("⚠️ Skipping (no Prometheus configured)")
    print("To validate: Check Grafana for 'documents_processed_total' metric")
else:
    # Automated check (not implemented in this cell):
    #   1. GET {PROMETHEUS_URL}/api/v1/query?query=documents_processed_total
    #      result = {"data": {"result": [{"value": [timestamp, "1234"]}]}}
    #   2. Pass if float(result["data"]["result"][0]["value"][1]) > 0
    #      (values arrive as strings and may be floats, so parse with float)
    print(f"✓ Prometheus: {PROMETHEUS_URL}")
    print(f"  Grafana: {GRAFANA_URL}")
    print("Manual verification: Check Grafana for documents_processed_total")
```

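A sketch of the automated version using only the standard library. It calls the instant-query endpoint of Prometheus's HTTP API (`/api/v1/query`) and sums the returned sample values. Because an instant query only returns series that have a sample inside the lookback window (5 minutes by default), a non-empty result also suggests fresh data, assuming `--query.lookback-delta` has not been changed. `query_prometheus` and `metric_total` are hypothetical helper names.

```python
import json
import os
import urllib.parse
import urllib.request


def query_prometheus(base_url, promql, timeout=5):
    """Run an instant query against Prometheus's HTTP API and return the JSON body."""
    query = urllib.parse.urlencode({"query": promql})
    url = f"{base_url.rstrip('/')}/api/v1/query?{query}"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def metric_total(payload):
    """Sum sample values across returned series; None if the query failed or found nothing."""
    if payload.get("status") != "success":
        return None
    series = payload.get("data", {}).get("result", [])
    if not series:
        return None
    # Each value is [unix_timestamp, "string_value"]
    return sum(float(s["value"][1]) for s in series)


prom_url = os.getenv("PROMETHEUS_URL")
if prom_url:
    total = metric_total(query_prometheus(prom_url, "documents_processed_total"))
    print("documents_processed_total:", total, "| pass:", bool(total))
```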
---

## 5. Readiness Check #4: Minimum Dataset Size

**Requirement:** Verify Pinecone shows 1,000+ vectors indexed

**Why Critical:** Quality analysis (detecting duplicates, measuring drift) requires statistically meaningful dataset sizes. Testing on <100 documents produces unreliable quality signals.

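**Pass Criteria:** Vector database contains at least 1,000 indexed vectors. If documents are split into several chunks with one vector each, 1,000 vectors correspond to fewer than 1,000 source documents, so use the document count when your target is 1,000+ documents.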

**What this code does:** Checks that a Pinecone API key is set in the environment (it does not test the key against the API). Makes no external API calls, preserving offline execution while guiding manual dashboard validation.

```python
import os

# Check for Pinecone configuration
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
PINECONE_ENV = os.getenv('PINECONE_ENVIRONMENT')

if not PINECONE_API_KEY:
    print("⚠️ Skipping (no Pinecone API key)")
    print("To validate: Check Pinecone dashboard for 1,000+ vectors")
else:
    # Automated check (not implemented in this cell):
    #   from pinecone import Pinecone; pc = Pinecone(api_key=...)
    #   index = pc.Index("your-index"); stats = index.describe_index_stats()
    #   Pass if stats["total_vector_count"] >= 1000
    print("✓ Pinecone API key configured")
    print(f"  Environment: {PINECONE_ENV or 'not set'}")
    print("Manual verification: Confirm 1,000+ vectors in Pinecone index")
```

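A sketch of the automated count, assuming the Pinecone Python SDK interface shown in the comments above (`from pinecone import Pinecone`, `pc.Index(...)`, `describe_index_stats()`) and an index name supplied through a `PINECONE_INDEX` environment variable, which is a hypothetical name introduced here. `vector_count` accepts either an attribute-style stats object or a plain dict, so the threshold logic can be tested offline. It counts vectors, not source documents.

```python
import os


def vector_count(stats):
    """Read total_vector_count from describe_index_stats() output.
    Accepts attribute-style response objects and plain dicts."""
    count = getattr(stats, "total_vector_count", None)
    if count is None and isinstance(stats, dict):
        count = stats.get("total_vector_count")
    return int(count or 0)


def dataset_ready(stats, minimum=1000):
    """True if the index holds at least `minimum` vectors."""
    return vector_count(stats) >= minimum


api_key = os.getenv("PINECONE_API_KEY")
index_name = os.getenv("PINECONE_INDEX")  # hypothetical variable name
if api_key and index_name:
    from pinecone import Pinecone  # optional dependency
    index = Pinecone(api_key=api_key).Index(index_name)
    stats = index.describe_index_stats()
    print(f"Vectors: {vector_count(stats)} | ready: {dataset_ready(stats)}")
```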
---

## 6. Call-Forward: What M5.3 Will Introduce

**Central Question:** "How do you know you're indexing good quality data?"

Module 5.3 (Data Quality) builds three critical capabilities on top of your production pipeline:

### 6.1 Chunk Quality Scoring
- **Goal:** Detect corrupted text and extraction errors
- **Target:** >80% detection rate for corrupted chunks
- **Techniques:** Text entropy analysis, language detection, structural validation
- **Output:** Per-chunk quality scores, automated filtering of low-quality data
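
To make "text entropy analysis" concrete, here is a character-level Shannon entropy score, H = -Σ p(c) log2 p(c) over the characters of a chunk. It is an illustrative sketch, not M5.3's scorer, and the thresholds in `looks_corrupted` are hypothetical placeholders: very low entropy flags repetitive junk (for example, a run of one character), while very high entropy can flag binary debris decoded as text. Ordinary English prose typically lands at roughly 4 bits per character.

```python
import math
from collections import Counter


def char_entropy(text):
    """Shannon entropy of the character distribution, in bits per character."""
    if not text:
        return 0.0
    n = len(text)
    return -sum((c / n) * math.log2(c / n) for c in Counter(text).values())


def looks_corrupted(text, low=2.5, high=5.5):
    """Flag chunks whose entropy falls outside a (hypothetical) normal band."""
    h = char_entropy(text)
    return h < low or h > high
```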

### 6.2 Duplicate Detection
- **Goal:** Identify near-duplicate documents before indexing
- **Target Performance:** <5% false positive rate
- **Techniques:** MinHash signatures, Locality-Sensitive Hashing (LSH)
- **Output:** Deduplication reports, similarity clustering
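
A compact MinHash sketch shows the core idea: hash every character shingle with many seeded hash functions, keep the minimum per function, and the fraction of matching minima estimates the Jaccard similarity of the two shingle sets. This version uses salted BLAKE2b from `hashlib` as the hash family and leaves out the LSH banding step that makes lookups fast at scale; the shingle size, signature length, and function names are illustrative choices, not M5.3's.

```python
import hashlib


def shingles(text, k=5):
    """Set of overlapping k-character shingles after whitespace/case normalization."""
    norm = " ".join(text.lower().split())
    return {norm[i:i + k] for i in range(max(len(norm) - k + 1, 1))}


def minhash_signature(text, num_hashes=64, k=5):
    """One minimum hash value per seeded hash function."""
    sh = shingles(text, k)
    signature = []
    for seed in range(num_hashes):
        salt = seed.to_bytes(16, "little")  # blake2b accepts up to 16 salt bytes
        signature.append(min(
            int.from_bytes(hashlib.blake2b(s.encode(), digest_size=8, salt=salt).digest(), "big")
            for s in sh
        ))
    return signature


def estimated_jaccard(sig_a, sig_b):
    """Fraction of positions where the two signatures agree."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)
```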

### 6.3 Data Drift Monitoring
- **Goal:** Alert when document distributions change meaningfully
- **Techniques:** Statistical tests such as the Kolmogorov-Smirnov (KS) and chi-square tests, plus distribution tracking
- **Output:** Drift alerts, anomaly detection dashboards
- **Integration:** Extends existing Prometheus/Grafana monitoring
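
For drift monitoring, the two-sample Kolmogorov-Smirnov statistic D is the largest gap between the empirical CDFs of a baseline sample and a current sample (for example, chunk lengths from last week versus today; that feature choice is a hypothetical example). The sketch computes D in pure Python and compares it with the standard large-sample critical value c(α)·sqrt((n+m)/(n·m)), where c(0.05) ≈ 1.358. In practice `scipy.stats.ks_2samp` also returns a p-value; this version simply avoids the dependency.

```python
import math


def ks_statistic(baseline, current):
    """Two-sample KS statistic: max |F_baseline(x) - F_current(x)| over all x."""
    a, b = sorted(baseline), sorted(current)
    i = j = 0
    d = 0.0
    while i < len(a) and j < len(b):
        x = min(a[i], b[j])
        # Advance past every value <= x so both ECDFs are evaluated at x
        while i < len(a) and a[i] <= x:
            i += 1
        while j < len(b) and b[j] <= x:
            j += 1
        d = max(d, abs(i / len(a) - j / len(b)))
    return d


def ks_critical(n, m, c_alpha=1.358):
    """Large-sample critical value; c_alpha=1.358 corresponds to alpha=0.05."""
    return c_alpha * math.sqrt((n + m) / (n * m))


def drift_detected(baseline, current, c_alpha=1.358):
    """True if D exceeds the critical value for these sample sizes."""
    return ks_statistic(baseline, current) > ks_critical(len(baseline), len(current), c_alpha)
```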

---

**Why This Matters:** Your pipeline can process 5,000 documents efficiently, but without quality checks, you might be indexing corrupted text, duplicates, or degraded data. M5.3 ensures the data quality matches your pipeline performance.