In [None]:
!pip3 install python-docx
!pip3 install imbalanced-learn
!pip3 install ipython



In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset and results paths used later
# in this notebook all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import warnings
import random
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, label_binarize
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score, confusion_matrix, classification_report, roc_curve, ConfusionMatrixDisplay,precision_recall_fscore_support
# from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from google.colab import files, drive
from docx import Document
from docx.shared import Inches
from io import StringIO
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical

# Set environment variables for reproducibility
# NOTE(review): PYTHONHASHSEED is normally read when the interpreter starts, so
# assigning it here presumably only affects child processes, not this kernel -- confirm.
os.environ['PYTHONHASHSEED'] = '42'
# NOTE(review): this is assigned after `import tensorflow` above -- confirm
# TensorFlow still honours the variable when it is set this late.
os.environ['TF_DETERMINISTIC_OPS'] = '1'

# Set random seeds for reproducibility across all libraries
# (Python's `random`, NumPy's legacy global generator, and TensorFlow)
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

# Suppress all warnings
# (this is a blanket filter: warnings raised by the libraries used below are hidden too)
warnings.filterwarnings('ignore')


# Drive is mounted in the previous cell as well; this second call is why the
# "already mounted" message appears in the output.
drive.mount('/content/drive/')


Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


## Make directory

