In [173]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.stats as ss
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder, LabelEncoder, RobustScaler,FunctionTransformer
from sklearn.impute import KNNImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import log_loss, accuracy_score, precision_score, recall_score, confusion_matrix, classification_report
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE


In [174]:
# Training data: read the CSV and drop the "label" column in one chained step.
df = pd.read_csv('../../dataset_train.csv').drop(columns=["label"])
# Test data used for the final submission.
df_test = pd.read_csv('../../dataset_test.csv')
# NOTE: only the last expression of a cell is rendered, so this first preview
# is evaluated but never displayed.
df.head()
df_test.head()

Unnamed: 0,id,proto,state,dur,sbytes,dbytes,sttl,dttl,sloss,dloss,...,ct_flw_http_mthd,is_ftp_login,ct_ftp_cmd,ct_srv_src,ct_srv_dst,ct_dst_ltm,ct_src_ltm,ct_src_dport_ltm,ct_dst_sport_ltm,ct_dst_src_ltm
0,0,tcp,FIN,0.45498,534.0,268.0,254.0,252.0,2.0,1.0,...,0.0,0.0,0.0,5.0,5.0,2.0,2.0,2.0,1.0,2.0
1,1,tcp,FIN,0.648037,8854.0,268.0,254.0,252.0,4.0,1.0,...,0.0,,0.0,6.0,6.0,1.0,1.0,1.0,1.0,5.0
2,2,tcp,FIN,1.120856,3440.0,642.0,254.0,252.0,5.0,3.0,...,0.0,0.0,0.0,4.0,4.0,1.0,2.0,1.0,1.0,4.0
3,3,udp,INT,1e-06,244.0,0.0,254.0,,0.0,0.0,...,0.0,0.0,0.0,10.0,4.0,2.0,4.0,2.0,1.0,4.0
4,4,tcp,FIN,0.264763,1540.0,1644.0,31.0,29.0,4.0,4.0,...,,0.0,0.0,13.0,11.0,10.0,7.0,6.0,1.0,7.0


In [175]:
# Columns handled as categorical; 'attack_cat' is the prediction target.
categorical_features = [
    'proto',
    'state',
    'service',
    'is_sm_ips_ports',
    'is_ftp_login',
    'attack_cat',
]
# Every remaining column of the training frame, in its original order.
# NOTE(review): this also keeps any identifier column such as 'id' as a
# numeric feature -- confirm that is intended.
noncategorical_features = list(
    filter(lambda col: col not in categorical_features, df.columns.tolist())
)

In [176]:
# Keep an untouched snapshot before the target column is overwritten below.
original_train = df.copy()

# Encode the target as integers; the fitted encoder is reused later to map
# predictions back to the original category names.
le_attack_cat = LabelEncoder()
df['attack_cat'] = le_attack_cat.fit_transform(df['attack_cat'])

# 80/20 train/validation split with a fixed seed.
train_set, val_set = train_test_split(
    df,
    random_state=42,
    test_size=0.2,
)

In [177]:
class FeatureImputer(BaseEstimator, TransformerMixin):
    """
    Scikit-learn style wrapper around ``SimpleImputer``.

    The wrapped imputer is rebuilt from the current ``strategy`` /
    ``fill_value`` every time the estimator is fitted, so values changed via
    ``set_params`` after construction are honoured.
    """

    def __init__(self, strategy='mean', fill_value=None):
        """
        Initialize the imputer for handling missing values.

        :param strategy: The strategy to use for imputation ('mean', 'median', 'most_frequent', 'constant').
                         Default is 'mean'.
        :param fill_value: The value to use for the 'constant' strategy. Default is None.
        """
        self.strategy = strategy
        self.fill_value = fill_value
        # Kept so that `.imputer` is available right after construction.
        self.imputer = self._build_imputer()

    def _build_imputer(self):
        """Create a fresh SimpleImputer from the current hyper-parameters."""
        return SimpleImputer(strategy=self.strategy, fill_value=self.fill_value)

    def fit(self, X, y=None):
        """
        Fit the imputer to the data.

        :param X: Features data with missing values
        :param y: Ignored; accepted so the estimator can be called as fit(X, y).
        :return: self
        """
        self.imputer = self._build_imputer()
        self.imputer.fit(X)
        return self

    def transform(self, X):
        """
        Transform the data by imputing the missing values.

        :param X: Features data with missing values
        :return: Data with missing values imputed
        """
        return self.imputer.transform(X)

    def fit_transform(self, X, y=None):
        """
        Fit the imputer and transform the data.

        :param X: Features data with missing values
        :param y: Ignored.
        :return: Data with missing values imputed
        """
        self.imputer = self._build_imputer()
        return self.imputer.fit_transform(X)

    def get_imputation_statistics(self):
        """
        Get the imputation statistics (e.g., mean or median values used for imputation).

        :return: The ``statistics_`` attribute of the fitted SimpleImputer
        """
        return self.imputer.statistics_

