In [2]:
#Khai báo các thư viện để xử lý dữ liệu
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, max, min, sum, log, log10, log2
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, ChiSqSelector
from pyspark.ml.classification import LinearSVC, RandomForestClassifier, LogisticRegression, NaiveBayes
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import sqlite3
import pandas as pd
import ldata

In [4]:
# Start (or reuse) the Spark session used throughout the notebook
spark = (
    SparkSession.builder
    .appName("NSLKDDApp")
    .getOrCreate()
)

# Load the NSL-KDD training set (ARFF) into pandas, then hand it to Spark
file_path = '/root/NSLKDDProject/nslkdd/KDDTrain+.arff'
df = ldata.load_arff_to_pandasDF(file_path)
spark_df = spark.createDataFrame(df).repartition(100)

# Preview the rows and report the total record count
spark_df.show()
total_records = spark_df.count()
print(f"Tổng số lượng record: {total_records}")

# Number of records per class label
class_counts = spark_df.groupBy("class").count()
class_counts.show()

24/08/16 19:03:06 WARN TaskSetManager: Stage 15 contains a task of very large size (10740 KiB). The maximum recommended task size is 1000 KiB.
24/08/16 19:03:07 WARN TaskSetManager: Stage 18 contains a task of very large size (10740 KiB). The maximum recommended task size is 1000 KiB.


+--------+-------------+--------+----+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type| service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent| hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst

24/08/16 19:03:08 WARN TaskSetManager: Stage 24 contains a task of very large size (10740 KiB). The maximum recommended task size is 1000 KiB.

+-------+-----+
|  class|count|
+-------+-----+
| normal|67343|
|anomaly|58630|
+-------+-----+



                                                                                

In [None]:
# Describe the data (per-column statistics):
#   count  - number of non-null values
#   mean   - mean value
#   stddev - standard deviation
#   min    - smallest value
#   max    - largest value
description = spark_df.describe()
description.show()
spark_df.printSchema()

# Count how many columns have each data type.
# The loop variable is named n_columns. The original used `count`, which rebound
# (shadowed) the pyspark `count` function imported at the top of the notebook
# for every cell executed afterwards.
column_types = spark_df.dtypes
type_counts = {}
for _, dtype in column_types:
    type_counts[dtype] = type_counts.get(dtype, 0) + 1
for dtype, n_columns in type_counts.items():
    print(f"Data Type: {dtype}, Count: {n_columns}")
print("")

# List the string-typed (categorical) columns
for column, dtype in column_types:
    if dtype == 'string':
        print(f"Column: {column}, Type: {dtype}")

In [None]:
# Inspect the value distribution of the three categorical (string) fields:
#   protocol_type - protocol used by the connection
#   service       - network service targeted by the connection
#   flag          - connection status flag (TCP/IP)
protocol_counts, service_counts, flag_counts = (
    spark_df.groupBy(field).count() for field in ("protocol_type", "service", "flag")
)
for counts_df in (protocol_counts, service_counts, flag_counts):
    counts_df.show()

In [None]:
# PREPROCESSING (training set)
# 0. Drop num_outbound_cmds
spark_df = spark_df.drop("num_outbound_cmds")

# 1. Encode the string categorical columns as numeric indices with StringIndexer.
#    NOTE: this is index (label) encoding ordered by value frequency, NOT one-hot
#    encoding. Each column keeps its original name but now holds a double index.
translist = ["protocol_type", "service", "flag"]
translist_temp = ["protocol_type_trans", "service_trans", "flag_trans"]
for src_col, tmp_col in zip(translist, translist_temp):
    indexer = StringIndexer(inputCol=src_col, outputCol=tmp_col)
    spark_df = indexer.fit(spark_df).transform(spark_df)
    spark_df = spark_df.drop(src_col).withColumnRenamed(tmp_col, src_col)

# 2. Cast the string-typed 0/1 flag columns to double
castlistvalue = ["land", "logged_in", "is_host_login", "is_guest_login"]
for cast_col in castlistvalue:
    spark_df = spark_df.withColumn(cast_col, col(cast_col).cast(DoubleType()))

