In [1]:
from __future__ import annotations

import os, json, time, math, shutil
from pathlib import Path
from typing import Optional, Dict, Any, Tuple, Set

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

# ====== Repository root ======
# Defaults to the original WSL checkout (/mnt/d/community-detection). Set the
# CD_PROJECT_ROOT environment variable to run from another location without
# editing the notebook.
PROJECT_ROOT = Path(os.environ.get("CD_PROJECT_ROOT", "/mnt/d/community-detection")).resolve()

DATA_DIR = PROJECT_ROOT / "data"
PROCESSED_DIR = DATA_DIR / "processed"

# Output directory for the "clean" artifacts consumed by later steps.
DATA_CLEARED_DIR = PROCESSED_DIR / "data_cleared"

# Scratch directory on the native Linux filesystem. Parquet files are written
# here first and copied to /mnt/d only after validation, to avoid corrupt
# writes on the Windows-mounted drive. Override with CD_TMP_DIR.
LINUX_TMP_DIR = Path(os.environ.get("CD_TMP_DIR", str(Path.home() / "cd_parquet_tmp"))).expanduser()
LINUX_TMP_DIR.mkdir(parents=True, exist_ok=True)

print("PROJECT_ROOT:", PROJECT_ROOT)
print("DATA_DIR:", DATA_DIR)
print("DATA_CLEARED_DIR:", DATA_CLEARED_DIR)
print("LINUX_TMP_DIR:", LINUX_TMP_DIR)


PROJECT_ROOT: /mnt/d/community-detection
DATA_DIR: /mnt/d/community-detection/data
DATA_CLEARED_DIR: /mnt/d/community-detection/data/processed/data_cleared
LINUX_TMP_DIR: /home/dtnghia/cd_parquet_tmp


In [2]:
def parquet_validate(path: Path, read_rows: int = 5) -> Tuple[bool, str]:
    """Check that a Parquet file can be opened and decoded.

    Two probes run in order, stopping at the first failure:
      1. ``metadata``: open the file with ``pyarrow.parquet.ParquetFile`` and
         read the footer row count (catches truncated/garbled metadata);
      2. ``read``: load the file with ``pandas.read_parquet`` and take the
         first ``read_rows`` rows. ``read_parquet`` loads the *whole* file
         before ``head`` slices it, so every row group is actually decoded.

    Returns:
        ``(True, "ok")`` on success, otherwise ``(False, "<probe> fail: <ExcType>: <msg>")``.
    """
    probes = (
        ("metadata", lambda: pq.ParquetFile(path).metadata.num_rows),
        ("read", lambda: pd.read_parquet(path, engine="pyarrow").head(read_rows).shape),
    )
    for label, probe in probes:
        try:
            probe()
        except Exception as exc:
            return False, f"{label} fail: {type(exc).__name__}: {exc}"
    return True, "ok"


def parquet_write_safe(
    df: pd.DataFrame,
    final_path: Path,
    tmp_root: Path = LINUX_TMP_DIR,
    compression: str = "zstd",   # zstd instead of snappy: fewer snappy-related problems when a write is interrupted
) -> None:
    """Write ``df`` to ``final_path`` as Parquet without leaving a corrupt file behind.

    Steps:
      1. write to a uniquely named temp file under ``tmp_root`` (native Linux FS);
      2. validate that temp file with ``parquet_validate``;
      3. copy it to a hidden sibling of ``final_path`` on the destination
         filesystem (e.g. /mnt/d/...) and validate the copy;
      4. atomically move the validated copy onto ``final_path`` with ``os.replace``.

    Because step 4 is a rename inside one directory, ``final_path`` always holds
    either its previous content or the fully validated new file, never a
    partial copy. Intermediate files are removed on success and on failure.

    Args:
        df: frame to write (index is not stored).
        final_path: destination file; parent directories are created.
        tmp_root: scratch directory for the first write (created if missing).
        compression: Parquet compression codec passed to ``DataFrame.to_parquet``.

    Raises:
        OSError: if the temp file or the copied file fails validation.
    """
    import uuid  # local import: only needed to build collision-free temp names

    final_path = Path(final_path)
    final_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_root = Path(tmp_root)
    tmp_root.mkdir(parents=True, exist_ok=True)

    # A random token instead of int(time.time()): two files with the same name
    # (e.g. edges_clean.parquet for two datasets) written in the same second
    # would otherwise share a temp path.
    token = uuid.uuid4().hex
    tmp_path = tmp_root / f"{final_path.name}.tmp_{token}.parquet"
    partial_path = final_path.with_name(f".{final_path.name}.partial_{token}")

    try:
        df.to_parquet(tmp_path, engine="pyarrow", index=False, compression=compression)

        ok, msg = parquet_validate(tmp_path)
        if not ok:
            raise OSError(f"[TMP PARQUET CORRUPT] {tmp_path} | {msg}")

        # Copy next to the destination first, so a failed/interrupted copy never
        # clobbers an existing good final_path.
        shutil.copyfile(tmp_path, partial_path)

        ok2, msg2 = parquet_validate(partial_path)
        if not ok2:
            raise OSError(f"[FINAL PARQUET CORRUPT] {final_path} | {msg2}")

        os.replace(partial_path, final_path)
    finally:
        # Best-effort cleanup; a failing unlink must not mask the real error.
        for leftover in (tmp_path, partial_path):
            try:
                leftover.unlink(missing_ok=True)
            except OSError:
                pass

    print(f"[OK] wrote+validated: {final_path} | rows={len(df):,} cols={df.shape[1]}")