In [178]:
class OutlierClipper(BaseEstimator, TransformerMixin):
    """
    Clip every column to percentile bounds learned during ``fit``.

    :param lower_percentile: Lower cut-off as a fraction (0.01 -> 1st percentile).
    :param upper_percentile: Upper cut-off as a fraction (0.99 -> 99th percentile).
    """

    def __init__(self, lower_percentile=0.01, upper_percentile=0.99):
        self.lower_percentile = lower_percentile
        self.upper_percentile = upper_percentile

    def fit(self, X, y=None):
        """Compute per-column lower/upper bounds (percentiles along axis 0)."""
        # Anything that is not already an ndarray is converted to one.
        data = X if isinstance(X, np.ndarray) else np.array(X)
        low_q = self.lower_percentile * 100
        high_q = self.upper_percentile * 100
        self.lower_bounds = np.percentile(data, low_q, axis=0)
        self.upper_bounds = np.percentile(data, high_q, axis=0)
        return self

    def transform(self, X):
        """Return X with values limited to the bounds stored by ``fit``."""
        data = X if isinstance(X, np.ndarray) else np.array(X)
        return np.clip(data, self.lower_bounds, self.upper_bounds)

In [179]:
class DuplicateRemover(BaseEstimator, TransformerMixin):
    """
    Drop duplicated feature rows (and their labels) at fit time.

    ``transform`` is an identity so that data passed through a fitted pipeline
    is left untouched; rows are only removed in ``fit_transform``.
    """

    def fit(self, X, y=None):
        """Nothing is learned; returns self."""
        return self

    def transform(self, X):
        """Identity: X is returned unchanged."""
        return X

    def fit_transform(self, X, y=None):
        """
        Remove duplicated rows from the features and keep the matching labels.

        :param X: Either an ``(features, labels)`` tuple (in which case ``y``
                  is ignored, as before) or a plain 2D feature matrix that is
                  paired with ``y``.
        :param y: Labels used when X is not a tuple. May be None.
        :return: Tuple ``(X_unique, y_unique)``. Rows come back in the sorted
                 order produced by ``np.unique``; ``y_unique`` is None when no
                 labels were supplied.
        """
        self.fit(X, y)
        if isinstance(X, tuple):
            features, labels = X[0], X[1]
        else:
            features, labels = X, y

        X_unique, indices = np.unique(features, axis=0, return_index=True)
        if labels is None:
            return X_unique, None

        # `indices` are row positions, so pandas objects must be indexed by
        # position (.iloc) rather than by index label.
        if hasattr(labels, "iloc"):
            y_unique = labels.iloc[indices]
        else:
            y_unique = np.asarray(labels)[indices]
        return X_unique, y_unique

In [180]:
class FeatureSelection(BaseEstimator, TransformerMixin):
    """
    Scikit-learn style wrapper around ``SelectKBest``.

    The wrapped selector is rebuilt from the current ``k`` / ``score_func`` on
    every fit, so values changed through ``set_params`` are honoured.
    """

    def __init__(self, k=10, score_func=f_classif):
        """
        Initialize the feature selection process.

        :param k: Number of top features to select. Default is 10.
        :param score_func: Scoring function to evaluate the features. Default is f_classif (ANOVA F-test).
        """
        self.k = k
        self.score_func = score_func
        # Kept so that `.selector` is available right after construction.
        self.selector = self._build_selector()

    def _build_selector(self):
        """Create a fresh SelectKBest from the current hyper-parameters."""
        return SelectKBest(score_func=self.score_func, k=self.k)

    def fit(self, X, y):
        """
        Fit the selector to the data.

        :param X: Features
        :param y: Target labels
        :return: self
        """
        self.selector = self._build_selector()
        self.selector.fit(X, y)
        return self

    def transform(self, X, y=None):
        """
        Apply the feature selection transformation.

        :param X: Features to transform
        :param y: Ignored.
        :return: Transformed features
        """
        return self.selector.transform(X)

    def fit_transform(self, X, y):
        """
        Fit the selector and apply the transformation.

        :param X: Features
        :param y: Target labels
        :return: Transformed features
        """
        self.selector = self._build_selector()
        return self.selector.fit_transform(X, y)

    def get_support(self):
        """
        Get the mask of selected features.

        :return: Mask of selected features (True/False)
        """
        return self.selector.get_support()

    def get_selected_features(self):
        """
        Get the indices of the selected features.

        :return: Integer indices of the selected features
        """
        return self.selector.get_support(indices=True)