In [None]:
class DataPipeline:
    """Tabular classification workflow used throughout this notebook.

    The pipeline loads a CSV file, writes a summary and distribution plots,
    imputes missing values, label-encodes / splits / scales the data and then
    trains several scikit-learn classifiers plus a small Keras network,
    recording the results in a python-docx document.

    Every artefact (plots, CSV files, the .docx report) is written beneath the
    directory passed as ``base_output_file``.
    """

    def __init__(self, file_path, base_output_file):
        """Remember the paths and create the output directories.

        Args:
            file_path: Path of the CSV file read by ``load_dataset``.
            base_output_file: Directory under which results are written
                (it is used as a directory, despite the name).
        """
        self.file_path = file_path
        self.base_output_file = base_output_file
        self.df = None           # dataset, populated by load_dataset()
        self.X = None            # features, populated by define_target()
        self.y = None            # target column, populated by define_target()
        self.num_classes = None  # number of distinct target values
        # Create the results directory if it doesn't exist
        self.results_dir = base_output_file
        self.dataset_results_dir = os.path.join(self.results_dir, 'dataset')
        self._ensure_directories_exist()

    def _ensure_directories_exist(self):
        """Ensure the results and dataset directories exist."""
        for directory in (self.results_dir, self.dataset_results_dir):
            if not os.path.exists(directory):
                os.makedirs(directory)
                print(f"Created directory: {directory}")

    def load_dataset(self):
        """Read ``self.file_path`` as CSV into ``self.df`` and return it."""
        self.df = pd.read_csv(self.file_path)
        return self.df

    def plot_class_distribution(self, y, plot_title='Class Distribution'):
        """Save a bar chart of the value counts of ``y``.

        The image is written to ``<dataset_results_dir>/<plot_title>.png``.
        """
        plt.figure(figsize=(6, 4))
        y.value_counts().plot(kind='bar', color='green', alpha=0.7)
        plt.title(plot_title)
        plt.xlabel('Classes')
        plt.ylabel('Frequency')
        plot_path = os.path.join(self.dataset_results_dir, f'{plot_title}.png')
        plt.savefig(plot_path)
        plt.close()

    def display_dataset_summary(self):
        """Write head / describe / per-column info of ``self.df`` to
        ``dataset_summary.csv`` in the dataset results directory.

        The "Info" section lists one ``column, non-null count, dtype`` row per
        column. The values are taken from the frame itself; the previous
        implementation split the text printed by ``DataFrame.info`` on
        whitespace, which recorded the row index as the column name, the
        literal word ``non-null`` as the count, and emitted rows for the
        ``RangeIndex``/``Data columns``/separator lines.
        """
        dataset_output_file_path = os.path.join(self.dataset_results_dir, 'dataset_summary.csv')
        with open(dataset_output_file_path, 'w') as f:
            f.write('--- Head of Dataset ---\n')
            self.df.head().to_csv(f, index=False)
            f.write('\n')

            f.write('--- Describe of Dataset ---\n')
            describe_df = self.df.describe().transpose()
            describe_df.to_csv(f)
            f.write('\n')

            f.write('--- Info of Dataset ---\n')
            f.write('Column, Non-Null Count, Dtype\n')
            # Iterate positionally so duplicated column names are still reported once each.
            column_info = zip(self.df.columns, self.df.count().tolist(), self.df.dtypes.tolist())
            for column_name, non_null_count, dtype in column_info:
                f.write(f'{column_name}, {non_null_count}, {dtype}\n')

        print(f"Dataset summary saved to {dataset_output_file_path}")

    def data_distribution_plot(self, df):
        """Save one distribution plot per column of ``df``.

        Numeric columns get a histogram with a KDE overlay, object-dtype
        columns get a count plot. Images go to ``<dataset_results_dir>/images``.
        """
        dataset_images_directory = os.path.join(self.dataset_results_dir, 'images')

        # Ensure the directory exists before saving plots
        os.makedirs(dataset_images_directory, exist_ok=True)

        numerical_cols = df.select_dtypes(include=[np.number]).columns

        # Plot distribution for numerical columns
        for col in numerical_cols:
            plt.figure(figsize=(6, 4))
            sns.histplot(df[col], kde=True, bins=30)
            plt.title(f'Distribution of {col}')
            plt.xlabel(col)
            plt.ylabel('Frequency')
            plot_path = os.path.join(dataset_images_directory, f'Distribution_{col}.png')
            plt.savefig(plot_path)
            plt.close()  # Close the plot to avoid overlap

        categorical_cols = df.select_dtypes(include=['object']).columns

        # Plot distribution for categorical columns
        for col in categorical_cols:
            plt.figure(figsize=(6, 4))
            sns.countplot(data=df, x=col)
            plt.title(f'Distribution of {col}')
            plt.xlabel(col)
            plt.ylabel('Frequency')
            plt.xticks(rotation=45)
            plot_path = os.path.join(dataset_images_directory, f'Distribution_{col}.png')
            plt.savefig(plot_path)
            plt.close()  # Close the plot to avoid overlap

    def clean_data(self):
        """Impute missing values in ``self.df`` and save it as ``cleaned_dataset.csv``.

        Numeric columns are filled with their mean rounded to the nearest
        integer; object-dtype columns are filled with their most frequent
        value. A categorical column with no observed value at all has no mode
        and is left untouched instead of raising.

        Returns:
            The cleaned ``self.df`` (modified in place).
        """
        cleaned_file_path = os.path.join(self.dataset_results_dir, 'cleaned_dataset.csv')
        numeric_cols = self.df.select_dtypes(include=['number']).columns
        rounded_means = self.df[numeric_cols].mean().round()
        self.df[numeric_cols] = self.df[numeric_cols].fillna(rounded_means)

        categorical_cols = self.df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            modes = self.df[col].mode()
            if modes.empty:
                # Entirely missing column: nothing sensible to impute with.
                continue
            self.df[col] = self.df[col].fillna(modes.iloc[0])

        self.df.to_csv(cleaned_file_path, index=False)
        print(f"Cleaned dataset saved to {cleaned_file_path}")
        return self.df

    def define_target(self, df_cleaned, target_column_name):
        """Split ``df_cleaned`` into features and target and count the classes.

        Sets ``self.X``, ``self.y`` and ``self.num_classes``.

        Returns:
            Tuple ``(X, y)``.
        """
        self.y = df_cleaned[target_column_name]
        self.X = df_cleaned.drop(columns=[target_column_name])
        self.num_classes = len(np.unique(self.y))
        print(f"Detected {self.num_classes} unique classes in the target column.")
        return self.X, self.y

    def encode_categorical(self):
        """Label-encode object-dtype feature columns and, if needed, the target.

        ``self.X`` is modified in place, so calling this again is a no-op for
        the features. ``self.y`` itself is left unchanged.

        Returns:
            Tuple ``(X, y_encoded)`` where ``y_encoded`` is the encoded target
            when ``self.y`` is object/category typed, otherwise ``self.y``.
        """
        label_encoders = {}

        # Check for categorical columns in X and apply LabelEncoder only if categorical columns exist
        categorical_columns = self.X.select_dtypes(include=['object']).columns
        for col in categorical_columns:
            label_encoders[col] = LabelEncoder()
            self.X[col] = label_encoders[col].fit_transform(self.X[col])

        # Encode the target variable (y) if necessary
        if self.y.dtype == 'object' or self.y.dtype.name == 'category':
            y_encoder = LabelEncoder()
            y_encoded = y_encoder.fit_transform(self.y)
        else:
            y_encoded = self.y  # If y is numeric, no encoding is needed

        # Return the updated X and y_encoded
        return self.X, y_encoded

    def split_data(self, test_size, random_state=42):
        """Encode the data and return a stratified train/test split.

        Returns:
            ``X_train, X_test, y_train, y_test``.
        """
        X_encoded, y_encoded = self.encode_categorical()
        X_train, X_test, y_train, y_test = train_test_split(
            X_encoded, y_encoded, test_size=test_size, random_state=random_state, stratify=self.y
        )
        return X_train, X_test, y_train, y_test

    def apply_standard_scaler(self, X_train, X_test):
        """Standardise features; the scaler is fitted on the training split only.

        Returns:
            ``(X_train_scaled, X_test_scaled)``.
        """
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        return X_train_scaled, X_test_scaled

    def tune_model_params(self, default_model):
        """Apply this notebook's fixed hyper-parameters to ``default_model``.

        The parameter set is looked up by the estimator's class name. Unknown
        estimators are returned unchanged after a notice is printed.

        Returns:
            The same estimator instance (``set_params`` returns ``self``).
        """
        # Note: the estimators use random_state 52, while the global seeds in
        # the setup cell are 42.
        param_defaults = {
            "RandomForestClassifier": {
                "n_estimators": 200,  # Number of trees in the forest (default=100)
                "max_depth": 25,  # Maximum depth of each tree (default=None)
                "min_samples_split": 10,  # Minimum samples required to split an internal node (default=2)
                "min_samples_leaf": 4,  # Minimum samples required at a leaf node (default=1)
                "criterion": 'gini',  # Split quality measure; alternatives: 'entropy', 'log_loss'
                "random_state": 52  # Random seed for reproducibility (default=None)
            },
            "SVC": {
                "C": 1.0,  # Regularization parameter (default=1.0)
                "kernel": 'rbf',  # Kernel type; alternatives: 'linear', 'poly', 'sigmoid', 'precomputed'
                "gamma": 'scale',  # Kernel coefficient; alternatives: 'auto' or a positive float
                "degree": 3,  # Degree of the polynomial kernel function ('poly') (default=3)
                "random_state": 52  # Random seed for reproducibility (default=None)
            },
            "DecisionTreeClassifier": {
                "max_depth": 300,  # Maximum depth of the tree (default=None)
                "min_samples_split": 20,  # Minimum samples required to split an internal node (default=2)
                "min_samples_leaf": 8,  # Minimum samples required at a leaf node (default=1)
                "criterion": 'entropy',  # Split quality measure; alternative: 'gini' (the default)
                "random_state": 52  # Random seed for reproducibility (default=None)
            },
            "KNeighborsClassifier": {
                "n_neighbors": 10,  # Number of neighbors to use (default=5)
                "weights": 'distance',  # Weight function used in prediction; alternative: 'uniform' (the default)
                "algorithm": 'auto',  # Neighbor search; alternatives: 'ball_tree', 'kd_tree', 'brute'
                "leaf_size": 40  # Leaf size passed to BallTree or KDTree (default=30)
            },
            "LogisticRegression": {
                "C": 0.3,  # Inverse of regularization strength (default=1.0)
                "solver": 'liblinear',  # Optimizer; alternatives: 'newton-cg', 'lbfgs' (the default), 'sag', 'saga'
                "max_iter": 300,  # Maximum number of iterations (default=100)
                "penalty": 'l2',  # Norm used in the penalization; alternatives: 'l1', 'elasticnet', None
                "random_state": 52  # Random seed for reproducibility (default=None)
            }
        }

        model_name = default_model.__class__.__name__
        if model_name in param_defaults:
            params = param_defaults[model_name]
            return default_model.set_params(**params)
        else:
            print(f"\nNo specific tuning parameters available for {model_name}. Using default parameters.")
            return default_model

    @staticmethod
    def _take_rows(data, indices):
        """Select rows by position from a pandas object or any array-like."""
        if hasattr(data, 'iloc'):
            return data.iloc[indices]
        return np.asarray(data)[indices]

    def cross_validate(self, model, X, y, n_splits=5):
        """Run stratified k-fold cross-validation and print the mean scores.

        Args:
            model: Estimator with ``fit``/``predict``; it is refitted per fold.
            X: Features, a DataFrame or an array (e.g. the scaled ndarray
                returned by ``apply_standard_scaler``).
            y: Target, a Series or an array.
            n_splits: Number of folds.

        Prints the mean accuracy and the weighted precision, recall and F1.
        """
        # Imported here because a class-level import only creates a class
        # attribute, which is not visible as a bare name inside methods.
        from sklearn.model_selection import StratifiedKFold

        skf = StratifiedKFold(n_splits=n_splits)
        accuracy_scores = []
        precision_scores = []
        recall_scores = []
        f1_scores = []

        for train_index, test_index in skf.split(X, y):
            X_train, X_test = self._take_rows(X, train_index), self._take_rows(X, test_index)
            y_train, y_test = self._take_rows(y, train_index), self._take_rows(y, test_index)

            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)

            accuracy_scores.append(accuracy_score(y_test, y_pred))
            precision_scores.append(precision_score(y_test, y_pred, average='weighted'))
            recall_scores.append(recall_score(y_test, y_pred, average='weighted'))
            f1_scores.append(f1_score(y_test, y_pred, average='weighted'))

        print(f"Accuracy: {np.mean(accuracy_scores):.4f}")
        print(f"Precision: {np.mean(precision_scores):.4f}")
        print(f"Recall: {np.mean(recall_scores):.4f}")
        print(f"F1-Score: {np.mean(f1_scores):.4f}")

    def plot_metrics(self, metrics_summary, output_directory, doc):
        """Save one bar chart per metric comparing all models and add it to ``doc``.

        Args:
            metrics_summary: Dict with a 'Model' list and one list per metric.
                Entries that are not numeric (e.g. the "Not defined"
                placeholder) are plotted as missing bars.
            output_directory: Directory receiving ``<metric>_comparison.png``.
            doc: python-docx document the charts are appended to.
        """
        models = metrics_summary['Model']
        metrics = ['Accuracy', 'Precision', 'Recall', 'F1-Score']

        for metric in metrics:
            # Coerce placeholders to NaN so plt.bar always receives numbers.
            heights = pd.to_numeric(pd.Series(metrics_summary[metric]), errors='coerce').to_numpy()

            plt.figure()
            plt.bar(models, heights, color='skyblue')
            plt.xlabel('Model')
            plt.ylabel(metric)
            plt.title(f'Model Comparison: {metric}')
            metric_plot_path = os.path.join(output_directory, f'{metric}_comparison.png')
            plt.savefig(metric_plot_path)
            plt.close()

            # Add the plot to the document
            doc.add_heading(f'{metric} Comparison', level=2)
            doc.add_picture(metric_plot_path, width=Inches(4.5))
            doc.add_paragraph("-" * 50)

    @staticmethod
    def _positive_class_scores(y_prob):
        """Return binary scores for the second class.

        ``predict_proba`` yields a two-column array (second column wanted);
        ``decision_function`` yields 1-D scores for binary problems.
        """
        scores = np.asarray(y_prob)
        return scores[:, 1] if scores.ndim == 2 else scores

    @staticmethod
    def _save_roc_plot(fpr, tpr, roc_auc, name, output_directory):
        """Draw and save the ROC curve for ``name``; return the image path."""
        plt.figure()
        plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='grey', linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title(f'{name} ROC Curve')
        plt.legend(loc="lower right")

        roc_curve_path = os.path.join(output_directory, f'{name}_roc_curve.png')
        plt.savefig(roc_curve_path)
        plt.close()
        return roc_curve_path

    def train_and_evaluate_models(self, X_train_scaled, X_test_scaled, y_train, y_test, doc, output_directory, num_classes):
        """Train every model, report its metrics and save the Word report.

        Five scikit-learn classifiers (configured via ``tune_model_params``)
        and a small Keras network are fitted on the training split and scored
        on the test split. Per-model results, ROC curves, the Keras confusion
        matrix and the comparison charts are added to ``doc``, which is saved
        as ``model_evaluation_results_org.docx`` in ``output_directory``.

        Args:
            X_train_scaled, X_test_scaled: Scaled feature arrays.
            y_train, y_test: Integer-encoded targets.
            doc: python-docx document receiving the report content.
            output_directory: Existing directory for images and the report.
            num_classes: Number of target classes; more than 2 selects the
                multi-class code paths.

        Notes:
            * Multi-class ROC uses ``model.classes_`` for binarisation so the
              indicator columns line up with the probability columns even
              when labels are not ``0..n-1``.
            * Precision/recall/F1 are weighted averages for every model,
              including the Keras network, so the comparison charts are
              like-for-like.
        """
        # Here you can add more algorithms
        models = {
            "Random Forest": self.tune_model_params(RandomForestClassifier()),
            "SVM": self.tune_model_params(SVC(probability=True)),
            "Decision Tree": self.tune_model_params(DecisionTreeClassifier()),
            "KNN": self.tune_model_params(KNeighborsClassifier()),
            "Logistic Regression": self.tune_model_params(LogisticRegression())
        }

        metrics_summary = {
            'Model': [],
            'Accuracy': [],
            'Precision': [],
            'Recall': [],
            'F1-Score': [],
            'ROC AUC': []
        }

        for name, model in models.items():
            print(f"\nTraining and evaluating {name}...")
            model.fit(X_train_scaled, y_train)
            y_pred = model.predict(X_test_scaled)
            if hasattr(model, "predict_proba"):
                y_prob = model.predict_proba(X_test_scaled)
            else:
                y_prob = model.decision_function(X_test_scaled)

            accuracy = accuracy_score(y_test, y_pred)
            precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='weighted')

            classification_rep = classification_report(y_test, y_pred)
            print(f"{name} Accuracy: {accuracy:.4f}")
            print(f"{name} Classification Report:\n{classification_rep}\n")

            # Calculate ROC AUC for binary and multi-class cases; it stays
            # None when scikit-learn cannot define it for this test split.
            roc_auc = None
            y_test_bin = None
            positive_scores = None
            if num_classes > 2:
                # Multi-class case
                y_test_bin = label_binarize(y_test, classes=model.classes_)
                try:
                    roc_auc = roc_auc_score(y_test_bin, y_prob, multi_class='ovr')
                except ValueError:
                    roc_auc = None  # Handle any issues during calculation
            else:
                # Binary case
                positive_scores = self._positive_class_scores(y_prob)
                try:
                    roc_auc = roc_auc_score(y_test, positive_scores)
                except ValueError:
                    roc_auc = None  # Handle any issues during calculation

            # Print the ROC AUC result
            if roc_auc is not None:
                print(f"{name} ROC AUC: {roc_auc:.4f}")
            else:
                print(f"{name} ROC AUC: Not defined")

            print("-" * 50)

            # Save metrics to summary
            metrics_summary['Model'].append(name)
            metrics_summary['Accuracy'].append(accuracy)
            metrics_summary['Precision'].append(precision)
            metrics_summary['Recall'].append(recall)
            metrics_summary['F1-Score'].append(f1)
            metrics_summary['ROC AUC'].append(roc_auc if roc_auc is not None else "Not defined")

            # Plot ROC curve if ROC AUC is defined
            roc_curve_path = None
            if roc_auc is not None:
                if num_classes > 2:
                    # Micro-averaged curve over all (sample, class) pairs.
                    fpr, tpr, _ = roc_curve(y_test_bin.ravel(), np.asarray(y_prob).ravel())
                else:
                    # The scores refer to model.classes_[1]; naming it keeps
                    # roc_curve working when the labels are not {0, 1}.
                    fpr, tpr, _ = roc_curve(y_test, positive_scores, pos_label=model.classes_[1])
                roc_curve_path = self._save_roc_plot(fpr, tpr, roc_auc, name, output_directory)

            # Add results (and the ROC curve image, when available) to the document
            doc.add_heading(f'{name} Results', level=2)
            doc.add_paragraph(f"Accuracy: {accuracy:.4f}")
            doc.add_paragraph("Classification Report:")
            doc.add_paragraph(classification_rep)
            if roc_curve_path is not None:
                doc.add_paragraph(f"ROC AUC: {roc_auc:.4f}")
                doc.add_picture(roc_curve_path, width=Inches(4.5))
            else:
                doc.add_paragraph("ROC AUC: Not defined (only one class present in y_true)")

            doc.add_paragraph("-" * 50)

        # -------------------- Add Keras ANN Below -------------------- #

        print("\nTraining and evaluating Keras ANN...")

        ann_model = Sequential()
        ann_model.add(Dense(units=64, activation='relu', input_shape=(X_train_scaled.shape[1],)))
        ann_model.add(Dense(units=32, activation='relu'))

        if num_classes > 2:
            ann_model.add(Dense(units=num_classes, activation='softmax'))  # Multiclass classification
            loss_function = 'sparse_categorical_crossentropy'
        else:
            ann_model.add(Dense(units=1, activation='sigmoid'))  # Binary classification
            loss_function = 'binary_crossentropy'

        ann_model.compile(
            optimizer='adam',
            loss=loss_function,
            metrics=['accuracy']
        )

        # NOTE: the test split is also passed as Keras validation data, so the
        # per-epoch val_* figures are not an independent estimate.
        ann_model.fit(
            X_train_scaled, y_train,
            epochs=10,
            batch_size=32,
            validation_data=(X_test_scaled, y_test),
            verbose=1
        )

        _, accuracy = ann_model.evaluate(X_test_scaled, y_test, verbose=0)

        # Evaluate model based on the number of classes
        y_pred_ann = None
        if len(np.unique(y_test)) == 1:
            print("Keras ANN ROC AUC: Not defined (only one class present in y_true)")
            roc_auc_ann = None  # ROC AUC cannot be defined
            precision_ann = None
            recall_ann = None
            f1_ann = None
        else:
            if num_classes > 2:
                y_prob_ann = ann_model.predict(X_test_scaled)
                y_test_bin = to_categorical(y_test, num_classes=num_classes)
                y_true_ann = y_test_bin.argmax(axis=1)
                y_pred_ann = y_prob_ann.argmax(axis=1)
                roc_auc_ann = roc_auc_score(y_test_bin, y_prob_ann, multi_class='ovr')
            else:
                y_prob_ann = ann_model.predict(X_test_scaled).ravel()
                y_true_ann = y_test
                y_pred_ann = (y_prob_ann > 0.5).astype(int)  # Convert probabilities to binary predictions
                roc_auc_ann = roc_auc_score(y_test, y_prob_ann)
            # Weighted averages, matching the scikit-learn models above.
            precision_ann = precision_score(y_true_ann, y_pred_ann, average='weighted')
            recall_ann = recall_score(y_true_ann, y_pred_ann, average='weighted')
            f1_ann = f1_score(y_true_ann, y_pred_ann, average='weighted')

        # Append ANN metrics to summary
        metrics_summary['Model'].append("Keras ANN")
        metrics_summary['Accuracy'].append(float(accuracy))
        metrics_summary['Precision'].append(float(precision_ann) if precision_ann is not None else "Not defined")
        metrics_summary['Recall'].append(float(recall_ann) if recall_ann is not None else "Not defined")
        metrics_summary['F1-Score'].append(float(f1_ann) if f1_ann is not None else "Not defined")
        metrics_summary['ROC AUC'].append(roc_auc_ann if roc_auc_ann is not None else "Not defined")

        # Print results
        print(f"Keras ANN Accuracy: {accuracy:.4f}")
        if roc_auc_ann is not None:
            print(f"Keras ANN Precision: {precision_ann:.4f}")
            print(f"Keras ANN Recall: {recall_ann:.4f}")
            print(f"Keras ANN F1-Score: {f1_ann:.4f}")
            print(f"Keras ANN ROC AUC: {roc_auc_ann:.4f}")
        else:
            print("Keras ANN Precision, Recall, F1-Score, ROC AUC: Not defined (only one class present in y_true)")
        print("-" * 50)

        if roc_auc_ann is not None:
            # Generate the confusion matrix
            conf_matrix = confusion_matrix(y_test, y_pred_ann)

            # Print the confusion matrix
            print("Confusion Matrix:")
            print(conf_matrix)

            # Plot the confusion matrix
            disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix)
            disp.plot(cmap=plt.cm.Blues)

            # Save the confusion matrix plot
            conf_matrix_path = os.path.join(output_directory, 'Keras_ANN_confusion_matrix.png')
            plt.savefig(conf_matrix_path)
            plt.close()

            # Add confusion matrix to the document
            doc.add_heading('Keras ANN Confusion Matrix', level=2)
            doc.add_picture(conf_matrix_path, width=Inches(4.5))
        else:
            doc.add_heading('Keras ANN Confusion Matrix', level=2)
            doc.add_paragraph("Not defined (only one class present in y_true)")

        doc.add_paragraph("-" * 50)

        # Plotting the metrics for all models
        self.plot_metrics(metrics_summary, output_directory, doc)

        # Save the document
        output_file = os.path.join(output_directory, 'model_evaluation_results_org.docx')
        doc.save(output_file)
        print(f"Results saved to {output_file}")

    def run_pipeline(self, target_column_name, test_size):
        """Run every preparation step and return the scaled train/test split.

        Steps: load, distribution plots, summary file, imputation, target
        definition, class-distribution plot, encoding + stratified split
        (both done by ``split_data``) and standard scaling.

        Returns:
            ``X_train_scaled, X_test_scaled, y_train, y_test``.
        """
        df = self.load_dataset()
        self.data_distribution_plot(df)
        self.display_dataset_summary()
        df_cleaned = self.clean_data()
        self.define_target(df_cleaned, target_column_name)
        self.plot_class_distribution(self.y)
        # split_data() performs the label encoding itself.
        X_train, X_test, y_train, y_test = self.split_data(test_size)
        X_train_scaled, X_test_scaled = self.apply_standard_scaler(X_train, X_test)
        return X_train_scaled, X_test_scaled, y_train, y_test



