# Extraction Pipeline

## Setup

In [1]:
# =========================
# Colab setup (one-time)
# =========================
# Installs the BigQuery and Cloud Storage clients plus pyarrow and db-dtypes into the runtime.
# NOTE(review): no versions are pinned here, so a fresh runtime may resolve different releases.
!pip -q install google-cloud-bigquery google-cloud-storage pyarrow db-dtypes

In [3]:
import os, re
from typing import Dict
from datetime import datetime
import pyarrow.parquet as pq

from google.cloud import bigquery, storage

# --- Auth: pick one ---
# 1) Service account JSON you uploaded to /content/gcp-key.json
SERVICE_ACCOUNT_KEY = "/content/gcp-key.json"
if os.path.exists(SERVICE_ACCOUNT_KEY):
    # Point the Google client libraries at the uploaded key file.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SERVICE_ACCOUNT_KEY
else:
    # 2) Interactive Colab login (if you don't use a key file)
    try:
        from google.colab import auth
        auth.authenticate_user()
        print("✔ Authenticated with Colab user account")
    except Exception as e:
        # Best-effort: keep the notebook running, but show *why* auth failed
        # (e.g. not running inside Colab vs. a rejected login).
        print("⚠ Could not do Colab auth automatically. If using a key file, upload it as /content/gcp-key.json.")
        print(f"   Reason: {type(e).__name__}: {e}")

In [8]:
# =========================
# CONFIG — EDIT THESE
# =========================
PROJECT_ID         = "digimusketeers-project"
BQ_DATASET         = "facebook_ads"               # source dataset
# Queried as the wildcard table `<prefix>*`; only the FBADS_AGE_GENDER_* shards are read.
BQ_TABLE_PREFIX    = "FBADS_AGE_GENDER_"
# Inclusive bounds compared as strings against _TABLE_SUFFIX (YYYYMMDD).
DATE_START         = "20210101"
DATE_END           = "20250630"
BQ_LOCATION        = "asia-southeast1"            # must match dataset location

# Temp dataset to materialize per-company tables (auto-created if missing)
TEMP_DATASET       = "facebook_ads_tmp"

GCS_BUCKET         = "digi-datasets-ml"
GCS_BASE_PREFIX    = "facebook-age-temp"          # folder for shards, per-company merges and the combined file
GROUP_BY_TYPE      = False                        # put outputs under business type folders?
PROFILE_COLUMN     = "profile"                    # column that company names are prefix-matched against
DROP_TEMP_TABLES   = True                         # set False to inspect temp tables
DO_ROWCOUNT        = False                        # quick count before export (optional)


In [12]:
# =========================
# Companies (your mapping)
# =========================
# Keys are lowercase company names; the pipeline prefix-matches each key against
# LOWER(profile) (optionally preceded by '|'). Values are business-type labels, used
# as an output sub-folder only when GROUP_BY_TYPE is enabled.
# NOTE(review): because matching is prefix-based, a key that is a prefix of another key
# (e.g. 'jsp property' / 'jsp property x sena', 'artralux' / 'artralux (social media project)')
# selects overlapping rows, so the same rows can land in more than one per-company export.
# NOTE(review): spellings such as 'sena developmant' presumably mirror the source data — confirm
# before "fixing" them.
business_type_mapping: Dict[str, str] = {
    # Shopping Malls & Retail Chains
    # 'central': 'retail_brand',
    # 'bebeplay': 'retail_brand',
    'long lasting': 'retail_brand',
    'nitori': 'retail_brand',
    'zhulian': 'retail_brand',
    'ielleair': 'retail_brand',
    'vetz petz': 'retail_brand',
    'kanekoji': 'retail_brand',
    'kamedis': 'retail_brand',

    # Real Estate & Property Development
    'sena development': 'real_estate',
    'sena developmant': 'real_estate',
    'onerealestate': 'real_estate',
    'jsp property': 'real_estate',
    'jsp property x sena': 'real_estate',
    'nusasiri': 'real_estate',
    'premium place': 'real_estate',
    'urban': 'real_estate',
    'pieamsuk': 'real_estate',
    'asakan': 'real_estate',
    'cpn': 'real_estate',
    'wt land development (2024)': 'real_estate',
    'property client (ex.the fine)': 'real_estate',
    'the fine': 'real_estate',
    'colour development': 'real_estate',
    'banmae villa': 'real_estate',
    'varintorn l as vibhavadi l brand': 'real_estate',
    'inspired': 'real_estate',
    'goldenduck': 'real_estate',
    'chewa x c - pk': 'real_estate',

    # Fashion & Lifestyle
    'samsonite': 'fashion_lifestyle',
    'do day dream': 'fashion_lifestyle',
    'ido day dream': 'fashion_lifestyle',
    'ido day dream i valeraswiss thailand i brand [2025]': 'fashion_lifestyle',
    'fila': 'fashion_lifestyle',
    'playboy': 'fashion_lifestyle',
    'what a girl want': 'fashion_lifestyle',
    'rich sport': 'fashion_lifestyle',
    'heydude': 'fashion_lifestyle',

    # Beauty & Cosmetics
    'bb care': 'beauty_cosmetics',
    'reuse': 'beauty_cosmetics',
    'dedvelvet': 'beauty_cosmetics',
    'riobeauty': 'beauty_cosmetics',
    'kameko': 'beauty_cosmetics',
    'befita': 'beauty_cosmetics',
    'vitablend': 'beauty_cosmetics',

    # Healthcare & Medical
    'abl clinic': 'healthcare_medical',
    'luxury clinic': 'healthcare_medical',
    'mane clinic': 'healthcare_medical',
    'dentalme clinic': 'healthcare_medical',
    'mild clinic': 'healthcare_medical',
    'aestheta wellness': 'healthcare_medical',
    'luxury club skin': 'healthcare_medical',

    # Technology & Electronics
    'kangyonglaundry': 'technology_electronics',
    'bosch': 'technology_electronics',
    'amazfit': 'technology_electronics',
    'panduit': 'technology_electronics',
    'mitsubishi electric x digimusketeers': 'technology_electronics',
    'asiasoft digital marketing (center)': 'technology_electronics',
    'at home thailand': 'technology_electronics',
    'sinthanee group': 'technology_electronics',
    'noventiq th': 'technology_electronics',
    'blaupunk l blaupunk l brand': 'technology_electronics',
    'yip in tsoi': 'technology_electronics',

    # Digital Marketing & Agencies
    'digimusketeers': 'digital_marketing',
    'set x digimusketeers': 'digital_marketing',
    'we are innosense co., ltd. v.2': 'digital_marketing',

    # Software Development
    'dspace': 'software_development',
    'midas': 'software_development',
    'launch platform': 'software_development',

    # Financial Services
    'cimb': 'financial_services',
    'tisco ppk': 'financial_services',
    'tisco - insure': 'financial_services',
    'gsb society': 'financial_services',
    'aslan investor': 'financial_services',
    'aeon': 'financial_services',
    'proprakan': 'financial_services',

    # Entertainment & Media
    'donut bangkok': 'entertainment_media',
    'i have ticket': 'entertainment_media',
    'ondemand l ondemand l brand': 'entertainment_media',

    # Food & Beverage
    'ramendesu': 'food_beverage',
    'nomimashou': 'food_beverage',
    'oakberry': 'food_beverage',

    # Transportation & Logistics
    'paypoint': 'transportation_logistics',
    'asia cab': 'transportation_logistics',
    'uac': 'transportation_logistics',
    'artralux': 'transportation_logistics',
    'artralux (social media project)': 'transportation_logistics',
    'siamwatercraft': 'transportation_logistics',

    # Pharmaceuticals & Health Products
    'inpac pharma': 'pharmaceuticals',

    # Non-Profit & Organizations
    'unhcr': 'non_profit',

    # Construction & Manufacturing
    'arun plus ptt': 'industrial_manufacturing',
    'scg': 'industrial_manufacturing',

    # Others/Uncategorized
    'free 657,00 thb': 'other',
}


## Helper

