In [None]:
import torch 
import torchvision
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
from torchvision.transforms import ToTensor
from torch.utils.data.dataloader import DataLoader
from torch.utils.data import random_split,TensorDataset
import pandas as pd
import numpy as np

In [None]:

import os, types
import pandas as pd
from botocore.client import Config
import ibm_boto3

def __iter__(self):
    """Placeholder iterator hook that is grafted onto the COS response body below."""
    return 0

# @hidden_cell
# Reads the training CSV from IBM Cloud Object Storage.
# NOTE: `cos_client` is not constructed in this cell and no credentials appear here;
# the client has to exist in the kernel already before this cell is run.

bucket = 'project-donotdelete-pr-xgn2pqshhnke7i'
object_key = 'Train_data.csv'

body = cos_client.get_object(
    Bucket=bucket,
    Key=object_key,
)['Body']
# pandas needs a file-like object exposing __iter__; attach the stub if the stream lacks one
if not hasattr(body, "__iter__"):
    body.__iter__ = types.MethodType(__iter__, body)

df = pd.read_csv(body)



In [None]:
# Column names, dtypes and non-null counts of the training frame.
df.info()

In [None]:
# Summary statistics of the training frame.
df.describe()

In [None]:
# Number of distinct values in each column.
df.nunique()

In [None]:
# How many rows carry each label in the target column 'class'.
df['class'].value_counts()

In [None]:
X = df.drop('class', axis=1)
y = df['class']

In [None]:
mapping={'normal':1,'anomaly':0}
y=y.map(mapping)

In [None]:
y.info()

In [None]:
# Partition the feature columns by dtype: object columns are treated as categorical,
# every numeric dtype as numerical.
categorical_features = X.select_dtypes(include='object').columns
numerical_features = X.select_dtypes(include='number').columns

In [None]:
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

In [None]:
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split

In [None]:
numeric_transformer = StandardScaler()

In [None]:
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

In [None]:

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ])


In [None]:
x_train, x_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42,stratify=y)

In [None]:
x_train = preprocessor.fit_transform(x_train)
x_val = preprocessor.transform(x_val)

In [None]:
# Inspect the container type returned by preprocessor.transform (it is passed to torch.tensor below).
type(x_val)

In [None]:
# Inspect the label container; its .values attribute is read when building the label tensors.
type(y_val)

In [None]:
x_train_tensor = torch.tensor(x_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.long)
x_val_tensor = torch.tensor(x_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.long)

In [None]:
train_ds = TensorDataset(x_train_tensor, y_train_tensor)
val_ds = TensorDataset(x_val_tensor, y_val_tensor)

In [None]:
batch_size=128

In [None]:
train_loader=DataLoader(train_ds,batch_size,shuffle=True,num_workers=10)
val_loader=DataLoader(val_ds,batch_size*2,num_workers=10)

In [None]:
# Network dimensions: number of preprocessed feature columns and number of distinct labels.
input_size = x_train.shape[1]
output_classes = y.unique().size
print(input_size, output_classes, sep='\n')

In [None]:
def accuracy(outputs,targets):
    """Return the share of rows whose highest-scoring class equals the target.

    Args:
        outputs: 2-D tensor of per-class scores, one row per sample.
        targets: 1-D tensor of class indices, one per row of ``outputs``.

    Returns:
        A 0-dim tensor holding ``correct / number_of_rows``.
    """
    preds = outputs.argmax(dim=1)
    n_correct = (preds == targets).sum().item()
    return torch.tensor(n_correct / len(preds))