# 3. Log-scale columns with a large spread between min and max:
#    ln(x) where x > 0, otherwise 0
loglist = ["duration", "src_bytes", "dst_bytes"]
loglist_temp = ["duration_trans", "src_bytes_trans", "dst_bytes_trans"]
for src_col, tmp_col in zip(loglist, loglist_temp):
    spark_df = spark_df.withColumn(tmp_col, when(col(src_col) > 0, log(src_col)).otherwise(0))
    spark_df = spark_df.drop(src_col).withColumnRenamed(tmp_col, src_col)

# 4. Encode the label: "normal" -> 0.0, "anomaly" -> 1.0
class_name = {"normal": "0", "anomaly": "1"}
spark_df = spark_df.replace(class_name, subset=["class"])
spark_df = spark_df.withColumn("class", col("class").cast(DoubleType()))

# 5. Min-max normalize the remaining numeric columns to [0, 1]
print("Sau khi scalling")
spark_df.show()
columntypes = spark_df.dtypes

# Gather every per-column sum/min/max in a single aggregation pass. The original
# launched three Spark jobs per column, each over an ever-growing lineage.
# (sum/min/max here are the pyspark.sql.functions imported at the top.)
train_col_stats = spark_df.agg(
    *[sum(col(c)).alias(f"sum__{c}") for c, _ in columntypes],
    *[min(col(c)).alias(f"min__{c}") for c, _ in columntypes],
    *[max(col(c)).alias(f"max__{c}") for c, _ in columntypes],
).first()

normalized_cols = []
for column, dtype in columntypes:
    if train_col_stats[f"sum__{column}"] == 0:
        # All-zero column: report it and leave it unscaled
        print(column)
        normalized_cols.append(col(column))
    elif column == "class" or column in translist:
        # The label and the index-encoded categoricals are not rescaled
        normalized_cols.append(col(column))
    else:
        min_value = train_col_stats[f"min__{column}"]
        max_value = train_col_stats[f"max__{column}"]
        value_range = max_value - min_value
        if value_range == 0:
            # Constant column: (x - min) / 0 would produce nulls; map it to 0.0
            scaled = (col(column) - min_value) * 0.0
        else:
            scaled = (col(column) - min_value) / value_range
        normalized_cols.append(scaled.alias(column))
# select() keeps the original column order, as the per-column withColumn did
spark_df = spark_df.select(*normalized_cols)
spark_df.show()

In [None]:
# Sanity check of the preprocessed data before training:
# number of null values in each column (any nulls introduced by preprocessing show up here)
null_counts = {}
for name in spark_df.columns:
    null_counts[name] = spark_df.where(col(name).isNull()).count()
print(null_counts)

# Summary statistics after preprocessing
description_after = spark_df.describe()
description_after.show()

In [None]:
# Assemble every feature column (everything except the label) into one vector.
# The column list comes from the current spark_df. The original iterated the
# global `columntypes`, which other cells overwrite.
inputColumns = [c for c in spark_df.columns if c != "class"]
print(inputColumns)

assembler = VectorAssembler(inputCols=inputColumns, outputCol="features")
df_assembled = assembler.transform(spark_df)

# Chi-squared feature selection: keep the 10 features most associated with the label.
# NOTE(review): the selector is fitted before the train/validation split, so the
# validation rows influence feature selection. ChiSqSelector also treats each
# distinct value of a continuous feature as a category. Confirm both are acceptable.
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features",
                         outputCol="selected_features", labelCol="class")
selectorModel = selector.fit(df_assembled)
df_selected = selectorModel.transform(df_assembled)

