In [2]:
import pandas as pd
import numpy as np
from pathlib import Path

# paths
GEO_CSV = "/home/opc/datasets/mbd_mini_dataset/geo_data_with_features.csv"
TX_CSV  = "/home/opc/datasets/PS_20174392719_1491204439457_log.csv"
OUT_DIR = "/home/opc/datasets/mbd_mini_dataset"
GEO_OUT = f"{OUT_DIR}/geo_linked.csv.gz"
TX_OUT  = f"{OUT_DIR}/tx_linked.csv.gz"

# knobs
LOW_Q = 0.20
MID_FRAUD_RATE = 0.50
REUSE_FACTOR = 20         # used to set the high cut automatically
BATCH_SIZE = 250_000      # tune for memory
RANDOM_SEED = 13

rng = np.random.default_rng(RANDOM_SEED)

def load_geo_min(path: str) -> pd.DataFrame:
    use = ["client_id","event_time","composite_suspicion_score","latitude","longitude"]
    head = pd.read_csv(path, nrows=0)
    use = [c for c in use if c in head.columns]
    geo = pd.read_csv(path, usecols=use, low_memory=False)
    if "event_time" in geo.columns:
        try:
            geo["event_time"] = pd.to_datetime(geo["event_time"])
            geo_day = geo["event_time"].dt.floor("D")
            # convert to integer day for fast matching, NaT goes to -1
            day_int = geo_day.view("int64") // 86_400_000_000_000
            day_int = day_int.fillna(-1).astype(np.int64)
            geo["geo_day_int"] = day_int
        except Exception:
            geo["geo_day_int"] = -1
    else:
        geo["geo_day_int"] = -1
    return geo

