In [31]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DAGMM(nn.Module):
    def __init__(self, n_gmm=2, z_dim=128):
        super(DAGMM, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv3d(1, 32, kernel_size=(1,1,7), stride=(1,1,2), padding=(0,0,3)),
            nn.ReLU(),
            nn.Conv3d(32, 64, kernel_size=(3,3,5), stride=(1,1,2), padding=(1,1,2)),
            nn.ReLU(),
            nn.Conv3d(64, 128, kernel_size=(3,3,3), stride=(2,2,2), padding=(1,1,1)),
            nn.ReLU(),
            nn.Conv3d(128, 256, kernel_size=(3,3,3), stride=(2,2,2), padding=(1,1,1)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(256 * 6 * 6 * 30, 512),
            nn.ReLU(),
            nn.Linear(512, z_dim)
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(z_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256 * 6 * 6 * 30),
            nn.ReLU(),
            nn.Unflatten(1, (256, 6, 6, 30)),
            nn.ConvTranspose3d(256, 128, kernel_size=(3,3,3), stride=(2,2,2), padding=(1,1,1), output_padding=(1,1,1)),
            nn.ReLU(),
            nn.ConvTranspose3d(128, 64, kernel_size=(3,3,3), stride=(2,2,2), padding=(1,1,1), output_padding=(1,1,1)),
            nn.ReLU(),
            nn.ConvTranspose3d(64, 32, kernel_size=(3,3,5), stride=(1,1,2), padding=(1,1,2), output_padding=(0,0,1)),
            nn.ReLU(),
            nn.ConvTranspose3d(32, 1, kernel_size=(1,1,7), stride=(1,1,2), padding=(0,0,3), output_padding=(0,0,1))
        )
        
        # Estimation network
        self.estimation = nn.Sequential(
            nn.Linear(z_dim + 2, 10),
            nn.Tanh(),
            nn.Dropout(0.5),
            nn.Linear(10, n_gmm),
            nn.Softmax(dim=1)
        )

    def encode(self, x):
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z)

    def estimate(self, z):
        return self.estimation(z)

    def compute_reconstruction(self, x, x_hat):
        relative_euclidean_distance = (x - x_hat).norm(2, dim=(1,2,3,4)) / x.norm(2, dim=(1,2,3,4))
        cosine_similarity = F.cosine_similarity(x.view(x.size(0), -1), x_hat.view(x_hat.size(0), -1), dim=1)
        return relative_euclidean_distance, cosine_similarity

    def forward(self, x):
        z_c = self.encode(x)
        x_hat = self.decode(z_c)
        rec_1, rec_2 = self.compute_reconstruction(x, x_hat)
        z = torch.cat([z_c, rec_1.unsqueeze(-1), rec_2.unsqueeze(-1)], dim=1)
        gamma = self.estimate(z)
        return z_c, x_hat, z, gamma

import torch
import torch.nn.functional as F
from torch.autograd import Variable

import numpy as np