In [181]:
class FeatureScaling(BaseEstimator, TransformerMixin):
    """
    Scale features with a scaler chosen by name.

    :param method: "standard" (StandardScaler) or "minmax" (MinMaxScaler).
                   Any other value leaves the data unscaled.
    """

    def __init__(self, method="standard"):
        self.method = method
        self.scaler = None

    def fit(self, X, y=None):
        """
        Fit the scaler selected by ``method``.

        :param X: Feature matrix
        :param y: Ignored.
        :return: self
        """
        # Reset first so a refit with an unsupported method does not keep
        # using a scaler left over from a previous fit.
        self.scaler = None
        if self.method == "standard":
            self.scaler = StandardScaler().fit(X)
        elif self.method == "minmax":
            # MinMaxScaler is not part of the notebook's import cell, so it is
            # imported here to keep this branch usable.
            from sklearn.preprocessing import MinMaxScaler
            self.scaler = MinMaxScaler().fit(X)
        elif self.method is not None:
            import warnings
            warnings.warn(
                f"Unknown scaling method {self.method!r}; data is passed through unscaled."
            )
        return self

    def transform(self, X):
        """
        Apply the fitted scaler; X is returned unchanged when no scaler was fitted.

        :param X: Feature matrix
        :return: Scaled feature matrix (or X itself)
        """
        if self.scaler is None:
            return X
        return self.scaler.transform(X)

In [182]:
class MixedEncodingTransformer(BaseEstimator, TransformerMixin):
    """
    One-hot encode some columns and label encode others of a 2D NumPy array.

    Output layout: the original columns with the one-hot columns removed
    (label-encoded columns are replaced in place), followed by the one-hot
    encoded block.
    """

    def __init__(self, onehot_columns=None, label_columns=None):
        """
        Parameters:
        - onehot_columns: List of column indices for one-hot encoding.
        - label_columns: List of column indices for label encoding.
        """
        self.onehot_columns = onehot_columns or []
        self.label_columns = label_columns or []
        self.onehot_encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore') if self.onehot_columns else None
        # Kept for backward compatibility; after fit it refers to the encoder
        # of the last label column. Per-column encoders live in
        # `label_encoders_`.
        self.label_encoder = LabelEncoder() if self.label_columns else None

    def fit(self, X, y=None):
        """
        Fit the transformers to the data.

        Parameters:
        - X: Input data array (2D, indexable as X[:, cols]).
        - y: Optional target labels, not used in this transformer.
        """
        if self.onehot_columns:
            # Fit one-hot encoder for the specified columns
            self.onehot_encoder.fit(X[:, self.onehot_columns])

        # One encoder per column: a single shared LabelEncoder would only
        # remember the classes of whichever column was fitted last.
        self.label_encoders_ = {
            col: LabelEncoder().fit(X[:, col]) for col in self.label_columns
        }
        if self.label_columns:
            self.label_encoder = self.label_encoders_[self.label_columns[-1]]

        return self

    def transform(self, X):
        """
        Transform the input data using the appropriate encoding methods.

        Parameters:
        - X: Input data array (2D, indexable as X[:, cols]).

        NOTE(review): label codes are written into a copy of X, so they take
        on X's dtype -- an object-dtype array is presumably expected; confirm.
        """
        X_transformed = X.copy()

        # Label-encode BEFORE deleting the one-hot columns: the indices in
        # `label_columns` refer to the original column layout and would point
        # at the wrong columns once np.delete has shifted them.
        encoders = getattr(self, "label_encoders_", {})
        for col in self.label_columns:
            # Fall back to the shared encoder for instances fitted by the
            # earlier implementation (e.g. restored from a pickle).
            encoder = encoders.get(col, self.label_encoder)
            X_transformed[:, col] = encoder.transform(X[:, col])

        if self.onehot_columns:
            onehot_encoded = self.onehot_encoder.transform(X[:, self.onehot_columns])
            # Replace the original columns with one-hot encoded columns
            X_transformed = np.delete(X_transformed, self.onehot_columns, axis=1)
            X_transformed = np.hstack([X_transformed, onehot_encoded])

        return X_transformed

    def fit_transform(self, X, y=None):
        """
        Fit the transformers and transform the data.

        Parameters:
        - X: Input data array (2D).
        - y: Optional target labels, not used in this transformer.
        """
        self.fit(X, y)
        return self.transform(X)


