In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms, utils
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.metrics import roc_curve, auc
import numpy as np
from tqdm import tqdm

In [9]:
#https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb
# CNN based Auto-Encoder model
class CNNAutoencoder(nn.Module):
    def __init__(self):
        super(CNNAutoencoder, self).__init__()
        # Encoder (Convolutional Layers) : Compressed into low-dimensional vectors.
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.Conv2d(64, 128, kernel_size=7)  # [batch, 128, 1, 1]
        )
        # Decoder (Transpose Convolutional Layers) : Reconstruct low-dimensional vectors to original image dimensions.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=7),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.ConvTranspose2d(32, 1, kernel_size=3, stride=2, padding=1, output_padding=1),  # [batch, 1, 28, 28]
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x     

In [30]:
#https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb
# CNN based Auto-Encoder model
class CNNAutoencoder_Shallow(nn.Module):
    def __init__(self):
        super(CNNAutoencoder_Shallow, self).__init__()
        # Encoder (Convolutional Layers) : Compressed into low-dimensional vectors.
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 32, kernel_size=3, stride=2, padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.Conv1d(32, 64, kernel_size=3, stride=2, padding=1),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.Conv1d(64, 128, kernel_size=7)  # [batch, 128, 1, 1]
        )
        # Decoder (Transpose Convolutional Layers) : Reconstruct low-dimensional vectors to original image dimensions.
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(128, 64, kernel_size=7),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose1d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.ConvTranspose1d(32, 1, kernel_size=3, stride=2, padding=1, output_padding=1),  # [batch, 1, 28, 28]
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [208]:
def CNNOutput(W,K,S,P):
    #W is the input volume - in your case 128
    #K is the Kernel size - in your case 5
    #S is the stride - which you have not provided.    
    #P is the padding - in your case 0 i believe    
    return (W-K+2*P)/S+1

print(CNNOutput(500,3,2,1))

250.5


In [218]:
#https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb
# CNN based Auto-Encoder model
#optimized for input 500 sampling points
class CNNAutoencoder_Shallow_498sp_1(nn.Module):
    def __init__(self):
        super(CNNAutoencoder_Shallow_500sp_1, self).__init__()
        # Encoder (Convolutional Layers) : Compressed into low-dimensional vectors.
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=6, stride=2, padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.Conv1d(64, 128, kernel_size=6, stride=2, padding=1),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.Conv1d(128, 256, kernel_size=3,stride=2, padding=1),  # [batch, 128, 1, 1]                        
        )
        # Decoder (Transpose Convolutional Layers) : Reconstruct low-dimensional vectors to original image dimensions.
        self.decoder = nn.Sequential(                       
            nn.ConvTranspose1d(256,128, kernel_size=3,stride=2, padding=1,output_padding=0),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose1d(128,64, kernel_size=6, stride=2, padding=1,output_padding=0),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose1d(64, 1, kernel_size=6, stride=2, padding=1, output_padding=0),  # [batch, 32, 14, 14]
            nn.ReLU(True),            
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [26]:
def GetUniqueElements_List(inputList):
    list_out=[]
    for k in range(0,len(inputList)):
        if(inputList[k] not in list_out):
            list_out.append(inputList[k])
    return list_out
        
def SeparateFeat_Norm_Anom(CLASSIF_FEAT,CLASSIF_LABS):
    unique_labels=GetUniqueElements_List(CLASSIF_LABS)
    feat_norm=[]
    labs_norm=[]
    feat_anomaly=[]
    labs_anomaly=[]
    label_0=CLASSIF_LABS[0]
    for k in range(0,len(CLASSIF_LABS)):
        if(CLASSIF_LABS[k]==label_0):
            feat_norm.append(np.asarray(CLASSIF_FEAT[k][:]))
            labs_norm.append(0)
        else:
            feat_anomaly.append(np.asarray(CLASSIF_FEAT[k][:]))
            labs_anomaly.append(1)
    return feat_norm,labs_norm,feat_anomaly,labs_anomaly

In [31]:
# Residual Blcok
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channels)

        # Shortcut connection
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(out_channels)
            )

    def forward(self, x):
        residual = self.shortcut(x)
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += residual
        out = F.relu(out)
        return out


