# **Loading Dataset**

In [1]:
import pandas as pd
df=pd.read_csv("/content/anomaly_dataset_preprocessed.csv")

In [2]:
df.head()

Unnamed: 0,Time,Source,Destination,Protocol,Length
0,2024-03-21 00:30:57.252719,WEB Access PLC Attack,68,HTTP,806
1,2024-03-16 01:57:57.254620,WEB Access PLC Attack,72,STP,904
2,2024-03-20 15:38:57.254635,WEB Access PLC Attack,81,HTTP,638
3,2024-03-19 13:03:57.254642,Stable Normal Operation-Kit Data,99,TELNET,317
4,2024-03-16 12:17:57.254648,Stable Normal Operation-Kit Data,5,DHCPv6,558


# **Data Preprocessing**

In [3]:
# Assuming df is your DataFrame containing the dataset
columns_to_check = ['Source', 'Destination', 'Protocol']
for column in columns_to_check:
    unique_values = df[column].unique()
    print(f"Unique values in column '{column}': {unique_values}")


Unique values in column 'Source': ['WEB Access PLC Attack' 'Stable Normal Operation-Kit Data'
 'TELNET PLC Attack']
Unique values in column 'Destination': [68 72 81 99  5 62 91  7 33 82 38 28 52 79 60 87 96  9 57 30 29 35 59 93
 17 73  8 13 85 76 94 41  1 78 48 51 69 77 58 37 83 95 36 49 50 10 98 71
 31 20 39 45 15 61  4 34 22 40 75 25 65 14  2  3 67 86 27 11 66 84 54 16
 19 43 18 24 21 23 89 53 47 97 56 46 88  6 90 70 26 44 64 63 74 32 92 80
 12 55 42]
Unique values in column 'Protocol': ['HTTP' 'STP' 'TELNET' 'DHCPv6' 'LLMNR' 'PN-PTCP' 'SSDP' 'PN-MRP'
 'BROWSER' 'ARP' 'LLDP' 'HIP' 'NBNS' 'COTP' 'UDP' 'TCP']


In [4]:
# Assuming df is your DataFrame containing the dataset
null_counts = df.isnull().sum()
print("Null counts in each column:")
print(null_counts)


Null counts in each column:
Time           0
Source         0
Destination    0
Protocol       0
Length         0
dtype: int64


In [5]:
from sklearn.preprocessing import LabelEncoder

# Create a label encoder object
label_encoder = LabelEncoder()

# Apply label encoding to 'Protocol' and 'Source' columns
df['Protocol'] = label_encoder.fit_transform(df['Protocol'])
df['Source'] = label_encoder.fit_transform(df['Source'])

# **Exploratory Data Analysis**

In [6]:
df.describe()

Unnamed: 0,Source,Destination,Protocol,Length
count,1000.0,1000.0,1000.0,1000.0
mean,0.99,50.291,7.628,536.384
std,0.820105,28.726286,4.724822,302.336269
min,0.0,1.0,0.0,1.0
25%,0.0,26.0,3.0,270.0
50%,1.0,50.0,8.0,558.0
75%,2.0,75.0,12.0,815.25
max,2.0,99.0,15.0,999.0


In [7]:
import pandas as pd

# Create an empty list to store individual DataFrame for each column
dfs = []

# Populate the list with information for each column
for col in df.columns:
    col_info = pd.DataFrame({
        'Column Name': [col],
        'Data Type': [df[col].dtype],
        'Missing Values': [df[col].isnull().sum()]
    })
    dfs.append(col_info)

# Concatenate all individual DataFrames into a single DataFrame
info_df = pd.concat(dfs, ignore_index=True)

# Display the DataFrame
print(info_df)


   Column Name Data Type  Missing Values
0         Time    object               0
1       Source     int64               0
2  Destination     int64               0
3     Protocol     int64               0
4       Length     int64               0


In [8]:
import plotly.graph_objects as go

# Get the list of numerical column names
num_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()