In [13]:
# =========================
# Helpers
# =========================
def slugify_for_gcs(name: str) -> str:
    """Turn a company name into a lowercase, hyphen-separated GCS path fragment.

    Runs of pipes, slashes, backslashes and whitespace become a single hyphen;
    every other character outside ``[a-z0-9-_]`` is dropped. Returns
    ``"unknown"`` when nothing usable is left.
    """
    normalized = name.strip().lower()
    # Separator characters collapse into one hyphen per run.
    hyphenated = re.sub(r"[|/\\\s]+", "-", normalized)
    # Punctuation and non-ASCII characters are removed, not replaced.
    cleaned = re.sub(r"[^a-z0-9\-_]+", "", hyphenated)
    # Removing characters can leave adjacent hyphens; squeeze and trim them.
    slug = re.sub(r"-{2,}", "-", cleaned).strip("-")
    if slug:
        return slug
    return "unknown"

def slugify_for_bq_table(name: str) -> str:
    """Turn a company name into an identifier usable inside a BigQuery table name.

    Every character outside ``[a-zA-Z0-9_]`` becomes an underscore, underscore
    runs are squeezed, and the edges are trimmed. An empty result, or one that
    starts with a digit, gets a ``c_`` prefix. The result is capped at 250
    characters.
    """
    lowered = name.strip().lower()
    underscored = re.sub(r"[^a-zA-Z0-9_]", "_", lowered)
    ident = re.sub(r"_+", "_", underscored).strip("_")
    needs_prefix = ident == "" or ident[0].isdigit()
    if needs_prefix:
        ident = f"c_{ident}"
    return ident[:250]  # stay well under limits

def gcs_prefix_for(company: str, business_type: str) -> str:
    """Build the GCS object prefix (no bucket, no wildcard) for one company's export.

    Layout: ``<GCS_BASE_PREFIX>/[<business_type>/]<slug>_profile_<DATE_START>_<DATE_END>``.
    The business-type folder is included only when ``GROUP_BY_TYPE`` is truthy.
    """
    parts = [f"{GCS_BASE_PREFIX}/"]
    if GROUP_BY_TYPE:
        parts.append(f"{business_type}/")
    parts.append(f"{slugify_for_gcs(company)}_profile_{DATE_START}_{DATE_END}")
    return "".join(parts)

def ensure_dataset(client: bigquery.Client, dataset_id: str):
    """Create ``PROJECT_ID.dataset_id`` in ``BQ_LOCATION`` if it does not exist yet.

    Only a "not found" response triggers creation. Any other failure from the
    lookup (permissions, auth, network) propagates unchanged instead of being
    mistaken for a missing dataset.

    Args:
        client: BigQuery client used for the lookup and the creation.
        dataset_id: Dataset name without the project prefix.
    """
    # Local import keeps this helper self-contained; the module ships with the
    # Google Cloud client libraries this notebook already uses.
    from google.cloud.exceptions import NotFound

    ds_ref = bigquery.Dataset(f"{PROJECT_ID}.{dataset_id}")
    try:
        client.get_dataset(ds_ref)
    except NotFound:
        # The dataset must live in the same location as the source tables.
        ds_ref.location = BQ_LOCATION
        client.create_dataset(ds_ref)
        print(f"✔ Created dataset {PROJECT_ID}.{dataset_id} in {BQ_LOCATION}")

def count_rows_for_company(bq: bigquery.Client, company: str) -> int:
    """Count source rows whose profile starts with ``company`` within the date window.

    The company name is stripped and lowercased, then matched as a prefix of
    ``LOWER(PROFILE_COLUMN)``, with or without a leading ``|``. Returns 0 when
    the query yields no rows.
    """
    needle = company.strip().lower()
    params = [
        bigquery.ScalarQueryParameter("start", "STRING", DATE_START),
        bigquery.ScalarQueryParameter("end", "STRING", DATE_END),
        bigquery.ScalarQueryParameter("c", "STRING", needle),
    ]
    sql = f"""
    SELECT COUNT(*) AS c
    FROM `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE_PREFIX}*`
    WHERE _TABLE_SUFFIX BETWEEN @start AND @end
      AND (LOWER({PROFILE_COLUMN}) LIKE CONCAT(@c, '%')
           OR LOWER({PROFILE_COLUMN}) LIKE CONCAT('|', @c, '%'))
    """
    job = bq.query(
        sql,
        job_config=bigquery.QueryJobConfig(query_parameters=params),
        location=BQ_LOCATION,
    )
    # COUNT(*) yields a single row; the loop only guards against an empty result.
    for row in job.result():
        return int(row["c"])
    return 0

def materialize_then_export(
    bq: bigquery.Client,
    company: str,
    business_type: str
) -> str:
    """
    Materialize one company's rows into a temp table and export them to GCS as Parquet.

    1) CREATE OR REPLACE TABLE tmp AS SELECT ... FROM wildcard
    2) EXPORT DATA AS SELECT * FROM tmp
    3) DROP TABLE tmp (optional, controlled by DROP_TEMP_TABLES)

    The drop runs in a ``finally`` block, so a failed export does not leave the
    temp table behind when DROP_TEMP_TABLES is enabled. A failed drop is reported
    but not raised; the table also carries a 24h expiration.

    Args:
        bq: BigQuery client used for all three statements.
        company: Company name; stripped, lowercased and prefix-matched against
            PROFILE_COLUMN (with or without a leading '|').
        business_type: Label forwarded to ``gcs_prefix_for`` for the output path.

    Returns: gs prefix used (without wildcard).

    Raises: whatever the materialize or export query raises.
    """
    ensure_dataset(bq, TEMP_DATASET)

    table_slug = slugify_for_bq_table(company)
    temp_table = f"{PROJECT_ID}.{TEMP_DATASET}.tmp_{table_slug}_{DATE_START}_{DATE_END}"

    # 1) Materialize from wildcard into a real table
    create_sql = f"""
    CREATE OR REPLACE TABLE `{temp_table}`
    OPTIONS(expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)) AS
    SELECT *
    FROM `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE_PREFIX}*`
    WHERE _TABLE_SUFFIX BETWEEN @start AND @end
      AND (LOWER({PROFILE_COLUMN}) LIKE CONCAT(@c, '%')
           OR LOWER({PROFILE_COLUMN}) LIKE CONCAT('|', @c, '%'))
    """
    job_cfg = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("start", "STRING", DATE_START),
            bigquery.ScalarQueryParameter("end", "STRING", DATE_END),
            bigquery.ScalarQueryParameter("c", "STRING", company.strip().lower()),
        ]
    )

    # 2) EXPORT from the materialized table (no wildcard here)
    prefix = gcs_prefix_for(company, business_type)
    uri = f"gs://{GCS_BUCKET}/{prefix}*.parquet"

    export_sql = f"""
    EXPORT DATA OPTIONS (
        uri = @uri,
        format = 'PARQUET',
        overwrite = TRUE
    ) AS
    SELECT * FROM `{temp_table}`
    """
    export_cfg = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("uri", "STRING", uri)]
    )

    materialized = False
    try:
        bq.query(create_sql, job_config=job_cfg, location=BQ_LOCATION).result()
        materialized = True
        print(f"✔ Materialized: {temp_table}")

        bq.query(export_sql, job_config=export_cfg, location=BQ_LOCATION).result()
        print(f"✔ Exported to {uri}")
    finally:
        # 3) Drop temp table if desired (we also set a 24h expiration for safety).
        # Only attempted once the table was actually created by this call.
        if DROP_TEMP_TABLES and materialized:
            try:
                drop_sql = f"DROP TABLE IF EXISTS `{temp_table}`"
                bq.query(drop_sql, location=BQ_LOCATION).result()
                print(f"🧹 Dropped: {temp_table}")
            except Exception as drop_err:
                # Cleanup is best-effort: do not mask an export error or fail a
                # successful export because the drop failed.
                print(f"⚠ Could not drop {temp_table}: {drop_err}")

    return prefix

