# Internet of Vehicles (IoV) Network Packet Analysis (NPA) for Intrusion Detection Systems (IDS) - ETL and Feature Engineering

This notebook contains the ETL and Feature Engineering portion of this project. The datasets are combined and then split into training and test data to be used in logistic regression and anomaly detection.

This section requires the installation of PySpark, FindSpark, NumPy and pandas.

### Importing Libraries and Initiating PySpark

In [1]:
import numpy as np
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import array
from pyspark.sql.functions import col

In [2]:
# Initialise findspark before the Spark entry points are imported in the next cell.
# NOTE(review): the first cell already imports from pyspark before this call runs;
# if pyspark is only importable through findspark, this cell should come first — confirm.
import findspark
findspark.init()

In [3]:
# Import the Spark entry points. If they are missing, show a hint instead of a bare
# traceback. The original called an undefined `printmd` helper here, which replaced
# the hint with a NameError; plain print() needs no extra helper.
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')
    # keep the underlying cause visible for troubleshooting
    print(f'Underlying import error: {e}')

In [None]:
# Spark configuration with master "local[*]".
spark_conf = SparkConf().setMaster("local[*]")

# Reuse the running SparkContext if there is one, otherwise start it with the config above.
sc = SparkContext.getOrCreate(spark_conf)

# Obtain the SparkSession used by the rest of the notebook.
spark = SparkSession.builder.getOrCreate()

### Data Loading and Concatenation

In [None]:
# Per-attack CSV files that are merged into a single attack dataset.
atk_files = ['./decimal_IOT_Dataset/decimal_DoS.csv',
             './decimal_IOT_Dataset/decimal_spoofing-GAS.csv',
             './decimal_IOT_Dataset/decimal_spoofing-RPM.csv',
             './decimal_IOT_Dataset/decimal_spoofing-SPEED.csv',
             './decimal_IOT_Dataset/decimal_spoofing-STEERING_WHEEL.csv']

# Stack the files row-wise; ignore_index gives the combined frame one continuous index.
atk_data = pd.concat((pd.read_csv(file) for file in atk_files), ignore_index=True)

# Write the combined file next to its sources: the loading cell reads it from
# './decimal_IOT_Dataset/decimal_attack.csv', so writing to the working directory
# (as before) left that read pointing at a missing or stale file.
atk_data.to_csv('./decimal_IOT_Dataset/decimal_attack.csv', index=False)


In [11]:
# Read the combined attack CSV, taking the column names from its header row.
attack_reader = spark.read.option('header', True)
attack_df = attack_reader.csv('./decimal_IOT_Dataset/decimal_attack.csv')

# Register the frame as the temporary view 'attack', then preview it.
attack_df.createOrReplaceTempView('attack')
attack_df.show()

+---+------+------+------+------+------+------+------+------+------+--------+--------------+
| ID|DATA_0|DATA_1|DATA_2|DATA_3|DATA_4|DATA_5|DATA_6|DATA_7| label|category|specific_class|
+---+------+------+------+------+------+------+------+------+------+--------+--------------+
|291|     0|     0|     0|     0|     0|     0|     0|     0|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|     DoS|           DoS|
|291|    14|    11|     4|     4|     3|     3|     8|    12|ATTACK|  

In [12]:
# Read the benign CSV, taking the column names from its header row.
benign_reader = spark.read.option('header', True)
benign_df = benign_reader.csv('./decimal_IOT_Dataset/decimal_benign.csv')

# Register the frame as the temporary view 'benign', then preview it.
benign_df.createOrReplaceTempView('benign')
benign_df.show()

+----+------+------+------+------+------+------+------+------+------+--------+--------------+
|  ID|DATA_0|DATA_1|DATA_2|DATA_3|DATA_4|DATA_5|DATA_6|DATA_7| label|category|specific_class|
+----+------+------+------+------+------+------+------+------+------+--------+--------------+
|  65|    96|     0|     0|     0|     0|     0|     0|     0|BENIGN|  BENIGN|        BENIGN|
|1068|   132|    13|   160|     0|     0|     0|     0|     0|BENIGN|  BENIGN|        BENIGN|
| 535|   127|   255|   127|   255|   127|   255|   127|   255|BENIGN|  BENIGN|        BENIGN|
| 131|    15|   224|     0|     0|     0|     0|     0|     0|BENIGN|  BENIGN|        BENIGN|
| 936|     1|     0|    39|    16|     0|     0|     0|     0|BENIGN|  BENIGN|        BENIGN|
| 359|     0|   128|     0|     0|     0|     1|   227|     0|BENIGN|  BENIGN|        BENIGN|
| 369|    16|   108|     0|     0|     0|     0|     0|     0|BENIGN|  BENIGN|        BENIGN|
| 516|   192|     0|   125|     0|     0|     0|     0|     

### Data Cleansing and Processing

In [13]:
# feature columns whose values are cast to integers
cols_to_cast = ['DATA_0','DATA_1','DATA_2','DATA_3','DATA_4','DATA_5','DATA_6','DATA_7']


def _clean_frame(frame):
    """Rename the label column, drop unused columns and cast the features to int."""
    # the existing label column becomes "string", freeing the name "label"
    # for downstream feature engineering
    frame = frame.withColumnRenamed("label", "string")
    # columns that are not used as features
    frame = frame.drop('ID', 'category', 'specific_class')
    # cast each feature column to int
    for col_name in cols_to_cast:
        frame = frame.withColumn(col_name, col(col_name).cast("int"))
    return frame


# apply the same cleansing steps to both datasets
attack_df = _clean_frame(attack_df)
benign_df = _clean_frame(benign_df)
                                              

