In [1]:
import networkx as nx
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask.bag as db
import dask.distributed as Client
import lightgbm as lgb
import warnings
import time
import multiprocessing
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split, KFold
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import roc_auc_score
from imblearn.over_sampling import ADASYN
from imblearn.under_sampling import TomekLinks
from imblearn.pipeline import Pipeline as ImbPipeline
from typing import Dict, List, Tuple
from contextlib import contextmanager
from functools import partial
import time

In [2]:
class EnhancedSampler(BaseEstimator, TransformerMixin):
    """Resample a labelled dataset with ADASYN oversampling followed by Tomek-link cleaning.

    ``transform(X, y)`` returns the resampled ``(X, y)`` pair, and
    ``transform(X)`` without labels returns ``X`` unchanged.

    NOTE(review): returning a tuple from ``transform`` does not follow the
    scikit-learn transformer contract. This object therefore cannot be used as
    a step inside a ``sklearn.pipeline.Pipeline``; call it standalone.

    Parameters
    ----------
    sampling_strategy : str, float or dict, default 'auto'
        Passed to ``ADASYN``.
    random_state : int, RandomState instance or None, default None
        Passed to ``ADASYN`` so synthetic samples can be reproduced. ``None``
        keeps the original unseeded behaviour.
    """

    def __init__(self, sampling_strategy='auto', random_state=None):
        self.sampling_strategy = sampling_strategy
        self.random_state = random_state
        # Kept as a public attribute for backward compatibility; it is rebuilt
        # before every resample so it always reflects the current parameters.
        self.pipeline = self._build_pipeline()

    def _build_pipeline(self):
        """Create a fresh ADASYN -> TomekLinks pipeline from the current parameters."""
        return ImbPipeline([
            ('adasyn', ADASYN(sampling_strategy=self.sampling_strategy,
                              random_state=self.random_state)),
            ('tomek', TomekLinks()),
        ])

    def fit(self, X, y):
        """No-op; the resampling pipeline is fitted inside ``transform``."""
        return self

    def transform(self, X, y=None):
        """Return ``fit_resample(X, y)`` when ``y`` is given, otherwise ``X`` unchanged."""
        if y is None:
            return X
        # Rebuild so parameters changed through set_params() (e.g. by a grid
        # search) take effect; a pipeline built only in __init__ would keep
        # the stale settings.
        self.pipeline = self._build_pipeline()
        X_resampled, y_resampled = self.pipeline.fit_resample(X, y)
        return X_resampled, y_resampled

In [3]:
# Load the edge list (renamed to source/target) and the node label table,
# then attach each edge endpoint's label: columns from the first join get the
# "_source" suffix and columns from the second join get "_target".
edge_df = pd.read_csv("../../data/elliptic_txs_edgelist.csv").rename(
    columns={"txId1": "source", "txId2": "target"}
)
class_df = pd.read_csv("../../data/elliptic_txs_classes.csv")

merged_df = (
    edge_df
    .merge(class_df, how="left", left_on="source", right_on="txId")
    .merge(
        class_df,
        how="left",
        left_on="target",
        right_on="txId",
        suffixes=("_source", "_target"),
    )
)

In [4]:
# Create Directed Graph
def create_directed_graph(edge_df, node_ids=None):
    """Build a directed graph with one edge per ``source -> target`` row of ``edge_df``.

    Parameters
    ----------
    edge_df : pd.DataFrame
        Must contain ``source`` and ``target`` columns.
    node_ids : iterable, optional
        Nodes to add before the edges, so nodes without any edge still appear
        in the graph. Defaults to ``class_df["txId"]`` from the notebook
        namespace, which the original version always read implicitly. Pass it
        explicitly to avoid the hidden global dependency.

    Returns
    -------
    nx.DiGraph
    """
    if node_ids is None:
        node_ids = class_df["txId"]
    G_dir = nx.DiGraph()
    G_dir.add_nodes_from(node_ids)
    G_dir.add_edges_from(edge_df[["source", "target"]].values)
    return G_dir

G_dir = create_directed_graph(edge_df, node_ids=class_df["txId"])

In [5]:
def _best_effort(compute, default=0):
    """Return ``compute()``, or ``default`` if it raises an ``Exception``.

    Catches ``Exception`` rather than using a bare ``except:`` so that
    KeyboardInterrupt and SystemExit still stop a long feature run.
    """
    try:
        return compute()
    except Exception:
        return default