def merge_parquet_to_single_file_memory_safe(
    storage_client: storage.Client,
    bucket_name: str,
    source_prefix: str,
    destination_blob_name: str,
    delete_shards_after_merge: bool = True,
):
    """
    Merge Parquet shards under `gs://bucket/source_prefix*.parquet` into
    one file at `gs://bucket/destination_blob_name`.

    Shards are read one at a time (each shard is loaded fully, then appended to
    a local Parquet file), so peak memory is bounded by the largest shard rather
    than the total. The first shard's schema is the target schema; later shards
    are cast to it.

    Args:
        storage_client: GCS client used for listing, reading, uploading, deleting.
        bucket_name: Bucket holding both the shards and the merged output.
        source_prefix: Object-name prefix shared by the shards.
        destination_blob_name: Object name for the merged file.
        delete_shards_after_merge: If True, deletes the source shards after a
            successful upload. Individual delete failures are reported, not raised.

    Returns: None. Prints a warning and returns early when no shards are found.
    """
    # Local import: only this helper needs a scratch file.
    import tempfile

    bucket = storage_client.bucket(bucket_name)

    # Collect only shard files; exclude the destination and any previous merged file
    # so an old merge result is never folded back into the new one.
    parquet_blobs = [
        b
        for b in bucket.list_blobs(prefix=source_prefix)
        if b.name.endswith(".parquet")
        and b.name != destination_blob_name
        and not b.name.endswith("_merged.parquet")
    ]

    if not parquet_blobs:
        print(f"⚠ No shard parquet files found for gs://{bucket_name}/{source_prefix}")
        return

    print(f"⏳ Merging {len(parquet_blobs)} shard(s) for prefix '{source_prefix}' ...")

    # A unique scratch file in the system temp dir: works outside Colab and cannot
    # collide with a leftover from a previous or concurrent run.
    fd, tmp_local = tempfile.mkstemp(prefix="_merged_tmp_", suffix=".parquet")
    os.close(fd)

    schema = None
    writer = None
    try:
        # Stream shards into a single Parquet
        for i, b in enumerate(parquet_blobs, start=1):
            with b.open("rb") as f:
                table = pq.read_table(f)
            if writer is None:
                schema = table.schema
                writer = pq.ParquetWriter(tmp_local, schema)
            else:
                # Reconciles column types with the first shard's schema.
                # NOTE(review): presumably fails if a shard has different column
                # names/order — confirm shards of one export share a schema.
                table = table.cast(schema, safe=False)
            writer.write_table(table)
            if i % 10 == 0 or i == len(parquet_blobs):
                print(f"  • processed {i}/{len(parquet_blobs)}")

        # Close before upload so the Parquet footer is written; clearing the
        # reference stops the finally block from closing it a second time.
        if writer is not None:
            writer.close()
            writer = None

        # Upload merged file
        dest_blob = bucket.blob(destination_blob_name)
        with open(tmp_local, "rb") as f:
            dest_blob.upload_from_file(f, rewind=True)

        print(f"✔ Saved merged file: gs://{bucket_name}/{destination_blob_name}")

        # Delete shards after successful upload
        if delete_shards_after_merge:
            deleted = 0
            for b in parquet_blobs:
                try:
                    b.delete()
                    deleted += 1
                except Exception as de:
                    print(f"   ⚠ Could not delete {b.name}: {de}")
            print(f"🧹 Deleted {deleted}/{len(parquet_blobs)} shard(s).")

    finally:
        # Reached with an open writer only when something above failed.
        if writer is not None:
            try:
                writer.close()
            except Exception as close_err:
                print(f"   ⚠ Could not close Parquet writer cleanly: {close_err}")
        if os.path.exists(tmp_local):
            os.remove(tmp_local)


## Run the pipeline

In [14]:
# =========================
# Run the pipeline
# =========================
# For every company: (optional row count) → materialize + export shards → merge
# shards into one Parquet. A failure for one company is logged and the loop
# continues; all failures are summarized at the end so they are not lost in the log.
bqclient = bigquery.Client(project=PROJECT_ID)
gcs_client = storage.Client(project=PROJECT_ID)

start_time = datetime.now()
print(f"▶ Run started at {start_time}")

failed_companies = []  # companies whose export/merge raised

for company, biz_type in business_type_mapping.items():
    try:
        if DO_ROWCOUNT:
            cnt = count_rows_for_company(bqclient, company)
            print(f"{company!r} rows = {cnt}")
            if cnt == 0:
                print("↳ Skip (no rows)\n")
                continue

        # 1) Materialize + Export
        source_prefix = materialize_then_export(bqclient, company, biz_type)

        # 2) Merge shards → single parquet
        merged_blob = f"{source_prefix}_merged.parquet"
        merge_parquet_to_single_file_memory_safe(
            gcs_client,
            GCS_BUCKET,
            source_prefix,
            merged_blob,
            delete_shards_after_merge=True,   # ← deletes the shards after merge
        )
        print()
    except Exception as e:
        # Deliberate best-effort: one bad company must not stop the batch.
        failed_companies.append(company)
        print(f"❌ Error for {company!r}: {e}\n")

end_time = datetime.now()
print(f"⏹ Finished at {end_time} (duration {end_time - start_time})")
if failed_companies:
    print(f"⚠ {len(failed_companies)} company export(s) failed: {failed_companies}")


▶ Run started at 2025-09-03 02:53:46.299777
✔ Materialized: digimusketeers-project.facebook_ads_tmp.tmp_long_lasting_20210101_20250630
✔ Exported to gs://digi-datasets-ml/facebook-age-temp/long-lasting_profile_20210101_20250630*.parquet
🧹 Dropped: digimusketeers-project.facebook_ads_tmp.tmp_long_lasting_20210101_20250630
⏳ Merging 1 shard(s) for prefix 'facebook-age-temp/long-lasting_profile_20210101_20250630' ...
  • processed 1/1
✔ Saved merged file: gs://digi-datasets-ml/facebook-age-temp/long-lasting_profile_20210101_20250630_merged.parquet
🧹 Deleted 1/1 shard(s).

✔ Materialized: digimusketeers-project.facebook_ads_tmp.tmp_nitori_20210101_20250630
✔ Exported to gs://digi-datasets-ml/facebook-age-temp/nitori_profile_20210101_20250630*.parquet
🧹 Dropped: digimusketeers-project.facebook_ads_tmp.tmp_nitori_20210101_20250630
⏳ Merging 1 shard(s) for prefix 'facebook-age-temp/nitori_profile_20210101_20250630' ...
  • processed 1/1
✔ Saved merged file: gs://digi-datasets-ml/facebook-age-

# Many Parquet to Single File

## Helper

In [15]:
import os
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from datetime import datetime

