# Klook Offer Curation

This notebook loads raw Klook offers, structures the relevant content, and uses the OpenAI Responses API to produce curation scores, category suggestions, and target-audience tags.

In [None]:
import json
import math
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List

import pandas as pd
from IPython.display import display
from openai import OpenAI

# Display settings for this notebook: never truncate cell text (long 'reason'
# strings stay readable) and do not impose a fixed display width.
pd.options.display.max_colwidth = None
pd.options.display.width = None

## Data Import and Structuring
Helpers for loading raw offer JSON files and shaping them into the consistent structure used downstream (`load_offers`, `extract_images`, `render_sections`, `structure_activity`).


In [None]:
OFFERS_DIR = Path("offers")


def load_offers(directory: Path) -> List[Dict[str, Any]]:
    """Load every ``*.json`` offer file in *directory* (non-recursive, sorted by path).

    Each file is expected to be a JSON object carrying an ``activity`` object.
    Files whose top level is not an object, or whose ``activity`` is missing,
    empty, or not an object, are skipped.

    Args:
        directory: Folder to scan. A ``str`` path is accepted as well.

    Returns:
        A list of ``{"path": <file path as str>, "activity": <activity dict>}``.

    Raises:
        json.JSONDecodeError: If a file is not valid JSON.
        OSError: If a file cannot be read.
    """
    offers: List[Dict[str, Any]] = []
    for path in sorted(Path(directory).glob("*.json")):
        with path.open("r", encoding="utf-8") as fh:
            payload = json.load(fh)
        # A top-level list/number has no .get; treat it as "no activity".
        activity = payload.get("activity") if isinstance(payload, dict) else None
        # Downstream code (structure_activity) requires a non-empty dict.
        if isinstance(activity, dict) and activity:
            offers.append({"path": str(path), "activity": activity})
    return offers


def extract_images(images: Iterable[Dict[str, Any]] | None) -> List[str]:
    """Flatten the image list into absolute URLs."""
    urls: List[str] = []
    if not images:
        return urls
    for image in images:
        if not isinstance(image, dict):
            continue
        host_url = image.get("image_url_host")
        if host_url:
            urls.append(host_url)
        nested = image.get("images")
        if isinstance(nested, list):
            for nested_image in nested:
                if isinstance(nested_image, dict):
                    nested_url = nested_image.get("image_url_host")
                    if nested_url:
                        urls.append(nested_url)
    # Preserve order while removing duplicates
    seen = set()
    unique_urls = []
    for url in urls:
        if url not in seen:
            unique_urls.append(url)
            seen.add(url)
    return unique_urls


def render_sections(section_info: Iterable[Dict[str, Any]] | None) -> str:
    """Convert section metadata into markdown text."""
    if not section_info:
        return ""
    chunks: List[str] = []
    for section in section_info:
        if not isinstance(section, dict):
            continue
        section_name = (section.get("section_name") or "").strip()
        group_blocks: List[str] = []
        for group in section.get("groups", []):
            if not isinstance(group, dict):
                continue
            group_name = (group.get("group_name") or "").strip()
            content = (group.get("content") or "").strip()
            if group_name and content:
                group_blocks.append(f"### {group_name}\n{content}")
            elif content:
                group_blocks.append(content)
        body = "\n\n".join([block for block in group_blocks if block])
        if section_name and body:
            chunks.append(f"## {section_name}\n{body}")
        elif body:
            chunks.append(body)
    return "\n\n".join([chunk for chunk in chunks if chunk])


