## **GROUP ASSIGNMENT 2**

| Name      | UOW ID |
| ----------- | ----------- |
| Calaunan Alexander Jr Sumampong      | 7559161       |
| Deon Cham Hui Ern   | 7559471        |
| Elroy Chua Ming Xuan | 7431673 |
| Gonzales Raizel Vera Marie L. | 7436634 |

## (a) Discover and Visualise the data

#### Read the dataset

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

# Create a SparkSession
spark = SparkSession.builder.appName("UNSWProcessingSession").getOrCreate()

# Read the csv files into Spark dataframes
train_df = spark.read.csv('UNSW_NB15_training-set.csv', header=True, inferSchema=True)
test_df = spark.read.csv('UNSW_NB15_testing-set.csv', header=True, inferSchema=True)

# Print the number of rows and columns in the training and test dataframes
print("Training data shape: ", (train_df.count(), len(train_df.columns)))
print("Test data shape: ", (test_df.count(), len(test_df.columns)))

# Combine the training and test dataframes into a single dataframe
combined_df = train_df.union(test_df)

# Print the number of rows and columns in the combined dataframe
print("Combined data shape: ", (combined_df.count(), len(combined_df.columns)))

# Print the schema of the combined dataframe
print("\nCombined data schema:")
combined_df.printSchema()

# Describe the combined dataframe
print("\nCombined data description:")
combined_df.show()


Training data shape:  (82332, 45)
Test data shape:  (175341, 45)
Combined data shape:  (257673, 45)

