<a href="https://colab.research.google.com/github/sbindal2017-a11y/Langraph_Analytics_AI_Agent/blob/main/Langgraph_CR_Analytics_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install langgraph langchain-community google-generativeai pandas -q

In [None]:
# SECTION 1: IMPORTS & API CONFIGURATION
import pandas as pd
import json
from typing import TypedDict, Dict, Any, List
from google.colab import files, userdata
import google.generativeai as genai

# LangGraph core components
from langgraph.graph import StateGraph, END

# NOTE: No LangGraph checkpointer is imported. The graph is compiled without one
# in Section 4, so state persists only within a single run of the agent.

print("âœ… All libraries imported successfully.")

# --- API Key Configuration ---
# The Gemini key is read from Colab Secrets (key icon in the left sidebar).
# The documented secret name is GOOGLE_API_KEY. The older name 'KEY' is still
# accepted as a fallback so notebooks configured that way keep working.
try:
    GOOGLE_API_KEY = None
    _secret_lookup_error = None
    for _secret_name in ("GOOGLE_API_KEY", "KEY"):
        try:
            GOOGLE_API_KEY = userdata.get(_secret_name)
        except Exception as lookup_error:  # e.g. secret missing or access not granted
            _secret_lookup_error = lookup_error
            continue
        if GOOGLE_API_KEY:
            break
    if not GOOGLE_API_KEY:
        raise RuntimeError(
            "No Gemini API key found in Colab secrets 'GOOGLE_API_KEY' or 'KEY'."
        ) from _secret_lookup_error
    genai.configure(api_key=GOOGLE_API_KEY)
    print("âœ… Gemini API configured successfully.")
except Exception:
    print("ðŸš¨ ERROR: Could not configure Gemini API. Please ensure your GOOGLE_API_KEY is set correctly in Colab secrets.")
    # Stop the notebook here: nothing downstream works without the API key.
    raise

In [None]:
# ==============================================================================
# SECTION 2: AGENT STATE DEFINITION
# ==============================================================================
class AgentState(TypedDict):
    """
    The complete state of our CRO Analytics Agent.

    This dictionary acts as the agent's memory. It is handed to every node,
    and each node returns a dict containing only the keys it updates.
    Every key is declared here (TypedDict default ``total=True``), but a run
    is started with only ``user_question`` and ``raw_data_path``. The other
    keys are filled in by the nodes as the graph executes.
    """
    # --- Inputs from the user ---
    # Free-text question typed by the user; turned into a config by Node B.
    user_question: str
    # Path of the uploaded CSV file; read by Node A.
    raw_data_path: str

    # --- Data processed by the agent ---
    # Path of the validated/cleaned CSV written by Node A.
    cleaned_data_path: str
    # JSON config parsed from Gemini's reply in Node B (windows, filters, dims),
    # or a {"need_dates": true, "message": ...} request for clarification.
    config: Dict[str, Any]
    # Accumulated analysis output: "summary" and "dataframes" from Node C,
    # then "qualitative_analysis", "drivers_performance", "drivers_mix",
    # "drivers_final" and "unexplained_pp" added by later nodes.
    analysis_results: Dict[str, Any]
    # Executive-summary text produced by Node H.
    final_narrative: str

    # --- Utility field for error handling ---
    # Set by a node when it fails; checked by the router and the final output cell.
    error_message: str

print("âœ… Agent State defined.")

In [None]:
# ==================================
# NODE A: Ingest & Clean
# Purpose: Reads the user-uploaded CSV file, validates that it has the
#          correct columns, and saves a cleaned version for the rest of
#          the agent to use.
# ==================================
def node_a_ingest_and_clean(state: "AgentState") -> dict:
    """Read the uploaded CSV, validate its schema, and save a cleaned copy.

    Header names are normalised (surrounding whitespace stripped, lower-cased)
    before validation, so a header like ``" Channel "`` satisfies ``"channel"``.
    Rows whose ``date`` cannot be parsed are dropped.

    Args:
        state: Agent state; ``state["raw_data_path"]`` is the CSV to read.

    Returns:
        ``{"cleaned_data_path": path}`` on success. Otherwise
        ``{"error_message": ...}``, which names any missing columns or reports
        that no row had a usable date.
    """
    print("\n[Node A: Ingest & Clean]")
    print(f"  - Input: raw_data_path='{state.get('raw_data_path')}'")
    EXPECTED_COLUMNS = ['landing page', 'landing page group', 'channel', 'ad campaign', 'date', 'discount%', 'session', 'atc', 'view cart', 'checkout', 'payment', 'purchase']
    try:
        df = pd.read_csv(state["raw_data_path"])
        # Tolerate cosmetic header differences (case, stray spaces) from exports.
        df.columns = [str(col).strip().lower() for col in df.columns]

        missing = [col for col in EXPECTED_COLUMNS if col not in df.columns]
        if missing:
            return {"error_message": "Schema validation failed. The CSV is missing required columns: " + ", ".join(missing)}

        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        # Remove any rows where the date could not be parsed.
        df = df.dropna(subset=['date'])
        if df.empty:
            # Fail here with a clear message instead of a cryptic NaT error in Node B.
            return {"error_message": "No rows with a parseable 'date' value were found in the CSV."}

        cleaned_file_path = 'cleaned_cro_data.csv'
        df.to_csv(cleaned_file_path, index=False)
        print(f"  - Output: Cleaned data saved to '{cleaned_file_path}'")
        return {"cleaned_data_path": cleaned_file_path}
    except Exception as e:
        return {"error_message": f"Node A failed: {e}"}