class ComputeLoss:
    def __init__(self, model, lambda_energy, lambda_cov, device, n_gmm):
        self.model = model
        self.lambda_energy = lambda_energy
        self.lambda_cov = lambda_cov
        self.device = device
        self.n_gmm = n_gmm

    def forward(self, x, x_hat, z, gamma):
        """Computing the loss function for DAGMM."""
        reconst_loss = torch.mean((x-x_hat).pow(2))
        sample_energy, cov_diag = self.compute_energy(z, gamma, sample_mean=True)
        loss = reconst_loss + self.lambda_energy * sample_energy + self.lambda_cov * cov_diag
        return loss

    def compute_energy(self, z, gamma, phi=None, mu=None, cov=None, sample_mean=False):
        """Computing the sample energy function"""
        if (phi is None) or (mu is None) or (cov is None):
            phi, mu, cov = self.compute_params(z, gamma)
        
        z_mu = (z.unsqueeze(1)- mu.unsqueeze(0))
        
        eps = 1e-12
        cov_inverse = []
        det_cov = []
        cov_diag = 0
        
        for k in range(self.n_gmm):
            cov_k = cov[k] + torch.eye(cov[k].size(-1)).to(self.device) * eps
            cov_inverse.append(torch.inverse(cov_k).unsqueeze(0))
            det_cov.append(torch.logdet(cov_k * (2*np.pi)).unsqueeze(0))
            cov_diag += torch.sum(1 / cov_k.diag())
        
        cov_inverse = torch.cat(cov_inverse, dim=0)
        det_cov = torch.cat(det_cov).to(self.device)
        
        E_z = -0.5 * torch.sum(torch.sum(z_mu.unsqueeze(-1) * cov_inverse.unsqueeze(0), dim=-2) * z_mu, dim=-1)
        E_z = E_z - 0.5 * det_cov.unsqueeze(0)
        E_z = -torch.logsumexp(torch.log(phi.unsqueeze(0)) + E_z, dim=1)
        
        if sample_mean:
            E_z = torch.mean(E_z)
        
        return E_z, cov_diag

    def compute_params(self, z, gamma):
        """Computing the parameters phi, mu and gamma for sample energy function """ 
        phi = torch.sum(gamma, dim=0) / gamma.size(0) 
        mu = torch.sum(z.unsqueeze(1) * gamma.unsqueeze(-1), dim=0)
        mu /= torch.sum(gamma, dim=0).unsqueeze(-1)
        z_mu = (z.unsqueeze(1) - mu.unsqueeze(0))
        z_mu_z_mu_t = z_mu.unsqueeze(-1) * z_mu.unsqueeze(-2)
        cov = torch.sum(gamma.unsqueeze(-1).unsqueeze(-1) * z_mu_z_mu_t, dim=0)
        cov /= torch.sum(gamma, dim=0).unsqueeze(-1).unsqueeze(-1)
        return phi, mu, cov
    
import torch
from torch import optim
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc
import numpy as np
from barbar import Bar

# def weights_init_normal(m):
#     classname = m.__class__.__name__
#     if classname.find("Conv") != -1 and classname != 'Conv':
#         torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
#         torch.nn.init.normal_(m.bias.data, 0.0, 0.02)
#     elif classname.find("Linear") != -1:
#         torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
#         torch.nn.init.normal_(m.bias.data, 0.0, 0.02)
#     elif classname.find('BatchNorm') != -1:
#         m.weight.data.normal_(1.0, 0.01)
#         m.bias.data.fill_(0)
        
def weights_init_normal(m):
    if isinstance(m, nn.Conv3d) or isinstance(m, nn.ConvTranspose3d) or isinstance(m, nn.Linear):
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
        if m.bias is not None:
            torch.nn.init.constant_(m.bias.data, 0.0)

