# Cyber Security Anomaly Detection

In [None]:
import pandas as pd

In [None]:
df=pd.read_csv("advanced_cybersecurity_data.csv")
df

In [None]:
df.describe()

# Exploratory Data Analysis

In [None]:
df["Anomaly_Flag"].value_counts()

In [None]:
# The main columns to work on are Request_Type, Status_Code and User_Agent

In [None]:
df1=df[["Request_Type","Anomaly_Flag"]]
df1

In [None]:
# PUT has the maximum anomaly count, with a total of 146. PUT requests replace existing data.
df1[df1["Anomaly_Flag"]==1].value_counts()

In [None]:
df2=df[["Status_Code","Anomaly_Flag"]]
df2

In [None]:
# 500: the attacker has attacked the server, causing the server to crash
# 400: the attacker is attempting to attack the server
df2[df2["Anomaly_Flag"]==1].value_counts()

In [None]:
df3=df[["User_Agent","Anomaly_Flag"]]
df3

In [None]:
# Requests come from the Edge browser, whereas Bot is not a browser but a program or an automated script
df3[df3["Anomaly_Flag"]==1].value_counts()

In [None]:
df4=df[["Request_Type","Status_Code","User_Agent","Anomaly_Flag"]]
df4

In [None]:
df4[df4["Anomaly_Flag"]==1].value_counts()

# Encoding

In [None]:
df["Request_Type"].value_counts()

In [None]:
# One-hot encode Request_Type: add one 0/1 indicator column per HTTP method.
# A vectorised comparison replaces the row-by-row df.loc loop, which was slow
# and relied on the index being the default 0..n-1 labels.
categories=["GET","PUT","POST","DELETE"]
for category in categories:
    # True/False mask -> 1/0; int64 matches the dtype of the former `df[category]=0`.
    df[category]=(df["Request_Type"]==category).astype("int64")
    
        

In [None]:
df[category]

In [None]:
df

In [None]:
df["Status_Code"].unique()

In [None]:
# Bucket every status code by its leading digit; any leading digit other
# than 2, 3, 4 or 5 is labelled "Other".
bucket_by_leading_digit={
    2:"Success",
    3:"Intermediate",
    4:"WarningError",
    5:"DangerError",
}
Status_Bucket=[
    bucket_by_leading_digit.get(int(str(code)[0]),"Other")
    for code in df["Status_Code"]
]
        
        
        

In [None]:
df["Status_Bucket"]=Status_Bucket

In [None]:
df

In [None]:
Status_Bucket

In [None]:
# One-hot encode Status_Bucket: add one 0/1 indicator column per bucket.
# "Other" gets no column of its own, so such rows are all-zero across these four.
# A vectorised comparison replaces the row-by-row df.loc loop, which was slow
# and relied on the index being the default 0..n-1 labels.
buckets1=["Success","Intermediate","WarningError","DangerError"]
for bucket in buckets1:
    # True/False mask -> 1/0; int64 matches the dtype of the former `df[bucket]=0`.
    df[bucket]=(df["Status_Bucket"]==bucket).astype("int64")
            

In [None]:
df

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Model inputs are the eight one-hot indicator columns; the target is Anomaly_Flag.
features=[
    "GET","PUT","POST","DELETE",
    "Success","Intermediate","WarningError","DangerError",
]
X=df[features]
y=df["Anomaly_Flag"]

# Hold out 20% for testing; stratify so both splits keep the label proportions.
X_train,X_test,y_train,y_test=train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=1,
    stratify=y,
)

# Learn the scaling statistics from the training split only, then apply them to both splits.
scaler=StandardScaler()
scaler.fit(X_train)
X_train_scaled=scaler.transform(X_train)
X_test_scaled=scaler.transform(X_test)



In [None]:
# Preparing data for PyTorch
import torch
# Scaled feature matrices -> float32 tensors.
X_train_tensor,X_test_tensor=(
    torch.tensor(scaled_split,dtype=torch.float32)
    for scaled_split in (X_train_scaled,X_test_scaled)
)

# Labels -> float32 column vectors (one row per sample, a single column).
y_train_tensor,y_test_tensor=(
    torch.tensor(label_split.values,dtype=torch.float32).reshape(-1,1)
    for label_split in (y_train,y_test)
)



In [None]:
# Defining the neural network input
import torch.nn as nn
class AnomalyNN(nn.Module):
    """Small feed-forward binary classifier for anomaly detection.

    Layers: ``input_size -> 16 -> 8 -> 1`` with a ReLU after each of the two
    hidden linear layers. There is no final sigmoid, so the network returns
    raw logits, NOT probabilities: train it with ``nn.BCEWithLogitsLoss`` and
    apply ``torch.sigmoid`` to the output to obtain a probability.

    Args:
        input_size: Number of input features per sample.
    """

    def __init__(self,input_size):
        super().__init__()  # initialise nn.Module so the layers are registered
        self.model=nn.Sequential(
            nn.Linear(input_size,16),  # first hidden layer
            nn.ReLU(),                 # non-linearity between linear layers
            nn.Linear(16,8),           # second hidden layer
            nn.ReLU(),
            nn.Linear(8,1),            # output layer: a single unbounded logit
        )

    def forward(self,x):
        """Run ``x`` through the layer stack and return one logit per sample.

        ``x`` must have ``input_size`` as its last dimension.
        """
        return self.model(x)
    