In [None]:
class Base(nn.Module):
    """Training/validation helpers shared by classifiers trained with cross-entropy.

    Subclasses provide ``forward``; ``self(features)`` must return per-class scores.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss for one ``(features, targets)`` batch."""
        features, targets = batch
        out = self(features)
        loss = F.cross_entropy(out, targets)
        return loss

    def validation_step(self, batch):
        """Compute loss and accuracy for one validation batch.

        Returns:
            dict with ``'val_loss'`` (detached tensor), ``'val_acc'`` (tensor) and
            ``'batch_size'`` (int), the latter used to weight the epoch averages.
        """
        features, targets = batch
        out = self(features)
        loss = F.cross_entropy(out, targets)
        acc = accuracy(out, targets)

        return {'val_loss': loss.detach(), 'val_acc': acc, 'batch_size': len(targets)}

    def validation_epoch_end(self, outputs):
        """Combine per-batch results into epoch-level metrics.

        Each batch is weighted by its ``'batch_size'`` so a smaller final batch does
        not count as much as a full one. Entries without ``'batch_size'`` get weight 1,
        which reduces to the plain mean over batches.

        Args:
            outputs: list of dicts as returned by ``validation_step``.

        Returns:
            dict with float ``'val_loss'`` and ``'val_acc'``.
        """
        batch_losses = torch.stack([x['val_loss'] for x in outputs]).float().cpu()
        batch_accs = torch.stack([x['val_acc'] for x in outputs]).float().cpu()
        sizes = torch.tensor([float(x.get('batch_size', 1)) for x in outputs])
        weights = sizes / sizes.sum()
        epoch_loss = (batch_losses * weights).sum()
        epoch_acc = (batch_accs * weights).sum()

        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}

    def epoch_end(self, epoch, result):
        """Print one summary line for a finished epoch (``epoch`` is zero-based)."""
        print(f"Epoch [{epoch+1}], train_loss: {result['train_loss']:.4f}, val_loss: {result['val_loss']:.4f}, val_acc: {result['val_acc']:.4f}")

In [None]:
class IntrusionClassifier(Base):
    """Feed-forward classifier: Linear -> ReLU -> Dropout -> Linear.

    Args:
        dropout: drop probability applied after the hidden activation.
        in_features: width of the input layer. Defaults to the notebook-level
            ``input_size`` so existing ``IntrusionClassifier(p)`` calls keep working.
        num_classes: number of output scores. Defaults to the notebook-level
            ``output_classes``.
    """
    def __init__(self,dropout,in_features=None,num_classes=None):
        super().__init__()
        # Explicit sizes let the same architecture be rebuilt (e.g. to load saved
        # weights) without first recreating the notebook globals.
        if in_features is None:
            in_features = input_size
        if num_classes is None:
            num_classes = output_classes
        self.linear1=nn.Linear(in_features,64)
        self.linear2=nn.Linear(64,num_classes)
        self.relu=nn.ReLU()
        self.dropout = nn.Dropout(p=dropout)


    def forward(self,xb):
        """Return raw class scores for a batch of feature rows."""
        xb=self.linear1(xb)
        xb=self.relu(xb)
        xb=self.dropout(xb)
        xb=self.linear2(xb)
        return xb



In [None]:
# Seed torch's global RNG before the weights are initialised so a fresh
# Restart & Run All gives repeatable initialisation, shuffling and dropout masks.
torch.manual_seed(42)
model=IntrusionClassifier(dropout=0.5)

In [None]:
@torch.no_grad()
def evaluate(model,val_loader):
    """Run the model over ``val_loader`` in eval mode with gradients disabled.

    Calls ``model.validation_step`` on every batch and returns whatever
    ``model.validation_epoch_end`` produces from the collected results.
    """
    model.eval()
    step_results = []
    for batch in val_loader:
        step_results.append(model.validation_step(batch))
    return model.validation_epoch_end(step_results)