def structure_activity(activity: Dict[str, Any], source_path: str) -> Dict[str, Any]:
    """Extract the fields needed for grading from a raw activity payload.

    Args:
        activity: The ``activity`` object from an offer JSON file.
        source_path: Path of the file it came from, kept for traceability.

    Returns:
        A flat dict with the offer's text fields, rendered description and
        package markdown, image URLs, primary city/country, category and
        curation status, plus the untouched payload under ``"raw"``.
    """
    packages: List[Dict[str, Any]] = [
        {
            "package_id": package.get("package_id"),
            "package_name": package.get("package_name"),
            "sections_markdown": render_sections(package.get("section_info")),
        }
        for package in activity.get("package_list") or []
        if isinstance(package, dict)
    ]

    # The first city entry is treated as the primary one. A missing, empty or
    # non-list ``city_info`` (or a non-dict first entry) yields no city data
    # instead of raising KeyError/AttributeError.
    city_info = activity.get("city_info")
    primary_city: Dict[str, Any] = {}
    if isinstance(city_info, (list, tuple)) and city_info and isinstance(city_info[0], dict):
        primary_city = city_info[0]

    category_info = activity.get("category_info")
    if not isinstance(category_info, dict):
        category_info = {}

    # Prefer the activity's own status fields, then the category's curation status.
    status = (
        activity.get("status")
        or activity.get("curation_status")
        or category_info.get("curation_status")
    )

    return {
        "source_path": source_path,
        "activity_id": activity.get("activity_id"),
        "title": activity.get("title"),
        "subtitle": activity.get("subtitle"),
        "what_we_love": activity.get("what_we_love"),
        "location": activity.get("location"),
        "address": activity.get("address_desc_multilang"),
        "category": category_info.get("sub_category_name"),
        "category_detail": category_info,
        "description_markdown": render_sections(activity.get("section_info")),
        "packages": packages,
        "images": extract_images(activity.get("images")),
        "city": primary_city.get("city_name"),
        "country": primary_city.get("country_name"),
        "status": status,
        "raw": activity,
    }


## Curation via GPT
Utilities that prepare prompts, call the OpenAI Responses API, and interpret grading results (`load_api_key_from_file`, `load_api_key`, `get_client`, `summarise_packages`, `build_offer_prompt`, `collect_response_text`, `parse_json_response`, `normalise_str_list`, `grade_offer`, `grade_offers_parallel`).


In [None]:
# System instructions sent with every grading request (passed as `instructions`
# to client.responses.create in grade_offer). The model is asked to reply with a
# JSON object whose keys score / categories / target_audiences / reason are
# read back by grade_offer.
# NOTE(review): the prompt asks for an integer score, but grade_offer accepts
# any numeric value and stores it as a float clamped to 0-5.
SYSTEM_PROMPT = """You are a senior Luxury Escapes curation editor. Evaluate each Klook offer for suitability on our platform.
Consider title clarity, image relevance, category accuracy, description quality, and location correctness.
Return a strict JSON object with keys:
- score (0-5, integer)
- categories (array of categories that best describe the Klook activity. You can only choose from the list below)
- target_audiences (array of target audiences that best describe the Klook activity. You can only choose from the list below)
- reason (concise justification including any category recommendations or red flags).

## Categories
These are the possible categories, note each is nested in a parent category. Do not include the parent category in the array.

{
  "Wine & Dine": [
    "Fine dining",
    "Restaurants & bars",
    "Cafés",
    "High tea",
    "Food tours",
    "Wine country trips",
    "Breweries, distilleries & vineyards"
  ],
  "Top Activities": [
    "Yachts, boats & cruises",
    "Cooking classes",
    "Up in the air",
    "Outdoor activities",
    "Watersports",
    "Indoor activities",
    "Photoshoot - Travelshoot",
    "Wildlife Cruises",
    "Cinemas",
    "Golf",
    "Ski",
    "Beach & Pool Clubs",
    "School Holidays"
  ],
  "Attractions & Tickets": [
    "Theme & water parks",
    "Attraction passes",
    "Museums",
    "Zoos & aquariums",
    "Historical sites",
    "Galleries"
  ],
  "Live Events": [
    "Concerts",
    "Theatre",
    "Live sports",
    "Special Events"
  ],
  "Indulge Yourself": [
    "Spa & massage",
    "Hot springs",
    "Wellness"
  ],
  "Lux Exclusives": [
    "The best of the best"
  ],
  "Travel Essentials": [
    "Airport lounges",
    "Luggage",
    "Airport Services",
    "Water Transfers"
  ],
  "Day Tours": [
    "Guided tours",
    "Walking tours",
    "Bike tours",
    "Hop-on-hop-off",
    "Private tours"
  ],
  "Gift Inspiration": [
    "Foodie",
    "Thrill Seeker",
    "Animal Lover",
    "Spa-goer",
    "Family",
    "Aquatic Enthusiast"
  ]
}


## Target Audiences
These are the possible target audiences. Some or all can apply (it is most common for all to apply).
- Solo
- Couple
- Group
- Family
"""

