## Example script to run the Error-Scanner AI tool via API

This example retrieves metadata from the Metadata Editor and scans it for evidently incorrect, inconsistent, or contradictory information.

UI - https://w1lxscirender02.worldbank.org:8080/ai_for_data_playground

In [1]:
import os
import re
import json
import time
import math
import shutil
import tempfile
import requests
import threading
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from gradio_client import Client, handle_file
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
def fetch_indicator_ids():
    """Return the decoded JSON indicator listing for the ``WB_CSC`` dataset.

    Sends a GET request to the Data360 ``indicators`` endpoint with a
    30-second timeout. A non-success HTTP status raises via
    ``raise_for_status()``. The decoded payload is returned unchanged.
    """
    response = requests.get(
        "https://data360api.worldbank.org/data360/indicators",
        params={"datasetId": "WB_CSC"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

In [3]:
def fetch_indicator_metadata(indicator_id):
    """Download and decode the metadata JSON document of one ``WB_CSC`` indicator.

    Args:
        indicator_id: Identifier substituted into the metadata file URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``requests`` raises on a connection problem, a timeout
        (30 seconds) or a non-success HTTP status.
    """
    url_template = "https://data360files.worldbank.org/data360-data/metadata/WB_CSC/{indicator_id}.json"
    response = requests.get(url_template.format(indicator_id=indicator_id), timeout=30)
    response.raise_for_status()
    return response.json()

In [4]:
# define a function to wait for a job and get the outputs
def wait_for_job_outputs(job, poll_interval=0.5, timeout=None):
    """Block until ``job`` reports completion, then return ``job.outputs()``.

    Args:
        job: Object exposing ``done()`` and ``outputs()`` (in this notebook,
            the value returned by ``gradio_client.submit``).
        poll_interval: Seconds to sleep between two ``done()`` checks.
        timeout: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely, which is the original behaviour.

    Returns:
        Whatever ``job.outputs()`` returns once the job is done.

    Raises:
        TimeoutError: If ``timeout`` is set and elapses before the job is done.
    """
    # monotonic clock: unaffected by wall-clock adjustments during a long wait
    deadline = None if timeout is None else time.monotonic() + timeout
    while not job.done():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job did not finish within {timeout} seconds")
        time.sleep(poll_interval)
    return job.outputs()

In [5]:
# define a function to extract JSON from the output text
def extract_json(text):
    """Extract the JSON object or array found after the last ``----------`` separator.

    If the separator does not occur in ``text``, the whole text is searched.
    The search is greedy: it spans from the first ``{`` or ``[`` to the last
    matching closing bracket of the same kind.

    Args:
        text: Output text that may contain a JSON object or array.

    Returns:
        The decoded JSON value (``dict`` or ``list``), or ``None`` when
        ``text`` is not a string, no candidate is found, or the candidate is
        not valid JSON.
    """
    if not isinstance(text, str):
        return None

    separator_at = text.rfind("----------")
    # rfind returns -1 when the separator is missing; slicing from -1 would
    # keep only the last character and hide any JSON present in the text.
    if separator_at != -1:
        text = text[separator_at:]

    match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)
    if match is None:
        return None
    try:
        return json.loads(match.group(1))
    except json.JSONDecodeError:
        return None

In [6]:
def atomic_dataframe_write(df, file_name):
    """
    Write a pandas dataframe to an Excel file, replacing the target atomically.

    The data is first written to a temporary ``.xlsx`` file created in the
    target's directory, then swapped into place with ``os.replace`` so the
    target is never left half-written.

    Args:
        df: Object providing ``to_excel(path, index=False)`` (a pandas DataFrame).
        file_name: Destination path of the Excel file.

    Raises:
        Any exception raised while writing or replacing the file; the
        temporary file is removed before the exception propagates.
    """
    # Create the temporary file next to the target so the final rename stays
    # on one filesystem; a bare file name means the current directory.
    dir_name = os.path.dirname(file_name) or "."
    fd, temp_file_name = tempfile.mkstemp(dir=dir_name, suffix='.xlsx')
    # Only the path is needed; the writer opens the file itself.
    os.close(fd)
    try:
        df.to_excel(temp_file_name, index=False)
        # os.replace overwrites an existing target in one step on every platform
        os.replace(temp_file_name, file_name)
    except BaseException:
        # Do not leave a stray temporary file behind on failure or interrupt.
        if os.path.exists(temp_file_name):
            os.remove(temp_file_name)
        raise

In [7]:
# create a Gradio client instance
# This module-level client is the one process_one() uses for every request.
# NOTE(review): ssl_verify=False presumably turns off TLS certificate
# verification for this connection — confirm this is intended for this host.
gradio_client = Client("https://w1lxscirender02.worldbank.org:8080/ai_for_data_playground", ssl_verify=False)

Loaded as API: https://w1lxscirender02.worldbank.org:8080/ai_for_data_playground/ ✔


In [33]:
def process_one(indicator_id, manifest_file="error_scanner_kam_20251215.yml", gpt_model="gpt-5"):
    """Scan the metadata of one indicator with the Error-Scanner and return the issues.

    Steps: fetch the indicator metadata, open a scanner session, load the
    agents manifest, create the agents, run them on the metadata, parse the
    JSON issues from the last output, and finally delete the session.

    Args:
        indicator_id: Identifier of the indicator to scan.
        manifest_file: Agents manifest file name passed to the scanner.
        gpt_model: Model name passed when creating the agents.

    Returns:
        A dict with keys ``indicator_id``, ``indicator_name`` and
        ``detected_issues`` (a JSON array as text, one issue per line), or
        ``None`` if any step failed. Failures are reported with a printed
        warning and never raised.
    """
    session_id = None
    try:
        # fetch metadata first, so a failed download never opens a session
        metadata = fetch_indicator_metadata(indicator_id)
        metadata_to_scan = metadata["series_description"]
        # these two fields are removed before the metadata is sent to the scanner
        for excluded_key in ("ref_country", "geographic_units"):
            metadata_to_scan.pop(excluded_key, None)
        indicator_name = metadata_to_scan["name"]

        # create a new session
        job = gradio_client.submit(
            api_name="/error_scanner__create_session"
        )
        outputs = wait_for_job_outputs(job)
        session_id = outputs[0]

        # load agents manifest
        job = gradio_client.submit(
            file_name=manifest_file,
            session_id=session_id,
            api_name="/error_scanner__load_agents_manifest"
        )
        outputs = wait_for_job_outputs(job)
        agents_manifest = outputs[0][0]

        # create agents
        job = gradio_client.submit(
            agents_manifest=agents_manifest,
            gpt_model=gpt_model,
            session_id=session_id,
            api_name="/error_scanner__create_agents"
        )
        wait_for_job_outputs(job)

        # start agents activity
        job = gradio_client.submit(
            metadata_to_scan=metadata_to_scan,
            session_id=session_id,
            api_name="/error_scanner__start_agents_activity",
        )
        outputs = wait_for_job_outputs(job)

        # Parse detected issues into pretty JSON array text
        issues_list = extract_json(outputs[-1][0])
        if issues_list is None:
            raise ValueError("no parsable JSON found in the scanner output")
        if isinstance(issues_list, dict):
            # a single JSON object is treated as a one-item list of issues
            issues_list = [issues_list]
        json_text = "[\n    " + ",\n    ".join(
            json.dumps(obj, ensure_ascii=False) for obj in issues_list
        ) + "\n]"

        return {
            "indicator_id": indicator_id,
            "indicator_name": indicator_name,
            "detected_issues": json_text,
        }

    except Exception as e:
        print(f"[Warning] Failed for indicator: {indicator_id}. Reason: {e}")
        return None

    finally:
        # delete the session on success and on failure, so none is left behind
        if session_id is not None:
            try:
                job = gradio_client.submit(
                    session_id=session_id,
                    api_name="/error_scanner__delete_session"
                )
                wait_for_job_outputs(job)
            except Exception as e:
                # a cleanup problem must not discard an otherwise valid result
                print(f"[Warning] Could not delete session for indicator: {indicator_id}. Reason: {e}")

In [9]:
MAX_WORKERS = 10
def run_parallel(todo_items, output_df, output_file_name, max_workers=None):
    """Scan all pending indicators in a thread pool, checkpointing after each success.

    Args:
        todo_items: Iterable of indicator IDs to scan.
        output_df: DataFrame of results from an earlier run; must have an
            ``indicator_id`` column. Its rows are kept in the output.
        output_file_name: Excel file rewritten after every successful scan.
        max_workers: Thread pool size; ``None`` uses ``MAX_WORKERS``.

    Indicators for which ``process_one`` returns ``None`` are not saved, so a
    later call with the reloaded output file will attempt them again.
    """
    results = output_df.to_dict(orient="records")

    # skip indicators that are already completed; IDs are compared as strings
    # because the IDs taken from output_df are cast to str
    done = set(output_df["indicator_id"].astype(str))
    todo = [indicator_id for indicator_id in todo_items if str(indicator_id) not in done]

    workers = MAX_WORKERS if max_workers is None else max_workers
    failed = 0

    # NOTE(review): every worker uses the same module-level gradio_client; the
    # recorded run shows a KeyError raised inside gradio_client's
    # stream_messages — confirm the client is safe to share across threads.
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(process_one, indicator_id) for indicator_id in todo]
        for fut in tqdm(as_completed(futures), total=len(futures), desc="Scanning"):
            res = fut.result()
            if res is None:
                failed += 1
                continue
            results.append(res)

            # checkpoint after every success so an interrupted run can resume
            atomic_dataframe_write(pd.DataFrame(results), output_file_name)

    if failed:
        print(f"[Warning] {failed} of {len(todo)} indicators failed and were not saved.")
                