In [None]:
# ==================================
# NODE B: Generate Config
# Purpose: Uses the Gemini AI to interpret the user's natural language
#          question and convert it into a structured JSON configuration
#          that will control the rest of the analysis.
# ==================================
def node_b_generate_config(state: AgentState) -> dict:
    """Use Gemini to turn the user's question into a JSON analysis config.

    The prompt pins the exact shape consumed downstream:
    - each window is ``{"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}`` (read by Node C);
    - ``dims`` lists dataset column names (grouped on by Nodes E and F);
    - ``filters`` maps a column name to a value or a list of values.

    Returns:
        ``{"config": dict}`` on success. When the question lacks date ranges,
        the config is ``{"need_dates": True, "message": ...}``. On any
        failure, including a reply that is not a JSON object, returns
        ``{"error_message": ...}``.
    """
    print("\n[Node B: Generate Config]")
    # Guard against a missing question so the log line itself cannot raise.
    question = state.get('user_question') or ''
    print(f"  - Input: user_question='{question[:60]}...'")
    try:
        df = pd.read_csv(state["cleaned_data_path"])
        dates = pd.to_datetime(df['date'])
        data_min_date = dates.min().strftime('%Y-%m-%d')
        data_max_date = dates.max().strftime('%Y-%m-%d')
        model = genai.GenerativeModel("gemini-2.5-pro")
        prompt = f"""
        Dataset dates are from {data_min_date} to {data_max_date}. Today is {pd.to_datetime('today').strftime('%Y-%m-%d')}.
        User question: "{question}"
        Your task is to create a JSON config. If the user does not provide enough information to determine both an `analysis_window` and a `baseline_window`, you MUST return `{{"need_dates": true, "message": "Please provide both an analysis and a baseline date range."}}`.
        The schema must be: `{{"question": "...", "analysis_window": {{"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}}, "baseline_window": {{"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}}, "filters": {{"<column name>": "<value or list of values>"}}, "thresholds": {{"TOP_K": 8, "MIN_SEG_SHARE": 0.01}}, "dims": ["channel", "ad campaign", "landing page group", "landing page"], "cr_definition": "session_cr"}}`.
        Window `start` and `end` dates are both inclusive. Use `{{}}` for `filters` when the user does not filter the data.
        Filter keys and `dims` entries must be dataset column names, spelled exactly as above (with spaces, not underscores).
        If the user filters on a dimension (e.g., channel="google"), you MUST remove that dimension from the `dims` array.
        Output only the raw JSON text.
        """
        response = model.generate_content(
            prompt,
            # JSON mode: ask the API for a bare JSON body instead of Markdown.
            generation_config={"response_mime_type": "application/json"},
        )
        # Defensive cleanup in case the model still wraps the JSON in fences or prose.
        config_string = response.text.strip().replace("```json", "").replace("```", "").strip()
        start, end = config_string.find("{"), config_string.rfind("}")
        if start != -1 and end > start:
            config_string = config_string[start:end + 1]
        config = json.loads(config_string)
        # The router calls config.get(...) outside any try block, so reject non-objects here.
        if not isinstance(config, dict):
            raise ValueError(f"expected a JSON object, got {type(config).__name__}")
        print("  - Output: Gemini generated the config dictionary.")
        return {"config": config}
    except Exception as e:
        return {"error_message": f"Node B failed: {e}"}

In [None]:

