# MCP Analytics Agent for Website Operation

## Setup and Initialization
This cell imports all required libraries and sets up the foundation for the analytics agent. It includes API connections, authentication, and environment variables. The Gemini model is initialized and tested to ensure connectivity.

In [1]:
# Import necessary packages
import os
import json
import asyncio
from datetime import datetime, timedelta
import pandas as pd
import streamlit as st
from google import genai
from google.genai.types import (
    FunctionDeclaration,
    GenerateContentConfig,
    Part,
    Tool,
)
from google.oauth2 import service_account
from googleapiclient.discovery import build
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
    DateRange,
    Dimension,
    Metric,
    RunReportRequest,
)
import requests

# Initialize Gemini model connection for Vertex AI
def initialize_gemini():
    """Initialize Gemini model connection"""
    # Set project ID and location
    PROJECT_ID = "your_projec_id"
    LOCATION = "us-central1"
    
    # Initialize genai client
    client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)
    
    print("Gemini initialized successfully")
    
    return client

# Set environment variables to ensure API authentication and keys are available
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "your_key.json"
os.environ["PAGESPEED_API_KEY"] = "page_speed_insight_key"  # Please ensure this is your correct key

# Load service account credentials
with open("willy-poc.json", "r") as f:
    credentials_info = json.load(f)

# Create credential object
credentials = service_account.Credentials.from_service_account_file(
    "willy-poc.json",
    scopes=["https://www.googleapis.com/auth/analytics.readonly",
            "https://www.googleapis.com/auth/webmasters.readonly"]
)

# Initialize Gemini client
gemini_client = initialize_gemini()

# Test Gemini connection
def test_gemini_connection(client):
    """Test Gemini model connection"""
    try:
        MODEL_ID = "gemini-2.0-flash"
        # Simple test query
        response = client.models.generate_content(
            model=MODEL_ID, 
            contents="Please respond with only 'Connection successful' if you can read this."
        )
        
        print(f"Gemini connection test response: {response.text}")
        return True
    except Exception as e:
        print(f"Error connecting to Gemini model: {str(e)}")
        return False

# Run connection test
gemini_connection_working = test_gemini_connection(gemini_client)

if gemini_connection_working:
    print("LLM connection is working. We can proceed with proxy testing.")
else:
    print("LLM connection failed. Please check your credentials and configuration.")


Gemini initialized successfully
Gemini connection test response: Connection successful

LLM connection is working. We can proceed with proxy testing.


## API Utility Functions
This cell defines the core API functions to interact with different services:
- `query_ga4_data()`: Retrieves analytics data from Google Analytics 4
- `query_search_console()`: Fetches search performance data
- `query_pagespeed()`: Gets website performance metrics

In [15]:
from google import genai
from google.genai import types

from google.genai.types import (
    FunctionDeclaration,
    GenerateContentConfig,
    Part,
    Tool,
)
# Define GA4 API utility function
def query_ga4_data(property_id: str, dimensions: list, metrics: list, date_range: dict) -> dict:
    """
    Fetch data from GA4 API for specific dimensions and metrics
    
    Parameters:
    property_id: GA4 property ID in format 'properties/XXXXXXX'
    dimensions: List of dimensions (e.g., ['country', 'city'])
    metrics: List of metrics (e.g., ['sessions', 'transactions'])
    date_range: Date range {'start_date': 'YYYY-MM-DD', 'end_date': 'YYYY-MM-DD'}
    
    Returns:
    Dictionary containing GA4 data
    """
    # Create client using the credentials we set up earlier
    client = BetaAnalyticsDataClient(credentials=credentials)
    
    # Build dimension and metric objects
    dimension_objects = [Dimension(name=dim) for dim in dimensions]
    metric_objects = [Metric(name=met) for met in metrics]
    
    # Build date range
    date_range_obj = DateRange(
        start_date=date_range['start_date'],
        end_date=date_range['end_date']
    )
    
    # Build request
    request = RunReportRequest(
        property=property_id,
        dimensions=dimension_objects,
        metrics=metric_objects,
        date_ranges=[date_range_obj]
    )
    
    # Execute request
    response = client.run_report(request)
    
    # Process return data
    result = {
        "rows": [],
        "dimension_headers": [dim.name for dim in response.dimension_headers],
        "metric_headers": [met.name for met in response.metric_headers]
    }
    
    # Iterate through data rows
    for row in response.rows:
        dimension_values = [val.value for val in row.dimension_values]
        metric_values = [val.value for val in row.metric_values]
        result["rows"].append({
            "dimensions": dict(zip(result["dimension_headers"], dimension_values)),
            "metrics": dict(zip(result["metric_headers"], metric_values))
        })
    
    return result

