**Create Polars dataframe**

Custom split data into Train, Validation, and Spilt, as well as adding additional features. Save those in Google Drive

**Set up**

In [16]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"phuongkhanh21","key":"0fd6eb70e1509aad441adeb76f0dfff2"}'}

In [17]:
from google.colab import drive
drive.mount('/content/drive')
drive_path = '/content/drive/MyDrive/Colab Notebooks/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [18]:
import os
os.makedirs('/root/.kaggle', exist_ok=True)
!mv kaggle.json /root/.kaggle/
!chmod 600 /root/.kaggle/kaggle.json

!kaggle datasets download -d berkanoztas/synthetic-transaction-monitoring-dataset-aml

import zipfile

with zipfile.ZipFile("synthetic-transaction-monitoring-dataset-aml.zip", 'r') as zip_ref:
    zip_ref.extractall("synthetic_transaction_data")

Dataset URL: https://www.kaggle.com/datasets/berkanoztas/synthetic-transaction-monitoring-dataset-aml
License(s): CC-BY-NC-SA-4.0
synthetic-transaction-monitoring-dataset-aml.zip: Skipping, found more recently modified local copy (use --force to force download)


**Import libraries**

In [19]:
!pip install rustworkx



In [20]:
import pandas as pd
import polars as pl
from datetime import timedelta
import numpy as np
import rustworkx as rx
from typing import List, Dict

**Custom split data in polars**

In [21]:
def custom_split_polars(df: pl.DataFrame, validation_days: int = 70, test_days: int = 35):
      """
      Chronological split (train → val → test) based on calendar days from earliest to latest.
      Ensures non-overlapping sequential windows.
      """
      # Convert to datetime if needed
      dtype = df["Date"].dtype
      if dtype == pl.Utf8:
          df = df.with_columns(pl.col("Date").str.strptime(pl.Datetime(), "%Y-%m-%d", strict=False))
      elif dtype == pl.Date:
          df = df.with_columns(pl.col("Date").cast(pl.Datetime()))
      df = df.sort("Date")
      min_date = df.select(pl.col("Date").min()).item()
      max_date = df.select(pl.col("Date").max()).item()
      total_days = (max_date - min_date).days
      # Split: oldest → newest
      test_cutoff = max_date - timedelta(days=test_days)
      val_cutoff = test_cutoff - timedelta(days=validation_days)
      train_df = df.filter(pl.col("Date") < pl.lit(val_cutoff))
      val_df   = df.filter((pl.col("Date") >= pl.lit(val_cutoff)) & (pl.col("Date") < pl.lit(test_cutoff)))
      test_df  = df.filter(pl.col("Date") >= pl.lit(test_cutoff))
      print(f"Split complete:")
      print(f"  Train: {train_df.height} rows (until {val_cutoff.date()})")
      print(f"  Val:   {val_df.height} rows ({val_cutoff.date()}–{test_cutoff.date()})")
      print(f"  Test:  {test_df.height} rows (after {test_cutoff.date()})")
      return train_df, val_df, test_df

**Engineering Features**

