In [17]:
import torch
import torch.nn as nn

import numpy as np
import matplotlib.pyplot as plt

from matplotlib.patches import Ellipse
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Subset
import torchvision

import zipfile
import os
import shutil

The idea of using a Gaussian Mixture Model for testing is to split the data into subclasses within existing classes using a Gaussian Mixture Model. This is done to deal with the problem of high intra-class variance
and small inter-cluster dissimilarity while evaluating the quality of the generated images.

In [2]:
with zipfile.ZipFile("/content/minefree-class-split-wo-borders.zip", 'r') as zip_ref:
    zip_ref.extractall("data")

In [3]:
source_dirs = ['/content/data/minefree-class-split-wo-borders/test/bombed', '/content/data/minefree-class-split-wo-borders/train/bombed']
target_dir = '/content/data/bombed'
os.makedirs(target_dir, exist_ok=True)

for src in source_dirs:
    for file_name in os.listdir(src):
        full_file_name = os.path.join(src, file_name)
        if os.path.isfile(full_file_name):
            shutil.copy(full_file_name, target_dir)

In [4]:
source_dirs = ['/content/data/minefree-class-split-wo-borders/test/not bombed', '/content/data/minefree-class-split-wo-borders/train/not bombed']
target_dir = '/content/data/not bombed'
os.makedirs(target_dir, exist_ok=True)

for src in source_dirs:
    for file_name in os.listdir(src):
        full_file_name = os.path.join(src, file_name)
        if os.path.isfile(full_file_name):
            shutil.copy(full_file_name, target_dir)

# Gaussian Mixture Model

In [5]:
def plot_ellipse(mu: np.ndarray, cov: np.ndarray, ax):
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    angle = np.degrees(np.arctan2(*eigenvectors[:, 0][::-1]))
    width, height = 2 * np.sqrt(5.991 * eigenvalues)  # 95% confidence interval
    ellipse = Ellipse(xy=mu, width=width, height=height, angle=angle, fill=False)
    ax.add_patch(ellipse)
    ax.scatter(mu[0], mu[1], color="r", marker="o", label="Mean")


def plot_gmm_data(data: np.ndarray, mus, sigmas):
    _, ax = plt.subplots()
    for mu, sigma in zip(mus, sigmas):
        plot_ellipse(mu, sigma, ax)

    plt.scatter(data[:, 0], data[:, 1], s=5)
    plt.show()

In [59]:
class MultivariateGaussian(nn.Module):
    def __init__(self, dim: int, mu: np.ndarray = None, R: np.ndarray = None):
        super().__init__()
        self.dim = dim

        if mu is None:
            mu = np.zeros(self.dim)
        self.mu = nn.Parameter(torch.tensor(mu).to(torch.float32))

        if R is None:
            R = np.random.normal(size=(self.dim, self.dim))
        self.R = nn.Parameter(torch.tensor(R).to(torch.float32))

    def sigma(self) -> torch.Tensor:
        """
        Computes the covariance matrix of the multivariate Gaussian distribution.
        """
        Sigma = self.R @ self.R.T
        return Sigma

    def sample(self) -> torch.Tensor:
        """
        Samples from this Gaussian.
        """
        sampled_vector = self.R @ torch.randn(self.dim) + self.mu
        return sampled_vector

    def log_likelihood(self, x: torch.Tensor) -> torch.Tensor:
        """
        Computes the log likelihood of the multivariate Gaussian distribution at the provided point.
        """
        Sigma_det = torch.det(self.sigma())
        Sigma_inv = torch.inverse(self.sigma())
        mahalanobis = torch.sum((x - self.mu) @ Sigma_inv * (x - self.mu), dim=-1)
        log_likelihood = -0.5 * (self.dim * torch.log(torch.tensor(2 * torch.pi)) + torch.log(Sigma_det) + mahalanobis)

        return log_likelihood

In [60]:
from torch import logsumexp

