# 🔧 Data Engineering Pipeline: Log Processing with loclean

A production-oriented **data engineering** notebook showing how `loclean` handles
unstructured log data: structured extraction, shredding into relational tables,
entity resolution, quality validation, and PII scrubbing.

**Pipeline:** Raw logs → Structured Extraction → Log Shredding → Quality Gates → Clean relational tables

> **Model:** `qwen2.5-coder:1.5b` (lightweight, code-specialised).
> Swap to `qwen2.5-coder:7b` for complex log formats.

In [None]:
import polars as pl

import loclean

MODEL = "qwen2.5-coder:1.5b"

## 1 · Raw Log Data

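Simulates ingesting web-server access logs from a mix of clients (browsers, `curl`,
Kubernetes probes, Prometheus, Jenkins). Each line embeds the client IP, user,
timestamp, HTTP request, status code, response size, response time, and user agent.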
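For contrast with the LLM-based steps in this notebook, here is what a conventional hand-written parser for this exact layout looks like. It is a baseline sketch, not part of `loclean`, and it assumes every line matches the sample's `IP - user [timestamp] "METHOD path PROTOCOL" status bytes time "agent"` shape; any drift in the format makes it return `None`.

```python
import re

# Hand-written baseline for the sample layout only (assumption: every line
# has exactly this shape; real-world logs often drift).
LOG_PATTERN = re.compile(
    r'^(?P<ip>\S+) - (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>[A-Z]+) (?P<path>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<bytes>\d+) (?P<time>[\d.]+)s '
    r'"(?P<agent>[^"]*)"$'
)


def parse_line(line: str) -> "dict[str, str] | None":
    """Return the named fields as strings, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


sample = (
    '192.168.1.10 - admin [2024-01-15 08:23:45] '
    '"GET /api/users HTTP/1.1" 200 1523 0.045s "Mozilla/5.0"'
)
print(parse_line(sample))
```

Every field comes back as a string; converting them to typed values is a separate step.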

In [None]:
logs = pl.DataFrame(
    {
        "log_entry": [
            (
                "192.168.1.10 - admin "
                "[2024-01-15 08:23:45] "
                '"GET /api/users HTTP/1.1" '
                '200 1523 0.045s "Mozilla/5.0"'
            ),
            (
                "10.0.0.55 - jsmith "
                "[2024-01-15 08:24:01] "
                '"POST /api/orders HTTP/1.1" '
                '201 892 0.123s "curl/7.68"'
            ),
            (
                "172.16.0.99 - - "
                "[2024-01-15 08:24:15] "
                '"GET /health HTTP/1.1" '
                '200 23 0.002s "kube-probe/1.28"'
            ),
            (
                "192.168.1.10 - admin "
                "[2024-01-15 08:25:30] "
                '"DELETE /api/users/42 HTTP/1.1" '
                '403 156 0.015s "Mozilla/5.0"'
            ),
            (
                "10.0.0.55 - jsmith "
                "[2024-01-15 08:26:00] "
                '"PUT /api/orders/100 HTTP/1.1" '
                '200 445 0.089s "curl/7.68"'
            ),
            (
                "192.168.2.1 - ops "
                "[2024-01-15 08:27:10] "
                '"GET /metrics HTTP/1.1" '
                '200 8921 0.234s "Prometheus/2.45"'
            ),
            (
                "10.0.0.55 - jsmith "
                "[2024-01-15 08:28:00] "
                '"POST /api/orders HTTP/1.1" '
                '500 234 2.105s "curl/7.68"'
            ),
            (
                "172.16.0.99 - - "
                "[2024-01-15 08:29:00] "
                '"GET /health HTTP/1.1" '
                '200 23 0.001s "kube-probe/1.28"'
            ),
            (
                "192.168.1.10 - admin "
                "[2024-01-15 08:30:15] "
                '"GET /api/users?page=2 HTTP/1.1" '
                '200 3201 0.067s "Mozilla/5.0"'
            ),
            (
                "10.0.0.88 - deploy "
                "[2024-01-15 08:31:00] "
                '"POST /api/deploy HTTP/1.1" '
                '202 567 1.456s "Jenkins/2.401"'
            ),
        ]
    }
)

print(f"Sample log entries: {logs.height}")
logs.head(3)

## 2 · Structured Extraction: parse fields from logs

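Use `loclean.extract()` with a Pydantic schema to parse structured fields
from each log line. Because the model reads each line instead of matching a fixed
pattern, it can tolerate small format variations that would break a hand-written
regex, while the schema defines which fields and types the output should have.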
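The schema is what turns raw strings into typed columns. The sketch below uses a stdlib `dataclass` mirroring `AccessLog` (an assumption made only to keep the example dependency-free) and a hypothetical `to_record()` helper to show the conversions each row needs. Note that the raw response time carries an `s` suffix (`0.045s`), which must be dropped before the value fits `response_time_s: float`.

```python
from dataclasses import dataclass


@dataclass
class AccessLogRecord:
    # Mirrors the fields of the Pydantic AccessLog model above.
    ip: str
    user: str
    timestamp: str
    method: str
    path: str
    status_code: int
    response_bytes: int
    response_time_s: float
    user_agent: str


def to_record(raw: dict) -> AccessLogRecord:
    """Coerce string fields into the schema's types (hypothetical helper)."""
    return AccessLogRecord(
        ip=raw["ip"],
        user=raw["user"],
        timestamp=raw["timestamp"],
        method=raw["method"],
        path=raw["path"],
        status_code=int(raw["status_code"]),
        response_bytes=int(raw["response_bytes"]),
        # "0.045s" -> 0.045: strip the unit suffix before converting.
        response_time_s=float(raw["response_time_s"].removesuffix("s")),
        user_agent=raw["user_agent"],
    )


record = to_record({
    "ip": "192.168.1.10", "user": "admin", "timestamp": "2024-01-15 08:23:45",
    "method": "GET", "path": "/api/users", "status_code": "200",
    "response_bytes": "1523", "response_time_s": "0.045s", "user_agent": "Mozilla/5.0",
})
print(record)
```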

In [None]:
from pydantic import BaseModel


class AccessLog(BaseModel):
    ip: str
    user: str
    timestamp: str
    method: str
    path: str
    status_code: int
    response_bytes: int
    response_time_s: float
    user_agent: str


parsed = loclean.extract(
    logs,
    AccessLog,
    target_col="log_entry",
    output_type="dataframe",
    model=MODEL,
)

print(f"Parsed {parsed.shape[0]} entries into {parsed.shape[1]} columns")
parsed.head(5)

## 3 · Compiled Extraction: high-performance parsing

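`extract_compiled()` uses the LLM to generate a native Python parsing function, then
applies that function to every row with no further LLM calls. On large datasets this
is much faster than calling the model once per row.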
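The general idea behind compiling an extractor can be sketched as a generate, test, retry loop: candidate parser source code is proposed, executed against sample rows, and accepted only if every sample parses; the accepted function then runs without any model calls. This is an assumption about the technique in general, not `loclean`'s internals. The hard-coded `CANDIDATES` list stands in for LLM output, and `max_retries` only mirrors the parameter name used above.

```python
# Stand-ins for LLM-generated parser source (hypothetical). Candidate 1 is
# buggy: it forgets that the response time carries an "s" suffix.
CANDIDATES = [
    "def parse(line):\n"
    "    parts = line.split()\n"
    "    return {'status_code': int(parts[-4]), 'response_time_s': float(parts[-2])}\n",
    "def parse(line):\n"
    "    parts = line.split()\n"
    "    return {'status_code': int(parts[-4]),\n"
    "            'response_time_s': float(parts[-2].rstrip('s'))}\n",
]

SAMPLES = [
    '192.168.1.10 - admin [2024-01-15 08:23:45] '
    '"GET /api/users HTTP/1.1" 200 1523 0.045s "Mozilla/5.0"',
    '10.0.0.55 - jsmith [2024-01-15 08:28:00] '
    '"POST /api/orders HTTP/1.1" 500 234 2.105s "curl/7.68"',
]


def compile_parser(samples, candidates, max_retries=5):
    """Return the first candidate `parse` function that handles every sample."""
    for attempt, source in enumerate(candidates[:max_retries], start=1):
        namespace = {}
        try:
            # Illustration only: generated code should be sandboxed in practice.
            exec(source, namespace)
            parse = namespace["parse"]
            for line in samples:
                parse(line)
        except Exception as exc:  # any failure rejects this candidate
            print(f"attempt {attempt} rejected: {exc!r}")
            continue
        return parse
    raise RuntimeError(f"no candidate passed within {max_retries} attempts")


compiled_parse = compile_parser(SAMPLES, CANDIDATES)
# From here on, parsing is plain Python: no model calls per row.
print([compiled_parse(line) for line in SAMPLES])
```

The validation step is what makes the speed-up safe to use: a candidate that crashes on the samples never reaches the full dataset.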

In [None]:
compiled_result = loclean.extract_compiled(
    logs,
    "log_entry",
    AccessLog,
    instruction="Parse the access log entry into structured fields.",
    max_retries=5,
    model=MODEL,
)

print(f"Compiled extraction: {compiled_result.shape}")
compiled_result.head(3)

## 4 · Log Shredding: relational table decomposition

The LLM infers a relational schema and generates a parser to split
log entries into normalised tables (e.g. `requests`, `users`). This is
useful for loading into a data warehouse.
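To make "relational decomposition" concrete, the sketch below splits already-parsed rows into a `users` dimension table and a `requests` fact table linked by a surrogate `user_id` key. The table names follow the examples above; the column choices, the hypothetical `shred_rows()` helper, and the surrogate-key scheme are assumptions, and the schema `shred_to_relations()` actually infers may differ.

```python
# Already-parsed rows (a small subset of the sample logs).
parsed_rows = [
    {"ip": "192.168.1.10", "user": "admin", "method": "GET",
     "path": "/api/users", "status_code": 200},
    {"ip": "10.0.0.55", "user": "jsmith", "method": "POST",
     "path": "/api/orders", "status_code": 201},
    {"ip": "192.168.1.10", "user": "admin", "method": "DELETE",
     "path": "/api/users/42", "status_code": 403},
]


def shred_rows(rows):
    """Split flat rows into a users dimension and a requests fact table."""
    user_ids = {}
    users, requests = [], []
    for row in rows:
        name = row["user"]
        if name not in user_ids:
            # Assign a surrogate key the first time a user is seen.
            user_ids[name] = len(user_ids) + 1
            users.append({"user_id": user_ids[name], "user": name})
        requests.append({
            "user_id": user_ids[name],  # foreign key into users
            "ip": row["ip"],
            "method": row["method"],
            "path": row["path"],
            "status_code": row["status_code"],
        })
    return {"users": users, "requests": requests}


demo_tables = shred_rows(parsed_rows)
for table_name, table_rows in demo_tables.items():
    print(table_name, table_rows)
```

Repeated user names are stored once in `users`, and each request only carries the integer key.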

In [None]:
tables = loclean.shred_to_relations(
    logs,
    "log_entry",
    sample_size=10,
    max_retries=5,
    model=MODEL,
)

print(f"Shredded into {len(tables)} tables:\n")
for name, df in tables.items():
    print(f"  üìã {name}: {df.shape}")
    print(f"     Columns: {list(df.columns)}\n")

In [None]:
# Inspect each shredded table
for name, df in tables.items():
    print(f"\n{'=' * 60}")
    print(f"Table: {name}")
    print(f"{'=' * 60}")
    print(df)

## 5 · Entity Resolution: canonicalize user identifiers

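The same user can appear under several spellings (`"admin"`, `"Admin"`,
`"administrator"`), and anonymous requests may be logged as `"-"`, `"anonymous"`,
or `"(none)"`. Mapping these variants to one canonical value keeps user activity
tables consistent.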
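For comparison, a purely string-based baseline is sketched below: values are lowercased and greedily clustered with `difflib.SequenceMatcher` using a 0.7 similarity threshold. The threshold value mirrors `threshold=0.7` above, but whether `loclean` measures similarity the same way is an assumption, and `cluster_by_similarity()` is a hypothetical helper. The baseline merges case variants and near-typos, yet it cannot link `"administrator"` to `"admin"` or `"-"` to `"anonymous"`, because those equivalences are semantic rather than textual.

```python
from difflib import SequenceMatcher


def cluster_by_similarity(values, threshold=0.7):
    """Greedily map each value to the most similar canonical seen so far."""
    canonicals = []
    mapping = {}
    for value in values:
        key = value.lower()  # collapse case variants first
        best, best_score = None, 0.0
        for canon in canonicals:
            score = SequenceMatcher(None, key, canon).ratio()
            if score > best_score:
                best, best_score = canon, score
        if best is not None and best_score >= threshold:
            mapping[value] = best
        else:
            canonicals.append(key)  # start a new cluster
            mapping[value] = key
    return mapping


raw_users = ["admin", "Admin", "ADMIN", "administrator", "jsmith", "j.smith",
             "john.smith", "ops", "operations", "deploy", "deployer", "-",
             "anonymous", "(none)"]
canonical_map = cluster_by_similarity(raw_users)
print(canonical_map)
```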

In [None]:
user_df = pl.DataFrame(
    {
        "user_raw": [
            "admin",
            "Admin",
            "ADMIN",
            "administrator",
            "jsmith",
            "j.smith",
            "john.smith",
            "ops",
            "operations",
            "deploy",
            "deployer",
            "-",
            "anonymous",
            "(none)",
        ]
    }
)

resolved = loclean.resolve_entities(
    user_df,
    "user_raw",
    threshold=0.7,
    model=MODEL,
)

print("User entity resolution:")
resolved

## 6 · Quality Gates: validate processed data

Before loading into the warehouse, validate the data against
business rules defined in plain English.
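The plain-English rules in the next cell can also be expressed as deterministic checks, which is useful as a cross-check on the LLM's verdicts. The sketch below assumes the sample log layout. Defining the compliance rate as the fraction of rows that pass every rule is an assumption about how `compliance_rate` is computed, not a documented definition.

```python
import re
from datetime import datetime

IPV4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
STATUS = re.compile(r'" (\d{3}) ')  # status code right after the quoted request
TIMESTAMP = re.compile(r"\[([^\]]+)\]")


def has_ip(line: str) -> bool:
    return IPV4.search(line) is not None


def valid_status(line: str) -> bool:
    m = STATUS.search(line)
    return m is not None and 100 <= int(m.group(1)) <= 599


def valid_timestamp(line: str) -> bool:
    m = TIMESTAMP.search(line)
    if m is None:
        return False
    try:
        datetime.fromisoformat(m.group(1))  # accepts "2024-01-15 08:23:45"
    except ValueError:
        return False
    return True


RULES = [has_ip, valid_status, valid_timestamp]


def compliance_rate(lines):
    """Fraction of lines that pass every rule (assumed definition)."""
    passing = sum(all(rule(line) for rule in RULES) for line in lines)
    return passing / len(lines) if lines else 1.0


good = ('192.168.1.10 - admin [2024-01-15 08:23:45] '
        '"GET /api/users HTTP/1.1" 200 1523 0.045s "Mozilla/5.0"')
bad = '10.0.0.1 - x [not-a-date] "GET / HTTP/1.1" 999 0 0.1s "curl"'
print(compliance_rate([good, bad]))
```

Here the second line fails both the status-code and timestamp rules, so only one of two lines passes.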

In [None]:
quality = loclean.validate_quality(
    logs,
    rules=[
        "Each log entry must contain an IP address",
        "HTTP status codes must be 3-digit numbers between 100 and 599",
        "Timestamps must follow ISO 8601 or a common datetime format",
    ],
    sample_size=10,
    model=MODEL,
)

rate = quality["compliance_rate"]
status = "✅ PASS" if rate >= 0.95 else "❌ FAIL"
print(f"Quality gate result: {status}")
print(f"Compliance: {rate:.0%}")

if quality["failures"]:
    print(f"\nTop failures ({len(quality['failures'])}):\n")
    for f in quality["failures"][:5]:
        idx = f.get("row_index", "?")
        rule = f.get("rule", "")
        reason = f.get("reason", "")
        print(f"  Row {idx}: {rule} → {reason}")

## 7 · PII Scrubbing: before data lake ingestion

Scrub IP addresses and usernames before storing in the data lake.
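A rule-based version of this masking is sketched below for comparison. It assumes the sample log layout, where the user is the field after `IP - `, and the `[IP]`/`[USER]` placeholders are hypothetical; the exact replacement text `loclean.scrub(mode="mask")` produces may differ. A regex like this only catches PII in known positions or patterns, which is the gap an LLM-based scrubber is meant to cover.

```python
import re

IPV4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# Leading IP, " - ", then the authenticated-user field of the access-log line.
USER_FIELD = re.compile(r"^(\S+ - )(\S+)")


def mask_line(line: str) -> str:
    """Mask the user field (keeping anonymous '-') and every IPv4 address."""
    line = USER_FIELD.sub(
        lambda m: m.group(1) + ("-" if m.group(2) == "-" else "[USER]"),
        line,
        count=1,
    )
    return IPV4.sub("[IP]", line)


print(mask_line(
    '192.168.1.10 - admin [2024-01-15 08:23:45] '
    '"GET /api/users HTTP/1.1" 200 1523 0.045s "Mozilla/5.0"'
))
```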

In [None]:
original_logs = logs.select("log_entry").to_series().to_list()

scrubbed = loclean.scrub(
    logs,
    target_col="log_entry",
    mode="mask",
    model=MODEL,
)

scrubbed_logs = scrubbed.select("log_entry").to_series().to_list()

print("Scrubbed log entries (PII masked):")
for orig, masked in zip(original_logs[:3], scrubbed_logs[:3], strict=True):
    print(f"  Before: {orig}")
    print(f"  After:  {masked}\n")

## Summary

| Step | API | Use Case |
|------|-----|----------|
| Structured Extraction | `loclean.extract()` | Parse log fields into columns |
| Compiled Extraction | `loclean.extract_compiled()` | High-performance parsing without per-row LLM calls |
| Log Shredding | `loclean.shred_to_relations()` | Split into normalised tables |
| Entity Resolution | `loclean.resolve_entities()` | Canonicalize user IDs |
| Quality Gates | `loclean.validate_quality()` | Pre-load data validation |
| PII Scrubbing | `loclean.scrub()` | Mask PII before lake ingestion |