In [3]:
# Central run configuration: every pipeline step reads its knobs from here.
CFG: Dict[str, Any] = dict(
    run=dict(seed=42),
    preprocess=dict(
        min_checkins=10,          # a kept user needs at least this many check-ins
        min_degree=3,             # ... and at least this many friendship edges
        iterative_filter=True,    # repeat the (min_checkins, min_degree) filter until stable
        lat_range=[-90.0, 90.0],
        lon_range=[-180.0, 180.0],
    ),
    features=dict(
        log1p_counts=True,
        standardize=True,
    ),
    lbsn2vec=dict(
        snapshot="old",           # friendship snapshot: "old" or "new"
        tier="curated",           # "curated" or "raw" (this pipeline rebuilds from curated)
        sample_frac=1.0,          # 1.0 = full run; e.g. 0.1 for a quick test
        chunksize=2_000_000,
    ),
    brightkite=dict(
        sample_frac=1.0,
        chunksize=2_000_000,
    ),
)

np.random.seed(int(CFG["run"]["seed"]))
print(json.dumps(CFG, indent=2))


{
  "run": {
    "seed": 42
  },
  "preprocess": {
    "min_checkins": 10,
    "min_degree": 3,
    "iterative_filter": true,
    "lat_range": [
      -90.0,
      90.0
    ],
    "lon_range": [
      -180.0,
      180.0
    ]
  },
  "features": {
    "log1p_counts": true,
    "standardize": true
  },
  "lbsn2vec": {
    "snapshot": "old",
    "tier": "curated",
    "sample_frac": 1.0,
    "chunksize": 2000000
  },
  "brightkite": {
    "sample_frac": 1.0,
    "chunksize": 2000000
  }
}


In [4]:
# Raw input files per dataset, all located under DATA_DIR.
_lbsn_root = DATA_DIR / "LBSN2Vec"
_bk_root = DATA_DIR / "Brightkite"

PATHS = {
    "lbsn2vec": {
        "root": _lbsn_root,
        "friendship_old": _lbsn_root / "dataset_WWW_friendship_old.txt",
        "friendship_new": _lbsn_root / "dataset_WWW_friendship_new.txt",
        "checkins_curated": _lbsn_root / "dataset_WWW_Checkins_anonymized.txt",
        "poi": _lbsn_root / "raw_POIs.txt",
    },
    "brightkite": {
        "root": _bk_root,
        "edges": _bk_root / "Brightkite_edges.txt",
        "checkins": _bk_root / "Brightkite_totalCheckins.txt",
    },
}

# Existence check for every configured input before anything is loaded.
for _dataset, _entries in PATHS.items():
    print("\n==", _dataset)
    for _key, _path in _entries.items():
        print(f"{_key:>15}:", _path, "| exists=", _path.exists())



== lbsn2vec
           root: /mnt/d/community-detection/data/LBSN2Vec | exists= True
 friendship_old: /mnt/d/community-detection/data/LBSN2Vec/dataset_WWW_friendship_old.txt | exists= True
 friendship_new: /mnt/d/community-detection/data/LBSN2Vec/dataset_WWW_friendship_new.txt | exists= True
checkins_curated: /mnt/d/community-detection/data/LBSN2Vec/dataset_WWW_Checkins_anonymized.txt | exists= True
            poi: /mnt/d/community-detection/data/LBSN2Vec/raw_POIs.txt | exists= True

== brightkite
           root: /mnt/d/community-detection/data/Brightkite | exists= True
          edges: /mnt/d/community-detection/data/Brightkite/Brightkite_edges.txt | exists= True
       checkins: /mnt/d/community-detection/data/Brightkite/Brightkite_totalCheckins.txt | exists= True


In [5]:
def read_edges_two_cols(path: Path) -> pd.DataFrame:
    """Read a whitespace-separated edge list, keeping only the first two columns.

    Returns a DataFrame with columns ``u`` and ``v`` cast to ``str``, one row
    per input line; any further columns in the file are ignored.
    """
    edges = pd.read_csv(
        path, sep=r"\s+", header=None, usecols=[0, 1], dtype=str, engine="python"
    ).set_axis(["u", "v"], axis=1)
    return edges.assign(
        u=lambda frame: frame["u"].astype(str),
        v=lambda frame: frame["v"].astype(str),
    )

def read_pois_minimal(path: Path) -> pd.DataFrame:
    """Load POI coordinates from the first three whitespace-separated columns.

    Output columns: ``venue_id`` (str), ``lat`` and ``lon`` (numeric, NaN when
    the text is not a number). Columns after the third are ignored.
    """
    table = pd.read_csv(path, sep=r"\s+", header=None, usecols=[0, 1, 2], dtype=str, engine="python")
    table.columns = ["venue_id", "lat", "lon"]
    as_number = lambda col: pd.to_numeric(table[col], errors="coerce")
    return table.assign(
        venue_id=table["venue_id"].astype(str),
        lat=as_number("lat"),
        lon=as_number("lon"),
    )

def parse_lbsn_curated_checkins_9col_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    """Turn a raw LBSN2Vec curated check-in chunk into ``user_id, venue_id, ts``.

    The first 9 columns are named: user_id, venue_id, dow, mon, day, time, tz,
    year, tz_offset_min. Columns dow..year are re-joined with single spaces and
    parsed with ``%a %b %d %H:%M:%S %z %Y``; rows that do not match become NaT.

    NOTE(review): the 9th column (named tz_offset_min; presumably a local-time
    offset in minutes, confirm against the dataset docs) is not used, so ``ts``
    and any hour/day-of-week features derived from it downstream may not
    reflect the user's local time.

    Raises:
        ValueError: if the chunk has fewer than 9 columns.
    """
    if chunk.shape[1] < 9:
        raise ValueError(f"Expected >=9 columns, got {chunk.shape}")

    names = ["user_id", "venue_id", "dow", "mon", "day", "time", "tz", "year", "tz_offset_min"]
    fields = chunk.iloc[:, :9].set_axis(names, axis=1)

    stamp_parts = [fields[name].astype(str) for name in ("dow", "mon", "day", "time", "tz", "year")]
    stamp = stamp_parts[0]
    for part in stamp_parts[1:]:
        stamp = stamp + " " + part
    parsed_ts = pd.to_datetime(stamp, format="%a %b %d %H:%M:%S %z %Y", errors="coerce")

    return pd.DataFrame({
        "user_id": fields["user_id"].astype(str),
        "venue_id": fields["venue_id"].astype(str),
        "ts": parsed_ts,
    })

