In [20]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.spatial.distance import euclidean
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

# Set random seed for reproducibility
np.random.seed(42)

def load_cicmalmem_dataset(filepath):
    """
    Load the CICMalMem 2022 dataset
    """
    try:
        # Load the dataset
        df = pd.read_csv(filepath)
        print(f"Dataset loaded successfully!")
        print(f"Shape: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")
        
        # Display basic info
        print("\nDataset Info:")
        print(f"Total samples: {len(df)}")
        if 'label' in df.columns:
            print(f"Label distribution:\n{df['label'].value_counts()}")
        elif 'Label' in df.columns:
            print(f"Label distribution:\n{df['Label'].value_counts()}")
        
        return df
    
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return None

# Load your dataset
# Replace 'path_to_your_dataset.csv' with actual path
df = load_cicmalmem_dataset('Obfuscated-MalMem2022.csv')



Dataset loaded successfully!
Shape: (58596, 57)
Columns: ['Category', 'pslist.nproc', 'pslist.nppid', 'pslist.avg_threads', 'pslist.nprocs64bit', 'pslist.avg_handlers', 'dlllist.ndlls', 'dlllist.avg_dlls_per_proc', 'handles.nhandles', 'handles.avg_handles_per_proc', 'handles.nport', 'handles.nfile', 'handles.nevent', 'handles.ndesktop', 'handles.nkey', 'handles.nthread', 'handles.ndirectory', 'handles.nsemaphore', 'handles.ntimer', 'handles.nsection', 'handles.nmutant', 'ldrmodules.not_in_load', 'ldrmodules.not_in_init', 'ldrmodules.not_in_mem', 'ldrmodules.not_in_load_avg', 'ldrmodules.not_in_init_avg', 'ldrmodules.not_in_mem_avg', 'malfind.ninjections', 'malfind.commitCharge', 'malfind.protection', 'malfind.uniqueInjections', 'psxview.not_in_pslist', 'psxview.not_in_eprocess_pool', 'psxview.not_in_ethread_pool', 'psxview.not_in_pspcid_list', 'psxview.not_in_csrss_handles', 'psxview.not_in_session', 'psxview.not_in_deskthrd', 'psxview.not_in_pslist_false_avg', 'psxview.not_in_eprocess

In [27]:
def extract_first_digit(value):
    """
    Extract the first significant digit from a number
    Following the paper's methodology (Equation 3)
    """
    if pd.isna(value):
        return 0
    
    # Convert to numeric if it's a string
    try:
        if isinstance(value, str):
            # Try to convert string to float
            value = float(value)
        
        if value == 0:
            return 0
            
    except (ValueError, TypeError):
        # If conversion fails, return 0
        return 0
    
    # Take absolute value (modulus operation from paper)
    abs_value = abs(float(value))
    
    # Collapse the number to extract first digit
    # D_collapsed = |10 * a / 10^int(log10(a))|
    if abs_value < 1:
        # For numbers < 1, multiply until >= 1
        while abs_value < 1 and abs_value > 0:
            abs_value *= 10
    
    if abs_value == 0:
        return 0
        
    # Extract first digit
    try:
        first_digit = int(str(abs_value)[0])
        return first_digit if first_digit != 0 else 1
    except (ValueError, IndexError):
        return 0

def get_digit_frequencies(data_series):
    """
    Calculate the frequency of each digit (1-9) in a data series
    """
    # Convert series to list and ensure numeric values
    if isinstance(data_series, pd.Series):
        data_list = data_series.tolist()
    else:
        data_list = list(data_series)
    
    # Extract first digits with improved error handling
    first_digits = []
    for val in data_list:
        digit = extract_first_digit(val)
        if digit > 0:  # Only include valid digits 1-9
            first_digits.append(digit)
    
    # Count frequencies
    digit_counts = {i: first_digits.count(i) for i in range(1, 10)}
    total_count = sum(digit_counts.values())
    
    if total_count == 0:
        return np.zeros(9)
    
    # Calculate relative frequencies
    frequencies = np.array([digit_counts[i] / total_count for i in range(1, 10)])
    return frequencies

def clean_and_prepare_data(df, label_col='binary_label'):
    """
    Clean and prepare the dataset for Benford's Law analysis
    """
    print("Cleaning and preparing data...")
    
    # Create a copy
    df_clean = df.copy()
    
    # Convert label to binary if needed
    if label_col not in df_clean.columns:
        if 'Class' in df_clean.columns:
            unique_classes = df_clean['Class'].unique()
            print(f"Unique classes: {unique_classes}")
            
            # Map to binary (assuming first class is benign=0, others malicious=1)
            if len(unique_classes) == 2:
                df_clean['binary_label'] = (df_clean['Class'] != unique_classes[0]).astype(int)
                label_col = 'binary_label'
            else:
                # For multi-class, convert to binary (benign vs malicious)
                benign_classes = ['Benign', 'benign', 'BENIGN', 'Normal', 'normal']
                df_clean['binary_label'] = (~df_clean['Class'].isin(benign_classes)).astype(int)
                label_col = 'binary_label'
    
    # Get all potential feature columns (exclude categorical columns)
    feature_cols = []
    exclude_cols = ['Category', 'Class', 'binary_label']
    
    for col in df_clean.columns:
        if col not in exclude_cols:
            feature_cols.append(col)
    
    print(f"Found {len(feature_cols)} potential feature columns")
    
    # Convert all feature columns to numeric
    for col in feature_cols:
        df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
    
    # Remove columns with too many NaN values (>50%)
    valid_cols = []
    for col in feature_cols:
        nan_ratio = df_clean[col].isna().sum() / len(df_clean)
        if nan_ratio < 0.5:  # Keep columns with <50% NaN values
            valid_cols.append(col)
        else:
            print(f"Removing {col}: {nan_ratio:.2%} NaN values")
    
    print(f"Kept {len(valid_cols)} valid feature columns")
    
    # Fill remaining NaN values with 0 (common approach for this type of data)
    for col in valid_cols:
        df_clean[col] = df_clean[col].fillna(0)
    
    return df_clean, valid_cols, label_col

# Updated feature selection function
def select_features_benford_improved(df, feature_cols, label_column='binary_label'):
    """
    Improved feature selection that handles data type issues
    """
    # Define thresholds from the paper
    PEARSON_THRESHOLD = 0.5  # Lowered for initial testing
    KL_THRESHOLD = 0.20      # Increased for initial testing
    EUCLIDEAN_THRESHOLD = 0.30
    MAD_THRESHOLD = 0.01     # Increased for initial testing
    
    print(f"Analyzing {len(feature_cols)} numerical features...")
    
    # Calculate Benford's distribution
    _, benford_expected = calculate_benford_distribution()
    
    # Store results
    feature_analysis = []
    dist_func = DistanceFunctions()
    
    for col in feature_cols:
        try:
            # Get data for this feature
            col_data = df[col].dropna()
            
            # Skip if not enough data or all zeros
            if len(col_data) < 100 or col_data.nunique() < 5:
                continue
            
            # Get observed frequencies for this feature
            observed_freq = get_digit_frequencies(col_data)
            
            # Skip if not enough digit variety
            if np.sum(observed_freq) == 0:
                continue
            
            # Calculate all distance metrics
            kl_div = dist_func.kullback_leibler_divergence(observed_freq, benford_expected)
            js_div = dist_func.jensen_shannon_divergence(observed_freq, benford_expected)
            mad = dist_func.mean_absolute_deviation(observed_freq, benford_expected)
            pearson_corr = dist_func.pearson_correlation(observed_freq, benford_expected)
            euclidean_dist = dist_func.euclidean_distance(observed_freq, benford_expected)
            ks_stat, ks_pvalue = dist_func.kolmogorov_smirnov_test(observed_freq, benford_expected)
            
            # Count how many criteria are met
            criteria_met = 0
            if not np.isnan(pearson_corr) and pearson_corr >= PEARSON_THRESHOLD:
                criteria_met += 1
            if not np.isnan(kl_div) and kl_div <= KL_THRESHOLD:
                criteria_met += 1
            if not np.isnan(euclidean_dist) and euclidean_dist <= EUCLIDEAN_THRESHOLD:
                criteria_met += 1
            if not np.isnan(mad) and mad <= MAD_THRESHOLD:
                criteria_met += 1
            
            feature_analysis.append({
                'feature': col,
                'kl_divergence': kl_div,
                'js_divergence': js_div,
                'mad': mad,
                'pearson_correlation': pearson_corr,
                'euclidean_distance': euclidean_dist,
                'ks_statistic': ks_stat,
                'ks_pvalue': ks_pvalue,
                'criteria_met': criteria_met,
                'benford_conformity': pearson_corr if not np.isnan(pearson_corr) else 0
            })
            
        except Exception as e:
            print(f"Error analyzing feature {col}: {e}")
            continue
    
    # Convert to DataFrame for easier analysis
    results_df = pd.DataFrame(feature_analysis)
    
    if len(results_df) == 0:
        print("No valid features found! Using top features by data quality...")
        # Fallback: select features with good data quality
        selected_features = feature_cols[:10]  # Take first 10 as fallback
    else:
        # Select features that meet at least 1 criteria (relaxed from 2)
        candidate_features = results_df[results_df['criteria_met'] >= 1]
        
        if len(candidate_features) == 0:
            # Further fallback: select top features by Pearson correlation
            candidate_features = results_df.nlargest(10, 'benford_conformity')
        
        selected_features = candidate_features['feature'].tolist()
    
    print(f"\nFeature Analysis Results:")
    print(f"Total features analyzed: {len(results_df)}")
    print(f"Features meeting ≥1 criteria: {len(selected_features)}")
    
    # Show top features
    if len(results_df) > 0:
        top_features = results_df.nlargest(10, 'benford_conformity')
        print(f"\nTop 10 Features by Benford's Law Conformity:")
        for _, row in top_features.iterrows():
            print(f"{row['feature']}: Pearson={row['pearson_correlation']:.3f}, "
                  f"KL={row['kl_divergence']:.3f}, Criteria={row['criteria_met']}")
    
    return selected_features, results_df

In [29]:
def run_complete_benford_analysis(df):
    """
    Run the complete Benford's Law malware detection analysis
    """
    print("="*70)
    print("BENFORD'S LAW MALWARE DETECTION - COMPLETE ANALYSIS")
    print("="*70)
    
    # Step 1: Clean and prepare data
    df_clean, feature_cols, label_col = clean_and_prepare_data(df)
    
    print(f"\nLabel distribution after cleaning:")
    print(df_clean[label_col].value_counts())
    
    # Step 2: Feature selection
    print(f"\n{'='*50}")
    print("FEATURE SELECTION BASED ON BENFORD'S LAW")
    print(f"{'='*50}")
    
    selected_features, analysis_results = select_features_benford_improved(df_clean, feature_cols, label_col)
    
    if len(selected_features) == 0:
        print("No features selected! Using top features by variance...")
        # Fallback selection
        feature_vars = df_clean[feature_cols].var().sort_values(ascending=False)
        selected_features = feature_vars.head(5).index.tolist()
    
    print(f"\nSelected {len(selected_features)} features for modeling:")
    for i, feature in enumerate(selected_features[:10], 1):  # Show max 10
        print(f"{i}. {feature}")
    
    # Step 3: Train and evaluate model
    print(f"\n{'='*50}")
    print("MODEL TRAINING AND EVALUATION")
    print(f"{'='*50}")
    
    model, results, test_data = train_and_evaluate_benford_model(df_clean, selected_features, label_col)
    
    return model, results, test_data, selected_features, analysis_results

# Run the complete analysis
try:
    model, results, test_data, selected_features, analysis_results = run_complete_benford_analysis(df)
    print("\n🎉 Analysis completed successfully!")
    
except Exception as e:
    print(f"\n❌ Error during analysis: {e}")
    import traceback
    traceback.print_exc()

BENFORD'S LAW MALWARE DETECTION - COMPLETE ANALYSIS
Cleaning and preparing data...
Found 55 potential feature columns
Kept 55 valid feature columns

Label distribution after cleaning:
binary_label
0    29298
1    29298
Name: count, dtype: int64

FEATURE SELECTION BASED ON BENFORD'S LAW
Analyzing 55 numerical features...

Feature Analysis Results:
Total features analyzed: 48
Features meeting ≥1 criteria: 23

Top 10 Features by Benford's Law Conformity:
psxview.not_in_ethread_pool: Pearson=0.994, KL=0.036, Criteria=3
psxview.not_in_pspcid_list: Pearson=0.990, KL=0.031, Criteria=3
psxview.not_in_pslist: Pearson=0.990, KL=0.031, Criteria=3
malfind.protection: Pearson=0.968, KL=0.249, Criteria=2
psxview.not_in_deskthrd_false_avg: Pearson=0.965, KL=0.585, Criteria=1
dlllist.ndlls: Pearson=0.946, KL=0.690, Criteria=1
pslist.nppid: Pearson=0.870, KL=1.061, Criteria=1
malfind.uniqueInjections: Pearson=0.869, KL=1.089, Criteria=1
svcscan.nactive: Pearson=0.864, KL=1.196, Criteria=1
svcscan.share

Traceback (most recent call last):
  File "/var/folders/pg/065496ks5dxfngyxnk48wjpr0000gn/T/ipykernel_40349/2219921756.py", line 43, in <module>
    model, results, test_data, selected_features, analysis_results = run_complete_benford_analysis(df)
                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/pg/065496ks5dxfngyxnk48wjpr0000gn/T/ipykernel_40349/2219921756.py", line 37, in run_complete_benford_analysis
    model, results, test_data = train_and_evaluate_benford_model(df_clean, selected_features, label_col)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/pg/065496ks5dxfngyxnk48wjpr0000gn/T/ipykernel_40349/4265649076.py", line 72, in train_and_evaluate_benford_model
    y_pred_train = model.predict(X_train)
                   ^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/pg/065496ks5dxfngyxnk48wjpr0000gn/T/ipykernel_40349/613079078.py", li

In [22]:
class DistanceFunctions:
    """
    Implementation of all distance functions used in the paper
    """
    
    @staticmethod
    def kullback_leibler_divergence(observed, expected):
        """
        Calculate KL divergence: D_KL(P||Q) = Σ P(i) * log(P(i)/Q(i))
        """
        # Add small epsilon to avoid division by zero
        epsilon = 1e-10
        observed = observed + epsilon
        expected = expected + epsilon
        
        # Normalize to ensure they sum to 1
        observed = observed / np.sum(observed)
        expected = expected / np.sum(expected)
        
        kl_div = np.sum(observed * np.log(observed / expected))
        return kl_div
    
    @staticmethod
    def jensen_shannon_divergence(observed, expected):
        """
        Calculate Jensen-Shannon divergence
        """
        epsilon = 1e-10
        observed = observed + epsilon
        expected = expected + epsilon
        
        # Normalize
        P = observed / np.sum(observed)
        Q = expected / np.sum(expected)
        
        # Calculate M = 0.5 * (P + Q)
        M = 0.5 * (P + Q)
        
        # JS divergence
        js_div = 0.5 * np.sum(P * np.log(P / M)) + 0.5 * np.sum(Q * np.log(Q / M))
        return js_div
    
    @staticmethod
    def mean_absolute_deviation(observed, expected):
        """
        Calculate Mean Absolute Deviation (MAD)
        MAD = Σ|Fr - Ef| / N
        """
        return np.mean(np.abs(observed - expected))
    
    @staticmethod
    def pearson_correlation(observed, expected):
        """
        Calculate Pearson correlation coefficient
        """
        correlation, _ = stats.pearsonr(observed, expected)
        return correlation if not np.isnan(correlation) else 0
    
    @staticmethod
    def kolmogorov_smirnov_test(observed, expected):
        """
        Perform Kolmogorov-Smirnov test
        """
        # Convert to cumulative distributions
        observed_cumsum = np.cumsum(observed)
        expected_cumsum = np.cumsum(expected)
        
        # KS statistic is the maximum difference
        ks_statistic = np.max(np.abs(observed_cumsum - expected_cumsum))
        
        # Calculate p-value using scipy
        _, p_value = stats.ks_2samp(observed, expected)
        
        return ks_statistic, p_value
    
    @staticmethod
    def z_statistic(observed, expected, n):
        """
        Calculate Z-statistic for each digit
        Z = |AP - FE| - (1/2N) / sqrt(FE * (1-FE) / N)
        """
        if n == 0:
            return np.zeros_like(observed)
        
        z_scores = []
        for i in range(len(observed)):
            numerator = abs(observed[i] - expected[i]) - (1.0 / (2 * n))
            denominator = np.sqrt((expected[i] * (1 - expected[i])) / n)
            
            if denominator == 0:
                z_scores.append(0)
            else:
                z_scores.append(numerator / denominator)
        
        return np.array(z_scores)
    
    @staticmethod
    def euclidean_distance(observed, expected):
        """
        Calculate Euclidean distance between observed and expected frequencies
        """
        return euclidean(observed, expected)

# Test the distance functions
print("Testing Distance Functions:")
test_observed = np.array([0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046])
test_expected = benford_prob

dist_func = DistanceFunctions()
print(f"KL Divergence: {dist_func.kullback_leibler_divergence(test_observed, test_expected):.6f}")
print(f"JS Divergence: {dist_func.jensen_shannon_divergence(test_observed, test_expected):.6f}")
print(f"MAD: {dist_func.mean_absolute_deviation(test_observed, test_expected):.6f}")
print(f"Pearson Correlation: {dist_func.pearson_correlation(test_observed, test_expected):.6f}")
print(f"Euclidean Distance: {dist_func.euclidean_distance(test_observed, test_expected):.6f}")

Testing Distance Functions:
KL Divergence: 0.000001
JS Divergence: 0.000000
MAD: 0.000101
Pearson Correlation: 0.999999
Euclidean Distance: 0.000373


In [28]:
def select_features_benford(df, label_column='label'):
    """
    Select features that best conform to Benford's Law
    Based on the paper's methodology and thresholds
    """
    # Define thresholds from the paper
    PEARSON_THRESHOLD = 0.85
    KL_THRESHOLD = 0.10
    EUCLIDEAN_THRESHOLD = 0.20
    MAD_THRESHOLD = 1e-5
    
    # Get numerical columns (exclude label)
    numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if label_column in numerical_cols:
        numerical_cols.remove(label_column)
    
    print(f"Analyzing {len(numerical_cols)} numerical features...")
    
    # Calculate Benford's distribution
    _, benford_expected = calculate_benford_distribution()
    
    # Store results
    feature_analysis = []
    dist_func = DistanceFunctions()
    
    for col in numerical_cols:
        try:
            # Get observed frequencies for this feature
            observed_freq = get_digit_frequencies(df[col].dropna())
            
            # Skip if not enough data
            if np.sum(observed_freq) == 0:
                continue
            
            # Calculate all distance metrics
            kl_div = dist_func.kullback_leibler_divergence(observed_freq, benford_expected)
            js_div = dist_func.jensen_shannon_divergence(observed_freq, benford_expected)
            mad = dist_func.mean_absolute_deviation(observed_freq, benford_expected)
            pearson_corr = dist_func.pearson_correlation(observed_freq, benford_expected)
            euclidean_dist = dist_func.euclidean_distance(observed_freq, benford_expected)
            ks_stat, ks_pvalue = dist_func.kolmogorov_smirnov_test(observed_freq, benford_expected)
            
            # Count how many criteria are met (paper's approach)
            criteria_met = 0
            if pearson_corr >= PEARSON_THRESHOLD:
                criteria_met += 1
            if kl_div <= KL_THRESHOLD:
                criteria_met += 1
            if euclidean_dist <= EUCLIDEAN_THRESHOLD:
                criteria_met += 1
            if mad <= MAD_THRESHOLD:
                criteria_met += 1
            
            feature_analysis.append({
                'feature': col,
                'kl_divergence': kl_div,
                'js_divergence': js_div,
                'mad': mad,
                'pearson_correlation': pearson_corr,
                'euclidean_distance': euclidean_dist,
                'ks_statistic': ks_stat,
                'ks_pvalue': ks_pvalue,
                'criteria_met': criteria_met,
                'benford_conformity': pearson_corr if not np.isnan(pearson_corr) else 0
            })
            
        except Exception as e:
            print(f"Error analyzing feature {col}: {e}")
            continue
    
    # Convert to DataFrame for easier analysis
    results_df = pd.DataFrame(feature_analysis)
    
    # Select features that meet at least 2 criteria (paper's approach)
    selected_features = results_df[results_df['criteria_met'] >= 2]['feature'].tolist()
    
    print(f"\nFeature Analysis Results:")
    print(f"Total features analyzed: {len(results_df)}")
    print(f"Features meeting ≥2 criteria: {len(selected_features)}")
    
    # Show top features by Benford conformity
    top_features = results_df.nlargest(10, 'benford_conformity')
    print(f"\nTop 10 Features by Benford's Law Conformity:")
    for _, row in top_features.iterrows():
        print(f"{row['feature']}: Pearson={row['pearson_correlation']:.3f}, "
              f"KL={row['kl_divergence']:.3f}, Criteria={row['criteria_met']}")
    
    return selected_features, results_df

# Apply feature selection (you'll run this once you have your dataset loaded)
selected_features, analysis_results = select_features_benford(df)

Analyzing 56 numerical features...

Feature Analysis Results:
Total features analyzed: 53
Features meeting ≥2 criteria: 3

Top 10 Features by Benford's Law Conformity:
psxview.not_in_ethread_pool: Pearson=0.994, KL=0.036, Criteria=3
psxview.not_in_pspcid_list: Pearson=0.990, KL=0.031, Criteria=3
psxview.not_in_pslist: Pearson=0.990, KL=0.031, Criteria=3
malfind.protection: Pearson=0.968, KL=0.249, Criteria=1
psxview.not_in_deskthrd_false_avg: Pearson=0.965, KL=0.585, Criteria=1
dlllist.ndlls: Pearson=0.946, KL=0.690, Criteria=1
pslist.nppid: Pearson=0.870, KL=1.061, Criteria=1
malfind.uniqueInjections: Pearson=0.869, KL=1.089, Criteria=1
psxview.not_in_eprocess_pool: Pearson=0.864, KL=1.201, Criteria=1
modules.nmodules: Pearson=0.864, KL=1.201, Criteria=1


In [24]:
# Check the label columns
print("Checking label distributions:")
print("\nCategory column:")
print(df['Category'].value_counts())
print("\nClass column:")
print(df['Class'].value_counts())

# Let's use 'Class' as our main label (looks like it's binary)
# Prepare labels - convert to binary if needed
if df['Class'].dtype == 'object':
    # Convert string labels to binary
    unique_labels = df['Class'].unique()
    print(f"\nUnique labels in Class: {unique_labels}")
    
    # Map to binary (0=benign, 1=malicious)
    label_mapping = {}
    if len(unique_labels) == 2:
        # Assume first unique value is benign (0), second is malicious (1)
        label_mapping = {unique_labels[0]: 0, unique_labels[1]: 1}
        df['binary_label'] = df['Class'].map(label_mapping)
    else:
        # If more than 2 classes, we might need to group them
        print("Multiple classes detected. Please specify which should be considered malicious.")

print(f"\nFinal label distribution:")
if 'binary_label' in df.columns:
    print(df['binary_label'].value_counts())
    label_col = 'binary_label'
else:
    label_col = 'Class'

# Now run feature selection
print("\n" + "="*60)
print("RUNNING BENFORD'S LAW FEATURE SELECTION")
print("="*60)

selected_features, analysis_results = select_features_benford(df, label_column=label_col)

Checking label distributions:

Category column:
Category
Benign                                                                                     29298
Spyware-Gator-1bdcd3b777965f67678748d2577b119a275aca9aed9549d45e64e692a54a7b5e-1.raw           2
Spyware-Gator-1bfb316482877ee42e5a5078fef44c0eb51bc44c1e88ecbccd02ce4dc4694bd3-2.raw           2
Spyware-Gator-1bfb316482877ee42e5a5078fef44c0eb51bc44c1e88ecbccd02ce4dc4694bd3-10.raw          2
Spyware-Gator-1bfb316482877ee42e5a5078fef44c0eb51bc44c1e88ecbccd02ce4dc4694bd3-1.raw           2
                                                                                           ...  
Spyware-Gator-0b25829d15dc951a44e7652fc6de9d936d7d51f29586d56dbf8fccea419252ac-6.raw           1
Spyware-Gator-0b25829d15dc951a44e7652fc6de9d936d7d51f29586d56dbf8fccea419252ac-5.raw           1
Spyware-Gator-0b25829d15dc951a44e7652fc6de9d936d7d51f29586d56dbf8fccea419252ac-4.raw           1
Spyware-Gator-0b25829d15dc951a44e7652fc6de9d936d7d51f29586d56dbf8fccea

In [25]:
class BenfordMalwareDetector:
    """
    Benford's Law-based Malware Detection Model
    """
    
    def __init__(self):
        self.selected_features = []
        self.benford_expected = None
        self.feature_thresholds = {}
        self.dist_func = DistanceFunctions()
        
    def fit(self, X, y, selected_features):
        """
        Fit the model by calculating thresholds for each selected feature
        """
        self.selected_features = selected_features
        _, self.benford_expected = calculate_benford_distribution()
        
        print(f"Training on {len(selected_features)} selected features...")
        
        # Calculate thresholds for each feature based on benign samples
        benign_mask = (y == 0)
        
        for feature in selected_features:
            if feature in X.columns:
                # Get benign samples for this feature
                benign_data = X[benign_mask][feature].dropna()
                
                if len(benign_data) > 0:
                    # Calculate observed frequencies for benign data
                    observed_freq = get_digit_frequencies(benign_data)
                    
                    # Calculate distance metrics
                    kl_div = self.dist_func.kullback_leibler_divergence(observed_freq, self.benford_expected)
                    pearson_corr = self.dist_func.pearson_correlation(observed_freq, self.benford_expected)
                    
                    # Store thresholds (use benign behavior as baseline)
                    self.feature_thresholds[feature] = {
                        'kl_threshold': kl_div * 1.5,  # Allow some deviation
                        'pearson_threshold': max(0.5, pearson_corr * 0.8)  # Minimum correlation
                    }
        
        print(f"Model trained on {len(self.feature_thresholds)} features")
        return self
    
    def predict(self, X):
        """
        Predict using the best combination from the paper: KL + Pearson correlation
        """
        predictions = []
        
        for idx, row in X.iterrows():
            suspicion_score = 0
            valid_features = 0
            
            for feature in self.selected_features:
                if feature in X.columns:
                    feature_data = [row[feature]] if not pd.isna(row[feature]) else []
                    
                    if len(feature_data) > 0:
                        # Get observed frequencies for this sample's feature
                        observed_freq = get_digit_frequencies(feature_data * 100)  # Replicate to get frequencies
                        
                        if np.sum(observed_freq) > 0:
                            # Calculate KL divergence and Pearson correlation
                            kl_div = self.dist_func.kullback_leibler_divergence(observed_freq, self.benford_expected)
                            pearson_corr = self.dist_func.pearson_correlation(observed_freq, self.benford_expected)
                            
                            # Check against thresholds
                            if feature in self.feature_thresholds:
                                thresholds = self.feature_thresholds[feature]
                                
                                # High KL divergence = suspicious
                                if kl_div > thresholds['kl_threshold']:
                                    suspicion_score += 1
                                
                                # Low correlation = suspicious  
                                if pearson_corr < thresholds['pearson_threshold']:
                                    suspicion_score += 1
                                
                                valid_features += 1
            
            # Make prediction based on suspicion score
            if valid_features > 0:
                suspicion_ratio = suspicion_score / (valid_features * 2)  # *2 because we have 2 criteria per feature
                prediction = 1 if suspicion_ratio > 0.5 else 0  # Threshold from paper's methodology
            else:
                prediction = 0  # Default to benign if no valid features
            
            predictions.append(prediction)
        
        return np.array(predictions)
    
    def predict_proba(self, X):
        """
        Return prediction probabilities
        """
        probabilities = []
        
        for idx, row in X.iterrows():
            suspicion_score = 0
            valid_features = 0
            
            for feature in self.selected_features:
                if feature in X.columns:
                    feature_data = [row[feature]] if not pd.isna(row[feature]) else []
                    
                    if len(feature_data) > 0:
                        observed_freq = get_digit_frequencies(feature_data * 100)
                        
                        if np.sum(observed_freq) > 0:
                            kl_div = self.dist_func.kullback_leibler_divergence(observed_freq, self.benford_expected)
                            pearson_corr = self.dist_func.pearson_correlation(observed_freq, self.benford_expected)
                            
                            if feature in self.feature_thresholds:
                                thresholds = self.feature_thresholds[feature]
                                
                                if kl_div > thresholds['kl_threshold']:
                                    suspicion_score += 1
                                if pearson_corr < thresholds['pearson_threshold']:
                                    suspicion_score += 1
                                
                                valid_features += 1
            
            if valid_features > 0:
                suspicion_ratio = suspicion_score / (valid_features * 2)
                prob_malicious = min(max(suspicion_ratio, 0), 1)  # Clamp between 0 and 1
            else:
                prob_malicious = 0.5  # Neutral if no valid features
            
            probabilities.append([1 - prob_malicious, prob_malicious])
        
        return np.array(probabilities)

In [26]:
def evaluate_model(y_true, y_pred, model_name="Model"):
    """
    Comprehensive evaluation of the model
    """
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    
    print(f"\n{model_name} Performance:")
    print("-" * 40)
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1-Score:  {f1:.4f}")
    
    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    print(f"\nConfusion Matrix:")
    print(f"True Negatives (TN):  {cm[0,0]}")
    print(f"False Positives (FP): {cm[0,1]}")
    print(f"False Negatives (FN): {cm[1,0]}")
    print(f"True Positives (TP):  {cm[1,1]}")
    
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'confusion_matrix': cm
    }