def compute_enhanced_features_batch(nodes, G, max_hops=3):
    """Compute ego-graph features for a batch of nodes.

    Batching exists to amortise the cost of sending ``G`` to a worker over
    many nodes (see ``extract_features_parallel``).

    For each node, ``nx.ego_graph(G, node, radius=max_hops, undirected=False)``
    is extracted. On a directed graph this keeps only nodes reachable along
    edge direction. Every feature below is computed on that ego graph, not on
    ``G``.

    Parameters
    ----------
    nodes : iterable
        Node ids that exist in ``G``.
    G : networkx graph
        A ``DiGraph`` in this notebook; the in/out-degree features require a
        directed graph.
    max_hops : int
        Ego-graph radius.

    Returns
    -------
    dict
        ``{node: {feature_name: value}}`` with a fixed key order, so the
        DataFrame built from it has stable columns. Features from
        ``average_neighbor_degree`` onward are best-effort: if the networkx
        call raises, the feature is 0.
    """
    features_dict = {}
    for node in nodes:
        subgraph = nx.ego_graph(G, node, radius=max_hops, undirected=False)
        # Built once and shared by both clustering features.
        undirected = subgraph.to_undirected()
        n_nodes = subgraph.number_of_nodes()
        n_edges = subgraph.number_of_edges()

        features = {
            "num_nodes": n_nodes,
            "num_edges": n_edges,
            "in_degree": subgraph.in_degree(node),
            "out_degree": subgraph.out_degree(node),
            "degree_centrality": nx.degree_centrality(subgraph)[node],
            "in_degree_centrality": nx.in_degree_centrality(subgraph)[node],
            "out_degree_centrality": nx.out_degree_centrality(subgraph)[node],
            "pagerank": nx.pagerank(subgraph)[node],
            "clustering_coeff": nx.clustering(undirected, node),
            # NOTE: despite its name, this is the *average* clustering
            # coefficient over the whole ego graph, not the node's own value.
            "local_clustering_coeff": nx.average_clustering(undirected),
        }

        # More expensive / fragile computations (best-effort, default 0).
        # Neighbors of a DiGraph node are its successors; degree() is in+out.
        neighbors = list(subgraph.neighbors(node))
        features["average_neighbor_degree"] = _best_effort(
            lambda: (np.mean([subgraph.degree(n) for n in neighbors]) if neighbors else 0)
        )

        # Directed edge density; the +1 keeps a single-node ego graph from
        # dividing by zero (the ego graph always contains `node`).
        features["connectivity_ratio"] = n_edges / (n_nodes * (n_nodes - 1) + 1)

        # Strongly connected components are computed once and feed two features.
        scc_sizes = _best_effort(
            lambda: [len(c) for c in nx.strongly_connected_components(subgraph)],
            default=[],
        )
        features["strongly_connected_components"] = len(scc_sizes)
        features["weakly_connected_components"] = _best_effort(
            lambda: len(list(nx.weakly_connected_components(subgraph)))
        )
        features["harmonic_centrality"] = _best_effort(
            lambda: nx.harmonic_centrality(subgraph)[node]
        )
        # Loose tolerance / low iteration cap: a convergence failure yields 0.
        features["eigenvector_centrality"] = _best_effort(
            lambda: nx.eigenvector_centrality(subgraph, max_iter=100, tol=1e-2).get(node, 0)
        )
        features["max_strongly_connected_components"] = max(scc_sizes, default=0)
        features["betweenness_centrality"] = _best_effort(
            lambda: nx.betweenness_centrality(subgraph)[node]
        )

        features_dict[node] = features

    return features_dict