In [None]:
def main():
    """Run the baseline experiment on the original (un-resampled) dataset.

    Prompts for the target column and the test fraction, prints the class
    distribution, runs the preparation pipeline, then trains and evaluates all
    models, writing the images and the Word report under ``<results>/images``.
    """
    # Step 1: Define paths for dataset and results
    csv_path = '/content/drive/MyDrive/ResearchTask/Research_Dataset/Copy of preprocessed_merged_files.csv'  # multi class
    results_root = '/content/drive/MyDrive/ResearchTask/results'
    pipeline = DataPipeline(csv_path, results_root)

    # Ask the user which column to predict and how much data to hold out.
    target_column_name = input("Enter the name of the target column: ")
    test_size = float(input("Enter the test size as a fraction (e.g., 0.2 for 20%): "))

    # Inspect the raw class balance before any processing happens.
    raw_dataset = pipeline.load_dataset()
    print("\nAnalyzing class distribution...")
    analyze_class_distribution(raw_dataset, target_column_name)

    # Preparation steps up to (and including) scaling of the split data.
    prepared_splits = pipeline.run_pipeline(target_column_name, test_size)

    dataset_label = "original dataset"
    display(HTML(f"<h3 style='color: red;'>Results for {dataset_label}</h3>"))

    # Images and the report are stored in an 'images' folder inside the results directory.
    images_dir = os.path.join(pipeline.results_dir, 'images')
    os.makedirs(images_dir, exist_ok=True)

    # Word document collecting the results, filled in and saved by the pipeline.
    report = Document()
    pipeline.train_and_evaluate_models(*prepared_splits, report, images_dir, pipeline.num_classes)