def merge_many_parquet_files_to_single_gcs_safe(
    storage_client: storage.Client,
    bucket_name: str,
    base_prefix: str,          # e.g. 'facebook-ads'
    per_company_suffix: str,   # e.g. f"_profile_{DATE_START}_{DATE_END}_merged.parquet"
    output_blob_name: str,     # e.g. 'facebook-ads/facebook_ads__20210101_20250630.parquet'
    overwrite: bool = False,   # protect against accidental overwrite
):
    """
    READ-ONLY merge: finds all per-company merged Parquet files under gs://bucket/<base_prefix>/
    that end with `per_company_suffix`, unifies schemas, and writes a single Parquet to `output_blob_name`.

    Inputs are processed one row group at a time and appended to a local scratch
    file, which is uploaded at the end. Columns missing from an input are filled
    with nulls; columns whose type differs from the unified schema are cast.

    SAFETY:
      - Never deletes or mutates input files.
      - If overwrite=False and the output already exists, raises an error instead of overwriting.

    Args:
        storage_client: GCS client used for listing, reading and uploading.
        bucket_name: Bucket holding the inputs and the output.
        base_prefix: Folder to scan (a trailing '/' is added if missing).
        per_company_suffix: Object-name suffix that selects the input files.
        output_blob_name: Object name of the combined file.
        overwrite: Allow replacing an existing output object.

    Raises:
        RuntimeError: if the output exists and ``overwrite`` is False, or if no
            input matches the suffix.
    """
    # Local import: only this helper needs a scratch file.
    import tempfile

    bucket = storage_client.bucket(bucket_name)

    # Safety: don't overwrite combined file by default
    dest_blob = bucket.blob(output_blob_name)
    if dest_blob.exists() and not overwrite:
        raise RuntimeError(
            f"Output exists and overwrite=False: gs://{bucket_name}/{output_blob_name}"
        )

    # Find only the date-window files we want
    scan_prefix = base_prefix.rstrip("/") + "/"
    inputs = [
        b
        for b in bucket.list_blobs(prefix=scan_prefix)
        if b.name.endswith(per_company_suffix) and b.name != output_blob_name
    ]

    # Keep deterministic order (optional)
    inputs.sort(key=lambda b: b.name)

    if not inputs:
        raise RuntimeError(
            f"No input files matched suffix '{per_company_suffix}' under gs://{bucket_name}/{scan_prefix}"
        )

    print(f"Found {len(inputs)} per-company files to merge (read-only).")

    # Build union schema across inputs (first pass reads footers only)
    schemas = []
    rowcounts = []
    for b in inputs:
        with b.open("rb") as f:
            pf = pq.ParquetFile(f)
            schemas.append(pf.schema_arrow)
            # Total row count comes straight from the file metadata (no data read).
            rowcounts.append(pf.metadata.num_rows)

    union_schema = pa.unify_schemas(schemas)
    print(f"Union schema has {len(union_schema)} columns.")
    print(f"Approx. total rows across inputs: {sum(rowcounts):,}")

    # A unique scratch file in the system temp dir: works outside Colab and cannot
    # collide with a leftover from a previous or concurrent run.
    fd, tmp_local = tempfile.mkstemp(prefix="_ALL_FACEBOOK_ADS_", suffix=".parquet")
    os.close(fd)

    writer = None
    written_rows = 0
    try:
        # Open the writer up front so the output is a valid (possibly empty)
        # Parquet file even when no input contains a row group.
        writer = pq.ParquetWriter(tmp_local, union_schema)

        for idx, b in enumerate(inputs, start=1):
            with b.open("rb") as f:
                pf = pq.ParquetFile(f)

                for rg in range(pf.num_row_groups):
                    tbl = pf.read_row_group(rg)
                    present = set(tbl.column_names)

                    # Align to union schema: same column order, missing columns as nulls
                    arrays = []
                    for field in union_schema:
                        if field.name in present:
                            col = tbl[field.name]
                            if not col.type.equals(field.type):
                                col = col.cast(field.type, safe=False)
                            arrays.append(col)
                        else:
                            arrays.append(pa.nulls(tbl.num_rows, type=field.type))

                    out_tbl = pa.Table.from_arrays(arrays, schema=union_schema)
                    writer.write_table(out_tbl)
                    written_rows += out_tbl.num_rows

            if idx % 5 == 0 or idx == len(inputs):
                print(f"  • processed {idx}/{len(inputs)} files")

        # Close before upload so the Parquet footer is written; clearing the
        # reference stops the finally block from closing it a second time.
        writer.close()
        writer = None

        # Upload the combined file
        with open(tmp_local, "rb") as f:
            dest_blob.upload_from_file(f, rewind=True)

        print(f"✔ Wrote {written_rows:,} rows to gs://{bucket_name}/{output_blob_name}")
        print("🔒 Inputs were not deleted or modified.")
    finally:
        # Reached with an open writer only when something above failed.
        if writer is not None:
            try:
                writer.close()
            except Exception as close_err:
                print(f"   ⚠ Could not close Parquet writer cleanly: {close_err}")
        if os.path.exists(tmp_local):
            os.remove(tmp_local)



## Run merge data

In [16]:

# ---- Run it (READ-ONLY merge) ----
# Combines every per-company "*_merged.parquet" under GCS_BASE_PREFIX into one file.
# Relies on names from earlier cells: bigquery, PROJECT_ID, GCS_BUCKET, GCS_BASE_PREFIX,
# DATE_START, DATE_END.

# NOTE(review): bqclient is not used by anything in this cell.
bqclient = bigquery.Client(project=PROJECT_ID)
gcs_client = storage.Client(project=PROJECT_ID)

start_time = datetime.now()
print(f"▶ Run started at {start_time}")

# Combined output object, and the suffix that selects the per-company inputs.
ALL_OUT_BLOB = f"{GCS_BASE_PREFIX}/facebook_age__{DATE_START}_{DATE_END}.parquet"
PER_COMPANY_SUFFIX = f"_profile_{DATE_START}_{DATE_END}_merged.parquet"

merge_many_parquet_files_to_single_gcs_safe(
    storage_client=gcs_client,
    bucket_name=GCS_BUCKET,
    base_prefix=GCS_BASE_PREFIX,
    per_company_suffix=PER_COMPANY_SUFFIX,
    output_blob_name=ALL_OUT_BLOB,
    overwrite=False,   # keep this False to avoid overwriting an existing combined file
)

end_time = datetime.now()
print(f"⏹ Finished at {end_time} (duration {end_time - start_time})")


▶ Run started at 2025-09-03 05:29:39.024747
Found 93 per-company files to merge (read-only).
Union schema has 130 columns.
Approx. total rows across inputs: 16,840,103
  • processed 5/93 files
  • processed 10/93 files
  • processed 15/93 files
  • processed 20/93 files
  • processed 25/93 files
  • processed 30/93 files
  • processed 35/93 files
  • processed 40/93 files
  • processed 45/93 files
  • processed 50/93 files
  • processed 55/93 files
  • processed 60/93 files
  • processed 65/93 files
  • processed 70/93 files
  • processed 75/93 files
  • processed 80/93 files
  • processed 85/93 files
  • processed 90/93 files
  • processed 93/93 files
✔ Wrote 16,840,103 rows to gs://digi-datasets-ml/facebook-age-temp/facebook_age__20210101_20250630.parquet
🔒 Inputs were not deleted or modified.
⏹ Finished at 2025-09-03 05:35:22.803332 (duration 0:05:43.778585)


# Download File to Colab

In [2]:
# Authenticate the gcloud/gsutil CLI with the uploaded service-account key.
!gcloud auth activate-service-account --key-file=/content/gcp-key.json
# NOTE(review): this object (facebook-ads/facebook_ads__...) is not the combined file the
# merge cell writes (<GCS_BASE_PREFIX>/facebook_age__...); confirm which file the ML steps
# are meant to consume.
REMOTE="gs://digi-datasets-ml/facebook-ads/facebook_ads__20210101_20250630.parquet"
!gsutil cp "$REMOTE" /content/
!ls -lh /content/facebook_ads__20210101_20250630.parquet


Activated service account credentials for: [docker-bigquery-runner@digimusketeers-project.iam.gserviceaccount.com]
Copying gs://digi-datasets-ml/facebook-ads/facebook_ads__20210101_20250630.parquet...
==> NOTE: You are downloading one or more large file(s), which would
run significantly faster if you enabled sliced object downloads. This
feature is enabled by default but requires that compiled crcmod be
installed (see "gsutil help crcmod").

\ [1 files][738.6 MiB/738.6 MiB]                                                
Operation completed over 1 objects/738.6 MiB.                                    
-rw-r--r-- 1 root root 739M Sep  2 05:58 /content/facebook_ads__20210101_20250630.parquet


In [97]:
# !pip -q install -U "polars[gpu]" "pyarrow<18" || pip -q install -U polars "pyarrow<18"


# ML Pipeline

## DATA LOADING & PREPARATION

In [35]:
# Runs the external prep script (its source is not part of this notebook) on the downloaded
# Parquet and writes a campaign-level Parquet; the GPU engine is requested.
!python /content/fbads_prep_polars.py \
  --input "/content/facebook_ads__20210101_20250630.parquet" \
  --output "/content/campaign_level__final.parquet" \
  --engine gpu

Polars: 1.25.2
Input : /content/facebook_ads__20210101_20250630.parquet
Output: /content/campaign_level__final.parquet
Prefer GPU: True
Dedup → before 18,452,736 | after 13,332,311 | removed 5,120,425

✅ Done in 126.0s | rows: 22,894 | cols: 10
Dropped columns: ['campaign_id', 'campaign_start_str', 'campaign_end_str']
Saved → /content/campaign_level__final.parquet

Preview:
 shape: (10, 10)