# Create a histogram for each numerical column
for col in num_cols:
    fig = go.Figure()
    fig.add_trace(go.Histogram(x=df[col], histnorm='probability density', name=col,
                               marker=dict(color='skyblue')))
    fig.update_layout(title=f'Distribution of {col}',
                      xaxis=dict(title=col, showgrid=False),
                      yaxis=dict(title='Probability Density', showgrid=False),
                      plot_bgcolor='white',  # Set background color
                      barmode='overlay',  # Overlay bars for better comparison
                      bargap=0.1,  # Set gap between bars
                      bargroupgap=0.1)  # Set gap between groups of bars
    fig.show()


In [9]:
import plotly.graph_objects as go

# Create a box plot for each numerical column
for col in num_cols:
    fig = go.Figure()
    fig.add_trace(go.Box(y=df[col], name=col))
    fig.update_layout(title=f'Boxplot of {col}',
                      xaxis=dict(title=''),
                      yaxis=dict(title=col))
    fig.show()


In [10]:
import plotly.figure_factory as ff

# Calculate correlation matrix
corr_matrix = df.corr()

# Create an annotated heatmap using Plotly
fig = ff.create_annotated_heatmap(z=corr_matrix.values,
                                  x=list(corr_matrix.columns),
                                  y=list(corr_matrix.index),
                                  colorscale='Viridis',
                                  annotation_text=corr_matrix.round(2).values,
                                  showscale=True)

# Update layout
fig.update_layout(title='Correlation Heatmap',
                  xaxis=dict(title='Features'),
                  yaxis=dict(title='Features'))

# Show the interactive heatmap
fig.show()






In [11]:
import plotly.graph_objects as go

# Sort DataFrame by 'Time' column
df_sorted = df.sort_values('Time')

# Create an interactive line plot using Plotly
fig = go.Figure()

# Add trace for the line plot
fig.add_trace(go.Scatter(x=df_sorted['Time'], y=df_sorted['Length'],
                         mode='lines+markers', marker=dict(color='blue'), name='Length'))

# Update layout
fig.update_layout(title='Line Plot of Time vs. Length', xaxis_title='Time', yaxis_title='Length',
                  xaxis=dict(type='date', tickformat='%Y-%m-%d %H:%M:%S'), showlegend=True)

# Show the interactive plot
fig.show()


# **Data Preparation**

In [12]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Exclude the 'Time' column from the DataFrame
df_features = df.drop(columns=['Time'])

# Normalize the data
scaler = StandardScaler()
scaled_data = scaler.fit_transform(df_features)

# Split the data into training and testing sets
X_train, X_test = train_test_split(scaled_data, test_size=0.2, random_state=42)

# Print the shapes of the training and testing sets
print("Training data shape:", X_train.shape)
print("Testing data shape:", X_test.shape)


Training data shape: (800, 4)
Testing data shape: (200, 4)


# **Applying Variational Autoencoders (VAEs)**

In [13]:
pip install keras-tuner




In [14]:
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from kerastuner.tuners import RandomSearch

# Define the architecture of the VAE
def build_vae(hp):
    input_dim = X_train.shape[1]  # Number of features
    latent_dim = hp.Int('latent_dim', min_value=2, max_value=10, step=2)  # Hyperparameter for latent dimension
    learning_rate = hp.Choice('learning_rate', values=[1e-3, 1e-4, 1e-5])  # Hyperparameter for learning rate

    # Encoder
    inputs = Input(shape=(input_dim,))
    x = Dense(128, activation='relu')(inputs)
    x = Dense(64, activation='relu')(x)

    z_mean = Dense(latent_dim, name='z_mean')(x)
    z_log_var = Dense(latent_dim, name='z_log_var')(x)

    # Sampling layer
    def sampling(args):
        z_mean, z_log_var = args
        epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim), mean=0., stddev=1.0)
        return z_mean + K.exp(0.5 * z_log_var) * epsilon

    z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

    # Decoder
    decoder_input = Input(shape=(latent_dim,))
    x = Dense(64, activation='relu')(decoder_input)
    x = Dense(128, activation='relu')(x)
    outputs = Dense(input_dim, activation='sigmoid')(x)

    # Instantiate encoder and decoder models
    encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
    decoder = Model(decoder_input, outputs, name='decoder')

    # VAE model
    outputs = decoder(encoder(inputs)[2])
    vae = Model(inputs, outputs, name='vae')

    # Define VAE loss
    reconstruction_loss = binary_crossentropy(inputs, outputs) * input_dim
    kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
    kl_loss = K.sum(kl_loss, axis=-1) * -0.5
    vae_loss = K.mean(reconstruction_loss + kl_loss)

    # Compile the VAE model
    vae.add_loss(vae_loss)
    optimizer = Adam(learning_rate=learning_rate)
    vae.compile(optimizer=optimizer)

    return vae

