In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [2]:
# Create (or reuse) the Spark session used by every cell below.
# - master "local[*]": run Spark locally inside this machine's JVM.
# - spark.driver.memory "9g": memory requested for the driver process.
# getOrCreate() returns an already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "9g")
    .appName("Binary_Classifier")
    .getOrCreate()
)

24/09/08 13:01:47 WARN Utils: Your hostname, abraham resolves to a loopback address: 127.0.1.1; using 192.168.0.145 instead (on interface eno2)
24/09/08 13:01:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/09/08 13:01:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/08 13:01:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/08 13:01:48 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Location of the UNSW-NB15 CSV export read by this notebook.
file_path = "/user/ids_analysis/UNSW-NB15.csv"

# Column names applied positionally to the CSV. The read below does not ask
# Spark to treat the first row as a header, so the names come from this list.
# The last two columns are the targets: `attack_cat` (category name) and
# `Label` (used later as the binary target).
column_names = [
    "srcip",
    "sport",
    "dstip",
    "dsport",
    "proto",
    "state",
    "dur",
    "sbytes",
    "dbytes",
    "sttl",
    "dttl",
    "sloss",
    "dloss",
    "service",
    "Sload",
    "Dload",
    "Spkts",
    "Dpkts",
    "swin",
    "dwin",
    "stcpb",
    "dtcpb",
    "smeansz",
    "dmeansz",
    "trans_depth",
    "res_bdy_len",
    "Sjit",
    "Djit",
    "Stime",
    "Ltime",
    "Sintpkt",
    "Dintpkt",
    "tcprtt",
    "synack",
    "ackdat",
    "is_sm_ips_ports",
    "ct_state_ttl",
    "ct_flw_http_mthd",
    "is_ftp_login",
    "ct_ftp_cmd",
    "ct_srv_src",
    "ct_srv_dst",
    "ct_dst_ltm",
    # Fixed: this name was previously "ct_src_ ltm" (stray space), which was
    # inconsistent with the sibling ct_*_ltm columns.
    "ct_src_ltm",
    "ct_src_dport_ltm",
    "ct_dst_sport_ltm",
    "ct_dst_src_ltm",
    "attack_cat",
    "Label",
]

# inferSchema=True lets Spark guess each column's type from the data; columns
# it cannot read as numeric stay "string" and are handled as nominal features
# further down. toDF() then renames the columns in order.
data = spark.read.csv(file_path, inferSchema=True).toDF(*column_names)

                                                                                

In [4]:
# Normalise `attack_cat` by stripping leading and trailing whitespace from
# every value, so padded spellings of one category collapse into one.
# The earlier version only removed a trailing space from three hard-coded
# categories ("Shellcode ", "Reconnaissance ", "Fuzzers "); any other padded
# value was left as its own category.
# Null values stay null (regexp_replace returns null for null input).
clean_data = data.withColumn(
    "attack_cat",
    regexp_replace("attack_cat", r"^\s+|\s+$", ""),
)

In [5]:
# Preview the first 10 rows after the attack_cat clean-up.
clean_data.show(n=10)

24/09/08 13:01:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----------+-----+-------------+------+-----+-----+------------+------+------+----+----+-----+-----+--------+---------+---------+-----+-----+----+----+----------+----------+-------+-------+-----------+-----------+---------+---------+----------+----------+------------+------------+------------+------------+------------+---------------+------------+----------------+------------+----------+----------+----------+----------+-----------+----------------+----------------+--------------+----------+-----+
|     srcip|sport|        dstip|dsport|proto|state|         dur|sbytes|dbytes|sttl|dttl|sloss|dloss| service|    Sload|    Dload|Spkts|Dpkts|swin|dwin|     stcpb|     dtcpb|smeansz|dmeansz|trans_depth|res_bdy_len|     Sjit|     Djit|     Stime|     Ltime|     Sintpkt|     Dintpkt|      tcprtt|      synack|      ackda

In [6]:
# Binary task: keep `Label` as the only target column. `attack_cat` is removed
# so it is not picked up as a string feature by the nominal-column scan below.
multiclass_target = "attack_cat"
bin_data = clean_data.drop(multiclass_target)

