In [None]:
!pip install outscraper pillow ftfy regex tqdm open_clip_torch


In [None]:
import os
import json
import random
import re
import requests
from pathlib import Path
from collections import defaultdict
from io import BytesIO
from PIL import Image
from tqdm import tqdm

from outscraper import ApiClient

import torch
import open_clip
import numpy as np

# ======================
# CONFIG
# ======================

# Outscraper API key. Read from the OUTSCRAPER_API_KEY environment variable so
# the secret does not have to be saved inside the notebook. The placeholder is
# only a fallback: set the variable (or replace the placeholder) before running.
OUTSCRAPER_API_KEY = os.environ.get("OUTSCRAPER_API_KEY") or "YOUR_KEY_HERE"  # <-- Replace!

REVIEWS_NEEDED = 5    # reviews required per restaurant
IMAGES_NEEDED = 5     # photos labelled VIBE required per restaurant
PHOTOS_TO_FETCH = 10  # photos requested per restaurant before filtering
DEBUG = False         # set True to enable debug() output

# Output locations; both directories are created up front (idempotent).
OUTPUT_DIR = Path("./vibecheck_output/")
IMAGES_DIR = OUTPUT_DIR / "images"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
IMAGES_DIR.mkdir(parents=True, exist_ok=True)

def debug(msg):
    """Print ``msg`` prefixed with ``DEBUG:`` when the module-level DEBUG flag is truthy."""
    if not DEBUG:
        return
    print("DEBUG:", msg)


## Load CLIP ViT-B/32

In [None]:
# Load OpenAI CLIP ViT-B/32
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# The second return value (the training-time transform) is not needed here.
model, _, preprocess = open_clip.create_model_and_transforms(
    model_name="ViT-B-32",
    pretrained="openai"
)
model = model.to(device)
# open_clip hands back the model in train mode; this notebook only does
# inference, so switch to eval mode explicitly.
model.eval()
tokenizer = open_clip.get_tokenizer("ViT-B-32")

# Precompute text embeddings
# One prompt per class; clip_vibe_score compares an image against both.
TEXT_PROMPTS = {
    "food": "a close-up photo of food on a plate, highly detailed dish photograph",
    "vibe": "the interior ambience of a restaurant with lighting, cozy dining room, vibe shot"
}

with torch.no_grad():
    text_tokens = {k: tokenizer([v]).to(device) for k,v in TEXT_PROMPTS.items()}
    text_embeds = {
        k: model.encode_text(text_tokens[k]).float() for k in TEXT_PROMPTS
    }
    # Normalize embeddings (unit length, so a dot product with another
    # unit-length embedding is a cosine similarity)
    for k in text_embeds:
        text_embeds[k] /= text_embeds[k].norm(dim=-1, keepdim=True)


In [None]:
def clip_vibe_score(image: Image.Image):
    """
    Score an image against the precomputed "vibe" and "food" text prompts.

    The image is preprocessed, encoded with the CLIP model, and its embedding
    is divided by its own norm before being multiplied with each prompt
    embedding stored in ``text_embeds``.

    Returns:
        (score_vibe, score_food, label) where label is "VIBE" only when the
        vibe score is strictly greater than the food score, else "FOOD".
    """
    batch = preprocess(image).unsqueeze(0).to(device)

    with torch.no_grad():
        features = model.encode_image(batch).float()
        features /= features.norm(dim=-1, keepdim=True)

    score_food = (features @ text_embeds["food"].T).item()
    score_vibe = (features @ text_embeds["vibe"].T).item()

    # Ties go to FOOD.
    label = "VIBE" if score_vibe > score_food else "FOOD"
    return score_vibe, score_food, label