# Define Search Console API utility function
def query_search_console(site_url: str, dimensions: list, start_date: str, end_date: str, row_limit: int = 10000) -> dict:
    """
    Fetch search data from Search Console API
    
    Parameters:
    site_url: Website URL in format 'https://www.tcloud.gov.tw/' (for URL-prefix properties)
    dimensions: List of dimensions (e.g., ['query', 'page', 'date'])
    start_date: Start date in format 'YYYY-MM-DD'
    end_date: End date in format 'YYYY-MM-DD'
    row_limit: Maximum number of rows to return
    
    Returns:
    Dictionary containing Search Console data
    """
    # Build service using the credentials we set up earlier
    service = build('webmasters', 'v3', credentials=credentials)
    
    # Build query parameters
    request = {
        'startDate': start_date,
        'endDate': end_date,
        'dimensions': dimensions,
        'rowLimit': row_limit,
        'startRow': 0  
    }
    
    # Store all rows
    all_rows = []
    
    # Handle paginated results
    while True:
        # Execute query
        response = service.searchanalytics().query(siteUrl=site_url, body=request).execute()
        
        # If there are rows, add to results
        if response.get('rows'):
            all_rows.extend(response['rows'])
            
            if len(response.get('rows', [])) < row_limit:
                # If we got fewer rows than requested, we're done
                break
                
            # Otherwise, update startRow for next page
            request['startRow'] += row_limit
        else:
            break
    
    # Process return data
    result = {
        "rows": all_rows,
        "dimensions": dimensions,
        "total_rows": len(all_rows)
    }
    
    return result

# Define PageSpeed API utility function
def query_pagespeed(url: str, strategy: str = "mobile", locale: str = "en", category: str = "performance") -> dict:
    """
    Fetch performance data from PageSpeed Insights API
    
    Parameters:
    url: Web page URL
    strategy: 'mobile' or 'desktop'
    locale: Language setting
    category: Analysis category, options: 'performance', 'accessibility', 'best-practices', 'seo', 'pwa'
    
    Returns:
    Dictionary containing PageSpeed data
    """
    # Get API key from environment variables
    api_key = os.environ.get("PAGESPEED_API_KEY")
    
    # Build the API URL
    api_url = f"https://www.googleapis.com/pagespeedonline/v5/runPagespeed?url={url}&strategy={strategy}&locale={locale}&category={category}&key={api_key}"
    
    # Make the request
    response = requests.get(api_url)
    
    # Check if the request was successful
    if response.status_code == 200:
        # Parse JSON response
        result = response.json()
        
        # Extract the most relevant data
        simplified_result = {
            "url": result.get("id"),
            "strategy": strategy,
            "lighthouse_version": result.get("lighthouseVersion"),
            "categories": {},
            "metrics": {},
            "audits": {}
        }
        
        # Extract category scores
        if "categories" in result.get("lighthouseResult", {}):
            for cat_key, cat_data in result["lighthouseResult"]["categories"].items():
                simplified_result["categories"][cat_key] = {
                    "title": cat_data.get("title"),
                    "score": cat_data.get("score")
                }
        
        # Extract core web vitals and other metrics
        if "audits" in result.get("lighthouseResult", {}):
            # Core Web Vitals and important metrics
            important_metrics = [
                "first-contentful-paint",
                "largest-contentful-paint",
                "cumulative-layout-shift",
                "total-blocking-time",
                "speed-index",
                "interactive"
            ]
            
            for metric in important_metrics:
                if metric in result["lighthouseResult"]["audits"]:
                    audit = result["lighthouseResult"]["audits"][metric]
                    simplified_result["metrics"][metric] = {
                        "title": audit.get("title"),
                        "description": audit.get("description"),
                        "score": audit.get("score"),
                        "display_value": audit.get("displayValue"),
                        "numeric_value": audit.get("numericValue")
                    }
            
            # Add a few important audits
            important_audits = [
                "render-blocking-resources",
                "unminified-css",
                "unminified-javascript",
                "unused-css-rules",
                "unused-javascript"
            ]
            
            for audit_name in important_audits:
                if audit_name in result["lighthouseResult"]["audits"]:
                    audit = result["lighthouseResult"]["audits"][audit_name]
                    simplified_result["audits"][audit_name] = {
                        "title": audit.get("title"),
                        "description": audit.get("description"),
                        "score": audit.get("score")
                    }
        
        return simplified_result
    else:
        # Return error information
        return {
            "error": True,
            "status_code": response.status_code,
            "message": f"Failed to retrieve PageSpeed data: {response.reason}",
            "details": response.text
        }