def analyze_class_distribution(dataset, target_column):
    """
    Prints the class counts and class percentages of the target column so
    that class imbalance can be checked. Nothing is plotted.

    Parameters:
        dataset (pd.DataFrame): The input dataset.
        target_column (str): The name of the target column.

    Returns:
        None. Both tables are written to standard output.
    """
    class_counts = dataset[target_column].value_counts()
    # Percentages are relative to the total number of rows in the dataset.
    class_percentages = (class_counts / len(dataset)) * 100

    print("\nClass Distribution:")
    print(class_counts)
    print("\nClass Percentages:")
    print(class_percentages)

# Script-style entry point: the guard is true in a notebook kernel, so running
# this cell calls main() (see the captured output below).
if __name__ == "__main__":
    main()

Created directory: /content/drive/MyDrive/ResearchTask/results/dataset
Enter the name of the target column: class
Enter the test size as a fraction (e.g., 0.2 for 20%): 0.2

Analyzing class distribution...

Class Distribution:
class
0    51
1    48
Name: count, dtype: int64

Class Percentages:
class
0    51.515152
1    48.484848
Name: count, dtype: float64
Dataset summary saved to /content/drive/MyDrive/ResearchTask/results/dataset/dataset_summary.csv
Cleaned dataset saved to /content/drive/MyDrive/ResearchTask/results/dataset/cleaned_dataset.csv
Detected 2 unique classes in the target column.