In [None]:
def get_random_dc_restaurants(client, n=5):
    """
    Search for DC restaurants and pick ``n`` distinct names at random.

    Args:
        client: object exposing ``google_maps_search`` (the Outscraper
            ``ApiClient`` in this notebook).
        n: number of restaurant names to select.

    Returns:
        A list of ``n`` unique restaurant names, or ``[]`` when the search
        returned nothing or produced fewer than ``n`` unique names.
    """
    print("üìç Fetching DC restaurants...")
    results = client.google_maps_search(
        "restaurants in washington dc",
        limit=50,
        language="en",
        region="us"
    )

    if not results:
        print("‚ùå No results returned")
        return []

    # Accept both a nested response (one list of places per query) and a flat
    # list of places.
    places = results[0] if isinstance(results[0], list) else results

    # De-duplicate while keeping the order of the response. Going through a
    # set would order the names by their (per-process randomized) string hash,
    # so the sample below could not be reproduced even after random.seed().
    names = list(dict.fromkeys(p["name"] for p in places if p.get("name")))

    if len(names) < n:
        print("‚ùå Not enough restaurants found")
        return []

    selected = random.sample(names, n)
    print("üéØ Selected restaurants:")
    for i, name in enumerate(selected, 1):
        print(f"  {i}. {name}")

    return selected


def get_restaurant_info(client, name):
    """
    Look up a single restaurant by name and return its basic details.

    Args:
        client: object exposing ``google_maps_search``.
        name: restaurant name; it is embedded in a DC-specific search query.

    Returns:
        A dict with keys ``name``, ``place_id``, ``rating``, ``address`` and
        ``type``, or ``None`` when the search yields no place or the place
        carries none of the identifier fields (``place_id``, ``google_id``,
        ``cid``) that the review/photo lookups need.
    """
    results = client.google_maps_search(
        f"{name} restaurant washington dc",
        limit=1, language="en", region="us"
    )
    if not results or not results[0]:
        return None

    # Accept both a nested response (list of places per query) and a flat list
    # of places, matching how get_random_dc_restaurants reads the same API.
    first = results[0]
    place = first[0] if isinstance(first, list) else first

    place_id = place.get("place_id") or place.get("google_id") or place.get("cid")
    if not place_id:
        # Without an identifier the follow-up review/photo calls cannot work.
        return None

    return {
        "name": place.get("name"),
        "place_id": place_id,
        "rating": place.get("rating"),
        "address": place.get("full_address"),
        "type": place.get("type"),
    }


def get_reviews(client, place_id, limit=5):
    """
    Fetch up to ``limit`` reviews for one place.

    The first entry of the response is inspected for the keys
    ``reviews_data``, ``reviews`` and ``data`` (in that order); the value of
    the first key present is returned, truncated to ``limit`` items. An empty
    list is returned when the response is empty or none of the keys exist.
    """
    response = client.google_maps_reviews(
        [place_id],
        reviews_limit=limit,
        language="en",
        sort="most_relevant"
    )
    if not response:
        return []

    payload = response[0]
    candidate_keys = ("reviews_data", "reviews", "data")
    found = next((k for k in candidate_keys if k in payload), None)
    if found is None:
        return []

    return payload[found][:limit]


def get_photos(client, place_id, limit=10):
    """
    Fetch up to ``limit`` photo records for one place.

    The first entry of the response is searched for the keys ``photos``,
    ``photos_data`` and ``data`` (in that order); the value under the first
    key present is returned, cut to ``limit`` items. Returns ``[]`` when the
    response is empty or none of those keys exist.
    """
    response = client.google_maps_photos([place_id], photosLimit=limit)
    if not response:
        return []

    record = response[0]
    present = next(
        (k for k in ("photos", "photos_data", "data") if k in record),
        None,
    )
    return [] if present is None else record[present][:limit]


## CLIP-Based Vibe Photo Filter

In [None]:
def filter_vibe_photos_clip(photo_list, needed=5, timeout=15):
    """
    Download photos one by one and keep those CLIP labels as "VIBE".

    Args:
        photo_list: iterable of photo dicts; the URL is read from the first
            non-empty value among ``photo_url``, ``original`` and ``url``.
        needed: stop as soon as this many photos have been kept.
        timeout: seconds to wait for each download before giving up on it.

    Returns:
        A list of ``(photo_dict, url)`` tuples, at most ``needed`` long.
        Photos without a URL, or whose download/decoding fails, are skipped.
    """
    kept = []

    for i, p in enumerate(photo_list):
        if len(kept) >= needed:
            break

        url = p.get("photo_url") or p.get("original") or p.get("url")
        if not url:
            continue

        try:
            # A timeout keeps one stalled server from hanging the notebook;
            # raise_for_status avoids trying to decode an HTTP error page.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content)).convert("RGB")
        except Exception as exc:
            # Best-effort: skip this photo, but say why. `except Exception`
            # (unlike a bare except) lets KeyboardInterrupt through.
            print(f"  {i+1:02d}. SKIPPED ({type(exc).__name__}: {exc})")
            continue

        score_vibe, score_food, label = clip_vibe_score(img)
        print(f"  {i+1:02d}. {label}  (vibe={score_vibe:.3f}, food={score_food:.3f})")

        if label == "VIBE":
            kept.append((p, url))

    return kept


