In [None]:
import os, json, numpy as np, pandas as pd
import networkx as nx
from networkx.readwrite import json_graph

In [None]:
# Mount Google Drive when the notebook runs inside Colab.
# The import is guarded so a fresh kernel outside Colab (where the
# `google.colab` package does not exist) can still run every cell instead of
# failing here with ImportError.
try:
    from google.colab import drive
except ImportError:
    drive = None
    print('google.colab not available; skipping Drive mount')
else:
    drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
# Root of the data tree. An empty string makes every path below relative to
# the current working directory.
DATA_DIR = ''
PROC_DIR = os.path.join(DATA_DIR, 'processed')

# Input feature table read by the loading cell.
IN_PARQUET = os.path.join(PROC_DIR, 'principal_sample_feats_by_account_100bps.parquet')

# Every artefact produced by this notebook is written below OUT_DIR; the
# directory is created up front (no error if it already exists).
OUT_DIR = os.path.join(PROC_DIR, 'graphs_accounts_dir_edges')
os.makedirs(OUT_DIR, exist_ok=True)

In [None]:
# Load the feature table and put the rows in chronological order.
df = pd.read_parquet(IN_PARQUET)
# errors='coerce' turns unparseable timestamps into NaT instead of raising.
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True, errors='coerce')
df = df.sort_values('timestamp').reset_index(drop=True)

# Target dtype for each optional column; columns absent from the frame are
# skipped rather than treated as an error.
wanted_dtypes = {
    'src_bank': 'string',
    'dst_bank': 'string',
    'pay_currency': 'string',
    'recv_currency': 'string',
    'payment_format': 'string',
    'currency_pair': 'string',
    'label': 'int8',
    '_same_bank': 'int8',
}
df = df.astype({col: dtype for col, dtype in wanted_dtypes.items() if col in df.columns})

In [None]:
# Every account that appears as sender or receiver, de-duplicated in order of
# first appearance (senders are scanned before receivers).
all_accounts = pd.concat([df['src_account'], df['dst_account']], ignore_index=True).astype('string')
accounts = pd.Index(all_accounts.unique(), name='account')
node_id = pd.RangeIndex(start=0, stop=len(accounts), name='node_id')

# Persist the account <-> node_id table: node_id is simply the position of the
# account in `accounts`.
mapping = pd.DataFrame({
    'node_id': np.arange(len(accounts), dtype='int64'),
    'account': accounts,
})
mapping.to_parquet(os.path.join(OUT_DIR, 'mapping_accounts.parquet'), index=False)

# Lookup Series (account -> int32 node id) used to translate both endpoints.
acc2id = pd.Series(mapping['node_id'].to_numpy(), index=mapping['account'], dtype='int32')
for side in ('src', 'dst'):
    keys = df[f'{side}_account'].astype('string')
    df[f'{side}_id'] = acc2id.loc[keys].to_numpy().astype('int32')

In [None]:
# Columns every edge row must carry.
edge_base_cols = ['src_id', 'dst_id', 'timestamp', 'label', 'tx_id']

# Optional per-transaction features; only those actually present in `df` are kept.
edge_feat_candidates = [
    # transaction-level columns
    '_dow', '_hour', '_amt_log', '_same_bank', 'cp_bucket',
    # `_src_*` columns
    '_src_tx_count_prev', '_src_amt_sum_prev', '_src_amt_mean_prev',
    '_src_amt_std_prev', '_src_secs_since_last',
    '_src_roll_mean_K', '_src_roll_std_K', '_src_roll_sum_K', '_src_ewm_mean',
    # `_dst_*` columns
    '_dst_rx_count_prev', '_dst_amt_sum_prev', '_dst_amt_mean_prev',
    '_dst_amt_std_prev', '_dst_secs_since_last',
    '_dst_roll_mean_K', '_dst_roll_std_K', '_dst_roll_sum_K', '_dst_ewm_mean',
    # z-score columns
    '_src_amt_z', '_dst_amt_z',
    # raw amounts
    'amount_paid', 'fx_spread',
]
available = set(df.columns)
edge_feat_cols = [name for name in edge_feat_candidates if name in available]

# Independent copy so later edits to `edges` never touch `df`.
edges = df.loc[:, [*edge_base_cols, *edge_feat_cols]].copy()

In [None]:
# Chronological split: edges strictly before the 70th timestamp percentile are
# 'train', those before the 85th are 'valid', everything else is 'test'.
# Rows whose timestamp is NaT fail both comparisons and therefore stay 'test'.
ts = pd.to_datetime(edges['timestamp'], utc=True, errors='coerce')
q1, q2 = ts.quantile([0.70, 0.85])

split_dtype = pd.CategoricalDtype(categories=['train', 'valid', 'test'], ordered=True)
split_labels = pd.Series('test', index=edges.index, dtype='object')
# Apply the wider cut first so the narrower 'train' cut overrides it.
split_labels = split_labels.mask(ts < q2, 'valid').mask(ts < q1, 'train')
edges['split'] = split_labels.astype(split_dtype)