In [None]:
MAX_IMAGES_TO_REVIEW = 8
MODEL_NAME = "gpt-5"
REASONING_EFFORT = "medium"
MAX_OUTPUT_TOKENS = 5000
OPENAI_API_KEY_ENV = "OPENAI_API_KEY"
ENV_PRIORITIES = [
    Path(".env"),
    Path(".openai_api_key"),
]

def load_api_key_from_file(path: Path) -> str | None:
    try:
        content = path.read_text(encoding="utf-8")
    except OSError:
        return None
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key.strip() == OPENAI_API_KEY_ENV:
            return value.strip().strip('"')
    return None


def load_api_key() -> str | None:
    """Locate the OpenAI API key.

    A non-empty environment variable named by ``OPENAI_API_KEY_ENV`` wins and
    is returned whitespace-stripped. Otherwise each existing file in
    ``ENV_PRIORITIES`` is read in order, and the first non-empty key found is
    returned. Returns ``None`` when no key is found.
    """
    from_environment = os.getenv(OPENAI_API_KEY_ENV)
    if from_environment:
        return from_environment.strip()
    # Lazily read files in priority order; next() stops at the first hit.
    file_keys = (
        load_api_key_from_file(candidate_path)
        for candidate_path in ENV_PRIORITIES
        if candidate_path.exists()
    )
    return next((key for key in file_keys if key), None)


_client: OpenAI | None = None
# Serialises the lazy creation of ``_client``. grade_offers_parallel runs
# grade_offer, and therefore get_client, on several worker threads at once.
_client_lock = threading.Lock()


def get_client() -> OpenAI:
    """Return the shared OpenAI client, creating it on first use.

    Raises:
        RuntimeError: If ``load_api_key`` finds no API key.
    """
    global _client
    if _client is None:
        with _client_lock:
            # Re-check under the lock so only one thread constructs the client.
            if _client is None:
                api_key = load_api_key()
                if not api_key:
                    raise RuntimeError(
                        "OpenAI API key not found. Set OPENAI_API_KEY or add it to a local .env file."
                    )
                _client = OpenAI(api_key=api_key)
    return _client


def summarise_packages(packages: List[Dict[str, Any]]) -> str:
    """Describe each package as ``Package: <name>`` followed by its details.

    Unnamed packages are labelled by their 1-based position, and packages
    without details get a placeholder. Descriptions are separated by blank
    lines. A fixed message is returned when *packages* is empty.
    """
    if not packages:
        return "No packages available."

    def _describe(position: int, package: Dict[str, Any]) -> str:
        label = package.get("package_name") or f"Package {position}"
        body = package.get("sections_markdown") or "No details supplied."
        return f"Package: {label}\n{body}"

    return "\n\n".join(_describe(n, pkg) for n, pkg in enumerate(packages, start=1))


