# BigQuery Agent Quickstart

A minimal agent that takes natural language → queries BigQuery → renders a chart.

In [21]:
# !pip install -qU duckduckgo-search langchain-community
# !pip install -U ddgs

In [22]:
import os
import json
from typing import Annotated, Literal
from datetime import datetime, timezone

from google.cloud import bigquery
from langchain_google_vertexai import ChatVertexAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_community.tools import DuckDuckGoSearchRun
from typing_extensions import TypedDict
import pandas as pd
import plotly.express as px

## 1. Setup

In [11]:
PROJECT = os.environ.get("GOOGLE_CLOUD_PROJECT", "knowsee-platform-development")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION", "europe-west2")

bq_client = bigquery.Client(project=PROJECT)
llm = ChatVertexAI(model="gemini-2.5-flash", project=PROJECT, location=LOCATION, temperature=0.3)

---
# Part 2: Full Data Analyst Agent

This is the distilled version of `backend/src/agents/data_analyst/`. It adds:
- Schema exploration tools
- Output type detection (text/chart/sheet)
- Auto chart configuration
- Iteration limits to prevent infinite loops

In [23]:
class ChartConfig(TypedDict, total=False):
    """Configuration for a single chart."""
    chart_type: Literal["line", "bar", "pie", "scatter", "area", "histogram"]
    title: str
    x: str
    y: str | list[str]  # Can be multiple y columns for multi-series
    color: str | None  # Column to use for color grouping
    data: list[dict]  # The actual data rows

class DataAnalystState(TypedDict):
    """State flows through the graph."""
    messages: Annotated[list[BaseMessage], add_messages]
    charts: list[ChartConfig]  # Accumulated chart configs (LLM decides these)
    iteration_count: int

In [24]:
PUBLIC_PROJECT = "bigquery-public-data"

# Initialise DuckDuckGo search
_ddg_search = DuckDuckGoSearchRun()

@tool
def web_search(query: str) -> str:
    """Search the web for current information, news, trends, or topics NOT available in BigQuery.
    
    Use this tool when:
    - The question is about current events, news, or recent developments
    - You need information about specific topics/categories (e.g., "AI trends", "tech news")
    - BigQuery public datasets cannot answer the question (no categorisation, wrong domain)
    - The user asks about trending topics in a specific industry or field
    
    Do NOT use this for:
    - Questions that can be answered with structured BigQuery data
    - Historical statistical analysis (use BigQuery instead)
    """
    return _ddg_search.run(query)

@tool
def list_datasets() -> str:
    """List available public BigQuery datasets with capabilities and limitations."""
    return """Available datasets in bigquery-public-data:

1. **usa_names** - US baby names 1910-present
   - Tables: usa_1910_current
   - Good for: Name popularity over time, gender trends, state comparisons
   - Limitations: US only, no global data, no ethnicity/origin data

2. **google_trends** - Search trends data
   - Tables: top_terms, top_rising_terms, international_top_terms
   - Good for: General trending searches, regional comparisons, time-based popularity
   - Limitations: NO TOPIC CATEGORISATION - cannot filter by industry (tech, sports, etc.)
     Raw search terms only. Use web_search for topic-specific trends.

3. **ga4_obfuscated_sample_ecommerce** - GA4 e-commerce events
   - Good for: User behaviour analysis, conversion funnels, event tracking examples
   - Limitations: Sample data only, obfuscated, not real business metrics

4. **samples** - Classic BigQuery samples
   - Tables: shakespeare, natality, github_timeline
   - Good for: Learning SQL, text analysis, birth statistics
   - Limitations: Historical/static data

5. **austin_bikeshare** / **chicago_taxi_trips** / **new_york_taxi_trips**
   - Good for: Transportation analysis, geospatial queries, time patterns
   - Limitations: City-specific, historical data

## Tool Selection Guide:
- Structured historical data → BigQuery (list_tables → get_table_schema → execute_query)
- Current events, topic trends, industry news → web_search
- If BigQuery query returns irrelevant results → try web_search instead

Use list_tables(dataset_id) to see exact table names."""

@tool
def list_tables(dataset_id: str) -> str:
    """List all tables in a dataset. ALWAYS call this before get_table_schema."""
    sql = f"""
    SELECT table_name
    FROM `bigquery-public-data.{dataset_id}.INFORMATION_SCHEMA.TABLES`
    """
    try:
      rows = list(bq_client.query(sql).result())
      if not rows:
          return f"No tables found in {dataset_id}"
      lines = [f"Tables in bigquery-public-data.{dataset_id}:"]
      for r in rows:
          lines.append(f"  - {r.table_name}")
      return "\n".join(lines)
    except Exception as e:
      return f"Error listing tables: {e}"