┌─────┬─────┬─────┬─────┬───┬─────┬─────┬─────┬─────┐
│ cam ┆ num ┆ cos ┆ imp ┆ … ┆ act ┆ con ┆ bus ┆ bty │
│ pai ┆ _ad ┆ t   ┆ res ┆   ┆ ion ┆ ver ┆ ine ┆ pe_ │
│ gn_ ┆ s   ┆ --- ┆ sio ┆   ┆ s   ┆ sio ┆ ss_ ┆ con │
│ dur ┆ --- ┆ f64 ┆ ns  ┆   ┆ --- ┆ n_v ┆ typ ┆ fid │
│ ati ┆ i64 ┆     ┆ --- ┆   ┆ i64 ┆ alu ┆ e   ┆ enc │
│ on  ┆     ┆     ┆ i64 ┆   ┆     ┆ e   ┆ --- ┆ e   │
│ --- ┆     ┆     ┆     ┆   ┆     ┆ --- ┆ str ┆ --- │
│ i64 ┆     ┆     ┆     ┆   ┆     ┆ f64 ┆     ┆ f64 │
╞═════╪═════╪═════╪═════╪═══╪═════╪═════╪═════╪═════╡
│ 41  ┆ 3   ┆ 499 ┆ 122 ┆ … ┆ 152 ┆ 0.0 ┆ unk ┆ 1.0 │
│     ┆     

## Distribution plots + Outlier handling

In [98]:
# GPU read (Polars Lazy -> collect engine=gpu). Sklearn IF still runs on CPU.
# Runs the external outlier script (its source is not part of this notebook). It takes the
# campaign-level file and two output paths (--out-gw, --out-final); the --out-final file is
# the one the modelling cells load.
!python /content/outliers_and_plots.py \
  --input "/content/campaign_level__final.parquet" \
  --out-gw "/content/campaign_level__final__no_outliers__groupwise.parquet" \
  --out-final "/content/campaign_level__final__no_outliers__gw_then_global.parquet" \
  --plot-dir "/content/plots_isoforest" \
  --mask-prefix "/content/isoforest_mask" \
  --engine gpu \
  --min-group 150 \
  --contamination auto \
  --plot-sample 200000 \
  --bins 60 \
  --seed 42


Polars: 1.25.2
Config: {'input': '/content/campaign_level__final.parquet', 'out_gw': '/content/campaign_level__final__no_outliers__groupwise.parquet', 'out_final': '/content/campaign_level__final__no_outliers__gw_then_global.parquet', 'plot_dir': '/content/plots_isoforest', 'mask_prefix': '/content/isoforest_mask', 'engine': 'gpu', 'min_group': 150, 'contamination': 'auto', 'plot_sample': 200000, 'bins': 60, 'seed': 42, 'num_cols': ['campaign_duration', 'num_ads', 'cost', 'impressions', 'reach', 'clicks', 'actions', 'conversion_value', 'btype_confidence'], 'log_cols': ['cost', 'impressions', 'reach', 'clicks', 'actions', 'conversion_value']}
Loaded: (22894, 11) ['row_index', 'campaign_duration', 'num_ads', 'cost', 'impressions', 'reach', 'clicks', 'actions', 'conversion_value', 'business_type', 'btype_confidence'] …

Group-wise summary:
                business_type   size  outliers        pct note
14                   unknown  11429       961   8.408435     
3          fashion_lifesty

## FEATURE ENGINEERING + TARGET TRANSFORMS

In [None]:
# xgboost is pinned; NOTE(review): joblib is not, so its version depends on the runtime.
!pip -q install xgboost==2.1.1 joblib

In [152]:

import os, json, warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import polars as pl
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib

RNG = 42

def rmse(y, yhat):
    """Root-mean-squared error between ``y`` and ``yhat`` as a plain float."""
    mse = mean_squared_error(y, yhat)
    return float(np.sqrt(mse))

def wape(y, yhat):
    """Weighted absolute percentage error: sum(|y - yhat|) / sum(|y|).

    Returns NaN when the actuals sum to zero in absolute value.
    """
    total_abs_actual = np.abs(y).sum()
    if total_abs_actual == 0:
        return float(np.nan)
    total_abs_error = np.abs(y - yhat).sum()
    return float(total_abs_error / total_abs_actual)

def smape(y, yhat, eps=1e-8):
    """Symmetric MAPE: 2 * mean(|yhat - y| / (|y| + |yhat|)).

    Inputs are converted to float NumPy arrays first, so lists and pandas
    Series work as well as arrays (``Series.clip`` does not accept NumPy's
    ``min=`` keyword, which is why the floor uses ``np.maximum``).

    Args:
        y: Actual values (array-like).
        yhat: Predicted values (array-like, same length as ``y``).
        eps: Floor applied to the denominator so that rows where both values
            are zero contribute 0 instead of dividing by zero.

    Returns:
        The metric as a float in [0, 2] for finite inputs.
    """
    y_arr = np.asarray(y, dtype=float)
    yhat_arr = np.asarray(yhat, dtype=float)
    num = np.abs(yhat_arr - y_arr)
    den = np.maximum(np.abs(y_arr) + np.abs(yhat_arr), eps)
    return float(2.0 * np.mean(num / den))

def print_metrics(tag, y_true, y_pred):
    """Print MAE, RMSE, R2, WAPE and sMAPE for one prediction set on a single line."""
    mae_value = mean_absolute_error(y_true, y_pred)
    rmse_value = rmse(y_true, y_pred)
    r2_value = r2_score(y_true, y_pred)
    wape_value = wape(y_true, y_pred)
    smape_value = smape(y_true, y_pred)
    line = (
        f"{tag}: MAE={mae_value:,.2f}  "
        f"RMSE={rmse_value:,.2f}  R2={r2_value:.4f}  "
        f"WAPE={wape_value:.3f}  sMAPE={smape_value:.3f}"
    )
    print(line)

def median_smear(y_true_level, yhat_log):
    """Median-ratio retransformation factor for log1p-scale predictions.

    Back-transforms ``yhat_log`` with ``expm1`` (floored at 1e-12), divides the
    non-negative actuals by it, and returns the median of the finite ratios.
    Returns 1.0 when no finite ratio exists.
    """
    pred_level = np.expm1(yhat_log).clip(min=1e-12)
    actual_nonneg = np.clip(y_true_level, 0, None)
    ratios = actual_nonneg / pred_level
    finite_ratios = ratios[np.isfinite(ratios)]
    if finite_ratios.size:
        return float(np.median(finite_ratios))
    return 1.0


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.9/153.9 MB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [159]:
# Input: the outlier-filtered campaign-level file; fitted models are written under MODEL_DIR.
IN_PATH = "/content/campaign_level__final__no_outliers__gw_then_global.parquet"
MODEL_DIR = "/content/models_xgb_ohe"
os.makedirs(MODEL_DIR, exist_ok=True)

# Load with Polars (fast) then to pandas
df = pl.read_parquet(IN_PATH).to_pandas()
# Ensure numerics
# Unparseable values become NaN, NaN becomes 0, and negatives are clipped to 0.
# This overwrites the columns of df in place.
NUMS = ["campaign_duration","num_ads","cost",
        "impressions","reach","clicks","actions","conversion_value"]
for c in NUMS:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).clip(lower=0)

assert "business_type" in df.columns, "business_type column is required."

# One-hot business_type (keep full set so train/val/test align)
# Encoding happens before any split, so every split shares the same dummy columns.
bt_dummies = pd.get_dummies(df["business_type"].astype(str), prefix="bt", drop_first=False)
# Feature matrix: three numeric inputs followed by the business-type indicators.
X = pd.concat([
        df[["campaign_duration","num_ads","cost"]].astype(float),
        bt_dummies.astype(float)
    ], axis=1)

# Columns to be predicted (each is a non-negative numeric column from NUMS).
TARGETS = ["impressions","reach","clicks","actions","conversion_value"]
print("X shape:", X.shape, "| targets:", TARGETS)
print("One-hot columns:", [c for c in X.columns if c.startswith("bt_")])


X shape: (17865, 18) | targets: ['impressions', 'reach', 'clicks', 'actions', 'conversion_value']
One-hot columns: ['bt_beauty_cosmetics', 'bt_digital_marketing', 'bt_entertainment_media', 'bt_fashion_lifestyle', 'bt_financial_services', 'bt_food_beverage', 'bt_healthcare_medical', 'bt_industrial_manufacturing', 'bt_other', 'bt_real_estate', 'bt_retail_brand', 'bt_software_development', 'bt_technology_electronics', 'bt_transportation_logistics', 'bt_unknown']


In [160]:
df