In [None]:
# The input layer width is the number of feature columns in the training set.
_, input_size = X_train.shape
# Build the network for that width.
model = AnomalyNN(input_size)

# Training the Anomaly Detection Model 

In [None]:
import torch.optim as optim
# Binary cross-entropy computed directly on the model's logits.
criterion=nn.BCEWithLogitsLoss()
# Adam over every parameter of the network, learning rate 0.001.
optimizer=optim.Adam(model.parameters(),lr=1e-3)

In [None]:
# Full-batch training: every epoch performs exactly one optimizer step
# computed from the entire training tensor.
number_epochs=70
model.train()  # nothing inside the loop leaves training mode, so set it once
for epoch in range(number_epochs):
    optimizer.zero_grad()  # clear gradients left over from the previous step
    outputs=model(X_train_tensor)
    loss=criterion(outputs,y_train_tensor)
    loss.backward()
    optimizer.step()
    epoch_loss=loss.item()
    print(f"Epoch [{epoch+1}/{number_epochs}],Loss:{epoch_loss:.4f}")

In [None]:
# Score the held-out split without tracking gradients.
model.eval()
with torch.no_grad():
    test_outputs=model(X_test_tensor)
    # Sigmoid turns the logits into probabilities; 0.5 is the decision cut-off.
    probs=torch.sigmoid(test_outputs)
    predicted_labels=probs.gt(0.5).float()
# Fraction of test samples whose predicted label equals the true label.
matches=torch.eq(predicted_labels,y_test_tensor).float()
accuracy=matches.mean()
print(f"Test Accuracy:{accuracy:.4f}")

In [None]:
from sklearn.metrics import classification_report
# Per-class precision / recall / F1 when a probability above 0.5 counts as an anomaly.
predicted_label_1=torch.sigmoid(test_outputs).gt(0.5).float()
report1=classification_report(
    y_true=y_test_tensor.numpy(),
    y_pred=predicted_label_1.numpy(),
    target_names=["Normal","Anomaly"],
    zero_division=0,  # report 0 instead of warning when a class is never predicted
)
print("Classification Report with Threshold 0.5\n", report1)

In [None]:
# Same report with a lower cut-off: a probability above 0.3 counts as an anomaly.
predicted_label_2=torch.sigmoid(test_outputs).gt(0.3).float()
report2=classification_report(
    y_true=y_test_tensor.numpy(),
    y_pred=predicted_label_2.numpy(),
    target_names=["Normal","Anomaly"],
    zero_division=0,  # report 0 instead of warning when a class is never predicted
)
print("Classification Report with Threshold 0.3\n", report2)

# Retraining  the Anomaly Detection Model 

In [None]:
# Up-weight the positive (anomaly) class by the negative/positive ratio so the
# rare class is not drowned out in the loss. The ratio is derived from the
# training labels instead of the previous hard-coded 1902/98, so it stays
# correct if the dataset or the split changes.
# NOTE(review): assumes the training split contains at least one positive
# sample; otherwise the ratio is infinite.
num_positive=y_train_tensor.sum()
num_negative=y_train_tensor.numel()-num_positive
pos_weight=(num_negative/num_positive).reshape(1)  # shape (1,), like the original tensor
criterion=nn.BCEWithLogitsLoss(pos_weight=pos_weight)

In [None]:
# Second training pass using the class-weighted loss and a fresh Adam optimizer.
# NOTE(review): `model` is not re-created here, so this pass continues from the
# weights already learned in the earlier training cell - confirm that is intended.
criterion=nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer=torch.optim.Adam(model.parameters(),lr=0.001)
number_epochs=70
model.train()  # nothing inside the loop leaves training mode, so set it once
for epoch in range(number_epochs):
    optimizer.zero_grad()  # clear gradients left over from the previous step
    outputs=model(X_train_tensor)
    loss=criterion(outputs,y_train_tensor)
    loss.backward()
    optimizer.step()
    epoch_loss=loss.item()
    print(f"Epoch [{epoch+1}/{number_epochs}],Loss:{epoch_loss:.4f}")

In [None]:
# Re-score the test split with the retrained model, gradients disabled.
model.eval()
with torch.no_grad():
    test_outputs=model(X_test_tensor)
    # Convert the logits to probabilities.
    probs=test_outputs.sigmoid()
    


In [None]:
import matplotlib.pyplot as plt
# Compare the predicted-probability distributions of the two true classes.
probs=torch.sigmoid(test_outputs)  # kept as a tensor; later cells threshold it
# Work on flat NumPy arrays for plotting: a 1-D boolean mask over 1-D data is
# unambiguous, and matplotlib receives plain arrays rather than torch tensors.
probs_np=probs.detach().cpu().numpy().ravel()
labels_np=y_test_tensor.detach().cpu().numpy().ravel()
plt.hist(probs_np[labels_np==0],bins=50,alpha=0.5,label="Normal")
plt.hist(probs_np[labels_np==1],bins=50,alpha=0.5,label="Anomaly")
plt.legend()
plt.show()


In [None]:
# Report for the retrained model: a probability above 0.35 counts as an anomaly.
predicted_label_4=probs.gt(0.35).float()
report4=classification_report(
    y_true=y_test_tensor.numpy(),
    y_pred=predicted_label_4.numpy(),
    target_names=["Normal","Anomaly"],
    zero_division=0,  # report 0 instead of warning when a class is never predicted
)
print(report4)