## Tool Definitions
This cell creates standardized schemas for each tool that will be exposed to the Gemini model. These schemas define the parameters and structure required for proper API calls. It also creates a mapping dictionary to connect tool names with their execution functions.

In [14]:
# Create tool definitions matching the MCP format
ga4_tool_schema = {
    "name": "ga4_api_tool",
    "description": "Fetch analytics data from Google Analytics 4 for a specific website, including metrics like pageviews, sessions, and active users over a time period.",
    "parameters": {
        "type": "object",
        "properties": {
            "property_id": {
                "type": "string",
                "description": "GA4 property ID in format 'properties/XXXXXXX'. Default is 'properties/your_properties_id' for tcloud.gov.tw"
            },
            "dimensions": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "List of dimensions to analyze (e.g., 'date', 'country', 'deviceCategory')"
            },
            "metrics": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "List of metrics to retrieve (e.g., 'activeUsers', 'sessions', 'screenPageViews')"
            },
            "start_date": {
                "type": "string",
                "description": "Start date in format 'YYYY-MM-DD'"
            },
            "end_date": {
                "type": "string",
                "description": "End date in format 'YYYY-MM-DD'"
            }
        },
        "required": ["property_id", "dimensions", "metrics", "start_date", "end_date"]
    }
}

search_console_tool_schema = {
    "name": "search_console_tool",
    "description": "Fetch search performance data from Google Search Console, including impressions, clicks, CTR, and position data.",
    "parameters": {
        "type": "object",
        "properties": {
            "site_url": {
                "type": "string",
                "description": "Website URL in format 'https://www.tcloud.gov.tw/' (for URL-prefix properties)"
            },
            "dimensions": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "List of dimensions (e.g., 'query', 'page', 'date', 'device', 'country')"
            },
            "start_date": {
                "type": "string",
                "description": "Start date in format 'YYYY-MM-DD'"
            },
            "end_date": {
                "type": "string",
                "description": "End date in format 'YYYY-MM-DD'"
            },
            "row_limit": {
                "type": "integer",
                "description": "Maximum number of rows to return (default: 10000)"
            }
        },
        "required": ["site_url", "dimensions", "start_date", "end_date"]
    }
}

pagespeed_tool_schema = {
    "name": "pagespeed_tool",
    "description": "Analyze website performance using Google PageSpeed Insights API, providing performance metrics and improvement suggestions.",
    "parameters": {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "Web page URL to analyze"
            },
            "strategy": {
                "type": "string",
                "enum": ["mobile", "desktop"],
                "description": "Device strategy to use for analysis (mobile or desktop)"
            },
            "locale": {
                "type": "string",
                "description": "Language setting (default: 'en')"
            },
            "category": {
                "type": "string",
                "enum": ["performance", "accessibility", "best-practices", "seo", "pwa"],
                "description": "Analysis category (default: 'performance')"
            }
        },
        "required": ["url", "strategy"]
    }
}

mcp_tools = [
    types.Tool(function_declarations=[types.FunctionDeclaration(**ga4_tool_schema)]),
    types.Tool(function_declarations=[types.FunctionDeclaration(**search_console_tool_schema)]),
    types.Tool(function_declarations=[types.FunctionDeclaration(**pagespeed_tool_schema)])
]

# Define tool execution functions
def execute_ga4_tool(params):
    # Build date range
    date_range = {
        "start_date": params["start_date"],
        "end_date": params["end_date"]
    }
    
    # Call GA4 API
    result = query_ga4_data(
        property_id=params["property_id"],
        dimensions=params["dimensions"],
        metrics=params["metrics"],
        date_range=date_range
    )
    
    return result

def execute_search_console_tool(params):
    # Set default values
    row_limit = params.get("row_limit", 10000)
    
    # Call Search Console API
    result = query_search_console(
        site_url=params["site_url"],
        dimensions=params["dimensions"],
        start_date=params["start_date"],
        end_date=params["end_date"],
        row_limit=row_limit
    )
    
    return result

def execute_pagespeed_tool(params):
    # Set default values
    locale = params.get("locale", "en")
    category = params.get("category", "performance")
    
    # Call PageSpeed API
    result = query_pagespeed(
        url=params["url"],
        strategy=params["strategy"],
        locale=locale,
        category=category
    )
    
    return result

