# In [None]:
import streamlit as st
import pandas as pd
import time
import random
import uuid
import graphviz

# --- CONFIGURATION ---
# Browser-tab title/icon and a full-width page layout.
st.set_page_config(
    page_title="AutoGOV | Pipeline Demo",
    page_icon="üÖ∞Ô∏è",
    layout="wide",
)

# Stylesheet injected into the page: #ab1945 brand accent, pipeline step
# cards, Snowflake/Neo4j panels, dark monospace log text and button styling.
_BRAND_CSS = """
    <style>
    .stApp { background-color: #f8fafc; }
    h1, h2, h3, .stMarkdown, p, li { color: #1e293b; font-family: 'Inter', sans-serif; }
    .highlight { color: #ab1945; font-weight: 800; }

    /* Pipeline Steps Styling */
    .step-card {
        background-color: white; padding: 15px; border-radius: 10px;
        border-left: 5px solid #cbd5e1; margin-bottom: 10px;
        box-shadow: 0 2px 4px rgba(0,0,0,0.05); transition: all 0.3s;
    }
    .step-active { border-left-color: #ab1945; border-left-width: 8px; transform: translateX(5px); }
    .step-done { border-left-color: #22c55e; opacity: 0.7; }

    /* Snowflake Table Style */
    .snowflake-header { background-color: #29b5e8; color: white; padding: 5px 10px; font-family: monospace; font-weight: bold; border-radius: 5px 5px 0 0; }
    .snowflake-body { border: 1px solid #29b5e8; padding: 10px; background: #f0f9ff; border-radius: 0 0 5px 5px; font-family: monospace; color: #333; }

    /* Neo4j Style */
    .neo4j-header { background-color: #006ce6; color: white; padding: 5px 10px; font-family: monospace; font-weight: bold; border-radius: 5px 5px 0 0; }
    .neo4j-body { border: 1px solid #006ce6; padding: 10px; background: #fafafa; border-radius: 0 0 5px 5px; }

    div[data-testid="stMetricValue"] { color: #1e293b; }

    /* FORCE DARK TEXT FOR LOGS */
    div[data-testid="stText"] {
        color: #1e293b !important;
        font-family: monospace;
    }

    .stButton>button { border: 2px solid #ab1945; color: #ab1945; font-weight: bold; }
    .stButton>button:hover { border: 2px solid #ab1945; background-color: #ab1945; color: white; }
    </style>
    """
st.markdown(_BRAND_CSS, unsafe_allow_html=True)