In [6]:
def extract_features_parallel(G, merged_df, node_class, max_hops=3, n_jobs=-1):
    """Compute ego-graph features for every source node of one class, in parallel.

    Parameters
    ----------
    G : networkx graph
        Passed unchanged to ``compute_enhanced_features_batch``.
    merged_df : pd.DataFrame
        Must contain ``source`` and ``class_source`` columns.
    node_class : int or str
        Label to select. It is compared as a string against ``class_source``.
    max_hops : int
        Ego-graph radius.
    n_jobs : int
        Uses joblib's convention: a positive value is the worker count,
        ``-1`` means all CPUs, ``-2`` means all but one, and so on.

    Returns
    -------
    dict
        Maps node id to feature dict. Only nodes that appear in the ``source``
        column for this class are included.
    """
    start_time = time.perf_counter()

    # A node appears in merged_df once per outgoing edge. De-duplicate while
    # keeping first-seen order, so identical features are not recomputed
    # (the merged result dict was already keyed by node, so output is unchanged).
    in_class = merged_df["class_source"] == str(node_class)
    nodes = list(dict.fromkeys(merged_df.loc[in_class, "source"].tolist()))
    total_nodes = len(nodes)

    # Resolve negative n_jobs the same way joblib does.
    n_cpus = n_jobs if n_jobs > 0 else max(1, multiprocessing.cpu_count() + 1 + n_jobs)

    # Aim for about 2 batches per CPU to keep every worker busy. Use ceil
    # division so the batch count never exceeds that target: each task ships
    # G to a worker process, so extra batches are pure serialization overhead.
    target_n_batches = n_cpus * 2
    batch_size = max(1, -(-total_nodes // target_n_batches))

    node_batches = [nodes[i:i + batch_size] for i in range(0, total_nodes, batch_size)]
    n_batches = len(node_batches)

    print(f"Processing {total_nodes} nodes in {n_batches} batches")
    print(f"Batch size: {batch_size} nodes")
    print(f"Using {n_cpus} CPU cores")

    results = Parallel(n_jobs=n_jobs, verbose=1, prefer="processes")(
        delayed(compute_enhanced_features_batch)(batch, G, max_hops)
        for batch in node_batches
    )

    combined_features = {}
    for result in results:
        combined_features.update(result)

    elapsed = time.perf_counter() - start_time
    nodes_per_second = total_nodes / elapsed if elapsed > 0 else float("inf")
    print(f"Processed {total_nodes} nodes in {elapsed:.2f} seconds")
    print(f"Processing speed: {nodes_per_second:.1f} nodes/second")

    return combined_features

In [None]:
# Extract features for each class with batching.
# n_jobs is a CPU/worker count (joblib convention: -1 = every available CPU),
# not something to tune to graph size.
FEATURE_N_JOBS = -1

class_1_features = extract_features_parallel(
    G_dir,
    merged_df,
    node_class=1,
    n_jobs=FEATURE_N_JOBS,
)
class_2_features = extract_features_parallel(
    G_dir,
    merged_df,
    node_class=2,
    n_jobs=FEATURE_N_JOBS,
)

all_features = pd.DataFrame.from_dict(
    {**class_1_features, **class_2_features}, orient="index"
)
# Label rows by node id (index-aligned) instead of relying on dict order and lengths.
node_labels = {node: "1" for node in class_1_features}
node_labels.update({node: "2" for node in class_2_features})
all_features["class"] = pd.Series(node_labels)

# Prepare the data.
# LightGBM's binary objective and calculate_scale_pos_weight need 0/1 targets.
# Casting the raw "1"/"2" labels to int gave {1, 2}, which made
# scale_pos_weight = 0 / n = 0. Keep class "1" as the positive label (1) and
# map class "2" to 0. (Elliptic documents "1" = illicit and "2" = licit; confirm.)
X = all_features.drop(columns=["class"])
y = (all_features["class"] == "1").astype(int)

# Split the data, stratified so train and test keep the same class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)

In [None]:

@contextmanager
def timer(name: str):
    """Context manager that prints how long the wrapped block took.

    Uses ``time.perf_counter`` (monotonic, high resolution) for the duration.
    The message is printed in a ``finally`` clause, so it also appears when
    the block raises; the exception still propagates.

    Example::

        with timer("Cross-validation training"):
            ...
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{name} took {time.perf_counter() - start:.2f} seconds")

def calculate_scale_pos_weight(y: np.ndarray) -> float:
    """Return LightGBM's ``scale_pos_weight``, i.e. #negatives / #positives.

    Parameters
    ----------
    y : array-like of 0/1 labels
        A NumPy array or a pandas Series.

    Returns
    -------
    float
        Number of 0 labels divided by number of 1 labels.

    Raises
    ------
    ValueError
        If ``y`` has no 0 labels or no 1 labels (for example, when labels are
        not encoded as 0/1). The previous version silently returned 0.0 in the
        first case, which zero-weights every positive, and inf in the second.
    """
    labels = np.asarray(y)
    n_negative = int(np.sum(labels == 0))
    n_positive = int(np.sum(labels == 1))
    if n_negative == 0 or n_positive == 0:
        raise ValueError(
            "calculate_scale_pos_weight expects binary 0/1 labels with both classes "
            f"present; got {n_negative} zeros and {n_positive} ones"
        )
    return n_negative / n_positive

def get_default_params(scale_pos_weight: float, num_cpu_per_model: int) -> Dict:
    """Build the baseline LightGBM parameter dict for binary classification.

    Args:
        scale_pos_weight: Multiplier applied to positive-class weights.
        num_cpu_per_model: Value for ``num_threads``, i.e. how many threads
            each concurrently trained model may use.

    Returns:
        A newly created dict on every call, so callers may modify it freely.
    """
    params = dict(
        objective='binary',
        metric='auc',
        boosting_type='gbdt',
        num_leaves=255,          # large trees
        learning_rate=0.05,
        feature_fraction=0.9,
        bagging_fraction=0.8,
        bagging_freq=5,
        verbose=-1,
        num_threads=num_cpu_per_model,
        scale_pos_weight=scale_pos_weight,
        deterministic=True,
        force_col_wise=True,
        min_data_in_leaf=20,
        max_bin=255,
        device_type='cpu',
        linear_tree=False,       # plain constant-leaf trees
        histogram_pool_size=-1,  # negative = no cap on the histogram cache (see LightGBM docs)
        use_missing=True,
        feature_pre_filter=False,  # keep features LightGBM would pre-filter as unsplittable
    )
    return params

In [None]:
def train_fold(fold: int,
               train_idx: np.ndarray,
               val_idx: np.ndarray,
               X_train: pd.DataFrame,
               y_train: pd.Series,
               X_test: pd.DataFrame,
               params: Dict,
               feature_names: List[str]) -> Tuple:
    """Fit and score one cross-validation fold.

    Rows ``train_idx`` (positional, via ``.iloc``) of ``X_train``/``y_train``
    are used for fitting. Rows ``val_idx`` are used for early stopping and for
    scoring. Training runs for up to 1000 boosting rounds and stops after 50
    rounds without validation improvement; progress is logged every 100 rounds.

    Returns:
        ``(booster, validation ROC AUC, gain-importance DataFrame with columns
        feature/importance/fold, predictions for X_test or None)``.
    """
    fit_X, fit_y = X_train.iloc[train_idx], y_train.iloc[train_idx]
    holdout_X, holdout_y = X_train.iloc[val_idx], y_train.iloc[val_idx]

    shared_kwargs = {'feature_name': feature_names, 'free_raw_data': True}
    fit_data = lgb.Dataset(fit_X, fit_y, **shared_kwargs)
    holdout_data = lgb.Dataset(holdout_X, holdout_y, reference=fit_data, **shared_kwargs)

    # Python warnings are suppressed only while training runs.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        booster = lgb.train(
            params,
            fit_data,
            num_boost_round=1000,
            valid_sets=[holdout_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50),
                lgb.log_evaluation(period=100),
            ],
        )

    fold_auc = roc_auc_score(holdout_y, booster.predict(holdout_X))

    gain_table = pd.DataFrame({
        'feature': feature_names,
        'importance': booster.feature_importance('gain'),
        'fold': fold,
    })

    if X_test is None:
        test_scores = None
    else:
        test_scores = booster.predict(X_test)

    return booster, fold_auc, gain_table, test_scores

In [None]:
class LightGBMCV:
    """Stratified K-fold cross-validated LightGBM binary classifier.

    The folds are trained concurrently, one per joblib worker process, via
    ``train_fold``. The CPU budget ``n_jobs`` is split evenly across the
    ``n_splits`` models.

    Parameters
    ----------
    params : dict, optional
        LightGBM parameters. The dict is copied, so the caller's object is
        never modified. When omitted, ``get_default_params`` is used and
        ``scale_pos_weight`` is recomputed from the labels on every call to
        ``train_and_evaluate``.
    n_splits : int
        Number of CV folds, which is also the number of concurrent models.
    random_state : int
        Seed for fold shuffling.
    n_jobs : int, optional
        Total CPU budget. ``None`` or ``0`` means all CPUs, capped at 256.
        Negative values follow joblib's convention (``-1`` = all CPUs,
        ``-2`` = all but one, ...).
    """

    def __init__(self,
                 params: Dict = None,
                 n_splits: int = 5,
                 random_state: int = 42,
                 n_jobs: int = None):
        self.params = dict(params) if params is not None else None
        # Remember whether params came from the caller; if not, the defaults
        # (and their label-dependent scale_pos_weight) are rebuilt each run.
        self._params_from_user = params is not None
        self.n_splits = n_splits
        self.random_state = random_state
        self.n_jobs = self._resolve_n_jobs(n_jobs)
        self.models = []
        self.feature_importance = None
        self.cv_scores = []

    @staticmethod
    def _resolve_n_jobs(n_jobs):
        """Convert an ``n_jobs`` request into a positive CPU count."""
        available = multiprocessing.cpu_count()
        if not n_jobs:
            return min(available, 256)
        if n_jobs < 0:
            # joblib convention. Previously -1 was kept as-is, so
            # -1 // n_splits clamped every model to a single thread.
            return max(1, available + 1 + n_jobs)
        return n_jobs

    def train_and_evaluate(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        X_test: pd.DataFrame = None
    ) -> Tuple[List[float], pd.DataFrame, np.ndarray]:
        """Train one model per fold in parallel and aggregate the results.

        Parameters
        ----------
        X_train, y_train
            Training features and binary 0/1 labels.
        X_test : optional
            If given, every fold model predicts it and the predictions are averaged.

        Returns
        -------
        (cv_scores, feature_importance, test_predictions)
            ``cv_scores`` is a list with one validation ROC AUC per fold.
            ``feature_importance`` has the mean and std of gain importance
            across folds, sorted in descending order. ``test_predictions`` is
            the fold-averaged prediction, or None when ``X_test`` is None.
        """
        from sklearn.model_selection import StratifiedKFold

        with timer("Cross-validation training"):
            # Split the CPU budget across the folds that train concurrently.
            cpus_per_model = max(1, self.n_jobs // self.n_splits)

            # Always work on a fresh dict so no caller-owned object is mutated.
            if self._params_from_user:
                params = dict(self.params)
            else:
                params = get_default_params(calculate_scale_pos_weight(y_train), cpus_per_model)
            params['num_threads'] = cpus_per_model
            self.params = params

            # Stratify so every validation fold contains both classes. With a
            # rare positive class, plain KFold can produce a fold without
            # positives, and roc_auc_score raises on single-class targets.
            splitter = StratifiedKFold(
                n_splits=self.n_splits, shuffle=True, random_state=self.random_state
            )
            feature_names = X_train.columns.tolist()

            with Parallel(n_jobs=self.n_splits, backend='multiprocessing', verbose=1) as parallel:
                results = parallel(
                    delayed(train_fold)(
                        fold, train_idx, val_idx, X_train, y_train,
                        X_test, params, feature_names
                    )
                    for fold, (train_idx, val_idx) in enumerate(splitter.split(X_train, y_train))
                )

            models, scores, importances, test_preds = zip(*results)
            self.models = list(models)
            self.cv_scores = list(scores)

            # Aggregate gain importance across folds.
            importance = (
                pd.concat(importances)
                .groupby('feature')
                .agg({'importance': ['mean', 'std']})
                .reset_index()
            )
            importance.columns = ['feature', 'importance_mean', 'importance_std']
            self.feature_importance = importance.sort_values('importance_mean', ascending=False)

            test_predictions = (
                np.mean(test_preds, axis=0) if X_test is not None else None
            )

        return self.cv_scores, self.feature_importance, test_predictions

    def get_feature_importance(self, top_n: int = None) -> pd.DataFrame:
        """Return the aggregated feature importance, optionally only the first ``top_n`` rows.

        Raises
        ------
        ValueError
            If ``train_and_evaluate`` has not been run yet.
        """
        if self.feature_importance is None:
            raise ValueError("Model hasn't been trained yet")
        if top_n:
            return self.feature_importance.head(top_n)
        return self.feature_importance

In [None]:
# Use the machine's full CPU count. Pass an explicit positive number so the
# per-fold thread budget (n_jobs // n_splits inside LightGBMCV) comes from the
# real CPU count, regardless of how a negative n_jobs would be handled.
cv_model = LightGBMCV(n_splits=5, n_jobs=multiprocessing.cpu_count())

# Train and evaluate
cv_scores, feature_importance, test_predictions = cv_model.train_and_evaluate(
    X_train,
    y_train,
    X_test
)

# Print results
print(f"CV Scores: {cv_scores}")
print(f"Mean CV Score: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")

# Get top 10 features
top_features = cv_model.get_feature_importance(top_n=10)
print("\nTop 10 Features:")
print(top_features)