In [1]:
import findspark
findspark.init()
findspark.find()

import requests
import feedparser
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime
import pytz
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

# Initialize Spark
appName = "Project - Machine Learning Techniques on MQTT"
master = "local"

# Create Configuration object for Spark. The driver host is pinned to the
# loopback address (typically done to avoid local hostname-resolution problems).
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

# SparkSession has been the single entry point since Spark 2.0: build it directly
# from the configuration instead of routing through SQLContext.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Underlying SparkContext, for RDD-level APIs.
sc = spark.sparkContext

# Kept only so code expecting a SQLContext keeps working. SQLContext is deprecated
# since Spark 3.0; it wraps the same session here. Prefer `spark` in new code.
sqlContext = SQLContext(sc, spark)



In [2]:
import os

# Directory holding the train/test CSVs. Set the MQTT_DATA_DIR environment variable
# to run on another machine; the default is the original author's local path.
DATA_DIR = os.environ.get(
    "MQTT_DATA_DIR",
    r"C:\Users\nickc\OneDrive\Documents\GitHub\course-project-option-2-nick1117\DataFolder\archive\Data\FINAL_CSV",
)

train_path = os.path.join(DATA_DIR, "train70_augmented.csv")
train_data = spark.read.csv(train_path, header=True, inferSchema=True)

test_path = os.path.join(DATA_DIR, "test30_augmented.csv")
test_data = spark.read.csv(test_path, header=True, inferSchema=True)

In [3]:
from pyspark.sql.functions import lit

# Tag each row with the file it came from.
train_df = train_data.withColumn("dataset_type", lit("train"))
test_df = test_data.withColumn("dataset_type", lit("test"))

# unionByName aligns columns by header name rather than by position, so a
# column-order difference between the two CSVs cannot silently misalign data.
df = train_df.unionByName(test_df)

In [4]:
new_column_names = [
    "tcp_flags",
    "tcp_time_delta",
    "tcp_length",
    "mqtt_connack_flags",
    "mqtt_connack_reserved_flags",
    "mqtt_connack_session_present",
    "mqtt_connack_return_code",
    "mqtt_connect_clean_session_flag",
    "mqtt_connect_password_flag",
    "mqtt_connect_qos_level",
    "mqtt_connect_reserved_flag",
    "mqtt_connect_retain_flag",
    "mqtt_connect_username_flag",
    "mqtt_connect_will_flag",
    "mqtt_connect_flags",
    "mqtt_duplicate_flag",
    "mqtt_header_flags",
    "mqtt_keep_alive_interval",
    "mqtt_length",
    "mqtt_message",
    "mqtt_message_id",
    "mqtt_message_type",
    "mqtt_protocol_length",
    "mqtt_protocol_name",
    "mqtt_qos_level",
    "mqtt_retain_flag",
    "mqtt_subscription_qos_level",
    "mqtt_subscription_ack_qos_level",
    "mqtt_version",
    "mqtt_will_message",
    "mqtt_will_message_length",
    "mqtt_will_topic",
    "mqtt_will_topic_length",
    "target_class",
    "dataset_type"
]

# Rename the combined frame positionally (toDF needs exactly one name per column).
df = df.toDF(*new_column_names)

# The per-split frames carry every column except the "dataset_type" tag, which was
# appended last. Build them from the raw CSV frames instead of from train_df/test_df
# themselves. Otherwise re-running this cell fails: the second toDF would get one
# name more than the already-trimmed frame has columns.
source_column_names = [name for name in new_column_names if name != "dataset_type"]
train_df = train_data.toDF(*source_column_names)
test_df = test_data.toDF(*source_column_names)

In [9]:
from pyspark.ml import Pipeline, Transformer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.sql import DataFrame

col_names = new_column_names

# Feature groups, by how the preprocessing pipeline treats them:
#   binary_cols     -> cast to double and assembled as-is
#   nominal_cols    -> cast to string, then StringIndexer + OneHotEncoder
#   continuous_cols -> cast to double and assembled as-is
# List order matters: get_preprocess_pipeline concatenates these lists to fix the
# column order of the assembled feature vector.
binary_cols = [
    "mqtt_connack_reserved_flags",
    "mqtt_connack_session_present",
    "mqtt_connect_clean_session_flag",
    "mqtt_connect_password_flag",
    "mqtt_connect_reserved_flag",
    "mqtt_connect_retain_flag",
    "mqtt_connect_username_flag",
    "mqtt_connect_will_flag",
    "mqtt_duplicate_flag",
    "mqtt_retain_flag",
    "mqtt_connect_qos_level"
]