# --- STATE MANAGEMENT ---
# Seed every session key the app reads, but only when it is not already
# present, so values set during earlier script runs are left untouched.
_SESSION_DEFAULTS = {
    "pipeline_step": 0,
    "logs": [],
    "stats": {
        "processed": 0,
        "quarantine": 0,
        "pods": 0,
        "quality_score": 100,
        "processing_time": 0.0,
    },
    # Entropy (error-rate) percentages used by the generator and the stages.
    "dupe_rate": 10,
    "typo_rate": 10,
    "null_rate": 5,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

def add_log(message, type="INFO"):
    """Prepend a timestamped, icon-tagged entry to the session log list.

    Args:
        message: Text of the log entry.
        type: "INFO" and "WARN" each select their own icon; any other value
            selects the third (check-mark style) icon.
    """
    if type == "INFO":
        icon = "üîπ"
    elif type == "WARN":
        icon = "üî∏"
    else:
        icon = "‚úÖ"
    stamp = time.strftime("%H:%M:%S")
    entry = f"{stamp} {icon} {message}"
    # Newest entries go first so the log view shows them at the top.
    st.session_state.logs.insert(0, entry)

def reset_pipeline():
    """Return the demo to the idle step with an empty log and zeroed stats."""
    state = st.session_state
    state.pipeline_step = 0
    state.logs = []
    state.stats = dict(
        processed=0,
        quarantine=0,
        pods=0,
        quality_score=100,
        processing_time=0.0,
    )

# --- DATA GENERATOR (UPDATED FOR ENTROPY) ---
@st.cache_data
def generate_sample_csv(dupe_p, typo_p, null_p):
    """Build a 50-row synthetic customer CSV as a single string.

    Args:
        dupe_p: Percent chance (0-100) that a row uses the "fuzzy duplicate"
            name form ``Name.i`` instead of ``Name_i``.
        typo_p: Percent chance that a row's name and email each get one pair
            of adjacent characters swapped.
        null_p: Percent chance that a row's phone field is left empty.

    Returns:
        CSV text with an ``ID,Name,Email,Phone,Source`` header line followed
        by 50 data lines, each terminated by a newline.
    """
    first_names = ['James', 'Maria', 'Wei', 'Robert', 'Linda', 'David', 'Jennifer', 'Michael']

    def introduce_typo(text):
        # Strings shorter than three characters are returned unchanged.
        if len(text) < 3:
            return text
        pos = random.randint(0, len(text) - 2)
        chars = list(text)
        chars[pos], chars[pos + 1] = chars[pos + 1], chars[pos]  # Swap chars
        return "".join(chars)

    dupe_chance = dupe_p / 100.0
    typo_chance = typo_p / 100.0
    null_chance = null_p / 100.0

    lines = ["ID,Name,Email,Phone,Source\n"]

    # 50 preview rows (the download is presented as a 100k file).
    for row_no in range(50):
        picked = random.choice(first_names)

        # 1. Duplicates: a dot separator marks the fuzzy-duplicate variant.
        if random.random() < dupe_chance:
            name = f"{picked}.{row_no}"
        else:
            name = f"{picked}_{row_no}"

        # 2. Typos: corrupt both name and email on the same roll.
        email = f"user{row_no}@test.com"
        if random.random() < typo_chance:
            name = introduce_typo(name)
            email = introduce_typo(email)

        # 3. Nulls: an empty string stands for a missing phone number.
        if random.random() < null_chance:
            phone = ""
        else:
            phone = f"555-01{row_no:02d}"

        lines.append(f"{uuid.uuid4()},{name},{email},{phone},CRM\n")

    return "".join(lines)

# --- GRAPH VISUALIZER ---
def render_neo4j_graph(state="INITIAL"):
    """Build the demo Graphviz digraph for one of the pipeline's graph states.

    Args:
        state: One of
            "INITIAL" - five unlinked Person/Contact node pairs;
            "LINKING" - the same pairs joined by HAS_CONTACT edges, plus
            dashed red "SAME_AS?" candidate edges from each even-numbered
            person to the next person;
            "FINAL"   - one cluster per even index holding a golden record
            linked to its CRM source node.
            Any other value produces a graph with no nodes.

    Returns:
        A ``graphviz.Digraph`` laid out left-to-right.
    """
    node_count = 5

    graph = graphviz.Digraph()
    graph.attr(rankdir='LR', size='8,5')

    def add_raw_pair(i):
        # Person/Contact pair shared by the INITIAL and LINKING views.
        graph.node(f"A{i}", label=f"Person_{i}", style="filled", fillcolor="#ffcccb")
        graph.node(f"B{i}", label=f"Contact_{i}", style="filled", fillcolor="#e0e0e0")

    if state == "INITIAL":
        for i in range(node_count):
            add_raw_pair(i)
    elif state == "LINKING":
        for i in range(node_count):
            add_raw_pair(i)
            graph.edge(f"A{i}", f"B{i}", label="HAS_CONTACT", color="#999")
            # Only link to a neighbour that is actually declared: an edge to
            # the undeclared "A5" would make Graphviz create an unstyled
            # phantom node for it.
            if i % 2 == 0 and i + 1 < node_count:
                graph.edge(f"A{i}", f"A{i+1}", label="SAME_AS?", style="dashed", color="red")
    elif state == "FINAL":
        for i in range(0, node_count, 2):
            with graph.subgraph(name=f'cluster_{i}') as c:
                c.attr(style='filled', color='#e6ffe6')
                c.node(f"GR{i}", label=f"GOLDEN_RECORD_{i}", shape="doublecircle", style="filled", fillcolor="#90ee90")
                c.node(f"A{i}", label=f"Src_CRM_{i}")
                c.edge(f"GR{i}", f"A{i}")
    return graph

# --- UI LAYOUT ---

# Sidebar
with st.sidebar:
    st.title("üÖ∞Ô∏è AutoGOV")
    st.caption("Level 5 Autonomous MDM")
    st.markdown("---")

    # --- Step 1: entropy sliders and the synthetic CSV download ---
    st.subheader("1. Data Prep")
    st.markdown("**Entropy Injection:**")

    # Each slider value is mirrored into session state right away so the
    # pipeline stages can read the rates later.
    dupe_rate = st.slider("1) Duplicate Pairs", 10, 30, 10, format="%d%%")
    st.session_state.dupe_rate = dupe_rate

    typo_rate = st.slider("2) Typos (Levenshtein 1-2)", 10, 20, 10, format="%d%%")
    st.session_state.typo_rate = typo_rate

    null_rate = st.slider("3) Null Values", 0, 10, 5, format="%d%%")
    st.session_state.null_rate = null_rate

    csv_data = generate_sample_csv(dupe_rate, typo_rate, null_rate)
    st.download_button(
        label="üìÑ Download Synthetic CSV",
        data=csv_data,
        file_name="customer_master_100k.csv",
        mime="text/csv",
        help="Generates file based on Entropy sliders",
    )

    # --- Step 2: upload, then start or reset the pipeline ---
    st.subheader("2. Ingestion")
    uploaded_file = st.file_uploader("Upload Data to Start", type=['csv'])

    # The start button is only drawn while idle with a file present.
    can_start = uploaded_file is not None and st.session_state.pipeline_step == 0
    if can_start and st.button("üöÄ Initialize Pipeline", type="primary"):
        st.session_state.pipeline_step = 1
        add_log("File uploaded successfully. Checksum verified.", "‚úÖ")
        st.rerun()

    # The reset button is only drawn once the pipeline has left the idle step.
    if st.session_state.pipeline_step > 0 and st.button("üîÑ Reset Demo"):
        reset_pipeline()
        st.rerun()

# Main Area
st.title("Mission Control Center")
st.markdown("Dataset: 100,000 synthetic customer records generated via Python Faker.")
st.markdown("**Environment: A Python simulation of the Pod Architecture, using gpt-4o-mini for worker nodes and gpt-4o for Critic nodes.**")

# Top Metrics: a four-column summary row driven by the session stats dict.
_stats = st.session_state.stats
_has_pods = _stats["pods"] > 0
_has_processed = _stats['processed'] > 0

pods_col, processed_col, quarantine_col, autonomy_col = st.columns(4)
pods_col.metric(
    "Active Pods",
    _stats["pods"],
    delta="Auto-Scaling" if _has_pods else "",
)
processed_col.metric("Records Processed", f"{_stats['processed']:,}")
quarantine_col.metric("Quarantine Queue", _stats["quarantine"], delta_color="inverse")
# The score is a fixed figure, shown only once records have been processed.
autonomy_col.metric("Autonomy Score", "99.0%" if _has_processed else "---")

st.markdown("---")

col_pipeline, col_details = st.columns([1, 2])

with col_pipeline:
    st.subheader("Workflow Stages")
    step_labels = ["Idle", "1. Landing Zone", "2. Orchestrator", "3. Validator", "4. Resolver", "5. Cataloger", "6. Repository"]

    # One card per real stage (index 0, "Idle", is never drawn). The card for
    # the current step is highlighted, earlier steps are marked done, and
    # later steps keep the neutral style.
    for stage_no in range(1, 7):
        active_step = st.session_state.pipeline_step
        if active_step == stage_no:
            status_class, icon = "step-card step-active", "üîµ"
        elif active_step > stage_no:
            status_class, icon = "step-card step-done", "üü¢"
        else:
            status_class, icon = "step-card", "‚ö™"
        st.markdown(
            f'<div class="{status_class}"><b>{icon} {step_labels[stage_no]}</b></div>',
            unsafe_allow_html=True,
        )

with col_details:
    # Streamlit re-executes this whole script on every widget interaction
    # (including the click on each "next stage" button). One-shot work --
    # simulated progress, sleeps and log lines -- is therefore gated on the
    # first render of a stage; otherwise every rerun would replay the
    # animation and append duplicate log entries.
    #
    # `stage_completed` holds the number of the stage whose one-shot work has
    # finished. It is cleared whenever the pipeline is idle so that a reset
    # demo replays every stage from scratch.
    current_step = st.session_state.pipeline_step
    if current_step == 0 and "stage_completed" in st.session_state:
        del st.session_state["stage_completed"]
    first_render = st.session_state.get("stage_completed") != current_step

    # STAGE 1: LANDING
    if current_step == 1:
        st.info("Status: File Detected in Landing Zone")
        st.code("s3://landing-zone/customer_master_100k.csv (Size: 45MB)", language="bash")
        if first_render:
            add_log("Triggering Orchestrator for ingestion...", "INFO")
            st.session_state.stage_completed = 1
        if st.button("‚ñ∂Ô∏è Start Orchestrator"):
            st.session_state.pipeline_step = 2
            st.rerun()

    # STAGE 2: ORCHESTRATOR
    elif current_step == 2:
        st.subheader("ü§ñ Orchestrator Agent")
        st.write("Initializing Agent Pods and connecting to Graph DB...")
        if first_render:
            # Simulated start-up: the pod count ramps up at fixed progress marks.
            pod_ramp = {20: 1, 50: 3, 80: 5}
            my_bar = st.progress(0)
            for i in range(100):
                time.sleep(0.01)
                my_bar.progress(i + 1)
                if i in pod_ramp:
                    st.session_state.stats["pods"] = pod_ramp[i]
            add_log("5 Agent Pods Active.", "‚úÖ")
            st.session_state.stage_completed = 2
        else:
            # Already initialized: show the finished bar without replaying it.
            st.progress(100)
        st.markdown("##### Initial Graph State (Raw)")
        st.graphviz_chart(render_neo4j_graph("INITIAL"))
        if st.button("‚ñ∂Ô∏è Run Validator"):
            st.session_state.pipeline_step = 3
            st.rerun()

    # STAGE 3: VALIDATOR
    elif current_step == 3:
        st.subheader("üõ°Ô∏è Validator Agent")
        st.write("Checking schema compliance and data quality...")
        with st.status("Running Validation Rules...", expanded=True):
            st.write("Checking Data Types...")
            if first_render:
                time.sleep(0.5)
            st.write("Verifying Email Formats...")
            if first_render:
                time.sleep(0.5)

                # Dynamic Logic based on Null Rate
                n_errors = int(100000 * (st.session_state.null_rate / 100.0))
                add_log(f"Validator detected {n_errors:,} missing fields (Null Rate: {st.session_state.null_rate}%).", "WARN")
                add_log(f"Imputed {n_errors:,} records with default tokens.", "INFO")
                add_log("Quality Score: 99.8%", "‚úÖ")
                st.session_state.stage_completed = 3

        if st.button("‚ñ∂Ô∏è Run Resolver"):
            st.session_state.pipeline_step = 4
            st.rerun()

    # STAGE 4: RESOLVER
    elif current_step == 4:
        st.subheader("üß© Resolver Agent (Level 5)")
        st.write("Running Entity Resolution & Self-Correction...")

        rows = 100000
        metric_ph = st.empty()
        if first_render:
            # Simulated processing; the elapsed time is persisted so later
            # reruns can show the same report without re-running the loop.
            start_time = time.time()
            for i in range(0, rows + 1, 5000):
                st.session_state.stats['processed'] = i
                metric_ph.metric("Processed", f"{i:,}")
                time.sleep(0.02)
            st.session_state.stats['processing_time'] = time.time() - start_time
        else:
            metric_ph.metric("Processed", f"{st.session_state.stats['processed']:,}")
        proc_time = st.session_state.stats['processing_time']

        # --- REPORT CALCULATIONS ---
        # Weights for logic
        d_rate = st.session_state.dupe_rate / 100.0
        t_rate = st.session_state.typo_rate / 100.0

        total_duplicates = int(rows * d_rate)

        # Self-Corrections are driven by Typos (Reflector needs to fix them)
        self_corrections = int(total_duplicates * 0.4 * (1 + t_rate))

        # Quarantine is driven by remaining hard cases (approx 1% of dupes)
        quarantine_count = int(total_duplicates * 0.01)

        auto_resolutions = total_duplicates - quarantine_count
        # Guard against a zero duplicate count to avoid dividing by zero.
        autonomy_score = 100 * (1 - (quarantine_count / total_duplicates)) if total_duplicates > 0 else 100.0

        st.session_state.stats['quarantine'] = quarantine_count

        report = f"""
==================================================
   LEVEL 5 AUTONOMOUS AGENT REPORT ({rows//1000}k ROWS)
==================================================
Total Comparisons:      {rows * 995:,}
--------------------------------------------------
1. Memory Hits (RAG):   12 (Skipped computation)
2. Self-Corrections:    {self_corrections:,} (Reflector fixed Resolver)
3. Auto-Resolutions:    {auto_resolutions:,} (Matches Found)
4. Quarantined:         {quarantine_count} (Async Review needed)
--------------------------------------------------
Autonomy Score:         {autonomy_score:.2f}%
Processing Time:        {proc_time:.4f}s
==================================================
        """
        if first_render:
            add_log("Generation Complete. Writing Audit Report...", "INFO")
            st.session_state.stage_completed = 4
        st.code(report, language="text")

        st.markdown("##### Current Graph State (Linking)")
        st.graphviz_chart(render_neo4j_graph("LINKING"))

        if st.button("‚ñ∂Ô∏è Run Cataloger"):
            st.session_state.pipeline_step = 5
            st.rerun()

    # STAGE 5: CATALOGER
    elif current_step == 5:
        st.subheader("üìö Cataloger Agent")
        st.write("Scanning metadata and updating lineage...")
        if first_render:
            my_bar = st.progress(0)
            for i in range(100):
                time.sleep(0.01)
                my_bar.progress(i + 1)
            add_log("Tagged 'Email' and 'Phone' as PII.", "INFO")
            add_log("Data Lineage updated in DataHub.", "‚úÖ")
            st.session_state.stage_completed = 5
        else:
            # Already scanned: show the finished bar without replaying it.
            st.progress(100)
        if st.button("‚ñ∂Ô∏è Finalize & Commit"):
            st.session_state.pipeline_step = 6
            st.rerun()

    # STAGE 6: FINAL
    elif current_step == 6:
        st.subheader("‚úÖ Pipeline Complete")
        tab1, tab2 = st.tabs(["‚ùÑÔ∏è Snowflake Repository", "üï∏Ô∏è Neo4j Final Graph"])
        with tab1:
            st.markdown("""
            <div class="snowflake-header">DATABASE: PROD_MDM | SCHEMA: GOLDEN</div>
            <div class="snowflake-body">
            SELECT * FROM GOLDEN_RECORDS LIMIT 5;<br><br>
            ID | NAME | EMAIL | CONFIDENCE | SOURCE<br>
            ------------------------------------------------------<br>
            GR-001 | James Smith | j.smith@gmail.com | 1.00 | CRM<br>
            GR-002 | Maria Garcia | m.garcia@outlook.com | 0.99 | SAP<br>
            ...<br>
            <span style="color: green;">100,000 rows affected.</span>
            </div>
            """, unsafe_allow_html=True)
        with tab2:
            st.markdown("##### Final Resolved Clusters")
            st.graphviz_chart(render_neo4j_graph("FINAL"))
            st.success("Graph State: Optimized & Merged")

    # Log panel: drawn for every non-idle step, newest entries first.
    if st.session_state.pipeline_step > 0:
        st.markdown("---")
        st.caption("Live Agent Logs")
        with st.container(height=150):
            for log in st.session_state.logs:
                st.text(log)