Training and evaluating Random Forest...
Random Forest Accuracy: 0.9500
Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.91      1.00      0.95        10
           1       1.00      0.90      0.95        10

    accuracy                           0.95        20
   macro avg       0.95      0.95      0.95        20
weighted avg       0.95      0.95      0.95        20


Random Forest ROC AUC: 0.9900
--------------------------------------------------

Training and evaluating SVM...
SVM Accuracy: 1.0000
SVM Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        10
           1       1.00      1.00      1.00        10

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20


SVM ROC AUC: 1.0000
--------------------------------------------------

T

In [None]:
def main():
    """Run the baseline experiment without the class-distribution printout.

    This definition replaces the ``main`` from the earlier cell once this cell
    has been executed. It prompts for the target column and test fraction,
    runs the preparation pipeline and then trains/evaluates all models.
    """
    # Step 1: Define paths for dataset and results
    # Alternative (binary class) dataset kept for reference:
    # '/content/drive/MyDrive/Colab Notebooks/Original_dataset/preprocessed_merged_files.csv'
    dataset_csv = '/content/drive/MyDrive/ResearchTask/Research_Dataset/Copy of preprocessed_merged_files.csv'  # multi class
    output_root = '/content/drive/MyDrive/ResearchTask/results/'
    pipeline = DataPipeline(dataset_csv, output_root)

    target_column_name = input("Enter the name of the target column: ")
    raw_fraction = input("Enter the test size as a fraction (e.g., 0.2 for 20%): ")
    holdout_fraction = float(raw_fraction)

    # Run the initial pipeline up to data splitting (and scaling)
    (
        train_features,
        test_features,
        train_labels,
        test_labels,
    ) = pipeline.run_pipeline(target_column_name, holdout_fraction)

    heading = "original dataset"
    display(HTML(f"<h3 style='color: red;'>Results for {heading}</h3>"))

    # Make sure the image directory exists before anything is saved into it
    figures_dir = os.path.join(pipeline.results_dir, 'images')
    os.makedirs(figures_dir, exist_ok=True)

    # Train and evaluate the models; results are collected in a Word document
    results_doc = Document()
    pipeline.train_and_evaluate_models(
        train_features,
        test_features,
        train_labels,
        test_labels,
        results_doc,
        figures_dir,
        pipeline.num_classes,
    )

# Runs the main() defined in this cell, which has replaced the earlier
# definition of the same name by the time this line executes.
if __name__ == "__main__":
    main()

Enter the name of the target column: class
Enter the test size as a fraction (e.g., 0.2 for 20%): 0.2
Dataset summary saved to /content/drive/MyDrive/ResearchTask/dataset/dataset_summary.csv
Cleaned dataset saved to /content/drive/MyDrive/ResearchTask/dataset/cleaned_dataset.csv
Detected 2 unique classes in the target column.



Training and evaluating Random Forest...
Random Forest Accuracy: 0.9500
Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.91      1.00      0.95        10
           1       1.00      0.90      0.95        10

    accuracy                           0.95        20
   macro avg       0.95      0.95      0.95        20
weighted avg       0.95      0.95      0.95        20


Random Forest ROC AUC: 0.9900
--------------------------------------------------

Training and evaluating SVM...
SVM Accuracy: 1.0000
SVM Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        10
           1       1.00      1.00      1.00        10

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20


SVM ROC AUC: 1.0000
--------------------------------------------------

T



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 71ms/step
Keras ANN Accuracy: 1.0000
Keras ANN Precision: 1.0000
Keras ANN Recall: 1.0000
Keras ANN F1-Score: 1.0000
Keras ANN ROC AUC: 1.0000
--------------------------------------------------
Confusion Matrix:
[[10  0]
 [ 0 10]]
Results saved to /content/drive/MyDrive/ResearchTask/images/model_evaluation_results_org.docx



## Oversampling

### SMOTE

In [None]:
from imblearn.over_sampling import SMOTE
from docx import Document
import os
import pandas as pd
import numpy as np

# Function to apply weighted SMOTE
def apply_weighted_smote(X, y, sampling_strategy):
    """Oversample ``X``/``y`` with SMOTE using a caller-supplied strategy.

    Args:
        X: Feature matrix handed to ``SMOTE.fit_resample``.
        y: Target labels aligned with ``X``.
        sampling_strategy: Forwarded unchanged to ``SMOTE``. The callers in
            this notebook pass a ``{class_label: target_count}`` dict; SMOTE
            also accepts ``'auto'``, a float or a callable.

    Returns:
        tuple: ``(X_resampled, y_resampled)`` exactly as returned by
        ``SMOTE.fit_resample``.
    """
    # ``n_jobs`` is deliberately not passed. The previous value (None) was the
    # default, the argument has been deprecated since imbalanced-learn 0.10,
    # and newer releases no longer accept it, so passing it breaks the
    # unpinned ``pip install imbalanced-learn`` used by this notebook.
    smote = SMOTE(
        sampling_strategy=sampling_strategy,  # resampling targets (see Args)
        random_state=42,  # fixed seed so the synthetic samples are reproducible
        k_neighbors=5,  # neighbours used to interpolate synthetic samples (library default)
    )

    # Apply SMOTE
    X_resampled, y_resampled = smote.fit_resample(X, y)

    # Report the shape of the resampled datasets
    print(f"SMOTE Resampled X shape: {X_resampled.shape}")
    print(f"SMOTE Resampled y shape: {y_resampled.shape}")

    return X_resampled, y_resampled