Unnamed: 0,row_index,campaign_duration,num_ads,cost,impressions,reach,clicks,actions,conversion_value,business_type,btype_confidence,is_outlier_gw,anomaly_score_gw
0,0,41.0,3,4999.9601,122352,119837,76,152363,0.0,unknown,1.0,False,0.011321
1,1,15.0,1,999.9101,29785,28714,721,1356,0.0,unknown,1.0,False,0.086499
2,2,9.0,4,17349.5898,661442,587260,32313,91756,0.0,unknown,1.0,False,0.033763
3,3,31.0,2,3863.0500,46096,42902,2630,2135,0.0,retail_brand,1.0,False,0.082247
4,4,6.0,4,999.7800,87016,86190,193,231,0.0,unknown,1.0,False,0.109463
...,...,...,...,...,...,...,...,...,...,...,...,...,...
17860,22888,14.0,2,6997.0200,235160,219229,2139,4819,0.0,real_estate,1.0,False,0.103131
17861,22889,0.0,6,3000.0000,25712,23274,2673,4407,0.0,unknown,1.0,False,0.113571
17862,22890,6.0,3,2999.9999,64957,57195,3153,6491,0.0,real_estate,1.0,False,0.114508
17863,22892,6.0,1,1399.5002,22522,20222,1369,4195,0.0,digital_marketing,1.0,False,0.112443


In [162]:
from sklearn.model_selection import train_test_split

RNG = 42

def build_safe_strata(labels_bt: pd.Series,
                      cost: pd.Series,
                      max_q: int = 10,
                      min_count: int = 2):
    """
    Build stratification labels in which every stratum has at least `min_count` rows.

    Candidates are tried in order:
      1. "<business type>|<cost quantile bin>", with the bin count going from
         `max_q` down to 2; the first bin count that satisfies the size rule wins.
      2. Business type alone.
      3. None, meaning no valid stratification exists.

    Returns a pd.Series of labels, or None.
    """
    bt = labels_bt.astype(str)

    # Coarsen the cost bins until no combined stratum is too small.
    for n_bins in reversed(range(2, max_q + 1)):
        try:
            # Ranking first gives every row a unique value, so bin edges never tie.
            cost_bin = pd.qcut(cost.rank(method="first"), q=n_bins, duplicates="drop").astype(str)
        except Exception:
            continue
        candidate = bt + "|" + cost_bin
        if not (candidate.value_counts() < min_count).any():
            return candidate

    # Business type alone, if every type is large enough.
    if not (bt.value_counts() < min_count).any():
        return bt

    # Nothing satisfies the size rule.
    return None

def safe_split_indices(idx: np.ndarray,
                       strat_labels: pd.Series | None,
                       test_size: float,
                       random_state: int):
    """
    Split `idx` into two parts, stratified by `strat_labels` when that is possible.

    If `strat_labels` is None, or the stratified split raises ValueError, a plain
    unstratified split with the same `test_size` and `random_state` is returned.
    """
    try:
        stratify_values = None if strat_labels is None else strat_labels.values
        return train_test_split(
            idx,
            test_size=test_size,
            random_state=random_state,
            stratify=stratify_values,
        )
    except ValueError:
        # Stratification rejected: retry without it.
        return train_test_split(idx, test_size=test_size, random_state=random_state, stratify=None)

# ----- Build initial strata on FULL dataset -----
# Strata combine business type with a cost quantile bin where group sizes allow it.
bt_labels = df["business_type"].astype(str)
cost_vals = df["cost"].astype(float)

strat_all = build_safe_strata(bt_labels, cost_vals, max_q=10, min_count=2)

if strat_all is None:
    print("⚠️ Using UNSTRATIFIED split (could not build valid strata).")
else:
    vc = strat_all.value_counts()
    print(f"Using strata with {vc.size} groups; min group size={int(vc.min())}, max={int(vc.max())}")

# indices for the whole dataframe
# Positional indices (0..len-1), used with .iloc below.
idx_all = np.arange(len(df))

# 1) Train/Test split (e.g., 10% test)
train_idx, test_idx = safe_split_indices(idx_all, strat_all, test_size=0.10, random_state=RNG)

# 2) Rebuild strata *on the training subset* and split Train/Val (e.g., 10% of train into val)
strat_train = None
if strat_all is not None:
    # Recompute safe strata on the training subset (counts changed after test split)
    # The label Series is built in train_idx order, so its values line up with train_idx.
    strat_train = build_safe_strata(bt_labels.iloc[train_idx], cost_vals.iloc[train_idx], max_q=8, min_count=2)
    if strat_train is None:
        print("⚠️ Train/Val will be UNSTRATIFIED (no valid strata on train).")
    else:
        vc_tr = strat_train.value_counts()
        print(f"Train strata: {vc_tr.size} groups; min={int(vc_tr.min())}, max={int(vc_tr.max())}")

# Now split train into train/val (~10% val of original, i.e., ~11.1% of remaining train)
# train_idx is reassigned here: from this point on it excludes the validation rows.
train_idx, val_idx = safe_split_indices(train_idx, strat_train, test_size=0.111, random_state=RNG)

# Finally build your matrices
X_train, X_val, X_test = X.iloc[train_idx], X.iloc[val_idx], X.iloc[test_idx]
print("Split shapes →",
      "Train", X_train.shape,
      "Val",   X_val.shape,
      "Test",  X_test.shape)


Using strata with 42 groups; min group size=2, max=3631
Train strata: 42 groups; min=2, max=3269
Split shapes → Train (14293, 18) Val (1785, 18) Test (1787, 18)


In [166]:
# Use GPU if present; otherwise hist on CPU
# GPU presence is inferred from the CUDA install directory, not from an actual device query.
# NOTE(review): XGBoost 2.x appears to deprecate "gpu_hist" in favour of
# tree_method="hist" with device="cuda" — confirm against the pinned version.
tree_method = "gpu_hist" if os.path.exists("/usr/local/cuda") else "hist"
print("XGBoost tree_method:", tree_method)

def train_xgb_logtarget(X_tr, y_tr, X_va, y_va,
                        params_override=None,
                        num_boost_round=5000,
                        early_stopping_rounds=400):
    """
    Train an XGBoost regressor on log1p(y) with ``xgb.train`` and early stopping.

    Parameters
    ----------
    X_tr, X_va : feature matrices accepted by ``xgb.DMatrix`` (train / validation).
    y_tr, y_va : target arrays on the original scale; cast to float and
        transformed with ``np.log1p`` before training.
    params_override : optional dict merged over the default parameter set.
    num_boost_round : maximum number of boosting rounds.
    early_stopping_rounds : passed through to ``xgb.train``; the validation
        set is the last entry of the watchlist.

    Returns
    -------
    (booster, best_iter, smear)
        ``best_iter`` is the 0-based best iteration (int), so callers predict
        with ``iteration_range=(0, best_iter + 1)``. ``smear`` is the float
        returned by ``median_smear`` on the TRAIN targets and TRAIN
        log-space predictions.
    """
    y_tr = y_tr.astype(float)
    y_va = y_va.astype(float)

    dtrain = xgb.DMatrix(X_tr, label=np.log1p(y_tr))
    dval   = xgb.DMatrix(X_va, label=np.log1p(y_va))

    params = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "eta": 0.03,
        "max_depth": 6,
        "min_child_weight": 2.0,
        "subsample": 0.9,
        "colsample_bytree": 0.8,
        "reg_lambda": 1.0,
        "tree_method": tree_method,
        "verbosity": 0,
    }
    if params_override:
        params.update(params_override)

    watchlist = [(dtrain, "train"), (dval, "val")]
    booster = xgb.train(
        params,
        dtrain,
        num_boost_round=num_boost_round,
        evals=watchlist,
        early_stopping_rounds=early_stopping_rounds,
        verbose_eval=False
    )

    # best_iteration is a 0-based index, so 0 is a legitimate value and must not
    # be tested for truthiness. The attribute may also be missing (e.g. when no
    # early stopping happened on some XGBoost versions); in that case use the
    # index of the last boosted round, i.e. count - 1, so that best_iter + 1
    # never exceeds the number of trained rounds.
    best_iter = getattr(booster, "best_iteration", None)
    if best_iter is None:
        best_iter = booster.num_boosted_rounds() - 1
    best_iter = int(best_iter)

    # Smearing factor estimated on TRAIN predictions (log space)
    pred_tr_log = booster.predict(dtrain, iteration_range=(0, best_iter + 1))
    smear = median_smear(y_tr, pred_tr_log)

    return booster, best_iter, float(smear)