# ResNet based Auto-Encoder model
class ResNetAutoencoder(nn.Module):
    def __init__(self):
        super(ResNetAutoencoder, self).__init__()
        # Encoder (Convolutional Layers with Residual Blocks)
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=False),  # Avoid inplace operation
            ResidualBlock(64, 128, stride=2),
            ResidualBlock(128, 256, stride=2),
            nn.ReLU(inplace=False)  # Avoid inplace operation
        )

        # Decoder (Transpose Convolutional Layers)
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=False),  # Avoid inplace operation
            nn.ConvTranspose1d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=False),  # Avoid inplace operation
            nn.Conv1d(64, 1, kernel_size=3, stride=2, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [35]:
def TrainAE_1(feat,labels,
                  AE_type="ResNetCNN",#"CNN_shallow",
                  lr=0.001,
                  num_epochs=10,
                  visualize=True,
            ):
    
    norm,norm_l,non_norm,non_norm_l = SeparateFeat_Norm_Anom(feat,labels)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    #train
    norm = torch.Tensor(np.asarray(norm)).to(device) 
    norm_l = torch.Tensor(np.asarray(norm_l)).to(device) 
    norm_dataset = torch.utils.data.TensorDataset(norm,norm_l)
    train_loader = torch.utils.data.DataLoader(norm_dataset)
    
    if(len(norm.shape)<=2): 
        norm=norm[:,None,:]
        print("Features shape is corrected. Cur. shape - "+str(norm.shape))
    
    #test
    non_norm = torch.Tensor(np.asarray(non_norm)).to(device) 
    non_norm_l = torch.Tensor(np.asarray(non_norm_l)).to(device) 
    non_norm_dataset = torch.utils.data.TensorDataset(non_norm,non_norm_l)
    test_loader = torch.utils.data.DataLoader(non_norm_dataset)

    if(len(norm.shape)<=2): 
        non_norm=non_norm[:,None,:]
        print("Features shape is corrected. Cur. shape - "+str(non_norm.shape))
            
    #https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb    
    # model 1 : CNN based model
    model=None
    if(AE_type=="CNN_shallow"):
        model = CNNAutoencoder_Shallow_500sp_1().to(device)#CNNAutoencoder_Shallow_500sp().to(device) #CNNAutoencoder_Shallow().to(device)
    if(AE_type=="ResNetCNN"):
        model = WideResNet(depth=6, width_multiplier=1, num_classes=1).to(device)#ResNetAutoencoder().to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    num_epochs = num_epochs

    # Training
    train_losses = []
    for epoch in range(num_epochs):        
        epoch_loss = 0
        with tqdm(total=len(train_loader), desc=f'Epoch {epoch + 1}/{num_epochs}', unit='batch') as pbar:
            for data, _ in train_loader:
                data = data.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, data)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
                pbar.set_postfix(loss=loss.item())
                pbar.update(1)
    
        train_losses.append(epoch_loss / len(train_loader))

    if(visualize==True):
        # Plotting the training loss curve
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Loss Curve')
        plt.legend()
        plt.grid(True)
        plt.show()
        
    return model

In [36]:
norm=np.random.random((400,8))
norm_l=np.zeros((np.shape(norm)[0]))
non_norm=np.random.random((400,8))
non_norm_l=np.ones((np.shape(non_norm)[0]))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
norm_dataset = torch.utils.data.TensorDataset(torch.tensor(norm,dtype=torch.float32).to(device),torch.tensor(norm_l,dtype=torch.float32).to(device))
train_loader = torch.utils.data.DataLoader(norm_dataset)
non_norm_dataset = torch.utils.data.TensorDataset(torch.tensor(non_norm,dtype=torch.float32).to(device),torch.tensor(non_norm_l,dtype=torch.float32).to(device))
test_loader = torch.utils.data.DataLoader(non_norm_dataset)
model=TrainAE_1(norm,norm_l,num_epochs=10)

