In [1]:
from datetime import datetime
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
from sklearn.naive_bayes import GaussianNB
import joblib
import pandas as pd


In [2]:
def domain_age_lessThanOne(report, create_date, update_date):
    if create_date != "" and create_date != "expired" and not pd.isna(create_date):
        age = datetime.strptime(report[:10], '%Y-%m-%d') - datetime.strptime(create_date[:10], '%Y-%m-%d')
        return (age.days // 365) < 1
    elif create_date == "" and update_date != "":
        age = datetime.strptime(report[:10], '%Y-%m-%d') - datetime.strptime(update_date[:10], '%Y-%m-%d')
        if age.days < 365:
            return None
        else:
            return False
    elif create_date == "expired":
        return True
    return None

In [3]:
def binary_to_numeric(value):
    if value:
        return 1
    if not value:
        return 0
    else:
        return None

In [4]:
def preprocess_data(data, features):
    
    preprocessed_data = data[features].copy()
    preprocessed_data['new_domain'] = None
    report_date = "2024-04-23"
    for index, item in preprocessed_data.iterrows():
        new_domain = domain_age_lessThanOne(report_date, item['creation_date'], item['updated_date'])
        preprocessed_data.loc[index, 'new_domain'] = new_domain
        
    preprocessed_data = preprocessed_data.drop('creation_date', axis=1)
    preprocessed_data = preprocessed_data.drop('updated_date', axis=1)
        
    # Transform binary values to numerical
    preprocessed_data['control_over_dns'] = preprocessed_data['control_over_dns'].astype(float).replace({True: 1.0, False: 0.0})
    preprocessed_data['domain_indexed'] = preprocessed_data['domain_indexed'].astype(float).replace({True: 1.0, False: 0.0})
    preprocessed_data['is_archived'] = preprocessed_data['is_archived'].astype(float).replace({True: 1.0, False: 0.0})
    preprocessed_data['known_hosting'] = preprocessed_data['known_hosting'].astype(float).replace({True: 1.0, False: 0.0})
    preprocessed_data['new_domain'] = preprocessed_data['new_domain'].astype(float).replace({True: 1.0, False: 0.0})
    preprocessed_data['is_on_root'] = preprocessed_data['is_on_root'].astype(float).replace({True: 1.0, False: 0.0})
    preprocessed_data['is_subdomain'] = preprocessed_data['is_subdomain'].astype(float).replace({True: 1.0, False: 0.0})
        
    return preprocessed_data

In [ ]:
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier


def train_imputers(X_train, numerical_features):
    features_with_missing = X_train.columns[X_train.isnull().any()].tolist()
    features_with_missing.sort(key=lambda x: X_train[x].isnull().sum())

    trained_imputers = {}
    best_params_dict = {}

    for feature in features_with_missing:
        complete_train = X_train.dropna(subset=[feature])
        
        param_grid = {
            "n_estimators": [100, 200, 500],
            "max_depth": [None, 5, 10],
            "min_samples_split": [2, 5, 10]
        }
        
        if feature in numerical_features:
            model = RandomForestRegressor(random_state=0)
            scoring = 'neg_mean_squared_error'
        else:
            model = RandomForestClassifier(random_state=0)
            scoring = 'accuracy'
        
        X_train_feat = complete_train.drop(feature, axis=1)
        y_train_feat = complete_train[feature]

        grid_search = GridSearchCV(model, param_grid, cv=5, scoring=scoring, n_jobs=-1)
        grid_search.fit(X_train_feat, y_train_feat)
        
        best_model = grid_search.best_estimator_
        best_params_dict[feature] = grid_search.best_params_
        
        # Store the trained imputer model for this feature
        trained_imputers[feature] = best_model

        # Impute missing values in the training set itself
        X_train_null = X_train[X_train[feature].isnull()].drop(feature, axis=1)
        if len(X_train_null) > 0:
            imputed_values = best_model.predict(X_train_null)
            X_train.loc[X_train[feature].isnull(), feature] = imputed_values

    return X_train, trained_imputers


def apply_imputers(X_test, trained_imputers):
    for feature, imputer_model in trained_imputers.items():
        X_test_null = X_test[X_test[feature].isnull()].drop(feature, axis=1)
        if len(X_test_null) > 0:
            imputed_values = imputer_model.predict(X_test_null)
            X_test.loc[X_test[feature].isnull(), feature] = imputed_values
    return X_test

In [10]:
def perform_classification(data, labels, sample_ids, path_prefix):
    # Map labels to numeric values
    label_mapping = {'attackers_domain': 0, 'compromised_domain': 1, 'shared_domain': 2}
    y = labels.map(label_mapping)
    X = data.copy()
    
    numerical_features = ['between_archives_distance', 'phish_archives_distance']
    
    scaler_max_abs_list = []
    
    param_grid = {
        'var_smoothing': np.logspace(-9, -1, 10)
    }
    model_to_tune = GaussianNB()
    
    # Declare the inner and outer cross-validation strategies
    inner_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    
    outer_confusion_matrices = []
    outer_precision_list = []
    outer_recall_list = []
    outer_f1_list = []
    y_true_list = []
    y_pred_list = []
    sample_id_list = []
    fold_data_list = []
    best_params_list = []
    
    for i, (outer_train_index, outer_test_index) in enumerate(outer_cv.split(X, y)):
        X_outer_train = X.iloc[outer_train_index].reset_index(drop=True)
        X_outer_test = X.iloc[outer_test_index].reset_index(drop=True)
        y_outer_train = y.iloc[outer_train_index].reset_index(drop=True)
        y_outer_test = y.iloc[outer_test_index].reset_index(drop=True)
        sample_ids_outer_test = sample_ids.iloc[outer_test_index].reset_index(drop=True)
    
        # Fit MaxAbsScaler on X_outer_train[numerical_features]
        scaler = MaxAbsScaler()
        scaler.fit(X_outer_train[numerical_features])

        scaler_max_abs_list.append(scaler.max_abs_)

        X_outer_train_scaled = X_outer_train.copy()
        X_outer_test_scaled = X_outer_test.copy()
        X_outer_train_scaled[numerical_features] = scaler.transform(
            X_outer_train[numerical_features]
        )
        X_outer_test_scaled[numerical_features] = scaler.transform(
            X_outer_test[numerical_features]
        )
        
        # Handle missing values (if any) in training set and get trained imputers
        X_outer_train_scaled_imputed, trained_imputers = train_imputers(X_outer_train_scaled, numerical_features)
        X_outer_test_scaled_imputed = apply_imputers(X_outer_test_scaled, trained_imputers)
    
        # Inner cross-validation for parameter search on the current outer fold
        model = GridSearchCV(estimator=model_to_tune, param_grid=param_grid, cv=inner_cv, n_jobs=-1, scoring="f1_macro")
        model.fit(X_outer_train_scaled_imputed, y_outer_train)
    
        best_params_list.append(model.best_params_)
    
        y_pred = model.predict(X_outer_test_scaled_imputed)
    
        confusion_matrix_values = confusion_matrix(y_outer_test, y_pred)
        outer_confusion_matrices.append(confusion_matrix_values)
    
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_outer_test, y_pred, average=None, labels=[0, 1, 2]
        )
        outer_precision_list.append(precision)
        outer_recall_list.append(recall)
        outer_f1_list.append(f1)
    
        y_true_list.extend(y_outer_test)
        y_pred_list.extend(y_pred)
        sample_id_list.extend(sample_ids_outer_test)
    
        print(f"Outer Fold {i+1} Confusion Matrix:\n{outer_confusion_matrices[-1]}")
        for j, (p, r, f_val) in enumerate(zip(precision, recall, f1)):
            print(
                f"Outer Fold {i+1} Class {j} Precision: {p:.3f}, Recall: {r:.3f}, F1-score: {f_val:.3f}"
            )
    
        fold_data = X_outer_test.copy()
        fold_data['sample_id'] = sample_ids_outer_test
        fold_data['actual'] = y_outer_test.map({v: k for k, v in label_mapping.items()})
        fold_data['predicted'] = pd.Series(y_pred).map(
            {v: k for k, v in label_mapping.items()}
        )
        fold_data_list.append(fold_data)
    
    fold_data_all = pd.concat(fold_data_list, axis=0).reset_index(drop=True)
    fold_data_all.to_csv(f"{path_prefix}naive_bayes_predictions_all.csv", index=False)
    
    # Aggregate best parameters from each fold
    best_params_df = pd.DataFrame(best_params_list)
    # Choose the parameters that appear most frequently
    best_params = best_params_df.mode().iloc[0].to_dict()
    
    param_types = {
        'var_smoothing': float
    }
    
    for param, param_type in param_types.items():
        if param in best_params:
            best_params[param] = param_type(best_params[param])
    
    scaler_max_abs_array = np.array(scaler_max_abs_list)
    aggregated_max_abs = np.mean(scaler_max_abs_array, axis=0)

    max_abs_df = pd.DataFrame({
        'feature': numerical_features,
        'max_abs': aggregated_max_abs
    })
    max_abs_df.to_csv(f"{path_prefix}scaler_max_abs_values.csv", index=False)

    # Fit final scaler on entire dataset for future use
    final_scaler = MaxAbsScaler()
    final_scaler.fit(X[numerical_features])
    
    joblib.dump(final_scaler, f"{path_prefix}scaler.pkl")
    
    # Transform the entire dataset
    X_scaled = X.copy()
    X_scaled[numerical_features] = final_scaler.transform(X[numerical_features])
    
    X_scaled_imputed, trained_imputers_whole = train_imputers(X_scaled, numerical_features)
    
    # Retrain the final model on the entire dataset using the best hyperparameters
    model_final = GaussianNB(**best_params)
    model_final.fit(X_scaled_imputed, y)
    
    joblib.dump(model_final, f"{path_prefix}naive_bayes_model.pkl")
    
    model_params = model_final.get_params()
    with open(f"{path_prefix}naive_bayes_model_params.txt", 'w') as f:
        for param, value in model_params.items():
            f.write(f"{param}: {value}\n")
    
    average_precision = np.mean(outer_precision_list, axis=0)
    average_recall = np.mean(outer_recall_list, axis=0)
    average_f1 = np.mean(outer_f1_list, axis=0)
    
    std_precision = np.std(outer_precision_list, axis=0)
    std_recall = np.std(outer_recall_list, axis=0)
    std_f1 = np.std(outer_f1_list, axis=0)

    print("\nAverage and Standard Deviation of Precision, Recall, and F1-score Across All Folds:")
    for j in range(len(average_precision)):
        print(
            f"Class {j} - Precision: {average_precision[j]:.3f} ± {std_precision[j]:.3f}, "
            f"Recall: {average_recall[j]:.3f} ± {std_recall[j]:.3f}, "
            f"F1-score: {average_f1[j]:.3f} ± {std_f1[j]:.3f}"
        )

