In [2]:
from contextlib import closing
import psycopg2
from pathlib import Path
import pandas as pd
from sqlalchemy import create_engine
from tqdm import tqdm

# Source CSV directory (note: the on-disk directory is spelled "archieved").
dataset_path = Path('./archieved').resolve() / 'dataset'
datasets = dataset_path.glob('*.csv')

# Dataset id -> name of the PostgreSQL table its CSV is loaded into.
DATASET_TABLE_MAP = dict(
    noaa_tsunami="historical_runups",
    thelook_ecommerce="orders",
    sunroof_solar="solar_potential_by_postal_code",
    sdoh_bea_cainc30="fips",
    medicare="outpatient_charges_2014",
    patents_dsep="disclosures_13",
    ncaa_basketball="mbb_historical_teams_games",
)

# Connection pieces; swap _db_host for a remote host:port when not using the local port.
_db_user = "postgresml"
_db_host = "localhost:25432"
_db_name = "postgresml"
ENGINE_URL = f"postgresql+psycopg2://{_db_user}@{_db_host}/{_db_name}"

PGML_SCHEMA = "public"
CSV_DIR = Path("./archieved/dataset")
ENGINE = create_engine(ENGINE_URL, future=True)

def insert_to_postgres(chunksize: int = 100_000):
    """
    Load every CSV in ``CSV_DIR`` into PostgreSQL, one table per file.

    The table name is the file stem with a trailing ``_cleaned`` removed, mapped
    through ``DATASET_TABLE_MAP`` (unmapped stems are used unchanged). The first
    chunk of each file replaces any existing table; later chunks append.

    Args:
        chunksize: rows per ``read_csv`` chunk and per ``to_sql`` batch
            (defaults to the previously hard-coded 100_000).
    """
    for csv_path in CSV_DIR.glob("*.csv"):
        base = csv_path.stem.removesuffix("_cleaned")
        table = DATASET_TABLE_MAP.get(base, base)

        # Stream the line count instead of readlines() so the whole file is not held
        # in memory just for the progress bar. Subtract the header line; quoted
        # multi-line fields make this an estimate, which is fine for tqdm.
        with open(csv_path, "r", encoding="utf-8-sig") as f:
            data_rows = max(sum(1 for _ in f) - 1, 0)
        # Ceiling division: a partial final chunk is still a chunk.
        n_chunks = (data_rows + chunksize - 1) // chunksize

        # Use the reader as a context manager so its file handle is closed.
        with pd.read_csv(csv_path, chunksize=chunksize, encoding="utf-8-sig") as reader:
            for i, chunk in enumerate(tqdm(reader, total=n_chunks, desc=csv_path.name)):
                chunk.to_sql(
                    name=table,
                    con=ENGINE,
                    schema=PGML_SCHEMA,
                    if_exists="replace" if i == 0 else "append",
                    index=False,
                    method="multi"
                )
        print(f"✓ {csv_path.name} -> {PGML_SCHEMA}.{table}")

# insert_to_postgres()

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

def execute_query(query: str, params: dict = None, fetch: bool = False):
    """
    Execute a raw SQL query with SQLAlchemy on the module-level ``ENGINE``.
    
    Args:
        query (str): The SQL query string (use :param for placeholders).
        params (dict): Dictionary of parameters to bind.
        fetch (bool): Whether to fetch results (for SELECT / ... RETURNING queries).
    
    Returns:
        list of dicts if fetch=True, else number of affected rows.
        None if SQLAlchemy raised an error (the error is printed, not re-raised).

    Usage:
    ```
    # SELECT query
    rows = execute_query("SELECT * FROM users WHERE age > :age", {"age": 21}, fetch=True)
    print(rows)

    # INSERT query
    inserted = execute_query(
        "INSERT INTO users (name, age) VALUES (:name, :age)",
        {"name": "Alice", "age": 25}
    )
    print(f"Inserted rows: {inserted}")

    # UPDATE query
    updated = execute_query(
        "UPDATE users SET age = :age WHERE name = :name",
        {"name": "Alice", "age": 26}
    )
    print(f"Updated rows: {updated}")
    ```
    """
    try:
        with ENGINE.connect() as connection:
            result = connection.execute(text(query), params or {})

            if fetch:
                # Fetch results as list of dicts.
                rows = [dict(row._mapping) for row in result]
                # Commit here too: a fetching write (e.g. INSERT ... RETURNING) would
                # otherwise be rolled back when the connection closes.
                connection.commit()
                return rows

            # Capture rowcount while the cursor is still current, then commit.
            affected = result.rowcount
            connection.commit()
            return affected
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
        return None
    

def create_random_split(table_name: str, split_criteria: float=0.9, train_view_name="train", test_view_name="test",
                        seed: float | None = None):
    """
    Create a stable random train/test split by materializing a random number into a new table.
    Works without a primary key.

    Creates ``{PGML_SCHEMA}.{table_name}_with_rnd`` (all original columns plus a
    ``rnd`` column drawn once from ``RANDOM()``). It then creates two views in the
    same schema over that table:
    ``{table_name}_{train_view_name}`` (``rnd < split_criteria``) and
    ``{table_name}_{test_view_name}`` (``rnd >= split_criteria``).
    These are the names ``drop_views`` removes. The split is stable across queries
    because ``rnd`` is stored rather than recomputed.

    Args:
        table_name: unqualified table name inside ``PGML_SCHEMA``.
        split_criteria: threshold on ``rnd``; roughly the fraction of rows in train.
        train_view_name: suffix of the train view.
        test_view_name: suffix of the test view.
        seed: optional value in [-1, 1] passed to PostgreSQL ``setseed`` so the
            split is reproducible across re-runs; ``None`` leaves RANDOM() unseeded.

    Raises:
        ValueError: if ``information_schema`` lists no columns for the table.
    """
    split_criteria = float(split_criteria)  # also guarantees a numeric literal in the SQL below

    def _quote(ident: str) -> str:
        # Columns created by pandas.to_sql keep their case (e.g. "GeoName"), so quote them.
        return '"' + ident.replace('"', '""') + '"'

    base_table = f"{PGML_SCHEMA}.{table_name}"
    table_with_rnd = f"{PGML_SCHEMA}.{table_name}_with_rnd"
    train_view = f"{PGML_SCHEMA}.{table_name}_{train_view_name}"
    test_view = f"{PGML_SCHEMA}.{table_name}_{test_view_name}"

    with ENGINE.begin() as conn:
        # Original column names, in table order and restricted to our schema.
        original_columns = [
            row[0]
            for row in conn.execute(
                text(
                    "SELECT column_name FROM information_schema.columns "
                    "WHERE table_schema = :schema AND table_name = :table "
                    "ORDER BY ordinal_position"
                ),
                {"schema": PGML_SCHEMA, "table": table_name},
            ).fetchall()
        ]
        if not original_columns:
            raise ValueError(f"No columns found for table {base_table}")
        col_list = ", ".join(_quote(c) for c in original_columns)

        # Drop old objects if they exist.
        conn.execute(text(f"DROP VIEW IF EXISTS {train_view} CASCADE"))
        conn.execute(text(f"DROP VIEW IF EXISTS {test_view} CASCADE"))
        conn.execute(text(f"DROP TABLE IF EXISTS {table_with_rnd} CASCADE"))

        if seed is not None:
            conn.execute(text("SELECT setseed(:seed)"), {"seed": float(seed)})

        # Table with the materialized random assignment column.
        conn.execute(text(f"""
            CREATE TABLE {table_with_rnd} AS
            SELECT *, RANDOM() AS rnd
            FROM {base_table};
        """))

        # Train view
        conn.execute(text(f"""
            CREATE VIEW {train_view} AS
            SELECT {col_list}
            FROM {table_with_rnd}
            WHERE rnd < {split_criteria};
        """))

        # Test view
        conn.execute(text(f"""
            CREATE VIEW {test_view} AS
            SELECT {col_list}
            FROM {table_with_rnd}
            WHERE rnd >= {split_criteria};
        """))

    print(f"Stable random split created: '{train_view}' and '{test_view}'")