In [None]:
# Size of each split, as absolute counts and as a fraction of all edges.
split_counts = edges['split'].value_counts()
print(split_counts)
print()
print((split_counts / len(edges)).round(4))
# `split` is categorical, so groupby needs an explicit `observed=`; leaving it
# out emits a pandas FutureWarning. observed=False keeps the current behaviour
# (one row per category, including empty ones).
print('\nfraude por split:', edges.groupby('split', observed=False)['label'].mean().round(6))

split
train    70947
test     15207
valid    15204
Name: count, dtype: int64

split
train    0.70
test     0.15
valid    0.15
Name: count, dtype: float64

fraude por split: split
train    0.040890
valid    0.048408
test     0.101269
Name: label, dtype: float64


  print('\nfraude por split:', edges.groupby('split')['label'].mean().round(6))


In [None]:
def _account_flow_stats(key, peer, tag, peer_label):
    """Aggregate `df` per account on one side of the transaction.

    key        -- grouping column ('src_id' or 'dst_id')
    peer       -- column holding the opposite endpoint
    tag        -- 'out' or 'in'; used to build the output column names
    peer_label -- 'dst' or 'src'; suffix of the unique-counterparty column
    """
    spec = {
        f'{tag}_deg': (key, 'size'),
        f'{tag}_sum': ('amount_paid', 'sum'),
        f'{tag}_mean': ('amount_paid', 'mean'),
        f'{tag}_unique_{peer_label}': (peer, 'nunique'),
        f'first_seen_{tag}': ('timestamp', 'min'),
        f'last_seen_{tag}': ('timestamp', 'max'),
    }
    return df.groupby(key).agg(**spec).reset_index()


agg_out = _account_flow_stats('src_id', 'dst_id', 'out', 'dst')
agg_in = _account_flow_stats('dst_id', 'src_id', 'in', 'src')

# One row per node id. Left merges keep accounts that never appear on one
# side; their statistics for that side come back as NaN.
nodes = pd.DataFrame({'node_id': node_id})
for stats, key in ((agg_out, 'src_id'), (agg_in, 'dst_id')):
    nodes = nodes.merge(stats, how='left', left_on='node_id', right_on=key).drop(columns=[key])

# Replace the NaNs introduced by the merges with 0 and downcast: degree and
# unique-counterparty columns become int32, amount columns float32.
fill0_cols = ['out_deg', 'out_sum', 'out_unique_dst', 'in_deg', 'in_sum', 'in_unique_src']
for c in fill0_cols + ['out_mean', 'in_mean']:
    if c not in nodes.columns:
        continue
    is_count = 'deg' in c or 'unique' in c
    nodes[c] = nodes[c].fillna(0).astype('int32' if is_count else 'float32')

def most_frequent(s: pd.Series):
    """Return the most frequent value of ``s``.

    The result is the first entry returned by ``Series.mode``. ``pd.NA`` is
    returned when ``s`` is empty or when ``mode`` yields nothing (for example
    when every value is missing).
    """
    if s.empty:
        return pd.NA
    # Compute the mode once and reuse it; it is evaluated for every group.
    modes = s.mode()
    return modes.iloc[0] if not modes.empty else pd.NA

# Most frequent bank seen for each account, once as sender and once as receiver.
bank_src = df.groupby('src_id')['src_bank'].agg(most_frequent).rename('bank_hint_src')
bank_dst = df.groupby('dst_id')['dst_bank'].agg(most_frequent).rename('bank_hint_dst')
bank = pd.concat([bank_src, bank_dst], axis=1)

# Prefer the sender-side bank and fall back to the receiver-side one.
# Vectorised fillna replaces a row-wise apply: same values, no Python call per
# row, and it also works when `bank` has no rows.
bank['bank_hint'] = bank['bank_hint_src'].fillna(bank['bank_hint_dst'])
# Name the index explicitly instead of relying on concat dropping the index
# name (which made reset_index emit a column called 'index').
bank = bank[['bank_hint']].rename_axis('node_id').reset_index()
nodes = nodes.merge(bank, on='node_id', how='left')

# Collapse the per-direction timestamps into a single activity window.
nodes['first_seen'] = nodes[['first_seen_in','first_seen_out']].min(axis=1)
nodes['last_seen']  = nodes[['last_seen_in','last_seen_out']].max(axis=1)
nodes = nodes.drop(columns=['first_seen_in','first_seen_out','last_seen_in','last_seen_out'])

# Direction-agnostic totals.
nodes['deg'] = nodes['in_deg'].astype('int32') + nodes['out_deg'].astype('int32')
nodes['sum_total'] = nodes['in_sum'].astype('float32') + nodes['out_sum'].astype('float32')
# NOTE(review): this is the sum of distinct senders and distinct receivers, so
# a counterparty seen in both directions is counted twice.
nodes['unique_neighbors'] = nodes['in_unique_src'].astype('int32') + nodes['out_unique_dst'].astype('int32')