class GaussianMixtureModel(nn.Module):
    def __init__(self, dim: int, K: int, prior: np.ndarray = None, mus = None, sigmas = None, use_softmax: bool = False):
        super().__init__()
        self.dim = dim
        self.K = K
        self.use_softmax = use_softmax
        self.gaussians = nn.ModuleList([MultivariateGaussian(dim=dim) for _ in range(K)])
        self.set_params(prior, mus, sigmas)
        self.gaussians = nn.ModuleList(self.gaussians)

    def sigma(self) -> torch.Tensor:
        """
        Computes the covariance matrix of the multivariate Gaussian distribution.
        """
        Sigma = self.R @ self.R.T
        return Sigma

    def set_params(self, prior: np.ndarray = None, mus = None, sigmas = None) -> None:
        """
        Sets the distribution parameter values. If unspecified, default values are used.
        """
        if prior is None:
            prior = np.ones(self.K) / self.K

        if mus is None:
            mus = [np.random.randn(self.dim) for _ in range(self.K)]

        if sigmas is None:
            sigmas = [np.eye(self.dim) for _ in range(self.K)]

        self.prior_logits = nn.Parameter(torch.tensor(prior, dtype=torch.float32))

        for i, gaussian in enumerate(self.gaussians):
            sigma_i = sigmas[i]
            if isinstance(sigma_i, torch.Tensor):
                sigma_i = sigma_i.detach().numpy()

            gaussian.mu.data = torch.tensor(mus[i], dtype=torch.float32)
            gaussian.R.data = torch.tensor(np.linalg.cholesky(sigma_i), dtype=torch.float32)

        return

    def get_prior(self) -> torch.Tensor:
        """
        If `use_softmax` attribute is True, apply softmax for prior normalization.
        """
        if self.use_softmax:
            prior = torch.softmax(self.prior_logits, dim=0)
        else:
            prior = self.prior_logits
        return prior

    def log_likelihood(self, x: torch.Tensor) -> torch.Tensor:
        no_of_points = x.shape[0]
        log_probability = torch.zeros((no_of_points, self.K), dtype=torch.float32, device=x.device)

        log_prior = torch.log(self.get_prior())

        for k, gaussian in enumerate(self.gaussians):
            log_probability[:, k] = gaussian.log_likelihood(x)

        log_likelihood = torch.logsumexp(log_probability + log_prior, dim=1).sum()
        return log_likelihood

    def sample(self) -> torch.Tensor:
        prior = self.get_prior()
        random_comp = torch.multinomial(prior, num_samples=1).item()
        sampled_vector = self.gaussians[random_comp].sample()
        return sampled_vector

    def get_gauss_params(self):
        mus, sigmas = [], []
        for gauss in self.gaussians:
            mus.append(gauss.mu.detach().numpy())
            sigmas.append(gauss.sigma().detach().numpy())
        return mus, sigmas

Recall the objective, which we want to maximize (1):

$$
\operatorname{argmax}_{\boldsymbol{\theta}} p(\mathbf{X} \mid \theta) = \operatorname{argmax}_{\boldsymbol{\theta}}\prod_{i=1}^N p(\mathbf{x}_k \mid \theta) = \operatorname{argmax}_{\boldsymbol{\theta}}\prod_{i=1}^N \Bigl(\sum_{k=1}^K \pi_k p_k(\mathbf{x}_i \mid \mathbf{\theta})\Bigr) \tag{1}
$$

- we start with an initialization $\boldsymbol{\theta}_0$ of the model parameters;
- on the E-step, the cluster membership probabilities (often called responsibilities) of all datapoints are evaluated;
- based on them, the MLE of the model parameters are updated.

## Expectation step

The responsibilities are calculated using the Bayes rule:

$$r_{ik} = \mathbb{P}(Z_i = k \mid \widehat{\boldsymbol{\theta}}, \mathbf{x}_i) = \frac{ \hat{\pi}_k p(\mathbf{x}_i \mid \widehat{\boldsymbol{\theta}},Z_i = k)}{p(\mathbf{x}_i \mid \widehat{\boldsymbol{\theta}})}
  = \frac{ \hat{\pi}_k p_k(\mathbf{x}_i \mid \hat{\boldsymbol{\mu}}_k, \widehat{\Sigma}_k)}{p(\mathbf{x}_i \mid \widehat{\boldsymbol{\theta}})}$$

In [8]:
def log_responsibilities(model: nn.Module, data: torch.Tensor) -> torch.Tensor:
    nominator = torch.stack([torch.log(model.get_prior()[i]) + model.gaussians[i].log_likelihood(data) for i in range(model.K)])
    log_resp = nominator - torch.logsumexp(nominator, dim=0)
    return log_resp

## Maximization step