def smote_pipeline(file_path=None, base_output_file=None, target_column_name=None, test_size=None):
    """Run the SMOTE oversampling experiment end to end.

    The dataset is loaded, cleaned and split through ``DataPipeline``; only the
    training split is oversampled with SMOTE, after which both splits are
    scaled, the models are trained/evaluated and the report is saved as a
    ``.docx`` file under ``<base_output_file>/smote``.

    Every argument is optional so that calling ``smote_pipeline()`` behaves as
    before (hard-coded paths, interactive prompts). Passing values makes the
    run reproducible without keyboard input.

    Args:
        file_path: CSV file to load. Defaults to the binary-class research
            dataset on Google Drive.
        base_output_file: Root results directory. Defaults to the Colab
            results folder on Google Drive.
        target_column_name: Name of the target column. Prompted for when None.
        test_size: Test fraction (anything ``float()`` accepts). Prompted for
            when None.
    """
    # Step 1: Define paths for dataset and results
    if file_path is None:
        file_path = '/content/drive/MyDrive/ResearchTask/Research_Dataset/Copy of preprocessed_merged_files.csv'  # binary class
    if base_output_file is None:
        base_output_file = '/content/drive/MyDrive/Colab Notebooks/results/'

    # Directory for SMOTE results; create it if it does not exist
    base_output_file_smote = os.path.join(base_output_file, 'smote')
    os.makedirs(base_output_file_smote, exist_ok=True)

    # Initialize the DataPipeline class
    pipeline = DataPipeline(file_path, base_output_file_smote)

    # Fall back to interactive prompts only for values the caller did not supply
    if target_column_name is None:
        target_column_name = input("Enter the name of the target column: ")
    if test_size is None:
        test_size = input("Enter the test size as a fraction (e.g., 0.2 for 20%): ")
    test_size = float(test_size)

    # Run the initial pipeline steps (return values that are not needed here are discarded)
    pipeline.load_dataset()
    df_cleaned = pipeline.clean_data()
    _, y = pipeline.define_target(df_cleaned, target_column_name)

    # Determine the number of classes
    num_classes = len(np.unique(y))

    # Split the data
    X_train, X_test, y_train, y_test = pipeline.split_data(test_size=test_size)

    # Per-class sample counts of the training split, indexed by class label
    # (np.bincount requires non-negative integer labels).
    class_counts = np.bincount(y_train)

    # Define the custom sampling strategy
    if num_classes > 2:  # Multi-class case: bring every class up to the largest one
        largest_class = max(class_counts)
        sampling_strategy = {i: largest_class for i in range(num_classes)}

        ## Can also define custom sampling strategy for each class
        # sampling_strategy = {
        #     0: 120,  # Resample class 0 to 120 samples
        #     1: 100,  # Resample class 1 to 100 samples
        #     2: 80    # Resample class 2 to 80 samples
        # }

    else:  # Binary case: keep class 0 as is, double class 1
        # NOTE(review): this doubles class 1 regardless of which class is the
        # minority — adjust this strategy as per need.
        sampling_strategy = {0: class_counts[0], 1: class_counts[1] * 2}

    # Apply custom SMOTE to the training split only
    X_smote, y_smote = apply_weighted_smote(X_train, y_train, sampling_strategy)

    # Plot the distribution after SMOTE
    pipeline.data_distribution_plot(pd.DataFrame(X_smote, columns=X_train.columns))
    pipeline.plot_class_distribution(pd.Series(y_smote), plot_title='SMOTE Class Distribution')

    # Scale the resampled datasets
    X_smote_scaled, X_test_scaled = pipeline.apply_standard_scaler(X_smote, X_test)

    # Directory for images; create it if it does not exist
    result_images_directory_smote = os.path.join(pipeline.results_dir, 'images_SMOTE')
    os.makedirs(result_images_directory_smote, exist_ok=True)

    # Document to save the results
    doc = Document()

    # Train and evaluate models with the SMOTE-applied dataset
    pipeline.train_and_evaluate_models(X_smote_scaled, X_test_scaled, y_smote, y_test, doc, result_images_directory_smote, num_classes)

    # Save the results to a document
    output_file_path = os.path.join(base_output_file_smote, 'model_evaluation_results_with_smote.docx')
    doc.save(output_file_path)
    print(f"Results saved to '{output_file_path}'")

# Entry point for this cell: run the SMOTE experiment when the code executes
# as the top-level module (which is the case when a notebook cell is executed).
if __name__ == "__main__":
    smote_pipeline()


Created directory: /content/drive/MyDrive/Colab Notebooks/results/smote/dataset
Enter the name of the target column: class
Enter the test size as a fraction (e.g., 0.2 for 20%): 0.3
Cleaned dataset saved to /content/drive/MyDrive/Colab Notebooks/results/smote/dataset/cleaned_dataset.csv
Detected 2 unique classes in the target column.
SMOTE Resampled X shape: (102, 41)
SMOTE Resampled y shape: (102,)

Training and evaluating Random Forest...
Random Forest Accuracy: 0.9667
Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.94      1.00      0.97        15
           1       1.00      0.93      0.97        15

    accuracy                           0.97        30
   macro avg       0.97      0.97      0.97        30
weighted avg       0.97      0.97      0.97        30


Random Forest ROC AUC: 1.0000
--------------------------------------------------

Training and evaluating SVM...
SVM Accuracy: 1.0000
SVM Classification Report

### ADASYN

In [None]:
from imblearn.over_sampling import ADASYN
from docx import Document
import os
import pandas as pd
import numpy as np

# Function to apply ADASYN
def apply_adasyn(X, y, sampling_strategy):
    """Oversample ``X``/``y`` with ADASYN, falling back to the input on failure.

    Args:
        X: Feature matrix handed to ``ADASYN.fit_resample``.
        y: Target labels aligned with ``X``.
        sampling_strategy: Forwarded unchanged to ``ADASYN``. The callers in
            this notebook pass a ``{class_label: target_count}`` dict; ADASYN
            also accepts ``'auto'``, a float or a callable.

    Returns:
        tuple: ``(X_res, y_res)`` from ``ADASYN.fit_resample``. If ADASYN
        raises ``ValueError``, a message is printed and the original
        ``(X, y)`` is returned unchanged.
    """
    try:
        # ``n_jobs`` is deliberately not passed. The previous value (None) was
        # the default, the argument has been deprecated since imbalanced-learn
        # 0.10, and newer releases no longer accept it. The resulting
        # TypeError would not be caught by the ValueError fallback below.
        adasyn = ADASYN(
            sampling_strategy=sampling_strategy,  # resampling targets (see Args)
            random_state=42,  # fixed seed so the synthetic samples are reproducible
            n_neighbors=5,  # neighbours used to construct synthetic samples (library default)
        )
        X_res, y_res = adasyn.fit_resample(X, y)
        print(f"ADASYN Resampled X shape: {X_res.shape}")
        print(f"ADASYN Resampled y shape: {y_res.shape}")

        return X_res, y_res

    except ValueError as e:
        # Best-effort behaviour: keep the experiment running on the original data.
        print(f"Error in ADASYN resampling: {e}")
        print("Falling back to the original data without resampling.")
        return X, y