## Main

In [None]:
def process_restaurant(client, name, index):
    """
    Collect info, reviews and CLIP-filtered "vibe" photos for one restaurant.

    Args:
        client: Outscraper client passed through to the lookup helpers.
        name: restaurant name to search for.
        index: zero-based position, used only for the printed header.

    Returns:
        ``{"restaurant": info, "reviews": reviews, "photos": saved_files}``
        where ``saved_files`` are file names written under ``IMAGES_DIR``, or
        ``None`` when the lookup fails, fewer than ``REVIEWS_NEEDED`` reviews
        are available, no photos are found, or fewer than ``IMAGES_NEEDED``
        photos pass the CLIP filter.
    """
    print("\n" + "‚îÄ"*60)
    print(f"[{index+1}] üçΩÔ∏è {name}")
    print("‚îÄ"*60)

    info = get_restaurant_info(client, name)
    if not info:
        print("‚ùå Could not fetch restaurant info.")
        return None

    print(f"Found: {info['name']} ({info['rating']}‚≠ê)")

    # Reviews
    reviews = get_reviews(client, info["place_id"], REVIEWS_NEEDED)
    if len(reviews) < REVIEWS_NEEDED:
        print("‚ùå Not enough reviews.")
        return None

    print(f"‚úÖ Got {len(reviews)} reviews")

    # Photos
    photos = get_photos(client, info["place_id"], PHOTOS_TO_FETCH)
    if len(photos) == 0:
        print("‚ùå No photos found")
        return None

    print(f"üì∑ Found {len(photos)} photos ‚Äî filtering via CLIP...")

    # CLIP filter
    vibe_photos = filter_vibe_photos_clip(photos, needed=IMAGES_NEEDED)

    if len(vibe_photos) < IMAGES_NEEDED:
        print("‚ùå Not enough vibe photos.")
        return None

    print(f"üéâ Kept {len(vibe_photos)} vibe photos")

    # Save vibe photos locally
    # The place record may lack a name; fall back to the searched name so the
    # file-name construction never iterates over None.
    display_name = info["name"] or name
    safe_name = "".join(c if c.isalnum() else "_" for c in display_name)
    saved_files = []

    for idx, (p, url) in enumerate(vibe_photos):
        filename = f"{safe_name}_{idx+1}.jpg"
        filepath = IMAGES_DIR / filename

        try:
            # Time-box the download and reject HTTP errors so an error page is
            # never written to disk under a .jpg name.
            response = requests.get(url, timeout=15)
            response.raise_for_status()
            with open(filepath, "wb") as f:
                f.write(response.content)
        except (requests.RequestException, OSError) as exc:
            # Best-effort: report the failure and carry on with the next photo.
            print(f"  WARNING: could not save {filename}: {exc}")
            continue

        saved_files.append(filename)

    return {
        "restaurant": info,
        "reviews": reviews,
        "photos": saved_files
    }


In [None]:
# Build the API client and choose which restaurants to process.
client = ApiClient(api_key=OUTSCRAPER_API_KEY)
restaurants = get_random_dc_restaurants(client, n=5)

# Keep only the restaurants for which process_restaurant returned a record.
results = []
for i, rname in enumerate(restaurants):
    out = process_restaurant(client, rname, i)
    if not out:
        continue
    results.append(out)

# Save JSON
with open(OUTPUT_DIR / "results.json", "w") as f:
    f.write(json.dumps(results, indent=2))

print("\nDONE! Saved results and images in:", OUTPUT_DIR)