In [35]:
# Fetch the WB_CSC indicator listing; it is passed to run_parallel() as the work list.
todo = fetch_indicator_ids()

In [36]:
# Results workbook: reloaded when it already exists so a previous run can be
# resumed, otherwise start from an empty frame with the expected columns.
output_file_name = "WB_CSC_detected_metadata_issues_20260206.xlsx"
if not os.path.exists(output_file_name):
    column_names = ['indicator_id', 'indicator_name', 'detected_issues']
    output_df = pd.DataFrame(columns=column_names)
else:
    output_df = pd.read_excel(output_file_name)

In [37]:
# Scan every indicator not already listed in output_df; the results file is
# rewritten after each successful scan.
run_parallel(todo, output_df, output_file_name)

Scanning:   3%|▎         | 2/64 [03:44<1:35:33, 92.48s/it] 



Scanning:   5%|▍         | 3/64 [03:45<51:51, 51.01s/it]  



Scanning:  31%|███▏      | 20/64 [15:48<39:51, 54.34s/it]  



Scanning:  39%|███▉      | 25/64 [17:25<14:08, 21.75s/it]



Scanning:  47%|████▋     | 30/64 [21:34<19:41, 34.76s/it]



Scanning:  48%|████▊     | 31/64 [21:36<13:39, 24.82s/it]



Scanning:  55%|█████▍    | 35/64 [23:08<11:45, 24.33s/it]



Scanning:  66%|██████▌   | 42/64 [25:28<05:19, 14.54s/it]



Scanning:  69%|██████▉   | 44/64 [26:56<10:24, 31.20s/it]Traceback (most recent call last):
  File "c:\Users\wb575476\anaconda3\envs\ai_for_data\Lib\site-packages\gradio_client\client.py", line 296, in stream_messages
    self.pending_event_ids.remove(event_id)
KeyError: 'c2f242850c6f4e4fa405aecb4d823cd3'
Scanning:  70%|███████   | 45/64 [27:24<09:33, 30.16s/it]



Scanning:  73%|███████▎  | 47/64 [27:24<04:37, 16.34s/it]



Scanning:  86%|████████▌ | 55/64 [27:26<00:27,  3.09s/it]



Scanning: 100%|██████████| 64/64 [36:40<00:00, 34.39s/it]