def build_offer_prompt(offer: Dict[str, Any]) -> str:
    """Render a structured offer as the plain-text user prompt for the model.

    The prompt starts with one ``Label: value`` line per header field (missing
    values shown as ``N/A``). It is followed by the description markdown and a
    summary of the offer's packages.
    """
    labelled_fields = (
        ("Title", "title"),
        ("Subtitle", "subtitle"),
        ("What we love", "what_we_love"),
        ("Location (lat,long)", "location"),
        ("Address", "address"),
        ("City", "city"),
        ("Country", "country"),
        ("Current category", "category"),
    )
    # The activity ID is printed raw (no N/A substitution).
    parts = [f"Activity ID: {offer.get('activity_id')}"]
    parts.extend(f"{label}: {offer.get(key) or 'N/A'}" for label, key in labelled_fields)
    parts += [
        "",
        "Offer description markdown:",
        offer.get("description_markdown") or "No description supplied.",
        "",
        "Packages:",
        summarise_packages(offer.get("packages") or []),
    ]
    return "\n".join(parts)


def collect_response_text(response: Any) -> str:
    """Concatenate every ``output_text`` part of a Responses API result.

    Accepts an SDK response object exposing ``to_dict()`` or an already
    decoded dict. Output items without a content list, and content parts that
    are not ``output_text``, are skipped.

    Returns:
        The joined text with surrounding whitespace stripped (``""`` if none).
    """
    payload = response.to_dict() if hasattr(response, "to_dict") else response
    if not isinstance(payload, dict):
        return ""
    chunks: List[str] = []
    for item in payload.get("output") or []:
        if not isinstance(item, dict):
            continue
        # ``content`` may be absent or explicitly null on some output item
        # types; ``or []`` avoids iterating None.
        for content in item.get("content") or []:
            if isinstance(content, dict) and content.get("type") == "output_text":
                chunks.append(content.get("text") or "")
    return "".join(chunks).strip()