Combined data schema:
root
 |-- id: integer (nullable = true)
 |-- dur: double (nullable = true)
 |-- proto: string (nullable = true)
 |-- service: string (nullable = true)
 |-- state: string (nullable = true)
 |-- spkts: integer (nullable = true)
 |-- dpkts: integer (nullable = true)
 |-- sbytes: integer (nullable = true)
 |-- dbytes: integer (nullable = true)
 |-- rate: double (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sload: double (nullable = true)
 |-- dload: double (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- dloss: integer (nullable = true)
 |-- sinpkt: double (nullable = true)
 |-- dinpkt: double (nullable = true)
 |-- sjit: double (nullable = true)
 |-- djit: double (nullable = true)
 |-- swin: integer (nullable = true)
 |-- stcpb: long (nullable = true)
 |-- dtcpb: long (nullable = true)
 |-- dwin: integer (

#### Drop id column from data

In [12]:
# Drop 'id' column from the combined dataframe
combined_df = combined_df.drop('id')

#### Display Statistics of the data 

In [13]:
# List of categorical column names
categorical_columns = ["attack_cat", "label", "proto", "service", "state", "ct_state_ttl", "is_ftp_login", "ct_ftp_cmd", "is_sm_ips_ports"]

# Calculate statistics for each categorical column
for col_name in categorical_columns:
    print(f"Statistics for column: {col_name}")
    
    # Display unique values
    unique_values = combined_df.select(col_name).distinct().collect()
    print("Unique Values:")
    for row in unique_values:
        print(row[col_name])
    print()
    
    # Display mode (most frequent value)
    mode_row = combined_df.groupBy(col_name).count().orderBy(desc("count")).first()
    mode_value = mode_row[col_name]
    mode_count = mode_row["count"]
    print(f"Mode: {mode_value} (Count: {mode_count})")
    
    # Display count and percentage distribution
    total_count = combined_df.count()
    categorical_counts = combined_df.groupBy(col_name).count()
    categorical_percentage = categorical_counts.withColumn("percentage", col("count") / total_count * 100)
    
    print("Count and Percentage Distribution:")
    categorical_percentage.show()
    print("-" * 40)

Statistics for column: attack_cat
Unique Values:
Worms
Shellcode
Fuzzers
Analysis
DoS
Reconnaissance
Backdoor
Exploits
Normal
Generic

Mode: Normal (Count: 93000)
Count and Percentage Distribution:
+--------------+-----+-------------------+
|    attack_cat|count|         percentage|
+--------------+-----+-------------------+
|         Worms|  174|0.06752744757890815|
|     Shellcode| 1511| 0.5864021453547714|
|       Fuzzers|24246|   9.40960054021958|
|      Analysis| 2677| 1.0389136618892938|
|           DoS|16353|  6.346415806079799|
|Reconnaissance|13987|  5.428197754518323|
|      Backdoor| 2329| 0.9038587667314776|
|      Exploits|44525| 17.279652893395895|
|        Normal|93000|  36.09225646458884|
|       Generic|58871|  22.84717451964311|
+--------------+-----+-------------------+

----------------------------------------
Statistics for column: label
Unique Values:
1
0

Mode: 1 (Count: 164673)
Count and Percentage Distribution:
+-----+------+------------------+
|label| count|  

In [14]:
# List of all column names
all_columns = combined_df.columns

# List of numerical column names (excluding categorical columns)
numerical_columns = [col_name for col_name in all_columns if col_name not in categorical_columns]

# Calculate statistics for each numerical column
for col_name in numerical_columns:
    print(f"Statistics for column: {col_name}")
    
    # Display count, mean, standard deviation, minimum, maximum
    numerical_stats = combined_df.describe([col_name])
    numerical_stats.show()
    
    print("-" * 40)

Statistics for column: dur
+-------+------------------+
|summary|               dur|
+-------+------------------+
|  count|            257673|
|   mean|1.2467150990092788|
| stddev| 5.974305404543933|
|    min|               0.0|
|    max|         59.999989|
+-------+------------------+

----------------------------------------
Statistics for column: spkts
+-------+------------------+
|summary|             spkts|
+-------+------------------+
|  count|            257673|
|   mean|19.777143899438435|
| stddev| 135.9471522771518|
|    min|                 1|
|    max|             10646|
+-------+------------------+

----------------------------------------
Statistics for column: dpkts
+-------+------------------+
|summary|             dpkts|
+-------+------------------+
|  count|            257673|
|   mean| 18.51470274339958|
| stddev|111.98596523102219|
|    min|                 0|
|    max|             11018|
+-------+------------------+

----------------------------------------
Statis

#### Check for missing values 

In [15]:
from pyspark.sql.functions import count, when, isnan, col

# Check for NULL values in the dataset
combined_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in combined_df.columns]).show()


[Stage 568:====>                                                   (1 + 8) / 12]

+---+-----+-------+-----+-----+-----+------+------+----+----+----+-----+-----+-----+-----+------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----+
|dur|proto|service|state|spkts|dpkts|sbytes|dbytes|rate|sttl|dttl|sload|dload|sloss|dloss|sinpkt|dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|label|
+---+-----+-------+-----+-----+-----+------+------+----+----+----+-----+-----+-----+-----+------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+------

                                                                                

- No NULL values detected in data 
- Check for '0' values in numerical columns  
- Drop columns with > 60% values with 0

In [16]:
from pyspark.sql import functions as F

# Count the total number of rows in the DataFrame
total_rows = combined_df.count()

# List to store columns that meet the drop criteria
cols_to_drop = []

# Count the number of '0' values in each numerical column
for col_name in numerical_columns:
    zero_count = combined_df.filter(F.col(col_name) == 0).count()
    zero_percentage = (zero_count / total_rows) * 100
    print(f"Number of '0' values in column '{col_name}': {zero_count}")
    print(
        f"Percentage of '0' values in column '{col_name}': {zero_percentage:.2f}%")
    print("-" * 40)

    # Check if column meets the 60% criteria
    if zero_percentage >= 60:
        cols_to_drop.append(col_name)

# Drop columns that meet the criteria
combined_df = combined_df.drop(*cols_to_drop)

# Remove the dropped columns from the numerical_columns list as well
numerical_columns = [
    col for col in numerical_columns if col not in cols_to_drop]
print(f"Dropped columns: {cols_to_drop}")


Number of '0' values in column 'dur': 3607
Percentage of '0' values in column 'dur': 1.40%
----------------------------------------
Number of '0' values in column 'spkts': 0
Percentage of '0' values in column 'spkts': 0.00%
----------------------------------------
Number of '0' values in column 'dpkts': 120288
Percentage of '0' values in column 'dpkts': 46.68%
----------------------------------------
Number of '0' values in column 'sbytes': 0
Percentage of '0' values in column 'sbytes': 0.00%
----------------------------------------
Number of '0' values in column 'dbytes': 120288
Percentage of '0' values in column 'dbytes': 46.68%
----------------------------------------
Number of '0' values in column 'rate': 3968
Percentage of '0' values in column 'rate': 1.54%
----------------------------------------
Number of '0' values in column 'sttl': 4210
Percentage of '0' values in column 'sttl': 1.63%
----------------------------------------
Number of '0' values in column 'dttl': 120439
Percen

#### Perform Correlations between target and feature columns

1. Correlation between 'attack_cat' and Categorical Columns:

- Since attack_cat is a categorical variable, Chi-squared test or Cramér's V can be used to measure the association between it and other categorical columns.
- If the value is closer to 1, it indicates a stronger association between the two categorical variables.

2. Correlation between 'attack_cat' and Numerical Columns:

- Perform "multinomial logistic regression" or "softmax regression." 
- An extension of binary logistic regression that can handle multiple categories.

3. Correlation between 'label' and Categorical Columns:

- Label is a binary variable (0 or 1), which is also considered a categorical column 
- Chi-squared test or Cramér's V can be used to measure the association between it and other categorical columns.
- If the value is closer to 1, it indicates a stronger association between the two categorical variables.

4. Correlation between 'label' and Numerical Columns:

- To analyze the correlation between label and numerical columns, point-biserial correlation coefficient is used 
- This coefficient quantifies the strength and direction of the linear relationship between a binary and a continuous variable.

#### Encode categorical columns before conducting correlations
- Encode the columns "attack_cat," "proto," "service," and "state" using Spark's StringIndexer

In [17]:
from pyspark.sql.functions import col, desc
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
import pandas as pd

# List of categorical columns to be encoded
columns_to_encode = ["attack_cat", "proto", "service", "state"]

# Create instances of StringIndexer for each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index").fit(combined_df) for col in columns_to_encode]

# Create instances of OneHotEncoder for each categorical column index
one_hot_encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_onehot") for col in columns_to_encode]

# Create a pipeline with both StringIndexer and OneHotEncoder stages
pipeline = Pipeline(stages=indexers + one_hot_encoders)

# Fit and transform the pipeline on the data
encoded_df = pipeline.fit(combined_df).transform(combined_df)

# Display encoded columns
# Calculate statistics for each encoded categorical column again
for col_name in columns_to_encode:
    print(f"Statistics for column: {col_name}_index")
    
    # Display unique values
    unique_values = encoded_df.select(col_name + "_index").distinct().collect()
    print("Unique Values:")
    for row in unique_values:
        print(row[col_name + "_index"])
    print()
    
    # Display mode (most frequent value)
    mode_row = encoded_df.groupBy(col_name + "_index").count().orderBy(desc("count")).first()
    mode_value = mode_row[col_name + "_index"]
    mode_count = mode_row["count"]
    print(f"Mode: {mode_value} (Count: {mode_count})")
    
    # Display count and percentage distribution
    total_count = encoded_df.count()
    categorical_counts = encoded_df.groupBy(col_name + "_index").count()
    categorical_percentage = categorical_counts.withColumn("percentage", (col("count") / total_count) * 100)
    
    print("Count and Percentage Distribution:")
    categorical_percentage.show()
    print("-" * 40)

Statistics for column: attack_cat_index
Unique Values:
8.0
0.0
7.0
1.0
4.0
3.0
2.0
6.0
5.0
9.0

Mode: 0.0 (Count: 93000)
Count and Percentage Distribution:
+----------------+-----+-------------------+
|attack_cat_index|count|         percentage|
+----------------+-----+-------------------+
|             8.0| 1511| 0.5864021453547714|
|             0.0|93000|  36.09225646458884|
|             7.0| 2329| 0.9038587667314776|
|             1.0|58871|  22.84717451964311|
|             4.0|16353|  6.346415806079799|
|             3.0|24246|   9.40960054021958|
|             2.0|44525| 17.279652893395895|
|             6.0| 2677| 1.0389136618892938|
|             5.0|13987|  5.428197754518323|
|             9.0|  174|0.06752744757890815|
+----------------+-----+-------------------+

----------------------------------------
Statistics for column: proto_index
Unique Values:
70.0
8.0
67.0
0.0
69.0
7.0
112.0
124.0
128.0
108.0
88.0
49.0
101.0
98.0
116.0
107.0
29.0
75.0
64.0
47.0
42.0
44.0
96.0
62.

#### 1. Correlation between 'attack_cat' and Categorical Columns (Cramér's V)

In [18]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import ChiSquareTest
import pandas as pd
import numpy as np
import scipy.stats

# List of dependent variable and independent variables
dv = "attack_cat_index"
iv_cols = ["proto_index", "service_index", "state_index", "ct_state_ttl", "is_ftp_login", "ct_ftp_cmd", "is_sm_ips_ports"]

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=iv_cols, outputCol="features")
assembled_df = assembler.transform(encoded_df)

