# Logistic Regression Experiments

In this notebook I will explore the effectiveness of using logistic regression on the UNSW_NB15 intrusion detection dataset.

In [9]:
# Import necessary libraries
import pandas as pd
import numpy as np
import sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt


First, we preprocess the data by applying log transformations to the skewed numeric features and then one-hot encoding the categorical features.

In [None]:
from typing import List, Tuple  # Import type hints for better code clarity

# Feature groups consumed by the preprocessing steps:
#   * categorical_features  -> one-hot encoded
#   * features_to_transform -> log-transformed to reduce skewness
categorical_features: List[str] = ["proto", "state", "service", "is_sm_ips_ports", "is_ftp_login"]
features_to_transform: List[str] = [
    'sbytes', 'dbytes', 'sttl', 'dttl',
    'sload', 'dload', 'spkts', 'dpkts',
    'swin', 'dwin', 'stcpb', 'dtcpb',
    'smeansz', 'dmeansz', 'sjit', 'djit',
]

# Read the training and testing splits from their CSV files.
train_data: pd.DataFrame = pd.read_csv('../data/UNSW_NB15/UNSW_NB15_training-set.csv')
test_data: pd.DataFrame = pd.read_csv('../data/UNSW_NB15/UNSW_NB15_testing-set.csv')

# Remove the columns that are not needed for modeling. errors="ignore" skips
# any listed column that is absent from a given file.
columns_to_drop = ['attack_cat', 'id']
train_data = train_data.drop(columns=columns_to_drop, errors="ignore")
test_data = test_data.drop(columns=columns_to_drop, errors="ignore")

def process_numeric_features(df: pd.DataFrame, features: List[str]) -> pd.DataFrame:
    """
    Apply a natural-logarithm transformation, ln(x + 1), to the requested numeric columns.

    The transformation is applied to a copy, so the caller's dataframe is left
    untouched and calling this twice on the same input never log-transforms a
    column twice.

    Args:
        df: Dataframe holding the columns to transform.
        features: Names of the columns to transform. Names that are absent from
            ``df`` are skipped (a warning listing them is printed); a name that
            appears more than once is transformed only once.

    Returns:
        A new dataframe with the same columns as ``df`` in which every requested
        column that exists has been replaced by ``np.log1p`` of its values.
    """
    transformed = df.copy()
    missing: List[str] = []
    # dict.fromkeys de-duplicates while preserving the caller's order.
    for feature in dict.fromkeys(features):
        if feature in transformed.columns:
            transformed[feature] = np.log1p(transformed[feature])
        else:
            missing.append(feature)
    if missing:
        print(f"Warning: features not found in the dataframe; skipping log transformation for: {missing}")
    print("Log transformation applied to numeric features (if present) in the dataset")
    return transformed

def process_categorical_features(df: pd.DataFrame, cat_features: List[str]) -> Tuple[pd.DataFrame, List[str]]:
    """
    One-hot encode the specified categorical columns of a dataframe.

    Each column is cast to ``str`` before encoding, so numeric flags are encoded
    by their string value. The input dataframe is not modified.

    Args:
        df: Dataframe containing the columns to encode.
        cat_features: Names of the columns to encode. A name that is not a column
            of ``df`` (or that was already encoded earlier in the same call) is
            skipped and a warning is printed.

    Returns:
        A tuple ``(encoded_df, dummy_columns)`` where ``encoded_df`` holds the
        remaining original columns followed by the one-hot columns (in the order
        of ``cat_features``), and ``dummy_columns`` lists exactly the one-hot
        column names created by this call.
    """
    dummy_frames: List[pd.DataFrame] = []
    encoded: List[str] = []
    for feature in cat_features:
        if feature in df.columns and feature not in encoded:
            dummy_frames.append(pd.get_dummies(df[feature].astype(str), prefix=feature))
            encoded.append(feature)
        else:
            print(f"Warning: '{feature}' not found in the dataframe; skipping one-hot encoding for this feature.")

    if dummy_frames:
        # A single concat avoids rebuilding the whole frame once per feature.
        df = pd.concat([df.drop(columns=encoded), *dummy_frames], axis=1)
    print("One-hot encoding applied to categorical features in the dataset")

    # Collect the names from the dummy frames themselves rather than by prefix
    # matching, which would also pick up unrelated columns such as "proto_len".
    updated_dummy_cols = [col for dummies in dummy_frames for col in dummies.columns]
    print("Updated categorical feature columns:", updated_dummy_cols)
    return df, updated_dummy_cols

# Process numeric features on each dataset independently.
train_data = process_numeric_features(train_data, features_to_transform)
test_data = process_numeric_features(test_data, features_to_transform)

