# Healthcare IoT Device Network Intrusion Detection System

### Creating the dataframes

In [None]:
import pandas as pd
import os
import glob

# show every column when a dataframe is displayed
pd.set_option("display.max_columns", None)

# directories holding the train and test csv files
# (make sure to replace these with the paths on your computer);
# backslashes are normalised to forward slashes
train_dir = r"C:\Users\spark\CMPE 132 Project\CMPE132Project\Dataset\Train".replace(
    "\\", "/"
)
test_dir = r"C:\Users\spark\CMPE 132 Project\CMPE132Project\Dataset\Test".replace(
    "\\", "/"
)

# store the paths of all csv files in each folder in a list
# glob returns its matches in arbitrary order, so sort them: the file order fixes
# the row order of the combined dataframes, and the seeded random sampling applied
# to the training data only picks the same rows if that order is stable
train_csv_files = sorted(glob.glob(os.path.join(train_dir, "*.csv")))
test_csv_files = sorted(glob.glob(os.path.join(test_dir, "*.csv")))

def _attack_label(csv_path, suffix):
    """Derive the attack class name from a capture csv file name.

    The file name is cut at ``suffix`` (e.g. ``"_train.pcap.csv"``) and every
    trailing digit is removed, because some attacks are split over several
    numbered files (``Name1``, ``Name2``, ..., ``Name10``) that all belong to
    the same class.
    """
    label = os.path.basename(csv_path).split(suffix)[0]
    # strip ALL trailing digits, not just one, so "Name10" maps to "Name" as well
    return label.rstrip("0123456789")


def _load_labeled_frames(csv_paths, suffix):
    """Read each csv into a dataframe and tag it with a "classification" column.

    Returns a list with one dataframe per path, in the order of ``csv_paths``.
    """
    frames = []
    for csv_path in csv_paths:
        frame = pd.read_csv(csv_path)
        # every row of a file belongs to the attack class named in its file name
        frame["classification"] = _attack_label(csv_path, suffix)
        frames.append(frame)
    return frames


# lists that store each csv file as a labeled dataframe
train_dataframes = _load_labeled_frames(train_csv_files, "_train.pcap.csv")
test_dataframes = _load_labeled_frames(test_csv_files, "_test.pcap.csv")

# fail with a clear message when a directory path is wrong or empty,
# instead of letting pd.concat complain about having nothing to concatenate
if not train_dataframes:
    raise FileNotFoundError(f"no csv files found in training directory: {train_dir}")
if not test_dataframes:
    raise FileNotFoundError(f"no csv files found in testing directory: {test_dir}")

# combine all of the dataframes into one
train_df = pd.concat(train_dataframes, ignore_index=True)
test_df = pd.concat(test_dataframes, ignore_index=True)

# downsample the training data to a fixed number of random rows per class:
#   - benign (normal) traffic is reduced to at most 2000 rows
#   - attack classes with more than 10000 rows are reduced to 1000 rows
#   - every other (smaller) attack class is kept in full
result_dfs = []
for class_name, group in train_df.groupby("classification"):
    if class_name.lower() == "benign":
        # cap the sample size at the group size: asking sample() for more rows
        # than the group holds (without replacement) raises a ValueError
        group = group.sample(n=min(2000, len(group)), random_state=42)
    elif len(group) > 10000:
        group = group.sample(n=1000, random_state=42)
    result_dfs.append(group)
train_df = pd.concat(result_dfs, ignore_index=True)

def _to_binary_label(label):
    """Return 0 for benign traffic (case-insensitive) and 1 for any other class."""
    return 0 if label.lower() == "benign" else 1


# give both dataframes a binary label (0 = benign, 1 = attack) next to the
# multi-class one so that binary classification can be tested as well
for labeled_df in (train_df, test_df):
    labeled_df["binary_classification"] = labeled_df["classification"].apply(
        _to_binary_label
    )

# class balance of the training labels (multi-class and binary) and of the test labels
print(train_df["classification"].value_counts())
print(train_df["binary_classification"].value_counts())
print(test_df["binary_classification"].value_counts())

# dataset sizes
print(f"rows in training dataset: {len(train_df)}")
print(f"rows in testing dataset: {len(test_df)}")

### Data Preprocessing

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# the two label columns are targets, not features, so they are kept out of X
label_columns = ["classification", "binary_classification"]

# feature matrices built from the dataframes made earlier
X_train = train_df.drop(columns=label_columns)
X_test = test_df.drop(columns=label_columns)

# targets for multi class classification ...
y_train_multi = train_df["classification"]
y_test_multi = test_df["classification"]

# ... and for binary classification
y_train_binary = train_df["binary_classification"]
y_test_binary = test_df["binary_classification"]

# standardize features: the scaler is fitted on the training data only and the
# same scaling is then applied to the test data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# reduce the number of features with PCA (n_components=0.95); again fitted on
# the training data only and reused for the test data
pca = PCA(n_components=0.95)
X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.transform(X_test_scaled)