# Calculate Cramer's V values for each independent variable
cramers_v_values_iv = {}
for iv in iv_cols:
    # Cross-tabulate the independent variable and the dependent variable
    contingency_table = assembled_df.crosstab(iv, dv)
    
    # Convert the contingency table to a pandas DataFrame
    contingency_df = contingency_table.toPandas()

    # Convert string values to numeric types
    contingency_df = contingency_df.apply(pd.to_numeric, errors="coerce")
    
    # Calculate the chi-square statistic
    chi2, _, _, _ = scipy.stats.chi2_contingency(contingency_df.values)
    
    # Calculate Cramer's V value
    cramers_v_value = np.sqrt(chi2 / (contingency_df.values.sum() * (min(contingency_df.shape[0], len(iv_cols)) - 1)))
    
    # Store the Cramer's V value for the independent variable
    cramers_v_values_iv[iv] = cramers_v_value
    
    # Display chi-square statistic and Cramer's V value for each independent variable
    print(f"Independent Variable: {iv}")
    print(f"Chi-Square Statistic: {chi2}")
    print(f"Cramer's V Value: {cramers_v_value}")
    print("---------")

# Display the sorted Cramer's V values for independent variables
print("\nCramer's V values for independent variables (sorted): ")
sorted_iv = sorted(cramers_v_values_iv.items(), key=lambda x: x[1], reverse=True)
for iv, value in sorted_iv:
    print(f"{iv}: {value}")