# Tool function mapping table
tool_functions = {
    "ga4_api_tool": execute_ga4_tool,
    "search_console_tool": execute_search_console_tool,
    "pagespeed_tool": execute_pagespeed_tool
}

print("MCP tool functions defined successfully")


MCP tool functions defined successfully


## Orchestrator Agent
This cell implements the Orchestrator Agent - the first layer of the system that:
1. Analyzes user intent from natural language queries
2. Determines which tools are needed to answer the query
3. Generates appropriate API call parameters
4. Includes date handling for relative time references (yesterday, last week, etc.)

In [16]:
# Implementing the first layer Agent (Orchestrator)

# Design system prompt to help Gemini understand how to analyze user intent
ORCHESTRATOR_SYSTEM_PROMPT = """
You are an Analytics Assistant for a website analytics platform. Your role is to understand user's natural language queries about website analytics and translate them into specific API calls.

The website being analyzed is: https://www.tcloud.gov.tw/

You have access to the following tools:

1. GA4 Analytics Tool:
   - Provides pageviews, sessions, active users, and other analytics metrics
   - Common dimensions: date, deviceCategory, country, city, browser
   - Common metrics: activeUsers, sessions, screenPageViews, engagementRate

2. Search Console Tool:
   - Provides search visibility data: impressions, clicks, CTR, position
   - Common dimensions: query, page, date, device, country

3. PageSpeed Insights Tool:
   - Analyzes website performance
   - Can check both mobile and desktop performance
   - Provides Core Web Vitals and other performance metrics

When a user asks a question:
1. Determine which tool(s) you need to use
2. Select appropriate parameters (date ranges, dimensions, metrics)
3. Use default values when specific parameters aren't mentioned:
   - Default property_id for GA4: "properties/your_properties_id"
   - Default site_url for Search Console: "https://www.tcloud.gov.tw/"
   - If date range isn't specified, use the last 7 days
   - For PageSpeed, analyze both mobile and desktop by default

For time-related queries:
- "Yesterday" = yesterday's date
- "Last week" = the 7 days prior to today
- "Last month" = the 30 days prior to today
- "This month" = from the 1st of current month to today

Always return well-structured and properly formatted function parameters.
"""

# Implement function to let Gemini choose appropriate tools and generate parameters
def orchestrator_agent(user_query, model_id="gemini-2.0-flash"):
    """
    First layer Agent: Analyze user intent, choose appropriate tools, and generate parameters
    
    Parameters:
    user_query: User's natural language query
    model_id: Gemini model ID to use
    
    Returns:
    dict: Contains selected tool names and parameters
    """
    # Get current date for calculating relative date ranges
    today = datetime.now().strftime('%Y-%m-%d')
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    last_week_start = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
    last_month_start = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
    this_month_start = datetime.now().replace(day=1).strftime('%Y-%m-%d')
    
    # Add date information to the prompt
    date_context = f"""
    Today's date is: {today}
    Yesterday was: {yesterday}
    Last week started on: {last_week_start}
    Last month started on: {last_month_start}
    This month started on: {this_month_start}
    """
    
    # Prepare complete prompt to send to Gemini
    full_prompt = f"{ORCHESTRATOR_SYSTEM_PROMPT}\n{date_context}\n\nUser query: {user_query}\n\nAnalyze the user's intent and determine which tool(s) to use and with what parameters:"
    
    try:
        # Call Gemini model, providing the tool list
        response = gemini_client.models.generate_content(
            model=model_id,
            contents=full_prompt,
            config=types.GenerateContentConfig(
                temperature=0.2,
                tools=mcp_tools,
            )
        )
        
        # Check if response contains function_call
        function_calls = []
        for part in response.candidates[0].content.parts:
            fc = getattr(part, 'function_call', None)
            if fc is not None:  # Ensure function_call exists and is not None
                function_call = {
                    "name": fc.name,
                    "parameters": dict(fc.args)
                }
                function_calls.append(function_call)
        
        if not function_calls:
            # If there's no function_call, return error message
            return {
                "error": True,
                "message": "Agent couldn't determine appropriate tools for the query.",
                "response_text": response.text
            }
        
        # Return parsed tool call information
        return {
            "function_calls": function_calls,
            "response_text": response.text
        }
        
    except Exception as e:
        # Handle any errors
        return {
            "error": True,
            "message": f"Error in orchestrator agent: {str(e)}"
        }



