### Run the cell below if PySpark operations fail with intermittent errors

In [1]:
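import shutil

# Clears the user's temp directory, presumably to remove stale Spark scratch files.
# Note that this deletes everything in that folder, not only Spark's files.
shutil.rmtree("C:/Users/m/AppData/Local/Temp", ignore_errors=True)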

### Importing libraries, starting the Spark session and taking a look at the data

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [3]:
spark = SparkSession.builder.appName("Finance_Model_Training").getOrCreate()

In [4]:
train_df = spark.read.csv("data/Train_data.csv", header=True, inferSchema=True)
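# NOTE: test_df reads the same file as train_df; point this path at a held-out test file if one is available
test_df = spark.read.csv("data/Train_data.csv", header=True, inferSchema=True)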

In [5]:
train_df.show()

+--------+-------------+----------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type|   service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|d

In [6]:
train_df.columns

['duration',
 'protocol_type',
 'service',
 'flag',
 'src_bytes',
 'dst_bytes',
 'land',
 'wrong_fragment',
 'urgent',
 'hot',
 'num_failed_logins',
 'logged_in',
 'num_compromised',
 'root_shell',
 'su_attempted',
 'num_root',
 'num_file_creations',
 'num_shells',
 'num_access_files',
 'num_outbound_cmds',
 'is_host_login',
 'is_guest_login',
 'count',
 'srv_count',
 'serror_rate',
 'srv_serror_rate',
 'rerror_rate',
 'srv_rerror_rate',
 'same_srv_rate',
 'diff_srv_rate',
 'srv_diff_host_rate',
 'dst_host_count',
 'dst_host_srv_count',
 'dst_host_same_srv_rate',
 'dst_host_diff_srv_rate',
 'dst_host_same_src_port_rate',
 'dst_host_srv_diff_host_rate',
 'dst_host_serror_rate',
 'dst_host_srv_serror_rate',
 'dst_host_rerror_rate',
 'dst_host_srv_rerror_rate',
 'class']

### To understand the features we are working with, each one is described below:

##### **Basic features which describe the connection without looking at the payload**
- **duration** – Length (in seconds) of the connection.
- **protocol_type** – Network protocol used (e.g., TCP, UDP, ICMP).
- **service** – Destination service (e.g., http, telnet, ftp).
- **flag** – Status flag of the connection (e.g., S0, SF, REJ).
- **src_bytes** – Number of data bytes sent from source to destination.
- **dst_bytes** – Number of data bytes sent from destination to source.
- **land** – 1 if connection is from/to the same host/port; 0 otherwise.
- **wrong_fragment** – Number of wrong fragments in the packet.
- **urgent** – Number of urgent packets.
##### **Content features which describe what is within the connection payload**
- **hot** – Number of “hot” indicators (e.g., suspicious commands).
- **num_failed_logins** – Number of failed login attempts.
- **logged_in** – 1 if successfully logged in; 0 otherwise.
- **num_compromised** – Number of compromised conditions.
- **root_shell** – 1 if root shell is obtained; 0 otherwise.
- **su_attempted** – 1 if “su root” command attempted; 0 otherwise.
- **num_root** – Number of “root” accesses.
- **num_file_creations** – Number of file creation operations.
- **num_shells** – Number of shell prompts opened.
- **num_access_files** – Number of attempts to access control files.
- **num_outbound_cmds** – Number of outbound commands.
- **is_host_login** – 1 if user is a “host login”; 0 otherwise.
- **is_guest_login** – 1 if user is a “guest login”; 0 otherwise.
##### **Traffic features based on the past 2 seconds window for the same host**
- **count** – Number of connections to the same host in the past 2 seconds.
- **srv_count** – Number of connections to the same service.
- **serror_rate** – % of connections with “SYN” errors.
- **srv_serror_rate** – % of connections with “SYN” errors to the same service.
- **rerror_rate** – % of connections with “REJ” errors.
- **srv_rerror_rate** – % of REJ errors to the same service.
- **same_srv_rate** – % of connections to the same service.
- **diff_srv_rate** – % of connections to different services.
- **srv_diff_host_rate** – % of connections to the same service but different host.
- **dst_host_count** – Number of connections to the destination host.
- **dst_host_srv_count** – Connections to the destination host using the same service.
- **dst_host_same_srv_rate** – % of same-service connections among dst_host_count.
- **dst_host_diff_srv_rate** – % of different-service connections.
- **dst_host_same_src_port_rate** – % of connections from same source port.
- **dst_host_srv_diff_host_rate** – % of same-service connections to different hosts.
- **dst_host_serror_rate** – % of connections with SYN errors.
- **dst_host_srv_serror_rate** – % of same-service SYN errors.
- **dst_host_rerror_rate** – % of connections with REJ errors.
- **dst_host_srv_rerror_rate** – % of same-service REJ errors.
##### **Label**
- **class** – either "normal" or "anomaly"

#### Let's see if the data has duplicates and/or missing values

In [7]:
if train_df.count() == train_df.na.drop(how="any").count():
    print("No missing values in the dataset")
else:
    print("Missing values, need to treat the dataset")

if train_df.count() == train_df.dropDuplicates().count():
    print("No duplicates in the dataset")
else:
    print("Duplicate values, need to treat the dataset")

No missing values in the dataset
No duplicates in the dataset


Let's check the data distribution for all the columns

In [8]:
train_df.describe().show()

+-------+------------------+-------------+-------+-----+------------------+------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+-----------------+-------------+--------------------+------------------+-----------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|summary|          duration|protocol_type|service| flag|         src_bytes|         dst_bytes|                land|      wrong_fragment|              urgent|           

Notice that for some columns the min, max and mean are all the same value (`describe()` does not report a median), so those columns have no variation. Dropping them makes training more efficient without losing information.
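One way to flag such columns programmatically, rather than reading them off the summary table, is to compare each column's min and max. The sketch below uses plain Python over a hypothetical `stats` dictionary (column name mapped to a `(min, max)` pair). The column names and values are made up for illustration and are not taken from the dataset.

```python
def constant_columns(stats):
    """Return the names of columns whose min equals their max.

    `stats` is a hypothetical dict: column name -> (min, max).
    """
    return [name for name, (lo, hi) in stats.items() if lo == hi]


# Made-up summary values, for illustration only
example_stats = {
    "varying_col": (0, 10),
    "constant_col_a": (0, 0),
    "constant_col_b": (1, 1),
}

to_drop = constant_columns(example_stats)
print(to_drop)
```

In the notebook, the same idea would apply to the min and max rows of the `describe()` output.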

In [9]:
train_df = train_df.drop("num_outbound_cmds", "is_host_login")

# FEATURE SELECTION

We will not check for outliers, as they are valuable for detecting potential anomalies, i.e. financial fraud. Next, we look at class imbalance and resolve it if needed.

In [11]:
train_df.groupBy().pivot("class").count().show()

+-------+------+
|anomaly|normal|
+-------+------+
|  11743| 13449|
+-------+------+
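
The class shares follow directly from the counts printed above. A quick check in plain Python, using only those two numbers:

```python
# Counts copied from the pivot table above
counts = {"anomaly": 11743, "normal": 13449}

total = sum(counts.values())
# Percentage share of each class, rounded to one decimal place
shares = {label: round(100 * n / total, 1) for label, n in counts.items()}

print(total)
print(shares)
```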



The classes are split roughly 47:53 (anomaly:normal), which is reasonably balanced, so no further balancing is needed. Let's check whether the data types were inferred correctly or need to be changed.

In [12]:
train_df.show()

+--------+-------------+----------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type|   service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_host_count|dst_host_srv_count|dst_host_same_srv_rate|dst_host

In [13]:
train_df.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- srv_count: integer (nullable = true)
 |-- serror_rate: double (nullable = true)
 |-- srv_

#### The columns look fine and the data types seem to be assigned correctly. Let's index all the string columns and then one-hot encode them, so that our ML model can read them

In [14]:
# Getting all the column names which are string
columnList = [item[0] for item in train_df.dtypes if item[1].startswith('string')]

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

index_cols = [c + "_index" for c in columnList]
ohe_cols = [c + "_ohe" for c in columnList]

# Step 1: Index string columns to numeric
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in columnList]

# Step 2: OneHot encode the indexed columns
ohe = OneHotEncoder(inputCols=index_cols, outputCols=ohe_cols)

# Step 3: Build pipeline
pipeline = Pipeline(stages=indexers + [ohe])

# Fit and transform
encoder_model = pipeline.fit(train_df)
train_encoded_df = encoder_model.transform(train_df)
test_encoded_df = encoder_model.transform(test_df)

# To show the change from original value, indexed value and one hot encoded value
train_encoded_df.select(columnList + index_cols + ohe_cols).show()

# Drop the original string columns and their intermediate index columns
train_encoded_df = train_encoded_df.drop(*(columnList + index_cols))
test_encoded_df = test_encoded_df.drop(*(columnList + index_cols))

+-------------+----------+----+-------+-------------------+-------------+----------+-----------+-----------------+---------------+--------------+-------------+
|protocol_type|   service|flag|  class|protocol_type_index|service_index|flag_index|class_index|protocol_type_ohe|    service_ohe|      flag_ohe|    class_ohe|
+-------------+----------+----+-------+-------------------+-------------+----------+-----------+-----------------+---------------+--------------+-------------+
|          tcp|  ftp_data|  SF| normal|                0.0|          4.0|       0.0|        0.0|    (2,[0],[1.0])| (65,[4],[1.0])|(10,[0],[1.0])|(1,[0],[1.0])|
|          udp|     other|  SF| normal|                1.0|          6.0|       0.0|        0.0|    (2,[1],[1.0])| (65,[6],[1.0])|(10,[0],[1.0])|(1,[0],[1.0])|
|          tcp|   private|  S0|anomaly|                0.0|          1.0|       1.0|        1.0|    (2,[0],[1.0])| (65,[1],[1.0])|(10,[1],[1.0])|    (1,[],[])|
|          tcp|      http|  SF| normal| 

You may notice that the one-hot encoded values printed by `.show()` don't look like arrays. They are sparse vectors, which store only the vector size and the non-zero entries. For example:

(65,[44],[1.0]) => the vector has 65 entries, and index 44 holds the only non-zero value, 1.0
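
To make the indexing and encoding steps concrete, here is a plain-Python sketch of what the two stages do under their default settings. It is an illustration, not Spark's implementation. It assumes `StringIndexer`'s default ordering (most frequent label gets index 0) and `OneHotEncoder`'s default `dropLast=True`, which is why `class_ohe` has size 1 and "anomaly" shows up as the empty vector `(1,[],[])`. The helper names `index_labels`, `one_hot_sparse` and `to_dense` are hypothetical.

```python
from collections import Counter


def index_labels(values):
    """Map each label to a float index, most frequent label first.

    Ties are broken alphabetically in this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_sparse(index, num_categories):
    """Encode an index as (size, indices, values), dropping the last category.

    The vector has num_categories - 1 slots; the last category is all zeros.
    """
    size = num_categories - 1
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])


def to_dense(sparse):
    """Expand a (size, indices, values) triple into a full list."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Toy label column, for illustration only
labels = ["normal", "anomaly", "normal"]
mapping = index_labels(labels)
encoded = [one_hot_sparse(mapping[label], len(mapping)) for label in labels]
print(mapping)
print(encoded)

# Expanding the example from the text
dense = to_dense((65, [44], [1.0]))
print(len(dense), dense[44])
```

The sparse form matters here because `service_ohe` alone has 65 slots per row, almost all of them zero.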

With most of the preprocessing done, we can move on to normalizing the data. Before that, we need VectorAssembler to combine the input columns into a single vector column.

In [26]:
# Getting the list of input columns
input_cols = train_encoded_df.columns
input_cols.remove("class_ohe")

In [28]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
assembler = VectorAssembler(inputCols=input_cols, outputCol="features_vector")
train_assembled_df = assembler.transform(train_encoded_df)
test_assembled_df = assembler.transform(test_encoded_df)
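
Conceptually, the assembler concatenates the listed columns, in order, into one flat feature vector per row, expanding any vector-valued column in place. A minimal plain-Python sketch of that idea follows. The `assemble` helper and the example row are hypothetical and only illustrate the behaviour.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat list, in order."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            # Vector-valued column: append every element
            features.extend(float(v) for v in value)
        else:
            # Scalar column: append as a single float
            features.append(float(value))
    return features


# Hypothetical row with two numeric columns and one one-hot vector
row = {"duration": 0, "src_bytes": 100, "protocol_type_ohe": [1.0, 0.0]}
vector = assemble(row, ["duration", "src_bytes", "protocol_type_ohe"])
print(vector)
```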

In [29]:
scaler = MinMaxScaler(inputCol="features_vector", outputCol="scaled_features")
scaler_model = scaler.fit(train_assembled_df)
train_scaled_df = scaler_model.transform(train_assembled_df)
test_scaled_df = scaler_model.transform(test_assembled_df)
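
For reference, min-max scaling rescales each feature to the range [0, 1] by default, using the minimum and maximum seen during `fit`. The sketch below shows the arithmetic for a single feature in plain Python. The `min_max_scale` helper is hypothetical, and the handling of a constant feature (mapped to the midpoint of the target range) follows the behaviour described in the Spark documentation.

```python
def min_max_scale(column, lower=0.0, upper=1.0):
    """Rescale a list of numbers linearly into [lower, upper]."""
    col_min, col_max = min(column), max(column)
    if col_max == col_min:
        # Constant feature: every value maps to the midpoint of the range
        return [0.5 * (upper + lower)] * len(column)
    span = col_max - col_min
    return [(x - col_min) / span * (upper - lower) + lower for x in column]


scaled = min_max_scale([0, 5, 10])
print(scaled)
```

Because the scaler is fitted on the training data only, test values outside the training range can land outside [0, 1] after the transform.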

In [32]:
train_scaled_df.show()

+--------+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------------+---------------+--------------+-------------+--------------------+--------------------+
|duration|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_host_count|dst_host_srv_