# ==================================
# NODE C: Calculate Metrics
# Purpose: The first deterministic math node. It takes the configuration
#          from Node B and the cleaned data from Node A to calculate
#          the top-line metrics (e.g., overall CR and the change).
# ==================================
def node_c_calculate_metrics(state: "AgentState") -> dict:
    """Compute the overall conversion rate (CR) for the analysis and baseline windows.

    Steps:
    1. Load the cleaned CSV and apply ``config["filters"]``, if any.
    2. Keep the rows whose ``date`` falls inside each window. Both bounds are
       inclusive (``Series.between``).
    3. Compute ``CR = purchases / sessions * 100``, which is 0 when a window
       has no sessions.

    Filter format: ``{column: value}`` or ``{column: [values, ...]}``.
    - Column names match ignoring case and treating ``_`` and spaces as equal.
    - Values match as case-insensitive strings.
    - ``None``, ``""`` and ``[]`` values mean "no filter".

    Returns:
        ``{"analysis_results": {"summary": {...}, "dataframes": {...}}}`` on
        success. ``dataframes`` holds both windows as lists of row dicts, with
        ``date`` formatted as ``YYYY-MM-DD``. On failure (including an
        unknown filter column) returns ``{"error_message": ...}``.
    """
    print("\n[Node C: Calculate Metrics]")

    def _resolve_column(df, name):
        """Return the column of *df* matching *name* (case / '_' vs ' ' insensitive), or None."""
        wanted = str(name).strip().lower().replace("_", " ")
        for col in df.columns:
            if str(col).strip().lower().replace("_", " ") == wanted:
                return col
        return None

    def _apply_filters(df, filters):
        """Keep only the rows of *df* that match every filter in *filters*."""
        for key, value in (filters or {}).items():
            if value is None or value == "" or value == []:
                continue  # LLM placeholder meaning "no filter"
            col = _resolve_column(df, key)
            if col is None:
                raise ValueError(f"filter column '{key}' not found in data")
            if isinstance(value, (list, tuple, set)):
                values = value
            elif isinstance(value, (str, int, float, bool)):
                values = [value]
            else:
                raise ValueError(f"unsupported filter value for '{key}': {value!r}")
            wanted = {str(v).strip().lower() for v in values}
            df = df[df[col].astype(str).str.strip().str.lower().isin(wanted)]
        return df

    def _cr_pct(frame):
        """Session conversion rate in percent; 0 when the frame has no sessions."""
        sessions = frame['session'].sum()
        return (frame['purchase'].sum() / sessions) * 100 if sessions > 0 else 0

    try:
        config = state["config"]
        df = pd.read_csv(state["cleaned_data_path"])
        df['date'] = pd.to_datetime(df['date'])
        # Filters must be applied here: every later node works from these slices.
        df = _apply_filters(df, config.get('filters'))

        aw = config['analysis_window']
        bw = config['baseline_window']

        # Use .copy() to prevent SettingWithCopyWarning when the date column is rewritten below.
        df_analysis = df[df['date'].between(pd.to_datetime(aw['start']), pd.to_datetime(aw['end']))].copy()
        df_baseline = df[df['date'].between(pd.to_datetime(bw['start']), pd.to_datetime(bw['end']))].copy()

        analysis_cr = _cr_pct(df_analysis)
        baseline_cr = _cr_pct(df_baseline)
        delta_cr_pp = analysis_cr - baseline_cr

        # Convert date columns to strings to ensure they are JSON serializable.
        df_analysis['date'] = df_analysis['date'].dt.strftime('%Y-%m-%d')
        df_baseline['date'] = df_baseline['date'].dt.strftime('%Y-%m-%d')

        analysis_results = {
            "summary": {"analysis_cr_pct": round(analysis_cr, 2), "baseline_cr_pct": round(baseline_cr, 2), "delta_cr_pp": round(delta_cr_pp, 2)},
            # Pass the detailed rows on to the next nodes.
            "dataframes": {"analysis": df_analysis.to_dict('records'), "baseline": df_baseline.to_dict('records')}
        }
        print("  - Output: analysis_results dictionary created.")
        return {"analysis_results": analysis_results}
    except Exception as e:
        return {"error_message": f"Node C failed: {e}"}