def inject_anomalies(image, num_clusters=1, cluster_size=5, shift_amount=40, noise_factor=0.1, peak_reduction_factor=0.2, peak_reduction_probability=0.3):
    """
    Inject anomalies into the EELS image with added noise and peak reduction options.
    
    Args:
    image (numpy.ndarray): Input image of shape (height, width, energy_channels)
    num_clusters (int): Number of anomalous clusters to inject
    cluster_size (int): Size of each anomalous cluster
    shift_amount (int): Amount to shift the peak (in energy channels)
    noise_factor (float): Factor to control the amount of noise added (0 to 1)
    peak_reduction_factor (float): Factor to reduce peak intensity (0 to 1)
    peak_reduction_probability (float): Probability of applying peak reduction to a cluster (0 to 1)
    
    Returns:
    tuple: (anomalous_image, anomaly_mask)
        anomalous_image (numpy.ndarray): Image with injected anomalies, noise, and peak reductions
        anomaly_mask (numpy.ndarray): Boolean mask indicating anomalous pixels
    """
    anomalous_image = np.copy(image)
    height, width, channels = image.shape
    anomaly_mask = np.zeros((height, width), dtype=bool)
    
    # Add global noise
    noise = np.random.normal(0, noise_factor * np.mean(image), image.shape)
    anomalous_image += noise
    
    for _ in range(num_clusters):
        # Choose a random center for the cluster
        center_y = np.random.randint(cluster_size, height - cluster_size)
        center_x = np.random.randint(cluster_size, width - cluster_size)
        
        # Define the cluster region
        y_start, y_end = center_y - cluster_size // 2, center_y + cluster_size // 2 + 1
        x_start, x_end = center_x - cluster_size // 2, center_x + cluster_size // 2 + 1
        
        # Mark the cluster region as anomalous
        anomaly_mask[y_start:y_end, x_start:x_end] = True
        
        # Shift the peak
        if np.random.random() < 0.5:  # 50% chance to shift left or right
            anomalous_image[y_start:y_end, x_start:x_end, :-shift_amount] = image[y_start:y_end, x_start:x_end, shift_amount:]
        else:
            anomalous_image[y_start:y_end, x_start:x_end, shift_amount:] = image[y_start:y_end, x_start:x_end, :-shift_amount]
        
        # Randomly reduce peak intensity
        if np.random.random() < peak_reduction_probability:
            peak_intensity = np.max(anomalous_image[y_start:y_end, x_start:x_end])
            reduction_amount = peak_intensity * peak_reduction_factor
            anomalous_image[y_start:y_end, x_start:x_end] -= reduction_amount
    
    # Ensure all values are non-negative
    anomalous_image = np.clip(anomalous_image, 0, None)
    
    return anomalous_image, anomaly_mask

from sklearn.metrics import precision_recall_fscore_support as prf, accuracy_score
from anomaly_detection.config.config_handler import get_config