Features shape is corrected. Cur. shape - torch.Size([400, 1, 8])


TypeError: WideResNet.__init__() missing 1 required positional argument: 'output_indices'

In [328]:
threshold_factor=1.5
proximity=20

non_norm_torch=torch.tensor(non_norm,dtype=torch.float32).to(device)
non_norm_torch=non_norm_torch[:,None,:]
preds=model.forward(non_norm_torch)
losses = torch.mean((preds - non_norm_torch)**2, dim=1)
labels=[]
"""
lk=int(losses.shape[1]/2)
for l in range(0,losses.shape[0]):
    cur_loss=losses[l]
    mean_loss=cur_loss.mean()    
    print(cur_loss)
    cur_lab=0
    anom_len=losses.shape[1]
    while(True):
        threshold_down = mean_loss + factor*cur_lab
        threshold_up   = threshold_down + factor*(cur_lab+1)
        anomalies = (cur_loss > threshold_down) & (cur_loss < threshold_up)        
        if(anom_len>lk): 
            labels.append(cur_lab)
            break
        else: cur_lab+=1    
    print(anom_len)
"""
threshold =losses.mean() + factor * losses.std()
anomalies = losses > threshold
labels=[]
lk=int(anomalies.shape[1]/20)
for l in range(0,anomalies.shape[0]):
    count=torch.sum(anomalies[l]==True)
    #count=0
    #for k in range(0,anomalies.shape[1]):
    #    if(anomalies.data[l,k]==True):count+=1            
    #tr=[1 for x in anomalies[l] if x]#anomalies[l].count(True)
    print(count)
    if(count>lk):labels.append(1)
    else: labels.append(0)
    