In [None]:
# ==================================
# NODE D: AI Analyst
# Purpose: Uses Gemini to perform a qualitative scan of the data,
#          identifying high-level patterns and outliers like a human analyst.
# ==================================
def node_d_ai_analyst(state: AgentState) -> dict:
    """Ask Gemini for 2-3 high-level observations on channel-level performance.

    Builds per-channel sessions / purchases / CR (%) tables for both periods
    from the row records Node C stored in ``analysis_results["dataframes"]``
    and sends them to Gemini.

    Returns:
        On success, ``{"analysis_results": ...}``: a shallow copy of the
        incoming results with ``"qualitative_analysis"`` added. That value is
        always a dict whose ``"observations"`` entry is a list of strings.
        On failure, ``{"error_message": ...}``.
    """
    print("\n[Node D: AI Analyst]")

    def _channel_summary(frame):
        """Per-channel sessions, purchases and CR (%); CR is 0 for channels with no sessions."""
        table = frame.groupby('channel').agg(sessions=('session', 'sum'), purchases=('purchase', 'sum')).reset_index()
        table['cr'] = ((table['purchases'] / table['sessions']) * 100).where(table['sessions'] > 0, 0.0)
        return table

    try:
        # Shallow copy so the dict held in the incoming state is not mutated.
        analysis_results = dict(state["analysis_results"])
        # Recreate DataFrames from the dictionary records passed in the state.
        df_analysis = pd.DataFrame(analysis_results["dataframes"]["analysis"])
        df_baseline = pd.DataFrame(analysis_results["dataframes"]["baseline"])

        # --- Data Simplification for the AI ---
        summary_analysis = _channel_summary(df_analysis)
        summary_baseline = _channel_summary(df_baseline)

        model = genai.GenerativeModel("gemini-2.5-pro")
        prompt = f"""
        You are analyzing why a website's conversion rate changed from {analysis_results['summary']['baseline_cr_pct']:.2f}% to {analysis_results['summary']['analysis_cr_pct']:.2f}%.
        Baseline Period Performance by Channel:
        {summary_baseline.to_string()}

        Analysis Period Performance by Channel:
        {summary_analysis.to_string()}

        Based on these tables, what are the 2-3 most important, high-level qualitative observations? Be concise.
        Provide your response as a JSON object with a single key "observations", which is a list of strings.
        """
        response = model.generate_content(
            prompt,
            # JSON mode: ask the API for a bare JSON body instead of Markdown.
            generation_config={"response_mime_type": "application/json"},
        )
        raw = response.text.strip().replace("```json", "").replace("```", "").strip()
        start, end = raw.find("{"), raw.rfind("}")
        if start != -1 and end > start:
            raw = raw[start:end + 1]
        parsed = json.loads(raw)

        # Normalise to {"observations": [str, ...]} so Node H can rely on the shape.
        qualitative_analysis = dict(parsed) if isinstance(parsed, dict) else {}
        observations = qualitative_analysis.get("observations", parsed if isinstance(parsed, list) else [])
        if isinstance(observations, str):
            observations = [observations]
        elif not isinstance(observations, list):
            observations = []
        qualitative_analysis["observations"] = [str(obs) for obs in observations]

        # Add the AI's insights to the analysis object.
        analysis_results["qualitative_analysis"] = qualitative_analysis
        print("  - Output: Added 'qualitative_analysis' to analysis_results.")
        return {"analysis_results": analysis_results}
    except Exception as e:
        return {"error_message": f"Node D failed: {e}"}