class TrainerDAGMM:
    def __init__(self, config, train_loader, test_loader, device):
        self.config = config
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.device = device
        
        # Initialize model
        self.model = DAGMM(n_gmm=config['n_gmm'], z_dim=config['latent_dim']).to(device)
        self.model.apply(weights_init_normal)
        
        # Initialize optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=float(config['learning_rate']))
        
        # Initialize loss computer
        self.compute_loss = ComputeLoss(self.model, config['lambda_energy'], config['lambda_cov'], 
                                        self.device, config['n_gmm'])

    def train(self):
        self.model.train()
        for epoch in range(self.config['epochs']):
            total_loss = 0
            for batch in self.train_loader:
                x = batch.float().to(self.device)
                
                self.optimizer.zero_grad()
                
                _, x_hat, z, gamma = self.model(x)
                loss = self.compute_loss.forward(x, x_hat, z, gamma)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 5)
                self.optimizer.step()
                
                total_loss += loss.item()
            
            avg_loss = total_loss / len(self.train_loader)
            print(f'Epoch [{epoch+1}/{self.config['epochs']}], Loss: {avg_loss:.4f}')

            # Test on training data every few epochs
            if (epoch + 1) % 5 == 0:
                self.test_on_train()

        print("Training finished!")

    def train(self):
        self.model.train()
        for epoch in range(self.config['epochs']):
            total_loss = 0
            for batch, _ in self.train_loader:  # Ignore the labels during training
                x = batch.to(self.device)  # batch is already a float tensor
                
                self.optimizer.zero_grad()
                
                _, x_hat, z, gamma = self.model(x)
                loss = self.compute_loss.forward(x, x_hat, z, gamma)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 5)
                self.optimizer.step()
                
                total_loss += loss.item()
            
            avg_loss = total_loss / len(self.train_loader)
            print(f'Epoch [{epoch+1}/{self.config['epochs']}], Loss: {avg_loss:.4f}')

    def eval(self):
        """Evaluating the DAGMM model"""
        self.model.eval()
        print('Evaluating...')
        
        with torch.no_grad():
            N_samples = 0
            gamma_sum = 0
            mu_sum = 0
            cov_sum = 0
            
            # Obtaining the parameters gamma, mu and cov using the training (clean) data.
            for x, _ in self.train_loader:
                x = x.float().to(self.device)
                _, _, z, gamma = self.model(x)
                phi_batch, mu_batch, cov_batch = self.compute_loss.compute_params(z, gamma)
                batch_gamma_sum = torch.sum(gamma, dim=0)
                gamma_sum += batch_gamma_sum
                mu_sum += mu_batch * batch_gamma_sum.unsqueeze(-1)
                cov_sum += cov_batch * batch_gamma_sum.unsqueeze(-1).unsqueeze(-1)
                
                N_samples += x.size(0)
            
            train_phi = gamma_sum / N_samples
            train_mu = mu_sum / gamma_sum.unsqueeze(-1)
            train_cov = cov_sum / gamma_sum.unsqueeze(-1).unsqueeze(-1)
            
            # Obtaining Labels and energy scores for train data
            energy_train = []
            labels_train = []
            for x, y in self.train_loader:
                x = x.float().to(self.device)
                _, _, z, gamma = self.model(x)
                sample_energy, _  = self.compute_loss.compute_energy(z, gamma, phi=train_phi,
                                                                     mu=train_mu, cov=train_cov, 
                                                                     sample_mean=False)
                
                energy_train.append(sample_energy.detach().cpu())
                labels_train.append(y)
            energy_train = torch.cat(energy_train).numpy()
            labels_train = torch.cat(labels_train).numpy()
            
            # Obtaining Labels and energy scores for test data
            energy_test = []
            labels_test = []
            for x, y in self.test_loader:
                x = x.float().to(self.device)
                _, _, z, gamma = self.model(x)
                sample_energy, _  = self.compute_loss.compute_energy(z, gamma, train_phi,
                                                                     train_mu, train_cov,
                                                                     sample_mean=False)
                
                energy_test.append(sample_energy.detach().cpu())
                labels_test.append(y)
            energy_test = torch.cat(energy_test).numpy()
            labels_test = torch.cat(labels_test).numpy()
        
        scores_total = np.concatenate((energy_train, energy_test), axis=0)
        labels_total = np.concatenate((labels_train, labels_test), axis=0)
        
        threshold = np.percentile(scores_total, 100 - 20)
        pred = (energy_test > threshold).astype(int)
        gt = labels_test.astype(int)
        precision, recall, f_score, _ = prf(gt, pred, average='binary')
        print("Precision : {:0.4f}, Recall : {:0.4f}, F-score : {:0.4f}".format(precision, recall, f_score))
        print('ROC AUC score: {:.2f}'.format(roc_auc_score(labels_total, scores_total)*100))
        
        return labels_total, scores_total
    
import numpy as np
import hyperspy.api as hs
from skimage.filters import gaussian
from scipy.ndimage import uniform_filter
from torch.utils.data import Dataset, DataLoader
import torch

# Load configuration
config = get_config("/home/ssulta24/Desktop/VCAE_new/anomaly_detection/config/config.yaml")

filepath = config['data_path']
energy_range = tuple(config['energy_range'])
size = 24
sigma = config['sigma']
xy_window = 3

def load_and_preprocess():
    """
    Load and preprocess the EELS data.

    Returns:
        numpy.ndarray: Preprocessed EELS data
    """
    try:
        raw_data = load_dm_data()
        preprocessed_data = preprocess_3d_images(raw_data)
        print("done")
        return preprocessed_data
    except Exception as e:
        raise

def load_dm_data():
    """
    Load data from a Digital Micrograph file.

    Returns:
        numpy.ndarray: Raw EELS data
    """
    try:
        s = hs.load(filepath)
        data = s.data
        return data
    except Exception as e:
        raise