In [183]:
class SMOTEHandler(BaseEstimator, TransformerMixin):
    """
    Transformer-shaped wrapper around ``SMOTE``.

    Resampling only happens in ``fit_transform``; ``transform`` is an identity.
    """

    def __init__(self, random_state=None, sampling_strategy='auto'):
        """
        Store the settings and build the wrapped SMOTE instance.

        :param random_state: Seed forwarded to SMOTE (default is None)
        :param sampling_strategy: Sampling strategy forwarded to SMOTE (default is 'auto')
        """
        self.random_state = random_state
        self.sampling_strategy = sampling_strategy
        self.smote = SMOTE(
            sampling_strategy=self.sampling_strategy,
            random_state=self.random_state,
        )

    def fit(self, X, y):
        """
        Call ``fit`` on the wrapped SMOTE object.

        :param X: Feature matrix
        :param y: Target vector
        :return: self
        """
        self.smote.fit(X, y)
        return self

    def transform(self, X):
        """Identity: X is returned unchanged (no resampling outside of fitting)."""
        return X

    def fit_transform(self, X, y):
        """
        Resample the dataset with SMOTE.

        :param X: Feature matrix
        :param y: Target vector
        :return: The result of ``SMOTE.fit_resample(X, y)`` -- the resampled
                 features and targets together, not just a feature matrix.
        """
        resampled = self.smote.fit_resample(X, y)
        return resampled

In [184]:
from sklearn.preprocessing import Normalizer

class DataNormalizer(BaseEstimator, TransformerMixin):
    """Scikit-learn style wrapper that delegates to ``Normalizer``."""

    def __init__(self, norm='l2'):
        """
        Store the norm and build the wrapped Normalizer.

        :param norm: Norm passed to Normalizer: 'l1', 'l2', or 'max'. Default is 'l2'.
        """
        self.norm = norm
        self.normalizer = Normalizer(norm=norm)

    def fit(self, X, y=None):
        """
        Call ``fit`` on the wrapped Normalizer.

        :param X: Feature matrix
        :param y: Target vector (unused)
        :return: self
        """
        self.normalizer.fit(X)
        return self

    def transform(self, X):
        """
        Normalize the data with the wrapped Normalizer.

        :param X: Feature matrix
        :return: Normalized feature matrix
        """
        normalized = self.normalizer.transform(X)
        return normalized

    def fit_transform(self, X, y=None):
        """
        Fit and normalize in a single call.

        :param X: Feature matrix
        :param y: Target vector (unused)
        :return: Normalized feature matrix
        """
        normalized = self.normalizer.fit_transform(X)
        return normalized

In [185]:
class DimensionalityReducer(BaseEstimator, TransformerMixin):
    """
    Scikit-learn style wrapper around ``PCA``.

    Every entry point accepts either a plain feature matrix or a tuple whose
    first element is the feature matrix (the shape handed over by pipeline
    steps that return ``(X, y)``); only the features are used.
    """

    def __init__(self, n_components=None):
        """
        Initialize the PCA dimensionality reducer.

        :param n_components: Number of principal components to keep.
                              If None, keeps all components.
                              Can also be a float (explained variance ratio).
        """
        self.n_components = n_components
        self.pca = PCA(n_components=self.n_components)

    @staticmethod
    def _features_of(X):
        """Return the feature matrix from X, unwrapping an (X, y) tuple."""
        return X[0] if isinstance(X, tuple) else X

    def fit(self, X, y=None):
        """
        Fit the PCA model to the data.

        :param X: Feature matrix, or a tuple whose first element is one.
        :param y: Target vector (unused).
        :return: self
        """
        self.pca.fit(self._features_of(X))
        return self

    def transform(self, X):
        """
        Transform the data to the lower-dimensional space.

        :param X: Feature matrix, or a tuple whose first element is one.
        :return: Transformed data in lower-dimensional space.
        """
        return self.pca.transform(self._features_of(X))

    def fit_transform(self, X, y=None):
        """
        Fit and transform the data in one step.

        Previously ``X[0]`` was taken unconditionally, which fitted PCA on the
        first row when a plain matrix was passed; the tuple is now unwrapped
        only when X actually is a tuple.

        :param X: Feature matrix, or a tuple whose first element is one.
        :param y: Target vector (unused).
        :return: Transformed data in lower-dimensional space.
        """
        return self.pca.fit_transform(self._features_of(X))

    def explained_variance_ratio(self):
        """
        Return the explained variance ratio of each principal component.

        :return: Array of explained variance ratios for each component.
        """
        return self.pca.explained_variance_ratio_

    def components(self):
        """
        Return the principal components (eigenvectors).

        :return: Matrix of principal components.
        """
        return self.pca.components_