# Define the tuner
tuner = RandomSearch(
    build_vae,
    objective='val_loss',
    max_trials=5,  # Number of hyperparameter combinations to try
    directory='vae_hyperparameter_tuning',
    project_name='vae_hyperparameter_tuning'
)

# Iterate twice
for _ in range(2):
    # Perform hyperparameter tuning
    tuner.search(X_train, epochs=100, batch_size=32, shuffle=True, validation_data=(X_test, None), verbose=0)

    # Get the best hyperparameters
    best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]

    # Build and compile the VAE model with the best hyperparameters
    best_vae = tuner.hypermodel.build(best_hyperparameters)
    best_vae.compile(optimizer=Adam(learning_rate=best_hyperparameters.get('learning_rate')))

    # Train the VAE model with the best hyperparameters for more epochs
    history = best_vae.fit(X_train, epochs=100, batch_size=32, shuffle=True, validation_data=(X_test, None), verbose=0)

    # Evaluate the VAE's performance on the testing data
    loss = best_vae.evaluate(X_test, X_test, verbose=0)

    # Use the trained VAE for anomaly detection
    reconstructed_data = best_vae.predict(X_test)
    reconstruction_errors = np.mean(np.square(X_test - reconstructed_data), axis=1)



`import kerastuner` is deprecated, please use `import keras_tuner`.



Reloading Tuner from vae_hyperparameter_tuning/vae_hyperparameter_tuning/tuner0.json


# **Model Metrices**

In [15]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, roc_curve, auc, precision_recall_fscore_support

# Compute MSE, RMSE, and MAE
mse = mean_squared_error(X_test, reconstructed_data)
rmse = np.sqrt(mse)
mae = mean_absolute_error(X_test, reconstructed_data)

print("MSE:", mse)
print("RMSE:", rmse)
print("MAE:", mae)

MSE: 0.9210010545344768
RMSE: 0.9596879985362309
MAE: 0.7921687256256347


# **Detecting Anomalies**

In [16]:
# Set the lower and upper thresholds for anomaly detection
lower_threshold = 0.2
upper_threshold = 1.2

# Detect anomalies based on the lower and upper thresholds
anomalies = X_test[(reconstruction_errors < lower_threshold) | (reconstruction_errors > upper_threshold)]

# Print the number of anomalies detected
print("Number of anomalies detected:", len(anomalies))


Number of anomalies detected: 66


# **Scatter Plot**

In [17]:
import plotly.graph_objects as go
import numpy as np

# Example data: unnormalized length and thresholds
unnormalized_length = np.random.rand(100) * 1000  # Example unnormalized length data
lower_threshold = 180  # Example lower threshold for unnormalized length
upper_threshold = 800  # Example upper threshold for unnormalized length

# Convert time values to minutes and seconds
time_values_minutes = np.arange(len(unnormalized_length)) // 60
time_values_seconds = np.arange(len(unnormalized_length)) % 60
time_values_formatted = [f"{min}:{sec:02d}" for min, sec in zip(time_values_minutes, time_values_seconds)]

# Plot the scatter plot of unnormalized length data
fig = go.Figure()

# Add the unnormalized length as a scatter plot
fig.add_trace(go.Scatter(x=time_values_formatted, y=unnormalized_length,
                         mode='markers', name='Length', marker=dict(color='orange', size=5)))  # Adjust marker size here

# Add the lower and upper thresholds for unnormalized length
fig.add_shape(type="line", x0=0, y0=lower_threshold, x1=len(unnormalized_length), y1=lower_threshold,
              line=dict(color="red", dash="dash"), name='Lower Threshold')
fig.add_shape(type="line", x0=0, y0=upper_threshold, x1=len(unnormalized_length), y1=upper_threshold,
              line=dict(color="green", dash="dash"), name='Upper Threshold')