nominal_cols = [
    "tcp_flags",
    "mqtt_connack_flags",
    "mqtt_connack_return_code",
    "mqtt_connect_flags",
    "mqtt_header_flags",
    "mqtt_message_type",
    "mqtt_qos_level",
    "mqtt_version",
    "mqtt_message",
    'mqtt_protocol_name'
]

# NOTE(review): mqtt_will_topic / mqtt_will_message sound like text fields. Casting them
# to double would yield nulls; both are in the pipeline's exclusion list anyway.
continuous_cols = [
    "tcp_time_delta",
    "tcp_length",
    "mqtt_keep_alive_interval",
    "mqtt_length",
    "mqtt_protocol_length",
    "mqtt_will_message_length",
    "mqtt_will_topic_length",
    "mqtt_will_topic",
    "mqtt_will_message",
    "mqtt_subscription_ack_qos_level",
    "mqtt_subscription_qos_level",
    "mqtt_message_id"
]

# Every column except the label and the train/test tag must sit in exactly one group.
# A typo caught here is far clearer than an unresolved-column error at pipeline fit.
_grouped_cols = binary_cols + nominal_cols + continuous_cols
assert len(_grouped_cols) == len(set(_grouped_cols)), "a column is listed in more than one feature group"
assert set(_grouped_cols) == set(col_names) - {"target_class", "dataset_type"}, \
    "feature groups do not match the dataset columns"

class OutcomeCreater(Transformer):
    """Pipeline stage replacing the string ``target_class`` column with a numeric ``outcome``.

    Mapping: legitimate -> 0.0, slowite -> 1.0, bruteforce -> 2.0, flood -> 3.0,
    malformed -> 4.0, dos -> 5.0; any other value (including null) -> -1.0.
    The ``target_class`` column is dropped from the output.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        # Local import: `when` is not imported by the cell that defines this class.
        from pyspark.sql.functions import when

        target = col('target_class')
        # A native CASE WHEN runs inside the JVM and avoids the per-row Python round
        # trip of a UDF. Null labels fail every equality test and fall through to
        # otherwise(-1.0), matching the UDF's behavior.
        outcome = (
            when(target == 'legitimate', 0.0)
            .when(target == 'slowite', 1.0)
            .when(target == 'bruteforce', 2.0)
            .when(target == 'flood', 3.0)
            .when(target == 'malformed', 4.0)
            .when(target == 'dos', 5.0)
            .otherwise(-1.0)
        )
        output_df = dataset.withColumn('outcome', outcome.cast(DoubleType()))
        return output_df.drop("target_class")
    
class FeatureTypeCaster(Transformer):
    """Pipeline stage casting every binary and continuous column to DoubleType.

    The module-level ``binary_cols`` and ``continuous_cols`` lists are read when the
    stage runs; columns outside those lists pass through untouched.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, dataset: DataFrame):
        target_type = DoubleType()
        to_cast = binary_cols + continuous_cols
        casted = dataset
        # withColumn with an existing name replaces the column in place (same position).
        for name in to_cast:
            casted = casted.withColumn(name, col(name).cast(target_type))
        return casted
    
class ColumnDropper(Transformer):
    """Pipeline stage removing a fixed set of columns from the DataFrame.

    Args:
        columns_to_drop: iterable of column names to remove; None (the default)
            drops nothing. Names absent from the DataFrame are ignored, because
            DataFrame.drop is a no-op for unknown column names.
    """

    def __init__(self, columns_to_drop=None):
        super().__init__()
        # Copy so later mutation of the caller's list cannot change this stage, and
        # normalise None to an empty list (iterating None used to raise TypeError).
        self.columns_to_drop = list(columns_to_drop) if columns_to_drop is not None else []

    def _transform(self, dataset):
        if not self.columns_to_drop:
            return dataset
        # One drop(*names) call adds a single projection instead of one per column.
        return dataset.drop(*self.columns_to_drop)