In [22]:
def feature_engineering(df: pl.DataFrame) -> pl.DataFrame:
    """
    Build and return engineered features from a Polars DataFrame.
    """

    # Dropping column Laundering_type
    df = df.drop("Laundering_type")

    # Temporal features
    def temporal_features(df):

        return df.with_columns([
            df["Date"].dt.year().alias("year"),
            df["Date"].dt.month().alias("month"),
            df["Date"].dt.day().alias("day_of_month"),
            df["Date"].dt.weekday().alias("day_of_week"),
            df["Date"].dt.ordinal_day().alias("day_of_year"),
            df["Time"].dt.hour().alias("hour"),
            df["Time"].dt.minute().alias("minute"),
            df["Time"].dt.second().alias("second"),
        ])

    # Risk features
    high_risk_countries = ['Mexico', 'Turkey', 'Morocco', 'UAE']

    def risk_features(df):

        return df.with_columns([
            (df["Payment_currency"] != df["Received_currency"]).cast(pl.Int8).alias("currency_mismatch"),
            (df["Payment_type"] == "Cross-border").cast(pl.Int8).alias("cross_border"),
            df["Sender_bank_location"].is_in(high_risk_countries).cast(pl.Int8).alias("high_risk_sender"),
            df["Receiver_bank_location"].is_in(high_risk_countries).cast(pl.Int8).alias("high_risk_receiver")])

    def build_window_features_lazy(
        df,
        specs,
        date_col="Date",
        sender_col="Sender_account",
        receiver_col="Receiver_account",
        amount_col="Amount",
        index_name="__row_idx",
        label_choice="left",
    ):
        lf = df.lazy() if isinstance(df, pl.DataFrame) else df
        lf = lf.with_columns(pl.arange(0, pl.len()).over(pl.lit(True)).alias(index_name))
        out_lf = lf

        for spec in specs:
            kind = spec.get("kind", "rolling")

            if kind == "rolling":
                # existing rolling logic (no change)
                name = spec["name"]
                direction = spec["type"]  # "fanin" or "fanout"
                period_days = int(spec["period_days"])
                every = spec.get("every", "1d")

                if direction == "fanin":
                    group_by = receiver_col
                    agg_on = sender_col
                else:
                    group_by = sender_col
                    agg_on = receiver_col

                win_label = label_choice
                strategy = "forward" if win_label == "left" else "backward"

                right = (
                    lf
                    .sort([group_by, date_col])
                    .group_by_dynamic(
                        index_column=date_col,
                        every=every,
                        period=f"{period_days}d",
                        group_by=group_by,
                        closed="both",
                        label=win_label
                    )
                    .agg(pl.col(agg_on).n_unique().alias(name))
                    .sort([group_by, date_col])
                )

                left = out_lf.sort([group_by, date_col])

                out_lf = left.join_asof(
                    right,
                    left_on=date_col,
                    right_on=date_col,
                    by=group_by,
                    strategy=strategy,
                )

            elif kind == "monthly":
                # existing monthly logic (no change)
                name = spec["name"]
                side = spec.get("side", "receive")
                group_col = receiver_col if side == "receive" else sender_col

                monthly_agg = (
                    lf
                    .with_columns(pl.col(date_col).dt.truncate("1mo").alias("__month"))
                    .group_by([group_col, "__month"])
                    .agg(pl.col(amount_col).sum().alias(name))
                )

                out_lf = (
                    out_lf
                    .with_columns(pl.col(date_col).dt.truncate("1mo").alias("__month"))
                    .join(monthly_agg, on=[group_col, "__month"], how="left")
                    .drop("__month")
                )

            elif kind == "daily_pair_count":
                # NEW: back_and_forth_transfers (exact-match on day + pair)
                name = spec["name"]  # e.g., "back_and_forth_transfers"
                # day key = calendar day (truncate to 1 day)
                day_key = "__day"
                # compute counts per sender/receiver/day using lf (lazy)
                pair_daily_agg = (
                    lf
                    .with_columns(pl.col(date_col).dt.truncate("1d").alias(day_key))
                    .group_by([sender_col, receiver_col, day_key])
                    .agg(pl.len().alias(name))  # .len() counts rows in group
                )

                # attach day key to working frame and join exact on pair + day
                out_lf = (
                    out_lf
                    .with_columns(pl.col(date_col).dt.truncate("1d").alias(day_key))
                    .join(pair_daily_agg, on=[sender_col, receiver_col, day_key], how="left")
                    .fill_null(0)       # optional: replace nulls with 0
                    .with_columns(pl.col(name).cast(pl.Int64))  # ensure integer type
                    .drop(day_key)
                )

            else:
                raise ValueError("spec kind must be 'rolling', 'monthly', or 'daily_pair_count'")

        return out_lf


    def compute_derived_features_lazy(
        lf: pl.LazyFrame,
        *,
        fanin_col: str = "fanin_30d",
        fanout_col: str = "fanout_30d",
        daily_receive_col: str = "daily_receive",
        monthly_receive_col: str = "monthly_receive",
        monthly_send_col: str = "monthly_send",
        amount_col: str = "Amount",
        sender_col: str = "Sender_account",
        receiver_col: str = "Receiver_account",
        index_name: str = "__row_idx",
    ) -> pl.LazyFrame:
        """
        Take a LazyFrame and return a LazyFrame with derived features:
          - fan_in_out_ratio (safe division, 0 when denom missing or zero)
          - fanin_intensity_ratio (fanin_30d / daily_receive, 0 when denom missing or zero)
          - amount_dispersion_std (per-sender std of Amount, filled 0 when null)
          - sent_to_received_ratio_monthly (monthly_receive / monthly_send, 0 when denom missing or zero)

        If `daily_receive` is not present in lf.schema(), it is computed lazily as the
        per-receiver unique-senders per calendar day (dt.truncate("1d")) and joined back.
        The function is fully lazy; call .collect(...) when ready.
        """
        # ensure lazy input
        lf = lf if isinstance(lf, pl.LazyFrame) else lf.lazy()

        # Attempt to read schema; if unavailable assume missing and compute
        try:
            schema = lf.schema()
            has_daily = daily_receive_col in schema
        except Exception:
            has_daily = False

        # If daily_receive missing, compute it lazily (exact day bucket of unique senders per receiver)
        if not has_daily:
            day_key = "__day_for_daily_receive"
            daily_receive_agg = (
                lf
                .with_columns(pl.col("Date").dt.truncate("1d").alias(day_key))
                .group_by([receiver_col, day_key])
                .agg(pl.col(sender_col).n_unique().alias(daily_receive_col))
            )
            lf = (
                lf
                .with_columns(pl.col("Date").dt.truncate("1d").alias(day_key))
                .join(daily_receive_agg, on=[receiver_col, day_key], how="left")
                .drop(day_key)
            )

        # safe division helper expression
        def safe_div_expr(num: str, den: str, out_name: str):
            return (
                pl.when(pl.col(den).is_null() | (pl.col(den) == 0))
                  .then(0.0)
                  .otherwise(pl.col(num).cast(pl.Float64) / pl.col(den).cast(pl.Float64))
                  .alias(out_name)
            )

        fan_in_out_expr = safe_div_expr(fanin_col, fanout_col, "fan_in_out_ratio")
        fanin_intensity_expr = safe_div_expr(fanin_col, daily_receive_col, "fanin_intensity_ratio")
        sent_to_received_monthly_expr = safe_div_expr(monthly_receive_col, monthly_send_col, "sent_to_received_ratio_monthly")

        # per-sender std aggregation (lazy) and join back
        sender_std_agg = (
            lf
            .select([sender_col, amount_col])
            .group_by(sender_col)
            .agg(pl.col(amount_col).std().alias("__amount_std"))
        )

        out = (
            lf
            .join(sender_std_agg, on=sender_col, how="left")
            .with_columns(
                pl.col("__amount_std").cast(pl.Float64).fill_null(0.0).alias("amount_dispersion_std")
            )
            .drop("__amount_std")
            .with_columns([
                fan_in_out_expr,
                fanin_intensity_expr,
                sent_to_received_monthly_expr
            ])
        )

        return out
    # Temporal features
    df = temporal_features(df)

    # Risk features
    df = risk_features(df)

    # Rolling window computing
    specs = [
    {"name":"fanin_30d", "kind":"rolling", "type":"fanin", "period_days":30, "every":"1d"},
    {"name":"fanout_30d", "kind":"rolling", "type":"fanout", "period_days":30, "every":"1d"},
    {"name":"daily_recieve", "kind":"rolling", "type":"fanin", "period_days":1, "every":"1d"},
    {"name":"monthly_receive", "kind":"monthly", "side":"receive"},
    {"name":"monthly_send",    "kind":"monthly", "side":"send"},
    {"name":"back_and_forth_transfers", "kind":"daily_pair_count"},
    ]
    lazy_with_features = build_window_features_lazy(df, specs, amount_col="Amount", label_choice="left")
    plan = (
    lazy_with_features
    .sort(["Sender_account", "Date"])
    .with_columns([pl.col("Sender_account").set_sorted(), pl.col("Date").set_sorted()])
    )
    df_streamed = plan.collect(engine="streaming")
    df = df_streamed.sort("__row_idx").drop("__row_idx")

    # More computation
    lazy_with_derived = compute_derived_features_lazy(lazy_with_features)

    # Before streaming collect: pick a primary grouping ordering that matches your rolling computations.
    # If most rolling features used Receiver_account then Date, use that; otherwise use the grouping you chose.
    plan_derived = (
        lazy_with_derived
        .sort(["Sender_account", "Date"])
        .with_columns([pl.col("Sender_account").set_sorted(), pl.col("Date").set_sorted()])
    )

    df_streamed = plan_derived.collect(engine="streaming")
    df = df_streamed.sort("__row_idx").drop("__row_idx")

    return df