In [None]:
# ==================================
# NODE E: Performance Diagnostics
# Purpose: Calculates the 'performance contribution' for each segment. This
#          isolates the impact of a segment's own CR changing, holding
#          its traffic share constant.
# ==================================
def node_e_performance_diagnostics(state: "AgentState") -> dict:
    """Compute each segment's *performance* contribution to the CR change.

    For every dimension in ``config["dims"]`` and every segment of it, the
    contribution is ``weight_analysis * (cr_analysis - cr_baseline)``:
    - ``weight_analysis`` is the segment's share of analysis-period sessions;
    - CRs are in percent, so the result is in percentage points.
    Segments missing from one period count as zero sessions / zero CR there.
    Contributions with magnitude <= 0.001pp are dropped.

    Dimension names are matched to data columns ignoring case and treating
    ``_`` and spaces as equal, so ``"ad_campaign"`` finds ``"ad campaign"``.
    Dimensions that match no column are skipped with a log line.

    Returns:
        On success, ``{"analysis_results": ...}``: a shallow copy of the
        incoming results with ``"drivers_performance"`` added, a list of
        ``{"driver", "delta_pp", "type"}`` dicts. On failure,
        ``{"error_message": ...}``.
    """
    print("\n[Node E: Performance Diagnostics]")

    def _resolve_column(columns, name):
        """Return the entry of *columns* matching *name*, or None."""
        wanted = str(name).strip().lower().replace("_", " ")
        return next((c for c in columns if str(c).strip().lower().replace("_", " ") == wanted), None)

    def _segment_stats(frame, col):
        """Sessions, purchases and CR (%) per value of *col*; CR is 0 for zero-session segments."""
        grouped = frame.groupby(col).agg(sessions=('session', 'sum'), purchases=('purchase', 'sum')).reset_index()
        grouped['cr'] = ((grouped['purchases'] / grouped['sessions']) * 100).where(grouped['sessions'] > 0, 0.0)
        return grouped

    try:
        config = state["config"]
        # Shallow copy so the dict held in the incoming state is not mutated.
        analysis_results = dict(state["analysis_results"])
        df_analysis = pd.DataFrame(analysis_results["dataframes"]["analysis"])
        df_baseline = pd.DataFrame(analysis_results["dataframes"]["baseline"])
        dims_to_analyze = config.get("dims", [])
        drivers = []

        if df_analysis.empty:
            return {"error_message": "Cannot calculate performance, total sessions in analysis period is zero."}
        if df_baseline.empty:
            return {"error_message": "Cannot calculate performance, the baseline period has no data."}

        total_sessions_analysis = df_analysis['session'].sum()
        if total_sessions_analysis == 0:
            return {"error_message": "Cannot calculate performance, total sessions in analysis period is zero."}

        for dim in dims_to_analyze:
            col = _resolve_column(df_analysis.columns, dim)
            if col is None:
                print(f"  - Skipping dimension '{dim}': no matching column in the data.")
                continue

            # Both sides carry sessions/purchases/cr, so every metric column gets a suffix.
            merged = pd.merge(_segment_stats(df_analysis, col), _segment_stats(df_baseline, col),
                              on=col, suffixes=('_analysis', '_baseline'), how='outer').fillna(0)
            weight_analysis = merged['sessions_analysis'] / total_sessions_analysis

            # Performance Contribution Formula: weight_analysis * (cr_analysis - cr_baseline)
            merged['perf_contribution_pp'] = weight_analysis * (merged['cr_analysis'] - merged['cr_baseline'])

            material = merged[merged['perf_contribution_pp'].abs() > 0.001]  # ignore negligible contributions
            drivers.extend(
                {"driver": f"{col}: {value}", "delta_pp": round(delta, 3), "type": "performance"}
                for value, delta in zip(material[col], material['perf_contribution_pp'])
            )

        analysis_results["drivers_performance"] = drivers
        print(f"  - Output: Calculated {len(drivers)} performance drivers.")
        return {"analysis_results": analysis_results}
    except Exception as e:
        return {"error_message": f"Node E failed: {e}"}