def get_preprocess_pipeline():
    """Build the unfitted preprocessing Pipeline for the MQTT dataset.

    Stage order:
      1. FeatureTypeCaster casts binary/continuous columns to double; NominalTypeCaster
         casts nominal columns to string.
      2. A StringIndexer and a OneHotEncoder (both handleInvalid='keep') for every
         nominal column that is kept as a feature.
      3. VectorAssembler -> 'vectorized_features', then StandardScaler -> 'features'.
      4. OutcomeCreater maps 'target_class' to the numeric 'outcome' label.
      5. ColumnDropper removes the raw, indexed, encoded and unscaled columns.

    Returns:
        pyspark.ml.Pipeline: unfitted; call ``.fit()`` on the training DataFrame.
    """
    # Raw column names excluded from the feature vector.
    correlated_cols_to_remove = {
        "mqtt_connack_reserved_flags", "mqtt_connack_session_present", "mqtt_connect_qos_level",
        "mqtt_connect_reserved_flag", "mqtt_connect_retain_flag", "mqtt_connect_will_flag", "mqtt_message",
        "mqtt_subscription_qos_level", "mqtt_subscription_ack_qos_level", "mqtt_will_message",
        "mqtt_will_message_length", "mqtt_will_topic", "mqtt_will_topic_length", 'mqtt_protocol_name',
    }

    # Stage where binary/continuous columns are cast to double
    stage_typecaster = FeatureTypeCaster()

    # Convert nominal columns to string type
    class NominalTypeCaster(Transformer):
        """Cast every column in the module-level ``nominal_cols`` to string."""

        def __init__(self):
            super().__init__()

        def _transform(self, dataset):
            output_df = dataset
            for col_name in nominal_cols:
                output_df = output_df.withColumn(col_name, col(col_name).cast("string"))
            return output_df

    stage_nominal_typecaster = NominalTypeCaster()

    # Filter on RAW column names before any suffix is appended. In the assembled list,
    # nominal features appear as "<name>_encoded", so filtering that list by raw name
    # would never remove "mqtt_message" / "mqtt_protocol_name". Skipping them here also
    # avoids fitting indexers/encoders whose output would be thrown away.
    kept_nominal_cols = [name for name in nominal_cols if name not in correlated_cols_to_remove]
    kept_numeric_cols = [
        name for name in continuous_cols + binary_cols if name not in correlated_cols_to_remove
    ]

    # One StringIndexer per kept nominal column; 'keep' puts unseen/null values in an extra index
    stage_nominal_indexers = [
        StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid='keep')
        for name in kept_nominal_cols
    ]

    # One OneHotEncoder per kept nominal column, reading the matching index column
    stage_nominal_onehot_encoders = [
        OneHotEncoder(inputCol=name + "_index", outputCol=name + "_encoded", handleInvalid='keep')
        for name in kept_nominal_cols
    ]

    # Feature layout: continuous, then binary, then the one-hot blocks
    feature_cols = kept_numeric_cols + [name + "_encoded" for name in kept_nominal_cols]

    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features")

    # Stage where we scale the features
    stage_scaler = StandardScaler(inputCol='vectorized_features', outputCol='features')

    # Stage for creating the outcome column
    stage_outcome = OutcomeCreater()

    # Remove every raw and intermediate column. Index/encoded names for excluded nominal
    # columns were never created; DataFrame.drop ignores unknown names, so listing them is harmless.
    columns_to_drop = (
        nominal_cols
        + [name + "_index" for name in nominal_cols]
        + [name + "_encoded" for name in nominal_cols]
        + binary_cols + continuous_cols + ['vectorized_features']
    )
    stage_column_dropper = ColumnDropper(columns_to_drop=columns_to_drop)

    # Connect the stages into a pipeline
    stages = (
        [stage_typecaster, stage_nominal_typecaster]
        + stage_nominal_indexers
        + stage_nominal_onehot_encoders
        + [stage_vector_assembler, stage_scaler, stage_outcome, stage_column_dropper]
    )
    return Pipeline(stages=stages)


In [None]:
# Get the preprocessing pipeline
pipeline = get_preprocess_pipeline()

# Fit the pipeline to data
pipeline_model = pipeline.fit(train_df)

# Transform the data
train_df_preprocessed = pipeline_model.transform(train_df)
test_df_preprocessed = pipeline_model.transform(test_df)

# Cache the dataframes
train_df_final = train_df_preprocessed.cache()
test_df_final = test_df_preprocessed.cache()

In [5]:
from pyspark.sql.functions import col, avg, count
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank
import matplotlib.pyplot as plt