**Recast**

In [23]:
"""
We recast the integer‑based columns following the logic rules outlined in the paper.
"Explainable Feature Engineering for Multi-class Money Laundering Classification"
This recasting is performed to optimize storage efficiency and reduce overall memory consumption."
Excluding "Sender_account" and "Receiver_account" variables.
"""

def recast(df):
    exclude = ['Sender_account', 'Receiver_account']

    for col in df.columns:
        if col not in exclude:
            dtype = df[col].dtype
            if dtype in (pl.Int64, pl.Int32):
              maxval = df[col].max()
              if maxval:
                  if maxval < 127:
                      df = df.with_columns(df[col].cast(pl.Int8).alias(col))
                  elif maxval < 32767:
                      df = df.with_columns(df[col].cast(pl.Int16).alias(col))
                  elif maxval < 2147483647:
                      df = df.with_columns(df[col].cast(pl.Int32).alias(col))

    return df

**Build graph network**

In [24]:
"""
Compute circular_transaction_count using a
calendar‑month sliding window, ensuring rustworkx is installed

"""

def circular_transaction_feature(df:pl.DataFrame):
    # Iterate over monthly groups
    results = []
    for (year, month), group in df.group_by(["year", "month"]):
        res = circular_count_monthly(group, year, month)
        if res.height > 0:
            results.append(res)

    # Combine all results
    out_rx = pl.concat(results, how="vertical") if results else pl.DataFrame()

    # Join back to original df
    df_result = (
        df.join(out_rx, on=["Sender_account", "year", "month"], how="left")
        .with_columns(
            pl.col("circular_transaction_count").fill_null(0)
        )
    )
    return df_result