def adasyn_pipeline(file_path=None, base_output_file=None, target_column_name=None, test_size=None):
    """Run the ADASYN oversampling experiment end to end.

    The dataset is loaded, cleaned and split through ``DataPipeline``; only the
    training split is oversampled with ADASYN, after which both splits are
    scaled, the models are trained/evaluated and the report is saved as a
    ``.docx`` file under ``<base_output_file>/adasyn``.

    Every argument is optional so that calling ``adasyn_pipeline()`` behaves as
    before (hard-coded paths, interactive prompts). Passing values makes the
    run reproducible without keyboard input.

    Args:
        file_path: CSV file to load. Defaults to the binary-class research
            dataset on Google Drive.
        base_output_file: Root results directory. Defaults to the Colab
            results folder on Google Drive.
        target_column_name: Name of the target column. Prompted for when None.
        test_size: Test fraction (anything ``float()`` accepts). Prompted for
            when None.
    """
    # Step 1: Define paths for dataset and results
    if file_path is None:
        file_path = '/content/drive/MyDrive/ResearchTask/Research_Dataset/Copy of preprocessed_merged_files.csv'  # Binary
        # Multi-class alternative: '/content/drive/MyDrive/Colab Notebooks/Original_dataset/cyberthreat.csv'
    if base_output_file is None:
        base_output_file = '/content/drive/MyDrive/Colab Notebooks/results/'

    # Directory for ADASYN results; create it if it does not exist
    base_output_file_adasyn = os.path.join(base_output_file, 'adasyn')
    os.makedirs(base_output_file_adasyn, exist_ok=True)

    # Initialize the DataPipeline class
    pipeline = DataPipeline(file_path, base_output_file_adasyn)

    # Fall back to interactive prompts only for values the caller did not supply
    if target_column_name is None:
        target_column_name = input("Enter the name of the target column: ")
    if test_size is None:
        test_size = input("Enter the test size as a fraction (e.g., 0.2 for 20%): ")
    test_size = float(test_size)

    # Run the initial pipeline steps (return values that are not needed here are discarded)
    pipeline.load_dataset()
    df_cleaned = pipeline.clean_data()
    _, y = pipeline.define_target(df_cleaned, target_column_name)

    # Determine the number of classes
    num_classes = len(np.unique(y))

    # Split the data
    X_train, X_test, y_train, y_test = pipeline.split_data(test_size=test_size)

    # Per-class sample counts of the training split, indexed by class label
    # (np.bincount requires non-negative integer labels).
    class_counts = np.bincount(y_train)

    # Define the custom sampling strategy
    if num_classes > 2:  # Multi-class case: every class targets 1.2x the largest class
        per_class_target = int(max(class_counts) * 1.2)
        sampling_strategy = {i: per_class_target for i in range(num_classes)}

        ## Can also define custom sampling strategy for each class
        # sampling_strategy = {
        #     0: 120,  # Resample class 0 to 120 samples
        #     1: 100,  # Resample class 1 to 100 samples
        #     2: 80    # Resample class 2 to 80 samples
        # }

    else:  # Binary case: keep class 0 as is, grow class 1 by 50%
        # NOTE(review): this grows class 1 regardless of which class is the
        # minority — adjust this strategy as per need.
        sampling_strategy = {0: class_counts[0], 1: int(class_counts[1] * 1.5)}

    # Apply ADASYN (with its built-in fallback) to the training split only
    X_adasyn, y_adasyn = apply_adasyn(X_train, y_train, sampling_strategy)

    # Plot the distribution after ADASYN
    pipeline.data_distribution_plot(pd.DataFrame(X_adasyn, columns=X_train.columns))
    pipeline.plot_class_distribution(pd.Series(y_adasyn), plot_title='ADASYN Class Distribution')

    # Scale the resampled datasets
    X_adasyn_scaled, X_test_scaled = pipeline.apply_standard_scaler(X_adasyn, X_test)

    # Directory for images; create it if it does not exist
    result_images_directory_adasyn = os.path.join(pipeline.results_dir, 'images_ADASYN')
    os.makedirs(result_images_directory_adasyn, exist_ok=True)

    # Document to save the results
    doc = Document()

    # Train and evaluate models with the ADASYN-applied dataset
    pipeline.train_and_evaluate_models(X_adasyn_scaled, X_test_scaled, y_adasyn, y_test, doc, result_images_directory_adasyn, num_classes)

    # Save the results to a document
    output_file_path = os.path.join(base_output_file_adasyn, 'model_evaluation_results_with_adasyn.docx')
    doc.save(output_file_path)
    print(f"Results saved to '{output_file_path}'")

# Entry point for this cell: run the ADASYN experiment when the code executes
# as the top-level module (which is the case when a notebook cell is executed).
if __name__ == "__main__":
    adasyn_pipeline()


Created directory: /content/drive/MyDrive/Colab Notebooks/results/adasyn/dataset
Enter the name of the target column: class
Enter the test size as a fraction (e.g., 0.2 for 20%): 0.3
Cleaned dataset saved to /content/drive/MyDrive/Colab Notebooks/results/adasyn/dataset/cleaned_dataset.csv
Detected 2 unique classes in the target column.
ADASYN Resampled X shape: (86, 41)
ADASYN Resampled y shape: (86,)

Training and evaluating Random Forest...
Random Forest Accuracy: 0.9667
Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.94      1.00      0.97        15
           1       1.00      0.93      0.97        15

    accuracy                           0.97        30
   macro avg       0.97      0.97      0.97        30
weighted avg       0.97      0.97      0.97        30


Random Forest ROC AUC: 0.9956
--------------------------------------------------

Training and evaluating SVM...
SVM Accuracy: 1.0000
SVM Classification Repo

## Ensemble

In [None]:
from sklearn.ensemble import StackingClassifier, GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report, roc_auc_score, roc_curve
from sklearn.preprocessing import StandardScaler, label_binarize
from docx import Document
import os
import numpy as np
import matplotlib.pyplot as plt

def stack_models(X_train_scaled, y_train):
    """Build the stacking ensemble and fit it on the given training data.

    The level-0 learners are a random forest, an SVM with probability
    estimates enabled, a 5-nearest-neighbours classifier and a decision tree;
    their outputs are combined by a logistic-regression final estimator.

    Args:
        X_train_scaled: Training features (the caller passes scaled features).
        y_train: Training labels aligned with ``X_train_scaled``.

    Returns:
        The fitted ``StackingClassifier``.
    """
    # Seeded wherever the estimator accepts a seed, so repeated runs match.
    level0 = [
        ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
        ('svc', SVC(probability=True, random_state=42)),
        ('knn', KNeighborsClassifier(n_neighbors=5)),
        ('dt', DecisionTreeClassifier(random_state=42)),
    ]

    ensemble = StackingClassifier(
        estimators=level0,
        final_estimator=LogisticRegression(),
    )
    ensemble.fit(X_train_scaled, y_train)

    return ensemble