In [186]:
# Categorical input columns: everything categorical except the target.
categorical_without_target = [
    name for name in categorical_features if name != 'attack_cat'
]
# NOTE(review): the two lists below are defined but not used by the pipeline
# built in this cell.
onehot_features = ['service', 'proto']
label_features = ['state']

# Numeric branch: median imputation -> percentile clipping -> standard scaling.
numeric_transformer = Pipeline([
    ('imputer', FeatureImputer(strategy='median')),
    ('outlier_clipper', OutlierClipper(lower_percentile=0.01, upper_percentile=0.99)),
    ('scaler', FeatureScaling(method='standard')),
])

# Categorical branch: most-frequent imputation -> dense one-hot encoding.
categorical_transformer = Pipeline([
    ('imputer', FeatureImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])

# Route each group of columns to its branch.
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, noncategorical_features),
    ('cat', categorical_transformer, categorical_without_target),
])

# Step order matters: the SMOTE step's fit_transform returns the resampled
# features and labels together, and the duplicate remover consumes that pair.
pipe = ImbPipeline(steps=[
    ('preprocessor', preprocessor),
    ('feature_selector', FeatureSelection(k=10)),
    ('smote', SMOTEHandler(random_state=42, sampling_strategy='auto')),
    ('duplicate_remover', DuplicateRemover()),
])

### NAIVE BAYES

In [187]:
import numpy as np

class NaiveBayes:
    """
    Gaussian Naive Bayes classifier for numerical features.

    Per class, ``fit`` stores the prior, the per-feature mean and the
    per-feature variance. ``predict`` picks the class with the highest joint
    score, computed in log space so that multiplying many small densities
    cannot underflow to zero (which previously made every class tie at 0 and
    the first class win).
    """

    def __init__(self):
        self.class_priors = {}
        self.mean = {}
        self.var = {}
        self.classes = None

    def fit(self, X, y):
        """
        Train the Naive Bayes model on numerical data.
        X: np.ndarray - Feature matrix (a tuple is accepted; its first element is used)
        y: np.ndarray or pd.Series - Target labels
        Returns self.
        """
        if isinstance(X, tuple):
            X = X[0]
        X = np.asarray(X)
        y = np.asarray(y)

        # Start from a clean slate so a refit does not keep statistics of
        # classes seen in an earlier fit.
        self.class_priors = {}
        self.mean = {}
        self.var = {}

        self.classes = np.unique(y)
        n_samples = len(y)

        for cls in self.classes:
            in_class = y == cls
            X_c = X[in_class]

            self.class_priors[cls] = np.sum(in_class) / n_samples
            self.mean[cls] = X_c.mean(axis=0)
            self.var[cls] = X_c.var(axis=0)

        return self

    def _log_likelihood_num(self, class_idx, X):
        """
        Log of the Gaussian likelihood, summed over features (last axis).
        Uses the same smoothed density as `_likelihood_num`, so the ranking of
        classes is unchanged wherever the plain product does not underflow.
        class_idx: Class label
        X: Feature vector or (n_samples, n_features) matrix
        """
        mean = self.mean[class_idx]
        var = self.var[class_idx]
        log_density = -((X - mean) ** 2) / (2 * var + 1e-6)
        log_density = log_density - 0.5 * np.log(2 * np.pi * var + 1e-6)
        return log_density.sum(axis=-1)

    def _likelihood_num(self, class_idx, x):
        """
        Compute likelihood for numerical features using Gaussian distribution.
        Kept for backward compatibility; `predict` uses the log-space variant.
        class_idx: Class label
        x: Feature vector
        """
        mean = self.mean[class_idx]
        var = self.var[class_idx]
        likelihood = -((x - mean) ** 2) / (2 * var + 1e-6)
        likelihood = np.exp(likelihood) / np.sqrt(2 * np.pi * var + 1e-6)
        return likelihood.prod()

    def predict(self, X):
        """
        Predict the class for each sample in X.
        X: np.ndarray - Feature matrix (a tuple is accepted; its first element is used)
        Returns an array with one predicted class label per sample.
        """
        if isinstance(X, tuple):
            X = X[0]
        X = np.asarray(X)

        if X.shape[0] == 0:
            return np.array([])
        if X.ndim == 1:
            # Treat a 1-D input as n single-value samples, matching what the
            # earlier per-element loop did.
            X = X[:, None]

        classes = np.asarray(self.classes)
        # One column per class: log prior + summed log likelihood.
        log_joint = np.stack(
            [
                np.log(self.class_priors[cls]) + self._log_likelihood_num(cls, X)
                for cls in classes
            ],
            axis=1,
        )
        # argmax keeps the first class on ties, like max() over the class dict.
        return classes[np.argmax(log_joint, axis=1)]

    def accuracy(self, y_true, y_pred):
        """
        Calculate accuracy of the model.
        y_true: array-like - True labels
        y_pred: array-like - Predicted labels
        """
        # Convert first so plain lists are compared element-wise.
        return np.mean(np.asarray(y_true) == np.asarray(y_pred))