tensor(17, device='cuda:0')
tensor(20, device='cuda:0')
tensor(14, device='cuda:0')
tensor(23, device='cuda:0')
tensor(19, device='cuda:0')
tensor(18, device='cuda:0')
tensor(18, device='cuda:0')
tensor(18, device='cuda:0')
tensor(21, device='cuda:0')
tensor(19, device='cuda:0')
tensor(27, device='cuda:0')
tensor(18, device='cuda:0')
tensor(18, device='cuda:0')
tensor(29, device='cuda:0')
tensor(27, device='cuda:0')
tensor(22, device='cuda:0')
tensor(19, device='cuda:0')
tensor(14, device='cuda:0')
tensor(26, device='cuda:0')
tensor(16, device='cuda:0')
tensor(21, device='cuda:0')
tensor(20, device='cuda:0')
tensor(25, device='cuda:0')
tensor(10, device='cuda:0')
tensor(13, device='cuda:0')
tensor(21, device='cuda:0')
tensor(12, device='cuda:0')
tensor(17, device='cuda:0')
tensor(22, device='cuda:0')
tensor(17, device='cuda:0')
tensor(12, device='cuda:0')
tensor(17, device='cuda:0')
tensor(12, device='cuda:0')
tensor(17, device='cuda:0')
tensor(18, device='cuda:0')
tensor(24, device='c

In [329]:
print(labels)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [None]:
# Residual Blcok
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut connection
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        residual = self.shortcut(x)
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += residual
        out = F.relu(out)
        return out


# ResNet based Auto-Encoder model
class ResNetAutoencoder(nn.Module):
    def __init__(self):
        super(ResNetAutoencoder, self).__init__()
        # Encoder (Convolutional Layers with Residual Blocks)
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=False),  # Avoid inplace operation
            ResidualBlock(16, 32, stride=2),
            ResidualBlock(32, 64, stride=2),
            nn.ReLU(inplace=False)  # Avoid inplace operation
        )

        # Decoder (Transpose Convolutional Layers)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=False),  # Avoid inplace operation
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=False),  # Avoid inplace operation
            nn.Conv2d(16, 1, kernel_size=3, stride=1, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
#https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb
# CNN based Auto-Encoder model
class CNNAutoencoder_Shallow1(nn.Module):
    def __init__(self,input_size=32):
        super(CNNAutoencoder_Shallow, self).__init__()
        # Encoder (Convolutional Layers) : Compressed into low-dimensional vectors.
        self.encoder = nn.Sequential(
            nn.Conv1d(1,32, kernel_size=3, stride=2, padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.Conv1d(32, 64, kernel_size=3, stride=2, padding=1),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.Conv1d(64, 128, kernel_size=7)  # [batch, 128, 1, 1]
        )
        # Decoder (Transpose Convolutional Layers) : Reconstruct low-dimensional vectors to original image dimensions.
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(128, 64, kernel_size=7),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose1d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.ConvTranspose1d(32, 1, kernel_size=3, stride=2, padding=1, output_padding=1),  # [batch, 1, 28, 28]
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
#https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb
# CNN based Auto-Encoder model
#optimized for input 500 sampling points
class CNNAutoencoder_Shallow_500sp(nn.Module):
    def __init__(self):
        super(CNNAutoencoder_Shallow_500sp, self).__init__()
        # Encoder (Convolutional Layers) : Compressed into low-dimensional vectors.
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 248, kernel_size=6, stride=2, padding=0),  # [batch, 32, 14, 14]
            nn.ReLU(True),
            nn.Conv1d(248, 122, kernel_size=6, stride=2, padding=0),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.Conv1d(122, 59, kernel_size=6,stride=2, padding=0),  # [batch, 128, 1, 1]            
            nn.ReLU(True),
            nn.Conv1d(59, 29, kernel_size=3,stride=2, padding=0),  # [batch, 128, 1, 1]                
        )
        # Decoder (Transpose Convolutional Layers) : Reconstruct low-dimensional vectors to original image dimensions.
        self.decoder = nn.Sequential(            
            nn.ConvTranspose1d(29, 59, kernel_size=3,stride=2, padding=0,output_padding=0),  # [batch, 128, 1, 1]    
            nn.ReLU(True),
            nn.ConvTranspose1d(59,122, kernel_size=6,stride=2, padding=0,output_padding=0),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose1d(122,248, kernel_size=6, stride=2, padding=0,output_padding=0),  # [batch, 64, 7, 7]
            nn.ReLU(True),
            nn.ConvTranspose1d(248, 1, kernel_size=6, stride=2, padding=1, output_padding=0),  # [batch, 32, 14, 14]
            nn.ReLU(True),            
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [10]:
#https://github.com/Navy10021/MDAutoEncoder/blob/main/notebooks/MDAutoEncoder.ipynb
in_sz=np.shape(norm)[1]
# model 1 : CNN based model
model = CNNAutoencoder_Shallow(input_size=in_sz).to(device)
# model 2 : ResNet based model
#model = ResNetAutoencoder().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 30

# Training
train_losses = []
for epoch in range(num_epochs):
    epoch_loss = 0

    with tqdm(total=len(train_loader), desc=f'Epoch {epoch + 1}/{num_epochs}', unit='batch') as pbar:
        for data, _ in train_loader:
            data = data.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, data)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            pbar.set_postfix(loss=loss.item())
            pbar.update(1)

    train_losses.append(epoch_loss / len(train_loader))

# Plotting the training loss curve
plt.figure(figsize=(10, 6))
plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Curve')
plt.legend()
plt.grid(True)
plt.show()

Epoch 1/30:   0%|          | 0/400 [00:02<?, ?batch/s]


RuntimeError: Given groups=1, weight of size [64, 32, 3], expected input[1, 250, 125] to have 32 channels, but got 250 channels instead

In [62]:
# Reconstruction error : mean + 3 * standard deviation
def compute_reconstruction_error(data, model):
    model.eval() # using trained model
    with torch.no_grad():
        data = data.to(device)
        output = model(data)
        recon_error = torch.mean((data - output) ** 2, dim=[1, 2, 3])
    return recon_error.cpu().detach().numpy()