In [None]:
# from pyspark.ml import Pipeline, Transformer
# from pyspark.sql.functions import col, udf, when
# from pyspark.sql.types import DoubleType
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
# from pyspark.sql import DataFrame
# from functools import reduce

# # Assuming 'binary_cols', 'continuous_cols', and 'nominal_cols' are already defined

# # Outlier detection function
# def column_add(a, b):
#     return a.__add__(b)

# def find_outliers(df):
#     # Identifying the numerical columns in a spark dataframe
#     numeric_columns = [column[0] for column in df.dtypes if column[1] in ('int', 'double')]

#     # Using the for loop to create new columns by identifying the outliers for each feature
#     for column_name in numeric_columns:
#         Q1 = df.approxQuantile(column_name, [0.25], relativeError=0)
#         Q3 = df.approxQuantile(column_name, [0.75], relativeError=0)
        
#         # IQR : Inter Quartile Range
#         IQR = Q3[0] - Q1[0]
        
#         # Define thresholds for outliers
#         lower_bound = Q1[0] - 1.5 * IQR
#         upper_bound = Q3[0] + 1.5 * IQR
        
#         isOutlierCol = 'is_outlier_{}'.format(column_name)
        
#         df = df.withColumn(isOutlierCol, when((df[column_name] < lower_bound) | (df[column_name] > upper_bound), 1).otherwise(0))

#     # Selecting the specific columns which we have added above, to check if there are any outliers
#     selected_columns = [column for column in df.columns if column.startswith("is_outlier")]
#     # Adding all the outlier columns into a new column "total_outliers", to see the total number of outliers
#     df = df.withColumn('total_outliers', reduce(column_add, (df[col] for col in selected_columns)))

#     # Dropping the extra columns created above, just to create nice dataframe without extra columns
#     df = df.drop(*selected_columns)

#     return df


# col_names = new_column_names
#     # Define the columns
# binary_cols = [
#     "mqtt_connack_reserved_flags",
#     "mqtt_connack_session_present",
#     "mqtt_connect_clean_session_flag",
#     "mqtt_connect_password_flag",
#     "mqtt_connect_reserved_flag",
#     "mqtt_connect_retain_flag",
#     "mqtt_connect_username_flag",
#     "mqtt_connect_will_flag",
#     "mqtt_duplicate_flag",
#     "mqtt_retain_flag",
#     "mqtt_connect_qos_level"
# ]

# nominal_cols = [
#     "tcp_flags",
#     "mqtt_connack_flags",
#     "mqtt_connack_return_code",
#     "mqtt_connect_flags",
#     "mqtt_header_flags",
#     "mqtt_message_type",
#     "mqtt_qos_level",
#     "mqtt_version",
#     "mqtt_message",
#     'mqtt_protocol_name'
# ]

# continuous_cols = [
#     "tcp_time_delta",
#     "tcp_length",
#     "mqtt_keep_alive_interval",
#     "mqtt_length",
#     "mqtt_protocol_length",
#     "mqtt_will_message_length",
#     "mqtt_will_topic_length",
#     "mqtt_will_topic",
#     "mqtt_will_message",
#     "mqtt_subscription_ack_qos_level",
#     "mqtt_subscription_qos_level",
#     "mqtt_message_id"
# ]

# # Custom Transformers
# class OutcomeCreater(Transformer):
#     def __init__(self):
#         super().__init__()

#     def _transform(self, dataset):
#         label_to_multiclass = udf(lambda name:
#                                   0.0 if name == 'legitimate' else
#                                   1.0 if name == 'slowite' else
#                                   2.0 if name == 'bruteforce' else
#                                   3.0 if name == 'flood' else
#                                   4.0 if name == 'malformed' else
#                                   5.0 if name == 'dos' else
#                                   -1.0, DoubleType())
#         output_df = dataset.withColumn('outcome', label_to_multiclass(col('target_class')))
#         output_df = output_df.drop("target_class")
#         output_df = output_df.withColumn('outcome', col('outcome').cast(DoubleType()))
#         return output_df

# class FeatureTypeCaster(Transformer):
#     def __init__(self):
#         super().__init__()

#     def _transform(self, dataset: DataFrame):
#         output_df = dataset
#         for col_name in binary_cols + continuous_cols:
#             output_df = output_df.withColumn(col_name, col(col_name).cast(DoubleType()))
#         return output_df

