# 05 - ETL Design Generator

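This notebook uses the RAG system (Qdrant retrieval plus an Ollama-hosted LLM) to generate structured ETL designs, saved both as machine-readable JSON and as a human-readable Markdown report.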

In [1]:
import sys
import json
import os
sys.path.append('..')

from src.etl_design_schema import EtlDesignDoc
# UPDATED IMPORTS: Use new packages
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from langchain_ollama import OllamaEmbeddings
from qdrant_client import QdrantClient
from qdrant_client.http import models

# Config
# The two service URLs can be overridden via environment variables of the same
# name. The defaults assume the notebook runs inside Docker (Qdrant reachable as
# host `qdrant`, Ollama running on the Docker host).
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant:6333")
COLLECTION_NAME = "etl_specs"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = "llama3"  # Use a capable model for JSON extraction
# Query vectors must come from the same embedding model that was used to index
# COLLECTION_NAME (presumably in the ingestion notebook), or similarity search
# compares vectors from different spaces.
EMBEDDING_MODEL = "nomic-embed-text"

# 1. Setup Retrieval
client = QdrantClient(url=QDRANT_URL)
embeddings = OllamaEmbeddings(base_url=OLLAMA_URL, model=EMBEDDING_MODEL)
# NOTE: We bypass langchain-qdrant vectorstore for retrieval to avoid client version conflicts (missing 'search' method)

# temperature=0 keeps extraction as deterministic as the model allows.
llm = ChatOllama(base_url=OLLAMA_URL, model=LLM_MODEL, temperature=0)

# 2. Define Extraction Chain
# Pydantic Parser ensures strict JSON structure matching our schema
parser = PydanticOutputParser(pydantic_object=EtlDesignDoc)

# Template variables: {context} = retrieved spec chunks,
# {format_instructions} = the schema description produced by `parser`.
PROMPT_TEMPLATE = """
You are an expert Data Architect. Your goal is to extract a structured ETL Design from the provided context.

Context:
{context}

Instructions:
1. Analyze the context for source systems, target systems, and mapping rules.
2. Identify specific field mappings. If logic is missing but implied (e.g., same name), note it as Direct.
3. Identify any rigid validation rules described (e.g., "Must not be null", "Must be unique").
4. Output STRICT JSON that matches the provided schema.

IMPORTANT: 
- Output ONLY a valid JSON object.
- Do NOT return the Schema definition itself.
- Do NOT return markdown formatting like ```json.
- Ensure all required fields (pipeline_name, summary, source, target) are filled.

{format_instructions}
"""

prompt = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)

def retrieve_documents(query, k=5):
    """Return the text of the top-``k`` indexed chunks most similar to ``query``.

    Retrieval calls ``QdrantClient.query_points`` directly instead of the
    langchain-qdrant vectorstore, to avoid client-version incompatibilities.

    Args:
        query: Natural-language search string; embedded with ``embeddings``.
        k: Maximum number of points requested from ``COLLECTION_NAME``.

    Returns:
        A list of non-blank ``page_content`` strings, in the order Qdrant
        returned them. Points with no payload, or with a missing/blank/non-string
        ``page_content``, are skipped, so an empty list means "no usable
        context". If the Qdrant query fails, the error is printed and ``[]``
        is returned.
    """
    # 1. Embed Query (embedding errors propagate to the caller, as before)
    query_vector = embeddings.embed_query(query)

    # 2. Search using query_points (robust method). Only the network call is
    #    guarded; this is best-effort, so failures degrade to "nothing found".
    try:
        results = client.query_points(
            collection_name=COLLECTION_NAME,
            query=query_vector,
            limit=k,
            with_payload=True,
        ).points
    except Exception as e:
        print(f"Retrieval Error: {e}")
        return []

    # 3. Extract text content from payload. `payload` may be None, and blank
    #    chunks must not reach the caller: "\n\n".join(["", ""]) is truthy and
    #    would look like real context.
    docs = []
    for point in results:
        content = (point.payload or {}).get("page_content")
        if isinstance(content, str) and content.strip():
            docs.append(content)
    return docs

def generate_design(query: str) -> "EtlDesignDoc | None":
    """Retrieve spec context for ``query`` and extract a structured ETL design.

    Args:
        query: Search string used for retrieval (e.g. the pipeline's name and
            systems, as written in the uploaded spec).

    Returns:
        The ``EtlDesignDoc`` parsed from the LLM's output, or ``None`` when no
        usable context was retrieved or generation/parsing raised. The reason
        is printed in both cases.
    """
    print(f"Searching RAG for: {query}")

    # Manual Retrieval Call. Blank chunks are dropped before joining; otherwise
    # e.g. ["", ""] joins to "\n\n", which is truthy, slips past the emptiness
    # check, and the LLM is asked to "extract" a design from nothing.
    docs_content = [
        doc for doc in retrieve_documents(query)
        if isinstance(doc, str) and doc.strip()
    ]
    if not docs_content:
        print("No relevant documents found in index. Did you run the ingestion notebook?")
        return None

    context = "\n\n".join(docs_content)

    print(f"Found {len(docs_content)} context chunks. Generating structured design with {LLM_MODEL}...")
    chain = prompt | llm | parser

    try:
        # The parser's format instructions fill the prompt's
        # {format_instructions} slot; the parser then validates the LLM's
        # output against EtlDesignDoc.
        design_obj = chain.invoke({
            "context": context,
            "format_instructions": parser.get_format_instructions(),
        })
    except Exception as e:
        # Notebook boundary: report whatever failed (e.g. the LLM call or
        # output parsing/validation) and signal failure with None.
        print("Error in generation/parsing:")
        print(f"{type(e).__name__}: {e}")
        return None
    return design_obj

# --- EXECUTION ---
# Modify this query to match the spec document you uploaded
pipeline_query = "Customer 360 Integration from Salesforce to Snowflake"
RESULTS_DIR = "../results"


def _md_cell(value, default="-"):
    """Render ``value`` safely inside a Markdown table cell.

    Empty/None values become ``default``. ``|`` is escaped because it would
    otherwise start a new column, and newlines are flattened to spaces because
    they would otherwise terminate the table row.
    """
    text = str(value or default)
    text = " ".join(text.splitlines())
    return text.replace("|", "\\|")


def _render_markdown(design):
    """Build the human-readable Markdown report for a generated design."""
    parts = [
        f"# ETL Design: {design.pipeline_name}\n\n",
        f"**Summary**: {design.summary}\n\n",
        f"## Systems\n- **Source**: {design.source.system_name} ({design.source.object_name})\n",
        f"- **Target**: {design.target.system_name} ({design.target.object_name})\n\n",
        "## Field Mappings\n| Source | Target | Logic | Business Rule |\n|---|---|---|---|\n",
    ]
    for m in design.field_mappings:
        # Missing logic means a direct (same-value) mapping, per the prompt.
        logic = _md_cell(m.transformation_logic, "Direct")
        rule = _md_cell(m.business_rule)
        parts.append(
            f"| `{_md_cell(m.source_field)}` | `{_md_cell(m.target_field)}` | {logic} | {rule} |\n"
        )

    parts.append("\n## Validation Rules\n")
    for r in design.data_quality_rules:
        parts.append(f"- 🔴 **{r.target_field}**: `{r.rule_type}` (Severity: {r.severity})\n")
    return "".join(parts)


design = generate_design(pipeline_query)

# generate_design returns None on failure; test for that explicitly rather
# than relying on the truthiness of a pydantic model.
if design is not None:
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # 3. Save Machine Actionable (JSON). Explicit UTF-8 so non-ASCII spec text
    #    is written correctly regardless of the platform's default encoding.
    json_path = f"{RESULTS_DIR}/design.json"
    with open(json_path, "w", encoding="utf-8") as f:
        f.write(design.model_dump_json(indent=2))
    print(f"SUCCESS: Saved Machine-Actionable Design to {json_path}")

    # 4. Generate Human Consumable (MD). The report contains an emoji, so the
    #    file must be UTF-8 (a cp1252 default would raise UnicodeEncodeError).
    md_content = _render_markdown(design)
    md_path = f"{RESULTS_DIR}/design_report.md"
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(md_content)
    print(f"SUCCESS: Saved Human-Consumable Report to {md_path}")

    # Display for the notebook user
    from IPython.display import Markdown, display
    display(Markdown(md_content))
else:
    print("Failed to generate design.")

Searching RAG for: Customer 360 Integration from Salesforce to Snowflake
Found 2 context chunks. Generating structured design with llama3...
SUCCESS: Saved Machine-Actionable Design to ../results/design.json
SUCCESS: Saved Human-Consumable Report to ../results/design_report.md


# ETL Design: Customer 360

**Summary**: Migrates customer data from Salesforce Account object to Snowflake DIM_CUSTOMER table.

## Systems
- **Source**: Salesforce CRM (Account Table)
- **Target**: Snowflake Data Warehouse (DIM_CUSTOMER)

## Field Mappings
| Source | Target | Logic | Business Rule |
|---|---|---|---|
| `AccountId` | `CUST_ID` | Direct Map | Primary Key, immutable. |
| `Name` | `CUST_NAME` | UPPER(Name) | Standardize to uppercase for reporting |
| `BillingState` | `STATE_CODE` | Lookup(StateMap) | Standardize state names to 2-letter codes |
| `AnnualRevenue` | `REVENUE` | CAST(AnnualRevenue AS FLOAT) | Ensure numeric type |
| `PersonEmail` | `PRIMARY_EMAIL` | LOWER(PersonEmail) | Strictly lowercased |
| `Phone` | `CONTACT_PHONE` | Strip non-numeric characters (dashes, parentheses) | Raw digits only |

## Validation Rules
- 🔴 **CUST_ID**: `Must be Unique and Not Null` (Severity: error)
- 🔴 **REVENUE**: `Must be >= 0` (Severity: error)
- 🔴 **STATE_CODE**: `Must be in the reference set of valid US State Codes` (Severity: error)