def parse_snap_checkins_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    """Normalize a SNAP-style check-in chunk (e.g. Brightkite/Gowalla).

    Columns are read positionally as user_id, ts, lat, lon and, if present, a
    5th column venue_id; anything after the 5th column is dropped. With only 4
    columns, ``venue_id`` is filled with the literal string "NA".

    Returns columns ``user_id, ts, lat, lon, venue_id``: ids as ``str``, ``ts``
    via ``pd.to_datetime`` and lat/lon via ``pd.to_numeric`` (unparseable
    values become NaT/NaN).

    Raises:
        ValueError: if the chunk has fewer than 4 columns.
    """
    width = chunk.shape[1]
    if width < 4:
        raise ValueError(f"SNAP chunk has too few cols: {chunk.shape}")

    has_venue = width >= 5
    names = ["user_id", "ts", "lat", "lon"] + (["venue_id"] if has_venue else [])
    sub = chunk.iloc[:, :len(names)].set_axis(names, axis=1)
    if not has_venue:
        sub = sub.assign(venue_id="NA")

    return pd.DataFrame({
        "user_id": sub["user_id"].astype(str),
        "ts": pd.to_datetime(sub["ts"], errors="coerce"),
        "lat": pd.to_numeric(sub["lat"], errors="coerce"),
        "lon": pd.to_numeric(sub["lon"], errors="coerce"),
        "venue_id": sub["venue_id"].astype(str),
    })