# Keep the raw categorical column names. `categorical_features` is rebound below
# to the one-hot column names (later cells use it that way), so the test set must
# be encoded with this saved list: passing the one-hot names would match no
# column in the test set and leave it unencoded.
raw_categorical_features = list(categorical_features)

# One-hot encode the categorical features of both datasets.
train_data, categorical_features = process_categorical_features(train_data, raw_categorical_features)
test_data, _ = process_categorical_features(test_data, raw_categorical_features)

# Encoding each split on its own can yield different one-hot columns when a
# category occurs in only one split. Align the test set to the training layout:
# categories unseen in the test set become all-zero columns, and one-hot columns
# that exist only in the test set are dropped.
test_data = test_data.reindex(columns=train_data.columns, fill_value=0)

# Output the shapes of the processed datasets and confirm that all features are numeric.
print(f"Training data shape: {train_data.shape}")
print(f"Testing data shape: {test_data.shape}")
print(f"Any non-numeric columns remaining in Training data: {any(not pd.api.types.is_numeric_dtype(train_data[col]) for col in train_data.columns)}")
print("List of columns in the Training Data:")
print(list(train_data.columns))
print("\nList of columns in the Testing Data:")
print(list(test_data.columns))


Below is the function that performs k-fold cross-validation and trains our model. It will be used throughout the remainder of the notebook.

In [11]:
def perform_logistic_regression_cv(df, feature_columns, label_column='label', n_splits=5, random_state=42):
    """
    Run shuffled k-fold cross-validation of an L2-regularised logistic regression.

    A fresh model (saga solver) is fitted on the training part of every fold and
    scored on the held-out part.

    Args:
        df: Dataframe containing the feature columns and the label column.
        feature_columns: Names of the columns used as model inputs. They must be
            convertible to float.
        label_column: Name of the target column. The precision/recall/F1 calls use
            scikit-learn's binary defaults, so a binary target is expected.
        n_splits: Number of cross-validation folds.
        random_state: Seed used for both the fold shuffling and the solver.

    Returns:
        A dict keyed by metric name ('accuracy', 'precision', 'recall', 'f1',
        'true_positive_rate', 'false_positive_rate', 'true_negative_rate',
        'false_negative_rate'); each value is ``{'mean': ..., 'std': ...}``
        computed across the folds.
    """
    # Imported locally so the function is usable regardless of which cells ran before it.
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import KFold
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
    import numpy as np
    from tqdm.notebook import tqdm

    def _rate(numerator, denominator):
        # Ratio that falls back to 0 when the denominator is empty.
        return numerator / denominator if denominator > 0 else 0

    # Request float64 explicitly: a mix of boolean one-hot columns and float
    # columns would otherwise be returned as a slow object-dtype array.
    X = df[feature_columns].to_numpy(dtype=np.float64)
    y = df[label_column].to_numpy()

    # Fix the confusion-matrix layout from the full label set so a fold whose
    # held-out part contains a single class still yields a 2x2 matrix.
    class_labels = np.unique(y)

    # NOTE(review): KFold does not stratify by label; consider StratifiedKFold if
    # the classes are imbalanced.
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    metric_names = (
        'accuracy', 'precision', 'recall', 'f1',
        'true_positive_rate', 'false_positive_rate',
        'true_negative_rate', 'false_negative_rate',
    )
    fold_metrics = {name: [] for name in metric_names}

    for train_index, test_index in tqdm(kf.split(X), total=n_splits, desc="Cross-validation"):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # NOTE(review): features are passed to saga unscaled; the scikit-learn
        # docs state that saga's fast convergence is only guaranteed on features
        # with approximately the same scale. Consider standardising inside each fold.
        model = LogisticRegression(
            penalty='l2',              # L2 regularization.
            C=1.0,                     # Inverse of regularization strength.
            solver='saga',
            max_iter=1000,             # Upper bound on solver iterations.
            tol=1e-4,                  # Stopping tolerance.
            random_state=random_state, # Reproducible solver behaviour.
            n_jobs=-1,
            verbose=0                  # Keep the solver silent.
        )
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        fold_metrics['accuracy'].append(accuracy_score(y_test, y_pred))
        # zero_division=0 returns 0 instead of warning when a denominator is empty.
        fold_metrics['precision'].append(precision_score(y_test, y_pred, zero_division=0))
        fold_metrics['recall'].append(recall_score(y_test, y_pred, zero_division=0))
        fold_metrics['f1'].append(f1_score(y_test, y_pred, zero_division=0))

        # For a binary target, ravel() yields tn, fp, fn, tp in that order.
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=class_labels).ravel()
        fold_metrics['true_positive_rate'].append(_rate(tp, tp + fn))
        fold_metrics['false_positive_rate'].append(_rate(fp, fp + tn))
        fold_metrics['true_negative_rate'].append(_rate(tn, tn + fp))
        fold_metrics['false_negative_rate'].append(_rate(fn, fn + tp))

    # Aggregate every metric into its mean and standard deviation across folds.
    return {
        name: {'mean': np.mean(values), 'std': np.std(values)}
        for name, values in fold_metrics.items()
    }