def drop_views(table_name: str):
    """Drop the ``<table_name>_train`` and ``<table_name>_test`` views (IF EXISTS, CASCADE)."""
    for split_suffix in ("train", "test"):
        execute_query(f"DROP VIEW IF EXISTS {table_name}_{split_suffix} CASCADE")
    print(f"Dropped views for table: {table_name}")

In [3]:
# TODO
# 1. Build train/test sets (done)
# 2. Train models
# 3. Store performance on the test data
# 4. Check the skyline

# Columns to leave out of the feature set, keyed by table name.
table_specific_exclusions = dict(
    orders=[],                                   # done
    disclosures_13=["wg_name"],                  # done
    outpatient_charges_2014=[],                  # done, no missing values
    fips=[
        "Proprietors_income",
        "Nonfarm_proprietors_income",
        "Net_earnings_by_place_of_residence",
    ],                                           # done
    mbb_historical_teams_games=[],               # done
    historical_runups=["id", "timestamp", "location_name"],
    solar_potential_by_postal_code=[],
)

In [4]:
table_name = "historical_runups"
query = f"""SELECT * FROM public.{table_name};"""
x = execute_query(query, fetch=True)
df = pd.DataFrame(x)
# Keep every column except this table's exclusions, in table order.
_excluded_cols = set(table_specific_exclusions.get(table_name, []))
cols_list = ", ".join(c for c in df.columns if c not in _excluded_cols)
cols_list

'tsevent_id, year, month, day, doubtful, country, state, latitude, longitude, region_code, distance_from_source, arr_day, arr_hour, arr_min, travel_time_hours, travel_time_minutes, water_ht, horizontal_inundation, type_measurement_id, period, first_motion, deaths, deaths_description, injuries, injuries_description, damage_millions_dollars, damage_description, houses_damaged, houses_damaged_description, houses_destroyed, houses_destroyed_description'

In [119]:
_split_table = "historical_runups"
create_random_split(_split_table)
drop_views(_split_table)

Stable random split created: 'train' and 'test'
Dropped views for table: historical_runups


In [5]:
relation_name = f"{PGML_SCHEMA}.{table_name}_view"
# View projecting only the non-excluded columns of the source table.
query = (
    f"CREATE VIEW {relation_name} AS (\n"
    f"SELECT {cols_list}\n"
    f"FROM {PGML_SCHEMA}.{table_name}\n"
    ");"
)

print(query)

CREATE VIEW public.historical_runups_view AS (
SELECT tsevent_id, year, month, day, doubtful, country, state, latitude, longitude, region_code, distance_from_source, arr_day, arr_hour, arr_min, travel_time_hours, travel_time_minutes, water_ht, horizontal_inundation, type_measurement_id, period, first_motion, deaths, deaths_description, injuries, injuries_description, damage_millions_dollars, damage_description, houses_damaged, houses_damaged_description, houses_destroyed, houses_destroyed_description
FROM public.historical_runups
);


In [6]:
import json

def build_pgml_train_sql(project_name: str, task: str, relation_name: str, y_column_name: str, algorithm: str, preprocess: dict | None = None, test_size: float | None = None):
    """
    
    preprocess: 
        impute: `error`, `mean`, `median`, `mode`, `min`, `max`, `zero`
        scale: `preserve`, `standard`, `min_max`, `max_abs`, `robust`
        encode: `native`, `target`, `one_hot`, `ordinal`
    """
    args = [
        f"project_name => '{project_name}'",
        f"task => '{task}'",
        f"relation_name => '{relation_name}'",
        f"y_column_name => '{y_column_name}'",
        f"algorithm => '{algorithm}'",
    ]
    if preprocess:
        args.append(f"preprocess => '{json.dumps(preprocess, ensure_ascii=False)}'")
    if test_size is not None:
        args.append(f"test_size => {test_size}")
    return "SELECT pgml.train(\n    " + ",\n    ".join(args) + "\n);\n"

# Algorithms PostgresML accepts for each task.
model_families = dict(
    regression=['xgboost', 'ada_boost', 'random_forest', 'gradient_boosting_trees', 'bagging', 'svm', 'ridge', 'bayesian_ridge'],
    classification=['xgboost', 'ada_boost', 'random_forest', 'gradient_boosting_trees', 'bagging', 'svm', 'ridge'],
    clustering=['kmeans', 'mini_batch_kmeans'],
)

# Example: regress `period` on the historical run-ups view with XGBoost.
_train_kwargs = dict(
    project_name=f"noaa_tsunami_{PGML_SCHEMA}.historical_runups",
    task="regression",
    relation_name=relation_name,
    y_column_name="period",
    algorithm="xgboost",
    preprocess={"country": {"impute": "mode"}},
    test_size=0.2,
)
query = build_pgml_train_sql(**_train_kwargs)

print(query)


SELECT pgml.train(
    project_name => 'noaa_tsunami_public.historical_runups',
    task => 'regression',
    relation_name => 'public.historical_runups_view',
    y_column_name => 'period',
    algorithm => 'xgboost',
    preprocess => '{"country": {"impute": "mode"}}',
    test_size => 0.2
);



In [123]:
execute_query(query, fetch=False)