# class NominalTypeCaster(Transformer):
#     def __init__(self):
#         super().__init__()

#     def _transform(self, dataset):
#         output_df = dataset
#         for col_name in nominal_cols:
#             output_df = output_df.withColumn(col_name, col(col_name).cast("string"))
#         return output_df

# class OutlierHandler(Transformer):
#     def __init__(self):
#         super().__init__()

#     def _transform(self, dataset):
#         return find_outliers(dataset)

# class ColumnDropper(Transformer):
#     def __init__(self, columns_to_drop=None):
#         super().__init__()
#         self.columns_to_drop = columns_to_drop

#     def _transform(self, dataset):
#         return dataset.drop(*self.columns_to_drop)

# def get_preprocess_pipeline():
#     # Columns to remove
#     corelated_cols_to_remove = [
#         "mqtt_connack_reserved_flags", "mqtt_connack_session_present", "mqtt_connect_qos_level",
#         "mqtt_connect_reserved_flag", "mqtt_connect_retain_flag", "mqtt_connect_will_flag", "mqtt_message",
#         "mqtt_subscription_qos_level", "mqtt_subscription_ack_qos_level", "mqtt_will_message",
#         "mqtt_will_message_length", "mqtt_will_topic", "mqtt_will_topic_length", 'mqtt_protocol_name'
#     ]

#     # Stage where columns are cast as appropriate types
#     stage_typecaster = FeatureTypeCaster()

#     # Convert nominal columns to string type
#     stage_nominal_typecaster = NominalTypeCaster()

#     # Stage for handling outliers
#     stage_outlier_handler = OutlierHandler()

#     # Create a list of StringIndexers with handleInvalid='keep'
#     stage_nominal_indexers = [
#         StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid='keep')
#         for col in nominal_cols
#     ]

#     # Create a list of OneHotEncoders with handleInvalid='keep'
#     stage_nominal_onehot_encoders = [
#         OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded", handleInvalid='keep')
#         for col in nominal_cols
#     ]

#     # Feature columns assembly
#     feature_cols = continuous_cols + binary_cols + [col + "_encoded" for col in nominal_cols]

#     # Remove correlated columns from features
#     for col_name in corelated_cols_to_remove:
#         if col_name in feature_cols:
#             feature_cols.remove(col_name)

#     stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features")

#     # Stage where we scale the features
#     stage_scaler = StandardScaler(inputCol='vectorized_features', outputCol='features')

#     # Stage for creating the outcome column
#     stage_outcome = OutcomeCreater()

#     # Removing unnecessary columns
#     # Exclude 'total_outliers' from columns_to_drop to keep it after the pipeline runs
#     columns_to_drop = (
#         nominal_cols +
#         [col + "_index" for col in nominal_cols] +
#         [col + "_encoded" for col in nominal_cols] +
#         binary_cols + continuous_cols + ['vectorized_features']
#         # 'total_outliers' is not included here
#     )
#     stage_column_dropper = ColumnDropper(columns_to_drop=columns_to_drop)

#     # Connect the stages into a pipeline
#     stages = (
#         [stage_typecaster, stage_nominal_typecaster, stage_outlier_handler] +
#         stage_nominal_indexers +
#         stage_nominal_onehot_encoders +
#         [stage_vector_assembler, stage_scaler, stage_outcome, stage_column_dropper]
#     )
#     pipeline = Pipeline(stages=stages)

#     return pipeline


In [None]:
# pipeline = get_preprocess_pipeline()

# # Fit the pipeline to training data
# pipeline_model = pipeline.fit(train_df)

# # Transform the data
# train_df_preprocessed = pipeline_model.transform(train_df)
# test_df_preprocessed = pipeline_model.transform(test_df)

# # Cache the dataframes if needed
# train_df_preprocessed = train_df_preprocessed.cache()
# test_df_preprocessed = test_df_preprocessed.cache()

# # Display the first few rows of the transformed training DataFrame
# print("Training Data Before Filtering:")
# train_df_preprocessed.show(n=10, truncate=False)

# # Now, filter out rows with 4 or more outliers
# threshold = 4
# train_df_filtered = train_df_preprocessed.filter(col('total_outliers') < threshold)
# test_df_filtered = test_df_preprocessed.filter(col('total_outliers') < threshold)

# # Display the first few rows of the filtered training DataFrame
# print("Training Data After Filtering:")
# train_df_filtered.show(n=10, truncate=False)