def train_and_evaluate_benford_model(df, selected_features, label_col='binary_label'):
    """
    Train and evaluate the Benford's Law malware detection model
    """
    print("\n" + "="*60)
    print("TRAINING AND EVALUATING BENFORD'S LAW MODEL")
    print("="*60)
    
    # Prepare features and labels
    X = df[selected_features + [col for col in df.columns if col.startswith(('pslist', 'handles', 'malfind', 'psxview', 'dlllist', 'ldrmodules', 'modules', 'svcscan', 'callbacks'))]].copy()
    y = df[label_col].values
    
    # Remove any remaining non-numeric columns
    numeric_cols = X.select_dtypes(include=[np.number]).columns
    X = X[numeric_cols]
    
    print(f"Using {len(X.columns)} features for training")
    print(f"Dataset size: {len(X)} samples")
    print(f"Label distribution: {np.bincount(y)}")
    
    # Split data (80-20 split like in the paper)
    split_idx = int(0.8 * len(X))
    indices = np.random.permutation(len(X))
    
    train_idx = indices[:split_idx]
    test_idx = indices[split_idx:]
    
    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    
    print(f"Train set: {len(X_train)} samples")
    print(f"Test set: {len(X_test)} samples")
    
    # Initialize and train the model
    model = BenfordMalwareDetector()
    model.fit(X_train, y_train, selected_features)
    
    # Make predictions
    print("\nMaking predictions...")
    y_pred_train = model.predict(X_train)
    y_pred_test = model.predict(X_test)
    
    # Evaluate on training set
    train_results = evaluate_model(y_train, y_pred_train, "Training Set")
    
    # Evaluate on test set  
    test_results = evaluate_model(y_test, y_pred_test, "Test Set")
    
    # Compare with paper results
    print("\n" + "="*60)
    print("COMPARISON WITH PAPER RESULTS")
    print("="*60)
    print("Paper Results (KL + Pearson, α=0.1):")
    print("Accuracy: 85.60%, Precision: 88.30%, Recall: 82.08%, F1-Score: 85.08%")
    print(f"\nOur Results:")
    print(f"Accuracy: {test_results['accuracy']:.2%}, Precision: {test_results['precision']:.2%}, "
          f"Recall: {test_results['recall']:.2%}, F1-Score: {test_results['f1_score']:.2%}")
    
    return model, test_results, (X_test, y_test, y_pred_test)

# Now let's run everything
print("Starting the complete Benford's Law malware detection process...")

# Run the training and evaluation
model, results, test_data = train_and_evaluate_benford_model(df, selected_features, label_col)

Starting the complete Benford's Law malware detection process...

TRAINING AND EVALUATING BENFORD'S LAW MODEL
Using 64 features for training
Dataset size: 58596 samples
Label distribution: [29298 29298]
Train set: 46876 samples
Test set: 11720 samples
Training on 3 selected features...


TypeError: bad operand type for abs(): 'str'