@tool
def get_table_schema(dataset_id: str, table_id: str) -> str:
    """Get schema of a BigQuery table. Call list_tables first to get valid table names."""
    try:
        table_ref = f"{PUBLIC_PROJECT}.{dataset_id}.{table_id}"
        table = bq_client.get_table(table_ref)
        lines = [f"Schema for {table_ref} ({table.num_rows:,} rows):"]
        for field in table.schema:
            lines.append(f"  - {field.name}: {field.field_type}")
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"

@tool
def execute_query(sql: str, max_rows: int = 100) -> dict:
    """Execute a SELECT query. Returns columns and rows."""
    if any(kw in sql.upper() for kw in ["INSERT", "UPDATE", "DELETE", "DROP", "CREATE"]):
        return {"error": "Only SELECT allowed", "columns": [], "rows": []}
    try:
        result = bq_client.query(sql).result(timeout=30)
        columns = [f.name for f in result.schema]
        rows = []
        for i, row in enumerate(result):
            if i >= min(max_rows, 1000):
                break
            row_dict = {}
            for col in columns:
                val = row[col]
                row_dict[col] = val.isoformat() if hasattr(val, "isoformat") else val
            rows.append(row_dict)
        return {"columns": columns, "rows": rows, "total_rows": result.total_rows}
    except Exception as e:
        return {"error": str(e), "columns": [], "rows": []}

@tool
def suggest_chart(
    chart_type: Literal["line", "bar", "pie", "scatter", "area", "histogram"],
    title: str,
    x_column: str,
    y_column: str,
    data: list[dict],
    color_column: str | None = None,
) -> str:
    """Suggest a visualisation for query results. Call this AFTER execute_query when data should be visualised.
    
    Args:
        chart_type: The type of chart that best represents the data:
            - "line": Time series, trends over time
            - "bar": Comparisons across categories
            - "pie": Part-to-whole relationships (use sparingly, max 6 slices)
            - "scatter": Correlation between two numeric variables
            - "area": Cumulative totals over time
            - "histogram": Distribution of a single numeric variable
        title: A descriptive title for the chart (based on the user's question)
        x_column: Column name for the x-axis
        y_column: Column name for the y-axis (the metric being measured)
        data: The query result rows to visualise (from execute_query)
        color_column: Optional column for grouping/coloring (e.g., gender, category)
    
    Returns:
        Confirmation that the chart has been queued for rendering.
    """
    # Validate data has the required columns
    if not data:
        return "Error: No data provided for chart"
    
    sample = data[0]
    missing = []
    if x_column not in sample:
        missing.append(f"x_column '{x_column}'")
    if y_column not in sample:
        missing.append(f"y_column '{y_column}'")
    if color_column and color_column not in sample:
        missing.append(f"color_column '{color_column}'")
    
    if missing:
        return f"Error: Columns not found in data: {', '.join(missing)}. Available: {list(sample.keys())}"
    
    # Return the config as JSON for parsing by generate_output_node
    config = {
        "chart_type": chart_type,
        "title": title,
        "x": x_column,
        "y": y_column,
        "color": color_column,
        "data": data,
        "_is_chart_config": True,  # Marker for parsing
    }
    return json.dumps(config)

ALL_TOOLS = [list_datasets, list_tables, get_table_schema, execute_query, suggest_chart, web_search]
llm_with_tools = llm.bind_tools(ALL_TOOLS)

## 4. System Prompt

