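API Key: <redacted — the key that was pasted here should be revoked and rotated; load it from the `GOOGLE_MAPS_API_KEY` environment variable instead of storing it in the notebook>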

# DABN23 – Google Places “Sight Finder” (Google Colab + SQLite on Google Drive)

This notebook:
1) Lets you enter a **city**
2) Finds the **top 10** places tagged as `tourist_attraction`, sorted by **number of reviews**
3) Fetches detailed fields (rating, review count, types, accessibility, opening hours, website/phone)
4) **Caches** results in a **SQLite database stored in Google Drive**
   → fewer API calls, faster repeat runs, and persistent storage across sessions.

## What gets stored in SQLite?
- `place_id` (primary key)
- compact fields: name, address, rating, review_count, types, accessibility, opening hours, website/phone
- the full `summary_json`
- `fetched_at_utc` timestamp


## 1) Load API key (from the `GOOGLE_MAPS_API_KEY` environment variable)

In [1]:
import os

# Read the Google Places API key from the process environment so the secret
# never has to be written into the notebook itself.
API_KEY = os.getenv("GOOGLE_MAPS_API_KEY")

# Fail fast: the Places request helpers send API_KEY in the X-Goog-Api-Key header.
if not API_KEY:
    raise RuntimeError("GOOGLE_MAPS_API_KEY not found in environment variables.")

# Print only the length as a sanity check, never the key itself.
print("API key loaded (length):", len(API_KEY))

API key loaded (length): 39


## 2) Mount Google Drive and configure the SQLite database

We store the SQLite file in Google Drive so it persists across sessions.

Default path:
`/content/drive/MyDrive/dabn23_places_cache.sqlite`


In [None]:
from google.colab import drive
import os

# Mount Google Drive at /content/drive; the SQLite cache file is kept there.
drive.mount("/content/drive")

# Path of the SQLite cache file on the mounted Drive.
DB_PATH = "/content/drive/MyDrive/dabn23_places_cache.sqlite"
print("SQLite DB path:", DB_PATH)
# Reports whether a cache file is already present at that path.
print("DB exists already?", os.path.exists(DB_PATH))

Mounted at /content/drive
SQLite DB path: /content/drive/MyDrive/dabn23_places_cache.sqlite
DB exists already? True


## 3) Imports, endpoints, and DB setup

In [None]:
import requests
import sqlite3
import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

# Places API (New) endpoints: Text Search, and the Place Details URL template
# (formatted with a place_id before use).
PLACES_TEXT_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
PLACES_DETAILS_URL_TMPL = "https://places.googleapis.com/v1/places/{place_id}"

# Connect DB and create tables if needed
conn = sqlite3.connect(DB_PATH)
# NOTE(review): WAL journaling keeps extra -wal/-shm files next to the database;
# confirm this behaves reliably on the mounted Drive path.
conn.execute("PRAGMA journal_mode=WAL;")

# place_summary: one row per place_id holding the flattened fields, the full
# summary as JSON, and the fetch timestamp.
conn.execute("""
CREATE TABLE IF NOT EXISTS place_summary (
    place_id TEXT PRIMARY KEY,
    name TEXT,
    address TEXT,
    rating REAL,
    review_count INTEGER,
    category_primary TEXT,
    types_json TEXT,
    wheelchair_accessible_entrance INTEGER,
    opening_hours_json TEXT,
    website TEXT,
    phone TEXT,
    summary_json TEXT NOT NULL,
    fetched_at_utc TEXT NOT NULL
);
""")

# city_top10: one row per normalized city name holding the JSON list of place_ids.
conn.execute("""
CREATE TABLE IF NOT EXISTS city_top10 (
  city_key TEXT PRIMARY KEY,
  city_display TEXT,
  place_ids_json TEXT NOT NULL,
  created_at_utc TEXT NOT NULL
);
""")
conn.commit()

# Index on review_count for queries that filter or sort by it.
conn.execute("CREATE INDEX IF NOT EXISTS idx_review_count ON place_summary(review_count);")
conn.commit()

print("DB ready.")

DB ready.


## 4) Places API functions + summary builder

In [None]:
def text_search_many(query: str, language_code: str = "en", max_results: int = 20) -> List[Dict[str, Any]]:
    """
    Run a Places Text Search (New) query and return up to ``max_results`` places.

    A single Text Search page holds at most 20 places, so requests for more
    are served by following ``nextPageToken`` across several pages (one HTTP
    call, and one billable request, per page).

    Args:
        query: Free-text query, e.g. "tourist attractions in Paris".
        language_code: Language for localized fields in the response.
        max_results: Upper bound on the number of places returned.

    Returns:
        A list of place dicts restricted to the fields in the field mask
        (id, displayName, formattedAddress, rating, userRatingCount,
        primaryType, types). Empty when nothing matches or max_results <= 0.

    Raises:
        requests.HTTPError: if any page request returns an error status.
    """
    if max_results <= 0:
        return []

    page_limit = 20  # documented per-page maximum of Text Search (New)

    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": API_KEY,
        "X-Goog-FieldMask": ",".join([
            "places.id",
            "places.displayName",
            "places.formattedAddress",
            "places.rating",
            "places.userRatingCount",
            "places.primaryType",
            "places.types",
            # Must be requested explicitly, otherwise the token is not returned.
            "nextPageToken",
        ])
    }
    payload: Dict[str, Any] = {"textQuery": query, "languageCode": language_code}

    places: List[Dict[str, Any]] = []
    while len(places) < max_results:
        # pageSize replaces the deprecated maxResultCount field.
        payload["pageSize"] = min(page_limit, max_results - len(places))
        r = requests.post(PLACES_TEXT_SEARCH_URL, json=payload, headers=headers, timeout=30)
        r.raise_for_status()
        body = r.json()

        page = body.get("places") or []
        places.extend(page)

        next_token = body.get("nextPageToken")
        if not page or not next_token:
            break  # no further pages available
        payload["pageToken"] = next_token

    return places[:max_results]


def place_details(place_id: str, language_code: str = "en") -> Dict[str, Any]:
    """
    Fetch one place from the Place Details (New) endpoint.

    Only the fields named in the field mask below are requested.

    Args:
        place_id: Google place id substituted into the details URL.
        language_code: Sent as the ``languageCode`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the response carries an error status.
    """
    wanted_fields = (
        "id",
        "displayName",
        "formattedAddress",
        "rating",
        "userRatingCount",
        "primaryType",
        "types",
        "accessibilityOptions",
        "regularOpeningHours",
        "websiteUri",
        "nationalPhoneNumber",
    )
    response = requests.get(
        PLACES_DETAILS_URL_TMPL.format(place_id=place_id),
        headers={
            "X-Goog-Api-Key": API_KEY,
            "X-Goog-FieldMask": ",".join(wanted_fields),
        },
        params={"languageCode": language_code},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def summarize_place(place: Dict[str, Any]) -> Dict[str, Any]:
    """
    Flatten a Place Details response into the compact summary dict.

    Missing or null nested objects (displayName, accessibilityOptions,
    regularOpeningHours) are treated as empty, so the corresponding summary
    values become None or an empty list.
    """
    display_name = place.get("displayName") or {}
    accessibility = place.get("accessibilityOptions") or {}
    opening = place.get("regularOpeningHours") or {}

    # Keys are inserted in the same order as the stored/displayed summary.
    summary: Dict[str, Any] = {}
    summary["name"] = display_name.get("text")
    summary["address"] = place.get("formattedAddress")
    summary["rating"] = place.get("rating")
    summary["review_count"] = place.get("userRatingCount")
    summary["category_primary"] = place.get("primaryType")
    summary["types"] = place.get("types", [])
    summary["wheelchair_accessible_entrance"] = accessibility.get("wheelchairAccessibleEntrance")
    summary["opening_hours_weekday_descriptions"] = opening.get("weekdayDescriptions") or []
    summary["website"] = place.get("websiteUri")
    summary["phone"] = place.get("nationalPhoneNumber")
    summary["place_id"] = place.get("id")
    return summary

def normalize_city(city: str) -> str:
    """Return ``city`` with surrounding whitespace removed and lower-cased."""
    trimmed = city.strip()
    return trimmed.lower()


## 5) SQLite cache helpers (load/save + TTL)

In [None]:
def normalize_city(city: str) -> str:
    """Build the cache key for a city: stripped of outer whitespace, lower-cased."""
    # The result is used as the PRIMARY KEY in the city_top10 table.
    return str.lower(str.strip(city))

def get_city_snapshot_place_ids(city: str) -> Optional[List[str]]:
    """
    Look up the stored place_id list for ``city`` in the city_top10 table.

    Returns the decoded JSON list when a row exists for the normalized city
    key, otherwise None.
    """
    snapshot_row = conn.execute(
        "SELECT place_ids_json FROM city_top10 WHERE city_key = ?",
        (normalize_city(city),),
    ).fetchone()

    return json.loads(snapshot_row[0]) if snapshot_row else None

from datetime import datetime, timezone

def save_city_snapshot_place_ids(city: str, place_ids: List[str]) -> None:
    """
    Store ``place_ids`` as the snapshot for ``city`` and commit.

    The row is keyed by the normalized city name; an existing row for the
    same key is overwritten (display name, id list and timestamp).
    """
    upsert_sql = """
        INSERT INTO city_top10 (city_key, city_display, place_ids_json, created_at_utc)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(city_key) DO UPDATE SET
          city_display = excluded.city_display,
          place_ids_json = excluded.place_ids_json,
          created_at_utc = excluded.created_at_utc
        """
    row_values = (
        normalize_city(city),
        city.strip(),
        json.dumps(place_ids),
        datetime.now(timezone.utc).isoformat(),
    )

    conn.execute(upsert_sql, row_values)
    conn.commit()

def utc_now_iso() -> str:
    """Return the current time in UTC as an ISO-8601 string (with UTC offset)."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()

def iso_to_dt(iso_str: str) -> datetime:
    """Parse an ISO-8601 string with ``datetime.fromisoformat`` and return the result."""
    parsed = datetime.fromisoformat(iso_str)
    return parsed

def get_cached_summary(place_id: str, max_age_days: int = 30) -> Optional[Dict[str, Any]]:
    """
    Return the cached summary for ``place_id`` if it is still fresh.

    Args:
        place_id: Primary key of the place_summary row to look up.
        max_age_days: Maximum accepted age of the cached row, in days.

    Returns:
        The decoded summary dict, or None when there is no row, the row is
        older than ``max_age_days``, or its timestamp / JSON cannot be read
        (all of which are treated as a cache miss so the caller refetches).
    """
    cur = conn.execute(
        "SELECT summary_json, fetched_at_utc FROM place_summary WHERE place_id = ?",
        (place_id,)
    )
    row = cur.fetchone()
    if not row:
        return None

    summary_json, fetched_at = row

    try:
        fetched_dt = iso_to_dt(fetched_at)
    except (TypeError, ValueError):
        # Unparseable timestamp: treat as a cache miss.
        return None

    # A timestamp stored without an offset would make the subtraction below
    # raise TypeError (naive vs. aware); interpret it as UTC instead.
    if fetched_dt.tzinfo is None:
        fetched_dt = fetched_dt.replace(tzinfo=timezone.utc)

    age_days = (datetime.now(timezone.utc) - fetched_dt).total_seconds() / 86400.0
    if age_days > max_age_days:
        return None

    try:
        return json.loads(summary_json)
    except (TypeError, ValueError):
        # Corrupt cached JSON: fall back to a fresh fetch rather than crash.
        return None

def upsert_summary(summary: Dict[str, Any]) -> None:
    """
    Insert or overwrite the place_summary row for ``summary`` and commit.

    Does nothing when the summary has no (truthy) ``place_id``. The
    wheelchair flag is stored as 1 / 0 for True / False and NULL for
    anything else; list fields and the whole summary are stored as JSON.
    """
    place_id = summary.get("place_id")
    if not place_id:
        return

    # Identity checks on purpose: only real booleans map to 1 / 0.
    wheelchair = summary.get("wheelchair_accessible_entrance")
    if wheelchair is True:
        wheelchair_flag = 1
    elif wheelchair is False:
        wheelchair_flag = 0
    else:
        wheelchair_flag = None

    types_json = json.dumps(summary.get("types", []), ensure_ascii=False)
    hours_json = json.dumps(
        summary.get("opening_hours_weekday_descriptions", []), ensure_ascii=False
    )
    full_json = json.dumps(summary, ensure_ascii=False)

    upsert_sql = """INSERT INTO place_summary (
            place_id, name, address, rating, review_count, category_primary,
            types_json, wheelchair_accessible_entrance, opening_hours_json,
            website, phone, summary_json, fetched_at_utc
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(place_id) DO UPDATE SET
            name=excluded.name,
            address=excluded.address,
            rating=excluded.rating,
            review_count=excluded.review_count,
            category_primary=excluded.category_primary,
            types_json=excluded.types_json,
            wheelchair_accessible_entrance=excluded.wheelchair_accessible_entrance,
            opening_hours_json=excluded.opening_hours_json,
            website=excluded.website,
            phone=excluded.phone,
            summary_json=excluded.summary_json,
            fetched_at_utc=excluded.fetched_at_utc
        """
    row_values = (
        place_id,
        summary.get("name"),
        summary.get("address"),
        summary.get("rating"),
        summary.get("review_count"),
        summary.get("category_primary"),
        types_json,
        wheelchair_flag,
        hours_json,
        summary.get("website"),
        summary.get("phone"),
        full_json,
        utc_now_iso(),
    )

    conn.execute(upsert_sql, row_values)
    conn.commit()

def get_place_summary_cached(place_id: str, language_code: str = "en") -> dict:
    """
    Return the summary for ``place_id``, preferring the SQLite cache.

    On a cache miss the place is fetched via ``place_details``, summarized,
    written to the cache, and returned. The returned dict carries a
    ``_source`` key set to "cache" or "api".
    """
    hit = get_cached_summary(place_id)

    if hit is None:
        fresh = summarize_place(place_details(place_id, language_code=language_code))
        upsert_summary(fresh)
        # Tagged after the upsert, so the marker is not part of the stored JSON.
        fresh["_source"] = "api"
        return fresh

    hit["_source"] = "cache"
    return hit


## 6) Top 10 tourist attractions in a city (ranked by review count) + caching

In [None]:
def top_tourist_attractions_by_reviews_static_city(
    city: str,
    n: int = 10,
    language_code: str = "en",
    search_pool: int = 50,
) -> List[Dict[str, Any]]:
    """
    Return summaries of the top ``n`` tourist attractions of ``city``.

    Static city-level cache:
    - If a non-empty snapshot exists in city_top10 -> reuse the stored place_ids
    - Otherwise compute the top N once, store the place_ids, then reuse them

    An empty result is never stored (and an empty stored snapshot is ignored),
    so a failed or empty search does not pin the city to "no results".

    Args:
        city: City name as typed by the user.
        n: Number of attractions to return. A stored snapshot holds as many
            ids as were requested when it was first computed.
        language_code: Language passed to the Places API calls.
        search_pool: Number of Text Search candidates to rank.

    Returns:
        A list of summary dicts (possibly empty), each tagged with
        ``_source`` ("cache"/"api") and ``_city_source``
        ("city_snapshot"/"computed").

    Raises:
        requests.HTTPError: propagated from the Places API helpers.
    """

    # 1) Try to load the city snapshot (the stored list of place_ids)
    place_ids = get_city_snapshot_place_ids(city)

    if place_ids:
        city_source = "city_snapshot"   # we did NOT recompute the top 10
    else:
        city_source = "computed"        # we WILL compute top 10 now

        # 2) Compute top N place_ids for the city
        candidates = text_search_many(
            f"tourist attractions in {city}",
            language_code=language_code,
            max_results=search_pool
        )

        # Strict filter: only tourist attractions that carry an id
        attractions = [
            p for p in candidates
            if p.get("id") and "tourist_attraction" in (p.get("types") or [])
        ]

        # Sort by number of reviews (descending); missing counts rank as 0
        attractions.sort(key=lambda p: p.get("userRatingCount") or 0, reverse=True)

        place_ids = [p["id"] for p in attractions[:n]]

        # 3) Save the snapshot so next time we don't recompute, but only
        #    when there is something worth remembering.
        if place_ids:
            save_city_snapshot_place_ids(city, place_ids)

    # 4) Resolve place_ids -> detailed summaries (cached per place_id or fetched once)
    results: List[Dict[str, Any]] = []
    for pid in place_ids[:n]:
        s = get_place_summary_cached(pid, language_code=language_code)
        s["_city_source"] = city_source   # helpful for demo/table
        results.append(s)

    return results


## 7) Interactive UI: enter a city + click Search (shows cache vs API source)

In [None]:
import pandas as pd
import ipywidgets as widgets
from IPython.display import display

def results_to_dataframe(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Build a DataFrame from summary dicts, keeping only the display columns.

    Columns appear in the fixed order below; any that are absent from the
    data are skipped, and columns not listed are dropped.
    """
    preferred_order = (
        "name",
        "rating",
        "review_count",
        "category_primary",
        "wheelchair_accessible_entrance",
        "address",
        "website",
        "phone",
        "place_id",
        "_city_source",
        "_source",
    )
    frame = pd.DataFrame(results)
    present = [column for column in preferred_order if column in frame.columns]
    return frame[present]

def print_opening_hours(summary: Dict[str, Any]) -> None:
    """Print the summary's weekday opening-hour lines, one per line, or a fallback notice."""
    weekday_lines = summary.get("opening_hours_weekday_descriptions") or []
    if weekday_lines:
        print(*weekday_lines, sep="\n")
    else:
        print("No opening hours available.")

# Free-text box for the city name, pre-filled with "Paris".
city_input = widgets.Text(
    value="Paris",
    description="City:",
    placeholder="e.g., Paris, Rome, Stockholm",
    layout=widgets.Layout(width="420px")
)

# Search button, plus the Output area the click handler prints and displays into.
button = widgets.Button(description="Search", button_style="primary")
output = widgets.Output()

def on_button_click(_):
    """
    Click handler for the Search button.

    Reads the city from ``city_input``, looks up its top tourist attractions
    and renders the result table plus the opening hours of the first result
    inside ``output``. HTTP errors from the Places API are printed instead
    of raised.

    The per-place cache TTL is the default of ``get_cached_summary``; this
    cell defines no TTL widget, so none is read or passed on here.
    """
    with output:
        output.clear_output()
        city = city_input.value.strip()
        if not city:
            print("Please enter a city name.")
            return

        print(f"Searching top tourist attractions in {city} (ranked by review count)...")
        print()

        try:
            # Only pass arguments the function actually accepts.
            results = top_tourist_attractions_by_reviews_static_city(
                city,
                n=10,
                language_code="en",
                search_pool=50,
            )

            if not results:
                print("No tourist attractions found (type=tourist_attraction). Try another city.")
                return

            display(results_to_dataframe(results))

            print("\nExample opening hours (top result):")
            print("-", results[0].get("name"), f"(source: {results[0].get('_source')})")
            print_opening_hours(results[0])

        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            # Compare against None: an error Response object is itself falsy.
            if resp is not None:
                print("HTTPError:", resp.status_code)
                print(resp.text[:1200])
            else:
                print("HTTPError:", str(e))

# Register the handler so each click on Search runs on_button_click.
button.on_click(on_button_click)

# Render the text box, the button and the output area below the cell.
display(city_input, button, output)

Text(value='Paris', description='City:', layout=Layout(width='420px'), placeholder='e.g., Paris, Rome, Stockho…

Button(button_style='primary', description='Search', style=ButtonStyle())

Output()

## Step 8) Inspect city snapshots

In [None]:
# Load every stored city snapshot from city_top10, newest first.
df_cities = pd.read_sql_query(
    """
    SELECT
      city_display,
      city_key,
      created_at_utc,
      place_ids_json
    FROM city_top10
    ORDER BY created_at_utc DESC
    """,
    conn
)
# Bare expression so the notebook renders the DataFrame as the cell output.
df_cities


Unnamed: 0,city_display,city_key,created_at_utc,place_ids_json


In [None]:
# Number of city snapshots currently stored; the result is a one-element tuple.
conn.execute("SELECT COUNT(*) FROM city_top10").fetchone()


(0,)

## Step 9) Force recompute of a city's top-10 snapshot (optional)

In [None]:
# STEP 9 — Optional: Force recompute a city's top 10 snapshot

import ipywidgets as widgets
from IPython.display import display

# Text box for the city whose snapshot should be recomputed, pre-filled with "Paris".
force_city_input = widgets.Text(
    value="Paris",
    description="City:",
    layout=widgets.Layout(width="400px")
)

# Button that triggers the recompute.
force_button = widgets.Button(
    description="Force Recompute",
    button_style="warning"
)

# Output area the click handler prints into.
force_output = widgets.Output()

def on_force_click(_):
    """
    Click handler for the Force Recompute button.

    Re-runs the Text Search for the entered city, ranks tourist attractions
    by review count and overwrites the city's snapshot with the new top 10.
    HTTP errors are printed instead of raised, and an empty result leaves
    the existing snapshot untouched rather than replacing it with nothing.
    """
    with force_output:
        force_output.clear_output()
        city = force_city_input.value.strip()

        if not city:
            print("Please enter a city name.")
            return

        print(f"Forcing recompute for {city}...\n")

        # Recompute top 10
        try:
            candidates = text_search_many(
                f"tourist attractions in {city}",
                language_code="en",
                max_results=50
            )
        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            # Compare against None: an error Response object is itself falsy.
            if resp is not None:
                print("HTTPError:", resp.status_code)
                print(resp.text[:1200])
            else:
                print("HTTPError:", str(e))
            return

        filtered = [
            p for p in candidates
            if p.get("id") and "tourist_attraction" in (p.get("types") or [])
        ]

        filtered_sorted = sorted(
            filtered,
            key=lambda p: p.get("userRatingCount", 0) or 0,
            reverse=True
        )

        place_ids = [p["id"] for p in filtered_sorted[:10]]

        if not place_ids:
            print("No tourist attractions found (type=tourist_attraction); existing snapshot left unchanged.")
            return

        # Overwrite snapshot in DB
        save_city_snapshot_place_ids(city, place_ids)

        print("City snapshot updated successfully.")

# Register the handler so each click on Force Recompute runs on_force_click.
force_button.on_click(on_force_click)

# Render the text box, the button and the output area below the cell.
display(force_city_input, force_button, force_output)


Text(value='Paris', description='City:', layout=Layout(width='400px'))



Output()

## 10) TripAdvisor (API version) — key redacted; load it from Colab Secrets (`TRIPADVISOR_API_KEY`) and rotate the key that was pasted here

In [None]:
#RETRIEVE MY IP
import urllib.request

def get_public_ip(timeout: float = 10.0):
    """
    Return this machine's public IP address as reported by api.ipify.org.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the notebook cell.

    Returns:
        The IP address as text, or a string starting with
        "Error retrieving public IP:" when the lookup fails (errors are
        reported in the return value, not raised).
    """
    try:
        # Queries a free API that returns your public IP address; the context
        # manager closes the HTTP response once the body has been read.
        with urllib.request.urlopen('https://api.ipify.org', timeout=timeout) as response:
            return response.read().decode('utf8')
    except Exception as e:
        # Best-effort helper: report the failure as text instead of raising.
        return f"Error retrieving public IP: {e}"

# Show the public IP of this runtime (or the error text if the lookup failed).
print(f"Your Public IP is: {get_public_ip()}")

Your Public IP is: 35.247.187.107


In [None]:
from google.colab import userdata

# Read the TripAdvisor Content API key from Colab Secrets so it is never
# stored in the notebook itself.
TA_API_KEY = userdata.get("TRIPADVISOR_API_KEY")

# Fail fast: every TripAdvisor request sends TA_API_KEY as the `key` parameter.
if not TA_API_KEY:
    raise RuntimeError(
        "API key not found. Add TRIPADVISOR_API_KEY in Colab Secrets (🔑 icon on the left) "
        "and re-run this cell."
    )

# Print only the length as a sanity check, never the key itself.
print("TripAdvisor API key loaded (length):", len(TA_API_KEY))


TripAdvisor API key loaded (length): 32


In [None]:
from google.colab import drive
import os

# Mount Google Drive at /content/drive (the recorded output shows it was already mounted).
drive.mount("/content/drive")

# Separate SQLite cache file for the TripAdvisor data, also kept on Drive.
TA_DB_PATH = "/content/drive/MyDrive/dabn23_tripadvisor_cache.sqlite"
print("SQLite DB path:", TA_DB_PATH)
# Reports whether a cache file is already present at that path.
print("DB exists already?", os.path.exists(TA_DB_PATH))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
SQLite DB path: /content/drive/MyDrive/dabn23_tripadvisor_cache.sqlite
DB exists already? True


In [None]:
import requests
import sqlite3
import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

# TripAdvisor Content API v1 base
# Location search endpoint and the per-location details URL template
# (formatted with a location_id before use).
TA_SEARCH_URL  = "https://api.content.tripadvisor.com/api/v1/location/search"
TA_DETAILS_URL = "https://api.content.tripadvisor.com/api/v1/location/{location_id}/details"

# Connect and create tables (mirroring Google Places schema)
ta_conn = sqlite3.connect(TA_DB_PATH)
# NOTE(review): WAL journaling keeps extra -wal/-shm files next to the database;
# confirm this behaves reliably on the mounted Drive path.
ta_conn.execute("PRAGMA journal_mode=WAL;")

# ta_place_summary: one row per TripAdvisor location, same column layout as
# place_summary except groups_json in place of types_json.
ta_conn.execute("""
CREATE TABLE IF NOT EXISTS ta_place_summary (
    place_id      TEXT PRIMARY KEY,   -- TripAdvisor location_id
    name          TEXT,
    address       TEXT,
    rating        REAL,
    review_count  INTEGER,
    category_primary TEXT,            -- from category.name
    groups_json   TEXT,               -- attraction subcategories (JSON array)
    wheelchair_accessible_entrance INTEGER,  -- always NULL (not in TA API)
    opening_hours_json TEXT,          -- always NULL (not in TA API)
    website       TEXT,               -- TripAdvisor listing URL (web_url)
    phone         TEXT,               -- always NULL (not in TA API)
    summary_json  TEXT NOT NULL,
    fetched_at_utc TEXT NOT NULL
);
""")

# ta_city_top10: one row per normalized city name holding the JSON list of location ids.
ta_conn.execute("""
CREATE TABLE IF NOT EXISTS ta_city_top10 (
    city_key       TEXT PRIMARY KEY,
    city_display   TEXT,
    place_ids_json TEXT NOT NULL,
    created_at_utc TEXT NOT NULL
);
""")

# Index on review_count for queries that filter or sort by it.
ta_conn.execute("CREATE INDEX IF NOT EXISTS ta_idx_review_count ON ta_place_summary(review_count);")
ta_conn.commit()

print("TripAdvisor DB ready.")


TripAdvisor DB ready.


In [None]:
def ta_get_city_location(city: str, language: str = "en") -> Dict:
    """
    Step 1: Resolve a city name into a TripAdvisor geo entry.

    Args:
        city: City name to search for.
        language: Language code passed to the search endpoint.

    Returns:
        A dict with ``location_id``, ``name``, ``lat_long`` (a "lat,lon"
        string, or None when the match carries no coordinates) and
        ``address``.

    Raises:
        ValueError: if the search returns no geo match for ``city``.
        requests.HTTPError: if the search request returns an error status.
    """
    params = {
        "key":         TA_API_KEY,
        "searchQuery": city,
        "category":    "geos",       # search only for geographic locations
        "language":    language,
    }
    r = requests.get(TA_SEARCH_URL, params=params, timeout=30)
    r.raise_for_status()
    # `or []` also covers a response whose "data" field is present but null.
    results = r.json().get("data") or []

    if not results:
        raise ValueError(f"Could not resolve city: {city}")

    # Take the first (best) geo match
    geo = results[0]
    addr = geo.get("address_obj") or {}

    # Build latLong string for the next call, if coordinates are available.
    # Test for "missing" explicitly: a coordinate of 0 is valid but falsy.
    lat = geo.get("latitude")
    lon = geo.get("longitude")
    has_coords = lat not in (None, "") and lon not in (None, "")
    lat_long = f"{lat},{lon}" if has_coords else None

    print(f"  City resolved → {geo.get('name')} "
          f"(ID: {geo.get('location_id')}, latLong: {lat_long})")
    return {
        "location_id": geo.get("location_id"),
        "name":        geo.get("name"),
        "lat_long":    lat_long,
        "address":     addr.get("address_string"),
    }


def ta_search_attractions(city_geo: Dict, language: str = "en") -> List[Dict]:
    """
    Step 2: Search attractions for the resolved city.

    The text query (the city name) is always sent, because the location
    search endpoint is a text search; the city's coordinates are added as
    ``latLong`` when available to pin the results to that geography.

    Args:
        city_geo: The dict returned by ``ta_get_city_location()``; its
            ``name`` and ``lat_long`` entries are used.
        language: Language code passed to the search endpoint.

    Returns:
        The list under ``data`` in the response (empty if absent).

    Raises:
        requests.HTTPError: if the search request returns an error status.
    """
    params = {
        "key":         TA_API_KEY,
        # A None value is omitted from the query string by requests.
        "searchQuery": city_geo.get("name"),
        "category":    "attractions",
        "language":    language,
    }

    # Use latLong if available — this pins results to the city geography
    if city_geo.get("lat_long"):
        params["latLong"] = city_geo["lat_long"]

    r = requests.get(TA_SEARCH_URL, params=params, timeout=30)
    r.raise_for_status()
    return r.json().get("data", [])


def ta_location_details(location_id: str, language: str = "en") -> Dict:
    """
    Request the details record of one TripAdvisor location.

    Returns the decoded JSON body; raises requests.HTTPError on an error status.
    """
    query = {"key": TA_API_KEY, "language": language, "currency": "USD"}
    response = requests.get(
        TA_DETAILS_URL.format(location_id=location_id),
        params=query,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def ta_summarize(place: Dict) -> Dict:
    """
    Convert a TripAdvisor details record into the project's summary dict.

    The address is the record's ``address_string`` when present, otherwise
    the non-empty parts of street1/city/state/country joined by ", ".
    Rating and review count are converted to float/int when truthy, else
    None. Wheelchair, opening hours and phone are always None here.
    """
    address_obj = place.get("address_obj") or {}
    address = address_obj.get("address_string")
    if not address:
        parts = [address_obj.get(key) for key in ("street1", "city", "state", "country")]
        address = ", ".join(part for part in parts if part)

    category = place.get("category") or {}

    # Keep only groups that carry a non-empty name.
    group_names = []
    for group in place.get("groups") or []:
        label = group.get("name")
        if label:
            group_names.append(label)

    raw_rating = place.get("rating")
    raw_reviews = place.get("num_reviews")

    return {
        "place_id": str(place.get("location_id", "")),
        "name": place.get("name"),
        "address": address,
        "rating": float(raw_rating) if raw_rating else None,
        "review_count": int(raw_reviews) if raw_reviews else None,
        "category_primary": category.get("name"),
        "groups": group_names,
        "wheelchair_accessible_entrance": None,
        "opening_hours_weekday_descriptions": None,
        "phone": None,
        "website": place.get("web_url"),
    }


In [None]:
def ta_normalize_city(city: str) -> str:
    """Return ``city`` stripped of surrounding whitespace and lower-cased."""
    stripped = city.strip()
    return stripped.lower()

def ta_get_city_snapshot(city: str) -> Optional[List[str]]:
    """Return the stored location-id list for ``city`` from ta_city_top10, or None if absent."""
    cursor = ta_conn.execute(
        "SELECT place_ids_json FROM ta_city_top10 WHERE city_key = ?",
        (ta_normalize_city(city),)
    )
    hit = cursor.fetchone()
    if not hit:
        return None
    return json.loads(hit[0])

def ta_save_city_snapshot(city: str, place_ids: List[str]) -> None:
    """
    Store ``place_ids`` as the TripAdvisor snapshot for ``city`` and commit.

    An existing row for the same normalized city key is overwritten.
    """
    statement = """
        INSERT INTO ta_city_top10 (city_key, city_display, place_ids_json, created_at_utc)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(city_key) DO UPDATE SET
            city_display   = excluded.city_display,
            place_ids_json = excluded.place_ids_json,
            created_at_utc = excluded.created_at_utc
    """
    values = (
        ta_normalize_city(city),
        city.strip(),
        json.dumps(place_ids),
        datetime.now(timezone.utc).isoformat(),
    )
    ta_conn.execute(statement, values)
    ta_conn.commit()

def ta_get_cached(place_id: str, max_age_days: int = 30) -> Optional[Dict]:
    """
    Return the cached TripAdvisor summary for ``place_id`` if it is still fresh.

    Args:
        place_id: Primary key of the ta_place_summary row to look up.
        max_age_days: Maximum accepted age of the cached row, in days.

    Returns:
        The decoded summary dict, or None when there is no row, the row is
        older than ``max_age_days``, or its timestamp / JSON cannot be read
        (all treated as a cache miss so the caller refetches).
    """
    row = ta_conn.execute(
        "SELECT summary_json, fetched_at_utc FROM ta_place_summary WHERE place_id = ?",
        (place_id,)
    ).fetchone()
    if not row:
        return None

    try:
        fetched_dt = datetime.fromisoformat(row[1])
    except (TypeError, ValueError):
        # Unparseable timestamp: treat as a cache miss.
        return None

    # A timestamp stored without an offset would make the subtraction below
    # raise TypeError (naive vs. aware); interpret it as UTC instead.
    if fetched_dt.tzinfo is None:
        fetched_dt = fetched_dt.replace(tzinfo=timezone.utc)

    age = (datetime.now(timezone.utc) - fetched_dt).total_seconds() / 86400
    if age > max_age_days:
        return None

    try:
        return json.loads(row[0])
    except (TypeError, ValueError):
        # Corrupt cached JSON: fall back to a fresh fetch rather than crash.
        return None

def ta_upsert(summary: Dict) -> None:
    """
    Insert or overwrite the ta_place_summary row for ``summary`` and commit.

    Does nothing when the summary has no (truthy) ``place_id``, matching
    ``upsert_summary``; otherwise a record without a location id would be
    stored under an empty primary key. Wheelchair, opening hours and phone
    are always written as NULL.

    Raises:
        KeyError: if a required summary key (name, address, rating,
            review_count, category_primary, website) is missing.
    """
    place_id = summary.get("place_id")
    if not place_id:
        return

    ta_conn.execute("""
        INSERT INTO ta_place_summary (
            place_id, name, address, rating, review_count, category_primary,
            groups_json, wheelchair_accessible_entrance, opening_hours_json,
            website, phone, summary_json, fetched_at_utc
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
        ON CONFLICT(place_id) DO UPDATE SET
            name=excluded.name, address=excluded.address, rating=excluded.rating,
            review_count=excluded.review_count, category_primary=excluded.category_primary,
            groups_json=excluded.groups_json, website=excluded.website,
            summary_json=excluded.summary_json, fetched_at_utc=excluded.fetched_at_utc
    """, (
        place_id, summary["name"], summary["address"],
        summary["rating"], summary["review_count"], summary["category_primary"],
        json.dumps(summary.get("groups", []), ensure_ascii=False),
        None, None,  # wheelchair & hours → always NULL
        summary["website"], None,  # phone → always NULL
        json.dumps(summary, ensure_ascii=False),
        datetime.now(timezone.utc).isoformat(),
    ))
    ta_conn.commit()

def ta_get_place_cached(location_id: str, language: str = "en") -> Dict:
    """
    Return the summary for ``location_id``, preferring the SQLite cache.

    On a miss the details are fetched, summarized and written to the cache.
    The returned dict carries ``_source`` set to "cache" or "api".
    """
    hit = ta_get_cached(location_id)
    if not hit:
        fresh = ta_summarize(ta_location_details(location_id, language=language))
        ta_upsert(fresh)
        # Tagged after the upsert, so the marker is not part of the stored JSON.
        fresh["_source"] = "api"
        return fresh

    hit["_source"] = "cache"
    return hit


In [None]:
def ta_top10_attractions(city: str, n: int = 10, language: str = "en") -> List[Dict]:
    """
    Fetch the top-N TripAdvisor attractions for a city, ordered by review count.

    Uses the ta_city_top10 snapshot to avoid re-resolving the city on repeat
    runs. An empty result is never stored (and an empty stored snapshot is
    ignored), so a failed or empty search does not pin the city to
    "no results".

    Args:
        city: City name as typed by the user.
        n: Number of attractions to return.
        language: Language code passed to the TripAdvisor calls.

    Returns:
        A list of summary dicts (possibly empty), each tagged with
        ``_source`` and ``_city_source``, sorted by ``review_count``
        descending.

    Raises:
        ValueError: if the city cannot be resolved.
        requests.HTTPError: propagated from the TripAdvisor helpers.
    """
    place_ids = ta_get_city_snapshot(city)

    if place_ids:
        city_source = "city_snapshot"
    else:
        # Step 1: resolve city → geo ID + coordinates
        city_geo = ta_get_city_location(city, language=language)

        # Step 2: search attractions for that city
        candidates = ta_search_attractions(city_geo, language=language)

        # Ignore hits without an id instead of failing on them.
        with_ids = [p for p in candidates if p.get("location_id")]

        # Sort by review count, take top N
        candidates_sorted = sorted(
            with_ids,
            key=lambda p: int(p.get("num_reviews", 0) or 0),
            reverse=True
        )
        place_ids = [str(p["location_id"]) for p in candidates_sorted[:n]]

        # Persist only when there is something worth remembering.
        if place_ids:
            ta_save_city_snapshot(city, place_ids)
        city_source = "computed"

    # Step 3: resolve each place_id → full details (cached)
    results = []
    for pid in place_ids[:n]:
        s = ta_get_place_cached(pid, language=language)
        s["_city_source"] = city_source
        results.append(s)

    # NOTE(review): search hits may not carry num_reviews, in which case the
    # pre-sort above is a no-op; ordering by the review_count of the resolved
    # details makes the final ranking hold either way.
    results.sort(key=lambda s: s.get("review_count") or 0, reverse=True)

    return results


In [None]:
import pandas as pd
import ipywidgets as widgets
from IPython.display import display

def ta_results_to_df(results: List[Dict]) -> pd.DataFrame:
    """
    Build a DataFrame from TripAdvisor summaries, keeping only the display columns.

    Columns follow the fixed order below; absent ones are skipped and
    unlisted ones are dropped.
    """
    display_order = (
        "name",
        "rating",
        "review_count",
        "category_primary",
        "groups",
        "wheelchair_accessible_entrance",
        "address",
        "website",
        "phone",
        "place_id",
        "_city_source",
        "_source",
    )
    frame = pd.DataFrame(results)
    available = [column for column in display_order if column in frame.columns]
    return frame[available]

# Free-text box for the city name, pre-filled with "Paris".
city_input_ta = widgets.Text(
    value="Paris", description="City:",
    placeholder="e.g., Paris, Rome, Stockholm",
    layout=widgets.Layout(width="420px")
)
# Search button, plus the Output area the click handler prints and displays into.
btn_ta  = widgets.Button(description="Search (TA)", button_style="warning")
out_ta  = widgets.Output()

def on_ta_search(_):
    """
    Click handler for the "Search (TA)" button.

    Looks up the top TripAdvisor attractions for the entered city and
    displays them in ``out_ta``. HTTP errors and an unresolvable city are
    printed instead of raised.
    """
    with out_ta:
        out_ta.clear_output()
        city = city_input_ta.value.strip()
        if not city:
            print("Please enter a city name.")
            return
        print(f"Searching TripAdvisor top attractions in {city}...")
        try:
            results = ta_top10_attractions(city, n=10)
        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            # Compare against None: a Response with an error status is falsy,
            # so a plain truthiness test would hide the status code and body.
            if resp is not None:
                print("HTTPError:", resp.status_code)
                print(resp.text[:1000])
            else:
                print("HTTPError:", str(e))
            return
        except ValueError as e:
            # Raised by ta_get_city_location when the city cannot be resolved.
            print(e)
            return

        if not results:
            print("No results found. Try another city.")
            return
        display(ta_results_to_df(results))

# Register the handler, then render the text box, button and output area.
btn_ta.on_click(on_ta_search)
display(city_input_ta, btn_ta, out_ta)


Text(value='Paris', description='City:', layout=Layout(width='420px'), placeholder='e.g., Paris, Rome, Stockho…



Output()

## 11) Tripadvisor (selenium version)

In [None]:
#!pip install requests beautifulsoup4 selenium lxml
#uncomment if needed

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pandas as pd

def dismiss_cookies():
    """
    Handle TripAdvisor OneTrust cookie banner.

    Best effort, using the module-level ``driver``: tries to open the
    preferences panel and then click "Reject All". Each step waits up to
    10 seconds; a step that fails is reported with a message and skipped.
    """
    try:
        # Step 1: Wait for and click "Show Purposes"
        show_btn = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "onetrust-pc-btn-handler"))
        )
        show_btn.click()
        time.sleep(1)  # Brief pause for expansion
    except Exception:
        # `Exception` rather than a bare except, so Ctrl-C / interpreter
        # shutdown are not swallowed while waiting for the banner.
        print("Show Purposes not found or already expanded")

    try:
        # Step 2: Wait for and click "Reject All"
        reject_btn = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, ".ot-pc-refuse-all-handler"))
        )
        reject_btn.click()
        time.sleep(2)  # Allow dismissal
        print("Cookies dismissed")
    except Exception:
        print("Reject All not found")

def get_tripadvisor_info(place_name: str, lang: str = 'en') -> dict:
    """
    Scrape rating and review count for ``place_name`` from tripadvisor.com.

    Uses the module-level Selenium ``driver``: opens the site search,
    dismisses the cookie banner, clicks the top result and reads the rating
    label and the review-count header.

    Args:
        place_name: Free-text name to search for.
        lang: Appended to the search URL as a trailing query token.

    Returns:
        ``{'place', 'rating', 'reviews', 'url'}`` on success, or
        ``{'error', 'place'}`` when the result page cannot be read.
    """
    # Local import: the query must be URL-encoded so that characters such as
    # '&', '#' or non-ASCII letters in a place name cannot break the URL.
    from urllib.parse import quote_plus

    # NOTE(review): `&{lang}` adds a bare token (e.g. "&en") rather than a
    # key=value pair; kept as in the original — confirm what the site expects.
    search_url = f"https://www.tripadvisor.com/Search?q={quote_plus(place_name)}&searchNear=&o=0&{lang}"
    driver.get(search_url)
    time.sleep(2)

    # Dismiss cookies immediately after load
    dismiss_cookies()

    # Proceed with search results
    try:
        # Find and click top result (adjust selector if needed via inspect)
        top_link = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "div.result a.review_count"))
        )
        top_link.click()
        time.sleep(3)

        # Extract data (refine selectors as needed)
        rating_elem = driver.find_element(By.CSS_SELECTOR, "svg[data-testid='icon-rating']")
        rating = rating_elem.get_attribute('aria-label')
        review_count = driver.find_element(By.CSS_SELECTOR, "[data-test-target='REVIEWS_HEADER_COUNT']").text

        return {
            'place': place_name,
            'rating': rating,
            'reviews': review_count,
            'url': driver.current_url
        }
    except Exception as e:
        # Scraping is best effort: report the failure in the result dict.
        return {'error': str(e), 'place': place_name}

In [None]:

# TODO: FIX ANTI ROBOT
# Set up Chrome. NOTE: headless mode is currently switched off (the flag below
# is commented out), so this needs an environment that can open a browser window.
options = webdriver.ChromeOptions()
#options.add_argument('--headless')
driver = webdriver.Chrome(options=options)

try:
    venice_info = get_tripadvisor_info("Venice")
    print(venice_info)
finally:
    # Always release the browser, even if the scrape raises part-way through.
    driver.quit()


SessionNotCreatedException: Message: session not created: Chrome instance exited. Examine ChromeDriver verbose log to determine the cause.; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors#sessionnotcreatedexception
Stacktrace:
#0 0x588335ecccca <unknown>
#1 0x5883358dc682 <unknown>
#2 0x58833591a23b <unknown>
#3 0x588335914f09 <unknown>
#4 0x5883359668de <unknown>
#5 0x588335965fcc <unknown>
#6 0x58833592388f <unknown>
#7 0x588335924651 <unknown>
#8 0x588335e91119 <unknown>
#9 0x588335e94021 <unknown>
#10 0x588335e7d8d9 <unknown>
#11 0x588335e94bee <unknown>
#12 0x588335e63c50 <unknown>
#13 0x588335eb9318 <unknown>
#14 0x588335eb94e8 <unknown>
#15 0x588335ecb313 <unknown>
#16 0x7fc966608ac3 <unknown>


## Dataset overview - for alignment with tripadvisor

In [None]:
import sqlite3
import json
import pandas as pd
from IPython.display import display, HTML

# -- Local path (update if your .sqlite lives elsewhere) ----------------------
# Relative path: presumably resolved against the notebook's working directory.
DB_PATH = "dabn23_places_cache.sqlite"

# Open the SQLite file at DB_PATH (sqlite3 creates the file if it does not exist yet).
conn = sqlite3.connect(DB_PATH)
# sqlite3.Row lets result columns be read by name as well as by position.
conn.row_factory = sqlite3.Row
print("Connected to:", DB_PATH)

Connected to: dabn23_places_cache.sqlite


In [None]:
import sqlite3

# Local SQLite file this cell works against.
DB_PATH = "dabn23_places_cache.sqlite"

conn = sqlite3.connect(DB_PATH)
conn.execute("PRAGMA journal_mode=WAL;")

# Schema objects, created only when missing. Each statement is kept as its own
# named string and executed in the order listed further down.
place_summary_ddl = """
CREATE TABLE IF NOT EXISTS place_summary (
    place_id TEXT PRIMARY KEY,
    name TEXT,
    address TEXT,
    rating REAL,
    review_count INTEGER,
    category_primary TEXT,
    types_json TEXT,
    wheelchair_accessible_entrance INTEGER,
    opening_hours_json TEXT,
    website TEXT,
    phone TEXT,
    summary_json TEXT NOT NULL,
    fetched_at_utc TEXT NOT NULL
);
"""

city_top10_ddl = """
CREATE TABLE IF NOT EXISTS city_top10 (
    city_key TEXT PRIMARY KEY,
    city_display TEXT,
    place_ids_json TEXT NOT NULL,
    created_at_utc TEXT NOT NULL
);
"""

review_count_index_ddl = (
    "CREATE INDEX IF NOT EXISTS idx_review_count ON place_summary(review_count);"
)

for ddl in (place_summary_ddl, city_top10_ddl, review_count_index_ddl):
    conn.execute(ddl)
conn.commit()
print("DB ready.")

# Row counts for both tables.
tables = ["place_summary", "city_top10"]
for t in tables:
    count_query = f"SELECT COUNT(*) FROM {t}"
    (count,) = conn.execute(count_query).fetchone()
    print(f"  {t}: {count} rows")


DB ready.
  place_summary: 0 rows
  city_top10: 0 rows


In [None]:
# Print how many rows each ta_* table holds, using the ta_conn connection.
tables = ["ta_place_summary", "ta_city_top10"]
for t in tables:
    count_query = f"SELECT COUNT(*) FROM {t}"
    (count,) = ta_conn.execute(count_query).fetchone()
    print(f"  {t}: {count} rows")

  ta_place_summary: 30 rows
  ta_city_top10: 3 rows


In [None]:
# Load every row of ta_city_top10 into a DataFrame via ta_conn.
# It is the cell's final expression, so the notebook renders it as the cell output.
pd.read_sql_query("SELECT * FROM ta_city_top10", ta_conn)


Unnamed: 0,city_key,city_display,place_ids_json,created_at_utc
0,venice,Venice,"[""191175"", ""194252"", ""194332"", ""104591"", ""1912...",2026-02-20T13:32:19.830959+00:00
1,lund,Lund,"[""319367"", ""313571"", ""267409"", ""6748767"", ""735...",2026-02-20T13:47:38.901985+00:00
2,paris,Paris,"[""189229"", ""2323543"", ""2613395"", ""188679"", ""12...",2026-02-20T13:58:04.425641+00:00


In [None]:
# Load every row of ta_place_summary into a DataFrame via ta_conn.
# It is the cell's final expression, so the notebook renders it as the cell output.
pd.read_sql_query("SELECT * FROM ta_place_summary", ta_conn)


Unnamed: 0,place_id,name,address,rating,review_count,category_primary,groups_json,wheelchair_accessible_entrance,opening_hours_json,website,phone,summary_json,fetched_at_utc
0,191175,Piazza San Marco,"P.za San Marco, 30100 Venice Italy",4.5,37917,attraction,"[""Sights & Landmarks""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""191175"", ""name"": ""Piazza San Mar...",2026-02-20T13:32:20.506905+00:00
1,194252,Ponte di Rialto,"Ruga degli Orefici, 30125 Venice Italy",4.2,18029,attraction,"[""Sights & Landmarks""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""194252"", ""name"": ""Ponte di Rialt...",2026-02-20T13:32:20.889860+00:00
2,194332,Campanile di San Marco,"Sestiere San Marco 328, 30124 Venice Italy",4.6,9352,attraction,"[""Sights & Landmarks""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""194332"", ""name"": ""Campanile di S...",2026-02-20T13:32:21.240074+00:00
3,104591,Venice Beach,"Los Angeles, CA 90291",3.8,11002,attraction,"[""Nature & Parks"", ""Outdoor Activities""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""104591"", ""name"": ""Venice Beach"",...",2026-02-20T13:32:21.593761+00:00
4,191226,Basilica di San Marco,"San Marco, 328, 30124 Venice Italy",4.5,29418,attraction,"[""Sights & Landmarks""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""191226"", ""name"": ""Basilica di Sa...",2026-02-20T13:32:22.048495+00:00
5,9768024,Venice Grand Canal Mall,"Upper McKinley Rd, Taguig City, Luzon 1630 Phi...",3.9,431,attraction,"[""Shopping""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""9768024"", ""name"": ""Venice Grand ...",2026-02-20T13:32:22.331097+00:00
6,2421861,Friend in Venice Tours,"Dorsoduro, Venice Italy",5.0,861,attraction,"[""Food & Drink"", ""Classes & Workshops"", ""Tours""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""2421861"", ""name"": ""Friend in Ven...",2026-02-20T13:32:22.629079+00:00
7,266790,Venice Canals Walkway,"Washington Blvd Dell Ave & Court A, Los Angele...",4.4,2077,attraction,"[""Sights & Landmarks""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""266790"", ""name"": ""Venice Canals ...",2026-02-20T13:32:22.903366+00:00
8,191182,Scuola Grande Di San Rocco,"San Polo, 3052, 30125 Venice Italy",4.7,3138,attraction,"[""Sights & Landmarks""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""191182"", ""name"": ""Scuola Grande ...",2026-02-20T13:32:23.192767+00:00
9,104239,Muscle Beach Venice Gym,"1800 Ocean Front Walk, Los Angeles, CA 90291",3.6,1601,attraction,"[""Spas & Wellness""]",,,https://www.tripadvisor.com/Attraction_Review-...,,"{""place_id"": ""104239"", ""name"": ""Muscle Beach V...",2026-02-20T13:32:23.477549+00:00


## Peak hours data using Selenium - Google Maps

In [None]:
#%pip install google-colab-selenium




In [None]:
# Install dependencies (run once per session)
!apt-get update -q
!apt-get install -q chromium-chromedriver
!pip install -q selenium Pillow

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from IPython.display import Image, display as ipy_display
import os

# Browser identity sent with every request.
user_agent = (
    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Chrome command-line flags, applied in exactly this order.
chrome_flags = [
    "--headless=new",            # Colab has no display to attach to
    "--no-sandbox",              # Colab runs as root
    "--disable-dev-shm-usage",   # guards against memory crashes
    "--window-size=1920,1080",   # full-resolution screenshots
    user_agent,
]

options = Options()
for flag in chrome_flags:
    options.add_argument(flag)

# Module-level driver shared by the helper functions in this notebook.
driver = webdriver.Chrome(options=options)

def screenshot(filename="debug.png"):
    """Take a screenshot of the current page and display it inline in the notebook.

    Uses the module-level ``driver``.

    Args:
        filename: Path the PNG is written to before it is displayed.

    Returns:
        None. If the screenshot could not be written, a message is printed
        and nothing is displayed.
    """
    # save_screenshot() signals an I/O failure by returning False rather than
    # raising; without this check a stale file left over from an earlier call
    # (or a missing file) would be displayed instead.
    if not driver.save_screenshot(filename):
        print(f"Could not save screenshot to {filename!r}.")
        return
    ipy_display(Image(filename))

print("Driver started.")


Hit:1 https://cli.github.com/packages stable InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists...
Building dependency tree...
Reading state information...
chromium-chromedriver is already the newest version (1:85.0.4183.83-0ubuntu2.22.04.1).
0 upgraded, 0 newly install

SessionNotCreatedException: Message: session not created: Chrome instance exited. Examine ChromeDriver verbose log to determine the cause.; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors#sessionnotcreatedexception
Stacktrace:
#0 0x555dbc1bccca <unknown>
#1 0x555dbbbcc682 <unknown>
#2 0x555dbbc0a23b <unknown>
#3 0x555dbbc04f09 <unknown>
#4 0x555dbbc568de <unknown>
#5 0x555dbbc55fcc <unknown>
#6 0x555dbbc1388f <unknown>
#7 0x555dbbc14651 <unknown>
#8 0x555dbc181119 <unknown>
#9 0x555dbc184021 <unknown>
#10 0x555dbc16d8d9 <unknown>
#11 0x555dbc184bee <unknown>
#12 0x555dbc153c50 <unknown>
#13 0x555dbc1a9318 <unknown>
#14 0x555dbc1a94e8 <unknown>
#15 0x555dbc1bb313 <unknown>
#16 0x7b18379a1ac3 <unknown>


In [None]:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def dismiss_google_consent(driver):
    """Dismiss the GDPR consent banner on Google Maps (EU only).

    Waits up to 8 seconds for a button whose inner <span> text contains
    "Accept all" or "Reject all", clicks it and pauses for one second.
    This is best-effort: any failure is reported with a printed message and
    the function returns normally.

    Args:
        driver: Selenium WebDriver currently showing the Google Maps page.
    """
    try:
        accept_btn = WebDriverWait(driver, 8).until(
            EC.element_to_be_clickable((
                By.XPATH,
                '//button[.//span[contains(text(),"Accept all") '
                'or contains(text(),"Reject all")]]'
            ))
        )
        accept_btn.click()
        time.sleep(1)
        print("  Google consent dismissed.")
    except Exception:
        # Deliberately broad (the banner is optional), but not a bare
        # `except:` so that Ctrl-C / SystemExit still interrupt the wait.
        print("  No consent popup found.")


In [None]:
import sqlite3
import json

def get_attraction_names(city: str, conn: sqlite3.Connection):
    """
    Resolve a city's stored ranking into attraction names.

    The city is normalised (trimmed, lower-cased) and looked up in
    ta_city_top10; the JSON list of place ids found there is then mapped to
    names from ta_place_summary.

    Returns:
        None when the city has no row in ta_city_top10, an empty list when its
        id list is empty, otherwise the names in the stored ranking order
        (ids without a matching ta_place_summary row are left out).
    """
    lookup_key = city.strip().lower()

    cursor = conn.execute(
        "SELECT place_ids_json FROM ta_city_top10 WHERE city_key = ?",
        (lookup_key,)
    )
    hit = cursor.fetchone()
    if not hit:
        return None  # city not in DB

    ranked_ids = json.loads(hit[0])
    if not ranked_ids:
        return []

    # One "?" per id so every id is passed as a bound parameter.
    marks = ",".join("?" for _ in ranked_ids)
    query = f"SELECT place_id, name FROM ta_place_summary WHERE place_id IN ({marks})"

    names_by_id = {}
    for found_id, found_name in conn.execute(query, ranked_ids).fetchall():
        names_by_id[found_id] = found_name

    # Walk the ids in ranking order; the IN (...) query itself gives no ordering.
    ordered_names = []
    for pid in ranked_ids:
        if pid in names_by_id:
            ordered_names.append(names_by_id[pid])
    return ordered_names


In [None]:
from datetime import datetime

def current_hour_label() -> str:
    """
    Returns current hour in Google Maps format, e.g.:
      14:xx  ‚Üí  '2 pm'
      09:xx  ‚Üí  '9 am'
      12:xx  ‚Üí  '12 pm'
      00:xx  ‚Üí  '12 am'
    """
    now = datetime.now()
    hour = now.hour

    if hour == 0:
        return "12 am"
    elif hour < 12:
        return f"{hour} am"
    elif hour == 12:
        return "12 pm"
    else:
        return f"{hour - 12} pm"

print(f"Current hour label: '{current_hour_label()}'")


In [None]:
import re
from selenium.common.exceptions import NoSuchElementException, TimeoutException

def get_current_busyness(driver, attraction_name: str) -> int | None:
    """
    Search Google Maps for an attraction and return its busyness for the
    current hour.

    Args:
        driver: Selenium WebDriver used to drive the browser.
        attraction_name: Text typed into the Google Maps search box.

    Returns:
        The integer percentage (e.g. 77) parsed from the hourly bar whose
        aria-label carries the label from current_hour_label(), or None when
        the peak-hours section or a matching bar is not found.

    Raises:
        Selenium exceptions raised while locating the search box or the
        search button are not caught here and propagate to the caller.
    """
    print(f"\n  Searching: {attraction_name}")

    # --- 1. Navigate to Google Maps and search ---
    driver.get("https://www.google.com/maps")
    time.sleep(2)
    dismiss_google_consent(driver)

    # The search box is located by its name attribute.
    search_bar = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.NAME, "q"))
    )
    search_bar.clear()
    search_bar.send_keys(attraction_name)

    # Click the search button.
    # NOTE(review): this and the other class-name selectors below look like
    # generated class names; presumably they need updating when the page
    # markup changes.
    search_btn = driver.find_element(By.CSS_SELECTOR, "button.mL3xi")
    search_btn.click()
    time.sleep(3)

    # --- 2. Handle disambiguation (list vs. direct result) ---
    try:
        # If we land on a results list, click the first result
        first_result = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "a.hfpxzc"))
        )
        first_result.click()
        time.sleep(3)
        print(f"    ‚Üí Clicked top result from list")
    except TimeoutException:
        # Already on the place page directly
        print(f"    ‚Üí Landed directly on place page")

    # --- 3. Check for peak hours parent element ---
    try:
        peak_section = WebDriverWait(driver, 6).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "div.UmE4Qe"))
        )
    except TimeoutException:
        print(f"    ‚úó No peak hours data available")
        return None

    # --- 4. Find the hourly bar matching the current time ---
    target_label = current_hour_label()
    # A plain substring test would let '2 pm' match '12 pm' (and '1 am' match
    # '11 am'), returning the wrong hour's bar. Require that the label is not
    # preceded by another digit and not followed by a word character.
    label_pattern = re.compile(rf"(?<!\d){re.escape(target_label)}(?!\w)")
    hourly_bars = peak_section.find_elements(By.CSS_SELECTOR, "div.dpoVLd")

    for bar in hourly_bars:
        aria = bar.get_attribute("aria-label") or ""
        if label_pattern.search(aria):
            # Extract percentage with regex: "77% busy at 2 pm."
            match = re.search(r"(\d+)%", aria)
            if match:
                pct = int(match.group(1))
                print(f"    ‚úì Current busyness: {pct}% (matched '{aria.strip()}')")
                return pct

    print(f"    ‚úó No bar found for '{target_label}' (place may be closed now)")
    return None


In [None]:
from datetime import datetime

def scrape_peak_hours(city: str, conn: sqlite3.Connection):
    """
    Scrape the current busyness of every stored attraction of a city.

    Attraction names come from get_attraction_names(city, conn); each one is
    looked up with get_current_busyness() using the module-level ``driver``.
    The driver is quit before this function returns, including when a lookup
    raises.

    Args:
        city: City name to look up in ta_city_top10.
        conn: SQLite connection holding the ta_* tables.

    Returns:
        ``{"Crowdedness_now_HH:MM": {attraction_name: percent_or_None}}``
        where HH:MM is the local time at which the function was called, or
        None when the city is not in the database.
    """
    now_label = datetime.now().strftime("%H:%M")
    results_key = f"Crowdedness_now_{now_label}"

    try:
        # Step 1: check city in DB
        print(f"Looking up '{city}' in ta_city_top10...")
        names = get_attraction_names(city, conn)

        if names is None:
            print("No Data on this city!")
            return None

        print(f"Found {len(names)} attractions: {names}\n")

        # Step 2: scrape each attraction
        crowdedness = {}
        for name in names:
            crowdedness[name] = get_current_busyness(driver, name)
            time.sleep(2)  # polite delay between searches

        # Step 3: wrap in the named dict and print
        output = {results_key: crowdedness}
        print(f"\n{'='*50}")
        print(f"Results ‚Äî {results_key}")
        print('='*50)
        for attraction, value in crowdedness.items():
            display_val = f"{value}%" if value is not None else "N/A"
            print(f"  {attraction:<45} {display_val}")

        return output
    finally:
        # Runs on every exit path so a failed lookup cannot leave the
        # browser process running.
        driver.quit()
        print("\nDriver closed.")

# --- Run it ---
# scrape_peak_hours() quits the Selenium driver itself before returning, so no
# further browser cleanup is needed here. (`drive` is the google.colab Drive
# module, which has no close(); the former `drive.close()` call always raised.)
result = scrape_peak_hours("Paris", ta_conn)