In [6]:
import os
import re
import ast
import json
import glob
import unicodedata
import pandas as pd

# pandas display / behaviour options used throughout the notebook
pd.set_option('future.no_silent_downcasting', True)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)

# All relative paths below (sg/, output/, text/) are resolved against the
# project root. Set the FIKA_PREP_DIR environment variable to run the notebook
# from another checkout; the default keeps the original location.
PROJECT_DIR = os.environ.get('FIKA_PREP_DIR', '/home/kahgin/fika-prep')
os.chdir(PROJECT_DIR)

### Define functions

In [7]:
# Any run of characters outside [a-z0-9] becomes a single separator.
_SLUG_RX = re.compile(r"[^a-z0-9]+")

def norm_token(s: str) -> str:
    """
    Turn a display label into a lowercase ASCII snake_case token.

    "&" is spelled out as "and", accents are stripped via NFKD decomposition,
    and every run of non-alphanumeric characters becomes one underscore.
    Example: "Açaí shop" -> "acai_shop"
    """
    lowered = s.strip().lower().replace("&", " and ")
    # Decompose accented characters, then drop the non-ASCII combining marks.
    ascii_only = (
        unicodedata.normalize("NFKD", lowered)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    slug = _SLUG_RX.sub("_", ascii_only)
    collapsed = re.sub(r"_+", "_", slug)
    return collapsed.strip("_")

def categories_to_tokens(val):
    """
    Convert a raw categories value into a list of normalized tokens.

    Accepted inputs:
      - comma string: "Learning center, Açaí shop, Science museum"
      - JSON list string: '["Learning center","Açaí shop"]'
      - Python list: ["Learning center","Açaí shop"]
      - None/NaN (yields an empty list)
      - any other value is stringified and treated as a single label
    Returns list[str], e.g. ["learning_center","acai_shop","science_museum"].
    Labels that normalize to an empty token are skipped.
    """
    def _comma_parts(text):
        # Split on commas, discarding blank pieces.
        return [piece.strip() for piece in text.split(",") if piece.strip()]

    if val is None:
        return []
    if isinstance(val, float) and pd.isna(val):
        return []

    if isinstance(val, list):
        items = [str(entry) for entry in val if str(entry).strip()]
    elif isinstance(val, str):
        text = val.strip()
        try:
            parsed = json.loads(text)
            # Only a JSON array is used as-is; any other JSON value falls
            # back to comma splitting of the original text.
            items = (
                [str(entry) for entry in parsed if str(entry).strip()]
                if isinstance(parsed, list)
                else _comma_parts(text)
            )
        except Exception:
            items = _comma_parts(text)
    else:
        as_text = str(val).strip()
        items = [as_text] if as_text else []

    return [token for token in map(norm_token, items) if token]

def normalize_categories_column(df: pd.DataFrame, src="categories", dst="categories"):
    """
    Write df[dst] as a JSON array string of the normalized tokens of df[src].

    The frame is modified in place and also returned.
    Example: "Açaí shop, Science museum" -> '["acai_shop", "science_museum"]'
    """
    df[dst] = df[src].apply(lambda raw: json.dumps(categories_to_tokens(raw)))
    return df

def categories_json_to_list(val) -> list[str]:
    """
    Read a cleaned categories cell back into a Python list.

    A list is returned unchanged; a string is JSON-decoded and returned only
    if it decodes to a list. Anything else (NaN, None, invalid JSON, a JSON
    value that is not an array) yields an empty list.
    """
    if isinstance(val, list):
        return val
    if not isinstance(val, str):
        return []
    try:
        decoded = json.loads(val)
    except Exception:
        return []
    return decoded if isinstance(decoded, list) else []

def clean_data(filename):
    """
    Load one scraped CSV and return a cleaned DataFrame.

    Steps: load CSV -> replace some unicode variants -> rename title->name
    (only when the file has 'title' but no 'name') -> dedup by name ->
    keep SG rows -> normalize categories -> drop unused columns.
    """
    df = pd.read_csv(filename, low_memory=False)

    # Replace common unicode variants.
    # The last pattern targets the *escaped text* "\u0026" (backslash, 'u',
    # digits) that JSON encoders emit for "&". The Python literal '\u0026'
    # is just "&" itself, so using it as the pattern replaces "&" with "&".
    df = (df.replace('\u202f', ' ', regex=True)
            .replace('\u2013', '-', regex=True)
            .replace(r'\\u0026', '&', regex=True))

    # Rename before deduplicating so that files carrying only a 'title'
    # column can be deduplicated by 'name' as well. Skipped when 'name'
    # already exists to avoid producing two columns with the same label.
    if 'title' in df.columns and 'name' not in df.columns:
        df = df.rename(columns={'title': 'name'})

    # Deduplicate by name (first occurrence wins)
    df = df.drop_duplicates(subset=['name'])

    # Keep Singapore rows only. Plain substring match on the serialized
    # address; .copy() gives an independent frame so the column assignments
    # below do not write into a slice of the loaded data.
    is_sg = df['complete_address'].astype(str).str.contains(
        '"country":"SG"', regex=False, na=False
    )
    df = df[is_sg].copy()

    # Normalize categories to JSON array string of tokens
    df = normalize_categories_column(df, src='categories', dst='categories')

    # Drop unused columns
    df = drop_columns(df)

    return df

def drop_missing_open_hours(df):
    """Keep only rows whose open_hours, rendered as text, is not the empty object "{}"."""
    has_hours = df['open_hours'].astype(str).ne("{}")
    return df[has_hours]

def drop_low_ratings(df, threshold=1.0):
    """Keep rows whose review_rating, cast to float, is at least `threshold`."""
    ratings = df['review_rating'].astype(float)
    return df[ratings.ge(threshold)]

def drop_low_reviews(df, threshold=5):
    """Keep rows whose review_count, cast to float, is at least `threshold`."""
    counts = df['review_count'].astype(float)
    return df[counts.ge(threshold)]

def drop_columns(df):
    """
    Remove the columns this pipeline never uses.

    The frame is modified in place and returned; columns that are not
    present are silently skipped.
    """
    unused = [
        'input_id',
        'popular_times',
        'plus_code',
        'videos',
        'reviews_per_rating',
        'cid',
        'status',
        'reviews_link',
        'thumbnail',
        # 'timezone',
        'data_id',
        'reservations',
        'order_online',
        'menu',
        'owner',
        # 'complete_address',
        'user_reviews',
        'user_reviews_extended',
        'emails',
    ]
    df.drop(columns=unused, inplace=True, errors='ignore')
    return df

def combine_dataframes(dfs):
    """
    Stack the given frames, drop rows whose name was already seen, and print
    how many rows were dropped as duplicates.
    """
    stacked = pd.concat(dfs, ignore_index=True)
    # True for every row whose name already appeared earlier in the stack.
    repeated = stacked['name'].duplicated()
    deduped = stacked[~repeated]
    print(f"duplicate rows: {repeated.sum()}")
    return deduped

def save_categories(df, exclude_keyword=None, filename='../text/categories.txt'):
    """
    Write every distinct category token found in df['categories'] to
    `filename`, sorted, one token per line.

    exclude_keyword: optional iterable of keywords; a token is left out when
    any keyword occurs in it (case-insensitive substring match).
    """
    distinct = set()
    for raw in df['categories'].dropna():
        distinct.update(categories_json_to_list(raw))

    ordered = sorted(distinct)

    if exclude_keyword:
        banned = [kw.lower() for kw in exclude_keyword]
        ordered = [
            token for token in ordered
            if not any(kw in token.lower() for kw in banned)
        ]

    with open(filename, 'w', encoding='utf-8') as f:
        f.writelines(f"{token}\n" for token in ordered)


def update_flag_by_options(row, flag_col, keywords, about_col='about'):
    """
    Compute a boolean flag from the enabled options of the 'about' data.

    row:       mapping-like row (dict or pandas Series) supporting .get().
    flag_col:  name of the flag; an existing True value is kept.
    keywords:  iterable of strings; the flag becomes True when any keyword is
               a case-insensitive substring of an enabled option's name.
    about_col: column holding the about data, either a JSON string or an
               already-parsed list of {"name": ..., "options": [...]} dicts.

    Returns True or False. Missing, malformed or unexpectedly shaped about
    data never raises; it simply leaves the flag at its current value.
    """
    flag_val = row.get(flag_col, False)
    # NaN is truthy in Python, so a missing flag must be ruled out explicitly
    # before trusting the stored value.
    flag_missing = flag_val is None or (
        pd.api.types.is_scalar(flag_val) and pd.isna(flag_val)
    )
    current = False if flag_missing else bool(flag_val)
    if current:
        return True

    about_data = row.get(about_col)

    if isinstance(about_data, str):
        try:
            about_data = json.loads(about_data)
        except json.JSONDecodeError:
            return current

    # Covers None/NaN as well as JSON values that are not an array.
    if not isinstance(about_data, list):
        return current

    needles = [str(kw).lower() for kw in keywords]

    for section in about_data:
        if not isinstance(section, dict):
            continue
        # "options" may be present but null, hence `or []`.
        for opt in section.get('options') or []:
            if not isinstance(opt, dict) or not opt.get('enabled', False):
                continue
            opt_name = str(opt.get('name') or '').lower()
            if any(needle in opt_name for needle in needles):
                return True

    return current

def update_flag_by_categories(row: dict, flag_col: str, target_categories: list[str]):
    """
    Sets a boolean flag to True if any target category token is present.

    row:               mapping-like row (dict or pandas Series) supporting
                       .get(); row['categories'] is expected to be a
                       JSON-array string of tokens.
    flag_col:          name of the flag; an existing True value is kept.
    target_categories: display labels or tokens; each is normalized with
                       norm_token before comparison.

    Returns True or False.
    """
    # keep prior True -- but a missing (None/NaN) flag is not "True", even
    # though NaN is truthy in Python.
    prior = row.get(flag_col, False)
    prior_missing = prior is None or (
        pd.api.types.is_scalar(prior) and pd.isna(prior)
    )
    if not prior_missing and bool(prior):
        return True

    cats = set(categories_json_to_list(row.get("categories")))
    targets = {norm_token(t) for t in target_categories}
    return len(cats & targets) > 0

def filter_exclude_categories(df, exclude_file='../text/exclude.txt'):
    """
    Remove excluded categories based on a file list.

    Each non-blank line of `exclude_file` is normalized with norm_token and
    treated as a token to remove. Per row:
      - rows containing 'tourist_attraction' keep all of their tokens;
      - otherwise excluded tokens are removed, and the row is dropped when no
        token is left (this includes rows that had no tokens to begin with).

    Returns a new DataFrame with a fresh index; the input frame is left
    untouched. If the file is missing or yields no tokens, a warning is
    printed and the original frame is returned unchanged.
    """
    try:
        with open(exclude_file, 'r', encoding='utf-8') as f:
            exclude = {norm_token(line) for line in f if line.strip()}
    except FileNotFoundError:
        print(f"Warning: {exclude_file} not found. Returning original DataFrame.")
        return df

    # Lines made only of punctuation normalize to "", which matches nothing.
    exclude.discard("")

    if not exclude:
        print("Warning: No exclude categories found. Returning original DataFrame.")
        return df

    def _filter_tokens(val):
        toks = categories_json_to_list(val)
        if "tourist_attraction" in toks:
            return json.dumps(toks)
        kept = [t for t in toks if t not in exclude]
        return json.dumps(kept) if kept else None

    # assign() builds a new frame, so a caller passing a filtered slice does
    # not get its column overwritten (and no SettingWithCopyWarning is raised).
    filtered = df.assign(categories=df['categories'].apply(_filter_tokens))
    return filtered[filtered['categories'].notna()].reset_index(drop=True)

def remove_street_view(images):
    """
    Drop street-view / 360 entries from an images value.

    images: a list of dicts, or a JSON string encoding such a list. An entry
    is removed when its 'title' contains "street view" or "360", or its
    'image' URL contains "streetview" (all case-insensitive).

    Returns the filtered list. Input that is not a list or string, a string
    that is not valid JSON, or JSON that does not decode to a list is
    returned as-is. List entries that are not dicts are kept untouched.
    """
    if not isinstance(images, (list, str)):
        return images

    if isinstance(images, str):
        try:
            parsed = json.loads(images)
        except json.JSONDecodeError:
            return images
    else:
        parsed = images

    if not isinstance(parsed, list):
        return images

    def _is_street_view(entry):
        title = str(entry.get('title', '')).lower()
        url = str(entry.get('image', '')).lower()
        return 'street view' in title or '360' in title or 'streetview' in url

    return [
        entry for entry in parsed
        if not (isinstance(entry, dict) and _is_street_view(entry))
    ]

def map_price(price):
    """
    Map price strings/symbols to 1..4.

    - "$".."$$$$" map directly to 1..4.
    - Otherwise every number in the text is extracted (thousands separators
      such as "1,000" are read as one number) and their mean is bucketed:
      < 20 -> 1, <= 50 -> 2, <= 100 -> 3, above -> 4.
    - Missing values and text without any number yield None.
    """
    if pd.isna(price):
        return None
    price = str(price).strip()
    symbol_map = {'$': 1, '$$': 2, '$$$': 3, '$$$$': 4}
    if price in symbol_map:
        return symbol_map[price]

    # First alternative matches comma-grouped thousands ("1,000", "12,345,678");
    # a comma not followed by exactly three digits still separates two numbers.
    found = re.findall(r'\d{1,3}(?:,\d{3})+|\d+', price)
    if not found:
        return None
    nums = [int(n.replace(',', '')) for n in found]
    mid = sum(nums) / len(nums)

    if mid < 20:
        return 1
    if mid <= 50:
        return 2
    if mid <= 100:
        return 3
    return 4

def deprioritize_category(row, keyword):
    """
    Return the row's categories as a JSON array string with the token for
    `keyword` moved to the end. All occurrences are collapsed into a single
    trailing one; the list is unchanged when the token is absent.
    """
    tokens = categories_json_to_list(row.get("categories"))
    target = norm_token(keyword)
    if target in tokens:
        tokens = [tok for tok in tokens if tok != target] + [target]
    return json.dumps(tokens)

def clean_images_field(images_raw):
    """
    Collect the string "image" values from a list of dicts.

    Entries that are not dicts, lack an "image" key, or hold a non-string
    value are skipped. Any non-list input yields an empty list.
    """
    if not isinstance(images_raw, list):
        return []

    urls = []
    for entry in images_raw:
        if not isinstance(entry, dict):
            continue
        url = entry.get("image")
        if isinstance(url, str):
            urls.append(url)
    return urls

def clean_videos_field(videos_raw):
    """
    Pull https:// URLs out of a string and trim them.

    Each URL is cut at the first "|" and then at the first "=mm".
    Non-string or blank input yields an empty list.
    """
    if not isinstance(videos_raw, str):
        return []
    if not videos_raw.strip():
        return []

    def _trim(url):
        # Text before the first "|", then before the first "=mm" if present.
        head = url.partition("|")[0]
        return head.partition("=mm")[0]

    return [_trim(match) for match in re.findall(r'https://[^,\s]+', videos_raw)]

def remove_about(row, category_name):
    """
    Remove a category block by name from 'about' data.

    row:           one cell of the 'about' column -- a JSON string or an
                   already-parsed list of dicts with a 'name' key.
    category_name: the 'name' of the block(s) to remove.

    Returns the same kind of value that was passed in: a JSON string for
    string input, a list for list input. Missing values, strings that are
    not valid JSON, and data that is not a list are returned unchanged.
    Entries that are not dicts are kept.
    """
    if category_name is None or (
        isinstance(category_name, float) and pd.isna(category_name)
    ):
        return row

    if isinstance(row, str):
        try:
            data = json.loads(row)
        except json.JSONDecodeError:
            return row
    elif isinstance(row, list):
        # Checked by type rather than pd.isna(): isna on a list returns an
        # array, whose truth value is ambiguous for more than one element.
        data = row
    else:
        # None, NaN or an unexpected type: nothing to filter.
        return row

    if not isinstance(data, list):
        return row

    filtered = [
        cat for cat in data
        if not (isinstance(cat, dict) and cat.get('name') == category_name)
    ]
    return json.dumps(filtered) if isinstance(row, str) else filtered

def change_resolution(link, scale=3):
    """
    Multiply the width and height in a trailing "=w<W>-h<H>-k-no" suffix by
    `scale`. Links that do not end with that suffix are returned unchanged.
    """
    def _scaled_suffix(match):
        width = int(match.group(2)) * scale
        height = int(match.group(4)) * scale
        return f"=w{width}-h{height}-k-no"

    return re.sub(r'(=w)(\d+)(-h)(\d+)(-k-no)$', _scaled_suffix, link)

def change_image_resolutions(image_list):
    """
    Run change_resolution over every string in a list (non-string entries are
    dropped). Anything that is not a list is returned unchanged.
    """
    if isinstance(image_list, list):
        return [change_resolution(url) for url in image_list if isinstance(url, str)]
    return image_list

def save_about_field(df, filename="../text/about_field.txt"):
    """
    Write an overview of df['about'] to `filename`: each category display
    name (raw, not tokenized) followed by the names of its enabled options.

    Cells may be JSON strings or already-parsed lists; strings that are not
    valid JSON are skipped, as are categories without a name. Categories and
    options are sorted, and a blank line follows each category.
    """
    options_by_category = {}
    for cell in df["about"].dropna():
        if isinstance(cell, str):
            try:
                cell = json.loads(cell)
            except json.JSONDecodeError:
                continue
        for section in cell:
            title = section.get("name")
            if not title:
                continue
            # Register the category even if none of its options is enabled.
            bucket = options_by_category.setdefault(title, set())
            bucket.update(
                option.get("name", "").strip()
                for option in section.get("options", [])
                if option.get("enabled")
            )

    with open(filename, "w", encoding="utf-8") as f:
        for title in sorted(options_by_category):
            block = [f"{title}\n"]
            block.extend(f"- {label}\n" for label in sorted(options_by_category[title]))
            block.append("\n")
            f.write("".join(block))

def to_csv(df, filename):
    '''Write the dataframe to `filename` without the index; an empty frame is not written and a notice is printed instead.'''
    if df.empty:
        print(f"No data to save to {os.path.basename(filename)}.")
        return
    df.to_csv(filename, index=False)

### Concatenate all data

In [8]:
INPUT_DIR = 'sg'
OUTPUT_DIR = 'output'
TEXT_DIR = 'text'

# Read all CSV files in the folder.
# glob returns paths in arbitrary (filesystem-dependent) order, and the
# name-based dedup keeps the first occurrence, so sort to make the surviving
# rows reproducible between runs and machines.
csv_files = sorted(glob.glob(os.path.join(INPUT_DIR, "*.csv")))
if not csv_files:
    # pd.concat([]) would fail later with a much less helpful message.
    raise FileNotFoundError(f"No CSV files found in {os.path.abspath(INPUT_DIR)}")

# Clean and combine data
dataframes = [clean_data(file) for file in csv_files]
pois = combine_dataframes(dataframes)

# print(pois.dtypes)

duplicate rows: 765


In [9]:
# Clean data: keep places with enough reviews and an acceptable rating
pois = (
    pois
    .pipe(drop_low_reviews, threshold=5)
    .pipe(drop_low_ratings, threshold=1.5)
)
# Convert the raw price text to the 1..4 scale, then give the column its final name
pois['price_range'] = pois['price_range'].apply(map_price)
pois = pois.rename(columns={"price_range": "price_level"})
# save_categories(pois, filename=os.path.join(TEXT_DIR, 'categories.txt'))
# save_about_field(pois, filename=os.path.join(TEXT_DIR, 'about_field.txt'))
pois = filter_exclude_categories(pois, exclude_file=os.path.join(TEXT_DIR, 'exclude.txt'))

# Handle images & videos: drop street-view entries, keep only the URLs, upscale them
pois['images'] = (
    pois['images']
    .apply(remove_street_view)
    .apply(clean_images_field)
    .apply(change_image_resolutions)
)
# pois['videos'] = pois['videos'].apply(clean_videos_field)

In [10]:
# Flags
# Flag column -> keywords looked up in the enabled "about" options.
# Insertion order is the order in which the columns are created.
OPTION_FLAGS = {
    'kids_friendly': ['Good for kids'],
    'pets_friendly': ['Dogs allowed', 'Dogs allowed inside', 'Dogs allowed outside'],
    'wheelchair_rental': ['Wheelchair rental'],
    'wheelchair_accessible_car_park': ['Wheelchair accessible car park'],
    'wheelchair_accessible_entrance': ['Wheelchair accessible entrance'],
    'wheelchair_accessible_seating': ['Wheelchair accessible seating'],
    'wheelchair_accessible_toilet': ['Wheelchair accessible toilet'],
    'halal_food': ['Halal food'],
    'vegan_options': ['Vegan options'],
    'vegetarian_options': ['Vegetarian options'],
    'reservations_required': ['Reservations required'],
    # 'hiking': ['Hiking', 'Point-to-point trail', 'Trail difficulty'],
    # 'cycling': ['Cycling'],
}
for flag_col, keywords in OPTION_FLAGS.items():
    pois[flag_col] = pois.apply(update_flag_by_options, axis=1, flag_col=flag_col, keywords=keywords)

# Second pass: a matching category also switches the flag on (existing True values are kept).
CATEGORY_FLAGS = {
    'halal_food': ['Halal restaurant'],
    'vegetarian_options': ['Vegetarian restaurant', 'Vegetarian cafe and deli'],
    'vegan_options': ['Vegan restaurant'],
    'pets_friendly': ['Cat cafe', 'Dog cafe'],
}
for flag_col, target_categories in CATEGORY_FLAGS.items():
    pois[flag_col] = pois.apply(update_flag_by_categories, axis=1, flag_col=flag_col, target_categories=target_categories)

# Remove certain "about" field
# Each listed block is stripped from every row's about data, one pass per name.
ABOUT_SECTIONS_TO_REMOVE = [
    'Atmosphere',
    'Amenities',
    'Dining options',
    'From the business',
    'Getting here',
    'Offerings',
    'Parking',
    'Payments',
    'Pets',
    'Popular for',
    'Recycling',
    'Service options',
]
for section_name in ABOUT_SECTIONS_TO_REMOVE:
    pois['about'] = pois['about'].apply(remove_about, category_name=section_name)

# Move the generic 'Tourist attraction' token behind the more specific categories
pois['categories'] = pois.apply(deprioritize_category, axis=1, keyword='Tourist attraction')
# to_csv(pois, f"{OUTPUT_DIR}poi.csv")

### Michelin

In [11]:
# Load and prepare Michelin data
michelin = pd.read_csv(os.path.join(OUTPUT_DIR, "michelin.csv"))
# Price text -> 1..4 scale; images column -> Python lists (non-strings become [])
michelin["price"] = michelin["price"].map(map_price)
michelin["images"] = michelin["images"].map(
    lambda cell: ast.literal_eval(cell) if isinstance(cell, str) else []
)

# Create lookups: one row per phone number / per name, indexed by that key
michelin_by_phone = (
    michelin
    .dropna(subset=["phone"])
    .drop_duplicates(subset=["phone"])
    .set_index("phone")
)
michelin_by_name = (
    michelin
    .drop_duplicates(subset=["name"])
    .set_index("name")
)

# Fill price_level from Michelin data where a match exists
def get_price(row):
    """
    Return the Michelin price for a matching row, else the row's own price_level.

    Matching is by phone first; only when the phone does not match is the
    title-cased name tried. If the matched Michelin entry has no price, the
    existing price_level is kept rather than being overwritten by a missing
    value.
    """
    existing = row.get("price_level")

    # Try phone match first
    phone = row.get("phone")
    if pd.notna(phone) and phone in michelin_by_phone.index:
        michelin_price = michelin_by_phone.loc[phone, "price"]
    else:
        # Fallback: name match (title case to be consistent).
        # A missing name arrives as NaN (a float), which has no .title().
        raw_name = row.get("name")
        name = raw_name.title() if isinstance(raw_name, str) else None
        if name is None or name not in michelin_by_name.index:
            return existing
        michelin_price = michelin_by_name.loc[name, "price"]

    return michelin_price if pd.notna(michelin_price) else existing

pois["price_level"] = pois.apply(get_price, axis=1)

# Fill descriptions from Michelin data where a match exists
def get_description(row):
    """
    Return the Michelin description for a matching row, else the row's own
    'descriptions' value.

    Matching is by phone first; only when the phone does not match is the
    title-cased name tried. If the matched Michelin entry has no description,
    the existing value is kept rather than being overwritten by a missing one.
    """
    existing = row.get("descriptions")

    phone = row.get("phone")
    if pd.notna(phone) and phone in michelin_by_phone.index:
        michelin_text = michelin_by_phone.loc[phone, "description"]
    else:
        # A missing name arrives as NaN (a float), which has no .title().
        raw_name = row.get("name")
        name = raw_name.title() if isinstance(raw_name, str) else None
        if name is None or name not in michelin_by_name.index:
            return existing
        michelin_text = michelin_by_name.loc[name, "description"]

    return michelin_text if pd.notna(michelin_text) else existing

pois["descriptions"] = pois.apply(get_description, axis=1)

# Merge images
def to_list(s):
    """
    Coerce a cell value to a list.

    A list is returned unchanged. A string is parsed as a Python literal:
    a list is returned as-is and a tuple is converted to a list. Everything
    else -- empty or unparsable strings, literals of another type, None,
    NaN -- yields an empty list, so the result is always a list.
    """
    if isinstance(s, list):
        return s
    if not isinstance(s, str) or not s.strip():
        return []
    try:
        parsed = ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return []
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, tuple):
        return list(parsed)
    # A valid literal of another type (number, dict, string, ...) is not a list.
    return []