def load_tx_min(path: str) -> pd.DataFrame:
    tx = pd.read_csv(path, low_memory=False)
    need = ["isFraud","nameOrig","step"]
    if not set(need).issubset(tx.columns):
        raise ValueError(f"tx needs columns {need}")
    tx["isFraud"] = pd.to_numeric(tx["isFraud"], errors="coerce").fillna(0).astype(int)
    tx["tx_day_int"] = (pd.to_numeric(tx["step"], errors="coerce").fillna(0) // 24).astype(np.int64)
    return tx

def pick_high_cut(css: pd.Series, n_tx_fraud: int, reuse_factor: int) -> float:
    target_high = max(n_tx_fraud * reuse_factor, n_tx_fraud)
    target_high = min(target_high, len(css))
    q = 1.0 - target_high / len(css)
    q = float(np.clip(q, 0.90, 0.9999))
    return css.quantile(q)

def band_and_labels(geo: pd.DataFrame, low_q=LOW_Q, high_thr=None) -> tuple[pd.Series, np.ndarray]:
    css = geo["composite_suspicion_score"].astype(float)
    low_thr = css.quantile(low_q)
    band = np.where(css >= high_thr, "high", np.where(css <= low_thr, "low", "mid"))
    # labels, high to 1, low to 0, mid Bernoulli
    labels = np.zeros(len(geo), dtype=np.int8)
    labels[band == "high"] = 1
    mid_mask = band == "mid"
    n_mid = int(mid_mask.sum())
    if n_mid:
        labels[mid_mask] = rng.binomial(1, MID_FRAUD_RATE, size=n_mid).astype(np.int8)
    return pd.Series(band, index=geo.index, name="geo_band"), labels

def build_pools(tx: pd.DataFrame):
    # return global index arrays and per day dicts for each label
    tx_idx = np.arange(len(tx), dtype=np.int64)
    fraud_mask = tx["isFraud"].values == 1
    non_mask   = ~fraud_mask
    pools = {
        1: {
            "idx": tx_idx[fraud_mask],
            "by_day": {},
        },
        0: {
            "idx": tx_idx[non_mask],
            "by_day": {},
        }
    }
    # group by day for each label
    for lbl in [1, 0]:
        sub = tx.loc[pools[lbl]["idx"], ["tx_day_int"]].reset_index(drop=True)
        # map back positions into pools[lbl]["idx"]
        positions = np.arange(len(sub), dtype=np.int64)
        by_day = {}
        for day, pos_arr in pd.Series(positions).groupby(sub["tx_day_int"]):
            by_day[int(day)] = pools[lbl]["idx"][pos_arr.values]
        pools[lbl]["by_day"] = by_day
    return pools

def choose_indices_for_batch(day_ints: np.ndarray, labels: np.ndarray, pools: dict) -> np.ndarray:
    n = len(day_ints)
    out = np.empty(n, dtype=np.int64)
    # process by label, then by day, vectorized choice
    for lbl in [1, 0]:
        lbl_mask = labels == lbl
        if not lbl_mask.any():
            continue
        days = day_ints[lbl_mask]
        # unique days in this label
        uniq_days, counts = np.unique(days, return_counts=True)
        # for each day group, choose from day pool if available, else from global pool
        for day, cnt in zip(uniq_days, counts):
            if day in pools[lbl]["by_day"] and len(pools[lbl]["by_day"][day]) > 0:
                pool = pools[lbl]["by_day"][day]
            else:
                pool = pools[lbl]["idx"]
            picks = pool[rng.integers(0, len(pool), size=cnt)]
            out[lbl_mask & (day_ints == day)] = picks
    return out

def write_two_csvs_streaming(geo: pd.DataFrame, tx: pd.DataFrame,
                             geo_idx: np.ndarray, tx_idx: np.ndarray,
                             geo_out: str, tx_out: str, start_link_id: int):
    # slice once for speed, then write aligned
    geo_slice = geo.iloc[geo_idx].copy()
    tx_slice  = tx.iloc[tx_idx].copy()

    # add link_id and label
    link_ids = np.arange(start_link_id, start_link_id + len(geo_slice), dtype=np.int64)
    labels = tx_slice["isFraud"].astype(int).values

    geo_slice.insert(0, "link_id", link_ids)
    geo_slice.insert(1, "label_isFraud", labels)

    tx_slice.insert(0, "link_id", link_ids)
    tx_slice.insert(1, "label_isFraud", labels)

    # on first write, include header, then append
    mode = "x" if not Path(geo_out).exists() else "a"
    header = mode == "x"
    geo_slice.to_csv(geo_out, index=False, compression="gzip", mode="w" if header else "a", header=header)
    tx_slice.to_csv(tx_out, index=False, compression="gzip", mode="w" if header else "a", header=header)

def main_fast():
    Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

    geo = load_geo_min(GEO_CSV)
    tx  = load_tx_min(TX_CSV)

    n_fraud = int((tx["isFraud"] == 1).sum())
    high_thr = pick_high_cut(geo["composite_suspicion_score"].astype(float), n_fraud, REUSE_FACTOR)
    band, labels = band_and_labels(geo, low_q=LOW_Q, high_thr=high_thr)
    geo = geo.assign(geo_band=band.values)

    print({"low": int((band == "low").sum()),
           "mid": int((band == "mid").sum()),
           "high": int((band == "high").sum())})
    print("chosen high threshold:", high_thr)

    pools = build_pools(tx)

    # remove old outputs if present
    for p in [GEO_OUT, TX_OUT]:
        try:
            Path(p).unlink()
        except FileNotFoundError:
            pass

    link_id = 0
    n = len(geo)
    for start in range(0, n, BATCH_SIZE):
        end = min(start + BATCH_SIZE, n)
        batch_idx = np.arange(start, end, dtype=np.int64)

        day_ints = geo.loc[batch_idx, "geo_day_int"].values
        lbls     = labels[batch_idx]

        # pick tx indices for this batch, preferring same day pool, else global pool
        tx_indices = choose_indices_for_batch(day_ints, lbls, pools)

        # write aligned chunks to the two CSVs
        write_two_csvs_streaming(
            geo=geo[["client_id","event_time","latitude","longitude","composite_suspicion_score","geo_day_int","geo_band"]],
            tx=tx[["step","type","amount","nameOrig","oldbalanceOrg","newbalanceOrig","nameDest","oldbalanceDest","newbalanceDest","isFraud","isFlaggedFraud","tx_day_int"]],
            geo_idx=batch_idx,
            tx_idx=tx_indices,
            geo_out=GEO_OUT,
            tx_out=TX_OUT,
            start_link_id=link_id
        )
        link_id += len(batch_idx)
        print(f"Wrote rows {start} to {end}")

    print(f"\nDone. Wrote\n  {GEO_OUT}\n  {TX_OUT}")

main_fast()

  day_int = geo_day.view("int64") // 86_400_000_000_000


{'low': 1341882, 'mid': 5240513, 'high': 163370}
chosen high threshold: 0.41931943492766466
Wrote rows 0 to 250000
Wrote rows 250000 to 500000
Wrote rows 500000 to 750000
Wrote rows 750000 to 1000000
Wrote rows 1000000 to 1250000
Wrote rows 1250000 to 1500000
Wrote rows 1500000 to 1750000
Wrote rows 1750000 to 2000000
Wrote rows 2000000 to 2250000
Wrote rows 2250000 to 2500000
Wrote rows 2500000 to 2750000
Wrote rows 2750000 to 3000000
Wrote rows 3000000 to 3250000
Wrote rows 3250000 to 3500000
Wrote rows 3500000 to 3750000
Wrote rows 3750000 to 4000000
Wrote rows 4000000 to 4250000
Wrote rows 4250000 to 4500000
Wrote rows 4500000 to 4750000
Wrote rows 4750000 to 5000000
Wrote rows 5000000 to 5250000
Wrote rows 5250000 to 5500000
Wrote rows 5500000 to 5750000
Wrote rows 5750000 to 6000000
Wrote rows 6000000 to 6250000
Wrote rows 6250000 to 6500000
Wrote rows 6500000 to 6745765

Done. Wrote
  /home/opc/datasets/mbd_mini_dataset/geo_linked.csv.gz
  /home/opc/datasets/mbd_mini_dataset/tx_

In [4]:
import pandas as pd
from pathlib import Path

GEO_OUT = "/home/opc/datasets/mbd_mini_dataset/geo_linked.csv.gz"
TX_OUT  = "/home/opc/datasets/mbd_mini_dataset/tx_linked.csv.gz"

for p in [GEO_OUT, TX_OUT]:
    print(Path(p), "size MB:", Path(p).stat().st_size / 1e6)

# read a small sample for a quick glance
geo_head = pd.read_csv(GEO_OUT, nrows=5)
tx_head  = pd.read_csv(TX_OUT,  nrows=5)
print("\nGEO columns:", list(geo_head.columns))
print("TX  columns:", list(tx_head.columns))

/home/opc/datasets/mbd_mini_dataset/geo_linked.csv.gz size MB: 121.424672
/home/opc/datasets/mbd_mini_dataset/tx_linked.csv.gz size MB: 230.815509

GEO columns: ['link_id', 'label_isFraud', 'client_id', 'event_time', 'latitude', 'longitude', 'composite_suspicion_score', 'geo_day_int', 'geo_band']
TX  columns: ['link_id', 'label_isFraud', 'step', 'type', 'amount', 'nameOrig', 'oldbalanceOrg', 'newbalanceOrig', 'nameDest', 'oldbalanceDest', 'newbalanceDest', 'isFraud', 'isFlaggedFraud', 'tx_day_int']


In [5]:
import pandas as pd

def fraud_ratio(path, label_col="label_isFraud", chunksize=500_000):
    total = 0
    fraud = 0
    for chunk in pd.read_csv(path, usecols=[label_col], chunksize=chunksize):
        vc = chunk[label_col].value_counts()
        total += int(vc.sum())
        fraud += int(vc.get(1, 0))
    rate = fraud / total if total else 0
    return {"rows": total, "fraud": fraud, "non_fraud": total - fraud, "fraud_rate": rate}

print("\nFraud ratio, geo file:")
print(fraud_ratio(GEO_OUT))

print("\nFraud ratio, tx file:")
print(fraud_ratio(TX_OUT))


Fraud ratio, geo file:
{'rows': 6745765, 'fraud': 2783163, 'non_fraud': 3962602, 'fraud_rate': 0.4125792997532526}

Fraud ratio, tx file:
{'rows': 6745765, 'fraud': 2783163, 'non_fraud': 3962602, 'fraud_rate': 0.4125792997532526}


In [6]:
import pandas as pd

# get basic counts and uniqueness for link_id without full load
def link_id_stats(path, id_col="link_id", chunksize=500_000):
    total = 0
    any_dupe = False
    seen = set()
    for chunk in pd.read_csv(path, usecols=[id_col], chunksize=chunksize):
        s = chunk[id_col]
        total += len(s)
        if s.duplicated().any():
            any_dupe = True
        # optional, skip storing all ids to save memory
    return {"rows": total, "has_duplicates": any_dupe}

print("\nlink_id stats, geo:", link_id_stats(GEO_OUT))
print("link_id stats, tx: ", link_id_stats(TX_OUT))

# exact equality of row counts between files
geo_rows = sum(1 for _ in pd.read_csv(GEO_OUT, usecols=["link_id"], chunksize=1_000_000))
tx_rows  = sum(1 for _ in pd.read_csv(TX_OUT,  usecols=["link_id"], chunksize=1_000_000))
print(f"\nRow counts, geo {geo_rows}, tx {tx_rows}")


link_id stats, geo: {'rows': 6745765, 'has_duplicates': False}
link_id stats, tx:  {'rows': 6745765, 'has_duplicates': False}

Row counts, geo 7, tx 7


In [10]:
import pandas as pd
import numpy as np
from pathlib import Path
from collections import defaultdict

BASE = "/home/opc/datasets/mbd_mini_dataset"
GEO_IN  = f"{BASE}/geo_linked.csv.gz"
TX_IN   = f"{BASE}/tx_linked.csv.gz"
GEO_OUT = f"{BASE}/geo_linked_with_entity.csv.gz"
TX_OUT  = f"{BASE}/tx_linked_with_entity.csv.gz"
ENT_SUM = f"{BASE}/entity_summary.csv.gz"

CHUNK = 500_000  # tune for RAM and speed

# ---------- Union-Find that can add nodes on the fly ----------
class DSU:
    __slots__ = ("parent", "rank")
    def __init__(self):
        self.parent = np.empty(0, dtype=np.int64)
        self.rank   = np.empty(0, dtype=np.int8)
    def _add(self, n_new: int):
        if n_new <= 0:
            return
        start = len(self.parent)
        self.parent = np.append(self.parent, np.arange(start, start + n_new, dtype=np.int64))
        self.rank   = np.append(self.rank,   np.zeros(n_new, dtype=np.int8))
    def add_nodes_until(self, max_idx: int):
        need = max_idx + 1 - len(self.parent)
        if need > 0:
            self._add(need)
    def find(self, x: int) -> int:
        # iterative path compression
        p = self.parent
        while x != p[x]:
            p[x] = p[p[x]]
            x = p[x]
        return x
    def union(self, a: int, b: int):
        p = self.parent
        r = self.rank
        ra = self.find(a)
        rb = self.find(b)
        if ra == rb:
            return
        if r[ra] < r[rb]:
            p[ra] = rb
        elif r[ra] > r[rb]:
            p[rb] = ra
        else:
            p[rb] = ra
            r[ra] += 1

# ---------- Build entities by streaming the pairs ----------
def build_entity_map(geo_path: str, tx_path: str, chunk: int = CHUNK):
    dsu = DSU()
    node_index = {}  # maps "g:<id>" or "t:<id>" to int index
    next_idx = 0

    # iterate both files in lockstep
    geo_iter = pd.read_csv(geo_path, usecols=["link_id", "client_id"], chunksize=chunk)
    tx_iter  = pd.read_csv(tx_path,  usecols=["link_id", "nameOrig"],  chunksize=chunk)

    total_pairs = 0
    for gi, ti in zip(geo_iter, tx_iter):
        # alignment check, same link_id order
        if not np.array_equal(gi["link_id"].values, ti["link_id"].values):
            raise RuntimeError("link_id order mismatch between geo and tx chunks, sort files by link_id then rerun")

        g_ids = gi["client_id"].astype(str).values
        t_ids = ti["nameOrig"].astype(str).values

        # map to integer node ids
        g_keys = ["g:" + s for s in g_ids]
        t_keys = ["t:" + s for s in t_ids]

        # assign indices for any new keys
        for key in np.concatenate([g_keys, t_keys]):
            if key not in node_index:
                node_index[key] = next_idx
                next_idx += 1

        # ensure DSU arrays can hold new nodes
        dsu.add_nodes_until(next_idx - 1)

        # union all pairs in this chunk
        for gk, tk in zip(g_keys, t_keys):
            dsu.union(node_index[gk], node_index[tk])

        total_pairs += len(gi)
        print(f"Unioned {total_pairs:,} pairs, nodes {len(node_index):,}")

    # compress to roots and compact to entity_id, 0..E-1
    roots = {}
    root_to_entity = {}
    entity_ids = np.empty(len(node_index), dtype=np.int64)
    for key, idx in node_index.items():
        r = dsu.find(idx)
        roots[idx] = r
        if r not in root_to_entity:
            root_to_entity[r] = len(root_to_entity)
        entity_ids[idx] = root_to_entity[r]

    print(f"Built {len(root_to_entity):,} entities across {len(node_index):,} nodes")

    return node_index, entity_ids, root_to_entity

# ---------- Write enriched outputs in streaming mode ----------
def write_with_entity(geo_in: str, tx_in: str, node_index: dict, entity_ids: np.ndarray,
                      geo_out: str, tx_out: str, chunk: int = CHUNK):
    # make reverse map index -> entity_id for fast lookups
    idx_to_entity = entity_ids  # alias, same array
    # lazily create outputs
    for p in [geo_out, tx_out]:
        try:
            Path(p).unlink()
        except FileNotFoundError:
            pass

    link_rows = 0
    for gi, ti in zip(
        pd.read_csv(geo_in, chunksize=chunk, low_memory=False),
        pd.read_csv(tx_in,  chunksize=chunk, low_memory=False),
    ):
        if not np.array_equal(gi["link_id"].values, ti["link_id"].values):
            raise RuntimeError("link_id order mismatch during write phase")

        # look up entity for each row using the corresponding ids
        g_keys = ("g:" + gi["client_id"].astype(str)).values
        t_keys = ("t:" + ti["nameOrig"].astype(str)).values

        # both sides should map to the same entity, take either
        ent = np.fromiter((idx_to_entity[node_index[k]] for k in g_keys), dtype=np.int64, count=len(g_keys))
        gi_out = gi.copy()
        ti_out = ti.copy()
        gi_out.insert(1, "entity_id", ent)
        ti_out.insert(1, "entity_id", ent)

        # append, preserve headers only on first chunk
        mode = "a"
        header = not Path(geo_out).exists()
        gi_out.to_csv(geo_out, index=False, compression="gzip", mode="w" if header else mode, header=header)
        ti_out.to_csv(tx_out,  index=False, compression="gzip", mode="w" if header else mode, header=header)
        link_rows += len(gi)
        print(f"Wrote {link_rows:,} rows with entity_id")

# ---------- Small summary for QA ----------
def write_entity_summary(geo_with_ent: str, tx_with_ent: str, out_path: str):
    # count events and fraud per entity without full load
    ent_counts = defaultdict(lambda: {"geo":0, "tx":0, "fraud":0})
    for chunk in pd.read_csv(geo_with_ent, usecols=["entity_id"], chunksize=CHUNK):
        for e, c in chunk["entity_id"].value_counts().items():
            ent_counts[int(e)]["geo"] += int(c)
    for chunk in pd.read_csv(tx_with_ent, usecols=["entity_id","label_isFraud"], chunksize=CHUNK):
        vc = chunk.groupby("entity_id")["label_isFraud"].agg(["count","sum"])
        for e, row in vc.iterrows():
            ent_counts[int(e)]["tx"]    += int(row["count"])
            ent_counts[int(e)]["fraud"] += int(row["sum"])

    rows = []
    for e, d in ent_counts.items():
        rows.append({"entity_id": e, "geo_events": d["geo"], "tx_events": d["tx"],
                     "tx_fraud": d["fraud"], "tx_fraud_rate": d["fraud"] / d["tx"] if d["tx"] else 0.0})
    df = pd.DataFrame(rows).sort_values("tx_events", ascending=False)
    df.to_csv(out_path, index=False, compression="gzip")
    print(f"Wrote {out_path}, entities {len(df)}")

def main():
    Path(BASE).mkdir(parents=True, exist_ok=True)

    # 1, build the entity map by streaming pairs
    node_index, entity_ids, root_to_entity = build_entity_map(GEO_IN, TX_IN, chunk=CHUNK)

    # 2, write enriched files with entity_id added
    write_with_entity(GEO_IN, TX_IN, node_index, entity_ids, GEO_OUT, TX_OUT, chunk=CHUNK)

    # 3, write a small entity summary for QA
    write_entity_summary(GEO_OUT, TX_OUT, ENT_SUM)

    print("\nDone")
    print(f"Geo with entity, {GEO_OUT}")
    print(f"Tx  with entity, {TX_OUT}")
    print(f"Entity summary, {ENT_SUM}")

if __name__ == "__main__":
    main()

Unioned 500,000 pairs, nodes 291,332
Unioned 1,000,000 pairs, nodes 567,833
Unioned 1,500,000 pairs, nodes 831,169
Unioned 2,000,000 pairs, nodes 1,081,180
Unioned 2,500,000 pairs, nodes 1,316,445
Unioned 3,000,000 pairs, nodes 1,548,463
Unioned 3,500,000 pairs, nodes 1,766,486
Unioned 4,000,000 pairs, nodes 1,972,268
Unioned 4,500,000 pairs, nodes 2,172,665
Unioned 5,000,000 pairs, nodes 2,362,655
Unioned 5,500,000 pairs, nodes 2,543,611
Unioned 6,000,000 pairs, nodes 2,716,825
Unioned 6,500,000 pairs, nodes 2,882,005
Unioned 6,745,765 pairs, nodes 2,960,624
Built 28 entities across 2,960,624 nodes
Wrote 500,000 rows with entity_id
Wrote 1,000,000 rows with entity_id
Wrote 1,500,000 rows with entity_id
Wrote 2,000,000 rows with entity_id
Wrote 2,500,000 rows with entity_id
Wrote 3,000,000 rows with entity_id
Wrote 3,500,000 rows with entity_id
Wrote 4,000,000 rows with entity_id
Wrote 4,500,000 rows with entity_id
Wrote 5,000,000 rows with entity_id
Wrote 5,500,000 rows with entity_id