# Multi AI Agent Server + BigQuery Data Cleaning Stage Workflo


# Problem

Businesses face a paradox: data is abundant but often messy, fragmented, and unstructured, which makes it hard to harness effectively. As datasets grow in scale and complexity, the same problems recur:

- **Analyst bottlenecks & inefficiency**  
  Skilled analysts spend disproportionate time on repetitive cleaning, validation, and schema reconciliation. This delays insight, prevents teams from scaling, and diverts talent from higher-value modeling.

- **Inconsistent and fragile anomaly handling**  
  Ad-hoc fixes for missing values, anomalies, and duplicates introduce bias, reduce model reliability, and make pipelines brittle under new data conditions.

- **Lack of real-time, reasoning-capable frameworks**  
  Traditional pipelines perform static transformations and cannot reason about evolving data states. Without context-aware systems that update in real time, organizations suffer delayed insights, missed opportunities, and fragile decision-making.

---

# Objective

Gaby AI is a self-orchestrating data agent that embeds Google BigQuery as its knowledge backbone, enabling automated decision-making across data preparation, anomaly handling, and analytical workflows. By coupling reinforcement learning with in-database AI/ML functions (e.g., `AI.GENERATE`, `ML.FORECAST`), Gaby executes self-feedback learning cycles that integrate real-time insights from active pipelines and historical datasets. The agent efficiently forecasts and updates expected reward trajectories, aligning its decision policies with the evolving workflows of data teams. Leveraging BigQuery’s rapid retrieval and embedded machine learning capabilities, Gaby accelerates both business intelligence and advanced data science experimentation—delivering audit-ready results in sub-second turnaround times.
Gaby’s design goals:

- Produce data that is clean, context-aware, and auditable — suitable for high-stakes business decisions without continuous human babysitting.  
- Learn which remediation actions actually improve downstream utility instead of relying on brittle, one-off rules.  
- Scale incident detection and repair with traceable, reproducible workflows.

### Key objectives

- **Automate end-to-end cleaning:** deduplication, anomaly detection, imputation, and schema consistency.  
- **Build a reusable knowledge framework:** encode the “belief state” of data (table + vector forms) for transparency and interoperability.  
- **Employ reinforcement learning:** adapt dataset-specific strategies rather than using fixed rules.  
- **Preserve past insights:** maintain a dynamic observation store to enable context-aware reasoning.  
- **Deliver near-real-time insights:** leverage BigQuery’s streaming and query capabilities to produce sub-second to low-second responses where feasible.

# Solution

## 1. BigQuery as a database gatekeeper  
BigQuery stores raw and processed data plus derived observations (anomaly flags, feature statistics, cluster assignments). Automated pipelines generate human-readable schema documentation, structured cleaning flows, and database summaries — reducing preparation time from hours to minutes or seconds.

## 2. Reinforcement learning integration  
Gaby applies decision-focused learning patterns:

- **Contextual bandits** for low-latency, per-field remediation (e.g., choose between median imputation, model-based imputation, or flagging).  
- **Episodic policy learners** for multi-step repair sequences with delayed downstream effects (e.g., dedupe → impute → transform → retrain).

Policies are trained or simulated offline using *proxy rewards* (data-quality gains + quick downstream proxy checks) and are governed by safety rules and human-in-the-loop thresholds so high-risk actions require review.

## 3. Self-orchestrated tasks via agents
- **Code agents:** execute cleaning and diagnostics in sandboxed staging environments.  
- **Reasoning LLM:** coordinates workflows and produces concise, human-readable rationales for proposed fixes.  
- **Quantized LLMs:** compact, GGUF-format variants for housekeeping tasks and low-latency inference.

## 4. Real-time reasoning & validation  
Gaby uses BigQuery ML (clustering, forecasting, and quick retrains) and generative primitives to validate candidate fixes in staging. Actions that pass validation can be auto-committed for low-risk changes; higher-risk changes are flagged for human review. Example insights:

- “Sales dropped 8% in Region Y last quarter.”  
- “Inventory for Product Z is below forecast demand.”  
- “Customer Segment B is trending upward in lifetime value.”