# # Optionally, drop the 'total_outliers' column if it's no longer needed
# train_df_final = train_df_filtered.drop('total_outliers')
# test_df_final = test_df_filtered.drop('total_outliers')

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
from functools import reduce
from operator import add

from pyspark.sql.functions import lit, when


def find_outliers(df, numeric_types=('int', 'double')):
    """Return ``df`` plus a ``total_outliers`` column counting IQR outliers per row.

    For every column whose Spark dtype string is in ``numeric_types``, a value counts
    as an outlier when it lies outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]. Quartiles are
    computed exactly (relativeError=0). Null values are never counted as outliers.

    NOTE(review): on the output of the preprocessing pipeline, the only column with a
    numeric dtype is the label 'outcome' (all features live in the 'features' vector),
    so outliers are currently measured on the label alone. Confirm this is intended.
    """
    numeric_columns = [name for name, dtype in df.dtypes if dtype in numeric_types]

    outlier_flags = []
    for name in numeric_columns:
        # Both quartiles in a single pass instead of two approxQuantile calls.
        quartiles = df.approxQuantile(name, [0.25, 0.75], 0.0)
        if len(quartiles) < 2:
            # approxQuantile returns [] for a column holding only nulls: nothing to flag.
            continue
        q1, q3 = quartiles
        iqr = q3 - q1
        lower_bound, upper_bound = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        outlier_flags.append(
            when((df[name] < lower_bound) | (df[name] > upper_bound), 1).otherwise(0)
        )

    # Sum the 0/1 flags directly; with no numeric columns every row gets 0.
    total = reduce(add, outlier_flags) if outlier_flags else lit(0)
    return df.withColumn('total_outliers', total)


train_df_with_outlier_handling = find_outliers(train_df_preprocessed)
train_df_with_outlier_handling.show(1, vertical=True)

In [None]:
# Keep rows with at most MAX_OUTLIERS_PER_ROW flagged columns. Reference the exact
# column name find_outliers produces ('total_outliers'); a differently-cased name only
# resolves because spark.sql.caseSensitive defaults to false.
MAX_OUTLIERS_PER_ROW = 4
train_df_with_substituted_na_and_outliers = train_df_with_outlier_handling.filter(
    train_df_with_outlier_handling['total_outliers'] <= MAX_OUTLIERS_PER_ROW
)
print(train_df_with_substituted_na_and_outliers.count())

In [None]:
# Import necessary libraries
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Convert the 'features' vector column to an array and expand each element as a separate column
train_df_expanded = train_df_with_substituted_na_and_outliers.withColumn(
    "features_array", vector_to_array(col("features"))
)

# Determine the number of features (dimensions) in the vector
num_features = len(train_df_expanded.select("features_array").head()[0])

# Expand each element in the array to its own column
for i in range(num_features):
    train_df_expanded = train_df_expanded.withColumn(f"feature_{i}", col("features_array")[i])

# Drop the original 'features' and 'features_array' columns
numeric_df = train_df_expanded.drop("features", "features_array").select(
    *[f"feature_{i}" for i in range(num_features)], "outcome", "total_outliers"
)

# Convert to Pandas and calculate the correlation matrix
correlation_matrix = numeric_df.toPandas().corr()
print(correlation_matrix)


In [None]:
from pyspark.sql.types import NumericType

# pandas' DataFrame.corr() cannot handle the 'features' vector column (it arrives as an
# object column of Vector instances; pandas >= 2.0 raises instead of skipping it). Select
# only numeric Spark columns, which also avoids shipping the vectors to the driver.
numeric_col_names = [
    field.name
    for field in train_df_with_substituted_na_and_outliers.schema.fields
    if isinstance(field.dataType, NumericType)
]
correlation_matrix = (
    train_df_with_substituted_na_and_outliers.select(*numeric_col_names).toPandas().corr()
)
print(correlation_matrix)

In [None]:
# Reuse the frame the earlier outlier cell already built instead of calling
# find_outliers again, which repeats an exact quantile pass per numeric column.
max_outliers = 4

# Count rows with 4 or fewer outliers
count_4_or_less_outliers = train_df_with_outlier_handling.filter(
    train_df_with_outlier_handling['total_outliers'] <= max_outliers
).count()

# Print the result
print(f"Number of rows with 4 or fewer outliers: {count_4_or_less_outliers}")