In [8]:
# determining presence of null values
# Count the null values in every column of both datasets.
# The loop variable is deliberately NOT named `col`: doing so overwrote the imported
# pyspark.sql.functions.col with a string, so re-running the cleansing cell failed.
for frame in (attack_df, benign_df):
    for column_name in frame.columns:
        null_count = frame.filter(frame[column_name].isNull()).count()
        print(f"Column '{column_name}' has {null_count} null values.")

                                                                                

Column 'DATA_0' has 0 null values.


                                                                                

Column 'DATA_1' has 0 null values.


                                                                                

Column 'DATA_2' has 0 null values.


                                                                                

Column 'DATA_3' has 0 null values.


                                                                                

Column 'DATA_4' has 0 null values.


                                                                                

Column 'DATA_5' has 0 null values.
Column 'DATA_6' has 0 null values.
Column 'DATA_7' has 0 null values.


                                                                                

Column 'string' has 0 null values.


                                                                                

Column 'DATA_0' has 0 null values.


                                                                                

Column 'DATA_1' has 0 null values.


                                                                                

Column 'DATA_2' has 0 null values.


                                                                                

Column 'DATA_3' has 0 null values.


                                                                                

Column 'DATA_4' has 0 null values.


                                                                                

Column 'DATA_5' has 0 null values.


                                                                                

Column 'DATA_6' has 0 null values.


                                                                                

Column 'DATA_7' has 0 null values.




Column 'string' has 0 null values.


                                                                                

### Test and Train Dataset Creation

In [14]:
#create and combine train and test data sets
split = [0.7,0.3]

# Fixed seed so the random split is repeatable between runs; without it every run
# produced a different train/test partition.
split_seed = 42

# split each class separately so both classes appear in both sets
atk_dfs = attack_df.randomSplit(split, seed=split_seed)
benign_dfs = benign_df.randomSplit(split, seed=split_seed)

# element 0 of each split is the 0.7 share (train), element 1 the 0.3 share (test)
train_df = benign_dfs[0].union(atk_dfs[0])
test_df = benign_dfs[1].union(atk_dfs[1])


### Feature Engineering - Logistic Regression

In [None]:
#Pipeline creation
# `Pipeline` comes from pyspark.ml; it was previously used here without being imported.

# stage 1: map the text label in 'string' to a numeric 'label' column
indexer = StringIndexer(inputCol='string', outputCol='label')

# stage 2: pack the eight DATA_* columns into a single 'features' vector column
feature_cols = ['DATA_0','DATA_1','DATA_2','DATA_3','DATA_4','DATA_5','DATA_6','DATA_7']
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# stage 3: logistic regression reading the columns produced by the two stages above
lr = LogisticRegression(featuresCol='features', labelCol='label',
                        maxIter=10, regParam=0.3, elasticNetParam=0.8)

pipeline = Pipeline(stages=[indexer, vectorAssembler, lr])


### Feature Engineering - Anomaly Detector

In [15]:
# transform label from string to integer value
indexer = StringIndexer(inputCol = 'string', outputCol = 'label')

# Fit the indexer once, on the training data only, and reuse the fitted model for the
# test data. Fitting a second indexer on the test set could assign the label indices
# differently there, so the same class could end up with a different number.
indexer_model = indexer.fit(train_df)

# apply the shared mapping and remove the previous string label
train_data = indexer_model.transform(train_df).drop('string')
test_data = indexer_model.transform(test_df).drop('string')


                                                                                

In [16]:
# Collect the Spark training frame into pandas, then view it as one NumPy array
pandas_df = train_data.toPandas()
numpy_array = pandas_df.to_numpy()

# Split column-wise: every column but the last is an input feature,
# the last column is the target variable
X_train, y_train = numpy_array[:, :-1], numpy_array[:, -1]

                                                                                

In [17]:
# Collect the Spark test frame into pandas, then view it as one NumPy array
pandas_df = test_data.toPandas()
numpy_array = pandas_df.to_numpy()

# Split column-wise: every column but the last is an input feature,
# the last column is the target variable
X_test, y_test = numpy_array[:, :-1], numpy_array[:, -1]

                                                                                

In [18]:
# Convert all four arrays to 32-bit floats in one pass
X_train, X_test, y_train, y_test = (
    arr.astype(np.float32) for arr in (X_train, X_test, y_train, y_test)
)

In [26]:
# Print the name, row count and contents of each array,
# with a blank-line separator between consecutive arrays
named_arrays = [
    ("X_train", X_train),
    ("y_train", y_train),
    ("X_test", X_test),
    ("y_test", y_test),
]
for position, (name, values) in enumerate(named_arrays):
    if position:
        print("\n")
    print(f"{name}: ")
    print("rows:", len(values))
    print(values)

X_train: 
rows: 984407
[[  0.   0.   0. ...   0.   0.   0.]
 [  0.   0.   0. ...   0.   0.   0.]
 [  0.   0.   0. ...   0.   0.   0.]
 ...
 [194.   1.   2. ...   5. 138.  34.]
 [194.   1.   2. ...   5. 138.  34.]
 [194.   1.   2. ...   5. 138.  34.]]


y_train: 
rows: 984407
[0. 0. 0. ... 1. 1. 1.]


X_test: 
rows: 423812
[[  0.   0.   0. ...   0.   0.   0.]
 [  0.   0.   0. ...   0.   0.   0.]
 [  0.   0.   0. ...   0.   0.   0.]
 ...
 [194.   1.   2. ...   5. 138.  34.]
 [194.   1.   2. ...   5. 138.  34.]
 [194.   1.   2. ...   5. 138.  34.]]


y_test: 
rows: 423812
[0. 0. 0. ... 1. 1. 1.]


In [31]:
# Stop the Spark session created at the top of the notebook; the train and test data
# have already been converted to NumPy arrays by this point.
spark.stop()