Gaby AI is a self-orchestrating data agent that embeds Google BigQuery as its in-database knowledge backbone, enabling automated decision-making across data preparation, anomaly handling, and analytics workflows. By coupling reinforcement-learning policies with BigQuery’s native AI/ML primitives (e.g., `AI.GENERATE`, `ML.FORECAST`) the agent runs closed-loop self-feedback cycles: it proposes fixes, validates them with fast in-database checks, observes proxy downstream effects, and updates expected reward trajectories so its policies stay aligned with evolving data-team objectives. Because the belief-state, audit logs, policy checkpoints, and validation runs all live in BigQuery, Gaby delivers auditable, reproducible recommendations with near-real-time responsiveness — often achieving sub-second to low-second turnaround for diagnostics and low-risk fixes.

**Why this matters**
- **End-to-end automation:** replaces repetitive cleaning and validation work with safe, policy-driven automation so analysts can focus on modeling and insight.  
- **Adaptive, evidence-based decisions:** RL-driven policies learn which repairs actually improve downstream utility (not just reduce missingness), improving over time with observed outcomes.  
- **Speed & scale:** colocated reasoning and fast retrieval reduce ETL overhead and shrink time-to-insight for both BI dashboards and data-science experiments.  
- **Auditability & governance:** every suggested change includes a human-readable rationale, pre/post snapshots, and a recorded reward — enabling traceable, reviewable data fixes.  
- **Practical, measurable outcomes:** design experiments to demonstrate reductions in manual review rates, improvements in downstream model metrics (AUC/RMSE), and shorter ingestion→analytics latency.

**Evaluation metrics**
- % reduction in human-reviewed fixes (automation rate)  
- Δ downstream model performance (AUC / RMSE) after automated cleaning vs baseline  
- Mean time to first validated insight (seconds)  
- % of actions auto-committed vs flagged for review

Gaby’s colocated, learning-first approach therefore reduces analyst toil, improves model reliability, and accelerates safe, reproducible decision-making across production analytics and data-science workflows.

---

# Links & demo

- **Demo:** http://gaby.mimeus.com — *fallback:* Hugging Face demo: https://mimipynb-agent-sandbox.hf.space/  
  Dump any unclean dataset and watch Gaby do it all for you!

- **GitHub:** https://github.com/whoamimi/bigquery-comp  
- **Blog:** http://github.com/whoamimi


## Table of Contents 