Independent Variable: proto_index
Chi-Square Statistic: 329651.2386025059
Cramer's V Value: 0.4540911663926888
---------
Independent Variable: service_index
Chi-Square Statistic: 269422.1920257995
Cramer's V Value: 0.41738888335873475
---------
Independent Variable: state_index
Chi-Square Statistic: 305897.82929099636
Cramer's V Value: 0.44476619484489527
---------
Independent Variable: ct_state_ttl
Chi-Square Statistic: 272652.0249839073
Cramer's V Value: 0.4199296937000646
---------
Independent Variable: is_ftp_login
Chi-Square Statistic: 45671.47211622534
Cramer's V Value: 0.24306449322854293
---------
Independent Variable: ct_ftp_cmd
Chi-Square Statistic: 41580.6790751981
Cramer's V Value: 0.23192352626810445
---------
Independent Variable: is_sm_ips_ports
Chi-Square Statistic: 6674.152048894995
Cramer's V Value: 0.16093953736367256
---------

Cramer's V values for independent variables (sorted): 
proto_index: 0.4540911663926888
state_index: 0.44476619484489527
ct_state_ttl: 0.4199

#### 2. Correlation between 'attack_cat' and Numerical Columns (Multinomial logistic regression)

In [19]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Select the relevant columns
selected_columns = ["attack_cat", "proto", "service", "state", "ct_state_ttl", "is_ftp_login", "ct_ftp_cmd", "is_sm_ips_ports"]
data = combined_df[selected_columns]

# Encode categorical variables
label_encoder = LabelEncoder()
for column in selected_columns:
    data[column] = label_encoder.fit_transform(data[column])

