In [None]:
# Daily Viral Places Classification (sequential by day)
# This notebook processes one day at a time from START_DATE to END_DATE.

import os
import datetime as dt
import pandas as pd
from tqdm.contrib.concurrent import thread_map
from google.cloud import storage

from banjo import utils
from utils.query import get_viral_places_query
from utils.helper import (
    upload_url_to_gcs,
    process_urls_threaded,
)



In [None]:
# Parameters
# GCP project, target bucket, and the root prefix inside that bucket
PROJECT_ID = "myaigcp"
BUCKET_NAME = "shiba-inu-temp"
BUCKET_ROOT = "maps_stories_daily"

# First and last day to process (both included), written as YYYY-MM-DD
START_DATE = "2025-07-01"
END_DATE = "2025-07-03"

# Weights forwarded to get_viral_places_query as view_weight / freshness_weight
VIEW_WEIGHT = 0.6
FRESHNESS_WEIGHT = 0.4

# Make the GCP project discoverable through the environment, then construct the
# Storage client that the rest of the notebook uses for bucket access.
# Keep this order: the environment variable is set before the client is built.
# NOTE(review): the client is also given project= explicitly, so the environment
# variable is presumably for other GCP-aware libraries — confirm.
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
storage_client = storage.Client(project=PROJECT_ID)



In [None]:
# Helpers for date iteration

def iter_dates(start_date: str, end_date: str):
    """Yield each calendar day from start_date through end_date, inclusive.

    Args:
        start_date: First day, formatted as YYYY-MM-DD.
        end_date: Last day, formatted as YYYY-MM-DD.

    Yields:
        datetime.date objects in ascending order, one per day. Nothing is
        yielded when start_date falls after end_date.

    Raises:
        ValueError: If either string does not match YYYY-MM-DD. Because this
            is a generator, parsing happens when iteration starts, not when
            the function is called.
    """
    day_format = "%Y-%m-%d"
    # Parse start first, then end, so a bad start_date is reported first.
    first_day, last_day = (
        dt.datetime.strptime(text, day_format).date()
        for text in (start_date, end_date)
    )

    one_day = dt.timedelta(days=1)
    day = first_day
    while day <= last_day:
        yield day
        day += one_day



In [None]:
# Classification setup (configure as needed)
from banjo.utils.shibainu import Classification, configure_logger
from utils.prompt import PROMPT

# Logging is configured through the project helper before the classifier is built.
configure_logger()

# Minimal example config; adjust provider/model as needed.
# processor_config carries the sampling / frame-count / image-encoding options;
# model_parameters carries the generation options.
# NOTE(review): the generation-limit key is spelled "max_token" (singular) —
# confirm this is the name Classification expects.
classifier = Classification(
    provider_name="openai",
    model_name="gpt-4o-mini",
    input_type="video",
    provider_config=dict(),
    processor_config=dict(
        processing_mode="image_url",
        download=True,
        sampling_mode="fps",
        sampling_value=1.0,
        max_frames=20,
        image_quality=75,
        image_format="JPEG",
    ),
    model_parameters=dict(
        temperature=0,
        max_token=128,
    ),
    prompt=PROMPT,
)



In [None]:
# Main per-day loop
#
# For every day from START_DATE to END_DATE (inclusive) this cell:
#   1) queries that day's rows,
#   2) keeps only rows that have a media_url,
#   3) runs process_urls_threaded over the media URLs and stores the result as gcs_url,
#   4) classifies each original media_url and parses the labels into columns,
#   5) writes the day's frame to GCS as classified.csv,
#   6) keeps the frame so all days can be concatenated at the end.
# Classification and save failures are reported with print() and do not stop
# the run; the day's frame is still collected in that case.

from utils.helper import parse_incident_json_broken

all_days = []  # one DataFrame per processed day
bucket = storage_client.bucket(BUCKET_NAME)

for day in iter_dates(START_DATE, END_DATE):
    ymd = day.strftime('%Y%m%d')
    print(f"\n=== Processing day {ymd} ===")

    # Per-day bucket subfolder
    bucket_folder = f"{BUCKET_ROOT}/{ymd}"

    # 1) Query data for this day
    query = get_viral_places_query(
        start_date=ymd,
        end_date=ymd,
        view_weight=VIEW_WEIGHT,
        freshness_weight=FRESHNESS_WEIGHT,
    )
    df = utils.gbq.read_gbq(
        query,
        project_id=PROJECT_ID,
        dialect="standard",
        priority="interactive",
    )

    if df.empty:
        print(f"No rows for {ymd} — skipping")
        continue

    # 2) Filter non-null URL rows.
    #    The index is reset so df is labelled 0..n-1. parsed_df in step 4 is
    #    built from a plain list and is therefore also labelled 0..n-1; pandas
    #    aligns `df[col] = series` on index labels, so without the reset any
    #    dropped row would shift or blank out the parsed columns.
    df = df[df['media_url'].notna()].reset_index(drop=True)
    if df.empty:
        print(f"No media_url for {ymd} — skipping")
        continue

    # 3) Upload original media URLs to GCS (keep original media_url for classification)
    urls = df['media_url'].tolist()
    ids  = df['story_snap_id'].tolist()
    df['gcs_url'] = process_urls_threaded(
        urls, ids,
        bucket_name=BUCKET_NAME,
        bucket_folder=bucket_folder,
        storage_client=storage_client,
        max_workers=10,
    )

    # 4) Classify using original media_url to avoid signing gs://
    #    Best-effort: on any failure the day is still saved with whatever
    #    columns were assigned before the error.
    try:
        cls_results = thread_map(classifier.classify, df['media_url'].tolist(), max_workers=5)
        df['labels'] = [classifier.get_result(r) for r in cls_results]

        # Fetch token usage once per result and read both counters from it.
        token_usage = [classifier.get_token_usage(r) for r in cls_results]
        df['prompt_tokens'] = [usage.get('prompt_tokens', 0) for usage in token_usage]
        df['completion_tokens'] = [usage.get('completion_tokens', 0) for usage in token_usage]

        # Parse labels into structured columns (row i of parsed_df belongs to row i of df)
        parsed = df['labels'].apply(parse_incident_json_broken)
        parsed_df = pd.DataFrame(list(parsed)) if len(parsed) else pd.DataFrame()
        if not parsed_df.empty:
            for col in parsed_df.columns:
                df[col] = parsed_df[col]
    except Exception as e:
        print(f"Classification failed for {ymd}: {e}")

    # 5) Save per-day CSV to GCS
    #    The CSV text is uploaded directly, so no temporary file is created
    #    and nothing is left on disk if the upload fails.
    try:
        blob = bucket.blob(f"{bucket_folder}/classified.csv")
        blob.upload_from_string(df.to_csv(index=False), content_type='text/csv')
        print(f"Saved gs://{BUCKET_NAME}/{bucket_folder}/classified.csv")
    except Exception as e:
        print(f"Save failed for {ymd}: {e}")

    # 6) Save or collect per-day results
    all_days.append(df)

# Optional: stitch all days
df_all_days = pd.concat(all_days, ignore_index=True) if all_days else pd.DataFrame()
print(f"\nCompleted. Total rows across days: {len(df_all_days)}")