def predict_level(booster, X_mat, best_iter, smear=1.0):
    """
    Predict on the original target scale.

    Runs the booster on ``X_mat`` using iterations ``0..best_iter`` inclusive,
    inverts the log1p transform with ``expm1``, floors the result at 0 and
    multiplies by ``smear``.
    """
    log_pred = booster.predict(xgb.DMatrix(X_mat), iteration_range=(0, best_iter + 1))
    level_pred = np.expm1(log_pred)
    return np.clip(level_pred, 0, None) * smear


XGBoost tree_method: gpu_hist


In [167]:
results = {}
models_dir = os.path.join(MODEL_DIR, "per_target")
os.makedirs(models_dir, exist_ok=True)

meta = {
    "features": list(X.columns),
    "targets": {},
    "tree_method": tree_method,
    "notes": "X = [campaign_duration, num_ads, cost] + one-hot(business_type); y = log1p(target). Smearing median on train."
}


def _orig_scale_metrics(y_true, y_pred):
    """Original-scale error metrics, each coerced to a plain float so json.dump never sees a numpy scalar."""
    return {
        "MAE": float(mean_absolute_error(y_true, y_pred)),
        "RMSE": float(rmse(y_true, y_pred)),
        "R2": float(r2_score(y_true, y_pred)),
        "WAPE": float(wape(y_true, y_pred)),
        "sMAPE": float(smape(y_true, y_pred)),
    }


# The feature matrices are the same for every target, so the DMatrix objects are
# built once instead of once per loop iteration.
# dtr is not used below; it is kept so the name stays defined for any other cell
# that may rely on it.
dtr = xgb.DMatrix(X_train); dva = xgb.DMatrix(X_val); dte = xgb.DMatrix(X_test)

for tgt in TARGETS:
    print("\n" + "="*90)
    print(f"Training target: {tgt}")

    # Targets are sliced by position, matching the X.iloc[...] feature splits
    y = df[tgt].values.astype(float)
    y_tr, y_va, y_te = y[train_idx], y[val_idx], y[test_idx]

    bst, best_iter, smear = train_xgb_logtarget(
        X_train, y_tr, X_val, y_va,
        params_override=None,
        num_boost_round=6000,
        early_stopping_rounds=500
    )

    # Log-space predictions restricted to the best iteration
    pred_va_log = bst.predict(dva, iteration_range=(0, best_iter+1))
    pred_te_log = bst.predict(dte, iteration_range=(0, best_iter+1))

    # Log-space metrics (optional to inspect)
    log_mae_va = mean_absolute_error(np.log1p(y_va), pred_va_log)
    log_rmse_va = rmse(np.log1p(y_va), pred_va_log)
    log_r2_va = r2_score(np.log1p(y_va), pred_va_log)

    log_mae_te = mean_absolute_error(np.log1p(y_te), pred_te_log)
    log_rmse_te = rmse(np.log1p(y_te), pred_te_log)
    log_r2_te = r2_score(np.log1p(y_te), pred_te_log)

    # Back-transform with smearing
    yhat_va = np.expm1(pred_va_log).clip(min=0) * smear
    yhat_te = np.expm1(pred_te_log).clip(min=0) * smear

    print(f"[VAL LOG]  MAE={log_mae_va:.4f}  RMSE={log_rmse_va:.4f}  R2={log_r2_va:.4f}")
    print(f"[TEST LOG] MAE={log_mae_te:.4f}  RMSE={log_rmse_te:.4f}  R2={log_r2_te:.4f}")
    print_metrics(f"[VAL  ORIG] {tgt}", y_va, yhat_va)
    print_metrics(f"[TEST ORIG] {tgt}", y_te, yhat_te)

    # Save model (all boosted rounds; best_iter is recorded in meta for inference)
    model_path = os.path.join(models_dir, f"xgb_{tgt}.json")
    bst.save_model(model_path)

    results[tgt] = {
        "best_iteration": best_iter,
        "smear": smear,
        "val_log": {"MAE": float(log_mae_va), "RMSE": float(log_rmse_va), "R2": float(log_r2_va)},
        "test_log":{"MAE": float(log_mae_te), "RMSE": float(log_rmse_te), "R2": float(log_r2_te)},
        "val_orig_metrics": _orig_scale_metrics(y_va, yhat_va),
        "test_orig_metrics": _orig_scale_metrics(y_te, yhat_te),
    }

# Save meta (features + per-target smear/best_iter/metrics)
meta["targets"] = results
with open(os.path.join(MODEL_DIR, "meta.json"), "w") as f:
    json.dump(meta, f, indent=2)

# Persist the feature column order for inference
joblib.dump(list(X.columns), os.path.join(MODEL_DIR, "feature_columns.joblib"))

print("\nSaved models to:", MODEL_DIR)



Training target: impressions
[VAL LOG]  MAE=0.6408  RMSE=0.8207  R2=0.4769
[TEST LOG] MAE=0.6632  RMSE=0.8583  R2=0.4309
[VAL  ORIG] impressions: MAE=64,777.63  RMSE=118,581.17  R2=0.3344  WAPE=0.559  sMAPE=0.580
[TEST ORIG] impressions: MAE=64,000.50  RMSE=122,646.72  R2=0.3188  WAPE=0.570  sMAPE=0.595

Training target: reach
[VAL LOG]  MAE=0.6877  RMSE=0.8755  R2=0.4383
[TEST LOG] MAE=0.7067  RMSE=0.9100  R2=0.3935
[VAL  ORIG] reach: MAE=61,780.44  RMSE=115,211.65  R2=0.3045  WAPE=0.592  sMAPE=0.616
[TEST ORIG] reach: MAE=60,507.94  RMSE=118,151.73  R2=0.2931  WAPE=0.601  sMAPE=0.627

Training target: clicks
[VAL LOG]  MAE=0.9522  RMSE=1.2650  R2=0.3428
[TEST LOG] MAE=0.9519  RMSE=1.2656  R2=0.3541
[VAL  ORIG] clicks: MAE=1,899.69  RMSE=4,159.80  R2=0.1114  WAPE=0.712  sMAPE=0.781
[TEST ORIG] clicks: MAE=2,056.03  RMSE=4,487.07  R2=0.1035  WAPE=0.697  sMAPE=0.775