In [188]:
from sklearn.metrics import accuracy_score

# Separate features and target for both splits.
x_train_set = train_set.drop(columns='attack_cat')
y_train_set = train_set['attack_cat']
x_val_set = val_set.drop(columns='attack_cat')
y_val_set = val_set['attack_cat']

# Fitting the pipeline yields processed features AND labels (the label set
# changes during fitting); validation data only goes through `transform`.
x_train_set_processed, y_train_set_processed = pipe.fit_transform(x_train_set, y_train_set)
x_val_set_processed = pipe.transform(x_val_set)

# From-scratch implementation.
nb = NaiveBayes()
nb.fit(x_train_set_processed, y_train_set_processed)
y_val_pred_custom = nb.predict(x_val_set_processed)
accuracy_custom = nb.accuracy(y_val_set, y_val_pred_custom)
print(f"Custom Naive Bayes Validation Accuracy: {accuracy_custom:.4f}")

# scikit-learn reference implementation, trained on the same data.
gnb = GaussianNB()
gnb.fit(x_train_set_processed, y_train_set_processed)
y_val_pred_builtin = gnb.predict(x_val_set_processed)
accuracy_builtin = accuracy_score(y_val_set, y_val_pred_builtin)
print(f"Built-In Naive Bayes Validation Accuracy: {accuracy_builtin:.4f}")

print(f"Accuracy Difference: {abs(accuracy_custom - accuracy_builtin):.4f}")

Custom Naive Bayes Validation Accuracy: 0.3067
Built-In Naive Bayes Validation Accuracy: 0.3049
Accuracy Difference: 0.0018


### EXPORT MODEL

In [189]:
import pickle
# Serialize the fitted custom Naive Bayes model to disk.
with open('../../model-nb.pkl', 'wb') as model_file:
    pickle.dump(nb, model_file)

### IMPORT MODEL

In [190]:
# Restore the model written by the export cell.
# NOTE: pickle.load can execute arbitrary code -- only load files you created
# yourself or otherwise trust.
with open("../../model-nb.pkl", "rb") as model_file:
    loaded_model = pickle.load(model_file)

In [191]:
import pandas as pd

x_test_set = df_test.copy()
# Process the test set with the already-fitted pipeline
x_test_set_processed = pipe.transform(x_test_set)
if isinstance(x_test_set_processed, tuple):
    x_test_set_processed = x_test_set_processed[0]

# Make predictions once; the model's predict was previously called twice on
# the same data with the first result thrown away.
predictions = loaded_model.predict(x_test_set_processed)
y_test_predict = predictions

# Map the integer codes back to the original attack category names.
# (Named so it no longer shadows the built-in `reversed`.)
predicted_attack_cat = le_attack_cat.inverse_transform(y_test_predict)

# Create a DataFrame for export
# NOTE(review): ids are generated positionally; if the test file's own 'id'
# column is the required identifier, use it here instead -- confirm.
result_df = pd.DataFrame({
    'id': range(len(predicted_attack_cat)),
    'prediction': predicted_attack_cat
})

# Save predictions to CSV and report the path that was actually written
submission_path = "../../submissions-nb.csv"
result_df.to_csv(submission_path, index=False)
print(f"Predictions saved to {submission_path}")


Predictions saved to prediction.csv