def circular_count_monthly(pdf, year, month):
    edges = list(zip(pdf["Sender_account"], pdf["Receiver_account"]))
    if not edges:
        return empty_month_frame()

    G = rx.PyDiGraph()
    node_idx = {}
    for u, v in edges:
        if u not in node_idx:
            node_idx[u] = G.add_node(u)
        if v not in node_idx:
            node_idx[v] = G.add_node(v)
        G.add_edge(node_idx[u], node_idx[v], None)

    cycles = rx.simple_cycles(G)

    counter = {}
    for cyc in cycles:
        cyc_nodes = [G[node] for node in cyc]
        for node in cyc_nodes:
            counter[node] = counter.get(node, 0) + 1

    return pl.DataFrame({
        "Sender_account": list(counter.keys()),
        "circular_transaction_count": list(counter.values()),
        "year": [year] * len(counter),
        "month": [month] * len(counter)
    })

**Import data**

In [25]:
os.listdir("synthetic_transaction_data")
df = pl.read_csv("synthetic_transaction_data/SAML-D.csv")
df.head(5)

Time,Date,Sender_account,Receiver_account,Amount,Payment_currency,Received_currency,Sender_bank_location,Receiver_bank_location,Payment_type,Is_laundering,Laundering_type
str,str,i64,i64,f64,str,str,str,str,str,i64,str
"""10:35:19""","""2022-10-07""",8724731955,2769355426,1459.15,"""UK pounds""","""UK pounds""","""UK""","""UK""","""Cash Deposit""",0,"""Normal_Cash_Deposits"""
"""10:35:20""","""2022-10-07""",1491989064,8401255335,6019.64,"""UK pounds""","""Dirham""","""UK""","""UAE""","""Cross-border""",0,"""Normal_Fan_Out"""
"""10:35:20""","""2022-10-07""",287305149,4404767002,14328.44,"""UK pounds""","""UK pounds""","""UK""","""UK""","""Cheque""",0,"""Normal_Small_Fan_Out"""
"""10:35:21""","""2022-10-07""",5376652437,9600420220,11895.0,"""UK pounds""","""UK pounds""","""UK""","""UK""","""ACH""",0,"""Normal_Fan_In"""
"""10:35:21""","""2022-10-07""",9614186178,3803336972,115.25,"""UK pounds""","""UK pounds""","""UK""","""UK""","""Cash Deposit""",0,"""Normal_Cash_Deposits"""


**Convert Amount to log-Amount and convert strings to Datetime**

In [26]:
df = df.with_columns(
    pl.col("Amount").log().alias("Amount")
)
    # Convert datetime
df = df.with_columns(
    pl.col("Date").str.strptime(pl.Date, "%Y-%m-%d").alias("Date"),
    pl.col("Time").str.strptime(pl.Time, "%H:%M:%S").alias("Time"))

In [27]:
df_train, df_val, df_test = custom_split_polars(df)

Split complete:
  Train: 6397772 rows (until 2023-05-10)
  Val:   2076692 rows (2023-05-10–2023-07-19)
  Test:  1030388 rows (after 2023-07-19)


In [28]:
df_train = feature_engineering(df_train)
df_train = circular_transaction_feature(df_train)
df_train = recast(df_train)

df_val = feature_engineering(df_val)
df_val = circular_transaction_feature(df_val)
df_val = recast(df_val)

df_test = feature_engineering(df_test)
df_test = circular_transaction_feature(df_test)
df_test = recast(df_test)

  df_streamed = plan.collect(engine="streaming")
  schema = lf.schema()
  df_streamed = plan_derived.collect(engine="streaming")


**Save dataframes**

In [29]:
# Save Polars DataFrame as Parquet (efficient + preserves schema)
df_train.write_parquet(os.path.join(drive_path, "df_train.parquet"))
df_val.write_parquet(os.path.join(drive_path, "df_val.parquet"))
df_test.write_parquet(os.path.join(drive_path, "df_test.parquet"))