In [7]:
# Preview the first 10 rows of the binary-classification frame (no attack_cat).
bin_data.show(n=10)

+----------+-----+-------------+------+-----+-----+------------+------+------+----+----+-----+-----+--------+---------+---------+-----+-----+----+----+----------+----------+-------+-------+-----------+-----------+---------+---------+----------+----------+------------+------------+------------+------------+------------+---------------+------------+----------------+------------+----------+----------+----------+----------+-----------+----------------+----------------+--------------+-----+
|     srcip|sport|        dstip|dsport|proto|state|         dur|sbytes|dbytes|sttl|dttl|sloss|dloss| service|    Sload|    Dload|Spkts|Dpkts|swin|dwin|     stcpb|     dtcpb|smeansz|dmeansz|trans_depth|res_bdy_len|     Sjit|     Djit|     Stime|     Ltime|     Sintpkt|     Dintpkt|      tcprtt|      synack|      ackdat|is_sm_ips_ports|ct_state_ttl|ct_flw_http_mthd|is_ftp_login|ct_ftp_cmd|ct_srv_src|ct_srv_dst|ct_dst_ltm|ct_src_ ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|Label|
+----------+-----+

In [8]:
# Nominal (categorical) features: every column whose dtype is "string".
nominal_features = [name for name, dtype in bin_data.dtypes if dtype == "string"]

# Build one StringIndexer per nominal column; each writes its numeric codes
# to a new column named "<column>_index".
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in nominal_features
]
pipeline = Pipeline(stages=indexers)

# The indexers are fitted on the full frame, i.e. before the train/test split
# made in a later cell, and then applied to that same frame.
indexer_model = pipeline.fit(bin_data)
indexed_data = indexer_model.transform(bin_data)


                                                                                

In [9]:
# Numeric inputs: every column that is neither nominal nor the target "Label".
# Column order follows bin_data.dtypes.
non_numeric = set(nominal_features) | {"Label"}
feature_cols = [name for name, _ in bin_data.dtypes if name not in non_numeric]

# Names of the index columns produced by the StringIndexer stage.
indexed_cols = [f"{name}_index" for name in nominal_features]

# Final input list: numeric columns first, then the indexed nominal columns.
all_features = [*feature_cols, *indexed_cols]

# Pack all inputs into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=all_features)
assembled_data = assembler.transform(indexed_data)

In [10]:
# Expose the target as a double-typed column named "label", which is the
# labelCol the estimator and evaluators in the following cells are given.
# NOTE(review): "label" differs from the source column "Label" only by case;
# with Spark's default case-insensitive column resolution this presumably
# replaces `Label` rather than adding a second column — confirm if
# spark.sql.caseSensitive is ever enabled.
assembled_data = assembled_data.withColumn(
    "label",
    col("Label").astype("double"),
)

In [11]:
# Random 80/20 split into training and held-out test rows; the fixed seed
# keeps the split repeatable across runs.
train, test = assembled_data.randomSplit(weights=[0.8, 0.2], seed=42)

In [12]:
# Fit a logistic-regression classifier on the training split.
# All hyper-parameters other than the two column names keep their defaults.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
)
model = lr.fit(train)

                                                                                

In [13]:
# Score the held-out test split with the fitted model; the evaluators below
# read the "rawPrediction" and "prediction" columns from this frame.
predictions = model.transform(dataset=test)

In [14]:
# Area under the ROC curve on the test predictions, computed from the
# model's raw (pre-threshold) scores.
evaluator = BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    rawPredictionCol='rawPrediction',
    labelCol='label',
)
roc_auc = evaluator.evaluate(predictions)
print(f"AUC: {roc_auc}")

                                                                                

AUC: 0.9988134189998723


In [15]:
# F1 score on the test predictions, computed from the thresholded
# "prediction" column. With this evaluator the "f1" metric is the
# label-frequency-weighted F1 across the classes.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = evaluator.evaluate(predictions)

# The result used to be stored only under the name `accuracy`, which is
# misleading because accuracy is a different metric. The old name is kept as
# an alias so any existing reference still resolves; prefer `f1_score`.
accuracy = f1_score

print(f"Test F1 Score: {f1_score}")



Test F1 Score: 0.9884624894774023


                                                                                