Database error: (psycopg2.errors.InternalError_) 0 missing values for country. You may provide a preprocessor to impute a value. e.g:

 pgml.train(preprocessor => '{"country": {"impute": "mean"}}'

[SQL: SELECT pgml.train(
    project_name => 'noaa_tsunami_public.historical_runups',
    task => 'regression',
    relation_name => 'public.historical_runups_view',
    y_column_name => 'period',
    algorithm => 'xgboost',
    test_size => 0.2
);
]
(Background on this error at: https://sqlalche.me/e/20/2j85)


Total unique intent: 225 | S1: 131 | S2: 94


In [None]:
import os
import psycopg2
from google.cloud import bigquery
from dotenv import load_dotenv
import logging

# Timestamped INFO-level logging for the rest of the notebook.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Pull variables from a local .env file into the process environment.
load_dotenv()
PROJECT_ID = os.environ.get("PROJECT_ID")

In [None]:
import json
import hashlib
import random

import os

# --- CONFIGURATION ---
jsonl_path = "/content/data_for_SQL_generation.json"  # File with only selected datasets
output_path = "/content/SQLs.jsonl"
# Prefer PROJECT_ID from the environment / .env (loaded earlier); fall back to the
# project id that used to be hard-coded here, instead of silently overriding it.
PROJECT_ID = os.getenv("PROJECT_ID") or "crypto-isotope-366706"
DATA_ID = "bigquery-public-data"

BigQuery ML models

In [None]:
# BigQuery ML model types per data kind and task.
model_families = {
    "Time Series": {
        # Note the comma after DNN_LINEAR_COMBINED_REGRESSOR: without it Python
        # concatenates the two adjacent literals into one bogus model name.
        "regression": ["ARIMA_PLUS", "ARIMA_PLUS_XREG", "LINEAR_REG", "BOOSTED_TREE_REGRESSOR", "DNN_REGRESSOR", "DNN_LINEAR_COMBINED_REGRESSOR", "RANDOM_FOREST_REGRESSOR"],
        "classification": ["LOGISTIC_REG", "BOOSTED_TREE_CLASSIFIER", "DNN_CLASSIFIER", "DNN_LINEAR_COMBINED_CLASSIFIER", "RANDOM_FOREST_CLASSIFIER"],
        "clustering": ["KMEANS"],
        "anomaly_detection": ["KMEANS", "ARIMA_PLUS", "ARIMA_PLUS_XREG"]
    },
    "Non Time Series": {
        "regression": ["LINEAR_REG", "BOOSTED_TREE_REGRESSOR", "DNN_REGRESSOR", "DNN_LINEAR_COMBINED_REGRESSOR", "RANDOM_FOREST_REGRESSOR"],
        "classification": ["LOGISTIC_REG", "BOOSTED_TREE_CLASSIFIER", "DNN_CLASSIFIER", "DNN_LINEAR_COMBINED_CLASSIFIER", "RANDOM_FOREST_CLASSIFIER"],
        "clustering": ["KMEANS"],
        "anomaly_detection": ["KMEANS"]
    }
}

PostgresML models

In [7]:
import os
import json
import re
import uuid

# Schema holding the PostgresML tables/views; overridable via the environment.
PGML_SCHEMA = os.environ.get("PGML_SCHEMA", "public")

# Supervised algorithms shared by regression and classification.
_pgml_supervised = ['xgboost', 'ada_boost', 'random_forest', 'gradient_boosting_trees', 'bagging', 'svm', 'ridge']
PGML_ALLOWED = {
    "regression": _pgml_supervised + ['bayesian_ridge'],
    "classification": list(_pgml_supervised),
    "clustering": ['kmeans', 'mini_batch_kmeans'],
}
PGML_DEFAULT = dict(regression="xgboost", classification="xgboost", clustering="kmeans")

# Operators that turn an update condition into a filter.
COMPARISON_OPS = set((">", "<", ">=", "<="))

Excluded columns (BigQuery) and missing-value handling (PostgresML)

In [3]:
import hashlib
import json
import os
import re
from typing import Any, Literal, Optional

import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values

class TemplateGenerator():
    comparison_ops = {">", "<", ">=", "<="}

    def __init__(self, platform_type: Literal["bigquery", "postgresql"]):
        self.platform_type = platform_type

    def extract_tag_value(self, tagged_str: str, tag: str) -> str:
        # Extracts <tag>value</tag> from a string
        if not tagged_str:
            return ""
        import re
        pattern = f"<{tag}>(.*?)</{tag}>"
        match = re.search(pattern, tagged_str)
        return match.group(1) if match else ""
    
    def format_val(self, val: str) -> str:
        try:
            float(val)
            return val
        except ValueError:
            return f"'{val}'"

    def parse_cond(self, cond: str) -> tuple[str, str, str]:
        col = self.extract_tag_value(cond, "col")
        op = self.extract_tag_value(cond, "op")
        val = self.extract_tag_value(cond, "val")
        return col, op, val
    
    def split_update_conditions(self, update_conds_raw: list[str]) -> tuple[list[str], list[str]]:
        filter_like, true_updates = [], []
        for cond in update_conds_raw or []:
            col, op, val = self.parse_cond(cond)
            if op in self.comparison_ops:
                filter_like.append(cond)
            else:
                true_updates.append(cond)
        return filter_like, true_updates
    
    def quote_ident(self, name: str) -> str:
        quote_char = '"' if self.platform_type == "postgresql" else "`"
        if name is None:
            return ''
        s = str(name).strip()
        if s.startswith(quote_char) and s.endswith(quote_char):
            return s
        s = s.replace(quote_char, f"{quote_char}{quote_char}")
        return f"{quote_char}{s}{quote_char}"
    
    def get_input_feature_columns_from_schema(self, schema: dict[str, dict[str, dict[str, Any]]], 
                                              exclude_cols: Optional[list[str]|None]) -> list[str]:
        """
        {
            <table_name>: {
                'columns': {
                    <column_name1>: dict,
                    <column_name2>: dict,
                    ...
                }
            }
        }
        """
        cols = []
        for _, columns in schema.items():
            for col in columns['columns'].keys():
                if exclude_cols is None or col not in exclude_cols:
                    cols.append(col)
        return cols
    
    def gen(self, dataset_name: str, table_name: str, schema: dict, intent: dict, is_train: bool, model_name: str, **kwargs) -> dict:
        """
        intent: the intend dictionary with five keys - `time_series`, `target_column`, `inference_condition`, `update_condition`, `task`
        model_name: the name of the model to be used
        schema: data_dictionary with {table_name: {'columns': {column_name: column_infos[dict], ... }}}
        """
        raise NotImplementedError("This method should be implemented by subclasses.")

class BigQueryTemplateGenerator(TemplateGenerator):
    """
    Generate BigQuery ML ``CREATE MODEL`` / ``ML.*`` statements from an intent.

    ``gen`` reads the intent's ``output`` section (task, time-series flag,
    target column, inference/update conditions) and returns either a training
    DDL or an inference query. Relies on the notebook globals ``DATA_ID`` and
    ``table_specific_exclusions`` and on ``hashlib``.
    """
    model_families = {
        "Time Series": {
            # Comma after DNN_LINEAR_COMBINED_REGRESSOR: without it the two adjacent
            # literals are concatenated into one bogus model name.
            "regression": ["ARIMA_PLUS", "ARIMA_PLUS_XREG", "LINEAR_REG", "BOOSTED_TREE_REGRESSOR", "DNN_REGRESSOR", "DNN_LINEAR_COMBINED_REGRESSOR", "RANDOM_FOREST_REGRESSOR"],
            "classification": ["LOGISTIC_REG", "BOOSTED_TREE_CLASSIFIER", "DNN_CLASSIFIER", "DNN_LINEAR_COMBINED_CLASSIFIER", "RANDOM_FOREST_CLASSIFIER"],
            "clustering": ["KMEANS"],
            "anomaly_detection": ["KMEANS", "ARIMA_PLUS", "ARIMA_PLUS_XREG"]
        },
        "Non Time Series": {
            "regression": ["LINEAR_REG", "BOOSTED_TREE_REGRESSOR", "DNN_REGRESSOR", "DNN_LINEAR_COMBINED_REGRESSOR", "RANDOM_FOREST_REGRESSOR"],
            "classification": ["LOGISTIC_REG", "BOOSTED_TREE_CLASSIFIER", "DNN_CLASSIFIER", "DNN_LINEAR_COMBINED_CLASSIFIER", "RANDOM_FOREST_CLASSIFIER"],
            "clustering": ["KMEANS"],
            "anomaly_detection": ["KMEANS"]
        }
    }

    def __init__(self, platform_type="bigquery"):
        super().__init__(platform_type)

    def generate_hash(self, name: str) -> str:
        """Return the first 6 hex chars of the MD5 of ``name`` (deterministic model-name suffix)."""
        return hashlib.md5(name.encode()).hexdigest()[:6]

    def gen(self, dataset_name: str, table_name: str, schema: dict, intent: dict, is_train: bool, model_name: str, **kwargs) -> str:
        """
        Build the training DDL (``is_train=True``) or the inference query for one model.

        intent: expects an ``output`` dict with ``task`` and optionally
            ``time_series`` ("True"/"False"), ``target_column`` (``<col>``-tagged),
            and ``inference_condition`` / ``update_condition`` (lists of tagged
            conditions). It may also hold a top-level ``data_dictionary``
            ({column: {"type": ...}}) used to find timestamp columns.
        model_name: BigQuery ML model type, e.g. ``BOOSTED_TREE_REGRESSOR``.
        schema / kwargs: accepted for interface parity with the other generators; unused.

        Raises:
            ValueError: for ARIMA models when no timestamp column or no target column exists.
        """
        full_table = f"`{DATA_ID}.{dataset_name}.{table_name}`"

        output = intent["output"]
        task = output["task"]
        is_time_series = output.get("time_series", "False") == "True"
        target_col = self.extract_tag_value(output.get("target_column", ""), "col")
        inference_conds_raw = output.get("inference_condition", [])
        update_conds_raw = output.get("update_condition", [])

        model_lc = model_name.lower()
        # Only ARIMA_PLUS / ARIMA_PLUS_XREG take TIME_SERIES_* options. The other model
        # types listed under "Time Series" (LINEAR_REG, KMEANS, ...) use the regular
        # CREATE MODEL; previously they were all trained as ARIMA_PLUS while their
        # inference query used ML.PREDICT.
        uses_arima = is_time_series and "arima_plus" in model_lc
        is_xreg = "xreg" in model_lc

        model_prefix = f"crypto-isotope-366706.sql_knowledge_base.{target_col or task}"
        base_model_name = f"{model_prefix}_{model_lc.replace('-', '_')}"
        full_model_name = f"{base_model_name}_{self.generate_hash(base_model_name)}"

        # Comparison-style updates become extra filters; the remaining updates only
        # cause inference filters on the same columns to be dropped.
        filter_like_updates, true_updates = self.split_update_conditions(update_conds_raw)
        updated_cols = {self.extract_tag_value(cond, "col") for cond in true_updates}
        filtered_inference_conds = [cond for cond in inference_conds_raw if self.extract_tag_value(cond, "col") not in updated_cols]
        all_filters = filtered_inference_conds + filter_like_updates

        where_clause = " AND ".join(
            f"{self.quote_ident(self.extract_tag_value(c, 'col'))} {self.extract_tag_value(c, 'op')} {self.format_val(self.extract_tag_value(c, 'val'))}"
            for c in all_filters
        )

        needs_label = target_col and task not in ["clustering", "anomaly_detection"]
        label_opt = f", INPUT_LABEL_COLS=['{target_col}']" if needs_label else ""
        where_filter = f"WHERE {target_col} IS NOT NULL" if needs_label else ""

        # Timestamp-like columns from the data dictionary, grouped by type.
        data_dict = intent.get("data_dictionary", {})
        timestamp_cols_by_priority = {"TIMESTAMP": [], "DATETIME": [], "DATE": []}
        for col, meta in data_dict.items():
            col_type = meta.get("type")
            if col_type in timestamp_cols_by_priority:
                timestamp_cols_by_priority[col_type].append(col)

        # Time column: first TIMESTAMP, else DATETIME, else DATE (dict order above).
        time_col = next((cols[0] for cols in timestamp_cols_by_priority.values() if cols), None)
        if uses_arima:
            if not time_col:
                raise ValueError("No time column found for time-series model.")
            if not target_col:
                raise ValueError("No target_col provided for time-series model.")

        # Excluded columns: all timestamp-like columns, the ARIMA time/target columns
        # (selected explicitly for XREG), and per-table exclusions.
        excluded_cols = {col for cols in timestamp_cols_by_priority.values() for col in cols}
        if uses_arima:
            excluded_cols.update({time_col, target_col})
        excluded_cols.update(table_specific_exclusions.get(table_name, []))
        excluded_cols = {c for c in excluded_cols if c}

        exclude_clause = ", ".join(sorted(excluded_cols))
        select_clause = f"* EXCEPT({exclude_clause})" if excluded_cols else "*"

        # Subquery for inference
        subquery = f"(SELECT * FROM {full_table}"
        if where_clause:
            subquery += f" WHERE {where_clause}"
        subquery += ")"

        # Training SQL
        if is_train:
            if uses_arima:
                ts_opts = [
                    f"TIME_SERIES_TIMESTAMP_COL='{time_col}'",
                    f"TIME_SERIES_DATA_COL='{target_col}'"
                ]
                if is_xreg:
                    training_sql = f"""
                    CREATE MODEL IF NOT EXISTS `{full_model_name}`
                    OPTIONS(model_type='ARIMA_PLUS_XREG', {", ".join(ts_opts)})
                    AS
                    (SELECT {time_col}, {target_col}, {select_clause} FROM {full_table}
                    {where_filter})
                    """.strip()
                else:
                    training_sql = f"""
                    CREATE MODEL IF NOT EXISTS `{full_model_name}`
                    OPTIONS(model_type='ARIMA_PLUS', {", ".join(ts_opts)})
                    AS
                    (SELECT {time_col}, {target_col} FROM {full_table}
                    {where_filter})
                    """.strip()
            else:
                training_sql = f"""
                CREATE MODEL IF NOT EXISTS `{full_model_name}`
                OPTIONS(model_type='{model_name}'{label_opt},
                DATA_SPLIT_METHOD='RANDOM',
                DATA_SPLIT_EVAL_FRACTION=0.10)
                AS
                (SELECT {select_clause} FROM {full_table}
                {where_filter})
                """.strip()
            return training_sql

        # Inference SQL
        if task == "anomaly_detection":
            inference_sql = f"""
                SELECT * FROM ML.DETECT_ANOMALIES(
                  MODEL `{full_model_name}`,
                  STRUCT(0.2 AS contamination),
                  {subquery}
                )
                """.strip()
        elif uses_arima:
            if is_xreg:
                inference_sql = f"""
                    SELECT * FROM ML.FORECAST(
                      MODEL `{full_model_name}`,
                      STRUCT(10 AS horizon, 0.8 AS confidence_level),
                      {subquery}
                    )
                    """.strip()
            else:
                inference_sql = f"""
                    SELECT * FROM ML.FORECAST(
                      MODEL `{full_model_name}`,
                      STRUCT(10 AS horizon, 0.8 AS confidence_level))
                    """.strip()
        else:
            inference_sql = f"""
                SELECT * FROM ML.PREDICT(MODEL `{full_model_name}`, {subquery})
                """.strip()
        return inference_sql

def _merge_user(custom_for_col: dict, base: dict) -> dict:
    """
    Deep-ish merge: only keys present in `custom_for_col` override.
    Unknown keys are ignored (keeps output clean for downstream systems).
    """
    out = base.copy()
    for k in ("impute", "scale", "encode"):
        if k in custom_for_col:
            out[k] = custom_for_col[k]
    return out

def _flatten_schema(schema: dict) -> dict:
    """
    Accepts either:
    {<table>: {"columns": {<col>: {"type": ...}}}}
    or directly:
    {"columns": {...}}
    Returns {column_name: type_string}
    """
    # If schema already at columns level
    if "columns" in schema:
        return {c: (v.get("type") or "").upper() for c, v in schema["columns"].items()}

    # Else assume top-level tables
    out = {}
    for _tbl, spec in schema.items():
        cols = spec.get("columns", {})
        out.update({c: (v.get("type") or "").upper() for c, v in cols.items()})
    return out


class PostgresTemplateGenerator(TemplateGenerator):
    """
    Generate PostgresML SQL (``pgml.train`` / ``pgml.predict``) from an intent.

    Training output is a ``CREATE OR REPLACE VIEW`` projecting the input
    features (plus the target, if any) and a ``pgml.train`` call over that view.
    Inference output is a ``pgml.predict`` query over the base table, with the
    intent's conditions rendered as a WHERE clause and value-override updates
    applied per column.
    """
    pgml_schema = os.getenv("PGML_SCHEMA", "public")
    model_families = {
        "regression": ['xgboost', 'ada_boost', 'random_forest', 'gradient_boosting_trees', 'bagging', 'svm', 'ridge', 'bayesian_ridge'],
        "classification": ['xgboost', 'ada_boost', 'random_forest', 'gradient_boosting_trees', 'bagging', 'svm', 'ridge'],
        "clustering": ['mini_batch_kmeans', 'affinity_propagation'],
    }

    def __init__(self):
        super().__init__(platform_type="postgresql")

    def split_to_views(self, table_name: str):
        """Create the random train/test split for ``table_name`` via ``create_random_split``."""
        # create_random_split prefixes PGML_SCHEMA itself; passing a pre-qualified
        # name produced "<schema>.<schema>.<table>_with_rnd" and an empty column lookup.
        create_random_split(table_name)

    def drop_views(self, table_name: str):
        """Drop ``<schema>.<table_name>_train`` / ``_test`` via the module-level ``drop_views``."""
        drop_views(f"{self.pgml_schema}.{table_name}")

    @staticmethod
    def _sql_literal(value) -> str:
        """Render ``value`` as a PostgreSQL string literal (single quotes doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    def _gen_preprocess(self, schema: dict, custom: Optional[dict|None]=None) -> dict:
        """
        Build a pgml ``preprocess`` spec for every column in ``schema``.

        custom: {column: {impute|scale|encode: ...}}
        Valid options:
            impute:  error | mean | median | mode | min | max | zero
            scale:   preserve | standard | min_max | max_abs | robust
            encode:  native | target | one_hot | ordinal
        Returns:
            {column: {impute, scale, encode}} — defaults by column type, with
            user overrides from ``custom`` merged in.
        """
        defaults_by_type = {
            "STRING":   {"impute": "mode", "encode": "native"},
            "FLOAT":    {"impute": "median", "scale": "standard"},
            "INTEGER":  {"impute": "median", "scale": "standard"},
            "GEOGRAPHY":{"impute": "mode", "encode": "native"},
        }

        custom = custom or {}
        col_types = _flatten_schema(schema)
        result = {}

        for col, typ in col_types.items():
            base_type = defaults_by_type.get(typ, {"impute": "median", "scale": "standard", "encode": "native"})
            # Merge any user overrides
            if col in custom:
                base_type = _merge_user(custom[col], base_type)
            result[col] = base_type

        return result

    def gen(self, dataset_name: str, table_name: str, schema: dict, intent: dict, is_train: bool, model_name: str, 
            exclude_cols: Optional[list[str]]=None, test_size: Optional[float|None]=0.1, **kwargs) -> dict:
        """
        Generate the SQL queries and view definitions for the given parameters.

        Args:
            intent: dict with ``task``, ``target_column`` (``<col>``-tagged) and,
                for inference, ``inference_condition`` / ``update_condition``.
            model_name: pgml algorithm; must be listed for the task in ``model_families``.
            exclude_cols: extra columns left out of the features (the target always is).
            test_size: forwarded to ``pgml.train``; ``None`` omits it.
            **kwargs: ``custom`` (per-column preprocess overrides) is used for
                training; other keys are ignored.

        Returns:
            training:  {'project_name', 'relation_name', 'view_table', 'query', 'preprocess'}
            inference: {'query'}

        Raises:
            AssertionError: ``model_name`` is not suitable for the task.
            ValueError: unsupported task.

        ```
        train_output = generator.gen(
            dataset_name=x['dataset_name'],
            table_name=x['table_name'],
            schema=x['schema'],
            intent=x['intent'],
            is_train=True,
            model_name=x['model_name'],
            exclude_cols=table_specific_exclusions.get(x['table_name'], [])
        )
        inference_output = generator.gen(
            dataset_name=x['dataset_name'],
            table_name=x['table_name'],
            schema=x['schema'],
            intent=x['intent'],
            is_train=False,
            model_name=x['model_name'],
            exclude_cols=table_specific_exclusions.get(x['table_name'], [])
        )
        ```
        """
        # Normalize the task before validating so "Regression" and "regression" agree.
        task = (intent.get("task") or "").lower()
        assert model_name in self.model_families.get(task, []), f"Model {model_name} is not suitable for task {intent.get('task')}"

        target_col = self.extract_tag_value(intent.get("target_column", ""), "col")
        # Unique model/project name
        project_name = f"{dataset_name}/{table_name}/{task}/{model_name}"
        input_feature_cols = self.get_input_feature_columns_from_schema(
            schema, [target_col] + list(exclude_cols or [])
        )

        if is_train:
            return self._gen_train(table_name, schema, task, model_name, project_name, target_col,
                                   input_feature_cols, test_size, kwargs.get("custom"))
        return self._gen_inference(table_name, intent, project_name, input_feature_cols)

    def _gen_train(self, table_name: str, schema: dict, task: str, model_name: str, project_name: str,
                   target_col: str, input_feature_cols: list[str], test_size: Optional[float],
                   custom: Optional[dict]) -> dict:
        """Build the training view DDL and the ``pgml.train`` call."""
        relation_name = f"{self.pgml_schema}.{table_name}_view"
        # The view exposes only the input features plus the target.
        tuple_expr = ", ".join(self.quote_ident(c) for c in input_feature_cols)
        target_expr = ', ' + self.quote_ident(target_col) if target_col else ''
        view_table_query = f"""CREATE OR REPLACE VIEW {relation_name} AS (\nSELECT {tuple_expr}{target_expr} \nFROM {self.pgml_schema}.{table_name});"""

        preprocess = self._gen_preprocess(schema, custom=custom)
        if task not in ("regression", "classification", "clustering"):
            raise ValueError(f"Unsupported task: {task}")
        query = self.build_pgml_train_sql(
            project_name=project_name,
            task=task,
            relation_name=relation_name,
            y_column_name=target_col,
            algorithm=model_name,
            preprocess=preprocess,
            test_size=test_size
        )
        return {
            'project_name': project_name,
            'relation_name': relation_name,
            'view_table': view_table_query.strip(),
            'query': query.strip(),
            'preprocess': preprocess,
        }

    def _gen_inference(self, table_name: str, intent: dict, project_name: str,
                       input_feature_cols: list[str]) -> dict:
        """Build the ``pgml.predict`` query over the base table (same features as training)."""
        relation_name = f"{self.pgml_schema}.{table_name}"
        where_clause, true_updates = self._build_where_clause(intent)
        if true_updates:
            query = self.build_pgml_predict_sql_scenario2(
                project_name=project_name,
                relation_name=relation_name,
                input_feature_cols=input_feature_cols,
                where_clause=where_clause,
                true_updates=true_updates,
                limit=10
            )
        else:
            query = self.build_pgml_predict_sql_scenario1(
                project_name=project_name,
                relation_name=relation_name,
                input_feature_cols=input_feature_cols,
                where_clause=where_clause,
                limit=10
            )
        return {'query': query.strip()}

    def _build_where_clause(self, intent: dict) -> tuple[str, list[str]]:
        """
        Render the intent's conditions as a WHERE-clause body.

        Both condition lists are sorted first. A comparison-style update replaces
        every inference condition on the same column and operator whose value
        differs. Returns ``(where_clause, true_updates)``, where ``true_updates``
        are the non-comparison updates (value overrides).
        """
        inference_conds = sorted(intent.get("inference_condition") or [])
        update_conds = sorted(intent.get("update_condition") or [])
        filter_like_updates, true_updates = self.split_update_conditions(update_conds)

        clauses = []
        for cond_u in filter_like_updates:
            col_u, op_u, val_u = self.parse_cond(cond_u)
            # Rebuild the list rather than popping while enumerating it, which
            # skipped the condition right after each replaced one.
            remaining = []
            for cond_i in inference_conds:
                col_i, op_i, val_i = self.parse_cond(cond_i)
                if not (col_i == col_u and op_i == op_u and val_i != val_u):
                    remaining.append(cond_i)
            if len(remaining) != len(inference_conds):
                clauses.append(f"{self.quote_ident(col_u)} {op_u} {self.format_val(val_u)}")
            # NOTE(review): a comparison-style update with no matching inference
            # condition adds no filter here (same as before) — confirm this is intended.
            inference_conds = remaining

        # Append the rest of the inference conditions.
        for cond_i in inference_conds:
            col_i, op_i, val_i = self.parse_cond(cond_i)
            clauses.append(f"{self.quote_ident(col_i)} {op_i} {self.format_val(val_i)}")
        return " AND ".join(clauses), true_updates

    def build_pgml_train_sql(self, project_name: str, task: str, relation_name: str, y_column_name: str, algorithm: str, 
                             preprocess: dict | None = None, test_size: float | None = None):
        """
        Build ``SELECT pgml.train(...)``; ``y_column_name`` is omitted for clustering.

        String arguments are emitted as SQL literals with single quotes doubled.

        preprocess: 
            impute: `error`, `mean`, `median`, `mode`, `min`, `max`, `zero`
            scale: `preserve`, `standard`, `min_max`, `max_abs`, `robust`
            encode: `native`, `target`, `one_hot`, `ordinal`
        """
        lit = self._sql_literal
        args = [
            f"project_name => {lit(project_name)}",
            f"task => {lit(task)}",
            f"relation_name => {lit(relation_name)}",
            f"algorithm => {lit(algorithm)}",
        ]
        if task != 'clustering':
            args.append(f"y_column_name => {lit(y_column_name)}")

        if preprocess:
            args.append(f"preprocess => {lit(json.dumps(preprocess, ensure_ascii=False))}")
        if test_size is not None:
            args.append(f"test_size => {test_size}")
        return "SELECT pgml.train(\n    " + ",\n    ".join(args) + "\n);\n"

    def build_pgml_predict_sql_scenario1(self, project_name: str, relation_name: str, input_feature_cols: list[str], 
                                         where_clause: str = "", limit: int | None = None):
        """
        Predict referencing columns directly (simple; no overrides).
        """
        tuple_expr = ", ".join(self.quote_ident(c) for c in input_feature_cols)
        sql = (
            "SELECT pgml.predict(\n"
            f"    {self._sql_literal(project_name)}, ({tuple_expr})\n"
            ") AS prediction\n"
            f"FROM {relation_name}"
        )
        if where_clause:
            sql += f"\nWHERE {where_clause}"
        if limit:
            sql += f"\nLIMIT {limit}"
        sql += ";\n"
        return sql

    def build_pgml_predict_sql_scenario2(self, project_name: str, relation_name: str, input_feature_cols: list[str], 
                                         where_clause: str = "", true_updates: list[str] | None = None, limit: int | None = None) -> str:
        """
        Predict on rows whose updated columns are replaced by the update values.

        Each ``true_updates`` condition ``<col>c</col>..<val>v</val>`` selects ``v AS c``
        in place of column ``c`` (last one wins per column); other features are
        selected as-is. The filtered/limited rows are passed to ``pgml.predict`` as a row ``t``.
        """
        override_map = {col: val for col, _op, val in map(self.parse_cond, true_updates or [])}
        select_items = [
            f"{self.format_val(override_map[col])} AS {self.quote_ident(col)}" if col in override_map
            else self.quote_ident(col)
            for col in input_feature_cols
        ]

        inner_sql = "SELECT " + ", ".join(select_items) + f" FROM {relation_name}"
        if where_clause:
            inner_sql += f" WHERE {where_clause}"
        if limit:
            inner_sql += f" LIMIT {limit}"

        return (
            "SELECT pgml.predict(\n"
            f"    {self._sql_literal(project_name)}, t\n"
            ")\n"
            "FROM (\n"
            f"    {inner_sql}\n"
            ") AS t;\n"
        )

In [4]:
from tqdm import tqdm
import json
from collections import defaultdict, Counter

# Load the training set and collect distinct schemas / intents (keyed by pretty-printed JSON).
# Intents with a non-empty `update_condition` go to the "s2" bucket, all others to "s1".
counter = defaultdict(set)
# JSONL is UTF-8: pass the encoding explicitly instead of relying on the locale-dependent
# platform default. Blank lines are skipped rather than crashing json.loads.
with open('./data/train_final.jsonl', 'r', encoding='utf-8') as file:
    data = [json.loads(line) for line in file if line.strip()]

for d in data:
    intent_json = json.dumps(d['intent'], indent=2)  # serialize once; added to two buckets
    counter['schema'].add(json.dumps(d['schema'], indent=2))
    counter['s2' if d['intent'].get('update_condition') else 's1'].add(intent_json)
    counter['intent'].add(intent_json)

# CoT records (loaded here; not used further in this cell).
with open('./archieved/data/train_merged_CoT_final.jsonl', 'r', encoding='utf-8') as f:
    cots = [json.loads(line) for line in f if line.strip()]

print(f"Total unique intent: {len(counter['intent'])} | S1: {len(counter['s1'])} | S2: {len(counter['s2'])}")

Total unique intent: 225 | S1: 131 | S2: 94


In [5]:
gen_class = PostgresTemplateGenerator
# Reverse of DATASET_TABLE_MAP: table name -> dataset (CSV stem) name.
TABLE_DATASET_MAP = {v: k for k, v in DATASET_TABLE_MAP.items()}
dataset = []
checked_intents = set()
for d in tqdm(data, total=len(data)):
    # Deduplicate on the serialized intent; only the first occurrence is expanded.
    k = json.dumps(d['intent'])
    if k in checked_intents:
        continue
    schema = d['schema']['tables']
    assert len(schema) == 1, "len table is not 1"
    table_name = next(iter(schema))
    # insert_to_postgres() names a table after its dataset when the dataset is not listed
    # in DATASET_TABLE_MAP, so fall back to the table name instead of raising KeyError.
    dataset_name = TABLE_DATASET_MAP.get(table_name, table_name)
    intent = d['intent']

    # One entry per candidate model for the intent's task (none if the task is unknown).
    for model_name in gen_class.model_families.get(intent.get("task"), []):
        dataset.append({
            'dataset_name': dataset_name,
            'table_name': table_name,
            'schema': schema,
            'intent': intent,
            'model_name': model_name
        })
    checked_intents.add(k)

print(f"Total dataset entries: {len(dataset)}")

100%|██████████| 2721/2721 [00:00<00:00, 253424.11it/s]

Total dataset entries: 837





In [6]:
import time

# Per-table columns handed to the generator as `exclude_cols`.
table_specific_exclusions = {
    "orders": ["user_id", "order_id"],
    "disclosures_13": ["record_id", "family_id", "blanket_scope", "disclosure_event", "pub_cleaned", "wg_name"],
    "outpatient_charges_2014": ["provider_id"],
    "fips": ["GeoName", "GeoFIPS"],
    "mbb_historical_teams_games": ["team_id", "name", "market", "opp_id", "opp_name", "opp_code", "opp_market"],
    "historical_runups": ["id", "tsevent_id", "location_name"],
    "solar_potential_by_postal_code": ["center_point", "install_size_kw_buckets"],
}

outputs = []
generator = gen_class()
for x in tqdm(dataset, total=len(dataset)):
    # Each `dataset` entry holds exactly the identifying keyword arguments gen() takes;
    # forward them unchanged and add the training flag and per-table exclusions.
    train_output = generator.gen(
        is_train=True,
        exclude_cols=table_specific_exclusions.get(x['table_name'], []),
        **{key: x[key] for key in ('dataset_name', 'table_name', 'schema', 'intent', 'model_name')},
    )
    outputs.append(train_output)
    # Inference SQL is not generated here; calling generator.gen with the same arguments
    # but is_train=False would produce it.


  0%|          | 0/837 [00:00<?, ?it/s]

100%|██████████| 837/837 [00:00<00:00, 19299.69it/s]


In [None]:
# Run the CREATE OR REPLACE VIEW statement of every generated training spec.
# This cell only creates views; training is run in a separate cell.
# NOTE(review): if execute_query raises, the loop stops and later views are not created.
for o in tqdm(outputs, total=len(outputs)):
    # create view_table
    execute_query(o['view_table'])

In [None]:

# Create the view and train a model for `train_output` only, i.e. the last spec produced
# by the generation loop, not for every entry in `outputs`.
# create view_table
execute_query(train_output['view_table'])

# Short pause between creating the view and training; the reason is not documented here.
time.sleep(1)

# train model — per the printed query below, pgml.train's relation_name is the view above.
execute_query(train_output['query'])


In [None]:
# Show the pgml.train statement generated for the last spec.
print(train_output['query'])

SELECT pgml.train(
    project_name => 'sunroof_solar_solar_potential_by_postal_code_xgboost',
    task => 'regression',
    relation_name => 'public.sunroof_solar_solar_potential_by_postal_code_xgboost_view',
    algorithm => 'xgboost',
    y_column_name => 'number_of_panels_w',
    preprocess => '{"region_name": {"impute": "mode", "encode": "native"}, "state_name": {"impute": "mode", "encode": "native"}, "lat_max": {"impute": "median", "scale": "standard"}, "lat_min": {"impute": "median", "scale": "standard"}, "lng_max": {"impute": "median", "scale": "standard"}, "lng_min": {"impute": "median", "scale": "standard"}, "lat_avg": {"impute": "median", "scale": "standard"}, "lng_avg": {"impute": "median", "scale": "standard"}, "yearly_sunlight_kwh_kw_threshold_avg": {"impute": "median", "scale": "standard"}, "count_qualified": {"impute": "median", "scale": "standard"}, "percent_covered": {"impute": "median", "scale": "standard"}, "percent_qualified": {"impute": "median", "scale": "standar

In [154]:
# Inspect the project name and training relation of the last generated spec.
# NOTE(review): the saved output shows quote characters inside project_name and "_view"
# appended after the closing quote of relation_name, which is not a valid identifier;
# other saved outputs use a different naming — confirm which generator version produced it.
print(train_output['project_name'])
print(train_output['relation_name'])

"sunroof_solar/solar_potential_by_postal_code/clustering/kmeans"
public."sunroof_solar/solar_potential_by_postal_code/clustering/kmeans"_view


In [145]:
# Show the CREATE OR REPLACE VIEW statement generated for the last spec.
print(train_output['view_table'])

CREATE OR REPLACE VIEW public.sunroof_solar_solar_potential_by_postal_code_xgboost_view AS (
SELECT "region_name", "state_name", "lat_max", "lat_min", "lng_max", "lng_min", "lat_avg", "lng_avg", "yearly_sunlight_kwh_kw_threshold_avg", "count_qualified", "percent_covered", "percent_qualified", "number_of_panels_n", "number_of_panels_s", "number_of_panels_e", "number_of_panels_f", "number_of_panels_median", "number_of_panels_total", "kw_median", "kw_total", "yearly_sunlight_kwh_n", "yearly_sunlight_kwh_s", "yearly_sunlight_kwh_e", "yearly_sunlight_kwh_w", "yearly_sunlight_kwh_f", "yearly_sunlight_kwh_median", "yearly_sunlight_kwh_total", "carbon_offset_metric_tons", "existing_installs_count", "number_of_panels_w" 
FROM public.solar_potential_by_postal_code);


In [146]:
# Show the pgml.train statement for the last spec (same output as the earlier print cell).
print(train_output['query'])

SELECT pgml.train(
    project_name => 'sunroof_solar_solar_potential_by_postal_code_xgboost',
    task => 'regression',
    relation_name => 'public.sunroof_solar_solar_potential_by_postal_code_xgboost_view',
    algorithm => 'xgboost',
    y_column_name => 'number_of_panels_w',
    preprocess => '{"region_name": {"impute": "mode", "encode": "native"}, "state_name": {"impute": "mode", "encode": "native"}, "lat_max": {"impute": "median", "scale": "standard"}, "lat_min": {"impute": "median", "scale": "standard"}, "lng_max": {"impute": "median", "scale": "standard"}, "lng_min": {"impute": "median", "scale": "standard"}, "lat_avg": {"impute": "median", "scale": "standard"}, "lng_avg": {"impute": "median", "scale": "standard"}, "yearly_sunlight_kwh_kw_threshold_avg": {"impute": "median", "scale": "standard"}, "count_qualified": {"impute": "median", "scale": "standard"}, "percent_covered": {"impute": "median", "scale": "standard"}, "percent_qualified": {"impute": "median", "scale": "standar

In [147]:
# The generation cell does not produce `inference_output` (inference generation is disabled
# there), so this cell raised NameError on a fresh kernel. Build it here for the last spec `x`
# from that loop, with the same arguments as its training call but is_train=False.
inference_output = generator.gen(
    dataset_name=x['dataset_name'],
    table_name=x['table_name'],
    schema=x['schema'],
    intent=x['intent'],
    is_train=False,
    model_name=x['model_name'],
    exclude_cols=table_specific_exclusions.get(x['table_name'], []),
)
print(inference_output["query"])

SELECT pgml.predict(
    'sunroof_solar_solar_potential_by_postal_code_xgboost', ("region_name", "state_name", "lat_max", "lat_min", "lng_max", "lng_min", "lat_avg", "lng_avg", "yearly_sunlight_kwh_kw_threshold_avg", "count_qualified", "percent_covered", "percent_qualified", "number_of_panels_n", "number_of_panels_s", "number_of_panels_e", "number_of_panels_f", "number_of_panels_median", "number_of_panels_total", "kw_median", "kw_total", "yearly_sunlight_kwh_n", "yearly_sunlight_kwh_s", "yearly_sunlight_kwh_e", "yearly_sunlight_kwh_w", "yearly_sunlight_kwh_f", "yearly_sunlight_kwh_median", "yearly_sunlight_kwh_total", "carbon_offset_metric_tons", "existing_installs_count")
) AS prediction
FROM public.solar_potential_by_postal_code
WHERE "number_of_panels_n" = 1386
LIMIT 10;


In [9]:
# Fetch every model row (joined with its project row) for the last generated project.
# The project name is sent as a bound parameter (`:project_name`, which execute_query
# supports) instead of being spliced into the SQL, so names containing quotes cannot
# break the statement.
# models.id, models.algorithm, models.metrics
metric_query = """SELECT * 
FROM pgml.models AS m 
JOIN pgml.projects AS p 
 ON p.id = m.project_id
WHERE p.name = :project_name;"""

execute_query(metric_query, params={"project_name": train_output['project_name']}, fetch=True)

NameError: name 'train_output' is not defined

In [10]:
# Delete the rows in pgml.models that belong to projects 15 and 280.
# WARNING: destructive, and the project ids are hard-coded from an earlier session; re-check
# them before re-running (Restart & Run All would execute this delete again).
# NOTE(review): the f-prefix below has no placeholders. The commented-out alternative would
# splice the project name into SQL unescaped; prefer a bound :param via execute_query.
# drop records in the table in pgml.models
drop_query = f"""
DELETE FROM pgml.models
WHERE project_id IN (15, 280)
"""
# WHERE project_id = (SELECT id FROM pgml.projects WHERE name = '{train_output['project_name']}');

execute_query(drop_query, fetch=False)

2

In [12]:
# Delete projects 14, 15 and 280 from pgml.projects — NOT all projects, despite the
# original comment on the next line.
# WARNING: destructive, and the ids are hard-coded from an earlier session.
# NOTE(review): the previous cell removed models only for projects 15 and 280; if
# pgml.models references project 14, this delete may be blocked or cascade — confirm.
# NOTE(review): pgml.projects is keyed by `id` (see the `p.id` join in the metrics cell), so
# the commented-out alternative should filter on `id`, not `project_id`.
# drop all projects in the table in pgml.projects
drop_query = f"""
DELETE FROM pgml.projects
WHERE id IN (14, 15, 280)
"""
# WHERE project_id = (SELECT id FROM pgml.projects WHERE name = '{train_output['project_name']}');

execute_query(drop_query, fetch=False)

3

In [76]:
# Echo the metrics query text built in the metrics cell.
# NOTE(review): the saved output predates the current query (it filters on
# projects.project_name); re-run to refresh.
metric_query

"\nSELECT *\nFROM pgml.models\nJOIN pgml.projects\n ON projects.id = models.project_id\nWHERE projects.project_name = 'sunroof_solar_solar_potential_by_postal_code_kmeans';\n"

In [None]:
# Smoke test of the generic `Template` SQL generator (defined elsewhere) on a hand-written
# intent. NOTE: this cell rebinds the notebook globals `intent` and `model_name`.
# NOTE(review): "time_series" is the string "True", not a bool — confirm what Template expects.
intent = {
    "dataset_name": "sample_dataset",
    "table_id": "sales_data",
    "output": {
        "task": "regression",
        "time_series": "True",
        "target_column": "<col>revenue</col>",
        "inference_condition": [
            "<col>region</col><op>=</op><val>west</val>"
        ],
        "update_condition": [
            "<col>revenue</col><op>></op><val>1000</val>"
        ]
    },
    "data_dictionary": {
        "revenue": {"type": "FLOAT"},
        "date": {"type": "DATE"},
        "region": {"type": "STRING"},
        "lat_long": {"type": "GEOGRAPHY"}
    }
}

# Parameters for generation
platform_type = "PostgreML"  # NOTE(review): alternative platform names are not visible here
todo_type = "train"         # or "predict"
model_name = "my_arima_model"

# Run test. Exceptions are printed rather than raised, so a failure does not stop Run All
# but is easy to miss in the output.
template = Template()
try:
    sql_output = template.gen(platform_type, intent, todo_type, model_name)
    print("=== Generated SQL ===")
    print(sql_output)
except Exception as e:
    print(f"Error: {e}")

=== Generated SQL ===
SELECT pgml.train(
    project_name => 'sample_dataset_sales_data_regression_ce2f02bf7984',
    task => 'regression',
    relation_name => 'public."sales_data"',
    y_column_name => 'revenue',
    algorithm => 'xgboost'
);