n_features_after_pca = X_train_pca.shape[1]
print(f"Reduced number of features after PCA: {n_features_after_pca}")

# only the PCA outputs are used from here on, so drop the intermediate data
del X_train, X_test, X_train_scaled, X_test_scaled

### Random Forest Classifier Training

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import RandomizedSearchCV

In [None]:
# train the baseline random forest classifiers
# two forests with identical settings are built: one is fitted on the
# multi class labels, the other on the binary labels
rf_settings = {"random_state": 42, "n_jobs": 12}
rf_multi = RandomForestClassifier(**rf_settings)
rf_binary = RandomForestClassifier(**rf_settings)

rf_multi.fit(X_train_pca, y_train_multi)
rf_binary.fit(X_train_pca, y_train_binary)

#### Accuracy of Random Forest Classifier Model

In [None]:
# evaluate both baseline random forests on the PCA-reduced test data
rf_y_pred_multi = rf_multi.predict(X_test_pca)
rf_y_pred_binary = rf_binary.predict(X_test_pca)

# per-class precision / recall / f1 for the multi class model
rf_report_multi = classification_report(y_test_multi, rf_y_pred_multi)
print("\nClassification Report for Multi-Class Classification:\n", rf_report_multi)

# same report for the binary (benign vs. attack) model
rf_report_binary = classification_report(y_test_binary, rf_y_pred_binary)
print("\nClassification Report for Binary Classification:\n", rf_report_binary)

#### Optimizing the Random Forest Classifier

In [None]:
import numpy as np

# candidate values for each random forest hyper parameter
rf_params_grid = {
    "n_estimators": [100, 200, 400, 800],
    "max_depth": [None, 10, 20, 40, 80],
    "min_samples_split": [2, 4, 8, 16],
    "min_samples_leaf": [1, 2, 4, 8],
}

# settings shared by both searches: 10 sampled parameter combinations,
# 3-fold cross validation, scored by macro-averaged f1
# NOTE(review): the search asks for 12 workers and the wrapped forests were also
# created with n_jobs=12 - this may oversubscribe the CPU; confirm it is intended
rf_search_settings = {
    "param_distributions": rf_params_grid,
    "cv": 3,
    "n_iter": 10,
    "random_state": 42,
    "n_jobs": 12,
    "verbose": 2,
    "scoring": "f1_macro",
}

# one RandomizedSearchCV object per task
rf_multi_randomized_search = RandomizedSearchCV(
    estimator=rf_multi, **rf_search_settings
)
rf_binary_randomized_search = RandomizedSearchCV(
    estimator=rf_binary, **rf_search_settings
)

# run the hyper parameter search for both multi class and binary classification
rf_multi_search = rf_multi_randomized_search.fit(X_train_pca, y_train_multi)
rf_binary_search = rf_binary_randomized_search.fit(X_train_pca, y_train_binary)


In [None]:
# show the winning estimator and its parameters for each task
rf_multi_best_estimator = rf_multi_search.best_estimator_
rf_multi_best_params = rf_multi_search.best_params_
print(f"Best estimator for multi class classification: {rf_multi_best_estimator}")
print(f"Best parameters for multi class classification: {rf_multi_best_params}")

rf_binary_best_estimator = rf_binary_search.best_estimator_
rf_binary_best_params = rf_binary_search.best_params_
print(f"Best estimator for binary classification: {rf_binary_best_estimator}")
print(f"Best parameters for binary classification: {rf_binary_best_params}")

# predict on the test data through the fitted search objects
rf_y_pred_multi_random_search = rf_multi_search.predict(X_test_pca)
rf_y_pred_binary_random_search = rf_binary_search.predict(X_test_pca)

# classification report of the tuned multi class model
rf_tuned_report_multi = classification_report(
    y_test_multi, rf_y_pred_multi_random_search
)
print(
    "\nClassification Report for RandomizedSearchCV Multi Class Classification:\n",
    rf_tuned_report_multi,
)

# classification report of the tuned binary model
rf_tuned_report_binary = classification_report(
    y_test_binary, rf_y_pred_binary_random_search
)
print(
    "\nClassification Report for RandomizedSearchCV Binary Classification:\n",
    rf_tuned_report_binary,
)

### Gradient Boosting Classifier Training

In [None]:
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import RandomizedSearchCV

In [None]:
# train the baseline gradient boosting classifiers
# NOTE(review): LOKY_MAX_CPU_COUNT is read by joblib's loky backend; presumably
# set here to limit it to 12 CPUs - confirm against the target machine
os.environ["LOKY_MAX_CPU_COUNT"] = "12"

# two models with identical settings: one for the multi class labels,
# one for the binary labels
hgbc_settings = {"random_state": 42}
hgbc_multi = HistGradientBoostingClassifier(**hgbc_settings)
hgbc_binary = HistGradientBoostingClassifier(**hgbc_settings)

hgbc_multi.fit(X_train_pca, y_train_multi)
hgbc_binary.fit(X_train_pca, y_train_binary)

