# Why Develop the Zero-Shot Data Cleaning System?

#### The primary reason for developing this multi-agent system is to eliminate the single largest bottleneck in data science and analytics: manual, brittle, and non-reusable data cleaning.

*🛑 **Pain Point:** Manual data cleaning is slow, error-prone, and non-reusable, bottlenecking data science with custom scripts and creating technical debt.*

*✅ **The Goal:** The system aims for true zero-shot automation by having the AI Planner instantly generate and adapt a cleaning plan for any unseen dataset.*

*💰 **Core Benefit:** The multi-agent design ensures consistency, speed, and a drastic cost reduction, freeing data scientists to focus on modeling instead of remediation.*

# How to Build the System

#### The Zero-Shot Data Cleaning architecture separates the LLM's cleaning decisions (creative) from the code's execution (reliable) into three controllable phases

***Define the Contract (Schema):** Establish a strict schema (e.g., Pydantic models exchanged as JSON) that dictates the only valid cleaning commands the LLM can suggest, ensuring predictable communication.*

***Build the Executor (Tools):** Write deterministic, tested code functions to reliably execute every command defined in the contract, creating the non-negotiable core transformation engine.*

***Integrate and Orchestrate:** Use the LLM to generate the plan (a sequence of commands) and use the Executor to run the steps sequentially against the data, connecting intelligence to action.*
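The three phases can be sketched with only the standard library. The names below (ALLOWED_OPERATIONS, drop_empty_rows, lowercase_field, run_plan) are hypothetical stand-ins for illustration, not part of this notebook: the contract is a set of permitted operation names, the executor is a dictionary of deterministic functions, and orchestration is a loop that checks each LLM-proposed step against the contract before running it.

```python
import json
from typing import Callable, Dict, List

Row = Dict[str, object]

# Phase 1: the contract, i.e. the only operation names a plan may contain (hypothetical)
ALLOWED_OPERATIONS = {"drop_empty_rows", "lowercase_field"}

# Phase 2: the executor, i.e. deterministic, testable functions (hypothetical)
def drop_empty_rows(rows: List[Row], field: str) -> List[Row]:
    """Removes rows whose `field` is missing or an empty string."""
    return [r for r in rows if r.get(field) not in (None, "")]

def lowercase_field(rows: List[Row], field: str) -> List[Row]:
    """Lowercases string values in `field`, leaving other values unchanged."""
    return [{**r, field: r[field].lower() if isinstance(r.get(field), str) else r.get(field)} for r in rows]

TOOLS: Dict[str, Callable[[List[Row], str], List[Row]]] = {
    "drop_empty_rows": drop_empty_rows,
    "lowercase_field": lowercase_field,
}

# Phase 3: orchestration; the plan text is untrusted, so every step is checked first
def run_plan(plan_json: str, rows: List[Row]) -> List[Row]:
    plan = json.loads(plan_json)
    for step in plan["actions"]:
        if step["operation"] not in ALLOWED_OPERATIONS:
            raise ValueError(f"Operation not in contract: {step['operation']}")
        rows = TOOLS[step["operation"]](rows, step["field"])
    return rows

# Example: a plan as an LLM might return it, as JSON text
llm_plan = '{"actions": [{"operation": "drop_empty_rows", "field": "title"}, {"operation": "lowercase_field", "field": "genre"}]}'
data = [{"title": "Dune", "genre": "Sci-Fi"}, {"title": "", "genre": "Drama"}]
cleaned = run_plan(llm_plan, data)
print(cleaned)
```

Keeping the LLM on the "plan" side and the functions on the "execute" side means a bad suggestion can at worst be rejected, never executed as arbitrary code.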

# Import Libraries & Setup

In [1]:
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Tuple, Callable, Literal
import pandas as pd
import numpy as np
import json
import os
import sys
from glob import glob

# External Data Loader Dependency Check

*This code exists to check for the external dependency (kagglehub) required to load data from Kaggle.*

In [2]:
try:
    from kagglehub import dataset_download
    print("SUCCESS: 'kagglehub' imported successfully.")
    KAGGLEHUB_AVAILABLE = True
except ImportError:
    print("WARNING: 'kagglehub' not found. Data loading from Kaggle will fail.")
    # Define a dummy function to prevent runtime errors if not installed
    def dataset_download(id):
        raise ImportError("kagglehub not installed.")
    KAGGLEHUB_AVAILABLE = False

SUCCESS: 'kagglehub' imported successfully.
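An alternative check, shown as a sketch rather than the notebook's approach, uses `importlib.util.find_spec`, which reports whether a top-level module can be imported without actually importing it. The helper name `is_available` is hypothetical.

```python
import importlib.util

def is_available(module_name: str) -> bool:
    """True if the top-level module `module_name` can be imported in this environment."""
    return importlib.util.find_spec(module_name) is not None

KAGGLEHUB_AVAILABLE = is_available("kagglehub")
print("kagglehub available:", KAGGLEHUB_AVAILABLE)
```

This avoids the try/except import, but the dummy `dataset_download` fallback would still be needed if the rest of the code calls it unconditionally.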


# Global Data Constants & Setup

*These constants define the source Kaggle dataset, the specific CSV file to extract from it, and the local file path where the final cleaned data will be saved.*

In [3]:
# --- Global Constants & Setup ---
KAGGLE_DATASET_ID = "praveensoni06/1500-latest-movies-datasets-2025"
CSV_FILE_NAME = "Latest 2025 movies Datasets.csv" # The file name expected inside the archive
OUTPUT_FILE_PATH = "cleaned_movies_data.csv"

***--- IMPORTANT NOTE FOR DATASET CHANGES ---***

*If you wish to use a different Kaggle dataset:*
1. Update KAGGLE_DATASET_ID with the 'user_name/dataset_name' identifier found on Kaggle.
2. Update CSV_FILE_NAME with the exact .csv file name that is present INSIDE the downloaded Kaggle archive.
3. Adjust OUTPUT_FILE_PATH if you want a different name for the cleaned data.


# The A2A Communication Protocol

## The Agent Language (Pydantic Models):

*These Pydantic models create a strict, JSON-based communication protocol for the AI agents: every instruction the Planner emits (e.g., a cleaning action) is validated against the schema, so malformed instructions are rejected before the Executor ever runs them.*

In [4]:
# ----------------------------------------------------
# 1. AGENT PROTOCOL DATA MODELS (Pydantic Schemas)
# ----------------------------------------------------
CleaningOperation = Literal["impute_missing", "deduplicate", "convert_datatype", "standardize_case", "drop_column"]
# Method values accepted by the corresponding Executor tools
ImputationMethod = Literal["mean", "median", "most_frequent"]
CaseMethod = Literal["titlecase", "lowercase", "uppercase"]
DataType = Literal["datetime", "numeric"]

class CleaningAction(BaseModel):
    """A single cleaning command issued by the Planner and executed by the Executor."""
    # Typing 'operation' as the Literal makes unknown tool names fail validation
    operation: CleaningOperation = Field(..., description="The name of the tool/operation to execute (e.g., 'impute_missing').")
    column: Optional[str] = Field(None, description="The specific column to target. Omit for DataFrame-wide operations.")
    # 'method' stays a free string because its valid values depend on the operation
    method: str = Field(..., description="The specific method for the operation (e.g., 'median', 'lowercase', 'datetime').")
    rationale: str = Field(..., description="The LLM's reasoning for this action, used for the final report.")
    execution_metrics: Dict[str, Any] = Field(default_factory=dict, description="Metrics generated during execution.")

class CleaningPlan(BaseModel):
    """The complete plan generated by the Planner Agent."""
    actions: List[CleaningAction] = Field(..., description="An ordered list of cleaning actions to be executed.")

class VerificationReport(BaseModel):
    """The final report generated by the Verifier Agent."""
    confidence_score: float = Field(..., description="Overall confidence score (0.0 to 1.0) in the data quality after cleaning.")
    summary_of_changes: Dict[str, Any] = Field(..., description="Metrics like rows dropped, nulls reduced, etc.")
    flags: List[str] = Field(..., description="List of warnings or failures identified during verification.")
    recommendation: str = Field(..., description="Final recommendation for using the cleaned data.")

print("--- Agent Protocol Models Defined ---")

--- Agent Protocol Models Defined ---
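A short sketch of what the contract buys: when a field is typed with a `Literal`, Pydantic (v2 assumed, as the notebook's use of `model_validate` implies) rejects any value outside the allowed set at validation time. The model `Action` below is a trimmed, hypothetical stand-in for `CleaningAction`, not the notebook's class.

```python
from typing import Literal, Optional
from pydantic import BaseModel, ValidationError

# Trimmed, hypothetical stand-in for CleaningAction
class Action(BaseModel):
    operation: Literal["impute_missing", "deduplicate", "convert_datatype", "standardize_case", "drop_column"]
    column: Optional[str] = None
    method: str

ok = Action.model_validate({"operation": "impute_missing", "column": "runtime", "method": "mean"})

try:
    # 'fill_na' is not in the Literal, so validation fails before anything executes
    Action.model_validate({"operation": "fill_na", "column": "runtime", "method": "mean"})
    rejected = False
except ValidationError as err:
    rejected = True
    print(err.error_count(), "validation error(s)")
```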


# Utility & Profiler Agent

## Loader Agent: Data Ingestion and Sanitization

*This function handles the robust downloading of the dataset from Kaggle, file path resolution, and initial data loading. Critically, it converts various common string representations of missing values (like 'N/A', '?', or empty strings) into standard np.nan for accurate downstream analysis.*

In [5]:
def load_data(kaggle_id: str, csv_name: str) -> Optional[pd.DataFrame]:
    """
    [Loader Agent Logic] Downloads data from Kaggle, loads it, and robustly 
    converts common null strings into proper np.nan for accurate profiling.
    """
    print(f"--- Loader Agent Activated: Downloading dataset {kaggle_id} ---")
    try:
        # Download the dataset using kagglehub
        download_dir = dataset_download(kaggle_id) 
        print(f" > Dataset downloaded to: {download_dir}")

        # Use glob to find the CSV file robustly, even if nested
        csv_files = glob(os.path.join(download_dir, '**', csv_name), recursive=True)
        
        if csv_files:
            data_path = csv_files[0]
            print(f" > File found at: {data_path}")
        else:
            # Fallback: Search for *any* CSV file
            csv_files_wildcard = glob(os.path.join(download_dir, '**', '*.csv'), recursive=True)
            if csv_files_wildcard:
                data_path = csv_files_wildcard[0]
                print(f" > WARNING: Used fallback to load first CSV found: {os.path.basename(data_path)}")
            else:
                raise FileNotFoundError(f"Could not find the confirmed CSV file '{csv_name}' or any other CSV file in the downloaded archive.")

        # Load the data, using standard pandas NA values and specifying others
        na_values_to_recognize = [
            'N/A', 'NA', 'NaN', 'null', 'None', '?', '-', ' '
        ]
        df = pd.read_csv(data_path, na_values=na_values_to_recognize)
        
        # Explicitly convert empty strings which often bypass na_values in some files
        df = df.replace(r'^\s*$', np.nan, regex=True)
        
        initial_shape = df.shape
        initial_null_count = df.isnull().sum().sum()
        
        print(f" > Data loaded successfully.")
        print(f" > Initial shape: {initial_shape}. Detected NaN count: {initial_null_count}")
        return df
    except Exception as e:
        print(f"!!! CRITICAL ERROR: Data ingestion failed. Error: {e}")
        return None
print("--- Loader Agent (load_data) function defined successfully ---")

--- Loader Agent (load_data) function defined successfully ---
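The null-normalization step can be checked on a tiny in-memory CSV. This sketch uses invented sample data and `io.StringIO` in place of the downloaded file, applying the same `na_values` list and whitespace regex as `load_data`.

```python
import io
import numpy as np
import pandas as pd

# Invented sample: '?', 'N/A', an empty field, and a whitespace-only field all mean "missing"
raw_csv = "title,rating,genre\nDune,?,Sci-Fi\nHeat,N/A,\nUp,8.3,   \n"

na_values_to_recognize = ['N/A', 'NA', 'NaN', 'null', 'None', '?', '-', ' ']
df = pd.read_csv(io.StringIO(raw_csv), na_values=na_values_to_recognize)

# Whitespace-only strings that read_csv did not treat as NA become NaN here
df = df.replace(r'^\s*$', np.nan, regex=True)

print(df.isnull().sum().to_dict())
```

Because '?' and 'N/A' are recognized at read time, the `rating` column also parses as a numeric (float) column instead of staying as text.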


## Utility Function: Saving Final Output

*This simple utility function (save_cleaned_data) is responsible for saving the final processed and cleaned DataFrame to the specified OUTPUT_FILE_PATH on disk.*

In [6]:
def save_cleaned_data(df: pd.DataFrame, file_path: str):
    """Saves the final cleaned DataFrame."""
    try:
        OUTPUT_DIR_NAME = os.path.dirname(file_path) or "."
        os.makedirs(OUTPUT_DIR_NAME, exist_ok=True)
        df.to_csv(file_path, index=False)
        print(f"\n > Cleaned data successfully saved to: {file_path}")
    except Exception as e:
        print(f"!!! SAVER FAILED: {e}")
print("--- Utility Function (save_cleaned_data) defined successfully ---")

--- Utility Function (save_cleaned_data) defined successfully ---


## Profiler Agent: Data Observation and Summary Generation

*The core of the observation phase, this function analyzes the loaded data, calculating critical quality metrics like total missing values, column-specific null percentages, and data types. It packages this condensed statistical summary into a JSON-like format for the Planner Agent.*

In [7]:
def profile_data(df: pd.DataFrame) -> Dict[str, Any]:
    """[Profiler Agent Logic] Generates a compact summary of data quality issues."""
    print("\n--- Profiler Agent (Observer) Activated ---")
    
    # 1. Basic Stats
    total_cells = np.prod(df.shape)
    total_null_count = df.isnull().sum().sum()

    # 2. Detailed Column Stats
    column_details = {}
    for col in df.columns:
        null_count = df[col].isnull().sum()
        null_percent = (null_count / len(df)) * 100
        unique_count = df[col].nunique()
        column_details[col] = {
            'dtype': str(df[col].dtype),
            'missing_values': int(null_count),
            'missing_percent': round(null_percent, 2),
            'unique_values': int(unique_count),
            'example_values': [str(x) for x in df[col].dropna().head(2).tolist()]
        }

    # 3. Compile Summary (Context Compaction for LLM)
    summary = {
        'shape': df.shape,
        'total_null_percentage': round((total_null_count / total_cells) * 100, 2),
        'missing_values_by_column': {k: v['missing_values'] for k, v in column_details.items() if v['missing_values'] > 0},
        'columns': column_details
    }
    print(" > Data profiling complete. Summary generated for Planner Agent.")
    return summary

print("--- Profiler Agent (profile_data) function defined successfully ---")

--- Profiler Agent (profile_data) function defined successfully ---
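A trimmed sketch of the Profiler's per-column metrics on an invented toy DataFrame, followed by a `json.dumps` round trip, since the summary is meant to be handed to an LLM as JSON. The helper name `compact_profile` is hypothetical and keeps only the fields shown; it also guards against an empty DataFrame, which `profile_data` does not.

```python
import json
import numpy as np
import pandas as pd

def compact_profile(df: pd.DataFrame) -> dict:
    """Hypothetical trimmed profiler: dtype, missing count/percent, unique count per column."""
    summary = {"shape": list(df.shape), "columns": {}}
    for col in df.columns:
        missing = int(df[col].isnull().sum())
        summary["columns"][col] = {
            "dtype": str(df[col].dtype),
            "missing_values": missing,
            # Guard against empty frames to avoid division by zero
            "missing_percent": round(100 * missing / len(df), 2) if len(df) else 0.0,
            "unique_values": int(df[col].nunique()),
        }
    return summary

toy = pd.DataFrame({"runtime": [120, np.nan, 95, np.nan], "genre": ["Drama", "drama", None, "Comedy"]})
profile = compact_profile(toy)
print(json.dumps(profile, indent=2))
```

Casting counts with `int(...)` keeps the summary serializable; note that "Drama" and "drama" count as two unique values, which is exactly the kind of signal that should prompt a case-standardization action.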


# Executor Tools

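## Deduplication, Imputation, Conversion, and Drop Tools

*Contains functions to remove duplicate rows (tool_deduplicate), fill in missing data with the mean, median, or most frequent value (tool_impute_missing), convert columns to datetime or numeric types while dropping rows that fail to parse (tool_convert_datatype), and drop columns either explicitly or when they exceed a missing-value threshold (tool_drop_column).*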

In [8]:
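def tool_deduplicate(df: pd.DataFrame, column: Optional[str] = None, method: str = 'first') -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Removes duplicate rows, optionally judging duplicates by a single column only."""
    initial_rows = len(df)
    # 'method' selects which duplicate to keep; anything other than 'first'/'last' falls back to 'first'
    keep = method if method in ('first', 'last') else 'first'
    df_out = df.drop_duplicates(subset=[column] if column else None, keep=keep)
    rows_dropped = initial_rows - len(df_out)
    return df_out, {'rows_dropped_dedupe': rows_dropped}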

def tool_impute_missing(df: pd.DataFrame, column: str, method: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Imputes missing values in a specified column."""
    if column not in df.columns:
        return df, {'impute_status': f"Column '{column}' not found."}
    
    df_temp = df.copy() 
    missing_before = df_temp[column].isnull().sum()
    fill_value = None
    
    # 1. Determine the fill value based on the method
    if method == 'median' and pd.api.types.is_numeric_dtype(df_temp[column]):
        fill_value = df_temp[column].median()
    elif method == 'mean' and pd.api.types.is_numeric_dtype(df_temp[column]):
        fill_value = df_temp[column].mean()
    elif method == 'most_frequent':
        mode_val = df_temp[column].mode()
        # Fallback to a string placeholder if mode is empty 
        fill_value = mode_val[0] if not mode_val.empty else 'Unknown' 
    else:
        return df, {'impute_status': f"Imputation method '{method}' is not supported for column '{column}' with dtype {df_temp[column].dtype}."}
    
    # 2. Apply the imputation
    df_temp[column] = df_temp[column].fillna(fill_value)
    
    # 3. Calculate metrics
    missing_after = df_temp[column].isnull().sum()
    imputed_count = missing_before - missing_after
    
    return df_temp, {'imputed_count': int(imputed_count)}

def tool_convert_datatype(df: pd.DataFrame, column: str, method: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Converts a column to a specified data type, dropping rows whose values fail conversion."""
    if column not in df.columns:
        return df, {'convert_status': f"Column '{column}' not found."}
    
    if method == 'datetime':
        # errors='coerce' turns non-parsable values into NaT (Not a Time)
        converted = pd.to_datetime(df[column], errors='coerce')
    elif method == 'numeric':
        # errors='coerce' turns non-parsable values into NaN (Not a Number)
        converted = pd.to_numeric(df[column], errors='coerce')
    else:
        return df, {'convert_status': f"Unsupported conversion method '{method}'"}

    # A row failed conversion if it had a value before and is NaT/NaN after;
    # values that were already missing are kept so imputation can handle them
    failed_mask = df[column].notna() & converted.isna()

    df_out = df.copy()
    df_out[column] = converted
    df_out = df_out[~failed_mask].copy()

    return df_out, {'rows_dropped_conversion': int(failed_mask.sum())}

def tool_drop_column(df: pd.DataFrame, column: str, method: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Drops one named column, or, when column == 'ALL' and method is 'missing_gt_X', every column with more than X% missing values."""
    
    # 1. SPARSE COLUMN DROP (Intelligent/Data-Driven)
    if column == "ALL" and method.startswith("missing_gt_"):
        try:
            # Extract threshold percentage (e.g., 'missing_gt_75' -> 0.75)
            threshold_percent = int(method.split('_')[-1]) / 100
        except ValueError:
            return df, {'drop_status': f"Invalid threshold in method '{method}'"}

        initial_cols = df.shape[1]
        threshold_rows = threshold_percent * len(df)
        
        # Identify columns where NaN count exceeds the threshold
        cols_to_drop = df.columns[df.isnull().sum() > threshold_rows].tolist()
        
        if cols_to_drop:
            df_out = df.drop(columns=cols_to_drop)
            cols_dropped_count = initial_cols - df_out.shape[1]
            return df_out, {'columns_dropped_sparse': cols_dropped_count, 'dropped_names': cols_to_drop}
        else:
            return df, {'columns_dropped_sparse': 0}

    # 2. EXPLICIT COLUMN DROP
    elif column in df.columns:
        # Standard explicit column drop
        df_out = df.drop(columns=[column])
        return df_out, {'columns_dropped_explicit': 1, 'dropped_names': [column]}
        
    # 3. FAILURE/ERROR
    return df, {'drop_status': f"Column '{column}' not found or unsupported drop method."}
print("--- Deduplication, Imputation, Conversion, and Drop Tools defined successfully ---")

--- Deduplication, Imputation, Conversion, and Drop Tools defined successfully ---
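The subtle part of datatype conversion is telling values that failed to parse apart from values that were already missing. A minimal sketch of that distinction on an invented column, using `pd.to_numeric` with `errors='coerce'`: only cells that were present before and NaN after count as conversion failures.

```python
import numpy as np
import pandas as pd

# Invented sample: one unparseable value ('n/a?') and one value that was already missing
budget = pd.Series(["1000000", "n/a?", np.nan, "250000"])

converted = pd.to_numeric(budget, errors="coerce")

# Failed conversions: had a value before, NaN after
failed = budget.notna() & converted.isna()
# Pre-existing gaps: missing before conversion (better left for imputation)
already_missing = budget.isna()

print(int(failed.sum()), int(already_missing.sum()))
```

Filtering on `converted.notna()` alone would silently drop the pre-existing gap as well, and a metric computed as "new nulls" would then undercount the rows actually removed.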


## Conversion and Standardization Tools

*Includes the text standardization tool (tool_standardize_case), which converts casing to title, lower, or upper case for consistency; the datatype conversion tool (tool_convert_datatype) is defined in the previous cell.*

In [9]:
def tool_standardize_case(df: pd.DataFrame, column: str, method: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Applies a specified case transformation (lower, upper, title) to a text column, preserving NaN values."""
    if column not in df.columns:
        return df, {'case_status': f"Column '{column}' not found."}
    
    # 1. Check if the column is a string type (or object)
    if not (pd.api.types.is_object_dtype(df[column]) or pd.api.types.is_string_dtype(df[column])):
        return df, {'case_status': f"Skipped: Column '{column}' is not a string type."}

    df_temp = df.copy()
    initial_unique_count = df_temp[column].nunique(dropna=False)  # NaN counts as one distinct value
    
    # 2. Transform only non-missing cells, so NaN/None never become literal 'nan'/'Nan'/'NAN' strings
    present = df_temp[column].notna()
    series = df_temp.loc[present, column].astype(str)
    
    if method == 'lowercase':
        transformed = series.str.lower()
    elif method == 'uppercase':
        transformed = series.str.upper()
    elif method == 'titlecase':
        transformed = series.str.title()
    else:
        return df, {'case_status': f"Unsupported case method: {method}"}

    df_temp.loc[present, column] = transformed
    
    # 3. Metric: how many distinct values were merged by the case change
    final_unique_count = df_temp[column].nunique(dropna=False)
    reduction = initial_unique_count - final_unique_count
    
    return df_temp, {
        'unique_reduction_case': int(reduction), 
        'method_applied': method,
        'case_status': f"Case standardized to {method}"
    }
print("--- Conversion and Standardization Tools defined successfully ---")

--- Conversion and Standardization Tools defined successfully ---
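Why casing needs care around missing values: `astype(str)` turns NaN into the literal string 'nan', which `str.title()` then turns into 'Nan' (and `str.upper()` into 'NAN'), so a replace keyed on 'nan' alone only works for lowercase. The sketch below, on invented data, shows the pitfall and a mask-based alternative that only transforms non-missing cells.

```python
import numpy as np
import pandas as pd

genres = pd.Series(["sci-fi", np.nan, "DRAMA"], dtype=object)

# Pitfall: NaN becomes 'nan', then 'Nan' after title-casing
naive = genres.astype(str).str.title()
print(naive.tolist())  # ['Sci-Fi', 'Nan', 'Drama']

# Mask-based approach: transform only cells that are actually present
present = genres.notna()
safe = genres.copy()
safe[present] = genres[present].astype(str).str.title()
print(safe.tolist())
```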


## Tool Mapping

*This dictionary maps the standardized operation names (defined in the CleaningAction Pydantic model) to the corresponding tool functions for dynamic execution.*

In [10]:
# Map of tool names to functions
TOOL_MAP: Dict[str, Callable] = {
    'deduplicate': tool_deduplicate,
    'impute_missing': tool_impute_missing,
    'convert_datatype': tool_convert_datatype,
    'standardize_case': tool_standardize_case,
    'drop_column': tool_drop_column,
}

# Executor Agent

*The executor_agent_execute function sequentially processes the LLM's cleaning plan, dynamically calling the appropriate tool functions from the TOOL_MAP to perform all data transformations.*

In [11]:
def executor_agent_execute(df_raw: pd.DataFrame, cleaning_plan: CleaningPlan) -> pd.DataFrame:
    """Executes the cleaning plan, returning the cleaned DataFrame."""
    print("\n--- Executor Agent (The Doer) Activated ---")
    df_cleaned = df_raw.copy() 
    
    for i, action in enumerate(cleaning_plan.actions):
        tool_name = action.operation
        
        if tool_name in TOOL_MAP:
            tool_func = TOOL_MAP[tool_name]
            
            kwargs = {}
            # Pass column if present (it's Optional in Pydantic)
            if action.column is not None:
                kwargs['column'] = action.column
            
            # Pass method (it's required in Pydantic, but this ensures it's available)
            kwargs['method'] = action.method
            try:
                df_cleaned, metrics = tool_func(df_cleaned, **kwargs)
                print(f" > Executed {i+1}: {tool_name} on {action.column if action.column else 'DataFrame'} ({action.method}).")
                action.execution_metrics = metrics 
            except Exception as e:
                print(f"!!! EXECUTION FAILED for {tool_name} on {action.column}: {e}")
                action.execution_metrics = {'error': str(e)}
        else:
            print(f"!!! Executor: Unknown operation '{tool_name}' in plan. Skipping action {i+1}.")
            action.execution_metrics = {'error': f"Unknown tool: {tool_name}"}

    return df_cleaned

print("--- Executor Agent Logic Defined ---")

--- Executor Agent Logic Defined ---
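A reduced sketch of the Executor's contract with its tools: each tool takes a DataFrame and returns `(DataFrame, metrics)`, and a failing step is recorded rather than aborting the run. The two tools and the plan tuples below are hypothetical simplifications; the notebook itself uses `CleaningAction` objects and `TOOL_MAP`.

```python
from typing import Any, Callable, Dict, List, Tuple
import pandas as pd

ToolResult = Tuple[pd.DataFrame, Dict[str, Any]]

def drop_dupes(df: pd.DataFrame, column: str, method: str) -> ToolResult:
    out = df.drop_duplicates(subset=[column], keep=method)
    return out, {"rows_dropped": len(df) - len(out)}

def fill_median(df: pd.DataFrame, column: str, method: str) -> ToolResult:
    out = df.copy()
    out[column] = out[column].fillna(out[column].median())  # raises KeyError if column is absent
    return out, {"filled": int(df[column].isna().sum())}

tools: Dict[str, Callable[..., ToolResult]] = {"deduplicate": drop_dupes, "impute_missing": fill_median}

plan = [("deduplicate", "title", "first"), ("impute_missing", "budget", "median"), ("impute_missing", "runtime", "median")]
df = pd.DataFrame({"title": ["A", "A", "B"], "runtime": [100.0, 100.0, None]})

log: List[Dict[str, Any]] = []
for operation, column, method in plan:
    try:
        df, metrics = tools[operation](df, column=column, method=method)
    except Exception as exc:  # record the failure and keep going, as executor_agent_execute does
        metrics = {"error": str(exc)}
    log.append({"operation": operation, "column": column, **metrics})

print(log)
```

Because `df` is only reassigned on success, a failed step (here, the missing `budget` column) leaves the data exactly as the previous step produced it.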



# Verifier Agent

*The Verifier Agent is the quality assurance step, comparing the raw and cleaned data to generate a final VerificationReport. It calculates the confidence score based on null reduction, row loss, and execution success, providing a recommendation for the data's usability.*

In [12]:
def verifier_agent_execute(df_raw: pd.DataFrame, df_cleaned: pd.DataFrame, cleaning_plan: CleaningPlan) -> VerificationReport:
    """[Verifier Agent] Compares raw and cleaned data and generates the final report."""
    print("\n--- Verifier Agent (Quality Assurance) Activated ---")

    raw_rows = len(df_raw)
    cleaned_rows = len(df_cleaned)
    # Cast null counts to standard Python ints immediately so the report stays JSON-serializable
    raw_nulls = int(df_raw.isnull().sum().sum())
    cleaned_nulls = int(df_cleaned.isnull().sum().sum())
    
    # Calculate key metrics
    rows_dropped_total = raw_rows - cleaned_rows
    nulls_removed = raw_nulls - cleaned_nulls

    summary = {
        'initial_rows': raw_rows,
        'final_rows': cleaned_rows,
        'rows_dropped_total': rows_dropped_total,
        'initial_null_count': int(raw_nulls),
        'final_null_count': int(cleaned_nulls),
        'nulls_removed': int(nulls_removed),
        'actions_performed': [f"{a.operation}({a.column or 'df'}, {a.method})" for a in cleaning_plan.actions]
    }
    
    flags: List[str] = []
    
    # 1. Check for Excessive Row Deletion
    row_loss_percent = (rows_dropped_total / raw_rows) * 100 if raw_rows > 0 else 0
    if row_loss_percent > 15:
        flags.append(f"WARNING: High row deletion ({row_loss_percent:.1f}%) detected.")
        
    # 2. Check for Execution Errors
    for action in cleaning_plan.actions:
        metrics = action.execution_metrics
        if 'error' in metrics:
            flags.append(f"ERROR: Execution failed for {action.operation} on {action.column}: {metrics['error']}")
            
    # 3. Determine Confidence Score (Max 1.0)
    score = 0.0
    
    # a) Null Reduction (Max 50 points, scaled to 0.5)
    if raw_nulls > 0:
        # Ratio is clamped to [0, 1]: 1.0 if all nulls are removed, 0.0 if cleaning added nulls
        null_reduction_ratio = max(0.0, min(1.0, nulls_removed / raw_nulls))
    else:
        # If there were no raw nulls, assume 100% success (ratio = 1.0)
        null_reduction_ratio = 1.0
        
    score += 0.5 * null_reduction_ratio # Max 0.5
        
    # b) Row Loss Penalty/Reward (Max 30 points, scaled to 0.3)
    if row_loss_percent < 5:
        score += 0.30 
    elif row_loss_percent < 15:
        score += 0.15 # Mid-level penalty
    # If row_loss_percent >= 15, score gets 0 points from this category

    # c) Execution Success (Max 20 points, scaled to 0.2)
    if not any('ERROR' in flag for flag in flags):
        score += 0.20 

    # Final score is capped at 1.0
    confidence_score = min(1.0, score)

    # 4. Final Recommendation
    if confidence_score > 0.85:
        recommendation = "Data quality is excellent. Ready for machine learning model training."
    elif confidence_score > 0.60:
        recommendation = "Data quality is acceptable. Review the flags for potential minor issues before use."
    else:
        recommendation = "Data quality is poor or plan execution failed. Manual intervention is required."

    print(" > Verification complete. Confidence score calculated.")
    
    return VerificationReport(
        confidence_score=confidence_score,
        summary_of_changes=summary,
        flags=flags,
        recommendation=recommendation
    )

print("--- Verifier Agent Logic Defined ---")

--- Verifier Agent Logic Defined ---
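The Verifier's confidence score is a weighted sum: 0.5 × the null-reduction ratio, plus 0.30 for under 5% row loss (0.15 for under 15%, otherwise 0), plus 0.20 when no action reported an error. A standalone restatement of that rubric (the function name `confidence_score` is hypothetical) makes it easy to check scores by hand.

```python
def confidence_score(raw_nulls: int, cleaned_nulls: int, raw_rows: int, cleaned_rows: int, had_errors: bool) -> float:
    """Restates the Verifier's scoring rubric as a pure function (hypothetical helper)."""
    # a) Null reduction, clamped to [0, 1]; no raw nulls counts as full success
    if raw_nulls > 0:
        ratio = max(0.0, min(1.0, (raw_nulls - cleaned_nulls) / raw_nulls))
    else:
        ratio = 1.0
    score = 0.5 * ratio

    # b) Row-loss tiers
    row_loss_percent = 100 * (raw_rows - cleaned_rows) / raw_rows if raw_rows > 0 else 0
    if row_loss_percent < 5:
        score += 0.30
    elif row_loss_percent < 15:
        score += 0.15

    # c) Execution success
    if not had_errors:
        score += 0.20
    return min(1.0, score)

# Worked example with invented numbers: 300 -> 30 nulls (ratio 0.9), 1000 -> 970 rows (3% loss), no errors
example = confidence_score(300, 30, 1000, 970, had_errors=False)
print(round(example, 2))  # 0.95, i.e. 0.45 + 0.30 + 0.20
```

With these numbers the score clears the 0.85 threshold, so the Verifier would recommend the data as ready for model training.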


# Planner Agent


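*The Planner Agent is designed to make a live call to the Gemini LLM for zero-shot cleaning-plan generation, passing it the data profile and the strict JSON schema, and to fall back to a hardcoded plan if that call fails. The version below is the static variant: it skips the live call and always loads the hardcoded fallback plan, which is still validated against the CleaningPlan schema so the Executor receives a usable cleaning sequence.*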


In [13]:
# ----------------------------------------------------
# 5. PLANNER AGENT LOGIC
# ----------------------------------------------------

def planner_agent_execute_adk(data_summary: Dict[str, Any]) -> CleaningPlan:
    """[Planner Agent - Static Version] Returns a structured CleaningPlan using a hardcoded dictionary."""
    print("\n--- Planner Agent Activated ---")
    
    # --- HARDCODED FALLBACK PLAN (The "function json") ---
    fallback_plan_data = {
        "actions": [
            {"operation": "deduplicate", "column": "title", "method": "first", "rationale": "Remove exact duplicate movies based on the primary identifier (Title)."},
            {"operation": "convert_datatype", "column": "release_date", "method": "datetime", "rationale": "Ensure release date is a proper datetime format, dropping bad rows."},
            
            # Targeted Imputation for Common Nulls
            {"operation": "impute_missing", "column": "vote_average", "method": "median", "rationale": "Impute missing numerical ratings with the dataset median."},
            {"operation": "impute_missing", "column": "runtime", "method": "mean", "rationale": "Impute missing numerical runtimes with the mean."},
            {"operation": "impute_missing", "column": "overview", "method": "most_frequent", "rationale": "Impute missing text descriptions using the most common description/overview."},
            {"operation": "impute_missing", "column": "budget", "method": "median", "rationale": "Impute missing Budget (assuming numeric) with the median."},
            
            # Standardize text fields for quality
            {"operation": "standardize_case", "column": "genres", "method": "titlecase", "rationale": "Ensure text fields like genres are consistently formatted (Title Case)."},
        ]
    }
    # ----------------------------------------------------
    
    llm_response_data = fallback_plan_data
    print(" > Static fallback plan is active and loaded.")
            
    # --- Final validation and return ---
    if llm_response_data is None:
        # This case should no longer be possible with the static plan
        print("!!! Planner FAILED: No valid plan data available.")
        return CleaningPlan(actions=[])

    try:
        # Validate the static data against the Pydantic schema
        cleaning_plan = CleaningPlan.model_validate(llm_response_data)
        print(" > Cleaning Plan generated and validated successfully.")
        print(f" > Generated {len(cleaning_plan.actions)} cleaning actions.")
        return cleaning_plan
    except Exception as e:
        print(f"!!! PLANNER FAILED: Plan data did not conform to the CleaningPlan schema: {e}. Returning empty plan.")
        return CleaningPlan(actions=[])
print("--- Planner Agent Logic Defined ---")

--- Planner Agent Logic Defined ---
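A sketch of the live-call-with-fallback control flow, under stated assumptions: `call_llm` is a hypothetical callable standing in for whatever Gemini client is used (no real SDK call is shown), the prompt wording is invented, and the response is expected to be a JSON string that Pydantic v2's `model_validate_json` checks against the plan schema. Any failure (client error, invalid JSON, schema mismatch) falls back to the static plan.

```python
import json
from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel, ValidationError

# Trimmed, hypothetical copies of the notebook's schema
class Action(BaseModel):
    operation: str
    column: Optional[str] = None
    method: str
    rationale: str

class Plan(BaseModel):
    actions: List[Action]

FALLBACK = {"actions": [{"operation": "deduplicate", "column": "title", "method": "first", "rationale": "Static fallback."}]}

def plan_with_fallback(data_summary: Dict[str, Any], call_llm: Callable[[str], str]) -> Plan:
    """Asks a (hypothetical) LLM callable for a plan; falls back to FALLBACK on any failure."""
    prompt = (
        "Return ONLY JSON matching this schema:\n"
        + json.dumps(Plan.model_json_schema())
        + "\nData profile:\n"
        + json.dumps(data_summary, default=str)
    )
    try:
        return Plan.model_validate_json(call_llm(prompt))
    except ValidationError as err:  # invalid JSON or schema mismatch
        print(f" > LLM plan rejected by schema ({err.error_count()} errors); using fallback.")
    except Exception as err:  # e.g. network or quota errors raised by the client
        print(f" > LLM call failed ({err}); using fallback.")
    return Plan.model_validate(FALLBACK)

# Usage with stub "LLMs"
good = plan_with_fallback({"shape": [3, 2]}, lambda p: '{"actions": [{"operation": "impute_missing", "column": "runtime", "method": "mean", "rationale": "gaps"}]}')
bad = plan_with_fallback({"shape": [3, 2]}, lambda p: "Sure! Here is your plan...")
```

Passing the schema in the prompt and validating the reply with the same model keeps the LLM's free-form output behind the contract.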


**Important Note:**


***Fallback Customization Instruction:** When you switch datasets, manually update the column names and methods in the hardcoded fallback plan so it stays logically relevant to the new data; otherwise its actions will reference columns that may not exist.*
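A helper like the following could catch stale column names before the pipeline runs, e.g. against `df_raw.columns`. The function name `check_plan_columns` is hypothetical, and it works on plain dicts shaped like `fallback_plan_data`.

```python
from typing import Any, Dict, Iterable, List

def check_plan_columns(plan_data: Dict[str, Any], columns: Iterable[str]) -> List[str]:
    """Returns the plan columns that do not exist in the dataset (hypothetical helper)."""
    available = set(columns)
    missing = []
    for action in plan_data["actions"]:
        col = action.get("column")
        # None means a DataFrame-wide action; "ALL" is tool_drop_column's sparse-drop sentinel
        if col not in (None, "ALL") and col not in available:
            missing.append(col)
    return missing

plan = {"actions": [
    {"operation": "deduplicate", "column": "title", "method": "first"},
    {"operation": "impute_missing", "column": "budget", "method": "median"},
]}
missing = check_plan_columns(plan, ["title", "rating", "genre"])
print(missing)
```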


# Main Pipeline Execution


*This function (run_pipeline) orchestrates the entire multi-agent process, executing the Loader, Profiler, Planner, Executor, and Verifier in sequence, concluding by displaying a detailed final verification report and saving the cleaned data.*


In [14]:
# ----------------------------------------------------
# 6. MAIN PIPELINE EXECUTION
# ----------------------------------------------------

def run_pipeline():
    """Executes the entire 5-step Multi-Agent System sequentially."""
    print(f"\n=======================================================")
    print(f"  ZERO-SHOT DATA TRANSFORMATION: PIPELINE START (ADK)")
    print(f"=======================================================")

    # Step 1: Load Data (Loader Agent)
    df_raw = load_data(KAGGLE_DATASET_ID, CSV_FILE_NAME)

    if df_raw is None:
        print("\n*** ABORTING PIPELINE due to data loading failure. ***")
        return

    # Step 2: Profile Data (Profiler Agent)
    summary_raw = profile_data(df_raw)
    
    # Step 3: Plan Generation (Planner Agent)
    cleaning_plan = planner_agent_execute_adk(summary_raw)

    if cleaning_plan.actions:
        # Step 4: Execution (Executor Agent)
        df_cleaned = executor_agent_execute(df_raw, cleaning_plan)

        # Step 5: Verification and Reporting (Verifier Agent)
        verification_report = verifier_agent_execute(df_raw, df_cleaned, cleaning_plan)
        
        # Display Final Report
        print("\n=======================================================")
        print("          A.I. DATA CLEANING VERIFICATION REPORT         ")
        print("=======================================================")
        print(f"Confidence Score: {verification_report.confidence_score:.2f}")
        
        print("\nSummary of Changes:")
        print(json.dumps(verification_report.summary_of_changes, indent=2))
        
        print("\nDetailed Action Metrics:")
        for action in cleaning_plan.actions:
            metrics = action.execution_metrics
            metric_str = ", ".join([f"{k}: {v}" for k, v in metrics.items()])
            print(f" - {action.operation} ({action.column or 'df'}): {metric_str}")

        print("\nFlags/Warnings:")
        if verification_report.flags:
            for flag in verification_report.flags:
                print(f" - {flag}")
        else:
            print(" - None")
            
        print("\nFinal Recommendation:")
        print(f" - {verification_report.recommendation}")
        print("=======================================================")

        # Final Step: Save Results (Utility)
        save_cleaned_data(df_cleaned, OUTPUT_FILE_PATH)
    else:
        print("\n*** ABORTING PIPELINE: Planner failed to generate a valid plan. ***")

if __name__ == "__main__":
    run_pipeline()


  ZERO-SHOT DATA TRANSFORMATION: PIPELINE START (ADK)
--- Loader Agent Activated: Downloading dataset praveensoni06/1500-latest-movies-datasets-2025 ---
 > Dataset downloaded to: /kaggle/input/1500-latest-movies-datasets-2025
 > File found at: /kaggle/input/1500-latest-movies-datasets-2025/Latest 2025 movies Datasets.csv
 > Data loaded successfully.
 > Initial shape: (10000, 8). Detected NaN count: 291

--- Profiler Agent (Observer) Activated ---
 > Data profiling complete. Summary generated for Planner Agent.

--- Planner Agent Activated ---
 > Static fallback plan is active and loaded.
 > Cleaning Plan generated and validated successfully.
 > Generated 7 cleaning actions.

--- Executor Agent (The Doer) Activated ---
 > Executed 1: deduplicate on title (first).
 > Executed 2: convert_datatype on release_date (datetime).
 > Executed 3: impute_missing on vote_average (median).
 > Executed 4: impute_missing on runtime (mean).
 > Executed 5: impute_missing on overview (most_frequent).
 >