def merge_images(row):
    """
    Return the row's images with the matching Michelin images placed first.

    Matching is by phone first, then by title-cased name. Duplicates are
    removed while keeping the first occurrence, so running the cell again
    does not prepend the same Michelin images a second time.
    """
    existing = to_list(row.get("images"))
    if not isinstance(existing, list):
        existing = []

    phone = row.get("phone")
    # A missing name arrives as NaN (a float), which has no .title().
    raw_name = row.get("name")
    name = raw_name.title() if isinstance(raw_name, str) else None

    michelin_imgs = []
    if pd.notna(phone) and phone in michelin_by_phone.index:
        michelin_imgs = michelin_by_phone.loc[phone, "images"]
    elif name is not None and name in michelin_by_name.index:
        michelin_imgs = michelin_by_name.loc[name, "images"]
    if not isinstance(michelin_imgs, list):
        michelin_imgs = []

    # Order-preserving de-duplication; list membership is used so that
    # unhashable entries cannot raise.
    merged = []
    for img in michelin_imgs + existing:
        if img not in merged:
            merged.append(img)
    return merged

pois["images"] = pois.apply(merge_images, axis=1)

# Save final data
# to_csv() skips the write (and prints a notice) when the frame is empty.
# The images column holds Python lists at this point, so it is written to the
# CSV in its string form.
to_csv(pois, os.path.join(OUTPUT_DIR, "poi.csv"))