def stacking_pipeline(file_path=None, base_output_file=None, target_column_name=None, test_size=None):
    """Train and evaluate the base models, a stacking ensemble and two boosters.

    The dataset is loaded, cleaned and split through ``DataPipeline`` and
    standardised with a scaler fitted on the training split. Each model is then
    evaluated on the test split (accuracy, classification report, ROC AUC and
    a ROC curve image when probabilities are available). The results are
    written to a ``.docx`` report inside the ``images_stacking`` directory.

    Every argument is optional so that calling ``stacking_pipeline()`` behaves
    as before (hard-coded paths, interactive prompts). Passing values makes the
    run reproducible without keyboard input.

    Args:
        file_path: CSV file to load. Defaults to the binary-class research
            dataset on Google Drive.
        base_output_file: Root results directory. Defaults to the Colab
            results folder on Google Drive.
        target_column_name: Name of the target column. Prompted for when None.
        test_size: Test fraction (anything ``float()`` accepts). Prompted for
            when None.
    """
    # Step 1: Define paths for dataset and results
    if file_path is None:
        file_path = '/content/drive/MyDrive/ResearchTask/Research_Dataset/Copy of preprocessed_merged_files.csv'  # Binary
        # Multi-class alternative: '/content/drive/MyDrive/Colab Notebooks/Original_dataset/cyberthreat.csv'
    if base_output_file is None:
        base_output_file = '/content/drive/MyDrive/Colab Notebooks/results/'

    # Directory for stacking results; create it if it does not exist
    base_output_file_stacking = os.path.join(base_output_file, 'stacking')
    os.makedirs(base_output_file_stacking, exist_ok=True)

    # Initialize the DataPipeline class
    pipeline = DataPipeline(file_path, base_output_file_stacking)

    # Fall back to interactive prompts only for values the caller did not supply
    if target_column_name is None:
        target_column_name = input("Enter the name of the target column: ")
    if test_size is None:
        test_size = input("Enter the test size as a fraction (e.g., 0.2 for 20%): ")
    test_size = float(test_size)

    # Run the initial pipeline steps (return values that are not needed here are discarded)
    pipeline.load_dataset()
    df_cleaned = pipeline.clean_data()
    _, y = pipeline.define_target(df_cleaned, target_column_name)

    # Determine the number of classes
    num_classes = len(np.unique(y))

    # Split the data
    X_train, X_test, y_train, y_test = pipeline.split_data(test_size=test_size)

    # Standardize the features; the scaler is fitted on the training split only
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Directory for images; create it if it does not exist
    result_images_directory_stacking = os.path.join(pipeline.results_dir, 'images_stacking')
    os.makedirs(result_images_directory_stacking, exist_ok=True)

    # Document to save the results
    doc = Document()

    # Define models (dict order is the evaluation/report order)
    models = {
        "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42),
        "SVM": SVC(probability=True, random_state=42),
        "Decision Tree": DecisionTreeClassifier(random_state=42),
        "KNN": KNeighborsClassifier(),
        "Logistic Regression": LogisticRegression(),
        "Stacking": stack_models(X_train_scaled, y_train),
        "Gradient Boosting": GradientBoostingClassifier(random_state=42),
        "XGBoost": XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)
    }
    # stack_models() returns an ensemble that is already fitted on the same
    # training data, so it is not fitted a second time in the loop below.
    prefitted = {"Stacking"}

    # Train and evaluate models
    for model_name, model in models.items():
        print(f"\nTraining and evaluating {model_name}...")

        # Train the model (unless it arrived fitted) and predict on the test split
        if model_name not in prefitted:
            model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)
        y_prob = model.predict_proba(X_test_scaled) if hasattr(model, "predict_proba") else None

        # Calculate metrics
        accuracy = accuracy_score(y_test, y_pred)
        classification_rep = classification_report(y_test, y_pred)
        print(f"{model_name} Accuracy: {accuracy:.4f}")
        print(f"{model_name} Classification Report:\n{classification_rep}")

        # ROC AUC Calculation
        roc_auc = None
        y_test_bin = None
        if num_classes > 2:  # Multi-class case: one-vs-rest on binarized labels
            y_test_bin = label_binarize(y_test, classes=list(range(num_classes)))
            if y_prob is not None:
                try:
                    roc_auc = roc_auc_score(y_test_bin, y_prob, multi_class='ovr')
                except Exception as e:
                    print(f"Error calculating ROC AUC for {model_name}: {e}")
        elif y_prob is not None:  # Binary case: score with the class-1 probability
            roc_auc = roc_auc_score(y_test, y_prob[:, 1])

        # Part of the report that does not depend on ROC availability
        doc.add_heading(f"{model_name} Results", level=2)
        doc.add_paragraph(f"Accuracy: {accuracy:.4f}")
        doc.add_paragraph("Classification Report:")
        doc.add_paragraph(classification_rep)

        # Without a ROC AUC there is nothing to plot; record that and move on
        if roc_auc is None:
            print(f"{model_name} ROC AUC: Not available")
            doc.add_paragraph("ROC AUC: Not available")
            continue

        print(f"{model_name} ROC AUC: {roc_auc:.4f}")

        # Plot ROC curve (a non-None roc_auc implies y_prob is available)
        if num_classes == 2:
            fpr, tpr, _ = roc_curve(y_test, y_prob[:, 1])
        else:
            # NOTE(review): this curve is built from the flattened indicator
            # matrix (micro-average), while the legend shows the one-vs-rest
            # AUC computed above — the two need not agree.
            fpr, tpr, _ = roc_curve(y_test_bin.ravel(), y_prob.ravel())

        plt.figure()
        plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='grey', linestyle='--')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title(f'{model_name} ROC Curve')
        plt.legend(loc="lower right")

        # Save the plot, then close the figure so figures do not pile up across models
        roc_curve_path = os.path.join(result_images_directory_stacking, f"{model_name}_roc_curve.png")
        plt.savefig(roc_curve_path)
        plt.close()

        # Add the ROC AUC and the curve image to the document
        doc.add_paragraph(f"ROC AUC: {roc_auc:.4f}")
        doc.add_picture(roc_curve_path, width=Inches(4.5))

    # Save the results document
    doc_path = os.path.join(result_images_directory_stacking, "model_evaluation_results_with_stacking.docx")
    doc.save(doc_path)
    print(f"Results saved to {doc_path}")


# Entry point for this cell: run the stacking experiment when the code executes
# as the top-level module (which is the case when a notebook cell is executed).
if __name__ == "__main__":
    stacking_pipeline()


Created directory: /content/drive/MyDrive/Colab Notebooks/results/stacking/dataset
Enter the name of the target column: class
Enter the test size as a fraction (e.g., 0.2 for 20%): 0.2
Cleaned dataset saved to /content/drive/MyDrive/Colab Notebooks/results/stacking/dataset/cleaned_dataset.csv
Detected 2 unique classes in the target column.

Training and evaluating Random Forest...
Random Forest Accuracy: 0.9500
Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.91      1.00      0.95        10
           1       1.00      0.90      0.95        10

    accuracy                           0.95        20
   macro avg       0.95      0.95      0.95        20
weighted avg       0.95      0.95      0.95        20

Random Forest ROC AUC: 1.0000

Training and evaluating SVM...
SVM Accuracy: 1.0000
SVM Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        10
  

In [None]:
def main():
    """Run the baseline experiment on the original (non-resampled) dataset.

    Prompts for the target column and the test fraction, lets
    ``DataPipeline.run_pipeline`` produce the scaled train/test splits, prints
    a heading for the run and hands the splits to
    ``DataPipeline.train_and_evaluate_models`` together with a fresh document
    and the image output directory.
    """
    # Step 1: Define paths for dataset and results
    # Alternative location of the dataset:
    #   '/content/drive/MyDrive/Colab Notebooks/Original_dataset/preprocessed_merged_files.csv'
    # NOTE(review): the other cells of this notebook label the path below as
    # the binary-class dataset.
    dataset_csv = '/content/drive/MyDrive/ResearchTask/Research_Dataset/Copy of preprocessed_merged_files.csv'
    results_root = '/content/drive/MyDrive/ResearchTask/results/'

    # Initialize the DataPipeline class
    pipeline = DataPipeline(dataset_csv, results_root)

    # Interactive run configuration
    target_column_name = input("Enter the name of the target column: ")
    test_size = float(input("Enter the test size as a fraction (e.g., 0.2 for 20%): "))

    # Run the initial pipeline; it returns the scaled features and the labels per split
    train_features, test_features, train_labels, test_labels = pipeline.run_pipeline(target_column_name, test_size)

    # Heading that separates this run's output from the other experiments
    dataset_name = "original dataset"
    display(HTML(f"<h3 style='color: red;'>Results for {dataset_name}</h3>"))

    # Directory for images; create it if it doesn't exist
    images_dir = os.path.join(pipeline.results_dir, 'images')
    os.makedirs(images_dir, exist_ok=True)

    # Train and evaluate the models, collecting the results in a new document
    report = Document()
    pipeline.train_and_evaluate_models(
        train_features,
        test_features,
        train_labels,
        test_labels,
        report,
        images_dir,
        pipeline.num_classes,
    )

# Entry point for this cell: call main() when the code runs as the top-level
# module (which is the case when a notebook cell is executed).
if __name__ == "__main__":
    main()

Enter the name of the target column: class
Enter the test size as a fraction (e.g., 0.2 for 20%): 0.2
Dataset summary saved to /content/drive/MyDrive/ResearchTask/results/dataset/dataset_summary.csv
Cleaned dataset saved to /content/drive/MyDrive/ResearchTask/results/dataset/cleaned_dataset.csv
Detected 2 unique classes in the target column.



Training and evaluating Random Forest...
Random Forest Accuracy: 0.9500
Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.91      1.00      0.95        10
           1       1.00      0.90      0.95        10

    accuracy                           0.95        20
   macro avg       0.95      0.95      0.95        20
weighted avg       0.95      0.95      0.95        20


Random Forest ROC AUC: 0.9900
--------------------------------------------------

Training and evaluating SVM...
SVM Accuracy: 1.0000
SVM Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        10
           1       1.00      1.00      1.00        10

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20


SVM ROC AUC: 1.0000
--------------------------------------------------

T