```{contents}
```

## Data Transformation

---

### 1 — High-level ETL / ELT Architecture

**ETL (strict):**

```
Source Systems (S3, Web, DB, APIs)
    ↓ Extract
Preprocessing / Validation (clean, schema validate)
    ↓ Transform (chunk, dedup, enrich, embed)
    ↓ Load (Vector DB, Document Store, Data Lake)
```

**ELT (hybrid, common in GenAI):**

```
Source Systems
    ↓ Extract
Raw Landing Zone (S3 / GCS / Data Lake)
    ↓ Load
Transformation Jobs (Spark / Python / dbt / Airflow)
    ↓ Transformed Zone
    ↓ Load into Vector DB / Feature Store / Catalog
```

**Key components**

* Ingestors: connectors, scrapers, API fetchers
* Parsers: PDF/OCR/HTML → text
* Validator: schema, language, safety checks
* Cleaner: boilerplate removal, normalization
* Chunker: semantic chunking + metadata
* Deduper: exact + near-duplicate logic
* Embedder: model wrapper (OpenAI/local)
* Vector store: Pinecone/FAISS/Chroma/Weaviate wrapper
* Orchestrator: Airflow / Prefect / Dagster
* Monitoring: metrics, logs, data-quality dashboard
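
Several of the components above are described as wrappers, which suggests coding the pipeline against small interfaces rather than concrete drivers. The following is a minimal sketch of that idea using `typing.Protocol`; the names `EmbedderLike`, `VectorStoreLike`, `LengthEmbedder`, `DictStore`, and `index_texts` are hypothetical and exist only for illustration.

```python
from typing import Dict, List, Protocol, Sequence, Tuple


class EmbedderLike(Protocol):
    """Anything that turns a batch of texts into vectors."""
    def embed(self, texts: Sequence[str]) -> List[List[float]]: ...


class VectorStoreLike(Protocol):
    """Anything that can store and rank vectors by id."""
    def upsert(self, id: str, vector: Sequence[float], metadata: Dict) -> None: ...
    def query(self, vector: Sequence[float], top_k: int = 5) -> List[Tuple[str, float, Dict]]: ...


class LengthEmbedder:
    """Toy embedder: [character count, word count]. Stand-in for a real model."""
    def embed(self, texts):
        return [[float(len(t)), float(len(t.split()))] for t in texts]


class DictStore:
    """Toy store keyed by id, so a repeated upsert overwrites the old row."""
    def __init__(self):
        self.rows = {}

    def upsert(self, id, vector, metadata):
        self.rows[id] = (list(vector), metadata)

    def query(self, vector, top_k=5):
        # Score by negative squared Euclidean distance (higher means closer).
        scored = [
            (cid, -sum((a - b) ** 2 for a, b in zip(vector, v)), meta)
            for cid, (v, meta) in self.rows.items()
        ]
        scored.sort(key=lambda r: r[1], reverse=True)
        return scored[:top_k]


def index_texts(embedder: EmbedderLike, store: VectorStoreLike, texts: Dict[str, str]) -> int:
    """Embed a {id: text} mapping in one batch and upsert every vector."""
    ids = list(texts)
    vectors = embedder.embed([texts[i] for i in ids])
    for cid, vec in zip(ids, vectors):
        store.upsert(cid, vec, {"chars": len(texts[cid])})
    return len(ids)
```

Because `Protocol` uses structural typing, a production driver only needs matching method signatures; it does not have to inherit from these classes.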

---

### 2 — Python transformation pipeline

**Files / modules**

* `models.py` — pydantic schemas
* `validator.py` — `DocumentValidator` (schema and embedding checks)
* `cleaner.py` — boilerplate & normalization
* `chunker.py` — semantic chunking
* `dedup.py` — exact + near-duplicate logic
* `embedder.py` — embedding wrapper (pluggable)
* `storage.py` — vector store interface
* `pipeline.py` — orchestrates ETL/ELT flow

Below is a single-file example that includes the main modules for clarity. Replace embedding and storage implementations with production drivers.

```python
# pipeline_full.py
from typing import List, Dict, Optional, Tuple
from pydantic import BaseModel
import hashlib
import re
import time
import numpy as np

# -------------------------
# models.py
# -------------------------
class Metadata(BaseModel):
    source: str
    language: Optional[str] = "en"
    timestamp: Optional[str] = None
    tags: List[str] = []

class Document(BaseModel):
    doc_id: str
    text: str
    metadata: Metadata

class Chunk(BaseModel):
    chunk_id: str
    doc_id: str
    chunk_text: str
    chunk_index: int
    metadata: Dict = {}

# -------------------------
# validator.py (simplified)
# -------------------------
class DocumentValidator:
    def __init__(self, embedding_dim=1536, allowed_langs=None):
        self.embedding_dim = embedding_dim
        self.allowed_langs = allowed_langs or ["en"]

    def validate_schema(self, doc: Document) -> Tuple[bool, str]:
        if not doc.doc_id:
            return False, "Missing doc_id"
        if not doc.text or len(doc.text.strip()) < 10:
            return False, "Text too short"
        return True, "ok"

    def validate_embedding(self, emb: np.ndarray) -> Tuple[bool, str]:
        if emb is None:
            return False, "No embedding"
        if emb.ndim != 1:
            return False, "Embedding must be 1D"
        if emb.shape[0] != self.embedding_dim:
            return False, f"Embedding dimension mismatch {emb.shape[0]} != {self.embedding_dim}"
        if np.isnan(emb).any() or np.isinf(emb).any():
            return False, "Invalid embedding values"
        return True, "ok"

# -------------------------
# cleaner.py
# -------------------------
BOILERPLATE_PATTERNS = [
    r"Page \d+ of \d+",
    r"Copyright.*",
    r"Back to top",
    r"^\s*[-=]{3,}\s*$",  # separators
]

def remove_boilerplate(text: str) -> str:
    out = text
    for p in BOILERPLATE_PATTERNS:
        out = re.sub(p, " ", out, flags=re.IGNORECASE | re.MULTILINE)
    out = re.sub(r"\s+", " ", out).strip()
    return out

def normalize_text(text: str) -> str:
    text = text.replace("\u00A0", " ")
    text = re.sub(r"[ \t]+", " ", text)
    text = text.strip()
    return text

def clean_text(text: str) -> str:
    t = remove_boilerplate(text)
    t = normalize_text(t)
    return t

# -------------------------
# chunker.py
# -------------------------
def chunk_text_semantic(text: str, max_tokens: int = 256) -> List[str]:
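    # Heuristic chunker: group whole sentences until the running size would
    # exceed max_tokens. "Tokens" are approximated by whitespace-separated
    # words here, not model tokens.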
    sentences = re.split(r'(?<=[\.\?\!])\s+', text)
    chunks = []
    current = []
    current_len = 0
    for s in sentences:
        words = s.split()
        if current_len + len(words) > max_tokens and current:
            chunks.append(" ".join(current))
            current, current_len = [], 0
        current.append(s)
        current_len += len(words)
    if current:
        chunks.append(" ".join(current))
    return chunks

# -------------------------
# dedup.py
# -------------------------
def sha256_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

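def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    # Guard against zero-norm vectors, which would otherwise divide by zero.
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    return float(np.dot(a, b)) / denom if denom > 0 else 0.0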

def exact_dedup(hashes_seen: set, text: str) -> bool:
    h = sha256_hash(" ".join(text.split()))
    if h in hashes_seen:
        return True
    hashes_seen.add(h)
    return False

# -------------------------
# embedder.py (stub)
# -------------------------
class Embedder:
    def __init__(self, dim=1536):
        self.dim = dim

    def embed(self, texts: List[str]) -> List[np.ndarray]:
        # Production: call OpenAI/Local model; here we use deterministic pseudo-embeds
        out = []
        for t in texts:
            h = int(hashlib.md5(t.encode()).hexdigest()[:8], 16)
            rng = np.random.RandomState(h)
            v = rng.randn(self.dim).astype(np.float32)
            v = v / (np.linalg.norm(v) + 1e-9)
            out.append(v)
        return out

# -------------------------
# storage.py (vector store stub)
# -------------------------
class InMemoryVectorStore:
    def __init__(self, dim=1536):
        self.dim = dim
        self.vectors = []  # list of (id, vector, metadata)

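    def upsert(self, id: str, vector: np.ndarray, metadata: Dict):
        # Replace an existing entry with the same id; otherwise append.
        for i, (cid, _, _) in enumerate(self.vectors):
            if cid == id:
                self.vectors[i] = (id, vector, metadata)
                return
        self.vectors.append((id, vector, metadata))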

    def query(self, vector: np.ndarray, top_k=5):
        sims = [(cid, cosine_sim(vector, v), meta) for cid, v, meta in self.vectors]
        sims.sort(key=lambda x: x[1], reverse=True)
        return sims[:top_k]

# -------------------------
# pipeline.py
# -------------------------
class RAGIngestionPipeline:
    def __init__(self, validator: DocumentValidator, embedder: Embedder, store: InMemoryVectorStore):
        self.validator = validator
        self.embedder = embedder
        self.store = store
        self.hashes_seen = set()

    def ingest_document(self, doc: Document):
        ok, msg = self.validator.validate_schema(doc)
        if not ok:
            return False, msg

        text = clean_text(doc.text)
        chunks = chunk_text_semantic(text, max_tokens=200)

        # exact dedup at chunk level
        unique_chunks = []
        for i, c in enumerate(chunks):
            if exact_dedup(self.hashes_seen, c):
                continue
            chunk_id = f"{doc.doc_id}#chunk{i}"
            unique_chunks.append(Chunk(chunk_id=chunk_id, doc_id=doc.doc_id, chunk_text=c, chunk_index=i, metadata=doc.metadata.dict()))

        if not unique_chunks:
            return False, "No unique chunks after dedup"

        texts = [c.chunk_text for c in unique_chunks]
        embeddings = self.embedder.embed(texts)

        # validate each embedding, then skip near-duplicates of stored vectors
        # (naive linear scan of the store for every chunk)
        ingested = 0
        for ch, emb in zip(unique_chunks, embeddings):
            v_ok, v_msg = self.validator.validate_embedding(emb)
            if not v_ok:
                return False, f"Embedding invalid: {v_msg}"
            too_similar = any(cosine_sim(emb, v) > 0.995 for _, v, _ in self.store.vectors)
            if too_similar:
                continue
            self.store.upsert(ch.chunk_id, emb, ch.metadata)
            ingested += 1

        # report what this call added, not the total size of the store
        return True, f"Ingested {ingested} of {len(unique_chunks)} chunks"

# -------------------------
# Example usage
# -------------------------
if __name__ == "__main__":
    doc = Document(
        doc_id="doc_123",
        text="""
            Acme Product Manual – Version 4.2
            Copyright © 2024 Acme Corp.
            Page 1 of 3

            How to install the Acme Widget?
            Step 1: Unpack the device.
            Step 2: Connect the power cable.
            Step 3: Download the Acme setup app.
        """,
        metadata=Metadata(source="s3://bucket/manual.pdf", timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
    )

    validator = DocumentValidator(embedding_dim=1536)
    embedder = Embedder(dim=1536)
    store = InMemoryVectorStore(dim=1536)
    pipeline = RAGIngestionPipeline(validator, embedder, store)

    ok, msg = pipeline.ingest_document(doc)
    print(ok, msg)
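    # Query example. The stub embedder hashes text into a pseudo-random vector, so
    # scores only become meaningful once a real embedding model is plugged in.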
    q = embedder.embed(["How to install the Acme Widget?"])[0]
    print("Query results:", store.query(q, top_k=3))
```

**Notes**

* Swap `Embedder.embed` with real model calls (OpenAI / local) and `InMemoryVectorStore` with FAISS/Pinecone/Chroma.
* Add async support for throughput.
* Add logging, retry, DLQ (dead-letter queue) for failed docs.
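
The retry and dead-letter note above can be sketched as a small wrapper around any per-document ingest function. This is an illustration under assumptions, not part of the pipeline above: `ingest_with_retry` is a hypothetical helper, it expects the ingest callable to raise on failure (whereas `ingest_document` returns a status tuple, so an adapter would be needed), and the dead-letter queue is just an in-memory list.

```python
import time
from typing import Callable, Dict, List, Tuple


def ingest_with_retry(
    ingest: Callable[[Dict], None],
    docs: List[Dict],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[List[str], List[Dict]]:
    """Return (doc_ids that succeeded, dead-letter records for docs that never did)."""
    succeeded: List[str] = []
    dead_letters: List[Dict] = []
    for doc in docs:
        for attempt in range(1, max_attempts + 1):
            try:
                ingest(doc)
                succeeded.append(doc["doc_id"])
                break
            except Exception as exc:  # broad on purpose: every failure is retried
                if attempt == max_attempts:
                    # Out of attempts: park the doc with enough context to replay it.
                    dead_letters.append({"doc": doc, "error": repr(exc), "attempts": attempt})
                else:
                    # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
                    sleep(base_delay * 2 ** (attempt - 1))
    return succeeded, dead_letters
```

The `sleep` parameter is injectable so tests can run without waiting; in production the dead-letter list would typically be a durable queue or table.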

---

### 3 — Airflow DAG for ingestion + transformation

Below is an Airflow DAG skeleton using `PythonOperator`. It orchestrates extract → transform → load. Replace the task functions with the pipeline methods above.

```python
# airflow_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Import your pipeline modules (assume available in PYTHONPATH)
from pipeline_full import Document, Metadata, RAGIngestionPipeline, DocumentValidator, Embedder, InMemoryVectorStore
import json, time

DEFAULT_ARGS = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="rag_ingest_pipeline",
    default_args=DEFAULT_ARGS,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@hourly",  # deprecated since Airflow 2.4 in favor of `schedule`; removed in Airflow 3
    catchup=False,
    max_active_runs=1,
)

# Mocked extract step
def extract(**context):
    # In production: list objects from S3, read DB rows, subscribe to Kafka topic, etc.
    sample_doc = {
        "doc_id": f"doc-{int(time.time())}",
        "text": "Acme manual ...",
        "metadata": {"source": "s3://bucket/sample.pdf", "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}
    }
    # push list of documents to XCom (or push references)
    context['ti'].xcom_push(key="raw_docs", value=json.dumps([sample_doc]))

def transform(**context):
    raw = context['ti'].xcom_pull(task_ids="extract", key="raw_docs")
    docs = json.loads(raw)
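    # instantiate pipeline components (or import from shared module).
    # Note: InMemoryVectorStore lives only for this task run; swap in a persistent
    # store before relying on the vectors downstream.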
    validator = DocumentValidator(embedding_dim=1536)
    embedder = Embedder(dim=1536)
    store = InMemoryVectorStore(dim=1536)
    pipeline = RAGIngestionPipeline(validator, embedder, store)
    results = []
    for d in docs:
        doc = Document(doc_id=d["doc_id"], text=d["text"], metadata=Metadata(**d["metadata"]))
        ok, msg = pipeline.ingest_document(doc)
        results.append({"doc_id": d["doc_id"], "ok": ok, "msg": msg})
    context['ti'].xcom_push(key="ingest_results", value=json.dumps(results))

def load(**context):
    # load could push to production vector DB, update catalog, metrics, etc.
    results = json.loads(context['ti'].xcom_pull(task_ids="transform", key="ingest_results"))
    # For demo, just print or send to monitoring
    for r in results:
        print("Ingest result:", r)

t1 = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
t2 = PythonOperator(task_id="transform", python_callable=transform, dag=dag)
t3 = PythonOperator(task_id="load", python_callable=load, dag=dag)

t1 >> t2 >> t3
```

**Production considerations**

* Use sensors/operators for S3/Kafka.
* Use XCom sparingly — prefer storing references in S3/DB and passing keys.
* Make tasks idempotent.
* Add DLQ handling for failed docs.
* Parameterize resources and concurrency (pooling, task slots).
* Use KubernetesPodOperator or DockerOperator for heavy embedding tasks.
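
The advice to pass references rather than payloads through XCom can be sketched without Airflow at all: one task writes the batch to shared storage and hands over only the location. In this sketch a local temp directory stands in for S3/GCS, and `write_batch` / `read_batch` are hypothetical helper names. In a DAG, the extract callable would return the path (a `PythonOperator` pushes its return value to XCom by default) and the transform callable would pull it.

```python
import json
import tempfile
import uuid
from pathlib import Path
from typing import Dict, List


def write_batch(staging_dir: str, docs: List[Dict]) -> str:
    """Persist a batch as JSON Lines and return its path (the small value to pass on)."""
    path = Path(staging_dir) / f"batch-{uuid.uuid4().hex}.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for doc in docs:
            fh.write(json.dumps(doc, ensure_ascii=False) + "\n")
    return str(path)


def read_batch(path: str) -> List[Dict]:
    """Load a batch previously written by write_batch."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# Usage: the temp directory is an assumption standing in for object storage.
staging = tempfile.mkdtemp()
ref = write_batch(staging, [{"doc_id": "doc-1", "text": "Acme manual ..."}])
docs = read_batch(ref)
```

A unique file name per batch also helps with idempotency: a retried transform task re-reads the same reference instead of depending on whatever the extract step would return on a second run.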

---

### 4 — How transformation differs: RAG vs LLM fine-tuning

| Aspect                   |                                            RAG (Retrieval-Augmented Generation) | LLM Fine-tuning                                                                                       |
| ------------------------ | ------------------------------------------------------------------------------: | ----------------------------------------------------------------------------------------------------- |
| Primary goal             |                   Fast, accurate retrieval of supporting context for generation | Update model weights to change behavior or knowledge                                                  |
| Output of transform      |                                  Clean, coherent chunks + embeddings + metadata | Paired training examples (input → target), instruction–response pairs, tokenized datasets             |
| Chunking strategy        | Semantic chunks (200–800 tokens) preserving context; include citations/metadata | Examples often shorter; preserve full dialogue/turns if training conversational behavior              |
| Deduplication importance |                  Critical — duplicate chunks waste vector DB and bias retrieval | Important but may be allowed if duplicates represent important distribution                           |
| Annotation needs         |                  Minimal; quality scores, citations, source attribution helpful | High: labels, correct outputs, multi-step reasoning, prompt formatting, instruction templates         |
| Safety transformations   |      Remove unsafe content before indexing; mask PII to avoid retrieval leakage | Strict removal of harmful content; curated to avoid introducing unsafe behavior during weight updates |
| Embedding generation     |    Required; multiple embedding versions possible; embedding validation crucial | Not required (unless fine-tuning a retrieval-augmented model)                                         |
| Format for storage       |                           Vector DB + document store (JSON lines with metadata) | JSONL or TFRecords: `{prompt:..., completion:...}` or conversational formats                          |
| Update cadence           |                              Frequent incremental ingestion; live index updates | Less frequent; expensive retraining or LoRA/adapter updates                                           |
| Evaluation               |        Retrieval metrics (recall@k, R-precision), end-to-end generation quality | Perplexity, BLEU, ROUGE, human eval, instruction-following benchmarks                                 |
| Handling updates         |                     Dual-write, lazy migration of embeddings, versioned indexes | Re-train / fine-tune or use adapters; more costly to update knowledge                                 |
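
The Evaluation row names recall@k as a retrieval metric. As a rough sketch, it can be computed from ranked chunk ids and a set of ids judged relevant per query; the function names and the `runs` / `qrels` dictionaries below are illustrative assumptions, not an existing API.

```python
from typing import Dict, List, Set


def recall_at_k(retrieved: List[str], relevant: Set[str], k: int) -> float:
    """Fraction of the relevant chunk ids that appear in the top-k retrieved ids."""
    if not relevant:
        return 0.0
    hits = len(set(retrieved[:k]) & relevant)
    return hits / len(relevant)


def mean_recall_at_k(runs: Dict[str, List[str]], qrels: Dict[str, Set[str]], k: int) -> float:
    """Average recall@k over all queries that have at least one relevant chunk."""
    scores = [recall_at_k(runs.get(q, []), rel, k) for q, rel in qrels.items() if rel]
    return sum(scores) / len(scores) if scores else 0.0
```

Here `runs` maps a query id to the ranked ids returned by the vector store, and `qrels` maps the same query id to its hand-labeled relevant ids.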

**Practical differences in transformation steps**

* **RAG**

  * Emphasize: boilerplate removal, chunk coherence, citation extraction, embedding generation, dedup.
  * Produce: `(chunk_id, chunk_text, embedding, metadata)` records.
  * Post-process: index vectors and keep provenance.

* **Fine-tuning**

  * Emphasize: consistent formatting, label correctness, instruction style alignment, dedup of erroneous labels, dataset balancing.
  * Produce: training records in model-specific format (JSONL, TFRecords) with careful tokenization and padding, often with quality metadata and sampling weights.
  * Post-process: calculate dataset statistics, remove low-quality examples, split train/val/test.
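
For the fine-tuning side, the steps above (consistent formatting, dedup, splitting) might look roughly like the following. This is a simplified sketch: `build_splits` is a hypothetical helper, the `{prompt, completion}` record shape follows the table above, and the hash-based split is one possible way to keep assignment stable across runs.

```python
import hashlib
import json
from typing import Dict, Iterable, List, Tuple


def _key(prompt: str, completion: str) -> str:
    """Hash of the whitespace- and case-normalized pair, used for dedup and splitting."""
    norm = " ".join(prompt.split()).lower() + "\x1f" + " ".join(completion.split()).lower()
    return hashlib.sha256(norm.encode("utf-8")).hexdigest()


def build_splits(pairs: Iterable[Tuple[str, str]], val_percent: int = 10) -> Dict[str, List[str]]:
    """Return {"train": [...], "val": [...]} where each item is one JSONL line."""
    seen = set()
    splits: Dict[str, List[str]] = {"train": [], "val": []}
    for prompt, completion in pairs:
        if not prompt.strip() or not completion.strip():
            continue  # drop empty examples
        k = _key(prompt, completion)
        if k in seen:
            continue  # drop exact duplicates after normalization
        seen.add(k)
        # Deterministic split: the same example always lands in the same bucket.
        bucket = "val" if int(k[:8], 16) % 100 < val_percent else "train"
        record = {"prompt": prompt.strip(), "completion": completion.strip()}
        splits[bucket].append(json.dumps(record, ensure_ascii=False))
    return splits
```

Splitting on the content hash rather than on a random draw means that re-running the job on a grown dataset does not move existing examples between train and validation.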

---

### 5 — Additional production considerations

* **Observability**: track ingestion success rate, validation failure reasons, duplicate rates, embedder latency, downstream retrieval performance.
* **Versioning**: schema_version in documents, embedding_model_version, vector_index_version.
* **Backfill / Migration**: support lazy migration (on-read) and batch re-embed pipelines for embedding model upgrades.
* **Privacy & Compliance**: PII detection & masking, retention policies, opt-out handling.
* **Cost/Throughput**: batch embedding calls, parallelization, GPU/CPU sizing, rate-limiting for remote APIs.
* **Testing**: unit tests for cleaning, chunking, dedup; integration tests for upserting and querying.
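
The versioning and lazy-migration items above can be illustrated with a record that carries its embedding model version and is re-embedded on read when stale. This is only a sketch: `StoredChunk`, `read_with_lazy_migration`, the `"embed-v2"` label, and the plain dict standing in for the store are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

CURRENT_EMBEDDING_MODEL = "embed-v2"  # hypothetical version label


@dataclass
class StoredChunk:
    chunk_id: str
    text: str
    vector: List[float]
    embedding_model_version: str
    schema_version: int = 1


def read_with_lazy_migration(
    store: Dict[str, StoredChunk],
    chunk_id: str,
    embed: Callable[[str], List[float]],
    current_version: str = CURRENT_EMBEDDING_MODEL,
) -> StoredChunk:
    """Return the chunk, re-embedding and writing it back first if its version is stale."""
    chunk = store[chunk_id]
    if chunk.embedding_model_version != current_version:
        chunk = StoredChunk(
            chunk_id=chunk.chunk_id,
            text=chunk.text,
            vector=embed(chunk.text),  # re-embed with the current model
            embedding_model_version=current_version,
            schema_version=chunk.schema_version,
        )
        store[chunk_id] = chunk  # write back so the next read is cheap
    return chunk
```

Vectors from different embedding models are generally not comparable, so a real index would also need to keep old and new vectors apart (for example in versioned indexes) until the migration completes.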