# Split the frame that carries BOTH "features" and "selected_features".
# The original split the raw spark_df, which has neither column, so every model
# cell below failed with a missing-column error.
train_data, test_data = df_selected.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Naive Bayes tuned with 3-fold cross-validation over the smoothing parameter.
# Fixes relative to the original cell:
#   - featuresCol must name a single vector column ("features" from the
#     VectorAssembler). A Python list of raw column names is not accepted.
#   - modelType "bernoulli" requires every feature value to be exactly 0 or 1,
#     which min-max scaled / index-encoded features are not. "multinomial" only
#     requires non-negative values, and all features here are >= 0 after
#     min-max scaling and index encoding.
#   - smoothing 0.0 is replaced by 0.1. With zero smoothing, a feature that never
#     occurs for a class can get a log(0) = -inf weight.
nb = NaiveBayes(featuresCol="features", labelCol="class")
paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [0.1, 0.5, 1.0])
             .addGrid(nb.modelType, ["multinomial"])
             .build())
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator=nb,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)
cvModel = crossval.fit(train_data)

# Predict on the held-out split. select() cannot mix a list with plain column
# names, so show the assembled "features" vector instead of the raw columns.
predictions = cvModel.transform(test_data)
predictions.select("features", "class", "prediction", "probability").show()

# Evaluate the best model found by cross-validation
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu mau = {accuracy}")

In [None]:
# Logistic regression on the chi-squared-selected features (no tuning)
lr = LogisticRegression(featuresCol="selected_features", labelCol="class")
cVmodel = lr.fit(train_data)
predictions = cVmodel.transform(test_data)
predictions.select("selected_features", "class", "prediction", "probability").show()

# Accuracy. The original fed the hard 0/1 "prediction" column to a
# BinaryClassificationEvaluator, which computes area under ROC (not accuracy),
# and printed that value as accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu mau = {accuracy}")

# Area under ROC, computed from the continuous rawPrediction scores
auc = BinaryClassificationEvaluator(labelCol="class", rawPredictionCol="rawPrediction").evaluate(predictions)
print(f"AUC tren tap du lieu mau = {auc}")

In [None]:
# Random forest on the full feature vector, tuned with 5-fold CV over numTrees.
# maxBins must be at least the number of distinct values of every categorical
# feature. The StringIndexer-encoded columns (e.g. "service") carry nominal
# metadata that VectorAssembler keeps, and they can exceed the default of 32.
rf = RandomForestClassifier(featuresCol="features", labelCol="class", maxBins=128)

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]).build()
# The evaluator needs labelCol="class"; its default ("label") does not exist here
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="class"),
                          numFolds=5)

# Use a dedicated name so the earlier model stored in `cVmodel` is not overwritten
rfCvModel = crossval.fit(train_data)

predictions = rfCvModel.transform(test_data)
predictions.select("class", "features", "prediction").show()

# Accuracy. The original printed AUC over hard 0/1 predictions as "accuracy".
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu mau = {accuracy}")

# Area under ROC from the continuous rawPrediction scores
auc = BinaryClassificationEvaluator(labelCol="class", rawPredictionCol="rawPrediction").evaluate(predictions)
print(f"AUC tren tap du lieu mau = {auc}")

In [None]:
# Linear SVM on the full assembled feature vector (fixed hyperparameters, no CV)
svm = LinearSVC(
    featuresCol="features",
    labelCol="class",
    maxIter=10,
    regParam=0.1,
)
cvModel = svm.fit(train_data)

predictions = cvModel.transform(test_data)
predictions.select("class", "features", "prediction").show()

evaluator = MulticlassClassificationEvaluator(
    labelCol="class",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu mau = {accuracy}")

In [None]:
# Linear SVM on the 10 chi-squared-selected features, tuned with 5-fold CV.
# Merge all feature columns into one vector column. The list comes from the
# current spark_df rather than the global `columntypes`, which other cells overwrite.
inputColumns = [c for c in spark_df.columns if c != "class"]
print(inputColumns)

assembler = VectorAssembler(inputCols=inputColumns, outputCol="features")
df_assembled = assembler.transform(spark_df)
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features",
                         outputCol="selected_features", labelCol="class")
selectorModel = selector.fit(df_assembled)
df_selected = selectorModel.transform(df_assembled)

# DataFrame used for this model: selected features + label only
df_final = df_selected.select("selected_features", "class")
df_final.show()