#### Accuracy of Gradient Boosting Classifier Model

In [None]:
# check accuracy of the baseline (un-tuned) gradient boosting classifier models
hgbc_y_pred_multi = hgbc_multi.predict(X_test_pca)
hgbc_y_pred_binary = hgbc_binary.predict(X_test_pca)

# these reports belong to the models fitted with default hyper parameters, so the
# titles must not mention RandomizedSearchCV (the tuned models are reported separately)
print(
    "\nClassification Report for Multi Class Classification:\n",
    classification_report(y_test_multi, hgbc_y_pred_multi),
)
print(
    "\nClassification Report for Binary Classification:\n",
    classification_report(y_test_binary, hgbc_y_pred_binary),
)

#### Optimizing Gradient Boosting Classifier Model

In [None]:
import numpy as np

# candidate values for each gradient boosting hyper parameter
# (np.linspace produces 10 evenly spaced candidates per range; None is offered
# as an additional option for max_leaf_nodes and max_depth)
hgbc_params_grid = {
    "learning_rate": [0.01, 0.05, 0.1, 0.2],
    "max_iter": np.linspace(100, 1000, 10, dtype=int),
    "max_leaf_nodes": [None] + list(np.linspace(10, 100, 10, dtype=int)),
    "max_depth": [None] + list(np.linspace(1, 100, 10, dtype=int)),
    "min_samples_leaf": np.linspace(1, 200, 10, dtype=int),
    "l2_regularization": np.linspace(0.0, 1.0, 10),
    "max_features": np.linspace(0.1, 1.0, 10),
    "max_bins": np.linspace(2, 255, 10, dtype=int),
}

# settings shared by both searches: 3-fold cross validation scored by
# macro-averaged f1, with a fixed seed for the parameter sampling
hgbc_search_settings = {
    "param_distributions": hgbc_params_grid,
    "cv": 3,
    "random_state": 42,
    "n_jobs": 12,
    "verbose": 2,
    "scoring": "f1_macro",
}

# one RandomizedSearchCV object per task
hgbc_multi_randomized_search = RandomizedSearchCV(
    estimator=hgbc_multi, **hgbc_search_settings
)
hgbc_binary_randomized_search = RandomizedSearchCV(
    estimator=hgbc_binary, **hgbc_search_settings
)

# run the hyper parameter search for both multi class and binary classification
hgbc_multi_search = hgbc_multi_randomized_search.fit(X_train_pca, y_train_multi)
hgbc_binary_search = hgbc_binary_randomized_search.fit(X_train_pca, y_train_binary)

In [None]:
# show the winning estimator and its parameters for each task
hgbc_multi_best_estimator = hgbc_multi_search.best_estimator_
hgbc_multi_best_params = hgbc_multi_search.best_params_
print(f"Best estimator for multi class classification: {hgbc_multi_best_estimator}")
print(f"Best parameters for multi class classification: {hgbc_multi_best_params}")

hgbc_binary_best_estimator = hgbc_binary_search.best_estimator_
hgbc_binary_best_params = hgbc_binary_search.best_params_
print(f"Best estimator for binary classification: {hgbc_binary_best_estimator}")
print(f"Best parameters for binary classification: {hgbc_binary_best_params}")

# predict on the test data through the fitted search objects
hgbc_multi_y_pred_random_search = hgbc_multi_search.predict(X_test_pca)
hgbc_binary_y_pred_random_search = hgbc_binary_search.predict(X_test_pca)

# classification report of the tuned multi class model
hgbc_tuned_report_multi = classification_report(
    y_test_multi, hgbc_multi_y_pred_random_search
)
print(
    "\nClassification Report for best parameters for multi class classification:\n",
    hgbc_tuned_report_multi,
)

# classification report of the tuned binary model
hgbc_tuned_report_binary = classification_report(
    y_test_binary, hgbc_binary_y_pred_random_search
)
print(
    "\nClassification Report for best parameters for binary classification:\n",
    hgbc_tuned_report_binary,
)

### Multi-layer Perceptron (MLP) Classifier Training

In [None]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report

In [None]:
# train MLP model on the multi class labels
# the preprocessing step only creates y_train_multi and y_train_binary (there is
# no plain y_train), so the label set has to be named explicitly
# NOTE(review): multi class is assumed to be the intended target - switch to
# y_train_binary if the binary task was meant
mlpc = MLPClassifier(random_state=42, max_iter=10000)
mlpc.fit(X_train_pca, y_train_multi)

#### Accuracy of MLP Classifier Model

In [None]:
# check accuracy of MLP neural network against the multi class test labels
# the preprocessing step only creates y_test_multi and y_test_binary (there is
# no plain y_test), so the label set has to be named explicitly
# NOTE(review): assumes mlpc was fitted on the multi class labels - confirm
y_pred = mlpc.predict(X_test_pca)
print("\nClassification Report:\n", classification_report(y_test_multi, y_pred))

#### Optimizing MLP Classifier Model