## Orchestrator Testing
This cell tests the Orchestrator Agent with sample queries to verify it correctly:
- Understands different types of analytics questions
- Selects appropriate tools for each query
- Generates valid parameters for API calls
- Handles date ranges correctly

In [17]:
# Test Orchestrator Agent function
def test_orchestrator():
    test_queries = [
        "How many pageviews did the homepage have last week?",
        "Compare mobile device visits between this month and last month",
        "What is the average search ranking for the homepage?",
        "How fast does the website load on mobile?",
        "Which pages have the most impressions?"
    ]
    
    print("Testing Orchestrator Agent...")
    for query in test_queries:
        print(f"\nQuery: '{query}'")
        result = orchestrator_agent(query)
        
        if "error" in result:
            print(f"Error: {result['message']}")
        else:
            for i, call in enumerate(result["function_calls"]):
                print(f"Tool {i+1}: {call['name']}")
                print(f"Parameters: {json.dumps(call['parameters'], indent=2)}")
    
    return "Orchestrator test completed"

# Run Orchestrator test
test_orchestrator()


Testing Orchestrator Agent...

Query: 'How many pageviews did the homepage have last week?'




Tool 1: ga4_api_tool
Parameters: {
  "end_date": "2025-04-16",
  "property_id": "properties/325021534",
  "metrics": [
    "screenPageViews"
  ],
  "dimensions": [
    "page"
  ],
  "start_date": "2025-04-09"
}

Query: 'Compare mobile device visits between this month and last month'




Tool 1: ga4_api_tool
Parameters: {
  "metrics": [
    "sessions"
  ],
  "start_date": "2025-04-01",
  "dimensions": [
    "deviceCategory"
  ],
  "end_date": "2025-04-16",
  "property_id": "properties/325021534"
}
Tool 2: ga4_api_tool
Parameters: {
  "property_id": "properties/325021534",
  "start_date": "2025-03-17",
  "metrics": [
    "sessions"
  ],
  "end_date": "2025-04-16",
  "dimensions": [
    "deviceCategory"
  ]
}

Query: 'What is the average search ranking for the homepage?'




Tool 1: search_console_tool
Parameters: {
  "start_date": "2025-04-09",
  "end_date": "2025-04-16",
  "dimensions": [
    "page"
  ],
  "site_url": "https://www.tcloud.gov.tw/"
}

Query: 'How fast does the website load on mobile?'




Tool 1: pagespeed_tool
Parameters: {
  "strategy": "mobile",
  "url": "https://www.tcloud.gov.tw/"
}

Query: 'Which pages have the most impressions?'




Tool 1: search_console_tool
Parameters: {
  "end_date": "2025-04-16",
  "start_date": "2025-04-09",
  "site_url": "https://www.tcloud.gov.tw/",
  "dimensions": [
    "page"
  ]
}


'Orchestrator test completed'

## Executor Agent
This cell defines the Executor Agent - the second layer of the system that:
1. Takes function calls generated by the Orchestrator
2. Executes API calls to retrieve the actual data
3. Collects and formats the results
4. Prepares the data for the response generation phase

In [9]:
# Implementing the second layer Agent (Executor)

import asyncio
from typing import Dict, List, Any, Optional

# Executor Agent: Responsible for executing tool calls and processing results
async def executor_agent(function_calls, session):
    """
    Second layer Agent: Execute tool calls and process results
    
    Parameters:
    function_calls: List of function calls with parameters generated by orchestrator
    session: MCP client session
    
    Returns:
    dict: Contains results from all tool calls
    """
    results = []
    
    try:
        # Loop through each function call
        for call in function_calls:
            tool_name = call["name"]
            parameters = call["parameters"]
            
            print(f"Executing {tool_name} with parameters: {parameters}")
            
            # Execute tool call through MCP session
            tool_result = await session.call_tool(
                tool_name,
                arguments=parameters
            )
            
            # Add tool result to results list
            results.append({
                "tool": tool_name,
                "parameters": parameters,
                "result": tool_result
            })
        
        return {
            "success": True,
            "results": results
        }
        
    except Exception as e:
        # Handle any errors during tool execution
        return {
            "error": True,
            "message": f"Error executing tools: {str(e)}"
        }


## Response Generator
This cell implements the Response Generator that:
1. Takes raw API results from the Executor Agent
2. Creates user-friendly, natural language summaries
3. Highlights key metrics and insights
4. Formats the response for readability
5. Provides context about the metrics when relevant

