In [120]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F


In [121]:
# Spark session for this notebook. The LEGACY time-parser policy is set here;
# presumably it is needed for the "dd/MM/yyyy h:mm:ss a" Timestamp pattern that
# is parsed later — confirm before removing it.
spark = (
    SparkSession.builder
    .appName("OST2")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Raw SWaT data: the first row provides the column names and the column types
# are inferred from the values (inferSchema).
file_path = "../kafka/dataset/SWat_merged.csv"
df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

# Inspect the inferred schema and a handful of raw rows
df.printSchema()
df.show(n=5, truncate=False)

[Stage 4033:>                                                       (0 + 8) / 8]

root
 |-- Timestamp: string (nullable = true)
 |-- FIT101: double (nullable = true)
 |-- LIT101: double (nullable = true)
 |-- MV101: integer (nullable = true)
 |-- P101: integer (nullable = true)
 |-- P102: integer (nullable = true)
 |-- AIT201: double (nullable = true)
 |-- AIT202: double (nullable = true)
 |-- AIT203: double (nullable = true)
 |-- FIT201: double (nullable = true)
 |-- MV201: integer (nullable = true)
 |-- P201: integer (nullable = true)
 |-- P202: integer (nullable = true)
 |-- P203: integer (nullable = true)
 |-- P204: integer (nullable = true)
 |-- P205: integer (nullable = true)
 |-- P206: integer (nullable = true)
 |-- DPIT301: double (nullable = true)
 |-- FIT301: double (nullable = true)
 |-- LIT301: double (nullable = true)
 |-- MV301: integer (nullable = true)
 |-- MV302: integer (nullable = true)
 |-- MV303: integer (nullable = true)
 |-- MV304: integer (nullable = true)
 |-- P301: integer (nullable = true)
 |-- P302: integer (nullable = true)
 |-- AIT401: 

                                                                                

In [122]:
# Missing values: discard every row that has a null in any column
df = df.dropna(how="any")


In [123]:
# display and drop columns with one distinct value
def display_and_drop_single_valued_columns(df):
    """Print and drop every column of ``df`` that holds exactly one distinct value.

    A column is single-valued under the same rule as
    ``df.select(c).distinct().count() == 1``: either every row carries the same
    non-null value, or every row is null. A DataFrame with no rows has zero
    distinct values per column, so nothing is dropped from it.

    All per-column statistics are gathered in ONE aggregation job instead of
    one ``distinct().count()`` job per column.

    Args:
        df: Spark DataFrame to prune. NOTE(review): assumes every column type
            supports ``min``/``max`` (true for the atomic CSV types used here).

    Returns:
        A new DataFrame without the single-valued columns (``df`` itself when
        it has no columns).
    """
    columns = df.columns
    if not columns:
        return df

    # Per column: number of non-null values, and whether min == max.
    # min/max ignore nulls, so "no nulls and min == max" <=> one distinct value.
    # Aggregates are aliased by position so arbitrary column names need no quoting.
    aggregates = [F.count(F.lit(1)).alias("n_rows")]
    for i, col_name in enumerate(columns):
        column = F.col(col_name)
        aggregates.append(F.count(column).alias(f"non_null_{i}"))
        aggregates.append((F.min(column) == F.max(column)).alias(f"all_equal_{i}"))
    stats = df.agg(*aggregates).first()

    n_rows = stats["n_rows"]
    single_valued_columns = []
    for i, col_name in enumerate(columns):
        non_null = stats[f"non_null_{i}"]
        if non_null == 0:
            # Only nulls: null is the single distinct value, unless there are no rows at all.
            is_single_valued = n_rows > 0
        else:
            is_single_valued = non_null == n_rows and bool(stats[f"all_equal_{i}"])
        if is_single_valued:
            print(f"Column '{col_name}' has only one distinct value and will be dropped.")
            single_valued_columns.append(col_name)

    return df.drop(*single_valued_columns)

# Prune constant columns; the names of the dropped columns are printed
df = display_and_drop_single_valued_columns(df=df)


                                                                                

Column 'P202' has only one distinct value and will be dropped.


                                                                                

Column 'P401' has only one distinct value and will be dropped.


                                                                                

Column 'P404' has only one distinct value and will be dropped.


                                                                                

Column 'P502' has only one distinct value and will be dropped.


                                                                                

Column 'P601' has only one distinct value and will be dropped.


                                                                                

Column 'P603' has only one distinct value and will be dropped.


                                                                                

In [124]:
# How many columns survived the pruning step
remaining_columns = df.columns
num_columns_left = len(remaining_columns)
print(f"\nNumber of columns left in the DataFrame: {num_columns_left}")


Number of columns left in the DataFrame: 47


In [125]:
# Convert Timestamp (string, e.g. "dd/MM/yyyy h:mm:ss a") to TimestampType.
# to_timestamp parses directly into a timestamp instead of going through epoch
# seconds with unix_timestamp(...).cast(TimestampType()).
df = df.withColumn("Timestamp", F.to_timestamp(col("Timestamp"), "dd/MM/yyyy h:mm:ss a"))

# Strings that do not match the pattern become null. Rows with nulls were dropped
# BEFORE this conversion, so any null here is a parse failure — report it.
unparsed_timestamps = df.filter(col("Timestamp").isNull()).count()
if unparsed_timestamps:
    print(f"WARNING: {unparsed_timestamps} Timestamp values could not be parsed and are now null.")

In [126]:
# Check that Timestamp now has a timestamp type and preview two rows
df.printSchema()
df.show(n=2, truncate=False)

root
 |-- Timestamp: timestamp (nullable = true)
 |-- FIT101: double (nullable = true)
 |-- LIT101: double (nullable = true)
 |-- MV101: integer (nullable = true)
 |-- P101: integer (nullable = true)
 |-- P102: integer (nullable = true)
 |-- AIT201: double (nullable = true)
 |-- AIT202: double (nullable = true)
 |-- AIT203: double (nullable = true)
 |-- FIT201: double (nullable = true)
 |-- MV201: integer (nullable = true)
 |-- P201: integer (nullable = true)
 |-- P203: integer (nullable = true)
 |-- P204: integer (nullable = true)
 |-- P205: integer (nullable = true)
 |-- P206: integer (nullable = true)
 |-- DPIT301: double (nullable = true)
 |-- FIT301: double (nullable = true)
 |-- LIT301: double (nullable = true)
 |-- MV301: integer (nullable = true)
 |-- MV302: integer (nullable = true)
 |-- MV303: integer (nullable = true)
 |-- MV304: integer (nullable = true)
 |-- P301: integer (nullable = true)
 |-- P302: integer (nullable = true)
 |-- AIT401: double (nullable = true)
 |-- AIT4

In [127]:
# Spark DataFrames have no row index, so nothing can be "set as the index".
# This cell only moves "Timestamp" to the last column position, done explicitly
# with a select (no temporary column, so an existing "index" column is untouched).
df = df.select(*[name for name in df.columns if name != "Timestamp"], "Timestamp")

# Preview: "Timestamp" is now the last column
df.show(2, truncate=False)

+--------+--------+-----+----+----+--------+-------+-------+--------+-----+----+----+----+----+----+--------+--------+--------+-----+-----+-----+-----+----+----+------+--------+------+--------+----+----+-----+--------+--------+--------+--------+-----------+-----------+-----------+------+----+--------+------+--------+----------+----+-------------+-------------------+
|FIT101  |LIT101  |MV101|P101|P102|AIT201  |AIT202 |AIT203 |FIT201  |MV201|P201|P203|P204|P205|P206|DPIT301 |FIT301  |LIT301  |MV301|MV302|MV303|MV304|P301|P302|AIT401|AIT402  |FIT401|LIT401  |P402|P403|UV401|AIT501  |AIT502  |AIT503  |AIT504  |FIT501     |FIT502     |FIT503     |FIT504|P501|PIT501  |PIT502|PIT503  |FIT601    |P602|Normal/Attack|Timestamp          |
+--------+--------+-----+----+----+--------+-------+-------+--------+-----+----+----+----+----+----+--------+--------+--------+-----+-----+-----+-----+----+----+------+--------+------+--------+----+----+-----+--------+--------+--------+--------+-----------+-----

In [128]:
# Function to dynamically divide features into numerical and categorical columns
def divide_features(df, max_categorical_levels=10, exclude=("Timestamp", "Normal/Attack")):
    """Split feature columns into numerical and categorical lists by cardinality.

    A column whose distinct-value count (as returned by
    ``df.select(c).distinct().count()``) exceeds ``max_categorical_levels`` is
    classified as numerical; every other column is classified as categorical.

    Args:
        df: Spark DataFrame whose columns are classified.
        max_categorical_levels: largest distinct-value count still treated as
            categorical. Defaults to 10.
        exclude: names of non-feature columns left out of both lists (by
            default the timestamp column and the "Normal/Attack" label).

    Returns:
        ``(num_col, cat_col)``: two lists of column names, each in
        ``df.columns`` order.

    Note:
        Runs one ``distinct().count()`` Spark job per feature column.
    """
    excluded = set(exclude)
    num_col = []
    cat_col = []

    for column in df.columns:
        if column in excluded:
            continue
        unique_values = df.select(column).distinct().count()
        if unique_values > max_categorical_levels:
            num_col.append(column)
        else:
            cat_col.append(column)

    return num_col, cat_col

# Classify every feature column (Timestamp and label excluded) by cardinality
feature_split = divide_features(df)
numerical_columns, categorical_columns = feature_split

                                                                                

In [129]:

# Show the size and members of each feature group: numerical first, then categorical
for feature_group in (numerical_columns, categorical_columns):
    print(len(feature_group), feature_group)


25 ['FIT101', 'LIT101', 'AIT201', 'AIT202', 'AIT203', 'FIT201', 'DPIT301', 'FIT301', 'LIT301', 'AIT401', 'AIT402', 'FIT401', 'LIT401', 'AIT501', 'AIT502', 'AIT503', 'AIT504', 'FIT501', 'FIT502', 'FIT503', 'FIT504', 'PIT501', 'PIT502', 'PIT503', 'FIT601']
20 ['MV101', 'P101', 'P102', 'MV201', 'P201', 'P203', 'P204', 'P205', 'P206', 'MV301', 'MV302', 'MV303', 'MV304', 'P301', 'P302', 'P402', 'P403', 'UV401', 'P501', 'P602']


In [130]:
# Inspect the label column: list its distinct values and check that it is binary
distinct_values = df.select("Normal/Attack").distinct().collect()
labels = [label_row["Normal/Attack"] for label_row in distinct_values]

print("Distinct values in the 'Normal/Attack' column:")
for label in labels:
    print(label)

num_distinct_values = len(distinct_values)
is_binary_label = num_distinct_values == 2
print(
    "\nThere are exactly two distinct values in the 'Normal/Attack' column."
    if is_binary_label
    else f"\nThe 'Normal/Attack' column does not have exactly two distinct values. Found: {num_distinct_values} distinct values."
)


Distinct values in the 'Normal/Attack' column:
Normal
Attack

There are exactly two distinct values in the 'Normal/Attack' column.


                                                                                

In [131]:
# Preview two rows before feature encoding
df.show(n=2, truncate=False)

+--------+--------+-----+----+----+--------+-------+-------+--------+-----+----+----+----+----+----+--------+--------+--------+-----+-----+-----+-----+----+----+------+--------+------+--------+----+----+-----+--------+--------+--------+--------+-----------+-----------+-----------+------+----+--------+------+--------+----------+----+-------------+-------------------+
|FIT101  |LIT101  |MV101|P101|P102|AIT201  |AIT202 |AIT203 |FIT201  |MV201|P201|P203|P204|P205|P206|DPIT301 |FIT301  |LIT301  |MV301|MV302|MV303|MV304|P301|P302|AIT401|AIT402  |FIT401|LIT401  |P402|P403|UV401|AIT501  |AIT502  |AIT503  |AIT504  |FIT501     |FIT502     |FIT503     |FIT504|P501|PIT501  |PIT502|PIT503  |FIT601    |P602|Normal/Attack|Timestamp          |
+--------+--------+-----+----+----+--------+-------+-------+--------+-----+----+----+----+----+----+--------+--------+--------+-----+-----+-----+-----+----+----+------+--------+------+--------+----+----+-----+--------+--------+--------+--------+-----------+-----

In [134]:
# Feature preprocessing, fitted as ONE Pipeline (all stage classes come from the import cell):
#   * numerical columns   -> "numerical_features" vector -> MinMax-scaled "scaled_numerical_features"
#   * categorical columns -> "<col>_index" (StringIndexer) -> "<col>_index_encoded" (one-hot)
# NOTE(review): the scaler and indexers are fitted on the whole dataset (normal and attack
# rows); if a train/test split follows, fit on the training part only to avoid leakage.

# This cell overwrites `df` and drops the source columns, so a second run cannot work.
missing = [c for c in numerical_columns + categorical_columns if c not in df.columns]
if missing:
    raise ValueError(
        f"Columns {missing} are not in df; this cell has probably already run. "
        "Re-run the notebook from the data-loading cell."
    )

assembler_num = VectorAssembler(inputCols=numerical_columns, outputCol="numerical_features")
scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")
stages = [assembler_num, scaler]

# Multi-column StringIndexer / OneHotEncoder: one stage each instead of one per column.
if categorical_columns:
    index_columns = [column + "_index" for column in categorical_columns]
    indexer = StringIndexer(inputCols=categorical_columns, outputCols=index_columns)
    encoder = OneHotEncoder(
        inputCols=index_columns,
        outputCols=[column + "_encoded" for column in index_columns],
    )
    stages += [indexer, encoder]

pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
scaler_model = pipeline_model.stages[1]  # fitted MinMaxScalerModel, kept for reuse

# Keep only the engineered columns (plus the remaining Timestamp and label columns)
df = pipeline_model.transform(df).drop(*numerical_columns, *categorical_columns)

# Show the updated DataFrame
df.show(5, truncate=False)


                                                                                

+-------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+----------+----------+-----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+-----------+----------+----------+-------------------+------------------+------------------+-------------------+-----

In [136]:
# Schema after preprocessing: label, Timestamp, the assembled/scaled numerical vectors,
# then the per-column "_index" and "_index_encoded" categorical columns
df.printSchema()

root
 |-- Normal/Attack: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- numerical_features: vector (nullable = true)
 |-- scaled_numerical_features: vector (nullable = true)
 |-- MV101_index: double (nullable = false)
 |-- P101_index: double (nullable = false)
 |-- P102_index: double (nullable = false)
 |-- MV201_index: double (nullable = false)
 |-- P201_index: double (nullable = false)
 |-- P203_index: double (nullable = false)
 |-- P204_index: double (nullable = false)
 |-- P205_index: double (nullable = false)
 |-- P206_index: double (nullable = false)
 |-- MV301_index: double (nullable = false)
 |-- MV302_index: double (nullable = false)
 |-- MV303_index: double (nullable = false)
 |-- MV304_index: double (nullable = false)
 |-- P301_index: double (nullable = false)
 |-- P302_index: double (nullable = false)
 |-- P402_index: double (nullable = false)
 |-- P403_index: double (nullable = false)
 |-- UV401_index: double (nullable = false)
 |-- P501_index: d

In [135]:
# Preview a single fully encoded row
df.show(n=1, truncate=False)

+-------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+----------+----------+-----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+-----------+----------+----------+-------------------+------------------+------------------+-------------------+------

In [133]:
# # Reorder columns to have "Timestamp" as the first column
# column_order = ["Timestamp"] + [col_name for col_name in df.columns if col_name != "Timestamp"]
# df2 = df.select(column_order)

# # Show the updated DataFrame with "Timestamp" as the index
# df2.show(2, truncate=False)
