In [None]:
pip install pyspark

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, lower, mean, stddev, trim, when
from pyspark.sql.types import NumericType
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [None]:
class DataCleaning:
    """Load a parquet dataset into Spark and clean it step by step.

    Each cleaning method reassigns ``self.df`` (Spark DataFrames are immutable),
    so the steps are cumulative: re-running a step applies it again to the
    already-cleaned data. ``return_cleaned_data`` returns the current state.

    Args:
        dataset_path: path of the parquet file(s) to load.
    """

    def __init__(self, dataset_path):
        self.spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
        # Parquet is self-describing; the CSV-only options `header` / `inferSchema`
        # have no effect here, so they are not passed.
        self.df = self.spark.read.parquet(dataset_path)

    def _numeric_columns(self):
        """Names of every numeric column (tinyint..bigint, float, double, decimal)."""
        return [f.name for f in self.df.schema.fields if isinstance(f.dataType, NumericType)]

    def _string_columns(self):
        """Names of every string column."""
        return [name for name, dtype in self.df.dtypes if dtype == "string"]

    def handle_missing_values(self):
        """Show per-column missing counts, then mean-impute numeric columns.

        A cell counts as missing when it is null, or NaN in a float/double column.
        Only numeric columns are filled; other columns are reported but left as-is.
        Columns whose mean is undefined (every value missing) are skipped because
        ``na.fill`` does not accept ``None``.
        """
        float_cols = {name for name, dtype in self.df.dtypes if dtype in ("float", "double")}

        def is_missing(c):
            # isnan only makes sense for floating-point columns.
            if c in float_cols:
                return col(c).isNull() | isnan(c)
            return col(c).isNull()

        missing_values = self.df.select(
            [count(when(is_missing(c), c)).alias(c) for c in self.df.columns]
        )
        missing_values.show()

        numeric_cols = self._numeric_columns()
        if not numeric_cols:
            return
        # One aggregation job for all columns instead of one job per column.
        # NaNs are excluded from float means so they do not turn the mean into NaN.
        mean_exprs = [
            mean(when(~isnan(c), col(c)) if c in float_cols else col(c)).alias(c)
            for c in numeric_cols
        ]
        means = self.df.select(mean_exprs).first().asDict()
        # float(): the mean of a decimal column is a Decimal, which na.fill rejects.
        fill_values = {c: float(m) for c, m in means.items() if m is not None}
        if fill_values:
            self.df = self.df.na.fill(fill_values)

    def handle_duplicates(self):
        """Drop rows that are exact duplicates across all columns."""
        self.df = self.df.dropDuplicates()

    def inconsistent_formatting(self, columns=None):
        """Lower-case and trim leading/trailing whitespace in string columns.

        Args:
            columns: column names to normalise; defaults to every string column.
                If the target is a string column (e.g. ``attack_cat``) it is
                normalised too unless an explicit list excludes it.
        """
        if columns is None:
            columns = self._string_columns()
        for c in columns:
            # Column objects have no .lower()/.trim() methods; use the SQL functions.
            self.df = self.df.withColumn(c, trim(lower(col(c))))

    def handle_outliers(self, n_std=3.0, exclude=()):
        """Drop rows whose numeric values lie outside mean +/- ``n_std`` * stddev.

        Means and standard deviations are computed once, on the data before any
        filtering, so the result does not depend on column order. Columns with an
        undefined, NaN or zero stddev (all-null, single-row, constant) are skipped;
        with strict bounds a constant column used to remove every row. Null values
        are not treated as outliers.

        NOTE(review): in intrusion-detection data attacks are often the statistical
        outliers; consider excluding label/id-like columns via ``exclude``.

        Args:
            n_std: half-width of the kept band, in standard deviations.
            exclude: numeric column names that should not be filtered on.
        """
        if isinstance(exclude, str):
            exclude = (exclude,)
        excluded = set(exclude)
        numeric_cols = [c for c in self._numeric_columns() if c not in excluded]
        if not numeric_cols:
            return

        # Index-based aliases avoid clashes with arbitrary column names.
        stat_exprs = []
        for i, c in enumerate(numeric_cols):
            stat_exprs += [mean(c).alias(f"mean_{i}"), stddev(c).alias(f"std_{i}")]
        stats = self.df.select(stat_exprs).first()

        keep = None
        for i, c in enumerate(numeric_cols):
            mu, sigma = stats[f"mean_{i}"], stats[f"std_{i}"]
            # `not sigma > 0` is also True for NaN.
            if mu is None or sigma is None or not sigma > 0:
                continue
            mu = float(mu)  # decimal columns produce a Decimal mean
            lower_bound = mu - n_std * sigma
            upper_bound = mu + n_std * sigma
            in_range = col(c).isNull() | col(c).between(lower_bound, upper_bound)
            keep = in_range if keep is None else keep & in_range
        if keep is not None:
            self.df = self.df.filter(keep)

    def encode_categorical_values(self):
        """Add a numeric ``<col>_index`` column for every string column.

        The original string columns are kept. String columns are not imputed by
        ``handle_missing_values``, so nulls and unseen labels are mapped to an
        extra index (``handleInvalid="keep"``) instead of raising. Columns that
        already have an ``_index`` companion are skipped, so re-running is safe.
        """
        existing = set(self.df.columns)
        string_cols = [c for c in self._string_columns() if c + "_index" not in existing]
        if not string_cols:
            return
        # A single multi-column indexer fits all columns together.
        indexer = StringIndexer(
            inputCols=string_cols,
            outputCols=[c + "_index" for c in string_cols],
            handleInvalid="keep",
        )
        self.df = indexer.fit(self.df).transform(self.df)

    def return_cleaned_data(self):
        """Return the current (cleaned) Spark DataFrame."""
        return self.df