def pretty_print_results(results):
    """
    Print one line per metric in the form ``Metric name: mean (±std)``.

    ``results`` maps a metric name to a dict with 'mean' and 'std' entries.
    Underscores in the metric name are shown as spaces and the first letter is
    capitalised; both numbers are printed with four decimals.
    """
    for metric_name in results:
        stats = results[metric_name]
        label = metric_name.replace('_', ' ').capitalize()
        mean_value = stats['mean']
        std_value = stats['std']
        print(f"{label}: {mean_value:.4f} (±{std_value:.4f})")


In [None]:
# Detailed Numeric Feature Selection Process
# ------------------------------------------------------------
# This section ranks the numeric features by their absolute Pearson correlation
# with the target label. The 'label' column and every column listed in
# `categorical_features` are excluded.
#
# The process involves:
# 1. Extracting the numeric feature names from the training dataset.
# 2. Computing the absolute correlation of each feature with the target variable.
# 3. Sorting features in descending order of correlation.
# 4. Creating feature subsets corresponding to the top 20%, 40%, 60%, and 80% of
#    features, in addition to the full sorted list.
# ------------------------------------------------------------

# Step 1: Extract numeric features by excluding 'label' and categorical features.
excluded_columns = set(categorical_features) | {'label'}
numeric_features = [col for col in train_data.columns if col not in excluded_columns]

# Step 2: Compute the absolute Pearson correlation between each numeric feature and the target label.
# Looking the label up once makes a missing label column fail immediately
# instead of producing empty rankings.
label_series = train_data['label']
correlations = []
for feature in numeric_features:
    corr_value = abs(train_data[feature].corr(label_series))
    # The correlation is NaN when it is undefined (e.g. a constant column). NaN
    # keys make sorted() return an arbitrary order, so rank such features last.
    if pd.isna(corr_value):
        print(f"Warning: correlation for feature '{feature}' is undefined; treating it as 0.")
        corr_value = 0.0
    correlations.append((feature, corr_value))

# Step 3: Sort the features by their correlation strength in descending order.
sorted_correlations = sorted(correlations, key=lambda item: item[1], reverse=True)

# Step 4: Determine cut-off indices for the top percentiles (rounded up).
n_features = len(sorted_correlations)
index_20 = int(np.ceil(n_features * 0.20))
index_40 = int(np.ceil(n_features * 0.40))
index_60 = int(np.ceil(n_features * 0.60))
index_80 = int(np.ceil(n_features * 0.80))

# Create lists of features for each specified percentile.
all_correlated_features = [feature for feature, _ in sorted_correlations]
top_20_features = all_correlated_features[:index_20]
top_40_features = all_correlated_features[:index_40]
top_60_features = all_correlated_features[:index_60]
top_80_features = all_correlated_features[:index_80]

# Display the feature groups.
print("\nTop 20 percentile features:")
print(top_20_features)

print("\nTop 40 percentile features:")
print(top_40_features)

print("\nTop 60 percentile features:")
print(top_60_features)

print("\nTop 80 percentile features:")
print(top_80_features)

print("\nAll correlated features (in sorted order):")
print(all_correlated_features)


In [None]:
from tqdm import tqdm

# Pair every feature subset with a readable name so that each block of printed
# metrics can be attributed to the subset that produced it.
feature_subsets = {
    "top 20%": top_20_features,
    "top 40%": top_40_features,
    "top 60%": top_60_features,
    "top 80%": top_80_features,
    "all": all_correlated_features,
}

# Cross-validation results keyed by (subset name, feature-set kind), kept so the
# runs can be compared after the loop instead of only being printed.
cv_results = {}

for subset_name, numeric_features_subset in tqdm(feature_subsets.items(), desc="Evaluating feature subsets"):
    print(f"\n=== Feature subset: {subset_name} ({len(numeric_features_subset)} numeric features) ===")

    # Evaluate model performance using the numeric features only.
    print("\nPerforming logistic regression with numeric features only:")
    cv_results[(subset_name, "numeric")] = perform_logistic_regression_cv(train_data, numeric_features_subset)
    pretty_print_results(cv_results[(subset_name, "numeric")])

    # Evaluate model performance using numeric features combined with categorical features.
    print("\nPerforming logistic regression with numeric + categorical features:")
    combined_features = numeric_features_subset + categorical_features
    cv_results[(subset_name, "numeric + categorical")] = perform_logistic_regression_cv(train_data, combined_features)
    pretty_print_results(cv_results[(subset_name, "numeric + categorical")])