In [None]:
# ==================================
# NODE F: Mix Diagnostics
# Purpose: Calculates the 'mix contribution' for each segment. This
#          isolates the impact of traffic shifting towards or away from
#          a segment, holding its CR constant at baseline levels.
# ==================================
def node_f_mix_diagnostics(state: "AgentState") -> dict:
    """Compute each segment's *mix* (traffic-shift) contribution to the CR change.

    For every dimension in ``config["dims"]`` and every segment of it, the
    contribution is ``(weight_analysis - weight_baseline) * cr_baseline``:
    - weights are each period's share of sessions;
    - CR is in percent, so the result is in percentage points.
    Segments missing from one period count as zero sessions / zero CR there.
    Contributions with magnitude <= 0.001pp are dropped.

    Dimension names are matched to data columns ignoring case and treating
    ``_`` and spaces as equal, so ``"ad_campaign"`` finds ``"ad campaign"``.
    Dimensions that match no column are skipped with a log line.

    Returns:
        On success, ``{"analysis_results": ...}``: a shallow copy of the
        incoming results with ``"drivers_mix"`` added, a list of
        ``{"driver", "delta_pp", "type"}`` dicts. On failure,
        ``{"error_message": ...}``.
    """
    print("\n[Node F: Mix Diagnostics]")

    def _resolve_column(columns, name):
        """Return the entry of *columns* matching *name*, or None."""
        wanted = str(name).strip().lower().replace("_", " ")
        return next((c for c in columns if str(c).strip().lower().replace("_", " ") == wanted), None)

    def _segment_stats(frame, col):
        """Sessions, purchases and CR (%) per value of *col*; CR is 0 for zero-session segments."""
        grouped = frame.groupby(col).agg(sessions=('session', 'sum'), purchases=('purchase', 'sum')).reset_index()
        grouped['cr'] = ((grouped['purchases'] / grouped['sessions']) * 100).where(grouped['sessions'] > 0, 0.0)
        return grouped

    try:
        config = state["config"]
        # Shallow copy so the dict held in the incoming state is not mutated.
        analysis_results = dict(state["analysis_results"])
        df_analysis = pd.DataFrame(analysis_results["dataframes"]["analysis"])
        df_baseline = pd.DataFrame(analysis_results["dataframes"]["baseline"])
        dims_to_analyze = config.get("dims", [])
        drivers = []

        # A period with no rows has zero sessions (and no 'session' column to sum).
        if df_analysis.empty or df_baseline.empty:
            return {"error_message": "Cannot calculate mix, total sessions is zero in one or both periods."}

        total_sessions_analysis = df_analysis['session'].sum()
        total_sessions_baseline = df_baseline['session'].sum()
        if total_sessions_analysis == 0 or total_sessions_baseline == 0:
            return {"error_message": "Cannot calculate mix, total sessions is zero in one or both periods."}

        for dim in dims_to_analyze:
            col = _resolve_column(df_analysis.columns, dim)
            if col is None:
                print(f"  - Skipping dimension '{dim}': no matching column in the data.")
                continue

            # Both sides carry sessions/purchases/cr, so every metric column gets a suffix.
            merged = pd.merge(_segment_stats(df_analysis, col), _segment_stats(df_baseline, col),
                              on=col, suffixes=('_analysis', '_baseline'), how='outer').fillna(0)
            weight_analysis = merged['sessions_analysis'] / total_sessions_analysis
            weight_baseline = merged['sessions_baseline'] / total_sessions_baseline

            # Mix Contribution Formula: (weight_analysis - weight_baseline) * cr_baseline
            merged['mix_contribution_pp'] = (weight_analysis - weight_baseline) * merged['cr_baseline']

            material = merged[merged['mix_contribution_pp'].abs() > 0.001]  # ignore negligible contributions
            drivers.extend(
                {"driver": f"{col}: {value}", "delta_pp": round(delta, 3), "type": "mix"}
                for value, delta in zip(material[col], material['mix_contribution_pp'])
            )

        analysis_results["drivers_mix"] = drivers
        print(f"  - Output: Calculated {len(drivers)} mix drivers.")
        return {"analysis_results": analysis_results}
    except Exception as e:
        return {"error_message": f"Node F failed: {e}"}

In [None]:
# ==================================
# NODE G: Waterfall Composer
# Purpose: Merges the performance and mix drivers into a single, sorted
#          list. It also calculates any unexplained variance to ensure
#          the analysis perfectly reconciles with the total change.
# ==================================
def node_g_compose_waterfall(state: "AgentState") -> dict:
    """Merge performance and mix drivers into one ranked list and reconcile them.

    Drivers are sorted by absolute ``delta_pp`` (largest first).

    Each dimension's performance + mix drivers form an independent, complete
    decomposition of the total CR change. Summing drivers across *all*
    dimensions would count the change once per dimension, so reconciliation
    is done per dimension. The dimension is the ``driver`` label's prefix
    before ``": "``.

    Returns:
        On success, ``{"analysis_results": ...}``: a shallow copy of the
        incoming results with these keys added:
        - ``"drivers_final"``: the ranked drivers;
        - ``"unexplained_pp_by_dim"``: total delta minus that dimension's
          drivers, rounded to 3 dp;
        - ``"unexplained_pp"``: the residual for the first dimension that has
          drivers, or the whole delta when there are none.
        On failure, ``{"error_message": ...}``.
    """
    print("\n[Node G: Compose Waterfall]")
    try:
        # Shallow copy so the dict held in the incoming state is not mutated.
        analysis_results = dict(state["analysis_results"])
        drivers_performance = analysis_results.get("drivers_performance", [])
        drivers_mix = analysis_results.get("drivers_mix", [])
        combined = drivers_performance + drivers_mix

        # Rank all drivers by the absolute value of their impact.
        all_drivers = sorted(combined, key=lambda d: abs(d["delta_pp"]), reverse=True)

        # Reconcile each dimension separately against the total change.
        total_delta_cr = analysis_results["summary"]["delta_cr_pp"]
        explained_by_dim = {}
        for driver in combined:
            dim = str(driver["driver"]).split(": ", 1)[0]
            explained_by_dim[dim] = explained_by_dim.get(dim, 0.0) + driver["delta_pp"]
        unexplained_by_dim = {dim: round(total_delta_cr - explained, 3) for dim, explained in explained_by_dim.items()}

        primary_dim = next(iter(unexplained_by_dim), None)
        if primary_dim is None:
            unexplained_pp = round(total_delta_cr, 3)
        else:
            unexplained_pp = unexplained_by_dim[primary_dim]

        analysis_results["drivers_final"] = all_drivers
        analysis_results["unexplained_pp"] = unexplained_pp
        analysis_results["unexplained_pp_by_dim"] = unexplained_by_dim

        print(f"  - Output: Composed a final list of {len(all_drivers)} drivers.")
        return {"analysis_results": analysis_results}
    except Exception as e:
        return {"error_message": f"Node G failed: {e}"}