In [12]:
from pathlib import Path

# NOTE(review): this rebinds TEXT_DIR, which an earlier cell set to the string
# 'text'. Here it becomes a Path one level above the working directory, so the
# category files handled below are not the ones under 'text/' that the
# cleaning cell reads -- confirm "../text" is the intended location.
TEXT_DIR = Path("../text")
ATTRACTIONS_DIR = TEXT_DIR / "attractions"
# Created up front so the glob and the writes below have a directory to work in.
ATTRACTIONS_DIR.mkdir(parents=True, exist_ok=True)

def read_set(path: Path) -> set[str]:
    """Return the distinct non-blank lines of a UTF-8 text file, stripped; a missing file gives an empty set."""
    if not path.exists():
        return set()

    entries: set[str] = set()
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        cleaned = raw_line.strip()
        if cleaned:
            entries.add(cleaned)
    return entries

def write_set(path: Path, items: set[str]) -> None:
    """Write the items to `path` as UTF-8, sorted, one per line; parent directories are created and an empty set gives an empty file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Sorted output keeps diffs deterministic.
    body = "\n".join(sorted(items))
    if items:
        body += "\n"
    path.write_text(body, encoding="utf-8")

# 1) Load the universe
categories = read_set(TEXT_DIR / "categories.txt")

# Every group below is intersected with `categories` and then written back to
# its own file. With an empty universe each intersection is empty and every
# group file would be overwritten with nothing, so stop before any write.
if not categories:
    raise RuntimeError(
        f"{TEXT_DIR / 'categories.txt'} is missing or empty; "
        "refusing to overwrite the category group files."
    )

# 2) Load only selected root files, but all attraction files
root_files = {
    "exclude": TEXT_DIR / "exclude.txt",
    "meal": TEXT_DIR / "meal.txt",
    "accommodation": TEXT_DIR / "accommodation.txt",
}

groups: dict[str, set[str]] = {name: read_set(path) for name, path in root_files.items()}

# Load all attraction category files dynamically
for p in ATTRACTIONS_DIR.glob("*.txt"):
    groups[p.stem] = read_set(p)

# 3) Constrain everything to known categories
for k in list(groups.keys()):
    groups[k] = groups[k] & categories

# 4) Special rule for 'family' if present
# family := family ∩ categories minus nature and cultural_history
if "family" in groups:
    groups["family"] = groups["family"] - groups.get("nature", set()) - groups.get("cultural_history", set())

# 5) Compute filtered union and uniques
# `unique` = categories that are not assigned to any group yet
filter_categories = set().union(*groups.values()) if groups else set()
unique = categories - filter_categories

# 6) Write outputs
write_set(TEXT_DIR / "unique.txt", unique)

# Persist each group back to its original file path
for name, items in groups.items():
    if name in root_files:
        out_path = root_files[name]
    else:
        out_path = ATTRACTIONS_DIR / f"{name}.txt"
    write_set(out_path, items)
