In [1]:
from pathlib import Path
import numpy as np
import glob

from pyspark.sql import SparkSession
# from pyspark.conf import SparkConf
from pyspark.sql.functions import col, sum, count, when
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

from skrebate import ReliefF

In [2]:
# Initialize Spark Session
spark: SparkSession = SparkSession.builder\
    .appName("pyspark-notebook")\
    .master("spark://127.0.0.1:7077")\
    .config("spark.driver.host", "host.docker.internal") \
    .config("spark.driver.bindAddress", "0.0.0.0")\
    .config("spark.executor.memory", "2g")\
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "2")\
    .config("spark.hadoop.fs.s3a.block.size", "33554432") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.attempts.maximum", "0") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

Spark Version: 3.5.1


In [3]:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
result = rdd.sum()
print(f"Tổng: {result}")

Tổng: 15


In [3]:
volume_path = "s3a://mybucket/cicids2017" 

local_path = "./CICIDS2017"
all_files = glob.glob(local_path + "/*.csv")

volume_files = [volume_path + file.removeprefix(local_path) for file in all_files]
print(volume_files)

['s3a://mybucket/cicids2017\\Friday-WorkingHours-Afternoon-DDos.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Friday-WorkingHours-Afternoon-PortScan.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Friday-WorkingHours-Morning.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Monday-WorkingHours.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Thursday-WorkingHours-Afternoon-Infilteration.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Thursday-WorkingHours-Morning-WebAttacks.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Tuesday-WorkingHours.pcap_ISCX.csv', 's3a://mybucket/cicids2017\\Wednesday-workingHours.pcap_ISCX.csv']


In [4]:
df = spark.read \
    .option("nullValue", "NA") \
    .option("emptyValue", "unknown").csv(volume_files, header=True, inferSchema=True)

In [12]:
df.show(10)

+-----------------+--------------+------------------+-----------------------+---------------------------+----------------------------+----------------------+----------------------+-----------------------+----------------------+---------------------+----------------------+-----------------------+----------------------+----------------+----------------+--------------+--------------+-------------+-------------+-------------+-------------+--------------+------------+------------+-------------+--------------+--------------+------------+------------+-------------+--------------+--------------+--------------+--------------------+------------------+----------------+---------------+------------------+------------------+-------------------+------------------+-----------------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------------+--------------------+---------------------+---------------------+----------

In [5]:
df.columns

[' Destination Port',
 ' Flow Duration',
 ' Total Fwd Packets',
 ' Total Backward Packets',
 'Total Length of Fwd Packets',
 ' Total Length of Bwd Packets',
 ' Fwd Packet Length Max',
 ' Fwd Packet Length Min',
 ' Fwd Packet Length Mean',
 ' Fwd Packet Length Std',
 'Bwd Packet Length Max',
 ' Bwd Packet Length Min',
 ' Bwd Packet Length Mean',
 ' Bwd Packet Length Std',
 'Flow Bytes/s',
 ' Flow Packets/s',
 ' Flow IAT Mean',
 ' Flow IAT Std',
 ' Flow IAT Max',
 ' Flow IAT Min',
 'Fwd IAT Total',
 ' Fwd IAT Mean',
 ' Fwd IAT Std',
 ' Fwd IAT Max',
 ' Fwd IAT Min',
 'Bwd IAT Total',
 ' Bwd IAT Mean',
 ' Bwd IAT Std',
 ' Bwd IAT Max',
 ' Bwd IAT Min',
 'Fwd PSH Flags',
 ' Bwd PSH Flags',
 ' Fwd URG Flags',
 ' Bwd URG Flags',
 ' Fwd Header Length34',
 ' Bwd Header Length',
 'Fwd Packets/s',
 ' Bwd Packets/s',
 ' Min Packet Length',
 ' Max Packet Length',
 ' Packet Length Mean',
 ' Packet Length Std',
 ' Packet Length Variance',
 'FIN Flag Count',
 ' SYN Flag Count',
 ' RST Flag Count',
 '

In [6]:
null_counts = df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
])

null_counts.show()

+-----------------+--------------+------------------+-----------------------+---------------------------+----------------------------+----------------------+----------------------+-----------------------+----------------------+---------------------+----------------------+-----------------------+----------------------+------------+---------------+--------------+-------------+-------------+-------------+-------------+-------------+------------+------------+------------+-------------+-------------+------------+------------+------------+-------------+--------------+--------------+--------------+--------------------+------------------+-------------+--------------+------------------+------------------+-------------------+------------------+-----------------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------------+--------------------+---------------------+---------------------+--------------------+----