In [25]:
SYSTEM_PROMPT = f"""You are a cynical but a brilliant Data Analyst with access to BigQuery public datasets AND web search.
Today: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}

## TOOL SELECTION (decide FIRST):

**Use BigQuery when:**
- Question involves structured data analysis (counts, aggregations, comparisons)
- Historical trends with specific metrics (baby names over time, taxi trips by hour)
- Statistical queries on known datasets

**Use web_search when:**
- Question asks about topic-specific trends (e.g., "AI trends", "tech news")
- Current events, recent news, or real-time information
- BigQuery datasets lack the categorisation needed (e.g., google_trends has NO topic filtering)
- You need qualitative information, not just numbers

## BigQuery WORKFLOW (when using BigQuery):
1. list_datasets() → see available datasets AND their limitations
2. list_tables(dataset_id) → get EXACT table names (NEVER guess)
3. get_table_schema(dataset_id, table_id) → understand columns
4. execute_query(sql) → run your query
5. suggest_chart() → if data should be visualised, call this with appropriate config

## VISUALISATION GUIDELINES:
After execute_query returns data, decide if visualisation helps answer the question:
- **YES, visualise**: Comparisons, trends, distributions, rankings
- **NO, skip chart**: Single values, yes/no answers, text-heavy results

When calling suggest_chart, choose the right chart type:
- **line**: Trends over time (years, months, dates)
- **bar**: Comparing categories (names, states, products)
- **pie**: Part-to-whole (max 6 slices, use sparingly)
- **scatter**: Correlation between two numbers
- **histogram**: Distribution of a single metric

Create a clear, descriptive title based on what the user asked.

## MULTI-PART QUESTIONS:
If the user asks multiple questions, handle each separately:
1. Answer the first question completely (including chart if needed)
2. Then answer the second question (with its own chart if needed)
3. You can call suggest_chart multiple times for different datasets

## Rules:
- NEVER guess table names. Always call list_tables first.
- Read dataset limitations carefully - if a dataset can't answer the question, use web_search
- Use LIMIT for exploration queries
- Only SELECT statements allowed
- Summarise findings in 2-3 sentences after presenting data"""

In [27]:
MAX_ITERATIONS = 12  # Increased for multi-part questions with chart suggestions

async def agent_node(state: DataAnalystState) -> dict:
    """LLM decides: call tool or respond."""
    messages = list(state["messages"])
    
    # Add system prompt if missing
    if not any(isinstance(m, SystemMessage) for m in messages):
        messages = [SystemMessage(content=SYSTEM_PROMPT)] + messages
    
    response = await llm_with_tools.ainvoke(messages)
    return {
        "messages": [response],
        "iteration_count": state.get("iteration_count", 0) + 1,
    }

def should_continue(state: DataAnalystState) -> str:
    """Route: tools, or generate output."""
    if state.get("iteration_count", 0) >= MAX_ITERATIONS:
        return "generate_output"
    
    last = state["messages"][-1]
    if isinstance(last, AIMessage) and getattr(last, "tool_calls", None):
        return "tools"
    return "generate_output"

async def generate_output_node(state: DataAnalystState) -> dict:
    """Extract chart configs from suggest_chart tool calls."""
    charts = []
    
    # Parse chart configs from tool messages (suggest_chart returns JSON)
    for msg in state["messages"]:
        if not isinstance(msg, ToolMessage):
            continue
        if msg.name != "suggest_chart":
            continue
        try:
            parsed = json.loads(msg.content) if isinstance(msg.content, str) else msg.content
            if isinstance(parsed, dict) and parsed.get("_is_chart_config"):
                # Remove the marker and add to charts
                chart_config = {k: v for k, v in parsed.items() if k != "_is_chart_config"}
                charts.append(chart_config)
        except (json.JSONDecodeError, TypeError):
            continue
    
    return {"charts": charts}

In [None]:
graph = StateGraph(DataAnalystState)

graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(ALL_TOOLS))
graph.add_node("generate_output", generate_output_node)

graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_continue, {
       "tools": "tools",
       "generate_output": "generate_output",
})
graph.add_edge("tools", "agent")
graph.add_edge("generate_output", END)

data_analyst = graph.compile()
data_analyst

In [None]:
def render_chart(chart_config: dict) -> None:
    """Render a single chart using Plotly based on LLM-provided config."""
    df = pd.DataFrame(chart_config["data"])
    chart_type = chart_config["chart_type"]
    title = chart_config.get("title", "Chart")
    x = chart_config["x"]
    y = chart_config["y"]
    color = chart_config.get("color")
    
    # Map chart types to Plotly Express functions
    chart_funcs = {
        "line": px.line,
        "bar": px.bar,
        "pie": px.pie,
        "scatter": px.scatter,
        "area": px.area,
        "histogram": px.histogram,
    }
    
    func = chart_funcs.get(chart_type, px.bar)
    
    # Build kwargs based on chart type
    kwargs = {"data_frame": df, "title": title}
    
    if chart_type == "pie":
        kwargs["names"] = x
        kwargs["values"] = y
    elif chart_type == "histogram":
        kwargs["x"] = y  # Histogram uses the metric column
    else:
        kwargs["x"] = x
        kwargs["y"] = y
        if color:
            kwargs["color"] = color
    
    fig = func(**kwargs)
    fig.show()