def parse_json_response(text: str) -> Dict[str, Any]:
    """Parse the model's JSON response, tolerating surrounding text."""
    if not text:
        return {"score": None, "reason": "Empty response from model."}
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        import re
        match = re.search(r"\{.*\}", text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                pass
    return {"score": None, "reason": f"Failed to parse JSON: {text}"}



def normalise_str_list(value: Any) -> List[str]:
    """Coerce a JSON value that may be a list or a single scalar into a list of strings.

    Lists keep every non-None item, stringified. ``None`` and ``""`` become an
    empty list. Any other value becomes a one-element list of its string form.
    """
    if not isinstance(value, list):
        return [] if value is None or value == "" else [str(value)]
    return list(map(str, filter(lambda item: item is not None, value)))

# Request metadata values must be strings no longer than this many characters.
_METADATA_VALUE_MAX_LEN = 512


def _build_request_metadata(offer: Dict[str, Any]) -> Dict[str, str]:
    """Build the request ``metadata`` (activity id, title, Klook URL, category) for *offer*.

    Values are converted to ``str`` and truncated to the API limit. ``None``
    values are dropped, because the API rejects non-string metadata values.
    """
    activity_id = offer.get("activity_id")
    candidates = {
        "activity_id": activity_id,
        "activity_title": offer.get("title"),
        "activity_url": f"https://www.klook.com/en-AU/activity/{activity_id}",
        "activity_category": offer.get("category"),
    }
    return {
        key: str(value)[:_METADATA_VALUE_MAX_LEN]
        for key, value in candidates.items()
        if value is not None
    }


def _describe_api_error(exc: Exception) -> str:
    """Format a failed API call as a reason string, appending the HTTP response body when available."""
    reason = f"Model call failed: {exc}"
    detail = getattr(exc, "response", None)
    if detail is None:
        return reason
    extra_info: Any = None
    try:
        extra_info = detail.json()
    except Exception:  # noqa: BLE001 - body may not be JSON; fall back to raw text
        try:
            extra_info = getattr(detail, "text", None) or getattr(detail, "content", None) or None
        except Exception:  # noqa: BLE001 - error details are best effort only
            extra_info = None
    if extra_info is not None:
        reason = f"{reason} | {extra_info}"
    return reason


def _coerce_score(raw: Any) -> float | None:
    """Convert the model's score to a float clamped to [0, 5]; None if missing or invalid."""
    if raw is None or isinstance(raw, bool):
        return None
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None
    if not math.isfinite(value):
        # Comparisons with NaN are always False, so min/max clamping would
        # turn "nan" into 5.0; reject non-finite values instead.
        return None
    return max(0.0, min(5.0, value))


def _grading_result(
    offer: Dict[str, Any],
    *,
    score: float | None,
    reason: Any,
    categories: List[str] | None = None,
    target_audiences: List[str] | None = None,
    response_id: str | None = None,
) -> Dict[str, Any]:
    """Assemble the result dict shape shared by every grade_offer outcome."""
    return {
        "activity_id": offer.get("activity_id"),
        "score": score,
        "reason": reason,
        "categories": list(categories or []),
        "target_audiences": list(target_audiences or []),
        "response_id": response_id,
    }


def grade_offer(offer: Dict[str, Any]) -> Dict[str, Any]:
    """Grade a single structured offer with the OpenAI Responses API.

    Sends the text prompt plus up to ``MAX_IMAGES_TO_REVIEW`` image URLs, then
    parses the model's JSON reply.

    Returns:
        A dict with ``activity_id``, ``score`` (float clamped to 0-5, or None),
        ``reason``, ``categories``, ``target_audiences`` and ``response_id``.
        API and parsing failures are reported in ``reason`` rather than raised.

    Raises:
        RuntimeError: From ``get_client`` when no API key is configured.
    """
    client = get_client()
    prompt = build_offer_prompt(offer)
    image_payload = [
        {"type": "input_image", "image_url": url}
        for url in (offer.get("images") or [])[:MAX_IMAGES_TO_REVIEW]
    ]
    offer_content = [{"type": "input_text", "text": prompt}, *image_payload]

    try:
        response = client.responses.create(
            model=MODEL_NAME,
            instructions=SYSTEM_PROMPT,
            input=[{"role": "user", "content": offer_content}],
            reasoning={"effort": REASONING_EFFORT},
            max_output_tokens=MAX_OUTPUT_TOKENS,
            metadata=_build_request_metadata(offer),
        )
    except Exception as exc:  # noqa: BLE001 - any API failure is reported per offer
        return _grading_result(offer, score=None, reason=_describe_api_error(exc))

    response_id = getattr(response, "id", None)
    try:
        response_text = collect_response_text(response)
        parsed = parse_json_response(response_text)
    except Exception as exc:  # noqa: BLE001 - keep response_id so the log can be inspected
        return _grading_result(
            offer,
            score=None,
            reason=f"Failed to read model response: {exc}",
            response_id=response_id,
        )
    if not isinstance(parsed, dict):
        parsed = {"score": None, "reason": f"Failed to parse JSON: {response_text}"}

    if not response_text:
        # An empty reply often means the response stopped early (e.g. the
        # output-token budget was consumed); surface the status if it is not completed.
        status = getattr(response, "status", None)
        if status not in (None, "completed"):
            incomplete = getattr(response, "incomplete_details", None)
            cause = getattr(incomplete, "reason", None)
            parsed = {
                "score": None,
                "reason": f"Empty response from model (status={status}, reason={cause}).",
            }

    reason = parsed.get("reason") or parsed
    if isinstance(reason, dict):
        reason = json.dumps(reason)

    return _grading_result(
        offer,
        score=_coerce_score(parsed.get("score")),
        reason=reason,
        categories=normalise_str_list(parsed.get("categories")),
        target_audiences=normalise_str_list(parsed.get("target_audiences")),
        response_id=response_id,
    )


def grade_offers_parallel(offers: List[Dict[str, Any]], max_workers: int | None = None) -> List[Dict[str, Any]]:
    """Grade offers in parallel using a thread pool."""
    if not offers:
        return []
    worker_count = max_workers or min(8, len(offers)) or 1
    results: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        future_map = {executor.submit(grade_offer, offer): offer for offer in offers}
        for future in as_completed(future_map):
            offer = future_map[future]
            try:
                result = future.result()
            except Exception as exc:  # noqa: BLE001
                result = {
                    "activity_id": offer.get("activity_id"),
                    "score": None,
                    "reason": f"Unexpected error: {exc}",
                    "categories": [],
                    "target_audiences": [],
                    "response_id": None,
                }
            results.append(result)
    results.sort(key=lambda item: (item.get("activity_id"), item.get("reason")))
    return results


In [None]:
raw_offers = load_offers(OFFERS_DIR)
structured_offers = [structure_activity(entry["activity"], entry["path"]) for entry in raw_offers]
print(f"Loaded {len(structured_offers)} offers from {OFFERS_DIR.resolve()}")
# One overview row per offer: selected scalar fields plus image/package counts.
overview_records = [
    {
        **{field: offer.get(field) for field in ("activity_id", "title", "city", "country", "category", "status")},
        "image_count": len(offer.get("images") or []),
        "package_count": len(offer.get("packages") or []),
    }
    for offer in structured_offers
]
overview_df = pd.DataFrame(overview_records)
overview_df


In [None]:
if structured_offers:
    sample_offer = structured_offers[0]
    sample_summary = {
        "activity_id": sample_offer.get("activity_id"),
        "title": sample_offer.get("title"),
        "subtitle": sample_offer.get("subtitle"),
        "what_we_love": sample_offer.get("what_we_love"),
        "location": sample_offer.get("location"),
        "category": sample_offer.get("category"),
        "images": sample_offer.get("images"),
        "description_markdown": sample_offer.get("description_markdown"),
    }
    # Jupyter only echoes a cell's final top-level expression. A bare expression
    # inside this ``if`` would be discarded, so display it explicitly.
    display(pd.Series(sample_summary))
else:
    print("No offers found.")


In [None]:
if structured_offers:
    sample_packages = pd.DataFrame(structured_offers[0].get("packages") or [])
    # Only a cell's final top-level expression is echoed, and this one is
    # inside an ``if``, so show the table explicitly.
    display(sample_packages)


In [None]:
# Queue every offer not already marked CURATED (case- and whitespace-insensitive).
# str() keeps the filter working if a status value is not a string.
offers_to_grade = [
    offer
    for offer in structured_offers
    if str(offer.get("status") or "").strip().upper() != "CURATED"
]
print(f"Queued {len(offers_to_grade)} offers for grading.")


In [None]:
if offers_to_grade:
    # get_client() resolves the key via load_api_key(), which also reads the
    # files in ENV_PRIORITIES. Check with the same loader so a key stored only
    # in a local file does not cause grading to be skipped.
    if not load_api_key():
        print("OpenAI API key not found (checked OPENAI_API_KEY and local key files). Skipping grading.")
    else:
        grading_results = grade_offers_parallel(offers_to_grade)
        results_df = pd.DataFrame(grading_results)
        results_df["activity_url"] = results_df["activity_id"].apply(
            lambda x: f"https://www.klook.com/en-AU/activity/{x}" if pd.notna(x) else None
        )
        results_df["log_url"] = results_df["response_id"].apply(
            lambda resp: f"https://platform.openai.com/logs/{resp}"
            if isinstance(resp, str) and resp
            else None
        )
        results_df = results_df[["activity_id", "activity_url", "categories", "target_audiences", "score", "reason", "log_url"]]
        display(results_df)
else:
    print("No offers require grading.")


## Export to CSV

In [None]:
# Write the most recent grading results to CSV, if the grading cell produced any.
if "results_df" not in locals():
    print("results_df is not defined. Run the grading cell first.")
else:
    export_path = Path("graded_offers.csv")
    results_df.to_csv(export_path, index=False)
    print(f"Exported results to {export_path.resolve()}")