In [None]:
# ==================================
# NODE H: Generate Narrative
# Purpose: Takes all the structured facts and insights generated by the
#          previous nodes and uses Gemini to synthesize a final,
#          human-readable executive summary.
# ==================================
def node_h_generate_narrative(state: AgentState) -> dict:
    """Ask Gemini to turn the computed facts into an executive summary.

    Reads the CR summary, the top 5 entries of ``drivers_final`` and the
    qualitative observations from ``state["analysis_results"]``, and sends
    them to Gemini as a fact sheet.

    Returns:
        ``{"final_narrative": str}`` on success, otherwise
        ``{"error_message": ...}``.
    """
    print("\n[Node H: Generate Narrative]")
    try:
        analysis_results = state["analysis_results"]
        # Prepare the facts for the AI prompt.
        summary = analysis_results.get("summary", {})
        drivers = analysis_results.get("drivers_final", [])

        # The qualitative section comes from an LLM (Node D). Use it only in
        # the expected {"observations": [...]} shape; otherwise skip it.
        qualitative = analysis_results.get("qualitative_analysis", {})
        qualitative_obs = qualitative.get("observations", []) if isinstance(qualitative, dict) else []
        if isinstance(qualitative_obs, str):
            qualitative_obs = [qualitative_obs]
        elif not isinstance(qualitative_obs, list):
            qualitative_obs = []

        # Text summaries for the prompt; explicit placeholders avoid empty sections.
        top_drivers_text = "\n".join([f"- {d['driver']}: {d['delta_pp']:.2f}pp ({d['type']})" for d in drivers[:5]]) or "- (no material drivers were found)"
        qualitative_text = "\n".join([f"- {obs}" for obs in qualitative_obs]) or "- (none)"

        model = genai.GenerativeModel("gemini-2.5-pro")
        prompt = f"""
        You are a senior business analyst delivering a final report.
        Your task is to create a concise, actionable summary based *only* on the facts provided.

        **Fact Sheet:**
        - **Overall Summary:** Conversion rate changed from {summary.get('baseline_cr_pct'):.2f}% to {summary.get('analysis_cr_pct'):.2f}%, a change of {summary.get('delta_cr_pp'):.2f} percentage points.
        - **Top Quantitative Drivers:**
        {top_drivers_text}
        - **High-Level Qualitative Observations:**
        {qualitative_text}

        **Your Output:**
        Provide a final report with the following structure:
        1.  **Executive Summary:** 2-3 sentences summarizing the key takeaway.
        2.  **What Changed:** A bulleted list of the top 2-3 drivers. For each driver, explain *what* happened in simple terms.
        3.  **Actionable Recommendations:** 1-2 concrete, actionable next steps based directly on the findings.
        4.  **Hypotheses (Optional):** If you have ideas that are *not* directly supported by the data, list them here under this specific heading.

        Be clear, concise, and professional. Do not invent any facts.
        """
        response = model.generate_content(prompt)
        final_narrative = response.text
        print("  - Output: Generated final narrative report.")
        return {"final_narrative": final_narrative}
    except Exception as e:
        return {"error_message": f"Node H failed: {e}"}

In [None]:
# ==============================================================================
# SECTION 4: GRAPH ASSEMBLY & ROUTING
# Here, we wire all our functions together into a state machine.
# ==============================================================================

### --- Router Functions --- ###
def should_continue_router(state: AgentState) -> str:
    """Decide the path after Node B: stop on error, stop to ask for dates, or continue."""
    print("\n[Router]")
    if state.get("error_message"):
        print("  - Decision: Error detected. Routing to end.")
        return "end_error"
    if state.get("config", {}).get("need_dates", False):
        print("  - Decision: Dates needed. Routing to end for clarification.")
        return "end_clarify"
    else:
        print("  - Decision: Config OK. Routing to main analysis path.")
        return "continue_analysis"


def _stop_on_error(state: AgentState) -> str:
    """Router used after every other node.

    Ends the run as soon as a node reports an error. Without this, later
    nodes would run on incomplete state and overwrite the original
    error_message with a follow-on KeyError.
    """
    if state.get("error_message"):
        print("\n[Router]")
        print("  - Decision: Error detected. Routing to end.")
        return "end_error"
    return "continue"