In [None]:
# Main function to process user query
async def process_user_query(user_query):
    """
    Main function to process user query through the entire agent pipeline
    
    Parameters:
    user_query: User's natural language query
    
    Returns:
    str: Comprehensive response
    """
    # Start MCP server
    server_params = await start_analytics_mcp()
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize session
            await session.initialize()
            
            # Step 1: Call Orchestrator to understand intent and generate tool calls
            orchestrator_result = orchestrator_agent(user_query)
            
            if "error" in orchestrator_result:
                return f"Error understanding query: {orchestrator_result['message']}"
            
            # Extract function calls
            function_calls = orchestrator_result.get("function_calls", [])
            
            if not function_calls:
                return "I couldn't determine how to answer your query with the available tools."
            
            # Step 2: Call Executor to execute the tool calls
            executor_result = await executor_agent(function_calls, session)
            
            if "error" in executor_result:
                return f"Error executing tools: {executor_result['message']}"
            
            # Step 3: Generate comprehensive response
            final_response = generate_analytics_response(executor_result, user_query)
            
            return final_response

In [10]:
# Test the second layer Agent (Executor)
async def test_executor():
    """Test the functionality of the executor agent"""
    # Simulate function calls from the Orchestrator
    test_function_calls = [
        {
            "name": "ga4_api_tool",
            "parameters": {
                "property_id": "properties/325021534",
                "dimensions": ["date"],
                "metrics": ["activeUsers", "sessions", "screenPageViews"],
                "start_date": (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d'),
                "end_date": datetime.now().strftime('%Y-%m-%d')
            }
        }
    ]
    
    # Start the MCP server
    server_params = await start_analytics_mcp()
    
    print("Testing Executor Agent...")
    
    # Interact with the server using the MCP client
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize session
            await session.initialize()
            
            # Execute function calls
            result = await executor_agent(test_function_calls, session)
            
            if "error" in result:
                print(f"Execution error: {result['message']}")
            else:
                print(f"Execution successful, retrieved {len(result['results'])} result(s)")
                
                # Generate a comprehensive response
                response = generate_analytics_response(result, "How many visitors did we have last week?")
                print("\nGenerated response:")
                print(response)
    
    return "Executor test completed"


In [11]:
# Re-import required libraries
import os
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

# Google Generative AI imports
from google import genai
from google.genai.types import (
    FunctionDeclaration,
    GenerateContentConfig,
    Part,
    Tool,
)

# MCP imports
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# API related imports
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
    DateRange, Dimension, Metric, RunReportRequest
)
from google.oauth2 import service_account
from googleapiclient.discovery import build
import requests
import nest_asyncio



## MCP Server Functions
This cell defines helper functions to start and manage the MCP (Model-Controlled Proxy) server, which handles the execution of tool calls. It creates appropriate server parameters and configurations.

In [None]:
# MCP server startup function
async def start_analytics_mcp():
    """Start the MCP server and return server parameters"""
    server_params = StdioServerParameters(
        command="python",
        args=["analytics_mcp_server.py", "stdio"],
        env={
            "PAGESPEED_API_KEY": os.environ.get("PAGESPEED_API_KEY", "page_speed_insight_key"),
        },
    )
    
    print("MCP server parameters configured")
    return server_params


## Interactive Notebook Interface
This cell creates an interactive interface within the Jupyter notebook that:
1. Takes user queries as input
2. Shows the processing steps in real-time
3. Executes the appropriate tools
4. Displays formatted responses using Markdown
5. Provides a conversational experience for exploring analytics data

