# Multi-Agent Customer Journey Analysis using Google ADK and Teradata

# Overview
This notebook demonstrates a sophisticated multi-agent AI system that analyzes customer behavior patterns by combining Google's Agent Development Kit (ADK) with Teradata's advanced analytics capabilities. The system uses three specialized AI agents working in concert to transform natural language questions into actionable business insights.

### Key Components:
- **Google ADK**: Orchestrates multiple AI agents for complex analysis tasks
- **Teradata nPath**: Performs pattern recognition on sequential customer events
- **Sessionization**: Groups customer interactions into meaningful sessions

# 1. Environment Setup and Library Imports

### Description
This section imports the libraries the multi-agent system needs: Google's ADK for agent orchestration, `teradataml` for in-database analytics, and standard Python libraries for data manipulation and asynchronous operations.

In [None]:
# Install agent development kit
!pip install google-adk

In [None]:
import os
import json
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from pydantic import BaseModel, Field
from typing import List, Dict, Any
from teradataml import *
import pandas as pd
import asyncio

# 2. API Configuration and Model Settings

### Description
Configure the Google Gemini API credentials and model selection. The Gemini 2.0 Flash model provides fast, intelligent responses for our agents. We also define application-level constants for session management.
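
A key typed directly into a cell is easy to leak when the notebook is shared. As a minimal sketch of one alternative, assuming the key was exported in the shell before the notebook started, a small guard can refuse to continue while the placeholder is still in place. The helper name `require_api_key` and its `env` parameter are hypothetical and are not part of ADK.

```python
import os

PLACEHOLDER = "YOUR_API_KEY_HERE"  # the placeholder string used in this notebook


def require_api_key(env=None, var_name="GOOGLE_API_KEY"):
    """Return the API key from the environment, or raise if it is unset or still the placeholder."""
    env = os.environ if env is None else env
    key = env.get(var_name, "").strip()
    if not key or key == PLACEHOLDER:
        raise RuntimeError(f"{var_name} is not set; export it before starting the notebook")
    return key
```

Calling `require_api_key()` at the top of the configuration cell would surface a missing key immediately rather than at the first agent call.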


In [None]:
# Gemini API Key (Get from Google AI Studio: https://aistudio.google.com/app/apikey)
os.environ["GOOGLE_API_KEY"] = "YOUR_API_KEY_HERE"  # <--- REPLACE WITH YOUR KEY

# Configure ADK to use API keys directly (not Vertex AI)
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"

# Agent model configuration
AGENT_MODEL = "gemini-2.0-flash-001"

# Application configuration
APP_NAME = "customer_behavior_app"
USER_ID = "101"

# 3. Teradata Database Connection

### Description
Establish a connection to the Teradata database where customer event data is stored. This connection lets us access retail transaction data and run advanced analytics directly in the database, using Teradata's massively parallel processing (MPP) architecture.

In [None]:
# Run the startup script and connect to Teradata
%run -i ../startup.ipynb
eng = create_context(host = 'host.docker.internal', username='demo_user', password = password)
print(f"Connected to Teradata: {eng}")

# 4. Data Loading and Initial Exploration

### Description
Load the retail events dataset from Teradata. This data contains timestamped customer interactions such as purchases, store visits, web chats, and service inquiries. Each row represents a single customer event with an entity ID (customer identifier), timestamp, and event type.

In [None]:
# Load the retail events dataset
%run -i ../run_procedure.py "call get_data('DEMO_Retail_cloud');"

# Create DataFrame from Teradata table
tdf_retail_events = DataFrame(in_schema('DEMO_Retail', 'Retail_Events'))
print("Sample of retail events:")
print(f"Total events: {tdf_retail_events.shape[0]}")
tdf_retail_events.head(10)

# 5. Data Sessionization

### Description
**Sessionization** is a data preparation step that groups individual customer events into sessions. A session represents a continuous period of customer activity. Here we use a 24-hour timeout: if a customer has no activity for more than 24 hours, their next event starts a new session. This lets us study customer behavior within focused time periods.

### Key Parameters:
- **data_partition_column**: Groups events by customer (entity_id)
- **data_order_column**: Orders events chronologically (datestamp)
- **time_out**: Maximum gap between events in the same session (86400 seconds = 24 hours)
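
The timeout rule can be illustrated in plain Python. This is a sketch of the idea only, not Teradata's implementation: the function name `sessionize`, the tuple layout, and the per-customer session counter starting at 0 are assumptions made for illustration.

```python
from datetime import datetime, timedelta
from itertools import groupby


def sessionize(events, timeout_seconds=86400):
    """Tag each (entity_id, timestamp, event) tuple with a per-customer session number.

    A new session starts whenever the gap since the same customer's
    previous event exceeds timeout_seconds.
    """
    # Sort by customer, then chronologically (partition column, then order column).
    ordered = sorted(events, key=lambda e: (e[0], e[1]))
    tagged = []
    for entity_id, group in groupby(ordered, key=lambda e: e[0]):
        session_no, previous = 0, None
        for _, ts, event in group:
            if previous is not None and (ts - previous).total_seconds() > timeout_seconds:
                session_no += 1
            tagged.append((entity_id, ts, event, session_no))
            previous = ts
    return tagged
```

With a 24-hour timeout, two events two hours apart land in the same session, while an event 28 hours after the previous one opens a new session for that customer.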



In [None]:
# Group events into sessions using 24-hour windows
sessionized_events = Sessionize(
    data = tdf_retail_events, 
    data_partition_column = ['entity_id'],  # Group by customer
    data_order_column = ['datestamp'],      # Order chronologically
    time_column = 'datestamp', 
    time_out = 86400.00                     # 24 hours = new session
)

# Save sessionized data for analysis
tdf_sessionized_events = sessionized_events.result
tdf_sessionized_events.to_sql(table_name = 'demo_sessionized_events', if_exists = 'replace')
tdf_sessionized_events.head(10)

# 6. nPath Pattern Definitions

### Description
**nPath** is Teradata's powerful pattern matching function that finds sequences of events in time-series data. Here we define patterns for three key business outcomes:

1. **Purchase Patterns**: Identifies sequences of 1-5 events leading to a purchase
2. **Cancellation Patterns**: Finds 2-5 events preceding membership cancellations
3. **Web Chat Patterns**: Discovers up to 3 events before customer service interactions

### Pattern Syntax:
- **Symbols**: Define event types (e.g., "EVENT = 'Purchase' AS PP")
- **Pattern**: Regular expression-like syntax (e.g., "A{1,5}.PP" means 1-5 any events followed by purchase)
- **Results**: What to extract from matched patterns
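
Because the pattern syntax resembles regular expressions, the purchase pattern can be approximated with Python's `re` module. The sketch below is an analogy only and does not reproduce the nPath engine: the single-character event codes and the function name `find_purchase_paths` are hypothetical.

```python
import re

# Hypothetical one-character codes: P = Purchase, M = Mem Purchase, x = anything else.
EVENT_CODES = {"Purchase": "P", "Mem Purchase": "M"}


def find_purchase_paths(session_events):
    """Return the event sub-sequences of one session that match 'A{1,5}.(PP|MP)'."""
    encoded = "".join(EVENT_CODES.get(event, "x") for event in session_events)
    # '.' plays the role of symbol A (any event); '[PM]' plays the role of (PP|MP).
    # re.finditer yields non-overlapping matches, scanning left to right.
    return [
        session_events[m.start():m.end()]
        for m in re.finditer(r".{1,5}[PM]", encoded)
    ]
```

Note that in this sketch the "any event" symbol also matches purchase events, so an earlier purchase can be absorbed into a longer path that ends at a later purchase.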

In [None]:
# Path definitions for different business outcomes
PATH_DEFS = {
    'purchase': {
        'symbols': [
            "EVENT = 'Purchase' AS PP",         # Regular purchase
            "EVENT = 'Mem Purchase' AS MP",     # Membership purchase
            "True AS A"                         # Any event
        ],
        'pattern': 'A{1,5}.(PP|MP)',           # 1-5 events followed by purchase
        'results': [
            'FIRST(datestamp OF A) AS start_time',
            'FIRST(entity_id OF A) AS entity_id',
            'ACCUMULATE(cast(event AS VARCHAR(50)) OF ANY(A,PP,MP)) AS path',
            'COUNT(* OF ANY(A,PP,MP)) AS event_cnt'
        ]
    },
    'cancellation': {
        'symbols': [
            "True AS A",                        # Any event
            "EVENT = 'Mem Cancel' AS B"         # Membership cancellation
        ],
        'pattern': 'A{2,5}.B',                 # 2-5 events followed by cancellation
        'results': [
            'FIRST(entity_id OF A) AS entity_id',
            'ACCUMULATE(cast(event AS VARCHAR(50)) OF ANY(A,B)) AS path',
            'COUNT(* OF ANY(A,B)) AS event_cnt'
        ]
    },
    'web_chat': {
        'symbols': [
            "EVENT = 'Web Chat' AS W",          # Web chat event
            "True AS A"                         # Any event
        ],
        'pattern': 'A{0,3}.W',                 # 0-3 events followed by web chat
        'results': [
            'FIRST(entity_id OF ANY(A,W)) AS entity_id',  # A may match zero events, so read from A or W
            'COUNT(* OF ANY(A,W)) AS chat_count',
            'ACCUMULATE(cast(event AS VARCHAR(50)) OF ANY(A,W)) AS path'
        ]
    }
}

# 7. nPath Execution Function

### Description
This function executes the nPath analysis on the sessionized data. It takes an outcome type (purchase, cancellation, or web_chat) and returns every customer journey pattern matching that outcome as a JSON string. If the outcome is unknown or the nPath call fails, it returns a JSON error object (message, exception type, and a suggestion) instead of raising, so the calling agent can report the problem.

### How nPath Works:
1. Scans through each customer session
2. Looks for event sequences matching the defined pattern
3. Extracts the specified results (paths, counts, etc.)
4. Returns all matching patterns for analysis
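
Once the matching paths come back as JSON, the frequency summary the analyst agent is asked to produce can also be computed deterministically. The following is a sketch under assumptions: the helper name `summarize_paths` is hypothetical, and it assumes each record carries a `path` field, as in the `results` definitions above.

```python
import json
from collections import Counter


def summarize_paths(npath_json, top_n=5):
    """Count distinct paths in an nPath JSON payload and report the most frequent ones."""
    records = json.loads(npath_json)
    # Pass error payloads through unchanged instead of trying to count them.
    if isinstance(records, dict) and "error" in records:
        return {"error": records["error"]}
    counts = Counter(record["path"] for record in records)
    total = sum(counts.values())
    top = [
        {"path": path, "count": count, "pct": round(100 * count / total, 1)}
        for path, count in counts.most_common(top_n)
    ]
    return {"total_paths": total, "unique_paths": len(counts), "top_paths": top}
```

Computing these numbers in code rather than asking the model to count reduces the risk of arithmetic mistakes in the agent's summary.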

In [None]:
def run_npath(outcome: str):
    """
    Execute nPath analysis for the specified outcome.
    
    Args:
        outcome: One of 'purchase', 'cancellation', or 'web_chat'
    
    Returns:
        JSON string containing the path analysis results
    """
    print(f"--- Running nPath for outcome: {outcome} ---")
    
    cfg = PATH_DEFS.get(outcome)
    if not cfg:
        return json.dumps({"error": f"Unknown outcome: {outcome}"})
    
    try:
        global tdf_sessionized_events
        
        # Execute nPath analysis
        np = NPath(
            data1=tdf_sessionized_events,
            data1_partition_column=['entity_id', 'SESSIONID'],  # session IDs may repeat across customers
            data1_order_column='datestamp',
            mode='NONOVERLAPPING',
            symbols=cfg['symbols'],
            pattern=cfg['pattern'],
            result=cfg['results']
        )
        
        # Convert results to JSON
        records = np.result.to_pandas().to_dict(orient='records')
        print(f"Found {len(records)} path records")
        return json.dumps(records, default=str)
        
    except Exception as e:
        error_msg = f"nPath execution failed: {str(e)}"
        print(f"Error: {error_msg}")
        # Return error details for transparency
        return json.dumps({
            "error": error_msg,
            "error_type": type(e).__name__,
            "suggestion": "Please check database connection and ensure sessionized data exists"
        })

# 8. Multi-Agent System Architecture

### Description
This section defines three specialized AI agents that work together to analyze customer journeys:

### Agent Roles:
1. **Query Parser Agent**: Natural language understanding specialist that extracts the business outcome from user questions
2. **Data Analyst Agent**: Technical expert that executes nPath analysis and summarizes results
3. **Business Strategist Agent**: Business consultant that transforms technical findings into executive insights

### Agent Communication:
- Agents communicate through a session service
- Each agent has a specific role and passes results to the next
- The pipeline ensures consistent, high-quality analysis
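
The hand-off between agents can be pictured as a chain of asynchronous stages, each receiving the previous stage's output. The sketch below uses plain coroutines as hypothetical stand-ins for the three agents. It makes no LLM or ADK calls, and the names `run_pipeline`, `parse`, `analyze`, and `strategize` are illustrative only.

```python
import asyncio


async def run_pipeline(query, stages):
    """Feed each stage's output into the next and return every intermediate result."""
    results = []
    payload = query
    for stage in stages:
        payload = await stage(payload)
        results.append(payload)
    return results


# Hypothetical stand-ins for the Query Parser, Data Analyst, and Strategist agents.
async def parse(query):
    return "cancellation" if "cancel" in query.lower() else "purchase"


async def analyze(outcome):
    return f"analysis for {outcome}"


async def strategize(analysis):
    return f"insights from {analysis}"
```

In a notebook cell this would be driven with `await run_pipeline("Why do customers cancel?", [parse, analyze, strategize])`. In a script, wrap the call in `asyncio.run(...)`.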

In [None]:
def create_agents():
    """Create and configure the three specialized agents."""
    
    # Agent 1: Query Parser - Extracts intent from natural language
    query_parser_agent = LlmAgent(
        name='QueryParser',
        model=AGENT_MODEL,
        instruction="""You are a query classification specialist. Extract the customer journey outcome from the user's query.

OUTCOME MAPPING:
- purchase: "purchase", "buy", "buying", "conversion", "checkout", "sales"
- cancellation: "cancel", "cancellation", "churn", "leave", "unsubscribe"
- web_chat: "chat", "web chat", "support", "customer service", "help"

OUTPUT RULES:
1. Output ONLY the mapped outcome word (purchase/cancellation/web_chat)
2. No explanations, quotes, or additional text
3. If unclear, default to "purchase"

EXAMPLES:
Query: "Show me paths leading to purchase" → purchase
Query: "Why do customers cancel?" → cancellation
Query: "Analyze chat interactions" → web_chat"""
    )
    
    # Agent 2: Data Analyst - Executes technical analysis
    data_analyst_agent = LlmAgent(
        name='DataAnalyst',
        model=AGENT_MODEL,
        instruction="""You are a data analysis specialist for customer journey analytics.

YOUR TASK:
1. You will receive an outcome type (purchase/cancellation/web_chat)
2. Call run_npath(outcome) to analyze customer paths for that outcome
3. Process the returned JSON data and create a technical summary

OUTPUT FORMAT:
- Total paths analyzed: [number]
- Unique path patterns: [count from data]
- Top 5 most frequent paths with percentages
- Key observations about path lengths and complexity
- Data quality notes (if any issues)

IMPORTANT: 
- Always call run_npath with the exact outcome provided
- Focus on factual data analysis, not interpretation
- Include specific numbers and percentages""",
        tools=[run_npath]  # Give this agent access to the nPath function
    )
    
    # Agent 3: Business Strategist - Creates actionable insights
    business_strategist_agent = LlmAgent(
        name='Strategist',
        model=AGENT_MODEL,
        instruction="""You are a business insights specialist who transforms technical data analysis into clear, valuable business understanding.

ADAPT your response based on the type of question:

FOR EXPLORATORY QUESTIONS (e.g., "What are typical journeys..."):
- Focus on describing patterns and behaviors
- Explain what's happening in business terms
- Highlight interesting findings
- Keep recommendations optional

FOR PROBLEM-SOLVING QUESTIONS (e.g., "Why do customers cancel..."):
- Identify root causes and pain points
- Provide actionable recommendations
- Include metrics and priorities
- Focus on solutions

FOR ANALYTICAL QUESTIONS (e.g., "How do customers interact with..."):
- Provide detailed behavioral analysis
- Show patterns with percentages
- Compare to benchmarks if relevant
- Focus on insights over actions

GENERAL GUIDELINES:
- Always start with a brief summary of findings
- Use business language, not technical jargon
- Include percentages and numbers from the data
- Make insights specific and actionable when appropriate
- Match the depth of analysis to the question asked

TONE: Professional but conversational, focused on clarity and value"""
    )
    
    return query_parser_agent, data_analyst_agent, business_strategist_agent

# 9. Agent Orchestration Pipeline

### Description
This function coordinates the three agents to work together in a sequential pipeline. It manages the flow of information from natural language query to final business insights.

### Pipeline Steps:
1. **Query Understanding**: Parse user's natural language question
2. **Data Analysis**: Execute technical analysis using nPath
3. **Insight Generation**: Transform data into business recommendations

### Session Management:
- Each agent runs in its own session for isolation
- Results are passed between agents through the orchestrator
- The pipeline is asynchronous (`run_async`), so it can be awaited directly in a notebook cell; the three agents still run one after another
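
Since the parser's reply is free-form model text, the orchestrator may want to validate it before handing it to the analyst. This is a minimal sketch, assuming the three outcome keys used in this notebook; the helper name `normalize_outcome` is hypothetical, and the fallback to "purchase" mirrors the parser's own default rule.

```python
VALID_OUTCOMES = ("purchase", "cancellation", "web_chat")


def normalize_outcome(raw, default="purchase"):
    """Map free-form model output onto one of the supported outcome keys."""
    # Trim whitespace, surrounding quotes/backticks, and a trailing period.
    cleaned = raw.strip().strip("\"'`.").lower().replace(" ", "_")
    return cleaned if cleaned in VALID_OUTCOMES else default
```

Applying this to the parser's reply before building the analyst prompt means an unexpected answer falls back to a known outcome rather than reaching `run_npath` as an unknown key.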

In [None]:
from google.genai import types

async def analyze_customer_journey(query: str):
    """
    Run multi-agent analysis pipeline.
    
    Args:
        query: Natural language question about customer journeys
    """
    print(f"\n{'='*60}")
    print(f"Query: {query}")
    print(f"{'='*60}")
    
    # Create agents
    query_parser_agent, data_analyst_agent, business_strategist_agent = create_agents()
    
    # Create session service for agent communication
    session_service = InMemorySessionService()
    
    # Step 1: Parse query to extract outcome
    print("\n1. Query Understanding Agent")
    session_id = "parser_session"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    
    runner = Runner(agent=query_parser_agent, app_name=APP_NAME, session_service=session_service)
    content = types.Content(role='user', parts=[types.Part(text=query)])
    
    outcome = ""
    async for event in runner.run_async(user_id=USER_ID, session_id=session_id, new_message=content):
        if event.is_final_response() and event.content and event.content.parts:
            outcome = event.content.parts[0].text.strip()
            print(f"   Identified outcome: {outcome}")
            break
    
    # Step 2: Analyze data using nPath
    print("\n2. Data Analysis Agent")
    session_id = "analyst_session"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    
    runner = Runner(agent=data_analyst_agent, app_name=APP_NAME, session_service=session_service)
    analyst_query = f"Analyze customer paths for outcome: {outcome}"
    content = types.Content(role='user', parts=[types.Part(text=analyst_query)])
    
    analysis = ""
    async for event in runner.run_async(user_id=USER_ID, session_id=session_id, new_message=content):
        if event.is_final_response() and event.content and event.content.parts:
            analysis = event.content.parts[0].text
            print(f"   Analysis completed")
            break
    
    # Step 3: Generate business insights
    print("\n3. Business Strategy Agent")
    session_id = "strategist_session"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    
    runner = Runner(agent=business_strategist_agent, app_name=APP_NAME, session_service=session_service)
    strategist_query = f"Create business insights from this analysis:\n{analysis}"
    content = types.Content(role='user', parts=[types.Part(text=strategist_query)])
    
    async for event in runner.run_async(user_id=USER_ID, session_id=session_id, new_message=content):
        if event.is_final_response() and event.content and event.content.parts:
            insights = event.content.parts[0].text
            print(f"\n{'='*60}")
            print("FINAL BUSINESS ANALYSIS:")
            print(f"{'='*60}")
            print(insights)
            break

# 10. Execute Analysis - Purchase Journey

### Description
Run the complete multi-agent analysis for understanding customer purchase journeys. This demonstrates how the system transforms a simple question into comprehensive business insights.

In [None]:
# Analyze customer purchase patterns
await analyze_customer_journey("What are the typical journeys leading to a purchase?")

# queries = [
#     "What are the typical journeys before cancellation?",
#     "How do customers interact with web chat support?",
#     "What drives customers to make repeat purchases?",
#     "Show me paths where customers contact support multiple times"
# ]

# 11. Cleaning the testing data

Before wrapping up, we need to clean up our work tables and remove any temporary data to keep the environment tidy for future runs.

In [None]:
# Drop the sessionized events table and remove the demo database
db_drop_table(table_name='demo_sessionized_events')

data_cleaning_queries = [
    '''
    DELETE DATABASE DEMO_Retail ALL;
    ''',
    '''
    DROP DATABASE DEMO_Retail;
    '''
]

for query in data_cleaning_queries:
    execute_sql(query)