async def analyse(question: str, show_trace: bool = False):
    """Run the data analyst agent and display results."""
    result = await data_analyst.ainvoke({
        "messages": [HumanMessage(content=question)],
        "charts": [],
        "iteration_count": 0,
    })
    
    # Show trace if requested
    if show_trace:
        print("=" * 60)
        print("TRACE:")
        for i, msg in enumerate(result["messages"]):
            content_preview = str(msg.content)[:100] if msg.content else "(empty)"
            print(f"[{i}] {type(msg).__name__}: {content_preview}...")
        print("=" * 60)
    
    # Print final answer
    final_msg = result["messages"][-1]
    print(f"\nAnswer: {final_msg.content}\n")
    
    # Render all charts
    charts = result.get("charts", [])
    if charts:
        print(f"Rendering {len(charts)} chart(s)...\n")
        for i, chart_config in enumerate(charts):
            print(f"Chart {i+1}: {chart_config.get('title', 'Untitled')}")
            render_chart(chart_config)
    
    return result

# 7. Run the Agent

In [19]:
async def analyse(question: str, show_trace: bool = False):
    """Run the data analyst agent and display results."""
    result = await data_analyst.ainvoke({
        "messages": [HumanMessage(content=question)],
        "query_results": [],
        "output_type": None,
        "chart_type": None,
        "chart_config": None,
        "iteration_count": 0,
    })
    
    # Show trace if requested
    if show_trace:
        print("=" * 60)
        print("TRACE:")
        for i, msg in enumerate(result["messages"]):
            print(f"[{i}] {type(msg).__name__}: {str(msg.content)[:100]}...")
        print("=" * 60)
    
    # Print final answer
    final_msg = result["messages"][-1]
    print(f"\nAnswer: {final_msg.content}\n")
    
    # Show chart if suggested
    if result.get("output_type") == "chart" and result.get("query_results"):
        df = pd.DataFrame(result["query_results"])
        config = result["chart_config"] or {}
        
        if result["chart_type"] == "line":
            fig = px.line(df, x=config.get("x"), y=config.get("y"), title=question)
        else:
            fig = px.bar(df, x=config.get("x"), y=config.get("y"), title=question)
        fig.show()
    
    return result

## 8. Test It

In [20]:
# Simple query
result = await analyse("What were the top 5 baby names in 2020? And what were the top tech search queries in 2020?", show_trace=True)

TRACE:
[0] HumanMessage: What were the top 5 baby names in 2020? And what were the top tech search queries in 2020?...
[1] AIMessage: ...
[2] ToolMessage: Available datasets in bigquery-public-data:

1. **usa_names** - US baby names 1910-present
   - Tabl...
[3] AIMessage: ...
[4] ToolMessage: Schema for bigquery-public-data.usa_names.usa_1910_current (6,311,504 rows):
  - state: STRING
  - g...