In [None]:
class DataPreprocessing:
    """Pandas-side inspection of a cleaned Spark DataFrame.

    Args:
        cleaned_df: Spark DataFrame; it is collected to the driver with
            ``toPandas()`` when the object is constructed.
    """

    def __init__(self, cleaned_df):
        pandas_frame = cleaned_df.toPandas()
        self.df = pandas_frame

    def visualize_data(self):
        """Print the first rows and summary statistics, then show histograms.

        Returns None; all output is printed text plus a matplotlib figure.
        """
        frame = self.df
        # head() first, then describe(), each printed as plain text.
        for summarise in (frame.head, frame.describe):
            print(summarise())

        # Histogram grid with 20 bins per column on a 10x10-inch canvas.
        frame.hist(bins=20, figsize=(10, 10))
        plt.show()

In [None]:
DATASET_PATH = '/kaggle/input/unsw-nb15-testing-set-parquet-4-54-mb/UNSW_NB15_testing-set.parquet'
data_cleaning = DataCleaning(DATASET_PATH)

In [None]:
print(data_cleaning.handle_missing_values())

In [None]:
print(data_cleaning.handle_duplicates())

In [None]:
print(data_cleaning.handle_outliers())

In [None]:
print(data_cleaning.encode_categorical_values())

In [None]:
cleaned_df = data_cleaning.return_cleaned_data()
print(cleaned_df)

In [None]:
data_preprocessing = DataPreprocessing(cleaned_df)
print(data_preprocessing)

In [None]:
print(data_preprocessing.visualize_data())

In [None]:
rows = cleaned_df.count()
print(rows)

In [None]:
columns = len(cleaned_df.columns)
print(columns)

In [None]:
print(cleaned_df.columns)

In [None]:
cleaned_df.coalesce(1).write.csv("cleaned_daraset.csv", header =True)

In [None]:
## not actually needed, download  the uncleaned dataset for other reasons
data_cleaning.df.coalesce(1).write.csv("uncleaned_dataset.csv", header =True)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

In [None]:
class TrainModel:
    """Fit and evaluate a RandomForest that predicts ``attack_cat``.

    Args:
        training_df: pandas DataFrame holding the ``attack_cat`` target column
            plus feature columns (in this notebook: the cleaned Spark frame via
            ``toPandas()``).
    """

    # Target column predicted by the model.
    TARGET = "attack_cat"
    # Columns that carry the answer and must not be used as features:
    # `attack_cat_index` is the StringIndexer encoding of the target (added by
    # DataCleaning.encode_categorical_values); `label` is presumably the UNSW-NB15
    # binary normal/attack flag, which reveals the "Normal" class (TODO confirm).
    LEAKY_COLUMNS = ("attack_cat_index", "label")

    def __init__(self, training_df):
        self.df = training_df

    def one_hot_encoding(self):
        """Replace every object (string) feature column with one-hot indicator columns.

        The target column is left untouched. Does nothing when there are no
        string feature columns.
        """
        # Local, aliased import: the notebook also imports pyspark's OneHotEncoder
        # under the same global name, so the global depends on cell execution order.
        from sklearn.preprocessing import OneHotEncoder as SkOneHotEncoder

        string_cols = self.df.select_dtypes(include=["object"]).columns.drop(self.TARGET, errors="ignore")
        if len(string_cols) == 0:
            return
        try:
            # scikit-learn >= 1.2 renamed `sparse` to `sparse_output`; 1.4 removed `sparse`.
            encoder = SkOneHotEncoder(sparse_output=False, handle_unknown="ignore")
        except TypeError:
            encoder = SkOneHotEncoder(sparse=False, handle_unknown="ignore")
        transformed = encoder.fit_transform(self.df[string_cols])
        encoded_df = pd.DataFrame(
            transformed,
            columns=encoder.get_feature_names_out(string_cols),
            # Reuse the existing index so the concat aligns rows even if self.df
            # does not have a default RangeIndex.
            index=self.df.index,
        )
        self.df = pd.concat([self.df.drop(columns=string_cols), encoded_df], axis=1)

    def train_model(self, test_size=0.2, random_state=42, n_estimators=100, drop_columns=None):
        """Split the data, fit a RandomForestClassifier, and print evaluation results.

        Args:
            test_size: fraction of rows held out for evaluation.
            random_state: seed for both the split and the forest.
            n_estimators: number of trees in the forest.
            drop_columns: non-feature columns to exclude in addition to the target;
                defaults to ``LEAKY_COLUMNS``. Names not present are ignored.

        Returns:
            The fitted RandomForestClassifier.
        """
        if drop_columns is None:
            drop_columns = self.LEAKY_COLUMNS
        elif isinstance(drop_columns, str):
            drop_columns = (drop_columns,)

        # Features exclude the target and anything derived from it (otherwise the
        # model simply reads the answer, e.g. from attack_cat_index).
        X = self.df.drop(columns=[self.TARGET, *drop_columns], errors="ignore")
        y = self.df[self.TARGET]  # original label column (e.g. `Normal`, `DoS`, ...)

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

        model = RandomForestClassifier(n_estimators=n_estimators, random_state=random_state)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        print("Classification result: \n", classification_report(y_test, y_pred))

        feature_importances = (
            pd.DataFrame(model.feature_importances_, index=X.columns, columns=["importance"])
            .sort_values("importance", ascending=False)
        )
        print("\nFeature Importance: \n", feature_importances)
        return model


In [None]:
print(cleaned_df.columns)

In [None]:
train_model = TrainModel(cleaned_df.toPandas())
train_model.one_hot_encoding()
train_model.train_model()