# Split the data into features and target
X = data.drop('attack_cat', axis=1)
y = data['attack_cat']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and fit the multinomial logistic regression model
model = LogisticRegression(multi_class='multinomial', solver='lbfgs')
model.fit(X_train, y_train)

# Make predictions
y_pred = model.predict(X_test)

# Calculate accuracy
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

RecursionError: maximum recursion depth exceeded in comparison

#### 3. Correlation between 'label' and Categorical Columns (Cramér's V)

In [20]:
# List of dependent variable and independent variables
dv = "label"
iv_cols = ["proto_index", "service_index", "state_index", "ct_state_ttl", "is_ftp_login", "ct_ftp_cmd", "is_sm_ips_ports"]

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=iv_cols, outputCol="features")
assembled_df = assembler.transform(encoded_df)

# Calculate Cramer's V values for each independent variable
cramers_v_values_iv = {}
for iv in iv_cols:
    # Cross-tabulate the independent variable and the dependent variable
    contingency_table = assembled_df.crosstab(iv, dv)
    
    # Convert the contingency table to a pandas DataFrame
    contingency_df = contingency_table.toPandas()

    # Convert string values to numeric types
    contingency_df = contingency_df.apply(pd.to_numeric, errors="coerce")
    
    # Calculate the chi-square statistic
    chi2, _, _, _ = scipy.stats.chi2_contingency(contingency_df.values)
    
    # Calculate Cramer's V value
    cramers_v_value = np.sqrt(chi2 / (contingency_df.values.sum() * (min(contingency_df.shape[0], len(iv_cols)) - 1)))
    
    # Store the Cramer's V value for the independent variable
    cramers_v_values_iv[iv] = cramers_v_value
    
    # Display chi-square statistic and Cramer's V value for each independent variable
    print(f"Independent Variable: {iv}")
    print(f"Chi-Square Statistic: {chi2}")
    print(f"Cramer's V Value: {cramers_v_value}")
    print("---------")

# Display the sorted Cramer's V values for independent variables
print("\nCramer's V values for independent variables (sorted): ")
sorted_iv = sorted(cramers_v_values_iv.items(), key=lambda x: x[1], reverse=True)
for iv, value in sorted_iv:
    print(f"{iv}: {value}")


Independent Variable: proto_index
Chi-Square Statistic: 152965.02723943844
Cramer's V Value: 0.30932268531122875
---------
Independent Variable: service_index
Chi-Square Statistic: 55186.8776593981
Cramer's V Value: 0.18890441450895396
---------
Independent Variable: state_index
Chi-Square Statistic: 248521.966437425
Cramer's V Value: 0.40089068358648605
---------
Independent Variable: ct_state_ttl
Chi-Square Statistic: 200623.02670408247
Cramer's V Value: 0.3602156774780111
---------
Independent Variable: is_ftp_login
Chi-Square Statistic: 41764.89283064305
Cramer's V Value: 0.23243670077647027
---------
Independent Variable: ct_ftp_cmd
Chi-Square Statistic: 37673.899252221665
Cramer's V Value: 0.22075944967778802
---------
Independent Variable: is_sm_ips_ports
Chi-Square Statistic: 6674.1520488949955
Cramer's V Value: 0.16093953736367256
---------

Cramer's V values for independent variables (sorted): 
state_index: 0.40089068358648605
ct_state_ttl: 0.3602156774780111
proto_index: 0.3

#### 4. Correlation between 'label' and Numerical Columns (Point-Biserial)

In [21]:
from scipy.stats import pointbiserialr

# List of independent variables (IVs)
dv = "label"
iv_cols = ["dur", "spkts", "dpkts", "sbytes", "dbytes", "rate", "sttl", "dttl", "sload", "dload",
           "sloss", "dloss", "sinpkt", "dinpkt", "sjit", "djit", "swin", "stcpb", "dtcpb", "dwin",
           "tcprtt", "synack", "ackdat", "smean", "dmean", "trans_depth", "response_body_len",
           "ct_srv_src", "ct_dst_ltm", "ct_src_dport_ltm", "ct_dst_sport_ltm", "ct_dst_src_ltm",
           "ct_flw_http_mthd", "ct_src_ltm", "ct_srv_dst"]