normal_recon_error = compute_reconstruction_error(data, model)
# 1. Calculate reconstruction error for normal data
"""
normal_data = next(iter(train_loader))[0]
normal_recon_error = compute_reconstruction_error(normal_data, model)
mean_normal_error = np.mean(normal_recon_error)
"""


"""
# 2. Generate random anomalous data (e.g. noise)
anomalous_data = torch.randn_like(normal_data) * 0.25
anomalous_recon_error = compute_reconstruction_error(anomalous_data, model)
mean_anomalous_error = np.mean(anomalous_recon_error)

# 3. Plotting histograms
plt.figure(figsize=(10, 6))  # Adjust figure size as needed
plt.hist(normal_recon_error, bins=50, alpha=0.95, color='salmon', label='Normal data')
plt.hist(anomalous_recon_error, bins=50, alpha=0.95, color='skyblue', label='Anomaly')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.title('Histogram of Reconstruction Errors')
plt.legend(loc='upper right')
plt.grid(True)
plt.show()

# 4. Calculate and print mean error rates
mean_normal_error = np.mean(normal_recon_error)
mean_anomalous_error = np.mean(anomalous_recon_error)
print("==========================================================")
print(f">> Mean Normal data Reconstruction Error Rate: {mean_normal_error:.4f}")
print(f">> Mean Anomalous data Reconstruction Error Rate: {mean_anomalous_error:.4f}")
print("==========================================================\n")

# Generate labels (0 for normal, 1 for anomalous)
labels = np.concatenate([np.zeros(len(normal_recon_error)), np.ones(len(anomalous_recon_error))])
scores = np.concatenate([normal_recon_error, anomalous_recon_error])

# 5. Calculate ROC curve and AUC
fpr, tpr, _ = roc_curve(labels, scores)
roc_auc = auc(fpr, tpr)

# Plot ROC curve
plt.figure(figsize=(10, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()
print("\n\n")

def plot_reconstructed_images(model, data):
    model.eval()
    with torch.no_grad():
        data = data.to(device)
        output = model(data)

    # Visualization of original and reconstructed images (recovered by the model)
    fig, axes = plt.subplots(2, 10, figsize=(15, 3))
    for i in range(10):
        axes[0, i].imshow(data[i].cpu().numpy().squeeze(), cmap='gray')
        axes[0, i].axis('off')
        axes[0, i].set_title('Original', fontsize=8, pad=2)

        axes[1, i].imshow(output[i].cpu().numpy().squeeze(), cmap='gray')
        axes[1, i].axis('off')
        axes[1, i].set_title('Reconstructed', fontsize=8, pad=2)

    plt.suptitle('Original and Reconstructed Malware Images', fontsize=13, y=1.15)
    plt.show()
    print("\n\n")

# Visualization of Reconstructed IMG
plot_reconstructed_images(model, normal_data[:10])

# 7. Latent space visualization
def plot_latent_space(data, model):
    model.eval()
    with torch.no_grad():
        data = data.to(device)
        latent = model.encoder(data)  # Assuming the encoder part of the autoencoder is accessible
        latent = latent.view(latent.size(0), -1).cpu().numpy()

    tsne = TSNE(n_components=2)
    latent_2d = tsne.fit_transform(latent)

    plt.figure(figsize=(10, 6))
    plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c='blue', alpha=0.5)
    plt.xlabel('TSNE Component 1')
    plt.ylabel('TSNE Component 2')
    plt.title('Latent Space Visualization using TSNE')
    plt.grid(True)
    plt.show()
    print("\n\n")

# Example usage
plot_latent_space(next(iter(train_loader))[0], model)
"""

torch.Size([1, 400, 32])


RuntimeError: Given groups=1, weight of size [32, 1, 3], expected input[1, 400, 32] to have 1 channels, but got 400 channels instead