- the updated means $\boldsymbol{\mu}_k$ are simply the weighted averages of all data points $\mathbf{x}_i$ with weights $r_{ik}$:
  $$
    \hat{\boldsymbol{\mu}}_k = \frac{\sum_{i=1}^N r_{ik} \mathbf{x}_i}{\sum_{i=1}^N r_{ik}}
  $$
- the updated covariance matrices $\widehat\Sigma_k$ is the weighted analogue of the MLE estimate of the covariance:
  $$
    \widehat\Sigma_k = \frac{\sum_{i=1}^N r_{ik} (\mathbf{x}_i - \hat{\boldsymbol{\mu}}_k)(\mathbf{x}_i - \hat{\boldsymbol{\mu}}_k)^\top}{\sum_{i=1}^N r_{ik}}
  $$
- the updated mixing weights are
  $$
    \hat \pi_k = \frac1N\sum_{i=1}^N r_{ik}
  $$

In [66]:
def update_params(model: nn.Module, log_resp: torch.Tensor, data: torch.Tensor):
    resp = torch.exp(log_resp)
    N, Dim = data.shape

    prior = torch.sum(resp, dim=1) / N
    mus = []
    sigmas = []
    for k in range(model.K):
        mu = torch.sum(resp[k,:]*data.T, dim=1) / torch.sum(resp[k,:])
        mus.append(mu)

        diff = data - mu
        sigma = torch.matmul((resp[k,:] * diff.T), diff) / torch.sum(resp[k, :])
        sigmas.append(sigma)
    model.set_params(prior=prior, mus=mus, sigmas=sigmas)

In [41]:
def initialize_gmm(data: np.ndarray, K: int) -> nn.Module:
    N, D = data.shape
    model = GaussianMixtureModel(dim=D, K=K, use_softmax=True)
    return model

def fit_gmm(model: nn.Module, data: np.ndarray, limit: int = 10000, delta: float = 1e-10):
    model.train()
    data = torch.tensor(data).to(torch.float32)
    loss_history = []
    for i in range(limit):
        log_resp = log_responsibilities(model, data)
        update_params(model, log_resp, data)
        loss = -torch.mean(model.log_likelihood(data))
        loss_history.append(loss.item())

        if i % 100 == 0:
            print(f"Iteration {i}, Loss: {loss.item()}")
        if i > 0 and abs(loss_history[-1] - loss_history[-2]) < delta:
            break

    return loss_history

# Extracting features from the images

In [11]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [12]:
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
])

In [13]:
data_paths = {
    "bombed": "data/bombed",
    "not_bombed": "data/not bombed"
}

In [14]:
def load_dataloader(path):
    dataset = datasets.ImageFolder(root=os.path.dirname(path), transform=transform)
    class_name = os.path.basename(path)
    class_idx = dataset.class_to_idx[class_name]
    indices = [i for i, (_, y) in enumerate(dataset.samples) if y == class_idx]
    subset = torch.utils.data.Subset(dataset, indices)
    return DataLoader(subset, batch_size=16, shuffle=True)

In [15]:
bombed_loader = load_dataloader(data_paths["bombed"])
not_bombed_loader = load_dataloader(data_paths["not_bombed"])

In [18]:
resnet = torchvision.models.resnet18(pretrained=True)
resnet.fc = nn.Identity()
resnet.eval()
resnet.to("cuda" if torch.cuda.is_available() else "cpu")

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 169MB/s]


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [19]:
def extract_features(model, dataloader, device='cpu', max_batches=None):
    features_list = []
    labels_list = []
    with torch.no_grad():
        for i, (images, labels) in enumerate(dataloader):
            images = images.to(device)
            feats = model(images)
            features_list.append(feats.cpu())
            labels_list.append(labels)
            if max_batches and i >= max_batches:
                break
    return torch.cat(features_list), torch.cat(labels_list)

features, labels = extract_features(resnet, bombed_loader, device=device, max_batches=100)

In [67]:
gmm = initialize_gmm(features.numpy(), K=4)
loss_history = fit_gmm(gmm, features.numpy())
cur_mus, cur_sigmas = gmm.get_gauss_params()
plot_gmm_data(features.numpy(), cur_mus, cur_sigmas)

  self.prior_logits = nn.Parameter(torch.tensor(prior, dtype=torch.float32))
  gaussian.mu.data = torch.tensor(mus[i], dtype=torch.float32)


LinAlgError: Matrix is not positive definite