Training target: actions
[VAL LOG]  MAE=1.1751  RMSE=1.5316  R2=0.3481
[TEST LOG] MAE=1.1295  RMSE=1.4651  R2=0.3964
[VA

# Compare TEST rows vs PREDICTIONS (side-by-side)

In [175]:
# ============================================
# Compare TEST rows vs PREDICTIONS (side-by-side)
# ============================================

import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# --- expects these to exist from your previous cells ---
# df         : full dataframe used to build the split
# test_idx   : indices for the test split
# test_df    : df.iloc[test_idx]
# pred_df    : DataFrame of predictions with index aligned to test_df
# MODEL_DIR  : (optional) just for reference; not used here

# -------- safety checks --------
# Both frames must already exist in the notebook namespace
_required = ("test_df", "pred_df")
if any(name not in globals() for name in _required):
    raise RuntimeError("Missing `test_df` or `pred_df`. Run your inference cell first.")

TARGETS = ["impressions","reach","clicks","actions","conversion_value"]

# Context columns carried into the comparison table (only those test_df actually has)
_context_candidates = ["row_index","campaign_id","business_type","campaign_duration","num_ads","cost"]
id_cols = [c for c in _context_candidates if c in test_df.columns]

# Work on copies so the originals are never modified
tst = test_df.copy()
prd = pred_df.copy()

# Coerce every target column present to numeric; unparseable/missing values become 0.0
for frame in (tst, prd):
    for t in TARGETS:
        if t not in frame.columns:
            continue
        frame[t] = pd.to_numeric(frame[t], errors="coerce").fillna(0.0)

# ----- start the side-by-side table from the context columns -----
comp = tst[id_cols].copy()

def smape_vec(y, yhat, eps=1e-8):
    """Element-wise symmetric absolute percentage error: 2*|yhat - y| / (|y| + |yhat|), denominator floored at `eps`."""
    abs_diff = np.abs(yhat - y)
    scale = np.clip(np.abs(y) + np.abs(yhat), eps, None)
    return 2.0 * abs_diff / scale

# For every target present in both frames, add truth, prediction and error columns
for t in TARGETS:
    if t in tst.columns and t in prd.columns:
        y  = tst[t].values
        yp = prd[t].values
        err = yp - y
        comp[f"{t}_true"]     = y
        comp[f"{t}_pred"]     = yp
        comp[f"{t}_err"]      = err
        comp[f"{t}_abs_err"]  = np.abs(err)
        # Absolute % error is undefined where the truth is 0, so those rows get NaN.
        # np.where evaluates the division for every row before masking, so the
        # divide-by-zero / invalid warnings from the masked-out rows are silenced here.
        with np.errstate(divide="ignore", invalid="ignore"):
            ape = np.where(np.abs(y) > 0, np.abs(err) / np.abs(y), np.nan)
        comp[f"{t}_ape"]      = ape                                                         # absolute % error
        comp[f"{t}_sape"]     = smape_vec(y, yp)                                            # symmetric % error

print("Preview (first 10 rows):")
display(comp.head(10))

# ----- overall & by-business_type metrics (original scale) -----
def rmse(y, yhat):
    """Root mean squared error between `y` and `yhat`, returned as a plain float."""
    mse = mean_squared_error(y, yhat)
    return float(np.sqrt(mse))
def wape(y, yhat):
    """Weighted absolute percentage error: sum|y - yhat| / sum|y|; NaN when sum|y| is 0."""
    total_abs_truth = np.abs(y).sum()
    if total_abs_truth == 0:
        return float(np.nan)
    total_abs_err = np.abs(y - yhat).sum()
    return float(total_abs_err / total_abs_truth)
def smape(y, yhat, eps=1e-8):
    """Mean symmetric absolute percentage error: 2 * mean(|yhat - y| / (|y| + |yhat|)), denominator floored at `eps`."""
    scale = np.clip(np.abs(y) + np.abs(yhat), eps, None)
    ratios = np.abs(yhat - y) / scale
    return float(2.0 * np.mean(ratios))

# Accumulator filled by add_group_metrics: one dict per (group, target)
rows = []
def add_group_metrics(name, df_):
    """
    Append one metrics record per target to the module-level `rows` list.

    A target is included only when both its `<target>_true` and
    `<target>_pred` columns exist in `df_`. `name` is stored as the group label
    and `n` is the number of rows in `df_`.
    """
    for t in TARGETS:
        true_col = f"{t}_true"
        pred_col = f"{t}_pred"
        if true_col not in df_.columns or pred_col not in df_.columns:
            continue
        actual = df_[true_col].values
        predicted = df_[pred_col].values
        record = {"group": name, "target": t, "n": len(df_)}
        record["MAE"] = mean_absolute_error(actual, predicted)
        record["RMSE"] = rmse(actual, predicted)
        record["R2"] = r2_score(actual, predicted)
        record["WAPE"] = wape(actual, predicted)
        record["sMAPE"] = smape(actual, predicted)
        rows.append(record)

# Metrics over the whole comparison table
add_group_metrics("ALL", comp)

# Metrics per business_type (NaN business types form their own group)
if "business_type" in comp.columns:
    for bt, g in comp.groupby("business_type", dropna=False):
        add_group_metrics(f"bt={bt}", g)

# Explicit columns keep the frame well-formed (and sortable) even when no
# target produced a record and `rows` is empty; with records present the
# column order is the same as the record key order.
metrics_df = pd.DataFrame(
    rows,
    columns=["group", "target", "n", "MAE", "RMSE", "R2", "WAPE", "sMAPE"],
)
metrics_df = metrics_df.sort_values(["group", "target"]).reset_index(drop=True)
print("\nGroup metrics:")
display(metrics_df)

# ----- write the comparison table to disk -----
CSV_PATH = "/content/test_vs_pred.csv"
PARQUET_PATH = "/content/test_vs_pred.parquet"

comp.to_csv(CSV_PATH, index=False)
print(f"\nSaved CSV → {CSV_PATH}")

# Parquet is best-effort: any failure is reported and the cell carries on
try:
    comp.to_parquet(PARQUET_PATH, index=False)
except Exception as e:
    print(f"Parquet not saved (engine missing?): {e}")
else:
    print(f"Saved Parquet → {PARQUET_PATH}")

# Helpful filters to try:
# comp.sort_values("impressions_abs_err", ascending=False).head(20)
# comp.query("business_type == 'real_estate'").head()


Preview (first 10 rows):


Unnamed: 0,row_index,business_type,campaign_duration,num_ads,cost,impressions_true,impressions_pred,impressions_err,impressions_abs_err,impressions_ape,...,actions_err,actions_abs_err,actions_ape,actions_sape,conversion_value_true,conversion_value_pred,conversion_value_err,conversion_value_abs_err,conversion_value_ape,conversion_value_sape
6459,8244,unknown,5.0,7,2329.8003,124219,48003.792969,-76215.207031,76215.207031,0.613555,...,-1239.944336,1239.944336,0.212319,0.237536,1524214.0,0.0,-1524214.0,1524214.0,1.0,2.0
4516,5733,digital_marketing,4.0,1,749.8,5341,13082.419922,7741.419922,7741.419922,1.449433,...,1506.895508,1506.895508,1.033536,0.681407,0.0,0.0,0.0,0.0,,0.0
14597,18715,unknown,2.0,5,2999.9998,9075,67410.109375,58335.109375,58335.109375,6.428111,...,6400.251953,6400.251953,2.020282,1.005045,0.0,0.0,0.0,0.0,,0.0
17198,22047,unknown,1.0,4,4999.9502,116911,49347.992188,-67563.007812,67563.007812,0.577901,...,-6014.399414,6014.399414,0.438399,0.561474,0.0,0.0,0.0,0.0,,0.0
15456,19818,unknown,30.0,3,4984.133,132650,168738.046875,36088.046875,36088.046875,0.272055,...,-7760.186523,7760.186523,0.348616,0.42221,0.0,0.0,0.0,0.0,,0.0
7156,9155,unknown,2.0,7,5997.3148,29743,54993.742188,25250.742188,25250.742188,0.848964,...,-1662.997559,1662.997559,0.261354,0.300641,0.0,0.0,0.0,0.0,,0.0
16425,21052,real_estate,30.0,3,4992.9801,98255,54601.636719,-43653.363281,43653.363281,0.444286,...,-34831.530273,34831.530273,0.812928,1.369636,0.0,0.0,0.0,0.0,,0.0
9076,11605,digital_marketing,6.0,2,799.94,16831,26679.162109,9848.162109,9848.162109,0.58512,...,632.875122,632.875122,0.893891,0.617778,0.0,0.0,0.0,0.0,,0.0
11908,15262,unknown,11.0,2,999.5101,68731,33797.335938,-34933.664062,34933.664062,0.508266,...,2161.420166,2161.420166,22.514793,1.836833,0.0,0.0,0.0,0.0,,0.0
13148,16847,unknown,9.0,3,2999.65,182510,89694.976562,-92815.023438,92815.023438,0.508548,...,-251621.130859,251621.130859,0.942542,1.782658,0.0,0.0,0.0,0.0,,0.0



Group metrics:


Unnamed: 0,group,target,n,MAE,RMSE,R2,WAPE,sMAPE
0,ALL,actions,1787,23153.123047,72248.828696,0.030790,0.853700,0.880682
1,ALL,clicks,1787,2056.030518,4487.071205,0.103450,0.696665,0.774719
2,ALL,conversion_value,1787,32531.248657,312378.926777,-0.010964,1.000000,0.120873
3,ALL,impressions,1787,64000.488281,122646.665964,0.318774,0.570310,0.594699
4,ALL,reach,1787,60507.917969,118151.531721,0.293084,0.601200,0.627118
...,...,...,...,...,...,...,...,...
75,bt=unknown,actions,959,26179.896484,76452.564117,-0.024138,0.849806,0.894520
76,bt=unknown,clicks,959,2461.085938,4823.238746,0.082996,0.709373,0.812217
77,bt=unknown,conversion_value,959,41199.491929,366515.198414,-0.012797,1.000000,0.156413
78,bt=unknown,impressions,959,74635.531250,139837.041616,0.316229,0.584961,0.613430



Saved CSV → /content/test_vs_pred.csv
Saved Parquet → /content/test_vs_pred.parquet