# Highlight anomalies below and above the thresholds for unnormalized length
anomalies_below_x = np.where(unnormalized_length < lower_threshold)[0]
anomalies_below_y = unnormalized_length[anomalies_below_x]
fig.add_trace(go.Scatter(x=[time_values_formatted[i] for i in anomalies_below_x], y=anomalies_below_y,
                         mode='markers', name='Anomaly (Below Threshold)', marker=dict(color='red', symbol='star', size=8)))  # Adjust marker size here

anomalies_above_x = np.where(unnormalized_length > upper_threshold)[0]
anomalies_above_y = unnormalized_length[anomalies_above_x]
fig.add_trace(go.Scatter(x=[time_values_formatted[i] for i in anomalies_above_x], y=anomalies_above_y,
                         mode='markers', name='Anomaly (Above Threshold)', marker=dict(color='green', symbol='star', size=8)))  # Adjust marker size here

# Identify normal data points (between lower and upper threshold)
normal_indices = np.where((unnormalized_length >= lower_threshold) & (unnormalized_length <= upper_threshold))[0]
normal_y = unnormalized_length[normal_indices]
fig.add_trace(go.Scatter(x=[time_values_formatted[i] for i in normal_indices], y=normal_y,
                         mode='markers', name='Normal', marker=dict(color='blue', size=5)))  # Adjust marker size here

# Set axis labels and title
fig.update_layout(title='Anomaly Detection with Length (Scatter Plot)',
                  xaxis_title='Time', yaxis_title='Length')

# Show the interactive plot
fig.show()


# **Line Plot**

In [18]:
import plotly.graph_objects as go
import numpy as np

# Example data: unnormalized length and thresholds
unnormalized_length = np.random.rand(100) * 1000  # Example unnormalized length data
lower_threshold = 180  # Example lower threshold for unnormalized length
upper_threshold = 800  # Example upper threshold for unnormalized length

# Convert time values to minutes and seconds
time_values_minutes = np.arange(len(unnormalized_length)) // 60
time_values_seconds = np.arange(len(unnormalized_length)) % 60
time_values_formatted = [f"{min}:{sec:02d}" for min, sec in zip(time_values_minutes, time_values_seconds)]

# Plot the unnormalized length data with thresholds
fig = go.Figure()

# Add the unnormalized length as a trace
fig.add_trace(go.Scatter(x=time_values_formatted, y=unnormalized_length,
                         mode='lines', name='Length', line=dict(color='orange')))

# Add the lower and upper thresholds for unnormalized length
fig.add_shape(type="line", x0=0, y0=lower_threshold, x1=len(unnormalized_length), y1=lower_threshold,
              line=dict(color="red", dash="dash"), name='Lower Threshold')
fig.add_shape(type="line", x0=0, y0=upper_threshold, x1=len(unnormalized_length), y1=upper_threshold,
              line=dict(color="green", dash="dash"), name='Upper Threshold')

# Highlight anomalies below and above the thresholds for unnormalized length
anomalies_below_x = np.where(unnormalized_length < lower_threshold)[0]
anomalies_below_y = unnormalized_length[anomalies_below_x]
fig.add_trace(go.Scatter(x=[time_values_formatted[i] for i in anomalies_below_x], y=anomalies_below_y,
                         mode='markers', name='Anomaly (Below Threshold)', marker=dict(color='red', symbol='star')))

anomalies_above_x = np.where(unnormalized_length > upper_threshold)[0]
anomalies_above_y = unnormalized_length[anomalies_above_x]
fig.add_trace(go.Scatter(x=[time_values_formatted[i] for i in anomalies_above_x], y=anomalies_above_y,
                         mode='markers', name='Anomaly (Above Threshold)', marker=dict(color='green', symbol='star')))

# Identify normal data points (between lower and upper threshold)
normal_indices = np.where((unnormalized_length >= lower_threshold) & (unnormalized_length <= upper_threshold))[0]
normal_y = unnormalized_length[normal_indices]
fig.add_trace(go.Scatter(x=[time_values_formatted[i] for i in normal_indices], y=normal_y,
                         mode='markers', name='Normal', marker=dict(color='blue')))

# Set axis labels and title
fig.update_layout(title='Anomaly Detection with Length',
                  xaxis_title='Time', yaxis_title='Length')

# Show the interactive plot
fig.show()
