In [15]:
import os
import warnings
import numpy as np
import pandas as pd
import torch
import mlflow
import mlflow.pytorch
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType, ArrayType, FloatType
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
import torch_geometric
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score
from typing import Iterator, Tuple

In [None]:
# Suppress warnings
warnings.filterwarnings('ignore')
sparkInit = SparkSession.builder \
            .appName("TransactionClassificationGNN") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.shuffle.partitions", "200") \
            .config("spark.sql.debug.maxToStringFields", "100") \
            .getOrCreate()
# def create_spark_session():
#     """
#     Create a Spark session with
#     """
#     try:
#         return SparkSession.builder \
#             .appName("TransactionClassificationGNN") \
#             .config("spark.sql.adaptive.enabled", "true") \
#             .config("spark.sql.shuffle.partitions", "200") \
#             .config("spark.sql.debug.maxToStringFields", "100") \
#             .getOrCreate()
#     except Exception as e:
#         print(f"Error creating Spark session: {e}")
#         raise

In [None]:
class ImprovedSparkGNN:
    def __init__(self, input_dir, num_features=166, num_classes=2):
        # Create Spark session with error handling
        self.spark = sparkInit
        
        # Validate SparkSession
        if not self.spark:
            raise RuntimeError("Failed to initialize SparkSession.")
        
        self.input_dir = input_dir
        self.num_features = num_features
        self.num_classes = num_classes
        
        # Detect and set device
        self.device = self._get_device()
        
        # Set random seeds for reproducibility
        self._set_seeds()
    
    def _get_device(self):
        """
        Detect and return appropriate device (GPU if available, else CPU)
        """
        if torch.cuda.is_available():
            print("Using CUDA GPU")
            return torch.device('cuda')
        elif torch.backends.mps.is_available():
            print("Using Apple Metal Performance Shaders")
            return torch.device('mps')
        else:
            print("Using CPU")
            return torch.device('cpu')
    
    def _set_seeds(self):
        """
        Set random seeds for reproducibility
        """
        torch.manual_seed(42)
        np.random.seed(42)
        if self.device.type == 'cuda':
            torch.cuda.manual_seed_all(42)
    
    def load_data(self):
        """
        Load data with more robust error handling
        """
        try:
            features_path = os.path.join(self.input_dir, 'elliptic_txs_features.csv')
            classes_path = os.path.join(self.input_dir, 'elliptic_txs_classes.csv')
            edgelist_path = os.path.join(self.input_dir, 'elliptic_txs_edgelist.csv')
            
            # Check if files exist
            for path in [features_path, classes_path, edgelist_path]:
                if not os.path.exists(path):
                    raise FileNotFoundError(f"File not found: {path}")
            
            features_df = self.spark.read.csv(
                features_path, 
                header=False, 
                inferSchema=True
            )
            
            classes_df = self.spark.read.csv(
                classes_path, 
                header=True, 
                inferSchema=True
            )
            
            edgelist_df = self.spark.read.csv(
                edgelist_path, 
                header=True, 
                inferSchema=True
            )
            
            # Rename and process columns efficiently
            features_cols = ['txId'] + [f'V{i}' for i in range(1, self.num_features + 1)]
            features_df = features_df.toDF(*features_cols)
            
            # Use broadcast join for small dataframes
            features_df = features_df.join(
                classes_df.withColumn('class_mapped', 
                    F.when(F.col('class') == '1', 'illicit')
                    .when(F.col('class') == '2', 'licit')
                    .otherwise('unknown')
                ), 
                'txId'
            )
            
            return features_df, edgelist_df
        
        except Exception as e:
            print(f"Error loading data: {e}")
            raise
    
    @pandas_udf("array<float>", PandasUDFType.GROUPED_MAP)
    def feature_engineering_udf(pdf: pd.DataFrame) -> pd.DataFrame:
        """
        Pandas UDF for feature engineering and normalization
        """
        # Dynamically determine feature columns
        feature_cols = [col for col in pdf.columns if col.startswith('V')]
        
        # Min-max scaling
        pdf[feature_cols] = (pdf[feature_cols] - pdf[feature_cols].min()) / (pdf[feature_cols].max() - pdf[feature_cols].min())
        
        # Add synthetic feature
        pdf['total_feature_sum'] = pdf[feature_cols].sum(axis=1)
        
        # Return processed dataframe
        return pdf[feature_cols + ['total_feature_sum', 'txId', 'class_mapped']]
    
    def preprocess_data(self, features_df):
        # Apply feature engineering UDF
        engineered_df = features_df.groupBy('txId').apply(feature_engineering_udf)
        
        # Rest of the method remains the same
        @pandas_udf(ArrayType(FloatType()), PandasUDFType.GROUPED_MAP)
        def vectorize_features(pdf: pd.DataFrame) -> pd.DataFrame:
            feature_cols = [f'V{i}' for i in range(1, 167)] + ['total_feature_sum']
            pdf['features'] = pdf[feature_cols].apply(lambda row: Vectors.dense(row.values), axis=1)
            return pdf[['txId', 'features', 'class_mapped']]
        
        vectorized_df = engineered_df.groupBy('txId').apply(vectorize_features)
        
        # String Indexing for labels
        indexer = StringIndexer(inputCol='class_mapped', outputCol='label')
        pipeline = Pipeline(stages=[indexer])
        model = pipeline.fit(vectorized_df)
        processed_df = model.transform(vectorized_df)
        
        return processed_df
    
    def create_torch_geometric_data(self, processed_df, edgelist_df):
        # Convert Spark DataFrame to Pandas for efficient processing
        processed_pdf = processed_df.toPandas()
        edge_pdf = edgelist_df.toPandas()
        
        # Create node ID mapping
        tx_id_mapping = {txid: idx for idx, txid in enumerate(processed_pdf['txId'])}
        
        # Prepare node features and labels
        node_features = torch.tensor(
            processed_pdf['features'].apply(lambda x: x.toArray()).values, 
            dtype=torch.float
        )
        
        node_labels = torch.tensor(
            processed_pdf['label'].values, 
            dtype=torch.long
        )
        
        # Filter and map edge indices
        edge_df_filtered = edge_pdf[
            edge_pdf['txId1'].isin(processed_pdf['txId']) & 
            edge_pdf['txId2'].isin(processed_pdf['txId'])
        ]
        
        edge_index_list = [
            (tx_id_mapping[row['txId1']], tx_id_mapping[row['txId2']]) 
            for _, row in edge_df_filtered.iterrows()
        ]
        
        edge_index = torch.tensor(edge_index_list, dtype=torch.long).t().contiguous()
        
        # Prepare graph data with train/val/test splits
        data = Data(x=node_features, edge_index=edge_index, y=node_labels)
        
        num_nodes = data.num_nodes
        train_mask = torch.zeros(num_nodes, dtype=torch.bool)
        val_mask = torch.zeros(num_nodes, dtype=torch.bool)
        test_mask = torch.zeros(num_nodes, dtype=torch.bool)
        
        known_mask = (data.y == 0) | (data.y == 1)
        known_indices = torch.nonzero(known_mask).squeeze()
        
        perm = torch.randperm(len(known_indices))
        known_indices = known_indices[perm]
        
        train_ratio, val_ratio = 0.8, 0.1
        train_size = int(train_ratio * len(known_indices))
        val_size = int(val_ratio * len(known_indices))
        
        train_indices = known_indices[:train_size]
        val_indices = known_indices[train_size:train_size+val_size]
        test_indices = known_indices[train_size+val_size:]
        
        train_mask[train_indices] = True
        val_mask[val_indices] = True
        test_mask[test_indices] = True
        
        data.train_mask = train_mask
        data.val_mask = val_mask
        data.test_mask = test_mask
        
        return data.to(self.device)
    
    class GCN(torch.nn.Module):
        def __init__(self, num_features, num_classes):
            super().__init__()
            self.conv1 = GCNConv(num_features, 16)
            self.conv2 = GCNConv(16, num_classes)
            self.dropout = torch.nn.Dropout(0.5)
        
        def forward(self, data):
            x, edge_index = data.x, data.edge_index
            x = self.conv1(x, edge_index)
            x = torch.nn.functional.relu(x)
            x = self.dropout(x)
            x = self.conv2(x, edge_index)
            return torch.nn.functional.log_softmax(x, dim=1)
    
    def train(self, data, num_epochs=100):
        # MLflow experiment tracking
        mlflow.set_experiment("Transaction_Classification_GNN")
        
        with mlflow.start_run():
            # Log hyperparameters
            mlflow.log_params({
                "num_features": data.num_features,
                "num_classes": self.num_classes,
                "learning_rate": 0.01,
                "weight_decay": 0.0005,
                "epochs": num_epochs
            })
            
            model = self.GCN(num_features=data.num_features, num_classes=self.num_classes).to(self.device)
            optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0005)
            criterion = torch.nn.CrossEntropyLoss()
            
            model.train()
            for epoch in range(num_epochs):
                optimizer.zero_grad()
                out = model(data)
                loss = criterion(out[data.train_mask], data.y[data.train_mask])
                loss.backward()
                optimizer.step()
                
                # Log training loss
                mlflow.log_metric("training_loss", loss.item(), step=epoch)
                
                if epoch % 10 == 0:
                    print(f'Epoch {epoch:03d}, Loss: {loss.item():.4f}')
            
            # Log model
            mlflow.pytorch.log_model(model, "gnn_model")
            
        return model
    
    def evaluate(self, model, data):
        model.eval()
        with torch.no_grad():
            out = model(data)
            pred = out.argmax(dim=1)

            test_mask = data.test_mask
            y_true = data.y[test_mask].cpu().numpy()
            y_pred = pred[test_mask].cpu().numpy()
            
            precision = precision_score(y_true, y_pred, average='weighted')
            recall = recall_score(y_true, y_pred, average='weighted')
            f1 = f1_score(y_true, y_pred, average='weighted')
            
            # Log evaluation metrics
            mlflow.log_metrics({
                "precision": precision,
                "recall": recall,
                "f1_score": f1
            })
            
            print("\nClassification Report:")
            print(classification_report(y_true, y_pred, target_names=['illicit', 'licit']))
            
            return {
                'precision': precision,
                'recall': recall,
                'f1_score': f1
            }
    
    def run_pipeline(self, mlflow_tracking=False):
        """
        Simplified pipeline method with optional MLflow tracking
        """
        mlflow.set_tracking_uri("http://localhost:5000")
        try:
            # Optional MLflow tracking
            if mlflow_tracking:
                mlflow.set_experiment("Transaction_Classification_GNN")
                mlflow.start_run()
            
            # Load and preprocess data
            features_df, edgelist_df = self.load_data()
            
            # Convert numeric columns to double
            numeric_cols = [f'V{i}' for i in range(1, self.num_features + 1)]
            for column in numeric_cols:
                features_df = features_df.withColumn(column, col(column).cast(DoubleType()))
            
            print("Data loaded successfully. Starting preprocessing...")
            
            # Preprocess data
            processed_df = self.preprocess_data(features_df)
            
            # Create graph data
            graph_data = self.create_torch_geometric_data(processed_df, edgelist_df)
            
            # Train model and log with MLflow
            model = self.train(graph_data)
            
            # Evaluate model and log metrics
            metrics = self.evaluate(model, graph_data)
            
            return model, metrics
            
        except Exception as e:
            print(f"Error in pipeline execution: {e}")
            raise
        
        finally:
            # Always stop the Spark session
            if self.spark:
                self.spark.stop()
            
            # End MLflow run if started
            if mlflow_tracking:
                mlflow.end_run()

RuntimeError: SparkContext or SparkSession should be created first.

In [14]:
if __name__ == '__main__':
    spark = create_spark_session()
    input_directory = ''
    
    try:
        # Create and run the pipeline
        gnn_pipeline = ImprovedSparkGNN(input_directory)
        model, metrics = gnn_pipeline.run_pipeline()
        print("\nFinal Metrics:")
        for metric, value in metrics.items():
            print(f"{metric.capitalize()}: {value:.4f}")
    
    except Exception as e:
        print(f"Pipeline failed: {e}")

Using CPU


                                                                                

Data loaded successfully. Starting preprocessing...
Error in pipeline execution: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.
Pipeline failed: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.