- [Architecture](#architecture)
    - [Reinforcement Learning Architecture](#reinforcement-learning-architecture)
- [Setup Workspace](#setup-workspace)
    - [Project Configuration & BigQuery Setup](#project-configuration--bigquery-setup)
- [Prompts](#prompts)
    - [Episode Window Prompt Template](#episode-window-prompt-template)
- [Stage I: Data Cleaning](#stage-i-data-cleaning)
    - [Helper Functions](#helper-functions)
    - [Data Schemas](#data-schemas)
    - [Gaby Main Agent Workflow](#gaby-main-agent-workflow)
    - [BigQuery Gatekeeper](#bigquery-gatekeeper)
    - [Data Cleaning Pipeline Execution](#data-cleaning-pipeline-execution)
    - [Final Report](#final-report)
- [Stage II: Missing Data Values](#stage-ii-missing-data-values)
    - [Missing Data Pattern Analysis](#missing-data-pattern-analysis)
- [Agent Reinforcement Learning & LifeCycle](#agent-reinforcement-learning--lifecycle)
    - [Recent Observations Fetching](#recent-observations-fetching)

## Setup Workspace

This section initializes the development environment, imports required libraries, establishes BigQuery connections, and configures project-specific parameters for the Gaby AI data cleaning pipeline.

In [None]:
import pandas as pd
from dataclasses import dataclass
from google.cloud import bigquery

In [None]:
# Project path & core agent imports
# This cell sets up the Python path to import Gaby AI's core components
import sys
from pathlib import Path

# Define project root explicitly (adjust if repo moved)
PROJECT_ROOT = Path('/Users/mimiphan/mimeus-app/backend/gaby')
SRC_PATH = PROJECT_ROOT / 'src'
if str(SRC_PATH) not in sys.path:
    sys.path.insert(0, str(SRC_PATH))

try:
    from gaby_agent.core.agent._core import GabyBasement, Instructor
    print('✅ Imported GabyBasement & Instructor from gaby_agent.core.agent._core')
except ModuleNotFoundError as e:
    print('❌ Import failed. Check that SRC_PATH is correct and package __init__ files exist.')
    print(e)
except Exception as e:
    print('Unexpected error during import:', e)

✅ Imported GabyBasement & Instructor from gaby_agent.core.agent._core


In [3]:
client = bigquery.Client()

In [None]:
# BigQuery Configuration - Project, Dataset, and Model Setup
# This cell defines all the BigQuery resources used throughout the notebook

# CURRENT TESTING CONFIGURATION
BQ_PROJECT_ID = client.project
BQ_DATASET_ID = "cleaning_service"  # Database to store unclean data and workspace
BQ_TABLE_ID = "sample_dataset"      # actual data table
SAMPLE_FULL_ID = "genial-motif-472804-s1.cleaning_service.sample_dataset"
SAMPLE_SUMMARY_FULL_ID = "genial-motif-472804-s1.cleaning_service.field_summary"

# PRODUCTION DATABASE CONFIGURATION
# Dataset organization for different data types
BQ_DATASET_OBSERVATION_ID = "observations"  # Stores agent observations and learning data
BQ_DATASET_ACTION_ID = "cognitive"          # Stores agent decision patterns
BQ_DATASET_OUTPUT_ID = "cleaned_data"       # Stores final cleaned datasets

# Table naming conventions
BQ_TABLE_SUMMARY_ID = "field_summary"  # Summary tables prefixed with episode_id

# GENERATIVE AI MODEL CONFIGURATION
# BigQuery ML remote model configuration for Gemini
BQ_MODEL_CONNECTION = "projects/481034637222/locations/australia-southeast1/connections/__default_cloudresource_connection__"
BQ_MODEL_ENDPOINT = "projects/genial-motif-472804-s1/locations/australia-southeast1/publishers/google/models/gemini-2.5-flash"
BQ_MODEL_ID = "genial-motif-472804-s1.cleaning_service.gatekeeper"
DEFAULT_MODEL_TYPE = "gemini-2.5-flash"

# LOCAL DEVELOPMENT FILES
LOCAL_SAMPLE_PATH = "/Users/mimiphan/mimeus-app/backend/gaby/src/gaby_agent/data/input/dirty_cafe_sales.csv"

## Prompts

This section defines the prompt templates used by Gaby AI for decision-making and data analysis. These prompts guide the agent's reasoning process during data cleaning operations.

In [None]:
# Episode Window Prompt Template
# This prompt guides the agent's decision-making process during data cleaning episodes
EPISODE_WINDOW_PROMPT = """
You are a senior data analyst and currently performing data cleaning tasks.
{current_workspace}
Given the following knowledge of your current dataset, respond with your next action from one of the following options:
{action_space}
Your response must be one of the action options.
"""

## Stage I: Data Cleaning

This is the primary data cleaning stage where Gaby AI analyzes datasets, generates field descriptions, and performs initial data quality assessment. The stage integrates local Python processing with BigQuery's generative AI capabilities.

### Helper Functions

This subsection contains utility functions and decorators that facilitate data processing, BigQuery interactions, and DataFrame operations throughout the cleaning pipeline.

In [None]:
# Decorator for BigQuery SQL Execution
# This decorator automatically executes SQL queries and returns pandas DataFrames

import functools


def pandas_gatekeeper(func):
    """
    A decorator that executes a SQL query generated by a function and returns a DataFrame.

    This decorator intercepts the SQL string returned by the wrapped function,
    executes it using the provided BigQuery client, and returns the result
    as a pandas DataFrame.

    The decorated function must be called with a 'client' keyword argument
    of type `google.cloud.bigquery.Client`.

    Args:
        func: The function to be decorated, which should return a SQL query string.

    Returns:
        A wrapper function that executes the query and returns a DataFrame.

    Raises:
        TypeError: If the 'client' keyword argument is not provided or is not a
                   `bigquery.Client` instance.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):

        # client = bigquery.Client() -- Since running in notebook.

        # Call the original function to get the SQL query string
        sql_query = func(*args, **kwargs)

        print(f"--- Executing SQL from '{func.__name__}' ---")
        print(sql_query)

        # Execute the query and return the result as a pandas DataFrame
        job = client.query(sql_query)
        return job.to_dataframe()

    return wrapper

In [None]:
# HELPER FUNCTIONS FOR DATA ANALYSIS AND BIGQUERY OPERATIONS

def summarize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Generates summary statistics for each column in a DataFrame.

    For each column, calculates:
    - Missing value count and percentage
    - Data type information
    - Unique value count (or "continuous" for numeric columns with >20 unique values)

    Args:
        df: Input pandas DataFrame to analyze

    Yields:
        dict: Summary statistics for each column
    """
    for col in df.columns:
        total_count = len(df)
        missing_count = df[col].isna().sum()
        data_type = df[col].dtype

        # Check if continuous: numeric with many unique values
        if pd.api.types.is_numeric_dtype(df[col]) and df[col].nunique() > 20:
            unique_vals = "continuous"
        else:
            unique_vals = df[col].nunique()

        yield {
            "data_field_name": col,
            "missing_count": missing_count,
            "total_count": total_count,
            "data_type": str(data_type),
            "unique_values": unique_vals
        }


def upload_dataframe_to_bq(df: pd.DataFrame, project_id: str, dataset_id: str, table_id: str):
    """
    Uploads a pandas DataFrame to a specified BigQuery table.

    Args:
        df: DataFrame to upload
        project_id: BigQuery project ID
        dataset_id: BigQuery dataset ID
        table_id: BigQuery table ID
    """

    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        autodetect=True
    )
    job = client.load_table_from_dataframe(
        df, table_ref, job_config=job_config)
    job.result()  # Wait for the job to complete.
    print(f"✅ Data uploaded to {table_ref}")

### Data Schemas 

This subsection defines the data structures and configuration classes used throughout the data cleaning pipeline. These schemas ensure consistent data handling and episode tracking across the system.

In [None]:
# Data Reports & Profilers for updating databases in BigQuery between sessions etc.

# PROFILERS
from uuid import uuid4
from datetime import datetime


@dataclass
class EpisodeConfig:
    input_id: str = SAMPLE_FULL_ID
    summary_id: str = SAMPLE_SUMMARY_FULL_ID
    # BIGQUERY MODEL STORED LOCS
    bq_model_connection: str = BQ_MODEL_CONNECTION
    bq_model_endpoint: str = BQ_MODEL_ENDPOINT
    bq_model_id: str = BQ_MODEL_ID
    default_model_type: str = DEFAULT_MODEL_TYPE
    # DEFAULT NAMES
    default_project_id: str = BQ_PROJECT_ID
    default_dataset_id: str = BQ_DATASET_ID
    default_table_id: str = BQ_TABLE_ID


@dataclass
class EntryReport:
    description: str | None = None
    data_field_summary: pd.DataFrame | None = None
    data_field_description: pd.DataFrame | None = None


@dataclass
class MissingDataReport:
    missing_pattern: str
    missing_count: float
    missing_perc: float
    data_field_type: str


@dataclass
class DataProfiler:
    # User Inputs
    data: pd.DataFrame
    user_input_tags: str | list | None = None

    # Defining the dataset
    description: str | None = None  # Describing the dataset in natural language
    data_field_summary: pd.DataFrame | None = None  # Data field summary table
    # Data field summary table with description columns
    data_field_description: pd.DataFrame | None = None
    _send_to_gatekeeper: bool = False

    # Episode ID & Configuration
    episode_id: str = uuid4().hex
    timestamp: str = datetime.now().isoformat()
    config: EpisodeConfig | None = None

    def __post_init__(self):
        self.define_dataset(self._send_to_gatekeeper)
        # the EpisodeConfig stored dataset id for the input dataset is set to self.episode_id-self.tiemstamp in prod
        # self.config = EpisodeConfig(
        #    input_dataset_id=f"{self.episode_id}-{self.timestamp}",
        #    summary_table_id=f"{self.episode_id}-{BQ_TABLE_SUMMARY_ID}"
        # )
        self.config = EpisodeConfig()

    def define_dataset(self, upload_summary: bool = False):
        if self.data.shape[0] == 0 or self.user_input_tags is None:
            raise ValueError(
                "The provided DataFrame is empty or data origin is not specified. Both these are required to start the workflow.")

        # assuming have loaded the model and returned it
        self.data_field_summary = pd.DataFrame.from_records(
            list(summarize_dataframe(self.data)))

        print(
            f"✅ Dataset defined with {self.data.shape[0]} rows and {self.data.shape[1]} columns.")

        if upload_summary is True:
            upload_dataframe_to_bq(
                self.data, BQ_PROJECT_ID, BQ_DATASET_ID, BQ_TABLE_ID)
            upload_dataframe_to_bq(
                self.data_field_summary, BQ_PROJECT_ID, BQ_DATASET_ID, BQ_TABLE_SUMMARY_ID)

        print(
            f"Completed profiling for dataset id: {self.episode_id} and uploaded to BQ.")

    @property
    def end_cleaning_report(self) -> EntryReport:
        return EntryReport(
            description=self.description,
            data_field_summary=self.data_field_summary,
            data_field_description=self.data_field_description
        )

    def episode_recap(self):
        context_prompt = f"--- Data Summary ---"

        end_cleaning_report = self.end_cleaning_report

        context_prompt += f"\nDataset Description: {end_cleaning_report.description}\n" if end_cleaning_report.description is not None else "\nNo dataset description available.\n"
        context_prompt += f"\nData Field Summary:\n{end_cleaning_report.data_field_summary.to_markdown(index=False)}\n" if end_cleaning_report.data_field_summary is not None else ""
        context_prompt += f"\nData Field Description:\n{end_cleaning_report.data_field_description.to_markdown(index=False)}\n" if end_cleaning_report.data_field_description is not None else ""

        return context_prompt

In [9]:
# load the datasets into workspace
data = pd.read_csv(LOCAL_SAMPLE_PATH, header=0)
gb = DataProfiler(data=data, user_input_tags=["sales", "beverages", "cafe"])

✅ Dataset defined with 10000 rows and 8 columns.
Completed profiling for dataset id: 8fcb0efffe114abe8ec09730c159829c and uploaded to BQ.


### Gaby Main Agent Workflow

This subsection implements the core AI agents responsible for dataset analysis and field description generation. These agents use the GabyBasement framework with structured prompts to analyze data characteristics.

In [None]:
""" clean_stage_a.py """


class DatasetSummarizer(
    GabyBasement,
    prompt=Instructor(
        prompt="""
        You are a senior data analyst. Based on the dataset’s fields and descriptive metadata, provide a concise summary (no more than 2 sentences) that highlights:
        •	the dataset’s key characteristics and notable features or patterns,
        •	potential modeling or analytical objectives it may support, and
        •	whether the dataset contains any unique identifiers.
        """,
        input_template="""
        Dataset descriptive labels: {user_inputs},
        Dataset Subset:
        {data_table}
        """
    )
):
    pass


class DataFieldMetaDescription(
    GabyBasement,
    prompt=Instructor(
        prompt="You are a data analyst. Given the dataset description and a specific data field label, return a concise description of what the data field possibly means in the context of the dataset. Return your response in at most 1 sentence.",
        input_template="""
        Dataset Description: {data_description}
        Data Field Label: {data_label}
        Data Sample: {data_sample}
        """
    )
):
    def run_loop(self, data: pd.DataFrame, data_description: str) -> dict:
        """ Run the description for each data field in the dataframe. """

        descriptions = {}

        for column in data.columns:
            sample = data[column].dropna().unique()[:3].tolist()
            sample_str = ", ".join(map(str, sample))

            description = self.run(
                data_description=data_description,
                data_label=column,
                data_sample=sample_str
            )

            descriptions[column] = [
                {
                    'name': self.post_process(description),
                    'data_type': str(data[column].dtype),
                    'description': description
                }
            ]

        return descriptions

### BigQuery Gatekeeper 

This subsection implements the BigQuery-based AI functions that leverage Google's generative AI models for data field analysis. It demonstrates direct integration with BigQuery's `AI.GENERATE` function for scalable data processing.

In [None]:
SQL_DESCRIBE_DATA_FIELD_LABEL = """
SELECT
  data_field_name,
  AI.GENERATE( ('The data field name ',
      data_field_name,
      'with values of data type,',
      data_type,
      'is one of the dataset column labels of a dataset with description: cafe sale logs. In a sentence, define what each data field represent.'),
    connection_id => '{connection_id}',
    endpoint => '{endpoint}',
    output_schema => 'data_field_name STRING, description STRING').description
FROM
  {data_summary_id};
"""

# 2 methods in this file to distinguish numeric values.
# Ordinal: categories have a natural order but no consistent distance between them (e.g., low, medium, high)
SQL_DETECT_NUMERIC_FIELD = """
SELECT
  data_field_name,
  AI.GENERATE( ('The data field name ',
      data_field_name,
      'with values of data type,',
      data_type,
      'is one of the dataset column labels of a dataset with description: cafe sale logs.'
      'Classify the dataset field into one of: Nominal, Ordinal, Continuous, Unknown, if the data type is numerical and if not, return Unknown. Return the result strictly as: {"data_type": "<Nominal|Ordinal|Continuous|Unknown>"}'
      ),
    connection_id => '{connection_id}',
    endpoint => '{endpoint}',
    output_schema => 'data_field_name STRING, numeric_type STRING').numeric_type
FROM
  {data_summary_id};
"""


@pandas_gatekeeper
def describe_data_field(
    data_summary_id: str,
    connection_id: str = BQ_MODEL_CONNECTION,
    endpoint: str = DEFAULT_MODEL_TYPE
):
    return SQL_DESCRIBE_DATA_FIELD_LABEL.format(
        data_summary_id=data_summary_id,
        connection_id=connection_id,
        endpoint=endpoint
    )


@pandas_gatekeeper
def detect_numeric_field(
    data_summary_id: str,
    connection_id: str = BQ_MODEL_CONNECTION,
    endpoint: str = DEFAULT_MODEL_TYPE
):
    return SQL_DETECT_NUMERIC_FIELD.format(
        data_summary_id=data_summary_id,
        connection_id=connection_id,
        endpoint=endpoint
    )

# gb.data_field_description = describe_data_field(
#    data_summary_id=SAMPLE_SUMMARY_FULL_ID,
#    connection_id=BQ_MODEL_CONNECTION,
#    endpoint=DEFAULT_MODEL_TYPE
# )

### Workflow

This subsection orchestrates the complete data cleaning pipeline, combining local processing with BigQuery AI capabilities. It demonstrates fallback mechanisms when cloud resources are unavailable.

In [None]:
def data_cleaning_pipeline(report: DataProfiler) -> DataProfiler:
    """ Main function to run the data cleaning pipeline. """

    report.description = DatasetSummarizer().run(
        user_inputs=report.user_input,
        data_table=report.data.head(3).to_string(index=False)
    )

    try:
        report.data_field_description = describe_data_field(
            connection_id=report.config.bq_model_connection,
            model_endpoint=report.config.default_model_type,
        )

        report.numeric_table = detect_numeric_field(
            connection_id=report.config.bq_model_connection,
            endpoint=report.config.default_model_type,
            data_summary_id=report.config.summary_id
        )
    except Exception as e:
        print(
            "Error using GCP model, falling back to local model (takes longer to run):", e)

        report.data_field_description = DataFieldMetaDescription().run_loop(
            data=report.data,
            data_description=report.description
        )
        report.numeric_field = None
    return report


gb = data_cleaning_pipeline(gb)

### Final Report 

This subsection displays the results of the data cleaning pipeline. It shows the generated dataset summaries, field descriptions, and data quality assessments produced by Gaby AI.

In [None]:
gb.data_field_summary

Unnamed: 0,data_field_name,missing_count,total_count,data_type,unique_values
0,Transaction ID,0,10000,object,10000
1,Item,333,10000,object,10
2,Quantity,138,10000,object,7
3,Price Per Unit,179,10000,object,8
4,Total Spent,173,10000,object,19
5,Payment Method,2579,10000,object,5
6,Location,3265,10000,object,4
7,Transaction Date,159,10000,object,367


In [None]:
print(gb.description)

Here is a summary of the dataset in 2 sentences:

This dataset appears to track sales transactions from a cafe, capturing details such as items purchased, quantities, prices per unit, and total spent at each location. The data contains some inconsistencies due to missing values or unknown payment methods, but overall suggests that the cafe sells beverages like coffee and cake, with customers often purchasing in-store.


In [None]:
gb.data_field_description

Unnamed: 0,data_field_name,description
0,Total Spent,This field represents the total amount of mone...
1,Payment Method,This field represents the method of payment us...
2,Item,The 'Item' field represents the name of the pr...
3,Location,This field represents the geographical locatio...
4,Quantity,Represents the number of items sold in a cafe ...
5,Transaction ID,cafe sale logs
6,Price Per Unit,This field represents the cost of a single ite...
7,Transaction Date,This data field represents the date of a trans...


## Stage II: Missing Data Values

This stage focuses on analyzing missing data patterns and recommending appropriate handling strategies. It demonstrates advanced BigQuery AI capabilities for context-aware missing data analysis.

In [None]:
SQL_MISSING_DATA_PATTERN_METHOD = """
SELECT
  data_field_name,
  data_type,
  AI.GENERATE( ('The data field name ',
      data_field_name,
      'is sourced from dataset with description: car sale logs.',

      'Data field is of data type,',
      data_type,
      'and has a total',
      missing_count,
      'number of missing values. Suggest the most common method in dealing with missing datasets of this data type and context. Your response must only contain one of the values fromthe following: [\'imputation\', \'drop_missing\', ].'),
    connection_id => '{connection_id}',
    endpoint => '{endpoint}',
    output_schema => 'data_field_name STRING, description STRING').description
FROM
  {data_summary_id};
"""

@pandas_gatekeeper
def suggest_missing_data_pattern_method(
    data_summary_id: str,
    connection_id: str = BQ_MODEL_CONNECTION,
    endpoint: str = DEFAULT_MODEL_TYPE
):
    return SQL_MISSING_DATA_PATTERN_METHOD.format(
        data_summary_id=data_summary_id,
        connection_id=connection_id,
        endpoint=endpoint
    )

### Helper Functions & Utils

This subsection contains specialized functions for missing data pattern analysis and recommendation generation using BigQuery's generative AI capabilities.

In [None]:
SQL_MISSING_DATA_PATTERN_METHOD = """
SELECT
  data_field_name,
  data_type,
  AI.GENERATE( ('The data field name ',
      data_field_name,
      'is sourced from dataset with description: car sale logs.',

      'Data field is of data type,',
      data_type,
      'and has a total',
      missing_count,
      'number of missing values. Suggest the most common method in dealing with missing datasets of this data type and context. Your response must only contain one of the values fromthe following: [\'imputation\', \'drop_missing\', ].'),
    connection_id => '{connection_id}',
    endpoint => '{endpoint}',
    output_schema => 'data_field_name STRING, description STRING').description
FROM
  {data_summary_id};
"""

In [None]:
def select_missing_data_pattern(
    client: bigquery.Client,
    summary_table_id: str,
    model_id: str,
    data_context: str = "general business data"
) -> pd.DataFrame:
    """
    This function only runs when there have been no previous observations related to similar datasets.
    """

    # This query constructs a prompt for each row and calls the model.
    # It returns the original field name alongside the generated description.
    query = f"""
    SELECT
      data_field_name,
      ai_generate_table_result AS description
    FROM
      AI.GENERATE_TABLE(
        MODEL `{model_id}`,
        (
          SELECT
            data_field_name,
            CONCAT(
              'Given a dataset about {data_context}, ',
              'provide a one-sentence business description for a data field named: ',
              data_field_name
            ) AS prompt
          FROM
            `{summary_table_id}`
        ),
        STRUCT(
          0.2 AS temperature,
          20 AS max_output_tokens
        )
      );
    """
    print("--- Running Query to Generate Descriptions ---")
    print(query)

    # Execute the query and return the results as a DataFrame
    job = client.query(query)
    return job.to_dataframe()


data_field_descriptions_take_2 = generate_descriptions_for_fields(
    client=client,
    summary_table_id=SAMPLE_SUMMARY_FULL_ID,
    model_id=BQ_MODEL_ID,
    data_context="cafe sales data including beverages and food items"
)

#### Stage 3: Detect Anomalities 


In [None]:
def get_continuous_fields(df: pd.DataFrame,
                          numeric_types=("INT64", "FLOAT64", "NUMERIC"),
                          min_unique=20,
                          min_unique_ratio=0.05):
    """
    Return only the continuous fields from a dataset summary DataFrame.

    Args:
        df (pd.DataFrame): Must have columns [data_field_name, data_type, unique_values, total_count].
        numeric_types (tuple): BigQuery numeric types considered numeric.
        min_unique (int): Minimum unique values threshold.
        min_unique_ratio (float): Minimum ratio (unique / total_count).

    Returns:
        pd.DataFrame: Filtered DataFrame containing only continuous fields.
    """
    mask = (
        df["data_type"].isin(numeric_types)
        & (
            (df["unique_values"] > min_unique) |
            (df["unique_values"] / df["total_count"] > min_unique_ratio)
        )
        & (df["unique_values"] < df["total_count"])  # exclude pure IDs
    )
    return df.loc[mask].reset_index(drop=True)