def fit(epochs,lr,model,train_loader,val_loader,opt_func=torch.optim.Adam):
    """Train ``model`` for ``epochs`` epochs and validate after each one.

    Args:
        epochs: number of passes over ``train_loader``.
        lr: learning rate handed to the optimizer.
        model: module exposing ``training_step`` and ``epoch_end`` (and the
            validation hooks used by ``evaluate``).
        train_loader: iterable of training batches.
        val_loader: iterable of validation batches.
        opt_func: optimizer factory called as ``opt_func(model.parameters(), lr)``.

    Returns:
        list with one result dict per epoch: the validation metrics plus ``'train_loss'``.
    """
    history=[]
    # A new optimizer is built on every call, so optimizer state does not carry over between calls.
    optimizer=opt_func(model.parameters(),lr)
    for epoch in range(epochs):
        model.train()
        train_losses=[]
        for batch in train_loader:
            loss=model.training_step(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            # Keep only the value: storing the attached tensor would hold a reference
            # to each batch's autograd graph until the end of the epoch.
            train_losses.append(loss.detach())

        result=evaluate(model,val_loader)
        result['train_loss']=torch.stack(train_losses).mean().item()
        model.epoch_end(epoch,result)
        history.append(result)
    return history

In [None]:
# Baseline: validation metrics of the model before any training.
history=[evaluate(model,val_loader)]
history

In [None]:
# Start again from an empty list: the baseline entry above has no 'train_loss' key,
# and the loss-curve cell further down reads 'train_loss' from every history entry.
history=[]

In [None]:
# First 10 epochs at lr=0.0001 with fit()'s default optimizer (Adam).
history+=fit(10,0.0001,model,train_loader,val_loader)

In [None]:
# 10 more epochs on the same model; fit() creates a fresh optimizer on each call.
history+=fit(10,0.0001,model,train_loader,val_loader)

In [None]:
# All per-epoch result dicts collected so far.
history

In [None]:
train_losses = [result['train_loss'] for result in history]
val_losses = [result['val_loss'] for result in history]

In [None]:
plt.plot(train_losses, '-o', label='Training Loss')
plt.plot(val_losses, '-x', label='Validation Loss')

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss vs. Epochs')
plt.legend()
plt.show()

In [None]:
accuracies = [result['val_acc'] for result in history]

plt.plot(accuracies, '-x', color='green')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy vs. No. of Epochs')

plt.show()

In [None]:
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

# Eval mode disables dropout for the predictions below.
model.eval()

all_val_preds = []
all_val_targets = []

# Collect predicted and true labels for the whole validation set.
with torch.no_grad():
    for features, targets in val_loader:
        outputs = model(features)
        # Index of the highest score per row is the predicted class.
        predicted = outputs.argmax(dim=1)

        all_val_preds.extend(predicted.numpy())
        all_val_targets.extend(targets.numpy())

# target_names is positional, so pin the label order explicitly (0 = anomaly, 1 = normal).
# Without `labels`, the order is inferred from the labels that happen to occur in the
# data, and a class missing from both vectors would make the names mismatch.
class_labels = [0, 1]
target_names = ['anomaly (0)', 'normal (1)']

report = classification_report(
    all_val_targets, all_val_preds, labels=class_labels, target_names=target_names
)

print("\n--- Classification Report ---")
print(report)

# Rows are actual classes, columns are predicted classes, both in class_labels order.
cm = confusion_matrix(all_val_targets, all_val_preds, labels=class_labels)

print("\n--- Confusion Matrix ---")
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [None]:
import joblib
# Save only the network's parameters (state_dict), not the module object itself;
# loading them later requires constructing the same architecture first.
torch.save(model.state_dict(), 'intrusion_model.pth')


# Save the fitted ColumnTransformer so new data can be transformed the same way.
joblib.dump(preprocessor, 'preprocessor.joblib')
print("Model and preprocessor saved successfully!")

In [None]:

import os, types
import pandas as pd
from botocore.client import Config
import ibm_boto3

def __iter__(self):
    """Placeholder iterator hook that is grafted onto the COS response body below."""
    return 0

# @hidden_cell
# Reads the test CSV from IBM Cloud Object Storage.
# NOTE: `cos_client` is not constructed in this cell and no credentials appear here;
# the client has to exist in the kernel already before this cell is run.


bucket = 'project-donotdelete-pr-xgn2pqshhnke7i'
object_key = 'Test_data.csv'

body = cos_client.get_object(
    Bucket=bucket,
    Key=object_key,
)['Body']
# pandas needs a file-like object exposing __iter__; attach the stub if the stream lacks one
if not hasattr(body, "__iter__"):
    body.__iter__ = types.MethodType(__iter__, body)

test_data = pd.read_csv(body)
test_data.head(10)


In [None]:
# Column names, dtypes and non-null counts of the test frame.
test_data.info()

In [None]:
# Apply the already-fitted preprocessor (transform only, no refit) to the test frame.
X_test_processed = preprocessor.transform(test_data)

In [None]:
# Inspect the container type before handing it to torch.tensor.
type(X_test_processed)

In [None]:
# Same float32 conversion as used for the training and validation features.
X_test_tensor = torch.tensor(X_test_processed, dtype=torch.float32)

In [None]:
# Eval mode disables dropout for inference.
model.eval()

# One forward pass over the whole test matrix, without gradient tracking.
with torch.no_grad():
    outputs = model(X_test_tensor)
    predicted_indices = outputs.argmax(dim=1)

predictions_list = predicted_indices.tolist()

# Translate the numeric predictions back to the original label strings
inverse_class_map = {0: 'anomaly', 1: 'normal'}

# Copy of the test frame with the readable prediction appended as the last column.
results_df = test_data.assign(
    predicted_label=pd.Series(predictions_list, index=test_data.index).map(inverse_class_map)
)

pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)



In [None]:
# First ten test rows together with their predicted label.
results_df.head(10)

In [None]:
# First rows of the training frame, for side-by-side comparison of the columns.
df.head()