### --- Graph Definition --- ###
workflow = StateGraph(AgentState)

# Add all nodes to the graph
workflow.add_node("ingest", node_a_ingest_and_clean)
workflow.add_node("generate_config", node_b_generate_config)
workflow.add_node("calculate_metrics", node_c_calculate_metrics)
# The analysis nodes (wired sequentially below)
workflow.add_node("ai_analyst", node_d_ai_analyst)
workflow.add_node("performance_diagnostics", node_e_performance_diagnostics)
workflow.add_node("mix_diagnostics", node_f_mix_diagnostics)
# The composition nodes
workflow.add_node("compose_waterfall", node_g_compose_waterfall)
workflow.add_node("generate_narrative", node_h_generate_narrative)


### --- Define Edges (the flow of the application) --- ###

# 1. Start with the Ingest node.
workflow.set_entry_point("ingest")

# 2. From Ingest to Config Generation, unless ingestion failed.
workflow.add_conditional_edges("ingest", _stop_on_error, {"continue": "generate_config", "end_error": END})

# 3. After Config Generation, use the router to decide the next step.
workflow.add_conditional_edges(
    "generate_config",
    should_continue_router,
    {"continue_analysis": "calculate_metrics", "end_clarify": END, "end_error": END}
)

# 4. Main analysis path: each step continues only if the previous one succeeded.
_ANALYSIS_STEPS = [
    ("calculate_metrics", "ai_analyst"),
    ("ai_analyst", "performance_diagnostics"),
    ("performance_diagnostics", "mix_diagnostics"),
    ("mix_diagnostics", "compose_waterfall"),
    ("compose_waterfall", "generate_narrative"),
]
for _source, _target in _ANALYSIS_STEPS:
    workflow.add_conditional_edges(_source, _stop_on_error, {"continue": _target, "end_error": END})

# 5. The final step is the end of the graph.
workflow.add_edge("generate_narrative", END)

### --- Compile the graph into a runnable app --- ###
# No checkpointer is used: the graph is stateless between runs,
# but stateful within a single run.
app = workflow.compile()
print("\nâœ… LangGraph app compiled successfully.")

In [None]:
print("\n--- Agent Workflow Diagram ---")
# Render the compiled graph inline in the Colab output.
from IPython.display import Image, display
try:
    # PNG rendering may depend on an external renderer (TODO confirm for the
    # installed langgraph version). The diagram is cosmetic, so a rendering
    # failure must not stop the notebook; fall back to the Mermaid source text.
    display(Image(app.get_graph().draw_mermaid_png()))
except Exception as render_error:
    print(f"  (Could not render PNG diagram: {render_error}. Mermaid source follows.)")
    print(app.get_graph().draw_mermaid())
print("----------------------------")

In [None]:
# ==============================================================================
# SECTION 5: USER INTERFACE & EXECUTION
# This is the final part of the script that the user interacts with.
# ==============================================================================

print("\n\n--- Starting CRO Analytics Agent ---")

# Step 1: obtain the CSV. files.upload() (from google.colab, imported in
# Section 1) returns a dict keyed by uploaded file name; an empty result
# means nothing was uploaded.
print("Please upload your CRO data CSV file...")
uploaded = files.upload()
if not uploaded:
    raise Exception("No file uploaded. Halting execution.")
raw_file_path = list(uploaded.keys())[0]

# Step 2: the free-text question that Node B turns into an analysis config.
question = input("\n>>> Please ask a complete question (e.g., 'Why did CR drop last week vs the week before?'): ")

# Step 3: seed the graph with the two user inputs and run it from the
# entry point until it reaches an end node.
initial_state = dict(raw_data_path=raw_file_path, user_question=question)
final_state = app.invoke(initial_state)

# Step 4: report the outcome. An error wins over a clarification request,
# which in turn wins over the normal narrative.
print("\n\nðŸŽ‰ --- Agent Execution Complete --- ðŸŽ‰")
_run_error = final_state.get("error_message")
if _run_error:
    print(f"ðŸš¨ Agent stopped with an error: {_run_error}")
elif final_state.get("config", {}).get("need_dates"):
    _clarification = final_state.get('config', {}).get('message')
    print(f"ðŸ’¡ AI needs more information: {_clarification}")
else:
    for _banner_line in (
        "\n==================================================",
        "           Final Executive Summary",
        "==================================================",
    ):
        print(_banner_line)
    # The narrative produced by Node H (or a fallback if it is absent).
    print(final_state.get("final_narrative", "No narrative was generated."))
    print("\n")