In [11]:
# List of selected features
selected_features = [
    'creation_date',
    'updated_date',
    'control_over_dns',
    'domain_indexed',
    'known_hosting',
    'is_archived',
    'is_on_root',
    'is_subdomain',
    'between_archives_distance',
    'phish_archives_distance'
]

In [12]:
numerical_features = ['between_archives_distance', 'phish_archives_distance']

In [13]:
# Load the data
path_prefix = '../../results/'
df = pd.read_csv('../../PhishXtract-Class/Phish-Xtract-Class-Labeled/validated_dataset_for_classification.csv')
target = df['verified_category']
ids = df['id']

In [14]:
# Preprocess the data
transformed_data = preprocess_data(df, selected_features)

In [16]:
import numpy as np

print(np.unique(target, return_counts=True))

(array(['attackers_domain', 'compromised_domain', 'shared_domain'],
      dtype=object), array([1376,  106, 3954]))


In [17]:
perform_classification(transformed_data, target, ids, path_prefix)

Outer Fold 1 Confusion Matrix:
[[258   7  11]
 [  0  15   7]
 [  6  23 761]]
Outer Fold 1 Class 0 Precision: 0.977, Recall: 0.935, F1-score: 0.956
Outer Fold 1 Class 1 Precision: 0.333, Recall: 0.682, F1-score: 0.448
Outer Fold 1 Class 2 Precision: 0.977, Recall: 0.963, F1-score: 0.970
Outer Fold 2 Confusion Matrix:
[[258  12   5]
 [  1  18   2]
 [  0  25 766]]
Outer Fold 2 Class 0 Precision: 0.996, Recall: 0.938, F1-score: 0.966
Outer Fold 2 Class 1 Precision: 0.327, Recall: 0.857, F1-score: 0.474
Outer Fold 2 Class 2 Precision: 0.991, Recall: 0.968, F1-score: 0.980
Outer Fold 3 Confusion Matrix:
[[267   4   4]
 [  1  14   6]
 [  7  29 755]]
Outer Fold 3 Class 0 Precision: 0.971, Recall: 0.971, F1-score: 0.971
Outer Fold 3 Class 1 Precision: 0.298, Recall: 0.667, F1-score: 0.412
Outer Fold 3 Class 2 Precision: 0.987, Recall: 0.954, F1-score: 0.970
Outer Fold 4 Confusion Matrix:
[[257   8  10]
 [  1  13   7]
 [  5  32 754]]
Outer Fold 4 Class 0 Precision: 0.977, Recall: 0.935, F1-score