# Calculate point-biserial correlation for each IV
pointbiserial_correlations = {}
for iv in iv_cols:
    # Convert Spark DataFrame to a list of values
    iv_values = [row[iv] for row in assembled_df.select(iv).collect()]
    label_values = [row["label"] for row in assembled_df.select("label").collect()]
    
    # Calculate point-biserial correlation coefficient
    pointbiserial_corr, _ = pointbiserialr(iv_values, label_values)
    
    # Store the correlation coefficient
    pointbiserial_correlations[iv] = pointbiserial_corr
    
    # Display correlation coefficient for each IV
    print(f"Independent Variable: {iv}")
    print(f"Point-Biserial Correlation: {pointbiserial_corr}")
    print("---------")

# Display the sorted point-biserial correlations for IVs
print("\nPoint-Biserial Correlations for independent variables (sorted): ")
sorted_iv = sorted(pointbiserial_correlations.items(), key=lambda x: x[1], reverse=True)
for iv, value in sorted_iv:
    print(f"{iv}: {value}")

Independent Variable: dur
Point-Biserial Correlation: 0.02909611699630219
---------
Independent Variable: spkts
Point-Biserial Correlation: -0.043040466784108264
---------
Independent Variable: dpkts
Point-Biserial Correlation: -0.09739388286260972
---------
Independent Variable: sbytes
Point-Biserial Correlation: 0.01937643251890141
---------
Independent Variable: dbytes
Point-Biserial Correlation: -0.06040284182726326
---------
Independent Variable: rate
Point-Biserial Correlation: 0.33588263882771363
---------
Independent Variable: sttl
Point-Biserial Correlation: 0.6240823834365409
---------
Independent Variable: dttl
Point-Biserial Correlation: 0.019368911398127166
---------
Independent Variable: sload
Point-Biserial Correlation: 0.16524867685806202
---------
Independent Variable: dload
Point-Biserial Correlation: -0.3521688041691041
---------
Independent Variable: sloss
Point-Biserial Correlation: 0.0018282740800961133
---------
Independent Variable: dloss
Point-Biserial Correlat

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `trans_depth` cannot be resolved. Did you mean one of the following? [`ackdat`, `attack_cat`, `ct_srv_dst`, `rate`, `ct_dst_ltm`].;
'Project ['trans_depth]
+- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 26 more fields]
   +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 25 more fields]
      +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 24 more fields]
         +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 23 more fields]
            +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 22 more fields]
               +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 21 more fields]
                  +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 20 more fields]
                     +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 19 more fields]
                        +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 18 more fields]
                           +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 17 more fields]
                              +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 20 more fields]
                                 +- Union false, false
                                    :- Relation [id#8270,dur#8271,proto#8272,service#8273,state#8274,spkts#8275,dpkts#8276,sbytes#8277,dbytes#8278,rate#8279,sttl#8280,dttl#8281,sload#8282,dload#8283,sloss#8284,dloss#8285,sinpkt#8286,dinpkt#8287,sjit#8288,djit#8289,swin#8290,stcpb#8291L,dtcpb#8292L,dwin#8293,... 21 more fields] csv
                                    +- Relation [id#8377,dur#8378,proto#8379,service#8380,state#8381,spkts#8382,dpkts#8383,sbytes#8384,dbytes#8385,rate#8386,sttl#8387,dttl#8388,sload#8389,dload#8390,sloss#8391,dloss#8392,sinpkt#8393,dinpkt#8394,sjit#8395,djit#8396,swin#8397,stcpb#8398L,dtcpb#8399L,dwin#8400,... 21 more fields] csv


#### @Raizel TO-DO: Data Visualization

#### @Elroy TO-DO
## (b) Prepare the data for ML Algorithms
#### Treating Missing Values

- Check for std dev of numerical columns first

In [25]:
# Calculate statistics for each numerical column
for col_name in numerical_columns:
    print(f"Standard Deviation for column: {col_name}")
    
    # Display standard deviation
    std_deviation = combined_df.describe([col_name]).filter(col("summary") == "stddev").select(col_name).first()[col_name]
    print(f"Standard Deviation: {std_deviation}")
    
    print("-" * 40)

Standard Deviation for column: dur
Standard Deviation: 5.974305404543933
----------------------------------------
Standard Deviation for column: spkts
Standard Deviation: 135.9471522771518
----------------------------------------
Standard Deviation for column: dpkts
Standard Deviation: 111.98596523102219
----------------------------------------
Standard Deviation for column: sbytes
Standard Deviation: 173773.8806049035
----------------------------------------
Standard Deviation for column: dbytes
Standard Deviation: 146199.28193837983
----------------------------------------
Standard Deviation for column: rate
Standard Deviation: 160344.63669326645
----------------------------------------
Standard Deviation for column: sttl
Standard Deviation: 102.48826801762412
----------------------------------------
Standard Deviation for column: dttl
Standard Deviation: 112.76213134623771
----------------------------------------
Standard Deviation for column: sload
Standard Deviation: 1.85731252841

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `trans_depth` cannot be resolved. Did you mean one of the following? [`ackdat`, `attack_cat`, `ct_srv_dst`, `rate`, `ct_dst_ltm`].;
'Project ['trans_depth]
+- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 17 more fields]
   +- Project [dur#8271, proto#8272, service#8273, state#8274, spkts#8275, dpkts#8276, sbytes#8277, dbytes#8278, rate#8279, sttl#8280, dttl#8281, sload#8282, dload#8283, sloss#8284, dloss#8285, sinpkt#8286, dinpkt#8287, sjit#8288, djit#8289, swin#8290, stcpb#8291L, dtcpb#8292L, dwin#8293, tcprtt#8294, ... 20 more fields]
      +- Union false, false
         :- Relation [id#8270,dur#8271,proto#8272,service#8273,state#8274,spkts#8275,dpkts#8276,sbytes#8277,dbytes#8278,rate#8279,sttl#8280,dttl#8281,sload#8282,dload#8283,sloss#8284,dloss#8285,sinpkt#8286,dinpkt#8287,sjit#8288,djit#8289,swin#8290,stcpb#8291L,dtcpb#8292L,dwin#8293,... 21 more fields] csv
         +- Relation [id#8377,dur#8378,proto#8379,service#8380,state#8381,spkts#8382,dpkts#8383,sbytes#8384,dbytes#8385,rate#8386,sttl#8387,dttl#8388,sload#8389,dload#8390,sloss#8391,dloss#8392,sinpkt#8393,dinpkt#8394,sjit#8395,djit#8396,swin#8397,stcpb#8398L,dtcpb#8399L,dwin#8400,... 21 more fields] csv


- High std dev scores observed for numerical columns 
- Median imputation is chosen 

#### Performing median impute on numerical columns 

In [23]:
from pyspark.ml.feature import Imputer
# Create an instance of the Imputer class
imputer = Imputer(
    inputCols=numerical_columns,
    outputCols=["{}_imputed".format(c) for c in numerical_columns],
    strategy="median"
)

# Fit and transform the DataFrame with the imputer
imputed_df = imputer.fit(combined_df).transform(combined_df)

# Check for NULL values after imputation
imputed_df.select(["{}_imputed".format(c) for c in numerical_columns]).show()


IllegalArgumentException: trans_depth does not exist. Available: dur, proto, service, state, spkts, dpkts, sbytes, dbytes, rate, sttl, dttl, sload, dload, sloss, dloss, sinpkt, dinpkt, sjit, djit, swin, stcpb, dtcpb, dwin, tcprtt, synack, ackdat, smean, dmean, ct_srv_src, ct_state_ttl, ct_dst_ltm, ct_src_dport_ltm, ct_dst_sport_ltm, ct_dst_src_ltm, is_ftp_login, ct_ftp_cmd, ct_src_ltm, ct_srv_dst, is_sm_ips_ports, attack_cat, label

## (c) Select and Train Models

In [None]:
# Close the SparkSession
# spark.stop()