[5] AIMessage: ...
[6] ToolMessage: {"columns": ["name", "total_births"], "rows": [{"name": "Liam", "total_births": 19777}, {"name": "No...
[7] AIMessage: [{'type': 'text', 'text': "The top 5 baby names in the US for 2020 were:\n\n1.  Liam (19,777 births)...
[8] ToolMessage: Technology is evolving and search engines are one of the most dynamic fields there is. ... searches ...
[9] AIMessage: ...
[10] ToolMessage: Google Search and YouTube are the two most -visited websites worldwide followed by Facebook and Twit...
[11] AIMessage: ...
[12] ToolMessage: In terms of the data itself, V1 Analytics 

In [34]:
result.get("messages")

[HumanMessage(content='What were the top 5 baby names in 2020?', additional_kwargs={}, response_metadata={}, id='8cc1e152-28a0-493f-a52f-69bf25b5ae76'),
 AIMessage(content='', additional_kwargs={'function_call': {'name': 'list_datasets', 'arguments': '{}'}, '__gemini_function_call_thought_signatures__': {'a4197be6-9210-4612-a00a-127e5e821288': 'CvABAePx/15OQXmSB9Wxo17mkDzYaPVPfxDkt0fh2ftaFaWFZGBbXbdlFI8nkXNbrQ4aTB04p3drKmJT9kl880wc9dxaj9972JKR1tlc0Z324T59Admn487HyCG7pvBvvZyI95wwAcdYQCp1cjyWsYm1zvCyaPBK77MhZrHsWUFwiXDHdLtMNasciiGbx0vHtn4pyU7mgVx5ZJyxYsBKzHsyJFpfEsjZttu42GR/rnYZGz0jL/2538xSFK/oq0y4QHXTcZyH2es98YECBf66l0MJzj75SR6ooaJKxOfL+9lhvrC3c/fgwfRtt1bOpPT51iOF'}}, response_metadata={'is_blocked': False, 'safety_ratings': [], 'usage_metadata': {'prompt_token_count': 257, 'candidates_token_count': 3, 'total_token_count': 312, 'prompt_tokens_details': [{'modality': 1, 'token_count': 257}], 'candidates_tokens_details': [{'modality': 1, 'token_count': 3}], 'thoughts_token_count': 52, 'ca

In [35]:
# Comparison (should produce bar chart)
result = await analyse("Compare total births by state for the name 'Noah' in 2020, top 10 states")


Answer: [{'type': 'text', 'text': "The top 10 states for the name 'Noah' in 2020 are California (2648 births), Texas (2117 births), and Florida (1330 births). New York and Illinois round out the top five with 1250 and 720 births, respectively.", 'thought_signature': 'CtUBAePx/14HT+pSVt+7J/kx79rnwKWpgzLYX7MqDKIwmj4+NObaI/tqqBMY+Eav25Foh1NlNNa00Lt/1T+1Eqn/0S5ru3R5hOLpfUcajTSCYzxA8qAEBeL9QohBQ01zAexrLCbA1YiILYNLZptZqE/N6Kd/MPnsRQBFPhC78qgJ0QoMhpaatlWVUygqY2LKUFMOvkkkUv7WYWz5qIsFnp1DWgsJU6HAwkVYRlDyqU3zS9/lgKM69hvspya+uPWVxAl6YsEPb6BKe7i7nagZkNF1VqwCXrsU'}]



In [11]:
# Google Trends (marketing/advertising data)
result = await analyse("What are the top 10 trending search terms in the US right now?", show_trace=True)

TRACE:
[0] HumanMessage: What are the top 10 trending search terms in the US right now?...
[1] AIMessage: ...
[2] ToolMessage: Available datasets in bigquery-public-data:
- google_trends: Search trends (top_terms, top_rising_te...
[3] AIMessage: ...
[4] ToolMessage: Error listing tables: 400 Unrecognized name: row_count at [2:33]; reason: invalidQuery, location: qu...
[5] AIMessage: ...
[6] ToolMessage: Schema for bigquery-public-data.google_trends.top_terms (43,968,750 rows):
  - score: INTEGER
  - ra...
[7] AIMessage: ...
[8] ToolMessage: {"columns": ["refresh_date", "dma_name"], "rows": [{"refresh_date": "2025-12-02", "dma_name": "Las V...
[9] AIMessage: ...
[10] ToolMessage: {"columns": ["term", "total_score"], "rows": [{"term": "barcelona", "total_score": 1151975}, {"term"...
[11] AIMessage: [{'type': 'text', 'text': "The top 10 trending search terms in the US for December 2nd, 2025, are: '...

Answer: [{'type': 'text', 'text': "The top 10 trending search terms in the US for Decem

In [37]:
result = await analyse("What are the most trending keywords in tech and AI world for the past 6 months? Show me a trend too. Use BigQuery public data.", show_trace=True)

TRACE:
[0] HumanMessage: What are the most trending keywords in tech and AI world for the past 6 months? Show me a trend too....
[1] AIMessage: ...
[2] ToolMessage: Available datasets in bigquery-public-data:
- google_trends: Search trends (top_terms, top_rising_te...
[3] AIMessage: ...
[4] ToolMessage: Tables in bigquery-public-data.google_trends:
  - top_rising_terms
  - international_top_rising_term...
[5] AIMessage: ...
[6] ToolMessage: Schema for bigquery-public-data.google_trends.top_rising_terms (43,968,750 rows):
  - score: INTEGER...
[7] AIMessage: ...
[8] ToolMessage: {"columns": ["term", "week", "score", "percent_gain"], "rows": [{"term": "browns vs raiders", "week"...
[9] AIMessage: ...
[10] ToolMessage: {"columns": ["term", "max_percent_gain", "max_score", "latest_week"], "rows": [{"term": "cowboys vs ...
[11] AIMessage: ...
[12] ToolMessage: {"columns": ["term", "max_percent_gain", "max_score", "latest_week"], "rows": [], "total_rows": 0}...
[13] AIMessage: [{'type': 'tex