In [12]:
# Simplified Executor Agent Test
def test_executor_with_mock_data(user_query="How many visitors did we have last week?"):
    """
    Test the executor agent functionality using mock data
    
    Parameters:
    user_query: User's natural language query
    
    Returns:
    str: Simulated final response
    """
    # 1. Simulate output from Orchestrator
    function_calls = [
        {
            "name": "ga4_api_tool",
            "parameters": {
                "property_id": "properties/325021534",
                "dimensions": ["date"],
                "metrics": ["activeUsers", "sessions", "screenPageViews"],
                "start_date": "2025-04-09",
                "end_date": "2025-04-16"
            }
        }
    ]
    
    # 2. Simulate data retrieved by Executor
    executor_results = {
        "success": True,
        "results": [
            {
                "tool": "ga4_api_tool",
                "parameters": {
                    "property_id": "properties/325021534",
                    "dimensions": ["date"],
                    "metrics": ["activeUsers", "sessions", "screenPageViews"],
                    "start_date": "2025-04-09",
                    "end_date": "2025-04-16"
                },
                "result": {
                    "rows": [
                        {
                            "dimensions": {"date": "20250409"},
                            "metrics": {"activeUsers": "254", "sessions": "318", "screenPageViews": "789"}
                        },
                        {
                            "dimensions": {"date": "20250410"},
                            "metrics": {"activeUsers": "267", "sessions": "335", "screenPageViews": "812"}
                        },
                        {
                            "dimensions": {"date": "20250411"},
                            "metrics": {"activeUsers": "243", "sessions": "298", "screenPageViews": "734"}
                        },
                        {
                            "dimensions": {"date": "20250412"},
                            "metrics": {"activeUsers": "181", "sessions": "221", "screenPageViews": "543"}
                        },
                        {
                            "dimensions": {"date": "20250413"},
                            "metrics": {"activeUsers": "172", "sessions": "209", "screenPageViews": "498"}
                        },
                        {
                            "dimensions": {"date": "20250414"},
                            "metrics": {"activeUsers": "261", "sessions": "327", "screenPageViews": "803"}
                        },
                        {
                            "dimensions": {"date": "20250415"},
                            "metrics": {"activeUsers": "278", "sessions": "345", "screenPageViews": "847"}
                        },
                        {
                            "dimensions": {"date": "20250416"},
                            "metrics": {"activeUsers": "292", "sessions": "358", "screenPageViews": "871"}
                        }
                    ],
                    "dimension_headers": ["date"],
                    "metric_headers": ["activeUsers", "sessions", "screenPageViews"],
                    "row_count": 8,
                    "summary": {
                        "total_activeUsers": 1948,
                        "total_sessions": 2411,
                        "total_screenPageViews": 5897,
                        "avg_sessions_per_user": 1.24,
                        "avg_pageviews_per_session": 2.45
                    }
                }
            }
        ]
    }
    
    # 3. Generate final response
    print(f"Test query: '{user_query}'")
    print("Executing tool: ga4_api_tool")
    print("Parameters:", json.dumps(function_calls[0]["parameters"], indent=2))
    print("\nGenerating response...")
    
    # 4. Call response generation function (if defined)
    try:
        if 'generate_analytics_response' in globals():
            response = generate_analytics_response(executor_results, user_query)
            print("\nFinal response:")
            print(response)
            return response
        else:
            # Simulated response
            print("\nFinal response (mock):")
            mock_response = """
## Website Visitors Last Week

Based on the data, your website had **1,948 active users** with **2,411 sessions** last week.

The daily breakdown shows:
- Most active day: **April 16** with 292 users
- Least active day: **April 13** (Sunday) with 172 users

### Key Metrics:
- Average sessions per user: **1.24**
- Average page views per session: **2.45**
- Total page views: **5,897**

There was a noticeable weekend dip in traffic (April 12-13), which is common for many websites. Weekday traffic appears strongest mid-week.
"""
            print(mock_response)
            return mock_response
    except Exception as e:
        error_msg = f"Error generating response: {str(e)}"
        print(error_msg)
        return error_msg

# Test function
test_executor_with_mock_data()


Test query: 'How many visitors did we have last week?'
Executing tool: ga4_api_tool
Parameters: {
  "property_id": "properties/325021534",
  "dimensions": [
    "date"
  ],
  "metrics": [
    "activeUsers",
    "sessions",
    "screenPageViews"
  ],
  "start_date": "2025-04-09",
  "end_date": "2025-04-16"
}

Generating response...

Final response:
Here's a summary of website visitors from April 9th to April 16th, 2025:

*   We had a total of **1,948 active users** last week.
*   These users generated a total of **2,411 sessions**. A session represents a period of active engagement with your website.
*   The total number of **screen page views** was **5,897**.
*   The average sessions per user was **1.24**.
*   The average page views per session was **2.45**.

    The number of active users was lower on the weekend (April 12th and 13th).
    The number of active users peaked on April 16th.



"Here's a summary of website visitors from April 9th to April 16th, 2025:\n\n*   We had a total of **1,948 active users** last week.\n*   These users generated a total of **2,411 sessions**. A session represents a period of active engagement with your website.\n*   The total number of **screen page views** was **5,897**.\n*   The average sessions per user was **1.24**.\n*   The average page views per session was **2.45**.\n\n    The number of active users was lower on the weekend (April 12th and 13th).\n    The number of active users peaked on April 16th.\n"