In [6]:
def lbsn_step1_load_parse(cfg: Dict[str, Any]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Step 1 for LBSN2Vec: load friendships and curated check-ins, then attach POI coordinates.

    Args:
        cfg: full config; reads ``cfg["lbsn2vec"]`` (snapshot, sample_frac,
            chunksize) and ``cfg["run"]["seed"]``.

    Returns:
        ``(edges_raw, checkins_raw)``:
          - edges_raw: columns ``u, v`` from the chosen friendship snapshot.
            When sampling, only edges whose endpoints are both sampled users
            are kept.
          - checkins_raw: ``user_id, venue_id, ts, lat, lon``. This is a left
            join on the POI file, so lat/lon are NaN for unknown venues.
    """
    c = cfg["lbsn2vec"]
    snapshot = c["snapshot"]
    sample_frac = float(c["sample_frac"])
    chunksize = int(c["chunksize"])
    seed = int(cfg["run"]["seed"])

    # Any snapshot value other than "old" selects the "new" friendship file.
    edges_path = PATHS["lbsn2vec"]["friendship_old"] if snapshot == "old" else PATHS["lbsn2vec"]["friendship_new"]
    checkins_path = PATHS["lbsn2vec"]["checkins_curated"]
    poi_path = PATHS["lbsn2vec"]["poi"]

    # 1) friendship edges
    edges_all = read_edges_two_cols(edges_path)

    # 2) optional user sampling (only when sample_frac < 1.0)
    keep_users: Optional[Set[str]] = None
    if sample_frac < 1.0:
        users = set()
        for chunk in pd.read_csv(checkins_path, sep=r"\s+", header=None, dtype=str, engine="python", chunksize=chunksize):
            users.update(chunk.iloc[:, 0].astype(str).unique().tolist())
        # Sort before shuffling: iteration order of a set of str depends on the
        # per-process hash seed (PYTHONHASHSEED), so shuffling the raw set
        # order made the "seeded" sample differ between kernel restarts.
        users = np.array(sorted(users), dtype=object)
        rng = np.random.default_rng(seed)
        rng.shuffle(users)
        n = int(np.ceil(len(users) * sample_frac))
        keep_users = set(users[:n].tolist())
        print(f"[LBSN C1] sampled users: {len(keep_users):,} / total≈{len(users):,}")

    if keep_users is not None:
        edges_raw = edges_all[edges_all["u"].isin(keep_users) & edges_all["v"].isin(keep_users)].copy()
    else:
        edges_raw = edges_all

    print(f"[LBSN C1] edges_raw: {edges_raw.shape}")

    # 3) check-ins: read in chunks -> (optional) user filter -> parse
    pieces = []
    for chunk in pd.read_csv(checkins_path, sep=r"\s+", header=None, dtype=str, engine="python", chunksize=chunksize):
        if keep_users is not None:
            chunk = chunk[chunk.iloc[:, 0].astype(str).isin(keep_users)]
        if len(chunk) == 0:
            continue
        pieces.append(parse_lbsn_curated_checkins_9col_chunk(chunk))

    checkins_core = pd.concat(pieces, ignore_index=True) if pieces else pd.DataFrame(columns=["user_id","venue_id","ts"])
    print(f"[LBSN C1] checkins_core: {checkins_core.shape}")

    # 4) attach POI coordinates
    pois = read_pois_minimal(poi_path)
    if len(checkins_core) > 0:
        needed = set(checkins_core["venue_id"].unique().tolist())
        pois = pois[pois["venue_id"].isin(needed)].copy()
    # One row per venue: duplicate POI ids would otherwise silently multiply
    # check-ins in the left join below.
    pois = pois.drop_duplicates(subset="venue_id", keep="first")

    checkins_raw = checkins_core.merge(pois, on="venue_id", how="left")
    checkins_raw["lat"] = pd.to_numeric(checkins_raw["lat"], errors="coerce")
    checkins_raw["lon"] = pd.to_numeric(checkins_raw["lon"], errors="coerce")

    print(f"[LBSN C1] checkins_raw: {checkins_raw.shape} | ts_ok={checkins_raw['ts'].notna().mean():.3f} lat_ok={checkins_raw['lat'].notna().mean():.3f} lon_ok={checkins_raw['lon'].notna().mean():.3f}")
    return edges_raw, checkins_raw

edges_raw_lbsn, checkins_raw_lbsn = lbsn_step1_load_parse(CFG)
# Preview: first rows of the raw edge list and of the joined check-ins.
(edges_raw_lbsn.head(5), checkins_raw_lbsn.head(5))


[LBSN C1] edges_raw: (363704, 2)
[LBSN C1] checkins_core: (22809624, 3)
[LBSN C1] checkins_raw: (22809624, 5) | ts_ok=1.000 lat_ok=1.000 lon_ok=1.000


(    u       v
 0  15  595326
 1  19      54
 2  19    1061
 3  19    1356
 4  19    1668,
    user_id                  venue_id                        ts        lat  \
 0   822121  4b4b87b5f964a5204a9f26e3 2012-04-03 18:00:07+00:00  41.029717   
 1   208842  4b4606f2f964a520751426e3 2012-04-03 18:00:08+00:00  30.270786   
 2   113817  4b4bade2f964a520cfa326e3 2012-04-03 18:00:09+00:00  40.436712   
 3    14732  4c143cada5eb76b0dc7dc1b7 2012-04-03 18:00:09+00:00  31.188807   
 4  1397630  4e88cf4ed22d53877981fdab 2012-04-03 18:00:09+00:00  19.399745   
 
          lon  
 0  28.974420  
 1 -97.753153  
 2 -79.990132  
 3 -81.376461  
 4 -99.102595  )

In [7]:
def brightkite_step1_load_parse(cfg: Dict[str, Any]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Step 1 for Brightkite: load the friendship edges and the SNAP check-ins.

    Args:
        cfg: full config; reads ``cfg["brightkite"]`` (sample_frac, chunksize)
            and ``cfg["run"]["seed"]``.

    Returns:
        ``(edges_raw, checkins_raw)``:
          - edges_raw: ``u, v`` from ``PATHS["brightkite"]["edges"]``. When
            sampling, only edges whose endpoints are both sampled users are kept.
          - checkins_raw: ``user_id, ts, lat, lon, venue_id`` as produced by
            ``parse_snap_checkins_chunk``.
    """
    c = cfg["brightkite"]
    sample_frac = float(c["sample_frac"])
    chunksize = int(c["chunksize"])
    seed = int(cfg["run"]["seed"])

    edges_path = PATHS["brightkite"]["edges"]
    checkins_path = PATHS["brightkite"]["checkins"]

    edges_all = read_edges_two_cols(edges_path)

    # Optional user sampling from the check-in file (only when sample_frac < 1.0).
    keep_users: Optional[Set[str]] = None
    if sample_frac < 1.0:
        users = set()
        for chunk in pd.read_csv(checkins_path, sep=r"\s+", header=None, dtype=str, engine="python", chunksize=chunksize):
            users.update(chunk.iloc[:, 0].astype(str).unique().tolist())
        # Sort before shuffling: iteration order of a set of str depends on the
        # per-process hash seed (PYTHONHASHSEED), so shuffling the raw set
        # order made the "seeded" sample differ between kernel restarts.
        users = np.array(sorted(users), dtype=object)
        rng = np.random.default_rng(seed)
        rng.shuffle(users)
        n = int(np.ceil(len(users) * sample_frac))
        keep_users = set(users[:n].tolist())
        print(f"[BK C1] sampled users: {len(keep_users):,} / total≈{len(users):,}")

    if keep_users is not None:
        edges_raw = edges_all[edges_all["u"].isin(keep_users) & edges_all["v"].isin(keep_users)].copy()
    else:
        edges_raw = edges_all
    print(f"[BK C1] edges_raw: {edges_raw.shape}")

    # Check-ins: read in chunks -> (optional) user filter -> parse.
    pieces = []
    for chunk in pd.read_csv(checkins_path, sep=r"\s+", header=None, dtype=str, engine="python", chunksize=chunksize):
        if keep_users is not None:
            chunk = chunk[chunk.iloc[:, 0].astype(str).isin(keep_users)]
        if len(chunk) == 0:
            continue
        pieces.append(parse_snap_checkins_chunk(chunk))

    checkins_raw = pd.concat(pieces, ignore_index=True) if pieces else pd.DataFrame(columns=["user_id","ts","lat","lon","venue_id"])
    print(f"[BK C1] checkins_raw: {checkins_raw.shape} | ts_ok={checkins_raw['ts'].notna().mean():.3f} lat_ok={checkins_raw['lat'].notna().mean():.3f} lon_ok={checkins_raw['lon'].notna().mean():.3f}")
    return edges_raw, checkins_raw

edges_raw_bk, checkins_raw_bk = brightkite_step1_load_parse(CFG)
# Preview: first rows of the raw edge list and of the parsed check-ins.
(edges_raw_bk.head(5), checkins_raw_bk.head(5))


[BK C1] edges_raw: (428156, 2)
[BK C1] checkins_raw: (4747287, 5) | ts_ok=1.000 lat_ok=1.000 lon_ok=1.000


(   u  v
 0  0  1
 1  0  2
 2  0  3
 3  0  4
 4  0  5,
   user_id                        ts        lat         lon  \
 0       0 2010-10-17 01:48:53+00:00  39.747652 -104.992510   
 1       0 2010-10-16 06:02:04+00:00  39.891383 -105.070814   
 2       0 2010-10-16 03:48:54+00:00  39.891077 -105.068532   
 3       0 2010-10-14 18:25:51+00:00  39.750469 -104.999073   
 4       0 2010-10-14 00:21:47+00:00  39.752713 -104.996337   
 
                                    venue_id  
 0          88c46bf20db295831bd2d1718ad7e6f5  
 1          7a0f88982aa015062b95e3b4843f9ca2  
 2          dd7cd3d264c2d063832db506fba8bf79  
 3  9848afcc62e500a01cf6fbf24b797732f8963683  
 4          2ef143e12038c870038df53e0478cefc  )

In [8]:
def make_undirected_dedup(edges: pd.DataFrame) -> pd.DataFrame:
    """Canonicalize an edge list into an undirected simple graph.

    Each pair is reordered so that ``u <= v`` under *string* comparison,
    self-loops are removed, and duplicate pairs keep their first occurrence.
    Returns a new ``u, v`` DataFrame with a fresh 0..n-1 index.
    """
    src = edges["u"].astype(str).to_numpy()
    dst = edges["v"].astype(str).to_numpy()
    flip = src > dst
    canon = pd.DataFrame({
        "u": np.where(flip, dst, src),
        "v": np.where(flip, src, dst),
    })
    not_self_loop = canon["u"] != canon["v"]
    return canon.loc[not_self_loop].drop_duplicates(["u", "v"]).reset_index(drop=True)

def clean_checkins(chk: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    """Drop unusable check-ins and normalize column types.

    Steps:
      1. parse ``ts`` as UTC (tz-aware values are converted, naive values are
         taken as UTC) and coerce ``lat``/``lon`` to numbers. Unparseable
         values become NaT/NaN;
      2. drop rows with a missing user_id, venue_id, ts, lat or lon;
      3. keep coordinates inside ``cfg["preprocess"]["lat_range"]`` and
         ``cfg["preprocess"]["lon_range"]`` (both bounds inclusive);
      4. cast ids to ``str`` and make ``ts`` timezone-naive (UTC wall time).

    Args:
        chk: check-ins with at least ``user_id, ts, lat, lon, venue_id``.
        cfg: config dict providing the coordinate ranges.

    Returns:
        A new DataFrame with columns ``user_id, ts, lat, lon, venue_id`` and a
        0..n-1 index. The input frame is not modified.
    """
    lat_lo, lat_hi = cfg["preprocess"]["lat_range"]
    lon_lo, lon_hi = cfg["preprocess"]["lon_range"]

    out = chk.copy()
    # utc=True gives one code path for naive, tz-aware and mixed-offset inputs.
    # Without it, mixed offsets produce an object column on which .dt fails.
    out["ts"] = pd.to_datetime(out["ts"], errors="coerce", utc=True)
    out["lat"] = pd.to_numeric(out["lat"], errors="coerce")
    out["lon"] = pd.to_numeric(out["lon"], errors="coerce")

    # Drop missing values BEFORE casting ids to str: astype(str) turns NaN into
    # the literal string "nan", which dropna no longer recognizes.
    out = out.dropna(subset=["user_id", "venue_id", "ts", "lat", "lon"])

    in_box = out["lat"].between(lat_lo, lat_hi) & out["lon"].between(lon_lo, lon_hi)
    # .copy(): the following assignments must not target a view of `out`.
    out = out.loc[in_box, ["user_id", "ts", "lat", "lon", "venue_id"]].copy()

    out["user_id"] = out["user_id"].astype(str)
    out["venue_id"] = out["venue_id"].astype(str)
    out["ts"] = out["ts"].dt.tz_localize(None)

    return out.reset_index(drop=True)


def step2_clean(edges_raw: pd.DataFrame, checkins_raw: pd.DataFrame, cfg: dict):
    """Step 2: canonicalize/deduplicate the edge list and clean the check-ins.

    Returns:
        ``(edges_clean, checkins_clean)`` from ``make_undirected_dedup`` and
        ``clean_checkins`` respectively.
    """
    return make_undirected_dedup(edges_raw), clean_checkins(checkins_raw, cfg)

edges_clean_lbsn, checkins_clean_lbsn = step2_clean(edges_raw_lbsn, checkins_raw_lbsn, CFG)
edges_clean_bk, checkins_clean_bk = step2_clean(edges_raw_bk, checkins_raw_bk, CFG)

# Shapes after cleaning: (edges, check-ins) per dataset.
for _tag, _edges, _chk in (
    ("[LBSN C2]", edges_clean_lbsn, checkins_clean_lbsn),
    ("[BK   C2]", edges_clean_bk, checkins_clean_bk),
):
    print(_tag, _edges.shape, _chk.shape)


[LBSN C2] (363699, 2) (22809619, 5)
[BK   C2] (214078, 2) (4747172, 5)


In [9]:
def degree_from_edges(edges: pd.DataFrame) -> pd.Series:
    """Count how often each node id appears as an edge endpoint.

    Every row adds one to the count of its ``u`` and one to the count of its
    ``v`` (ids compared as strings). The result comes from ``value_counts``,
    so it is sorted by count, descending.
    """
    both_ends = [edges[col].astype(str) for col in ("u", "v")]
    return pd.concat(both_ends).value_counts()

def filter_induced_once(edges: pd.DataFrame, chk: pd.DataFrame, k: int, d: int):
    """Run one pass of the (k, d) user filter.

    A user survives when they have at least ``k`` rows in ``chk`` AND degree at
    least ``d`` in ``edges``. Inputs are not modified.

    Returns:
        ``(v_keep, edges2, chk2)``: the surviving user ids (``pd.Index``), the
        edges whose two endpoints both survive, and the survivors' check-ins.
        Both frames get a fresh 0..n-1 index.
    """
    n_checkins = chk["user_id"].astype(str).value_counts()
    degree = degree_from_edges(edges)

    active = n_checkins.index[n_checkins >= k]
    connected = degree.index[degree >= d]
    v_keep = pd.Index(active).intersection(pd.Index(connected))

    both_ends_kept = edges["u"].isin(v_keep) & edges["v"].isin(v_keep)
    edges2 = edges.loc[both_ends_kept].reset_index(drop=True)
    chk2 = chk.loc[chk["user_id"].isin(v_keep)].reset_index(drop=True)
    return v_keep, edges2, chk2

def step3_filter(edges_clean: pd.DataFrame, checkins_clean: pd.DataFrame, cfg: dict, max_iter: int = 20):
    """Step 3: keep users with enough check-ins and enough friends.

    Each pass (``filter_induced_once``) keeps users with at least
    ``min_checkins`` check-ins and ``min_degree`` edges, then restricts edges
    and check-ins to those users. With ``iterative_filter`` the pass is
    repeated on its own output, because removing users lowers their
    neighbours' degrees. Iteration stops when the kept-user count stops
    changing. The kept set can only shrink, so an unchanged count means a
    fixed point. It also stops after ``max_iter`` passes, and then prints a
    warning because some users may still violate the thresholds.

    Args:
        edges_clean: undirected edge list with columns ``u, v``.
        checkins_clean: check-ins with a ``user_id`` column.
        cfg: reads ``cfg["preprocess"]`` keys ``min_checkins`` (default 10),
            ``min_degree`` (default 3) and ``iterative_filter`` (default True).
        max_iter: maximum number of filter passes (previously hard-coded 20).

    Returns:
        ``(users_final, edges_final, checkins_final)``. ``users_final`` has a
        single ``user_id`` column sorted ascending.
    """
    pp = cfg["preprocess"]
    k = int(pp.get("min_checkins", 10))
    d = int(pp.get("min_degree", 3))
    iterative = bool(pp.get("iterative_filter", True))
    max_iter = max(1, int(max_iter))

    # No defensive .copy(): filter_induced_once never mutates its inputs and
    # always returns new frames. Copying the (up to ~22M-row) check-ins here
    # only doubled peak memory.
    edges_tmp, chk_tmp = edges_clean, checkins_clean

    prev_users = -1
    converged = False
    for _ in range(max_iter):
        v_keep, edges_tmp, chk_tmp = filter_induced_once(edges_tmp, chk_tmp, k=k, d=d)
        n_users = len(v_keep)
        if (not iterative) or (n_users == prev_users):
            converged = True
            break
        prev_users = n_users

    if iterative and not converged:
        print(f"[WARN step3_filter] no fixed point after {max_iter} passes "
              f"(last pass kept {prev_users:,} users); thresholds may not hold for every user")

    users_final = pd.DataFrame({"user_id": pd.Index(chk_tmp["user_id"].unique()).sort_values()})
    return users_final, edges_tmp, chk_tmp

users_final_lbsn, edges_final_lbsn, checkins_final_lbsn = step3_filter(edges_clean_lbsn, checkins_clean_lbsn, CFG)
users_final_bk, edges_final_bk, checkins_final_bk = step3_filter(edges_clean_bk, checkins_clean_bk, CFG)

# Shapes after filtering: (users, edges, check-ins) per dataset.
for _tag, _frames in (
    ("[LBSN C3]", (users_final_lbsn, edges_final_lbsn, checkins_final_lbsn)),
    ("[BK   C3]", (users_final_bk, edges_final_bk, checkins_final_bk)),
):
    print(_tag, *(_frame.shape for _frame in _frames))


[LBSN C3] (47389, 1) (279816, 2) (10328914, 5)
[BK   C3] (15092, 1) (116506, 2) (3656191, 5)


In [10]:
def _entropy_from_counts(counts: np.ndarray, eps: float = 1e-12) -> float:
    s = counts.sum()
    if s <= 0:
        return 0.0
    p = counts / (s + eps)
    p = p[p > 0]
    return float(-(p * np.log(p + eps)).sum())

def _haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0
    lat1 = np.radians(lat1); lon1 = np.radians(lon1)
    lat2 = np.radians(lat2); lon2 = np.radians(lon2)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat/2.0)**2 + np.cos(lat1)*np.cos(lat2)*np.sin(dlon/2.0)**2
    c = 2*np.arctan2(np.sqrt(a), np.sqrt(1-a))
    return R * c

def build_user_features_from_checkins(users_final: pd.DataFrame, checkins_final: pd.DataFrame,
                                      log1p_counts: bool = True, standardize: bool = True) -> Tuple[np.ndarray, pd.DataFrame]:
    """Build one feature row per user from their check-ins (43 columns).

    Summary features (12):
      - activity: num_checkins, num_active_days (distinct calendar dates of
        ``ts``), num_unique_venues;
      - spatial: mean/std (ddof=0) of lat and lon; radius of gyration and
        median haversine distance (km) of check-ins to the user's mean lat/lon;
      - temporal/venue: Shannon entropy of the hour-of-day, day-of-week and
        venue distributions.
    Profile features (31): per-user proportions for each of the 24 hours
    (``hour_HH_p``) and 7 weekdays (``dow_D_p``, pandas dayofweek, 0 = Monday).
    Hours/weekdays are taken from ``ts`` as stored (no timezone conversion here).

    Args:
        users_final: DataFrame with a ``user_id`` column; fixes the row order
            of the output. Users with no check-ins get all-zero features
            (before standardization).
        checkins_final: check-ins with ``user_id, ts, lat, lon, venue_id``.
        log1p_counts: apply ``log1p`` to the three count features.
        standardize: z-score every column across users (ddof=0); columns with
            zero spread are divided by 1 instead.

    Returns:
        ``(X_users, feat)``: ``feat`` is the feature DataFrame indexed by
        ``user_id`` in ``users_final`` order; ``X_users`` holds the same values
        as a float32 NumPy array.
    """
    chk = checkins_final.copy()
    chk["user_id"] = chk["user_id"].astype(str)
    chk["venue_id"] = chk["venue_id"].astype(str)
    chk["ts"] = pd.to_datetime(chk["ts"], errors="coerce")
    # NOTE(review): user_id was already cast to str above, so a NaN id has become
    # the string "nan" and is not removed by this dropna.
    chk = chk.dropna(subset=["ts", "lat", "lon", "user_id"])

    chk["hour"] = chk["ts"].dt.hour.astype(int)
    chk["dow"]  = chk["ts"].dt.dayofweek.astype(int)
    chk["date"] = chk["ts"].dt.date

    g = chk.groupby("user_id", sort=False)

    num_checkins = g.size().rename("num_checkins")
    num_active_days = g["date"].nunique().rename("num_active_days")
    num_unique_venues = g["venue_id"].nunique().rename("num_unique_venues")

    mean_lat = g["lat"].mean().rename("mean_lat")
    mean_lon = g["lon"].mean().rename("mean_lon")
    # Population std; a user with a single check-in would give NaN -> 0.0.
    std_lat  = g["lat"].std(ddof=0).fillna(0.0).rename("std_lat")
    std_lon  = g["lon"].std(ddof=0).fillna(0.0).rename("std_lon")

    # Per-user loop: distances from each check-in to the user's arithmetic-mean
    # lat/lon. NOTE(review): an arithmetic mean of longitudes is not a true
    # centroid for users whose check-ins straddle the +/-180 meridian.
    rog = {}
    med_dist = {}
    for uid, sub in g:
        latc = float(sub["lat"].mean())
        lonc = float(sub["lon"].mean())
        d = _haversine_km(sub["lat"].to_numpy(), sub["lon"].to_numpy(), latc, lonc)
        rog[uid] = float(np.sqrt(np.mean(d**2))) if len(d) else 0.0
        med_dist[uid] = float(np.median(d)) if len(d) else 0.0
    rog = pd.Series(rog, name="radius_of_gyration_km")
    med_dist = pd.Series(med_dist, name="median_dist_to_centroid_km")

    # users x hour count table; add never-observed hours so all 24 columns exist.
    hour_counts = pd.crosstab(chk["user_id"], chk["hour"])
    for h in range(24):
        if h not in hour_counts.columns:
            hour_counts[h] = 0
    hour_counts = hour_counts[list(range(24))].sort_index(axis=1)

    # users x weekday count table; same padding to all 7 columns.
    dow_counts = pd.crosstab(chk["user_id"], chk["dow"])
    for d0 in range(7):
        if d0 not in dow_counts.columns:
            dow_counts[d0] = 0
    dow_counts = dow_counts[list(range(7))].sort_index(axis=1)

    hour_entropy = hour_counts.apply(lambda r: _entropy_from_counts(r.to_numpy()), axis=1).rename("hour_entropy")
    dow_entropy  = dow_counts.apply(lambda r: _entropy_from_counts(r.to_numpy()), axis=1).rename("dow_entropy")
    venue_entropy = g["venue_id"].apply(lambda s: _entropy_from_counts(s.value_counts().to_numpy())).rename("venue_entropy")

    # Column-wise concat aligns all per-user Series on the user_id index.
    feat = pd.concat([
        num_checkins, num_active_days, num_unique_venues,
        mean_lat, mean_lon, std_lat, std_lon,
        rog, med_dist,
        hour_entropy, dow_entropy, venue_entropy
    ], axis=1)

    # Row-normalize counts to proportions; an all-zero row becomes 0 instead of NaN.
    hour_prop = hour_counts.div(hour_counts.sum(axis=1).replace(0, np.nan), axis=0).fillna(0.0)
    hour_prop.columns = [f"hour_{h:02d}_p" for h in hour_prop.columns]
    dow_prop = dow_counts.div(dow_counts.sum(axis=1).replace(0, np.nan), axis=0).fillna(0.0)
    dow_prop.columns = [f"dow_{d0}_p" for d0 in dow_prop.columns]

    feat = feat.join(hour_prop, how="left").join(dow_prop, how="left").fillna(0.0)

    if log1p_counts:
        for c in ["num_checkins", "num_active_days", "num_unique_venues"]:
            feat[c] = np.log1p(feat[c].astype(float))

    # Align rows to users_final; users absent from the check-ins get 0.0.
    user_order = users_final["user_id"].astype(str).tolist()
    feat = feat.reindex(user_order).fillna(0.0)
    feat.index.name = "user_id"

    if standardize:
        mu = feat.mean(axis=0)
        # replace(0, 1.0): constant columns are centred but not scaled (avoids /0).
        sd = feat.std(axis=0, ddof=0).replace(0, 1.0)
        feat = (feat - mu) / sd

    X_users = feat.to_numpy(dtype=np.float32)
    return X_users, feat

# Same feature options for both datasets, taken from CFG["features"].
_feature_opts = dict(
    log1p_counts=bool(CFG["features"]["log1p_counts"]),
    standardize=bool(CFG["features"]["standardize"]),
)
X_lbsn, feat_lbsn = build_user_features_from_checkins(users_final_lbsn, checkins_final_lbsn, **_feature_opts)
X_bk, feat_bk = build_user_features_from_checkins(users_final_bk, checkins_final_bk, **_feature_opts)

print("[LBSN C4] X:", X_lbsn.shape, "feat:", feat_lbsn.shape)
print("[BK   C4] X:", X_bk.shape, "feat:", feat_bk.shape)


[LBSN C4] X: (47389, 43) feat: (47389, 43)
[BK   C4] X: (15092, 43) feat: (15092, 43)


In [11]:
def _save_npy_atomic(arr: np.ndarray, final_path: Path) -> None:
    """Write ``arr`` as .npy to a hidden sibling file, verify it, then rename it onto ``final_path``."""
    partial = final_path.with_name(f".{final_path.name}.partial")
    try:
        # Write through a file handle so np.save does not append another ".npy".
        with open(partial, "wb") as fh:
            np.save(fh, arr)
        back = np.load(partial)
        if back.shape != arr.shape or back.dtype != arr.dtype:
            raise OSError(
                f"[NPY CORRUPT] {final_path} | wrote {arr.shape} {arr.dtype}, read back {back.shape} {back.dtype}"
            )
        del back
        os.replace(partial, final_path)
    finally:
        partial.unlink(missing_ok=True)


def _write_text_atomic(final_path: Path, text: str) -> None:
    """Write UTF-8 ``text`` to a hidden sibling file, then rename it onto ``final_path``."""
    partial = final_path.with_name(f".{final_path.name}.partial")
    try:
        with open(partial, "w", encoding="utf-8") as fh:
            fh.write(text)
        os.replace(partial, final_path)
    finally:
        partial.unlink(missing_ok=True)


def save_step0_4_artifacts(dataset: str,
                           users_final: pd.DataFrame,
                           edges_clean: pd.DataFrame,
                           edges_final: pd.DataFrame,
                           checkins_clean: pd.DataFrame,
                           checkins_final: pd.DataFrame,
                           X_users: np.ndarray,
                           feat_df: pd.DataFrame):
    """Persist every step 0-4 output of ``dataset`` under ``DATA_CLEARED_DIR/<dataset>``.

    Writes:
      - edges_clean / edges_final / checkins_clean / checkins_final /
        users_final / feat_df (index reset into a column) as Parquet, via
        ``parquet_write_safe``;
      - ``X_users.npy`` with the feature matrix;
      - ``data_cleared_meta.json`` with the dataset name, local creation time
        and the global ``CFG``.

    The .npy and .json files are first written to a hidden sibling file and
    then moved into place with ``os.replace``. An interrupted run on the /mnt/d
    mount therefore cannot leave a half-written file under the final name.

    Raises:
        OSError: if a Parquet file or ``X_users.npy`` fails validation.
    """
    out_dir = DATA_CLEARED_DIR / dataset
    out_dir.mkdir(parents=True, exist_ok=True)

    # Parquet: write to Linux tmp -> validate -> copy -> validate.
    parquet_write_safe(edges_clean,    out_dir / "edges_clean.parquet")
    parquet_write_safe(edges_final,    out_dir / "edges_final.parquet")
    parquet_write_safe(checkins_clean, out_dir / "checkins_clean.parquet")
    parquet_write_safe(checkins_final, out_dir / "checkins_final.parquet")
    parquet_write_safe(users_final,    out_dir / "users_final.parquet")

    # NPY: the same /mnt/d corruption risk applies, so write + verify + rename.
    _save_npy_atomic(np.asarray(X_users), out_dir / "X_users.npy")

    parquet_write_safe(feat_df.reset_index(), out_dir / "feat_df.parquet")

    # Small provenance record for tracing which config produced these files.
    meta = {"dataset": dataset, "created_at": time.strftime("%Y-%m-%d %H:%M:%S"), "cfg": CFG}
    _write_text_atomic(out_dir / "data_cleared_meta.json", json.dumps(meta, indent=2, ensure_ascii=False))

    print(f"\n[DONE] data_cleared/{dataset} ready:", out_dir)

# Persist both datasets; keyword arguments spell out which frame goes where.
save_step0_4_artifacts(
    dataset="lbsn2vec",
    users_final=users_final_lbsn,
    edges_clean=edges_clean_lbsn,
    edges_final=edges_final_lbsn,
    checkins_clean=checkins_clean_lbsn,
    checkins_final=checkins_final_lbsn,
    X_users=X_lbsn,
    feat_df=feat_lbsn,
)

save_step0_4_artifacts(
    dataset="brightkite",
    users_final=users_final_bk,
    edges_clean=edges_clean_bk,
    edges_final=edges_final_bk,
    checkins_clean=checkins_clean_bk,
    checkins_final=checkins_final_bk,
    X_users=X_bk,
    feat_df=feat_bk,
)


[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec/edges_clean.parquet | rows=363,699 cols=2
[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec/edges_final.parquet | rows=279,816 cols=2
[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec/checkins_clean.parquet | rows=22,809,619 cols=5
[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec/checkins_final.parquet | rows=10,328,914 cols=5
[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec/users_final.parquet | rows=47,389 cols=1
[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec/feat_df.parquet | rows=47,389 cols=44

[DONE] data_cleared/lbsn2vec ready: /mnt/d/community-detection/data/processed/data_cleared/lbsn2vec
[OK] wrote+validated: /mnt/d/community-detection/data/processed/data_cleared/brightkite/edges_clean.parquet | rows=214,078 c

In [12]:
def verify_cleared(dataset: str):
    """Re-open every saved artifact of ``dataset`` and print one status line each.

    Parquet files are checked with ``parquet_validate`` and failures are printed,
    not raised. ``X_users.npy`` is loaded with ``np.load`` and its shape/dtype
    printed; a missing or unreadable .npy raises from ``np.load``.
    """
    base = DATA_CLEARED_DIR / dataset
    print("\n=== VERIFY", dataset, "===")

    parquet_names = (
        "edges_clean.parquet",
        "edges_final.parquet",
        "checkins_clean.parquet",
        "checkins_final.parquet",
        "users_final.parquet",
        "feat_df.parquet",
    )
    for fname in parquet_names:
        ok, msg = parquet_validate(base / fname)
        status = "OK" if ok else f"FAIL ({msg})"
        print(fname, "->", status)

    matrix = np.load(base / "X_users.npy")
    print("X_users.npy ->", matrix.shape, matrix.dtype)

for _ds_name in ("lbsn2vec", "brightkite"):
    verify_cleared(_ds_name)



=== VERIFY lbsn2vec ===
edges_clean.parquet -> OK
edges_final.parquet -> OK
checkins_clean.parquet -> OK
checkins_final.parquet -> OK
users_final.parquet -> OK
feat_df.parquet -> OK
X_users.npy -> (47389, 43) float32

=== VERIFY brightkite ===
edges_clean.parquet -> OK
edges_final.parquet -> OK
checkins_clean.parquet -> OK
checkins_final.parquet -> OK
users_final.parquet -> OK
feat_df.parquet -> OK
X_users.npy -> (15092, 43) float32