nodes['bank_hint'] = nodes['bank_hint'].astype('string')
nodes = nodes.sort_values('node_id').reset_index(drop=True)

In [None]:
# Write the edge and node tables next to the account mapping.
edges_path = os.path.join(OUT_DIR, 'edges.parquet')
nodes_path = os.path.join(OUT_DIR, 'nodes.parquet')

for frame, target in ((edges, edges_path), (nodes, nodes_path)):
    frame.to_parquet(target, index=False)

# The mapping file is only listed here; this cell does not write it.
print("Salvos:")
for written in (edges_path, nodes_path, os.path.join(OUT_DIR, 'mapping_accounts.parquet')):
    print(" -", written)

In [None]:
# Export a directed account graph as node-link JSON.
G = nx.DiGraph()
G.add_nodes_from(nodes['node_id'].tolist())

# A DiGraph stores a single edge per (src, dst) pair; adding the same pair
# again only overwrites its attributes. Inserting transactions one by one
# therefore left each edge with the label of the LAST transaction between the
# two accounts, which can hide an earlier positive label. Aggregate per pair
# first: `label` is the maximum over all transactions of the pair and `n_tx`
# records how many transactions were collapsed into the edge.
# sort=False keeps pairs in order of first appearance.
pair_stats = (
    edges.groupby(['src_id', 'dst_id'], sort=False)['label']
         .agg(label='max', n_tx='size')
         .reset_index()
)
G.add_edges_from(
    (int(r.src_id), int(r.dst_id), {'label': int(r.label), 'n_tx': int(r.n_tx)})
    for r in pair_stats.itertuples(index=False)
)

nx_json = json_graph.node_link_data(G)
with open(os.path.join(OUT_DIR, 'graph_nx.json'), 'w') as f:
    json.dump(nx_json, f)

print(" -", os.path.join(OUT_DIR, 'graph_nx.json'))

In [None]:
# Dump the graph as plain NumPy arrays under OUT_DIR/pyg_np.
PYG_OUT = os.path.join(OUT_DIR, 'pyg_np')
os.makedirs(PYG_OUT, exist_ok=True)


def _save_npy(filename, array):
    """Write `array` to PYG_OUT/`filename` with np.save."""
    np.save(os.path.join(PYG_OUT, filename), array)


# Array of shape (2, num_edges): row 0 holds source ids, row 1 destination ids.
edge_index = np.stack(
    [edges[endpoint].to_numpy(np.int64) for endpoint in ('src_id', 'dst_id')],
    axis=0,
)
_save_npy('edge_index.npy', edge_index)

# Only numeric edge features go into the float matrix; other dtypes are skipped.
num_edge_feats = [c for c in edge_feat_cols if pd.api.types.is_numeric_dtype(edges[c])]
edge_attr = edges[num_edge_feats].to_numpy(dtype=np.float32, copy=True)
_save_npy('edge_attr.npy', edge_attr)

edge_y = edges['label'].to_numpy(dtype=np.int64, copy=True)
_save_npy('edge_y.npy', edge_y)

# Node feature matrix; missing values are replaced by 0 before the cast.
node_num_cols = [
    c for c in (
        'in_deg', 'out_deg', 'deg',
        'in_sum', 'out_sum', 'sum_total',
        'in_mean', 'out_mean', 'unique_neighbors',
    )
    if c in nodes.columns
]
node_x = nodes[node_num_cols].fillna(0).to_numpy(dtype=np.float32, copy=True)
_save_npy('node_x.npy', node_x)

# Integer code of the split each edge belongs to.
split_map = {'train':0, 'valid':1, 'test':2}
edge_split = edges['split'].map(split_map).to_numpy(dtype=np.int64)
_save_npy('edge_split.npy', edge_split)

print("Arquivos PyG salvos em:", PYG_OUT)

In [None]:
# Describe the exported artefacts in a small JSON sidecar.
# Fraction of edges that fall into each split.
split_shares = {
    name: float((edges['split'] == name).mean())
    for name in ('train', 'valid', 'test')
}

nodes_meta = {
    "count": int(len(nodes)),
    "features_numeric": node_num_cols,
    "categoricals": ["bank_hint"],
}
edges_meta = {
    "count": int(len(edges)),
    "features_numeric": num_edge_feats,
    "columns": [*edge_base_cols, *edge_feat_cols, 'split'],
    "task": "edge_classification",
    "label_col": "label",
    "split": split_shares,
}
meta = {
    "nodes": nodes_meta,
    "edges": edges_meta,
    "mapping_files": {"accounts": "mapping_accounts.parquet"},
}

meta_path = os.path.join(OUT_DIR, 'meta.json')
with open(meta_path, 'w') as f:
    # default=str stringifies any value json cannot encode natively.
    json.dump(meta, f, indent=2, default=str)

print(" -", meta_path)