In [13]:
# Implementing an Interactive Analytics Assistant in Jupyter Notebook

import nest_asyncio
import IPython.display as display
from IPython.display import Markdown, HTML
import asyncio

# Ensure asyncio works properly in Jupyter
nest_asyncio.apply()

async def interactive_analytics_assistant(query):
    """Interactive assistant that generates analytics reports based on user query"""
    try:
        # Show processing status
        print("Processing your query, please wait...")
        
        # 1. Call the Orchestrator agent to understand intent and generate tool calls
        orchestrator_result = orchestrator_agent(query)
        
        if "error" in orchestrator_result:
            return f"Error: {orchestrator_result['message']}"
        
        # Get function calls
        function_calls = orchestrator_result.get("function_calls", [])
        
        if not function_calls:
            return "I couldn't determine how to answer your question with the available tools. Please try rephrasing your query."
        
        # Show selected tools
        print(f"The query will use the following tools:")
        for i, call in enumerate(function_calls):
            print(f"  - {call['name']}")
        
        # 2. Simulate Executor agent to handle tool calls
        results = []
        for call in function_calls:
            tool_name = call["name"]
            params = call["parameters"]
            
            print(f"\nExecuting tool: {tool_name}")
            print(f"Parameters: {json.dumps(params, indent=2)}")
            
            # Use the tool function mapping table to execute the appropriate tool
            if tool_name in tool_functions:
                tool_result = tool_functions[tool_name](params)
                results.append({
                    "tool": tool_name,
                    "parameters": params,
                    "result": tool_result
                })
            else:
                print(f"Unimplemented tool: {tool_name}")
        
        # 3. Format results into Executor result format
        executor_result = {
            "success": True,
            "results": results
        }
        
        # 4. Generate comprehensive response
        print("\nGenerating response...")
        final_response = generate_analytics_response(executor_result, query)
        
        # Display formatted response using Markdown
        return Markdown(final_response)
    
    except Exception as e:
        # Handle any errors
        error_msg = f"An error occurred while processing your query: {str(e)}"
        print(error_msg)
        return error_msg

# Interactive loop function
def run_interactive_assistant():
    """Run the interactive analytics assistant loop"""
    print("="*50)
    print("Welcome to the Website Analytics Assistant!")
    print("You can ask about website traffic, SEO performance, or page speed.")
    print('Type "exit" or "quit" to leave')
    print("="*50)
    
    while True:
        # Get user input
        query = input("\nPlease enter your question: ")
        
        # Check exit commands
        if query.lower() in ["exit", "quit"]:
            print("Thank you for using the assistant! Goodbye!")
            break
        
        if not query.strip():
            print("Please enter a valid question.")
            continue
        
        # Process the query and display result
        result = asyncio.run(interactive_analytics_assistant(query))
        display.display(result)

# Run interactive assistant
run_interactive_assistant()


Welcome to the Website Analytics Assistant!
You can ask about website traffic, SEO performance, or page speed.
Type "exit" or "quit" to leave



Please enter your question:  how is the overall SEO rank 202504


Processing your query, please wait...




The query will use the following tools:
  - search_console_tool

Executing tool: search_console_tool
Parameters: {
  "site_url": "https://www.tcloud.gov.tw/",
  "start_date": "2025-04-01",
  "dimensions": [
    "query",
    "page",
    "date",
    "device",
    "country"
  ],
  "end_date": "2025-04-16"
}

Generating response...


Error generating response: 400 INVALID_ARGUMENT. {'error': {'code': 400, 'message': 'The input token count (2203635) exceeds the maximum number of tokens allowed (1000000).', 'status': 'INVALID_ARGUMENT'}}


Please enter your question:  what is the user count 202501?


Processing your query, please wait...




The query will use the following tools:
  - ga4_api_tool

Executing tool: ga4_api_tool
Parameters: {
  "metrics": [
    "activeUsers"
  ],
  "dimensions": [],
  "start_date": "2025-01-01",
  "property_id": "properties/325021534",
  "end_date": "2025-01-31"
}

Generating response...


Okay, here's a summary of the user count for January 2025:

For the period of January 1st to January 31st, 2025, your website had **7,564 active users**.

*   **Active users** represent the number of distinct individuals who engaged with your website during that month.

This metric provides a general sense of your website's reach and popularity during that time. To gain more insight, you could compare this number to previous months or years to identify trends.



Please enter your question:  exit


Thank you for using the assistant! Goodbye!