def preprocess_3d_images(image):
    """
    Preprocess the 3D EELS image.

    Args:
        image (numpy.ndarray): Raw 3D EELS image

    Returns:
        numpy.ndarray: Preprocessed 3D EELS image
    """
    # Apply Gaussian blur
    blurred_image = gaussian(image, sigma=sigma, mode='reflect', preserve_range=True)
    
    # Slice the data array to keep only the desired energy range
    start_pixel = int(energy_range[0] - 0)
    end_pixel = int(energy_range[1] - 0)
    blurred_image = blurred_image[:, :, start_pixel:end_pixel]

    # Apply min-max scaling
    min_val = np.min(blurred_image)
    max_val = np.max(blurred_image)
    normalized_image = (blurred_image - min_val) / (max_val - min_val)
    
    # Apply spatial-spectral smoothing
    smoothed_img = smooth_spatial_spectral(normalized_image)
    
    # Generate sub-images using sliding window
    sub_images = sliding_window(smoothed_img)
    
    # Reshape sub-images for PyTorch (batch_size, channels, height, width, energy)
    reshaped_sub_images = sub_images.reshape(-1, size, size, sub_images.shape[-1])
    
    return reshaped_sub_images.astype('float32')

def smooth_spatial_spectral(arr):
    """
    Apply spatial-spectral smoothing to the image.

    Args:
        arr (numpy.ndarray): Input 3D array

    Returns:
        numpy.ndarray: Smoothed 3D array
    """
    neighborhood_sum = uniform_filter(arr, size=(xy_window, xy_window, 1), mode='reflect')
    neighborhood_count = uniform_filter(np.ones_like(arr), size=(xy_window, xy_window, 1), mode='reflect')
    return neighborhood_sum / neighborhood_count

def sliding_window(data, stride=24):
    """
    Generate sub-images using a sliding window approach.
    
    Args:
        data (numpy.ndarray): Input data of shape (height, width, energy)
        stride (int): Step size for the sliding window
    
    Returns:
        numpy.ndarray: Array of sub-images
    """
    height, width, energy = data.shape
    sub_images = []
    
    for i in range(0, height - size + 1, stride):
        for j in range(0, width - size + 1, stride):
            sub_image = data[i:i+size, j:j+size, :]
            sub_images.append(sub_image)
    
    return np.array(sub_images)

data = load_and_preprocess()

done


In [32]:
from anomaly_detection.data.data_loader import EELSDataset, get_data_loader

# Load configuration
config = get_config("/home/ssulta24/Desktop/VCAE_new/anomaly_detection/config/config.yaml")

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

from torch.utils.data import DataLoader, TensorDataset
import numpy as np

def create_train_test_loaders(normal_data, batch_size, test_split=0.2, num_anomalies=30):
    print(f"Normal data shape: {normal_data.shape}")
    
    # Split normal data into train and test
    num_test = int(normal_data.shape[0] * test_split)
    train_data = normal_data[:-num_test]
    test_normal_data = normal_data[-num_test:]
    
    # Create anomalous data
    anomalous_data = []
    for i in range(num_anomalies):
        normal_image = test_normal_data[i % len(test_normal_data)]
        anomalous_image, _ = inject_anomalies(normal_image)
        anomalous_data.append(anomalous_image)
    anomalous_data = np.array(anomalous_data)
    print(f"Shape of the anom data: {anomalous_data.shape}")
    
    # Create train loader (all normal)
    train_dataset = TensorDataset(torch.tensor(train_data).float(), torch.zeros(len(train_data)))
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
    # Create test loader (mix of normal and anomalous)
    test_data = np.concatenate([test_normal_data, anomalous_data])
    test_labels = np.concatenate([np.zeros(len(test_normal_data)), np.ones(len(anomalous_data))])
    test_dataset = TensorDataset(torch.tensor(test_data).float(), torch.tensor(test_labels))
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader

# Splitgh
train_loader, test_loader = create_train_test_loaders(data, batch_size=63)

trainer = TrainerDAGMM(config, train_loader, test_loader, device)
trainer.train()
labels, scores = trainer.eval()

Normal data shape: (64, 24, 24, 480)
Shape of the anom data: (30, 24, 24, 480)


RuntimeError: Given groups=1, weight of size [32, 1, 1, 1, 7], expected input[1, 52, 24, 24, 480] to have 1 channels, but got 52 channels instead