# Pattern 7: Data Plumbing

**The Interview-Isomorphic Pattern**

This is the hidden pattern behind most "real-world" coding challenges.

---

## The Universal Pipeline

Every data plumbing problem is the same pattern in different disguises:

```
parse → validate → normalize → aggregate → select top-k/best → report errors
```

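**Time**: O(n) for the single pass; picking the top k of m groups adds O(m log k) with a heap (O(m log m) with a full sort)  
**Space**: O(n) if each group keeps its raw records; O(m) if each group keeps only running totals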
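The pass is single because each stage can be a generator. A record flows through parse, validate, and normalize before the next line is read. Below is a minimal sketch that assumes a hypothetical record shape with `key` and `value` fields. The stage names (`parse_stage`, `validate_stage`, `normalize_stage`, `aggregate_stage`) are illustrative only, not a library API.

```python
import io
import json
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Error = Tuple[int, str]      # (line number, stage that rejected the line)
Numbered = Tuple[int, Any]   # (line number, record)

def parse_stage(lines: Iterable[str], errors: List[Error]) -> Iterator[Numbered]:
    for line_num, line in enumerate(lines, 1):  # 1-indexed line numbers
        line = line.strip()
        if not line:
            continue  # blank lines are skipped, not reported
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            errors.append((line_num, "parse"))
            continue
        yield line_num, record

def validate_stage(records: Iterable[Numbered], errors: List[Error]) -> Iterator[Numbered]:
    for line_num, rec in records:
        value = rec.get("value") if isinstance(rec, dict) else None
        ok = (isinstance(rec, dict) and "key" in rec
              and isinstance(value, (int, float)) and not isinstance(value, bool))
        if ok:
            yield line_num, rec
        else:
            errors.append((line_num, "validation"))

def normalize_stage(records: Iterable[Numbered]) -> Iterator[Numbered]:
    for line_num, rec in records:
        # Build a new dict: trimmed, lowercased key and a float value.
        yield line_num, {"key": str(rec["key"]).strip().lower(), "value": float(rec["value"])}

def aggregate_stage(records: Iterable[Numbered]) -> Dict[str, float]:
    totals: Dict[str, float] = {}  # running sums only: O(#keys) memory
    for _, rec in records:
        totals[rec["key"]] = totals.get(rec["key"], 0.0) + rec["value"]
    return totals

# io.StringIO stands in for an open file; nothing is read until
# aggregate_stage starts pulling records through the chain.
errors: List[Error] = []
source = io.StringIO('{"key": "A", "value": 1}\nnot json\n{"key": " a ", "value": 2}\n{"value": 3}\n')
totals = aggregate_stage(normalize_stage(validate_stage(parse_stage(source, errors), errors)))
print(totals)  # {'a': 3.0}
print(errors)  # [(2, 'parse'), (4, 'validation')]
```

`aggregate_stage` keeps only running sums, so its memory grows with the number of distinct keys, not the number of lines.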

---

## When to Use

- JSONL/CSV file processing
- API response handling
- Log aggregation
- ETL-style transforms
- "Process this messy data" problems

**Recognition trigger**: "Given a file/stream of records...", "aggregate by...", "handle malformed..."

---

## Pattern Template

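```python
import heapq
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

# --- Problem-specific hooks (placeholders: replace these for each problem) ---
def validate(record: Any) -> bool:
    # Placeholder rule: a JSON object that carries a grouping key.
    return isinstance(record, dict) and "group_key" in record

def normalize(record: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder rule: trim and lowercase the key; return a copy, never mutate input.
    return {**record, "group_key": str(record["group_key"]).strip().lower()}

def select_top(aggregated: Dict[str, List[Dict[str, Any]]], k: int) -> List[Tuple[str, int]]:
    # Placeholder rule: the k largest groups, via a heap in O(m log k).
    sizes = ((key, len(recs)) for key, recs in aggregated.items())
    return heapq.nlargest(k, sizes, key=lambda kv: kv[1])

def process_records(lines: Iterable[str]) -> Dict[str, Any]:
    """Universal plumbing template."""
    aggregated = defaultdict(list)
    errors = []

    for line_num, line in enumerate(lines, 1):  # 1-indexed line numbers
        line = line.strip()
        if not line:  # skip blank lines rather than reporting them as parse errors
            continue

        # 1. PARSE
        try:
            record = json.loads(line)
        except json.JSONDecodeError as e:
            errors.append({"line": line_num, "error": "parse", "msg": str(e)})
            continue

        # 2. VALIDATE
        if not validate(record):
            errors.append({"line": line_num, "error": "validation", "record": record})
            continue

        # 3. NORMALIZE
        normalized = normalize(record)

        # 4. AGGREGATE
        key = normalized["group_key"]
        aggregated[key].append(normalized)

    # 5. SELECT (top-k, best, filter)
    result = select_top(aggregated, k=10)

    # 6. REPORT ERRORS
    return {"data": result, "errors": errors, "error_count": len(errors)}
```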

## Invariant Statement

After processing line `i`, the aggregation state reflects exactly the valid records among lines `[1, i]`, and every invalid line in that range is recorded in the error list.
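One way to test this invariant is to recompute the expected state from scratch after every line and compare it with the incremental state. Below is a minimal sketch for a hypothetical "count requests per `endpoint`" task. `step`, `is_valid`, and `run_with_invariant_check` are illustrative names. The checker is O(n²), so it is meant for tests on small inputs, not production runs.

```python
import json
from collections import Counter
from typing import List, Tuple

def step(state: Counter, errors: List[int], line_num: int, line: str) -> None:
    """Incremental update: fold one line into the running state."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        errors.append(line_num)
        return
    if not isinstance(record, dict) or "endpoint" not in record:
        errors.append(line_num)
        return
    state[record["endpoint"]] += 1

def is_valid(line: str) -> bool:
    """Independent definition of 'valid', used only by the checker."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return False
    return isinstance(record, dict) and "endpoint" in record

def run_with_invariant_check(lines: List[str]) -> Tuple[Counter, List[int]]:
    state: Counter = Counter()
    errors: List[int] = []
    for i, line in enumerate(lines, 1):
        step(state, errors, i, line)
        # Recompute from scratch over lines [1, i] and compare.
        prefix = lines[:i]
        expected_state = Counter(json.loads(ln)["endpoint"] for ln in prefix if is_valid(ln))
        expected_errors = [n for n, ln in enumerate(prefix, 1) if not is_valid(ln)]
        assert state == expected_state, f"state wrong after line {i}"
        assert errors == expected_errors, f"errors wrong after line {i}"
    return state, errors

lines = ['{"endpoint": "/a"}', 'oops', '{"endpoint": "/b"}', '{"x": 1}', '{"endpoint": "/a"}']
state, errors = run_with_invariant_check(lines)
print(dict(state), errors)  # {'/a': 2, '/b': 1} [2, 4]
```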


```python
# Setup
import json
import csv
from io import StringIO
from collections import defaultdict, Counter
from typing import List, Dict, Any, Iterator, Optional
from dataclasses import dataclass
import heapq
```


## Walkthrough: JSONL Log Aggregation

**Problem**: Given a JSONL file of API logs, compute:
1. Request count per endpoint
2. Top-5 slowest endpoints by avg latency
3. Error rate per endpoint
4. Handle malformed lines gracefully


```python
# Sample JSONL data
SAMPLE_LOGS = """
{"endpoint": "/api/users", "latency_ms": 120, "status": 200}
{"endpoint": "/api/orders", "latency_ms": 350, "status": 200}
{"endpoint": "/api/users", "latency_ms": 95, "status": 200}
{"endpoint": "/api/orders", "latency_ms": 500, "status": 500}
{"endpoint": "/api/products", "latency_ms": 80, "status": 200}
malformed line here
{"endpoint": "/api/users", "latency_ms": 110, "status": 404}
{"missing_endpoint": true}
{"endpoint": "/api/products", "latency_ms": 75, "status": 200}
""".strip().split('\n')
```


```python
def aggregate_logs(lines: List[str]) -> Dict[str, Any]:
    """
    Process API logs following the universal pipeline.

    INVARIANT: After each line, stats[endpoint] reflects
    all valid requests seen for that endpoint.
    """
    # Aggregation state: running totals per endpoint, O(#endpoints) memory
    stats = defaultdict(lambda: {"count": 0, "total_latency": 0, "errors": 0})
    parse_errors = []
    validation_errors = []
    required = ["endpoint", "latency_ms", "status"]

    for line_num, line in enumerate(lines, 1):
        # 1. PARSE
        try:
            record = json.loads(line.strip())
        except json.JSONDecodeError:
            parse_errors.append(line_num)
            continue

        # 2. VALIDATE: a JSON object with all required fields, a string endpoint,
        # and numeric latency/status (bool is excluded on purpose)
        is_valid = (
            isinstance(record, dict)
            and all(k in record for k in required)
            and isinstance(record["endpoint"], str)
            and all(
                isinstance(record[k], (int, float)) and not isinstance(record[k], bool)
                for k in ("latency_ms", "status")
            )
        )
        if not is_valid:
            validation_errors.append({"line": line_num, "record": record})
            continue

        # 3. NORMALIZE (already clean in this example)
        endpoint = record["endpoint"]
        latency = record["latency_ms"]
        is_error = record["status"] >= 400  # 4xx and 5xx both count as errors

        # 4. AGGREGATE
        stats[endpoint]["count"] += 1
        stats[endpoint]["total_latency"] += latency
        if is_error:
            stats[endpoint]["errors"] += 1

    # 5. SELECT (compute derived metrics + top-k)
    results = []
    for endpoint, data in stats.items():
        # count >= 1 here: an entry is only created when a valid record arrives
        avg_latency = data["total_latency"] / data["count"]
        error_rate = data["errors"] / data["count"]
        results.append({
            "endpoint": endpoint,
            "count": data["count"],
            "avg_latency_ms": round(avg_latency, 2),
            "error_rate": round(error_rate, 3)
        })

    # Top 5 by average latency (descending); heapq.nlargest avoids a full sort
    top_slow = heapq.nlargest(5, results, key=lambda x: x["avg_latency_ms"])

    # 6. REPORT
    return {
        "by_endpoint": results,
        "top_slow": top_slow,
        "parse_errors": parse_errors,
        "validation_errors": validation_errors
    }

# Run
result = aggregate_logs(SAMPLE_LOGS)
print("By endpoint:", result["by_endpoint"])
print("Top slow:", result["top_slow"])
print("Parse errors on lines:", result["parse_errors"])
print("Validation errors:", result["validation_errors"])
```
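The setup cell imports `dataclass`, but the walkthrough never uses it. One possible use is a typed accumulator in place of the dict-of-dicts, so that misspelled keys fail loudly. Below is a minimal sketch; `EndpointStats` and its methods are hypothetical names. Returning `0.0` for an empty accumulator is a design choice. Returning `None` would also be reasonable.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict

@dataclass
class EndpointStats:
    """Running totals for one endpoint (hypothetical stand-in for the dict-of-dicts)."""
    count: int = 0
    total_latency: float = 0.0
    errors: int = 0

    def add(self, latency: float, status: int) -> None:
        self.count += 1
        self.total_latency += latency
        if status >= 400:  # same rule as aggregate_logs: 4xx and 5xx count as errors
            self.errors += 1

    @property
    def avg_latency(self) -> float:
        # Guard against an empty accumulator instead of raising ZeroDivisionError.
        return self.total_latency / self.count if self.count else 0.0

    @property
    def error_rate(self) -> float:
        return self.errors / self.count if self.count else 0.0

stats: Dict[str, EndpointStats] = defaultdict(EndpointStats)
for endpoint, latency, status in [("/api/users", 120, 200), ("/api/users", 95, 200), ("/api/orders", 500, 500)]:
    stats[endpoint].add(latency, status)

print(round(stats["/api/users"].avg_latency, 2))  # 107.5
print(stats["/api/orders"].error_rate)            # 1.0
```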


## Drill Problems

### Drill 1: CSV Validation
Given a CSV with columns `name,email,age`, validate each row:
- Name is non-empty
- Email contains "@"
- Age is a positive integer

Return the valid rows plus a list of errors with line numbers.


```python
SAMPLE_CSV = """name,email,age
Alice,alice@example.com,30
Bob,bob-at-email,25
Charlie,charlie@test.org,-5
Diana,diana@corp.io,28
,missing@name.com,22
Eve,eve@place.net,not_a_number
"""

def validate_csv(csv_content: str) -> Dict[str, Any]:
    """
    TODO: Implement CSV validation.
    Return: {"valid": [...], "errors": [{"line": N, "reasons": ["...", ...]}]}
    """
    pass

# Test your implementation
# result = validate_csv(SAMPLE_CSV)
# print("Valid:", result["valid"])
# print("Errors:", result["errors"])
```


### Drill 2: API Response Merger
Given paginated API responses as a list of dicts, merge them and deduplicate by ID.


```python
PAGES = [
    {"data": [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}], "next": True},
    {"data": [{"id": 2, "name": "B"}, {"id": 3, "name": "C"}], "next": True},
    {"data": [{"id": 4, "name": "D"}], "next": False},
]

def merge_pages(pages: List[Dict]) -> List[Dict]:
    """
    TODO: Merge all pages, deduplicate by ID (keep first occurrence).
    Return list of unique records.
    """
    pass

# Expected: [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}, {"id": 3, "name": "C"}, {"id": 4, "name": "D"}]
```


## Common Variants

| Variant | Key Difference |
|---------|----------------|
| Streaming | Use generators, don't load all into memory |
| Top-K | Keep a size-k heap (e.g. `heapq.nlargest`) instead of sorting everything at the end |
| Window aggregation | Keep rolling window state |
| Multi-file | Outer loop over files, same inner logic |
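The Top-K and Window aggregation rows can be sketched as follows, under stated assumptions. `top_k` keeps a size-k min-heap, so each of the m items costs O(log k). The standard library's `heapq.nlargest(k, items, key=...)` covers the same case. `SlidingWindowAverage` is a hypothetical helper. It assumes timestamps arrive in non-decreasing order and uses the window `(ts - window, ts]`.

```python
import heapq
from collections import deque
from typing import Deque, Iterable, List, Tuple

def top_k(items: Iterable[Tuple[str, float]], k: int) -> List[Tuple[str, float]]:
    """The k highest-scoring (name, score) pairs via a size-k min-heap: O(m log k)."""
    if k <= 0:
        return []
    heap: List[Tuple[float, str]] = []  # heap[0] is the weakest of the current top k
    for name, score in items:
        if len(heap) < k:
            heapq.heappush(heap, (score, name))
        elif score > heap[0][0]:
            heapq.heapreplace(heap, (score, name))  # pop the weakest, push the new one
    return [(name, score) for score, name in sorted(heap, reverse=True)]

class SlidingWindowAverage:
    """Average of values whose timestamps lie in (ts - window, ts]."""

    def __init__(self, window: float) -> None:
        if window <= 0:
            raise ValueError("window must be positive")
        self.window = window
        self.items: Deque[Tuple[float, float]] = deque()
        self.total = 0.0

    def add(self, ts: float, value: float) -> float:
        # Assumes timestamps arrive in non-decreasing order.
        self.items.append((ts, value))
        self.total += value
        while self.items[0][0] <= ts - self.window:  # evict expired entries
            _, old = self.items.popleft()
            self.total -= old
        return self.total / len(self.items)  # never empty: the newest item is always inside

print(top_k([("/a", 120.0), ("/b", 425.0), ("/c", 77.5), ("/d", 300.0)], k=2))
# [('/b', 425.0), ('/d', 300.0)]
w = SlidingWindowAverage(window=10)
print([w.add(t, v) for t, v in [(0, 100), (5, 200), (12, 300)]])
# [100.0, 150.0, 250.0]
```

The running `total` avoids re-summing the window on every event. The trade-off is that long runs of float values can accumulate rounding drift.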

## Edge Case Checklist

- [ ] Empty file/input
- [ ] All records invalid
- [ ] First/last line malformed
- [ ] Unicode/encoding issues
- [ ] Very large numbers (Python ints don't overflow, but floats can lose precision or become `inf`)
- [ ] Null/None values in fields
- [ ] Whitespace in data
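Several of these checklist items can be handled in one coercion helper applied during normalization. Below is a minimal sketch; `coerce_number` is a hypothetical name. Treating booleans as invalid is a policy assumption, made because `bool` is a subclass of `int` in Python.

```python
import math
from typing import Any, Optional

def coerce_number(value: Any) -> Optional[float]:
    """Return value as a finite float, or None if it is missing or unusable.

    Covers: None, whitespace-padded or empty strings, booleans,
    non-numeric text, and values that overflow to inf (or are nan).
    """
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):  # OverflowError: int too large for float
        return None
    return number if math.isfinite(number) else None

samples = [" 42 ", None, True, "", "1e400", 10**400, "abc", 7]
print([coerce_number(s) for s in samples])
# [42.0, None, None, None, None, None, None, 7.0]
```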

## Common Bugs

| Bug | Fix |
|-----|-----|
| Not stripping whitespace | `.strip()` lines and string fields; skip lines that are empty after stripping |
| Swallowing all exceptions | Catch specific exceptions, log others |
| Off-by-one line numbers | `enumerate(lines, 1)` for 1-indexed |
| Division by zero in averages | Check `count > 0` before divide |
| Mutating input data | Work on copies if needed |
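The "Mutating input data" and "Division by zero" rows each have a short fix. Below is a minimal sketch; `normalize_record` and `safe_mean` are hypothetical helper names. Returning `None` for an empty group is one policy. Skipping the group or reporting it as an error are alternatives.

```python
from typing import Any, Dict, List, Optional

def normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a normalized copy; the caller's dict is left untouched."""
    out = dict(record)  # shallow copy: nested lists/dicts are still shared
    out["endpoint"] = str(out.get("endpoint", "")).strip().lower()
    return out

def safe_mean(values: List[float]) -> Optional[float]:
    """Mean of values, or None for an empty group instead of ZeroDivisionError."""
    return sum(values) / len(values) if values else None

raw = {"endpoint": " /API/Users ", "latency_ms": 120}
clean = normalize_record(raw)
print(clean["endpoint"])      # /api/users
print(repr(raw["endpoint"]))  # ' /API/Users '
print(safe_mean([]), safe_mean([120, 95]))  # None 107.5
```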


## Solutions


```python
# Solutions

def validate_csv_solution(csv_content: str) -> Dict[str, Any]:
    reader = csv.DictReader(StringIO(csv_content.strip()))
    valid = []
    errors = []

    # Start at 2 because the header is line 1 (assumes no quoted field spans lines)
    for line_num, row in enumerate(reader, 2):
        issues = []
        # DictReader fills missing trailing fields with None, so fall back to ""
        name = (row.get("name") or "").strip()
        email = (row.get("email") or "").strip()
        age_raw = (row.get("age") or "").strip()

        # Validate name
        if not name:
            issues.append("missing name")

        # Validate email
        if "@" not in email:
            issues.append("invalid email")

        # Validate age
        try:
            age = int(age_raw)
            if age <= 0:
                issues.append("age must be positive")
        except ValueError:
            issues.append("age not a number")

        if issues:
            errors.append({"line": line_num, "reasons": issues, "row": row})
        else:
            valid.append(row)

    return {"valid": valid, "errors": errors}

def merge_pages_solution(pages: List[Dict]) -> List[Dict]:
    seen_ids = set()
    result = []

    for page in pages:
        for record in page.get("data", []):
            if record["id"] not in seen_ids:
                seen_ids.add(record["id"])
                result.append(record)

    return result

# Test solutions
print("CSV Validation:")
csv_result = validate_csv_solution(SAMPLE_CSV)
print(f"  Valid: {len(csv_result['valid'])} rows")
print(f"  Errors: {csv_result['errors']}")

print("\nPage Merger:")
print(f"  Result: {merge_pages_solution(PAGES)}")
```
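Because dicts preserve insertion order (guaranteed since Python 3.7), a dict keyed by `id` can replace the `seen_ids` set plus list in `merge_pages_solution`. Below is a minimal sketch; the function names and the `"B2"` record are made up for illustration. `setdefault` keeps the first occurrence. Plain assignment keeps the last value, but the ID stays at the position where it first appeared.

```python
from typing import Any, Dict, List

def merge_pages_dict(pages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep-first dedup by id, relying on dict insertion order."""
    merged: Dict[Any, Dict[str, Any]] = {}
    for page in pages:
        for record in page.get("data", []):
            merged.setdefault(record["id"], record)  # later duplicates are ignored
    return list(merged.values())

def merge_pages_keep_last(pages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Later pages win; each id keeps the position of its first appearance."""
    merged: Dict[Any, Dict[str, Any]] = {}
    for page in pages:
        for record in page.get("data", []):
            merged[record["id"]] = record
    return list(merged.values())

pages = [
    {"data": [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}], "next": True},
    {"data": [{"id": 2, "name": "B2"}, {"id": 3, "name": "C"}], "next": False},
]
print([r["name"] for r in merge_pages_dict(pages)])       # ['A', 'B', 'C']
print([r["name"] for r in merge_pages_keep_last(pages)])  # ['A', 'B2', 'C']
```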