# Dedicated split names. The original overwrote train_data/test_data with a
# frame lacking "features", which broke re-runs of the other model cells.
svm_train_data, svm_test_data = df_final.randomSplit([0.8, 0.2], seed=1234)

svm = LinearSVC(labelCol="class", featuresCol="selected_features")
paramGrid = (ParamGridBuilder()
             .addGrid(svm.regParam, [0.01, 0.1, 1.0])
             .addGrid(svm.maxIter, [10, 50, 100])
             .build())
# Model selection by area under ROC on the raw SVM margins
evaluator = BinaryClassificationEvaluator(labelCol="class", rawPredictionCol="rawPrediction")
crossval = CrossValidator(estimator=svm,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
cvModel = crossval.fit(svm_train_data)

predictions = cvModel.transform(svm_test_data)
predictions.select("class", "selected_features", "prediction").show()

# Report accuracy of the best model on the held-out split
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu mau = {accuracy}")


In [None]:
# After training, evaluate on the separate NSL-KDD test file (KDDTest+)
file_path = '/root/NSLKDDProject/nslkdd/KDDTest+.arff'
df_test = ldata.load_arff_to_pandasDF(file_path)

# Convert to a Spark DataFrame and preview it
spark_df_test = spark.createDataFrame(df_test)
spark_df_test.show()

In [None]:
# PREPROCESSING (test set)
# The test data must go through exactly the transformation learned on the
# TRAINING data. The original cell had two problems:
#   - It re-fitted StringIndexer on the test set, so category -> index mappings
#     were ordered by *test* frequencies and could disagree with training.
#   - It min-max scaled with the test set's own min/max.
# Both learned pieces are rebuilt here from the raw training frame `df`, so this
# cell does not depend on intermediate state from the training cells.

translist = ["protocol_type", "service", "flag"]
translist_temp = ["protocol_type_trans", "service_trans", "flag_trans"]
castlistvalue = ["land", "logged_in", "is_host_login", "is_guest_login"]
class_name = {"normal": "0", "anomaly": "1"}
loglist = ["duration", "src_bytes", "dst_bytes"]
loglist_temp = ["duration_trans", "src_bytes_trans", "dst_bytes_trans"]


def _apply_fixed_preprocessing(sdf, indexer_models):
    """Apply every preprocessing step except min-max scaling.

    Steps, in order:
      - drop num_outbound_cmds;
      - index-encode the categorical columns with the given fitted
        StringIndexer models;
      - cast the 0/1 flag columns to double;
      - encode the label (normal -> 0.0, anomaly -> 1.0);
      - log-scale duration/src_bytes/dst_bytes (ln(x) for x > 0, else 0).
    """
    sdf = sdf.drop("num_outbound_cmds")
    for src_col, tmp_col in zip(translist, translist_temp):
        sdf = indexer_models[src_col].transform(sdf)
        sdf = sdf.drop(src_col).withColumnRenamed(tmp_col, src_col)
    for cast_col in castlistvalue:
        sdf = sdf.withColumn(cast_col, col(cast_col).cast(DoubleType()))
    sdf = sdf.replace(class_name, subset=["class"])
    sdf = sdf.withColumn("class", col("class").cast(DoubleType()))
    for src_col, tmp_col in zip(loglist, loglist_temp):
        sdf = sdf.withColumn(tmp_col, when(col(src_col) > 0, log(src_col)).otherwise(0))
        sdf = sdf.drop(src_col).withColumnRenamed(tmp_col, src_col)
    return sdf


# 1. Fit the category -> index mappings on the raw TRAINING data.
#    handleInvalid="keep" maps categories never seen in training to an extra
#    index instead of failing the transform.
train_raw = spark.createDataFrame(df)
indexer_models = {
    src_col: StringIndexer(inputCol=src_col, outputCol=tmp_col, handleInvalid="keep").fit(train_raw)
    for src_col, tmp_col in zip(translist, translist_temp)
}

# 2. Apply the non-learned steps to both the training reference and the test set
train_reference = _apply_fixed_preprocessing(train_raw, indexer_models)
spark_df_test = _apply_fixed_preprocessing(spark_df_test, indexer_models)

# 3. Min-max normalize with the TRAINING statistics, gathered in one pass.
#    As in the training cell, these columns are left unscaled:
#      - columns that are all-zero in training;
#      - the label;
#      - the index-encoded categoricals.
#    Test values outside the training range map outside [0, 1].
ref_columns = train_reference.columns
train_stats = train_reference.agg(
    *[sum(col(c)).alias(f"sum__{c}") for c in ref_columns],
    *[min(col(c)).alias(f"min__{c}") for c in ref_columns],
    *[max(col(c)).alias(f"max__{c}") for c in ref_columns],
).first()

test_cols = []
for column in spark_df_test.columns:
    if train_stats[f"sum__{column}"] == 0:
        print(column)
        test_cols.append(col(column))
    elif column == "class" or column in translist:
        test_cols.append(col(column))
    else:
        min_value = train_stats[f"min__{column}"]
        max_value = train_stats[f"max__{column}"]
        value_range = max_value - min_value
        if value_range == 0:
            # Constant in training: avoid division by zero (which yields nulls)
            scaled = (col(column) - min_value) * 0.0
        else:
            scaled = (col(column) - min_value) / value_range
        test_cols.append(scaled.alias(column))
spark_df_test = spark_df_test.select(*test_cols)
spark_df_test.show()


In [None]:
# Summary statistics of the preprocessed test set
des = spark_df_test.describe()
des.show()

# Build the test feature vectors with the TRAINING assembler and selector so the
# vector layout matches the one the models were trained on.
# Both "features" and "selected_features" are kept. The original kept only
# "features", but cvModel is assigned by more than one model cell, and some of
# those models read "selected_features".
df_test_assembled = assembler.transform(spark_df_test)
df_test_final = selectorModel.transform(df_test_assembled).select("features", "selected_features", "class")

predictions = cvModel.transform(df_test_final)
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu test = {accuracy}")

# Confusion-matrix counts from one aggregation (anomaly = 1.0 is the positive class).
# Row["count"] reads the column; Row.count would be the tuple method.
confusion = {
    (row["class"], row["prediction"]): row["count"]
    for row in predictions.groupBy("class", "prediction").count().collect()
}
truep = confusion.get((1.0, 1.0), 0)
truen = confusion.get((0.0, 0.0), 0)
falsep = confusion.get((0.0, 1.0), 0)
falsen = confusion.get((1.0, 0.0), 0)
print(f"True Positives: {truep}")
print(f"True Negatives: {truen}")
print(f"False Positives: {falsep}")
print(f"False Negatives: {falsen}")

In [None]:
# Summary statistics of the preprocessed test set
des = spark_df_test.describe()
des.show()

# Build the test feature vectors with the TRAINING assembler and chi-squared selector.
# Both "features" and "selected_features" are kept. The original kept only
# "selected_features", but cVmodel is assigned by more than one model cell, and
# some of those models read "features".
df_test_assembled = assembler.transform(spark_df_test)
df_test_selected = selectorModel.transform(df_test_assembled)
df_test_final = df_test_selected.select("features", "selected_features", "class")

predictions = cVmodel.transform(df_test_final)
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Do chinh xac tren tap du lieu test = {accuracy}")

# Confusion-matrix counts from one aggregation (anomaly = 1.0 is the positive class).
# Row["count"] reads the column; Row.count would be the tuple method.
confusion = {
    (row["class"], row["prediction"]): row["count"]
    for row in predictions.groupBy("class", "prediction").count().collect()
}
truep = confusion.get((1.0, 1.0), 0)
truen = confusion.get((0.0, 0.0), 0)
falsep = confusion.get((0.0, 1.0), 0)
falsen = confusion.get((1.0, 0.0), 0)
print(f"True Positives: {truep}")
print(f"True Negatives: {truen}")
print(f"False Positives: {falsep}")
print(f"False Negatives: {falsen}")