In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
from PIL import Image
from tempfile import TemporaryDirectory
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, Sampler, Subset, random_split
import cv2
import torch.nn.functional as F
import segmentation_models_pytorch as smp
import random
from sklearn.decomposition import PCA
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier


cudnn.benchmark = True
plt.ion()


ModuleNotFoundError: No module named 'segmentation_models_pytorch'

In [3]:
!ls

data.zip  sample_data


In [4]:
!unzip data.zip
%ls

Archive:  data.zip
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a multi-part archive.  In the
  latter case the central directory and zipfile comment will be found on
  the last disk(s) of this archive.
unzip:  cannot find zipfile directory in one of data.zip or
        data.zip.zip, and cannot find data.zip.ZIP, period.
data.zip  [0m[01;34msample_data[0m/


In [5]:
class ImageDataset(Dataset):
    def __init__(self, dataset_dir, train=False):
        self.data_transforms = {
                            'train': transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Resize((256, 256)),
                                # transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                            ]),
                            'val': transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Resize((256, 256)),
                                # transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                            ]),
                        }
        self.dataset_dir = dataset_dir

        self.labels = []
        self.raw_images = []
        self.train = train

        for dirpath, dirnames, filenames in os.walk(dataset_dir):
            for fn in filenames:
                label = 0 if 'normal' in dirpath else 1
                raw_img = cv2.imread(f'{dirpath}/{fn}')
                self.labels.append(label)
                self.raw_images.append(raw_img)

        self.train_images = [self.data_transforms['train'](img) for img in self.raw_images]
        self.test_images = [self.data_transforms['val'](img) for img in self.raw_images]

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        if self.train:
            return self.train_images[idx], self.labels[idx]
        else:
            return self.test_images[idx], self.labels[idx]

In [6]:
data_dir = 'data/'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [7]:
n_images = 0
for dirpath, dirnames, filenames in os.walk(data_dir):
    n_images += len(filenames)

In [8]:
n_images

0

In [9]:
image_idxs = np.arange(n_images)

In [10]:
dataset = ImageDataset(data_dir)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

ValueError: num_samples should be a positive integer value, but got num_samples=0

In [None]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

In [None]:
class_names = {0: 'normal', 1: 'phishing'}

In [None]:
device

In [None]:
def imshow(inp, title=None):

    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)

    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)

inputs, classes = next(iter(dataloader))

out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x.item()] for x in classes])

In [None]:
class Encoder(nn.Module):
    def __init__(self, image_size, channels, embedding_dim):
        super(Encoder, self).__init__()

        self.channels = channels
        self.image_size = image_size
        self.embedding_dim = embedding_dim

        self.conv1 = nn.Conv2d(channels, 32, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)

        self.shape_before_flattening = self.calculate_shape()
        flattened_size = (image_size // 8) * (image_size // 8) * 128
        self.fc = nn.Linear(flattened_size, embedding_dim)

    @torch.no_grad()
    def calculate_shape(self):
        x = torch.zeros((1, self.channels, self.image_size, self.image_size))
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        return x.shape[1:]

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        return x

In [None]:
class Decoder(nn.Module):
    def __init__(self, embedding_dim, shape_before_flattening, channels):
        super(Decoder, self).__init__()
        self.fc = nn.Linear(embedding_dim, np.prod(shape_before_flattening))
        self.reshape_dim = shape_before_flattening

        self.deconv1 = nn.ConvTranspose2d(
            128, 128, kernel_size=3, stride=2, padding=1, output_padding=1
        )
        self.deconv2 = nn.ConvTranspose2d(
            128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
        )
        self.deconv3 = nn.ConvTranspose2d(
            64, 32, kernel_size=3, stride=2, padding=1, output_padding=1
        )

        self.conv1 = nn.Conv2d(32, channels, kernel_size=3, stride=1, padding=1)


    def forward(self, x):
        x = x.view(x.size(0), *self.reshape_dim)
        x = F.relu(self.deconv1(x))
        x = F.relu(self.deconv2(x))
        x = F.relu(self.deconv3(x))
        x = torch.sigmoid(self.conv1(x))
        return x

In [None]:
class ConvAutoencoder(nn.Module):
    def __init__(self, image_size, channels, embedding_dim, n_classes=1):
        super(ConvAutoencoder, self).__init__()
        self.encoder = Encoder(image_size, channels, embedding_dim)
        self.decoder = Decoder(embedding_dim, self.encoder.shape_before_flattening, channels)
        self.classifier = nn.Linear(embedding_dim, n_classes)

    def forward(self, x):
        embeddings = self.encoder(x)
        faked = self.decoder(embeddings)
        return faked

    def get_embeddings(self, x):
        embeddings = self.encoder(x)
        return embeddings


In [None]:
def train_autoencoder(num_epochs=25, model=None):
    since = time.time()
    sigm = nn.Sigmoid()

    train_losses = []
    val_losses = []

    best_model_params_path = os.path.join('.', 'autoencoder_best_model_params.pt')

    best_loss = 1000

    img_shape = dataset[0][0].shape
    if model is None:
        model = ConvAutoencoder(img_shape[1], img_shape[0], 1, 1)
    model = model.to(device)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    warmup_epochs = 5
    warmup_scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda epoch: epoch / warmup_epochs)

    main_scheduler = lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

    for epoch in range(num_epochs):
        print(f'Epoch: {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                loader = train_loader
                model.train()
            else:
                loader = test_loader
                model.eval()

            running_loss = 0.0

            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = torch.unsqueeze(labels, 1).type('torch.FloatTensor').to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, inputs)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)

            if phase == 'train':
                fold_train_loss = running_loss / train_size
                train_losses.append(fold_train_loss)
                print(f'\t\ttrain loss: {fold_train_loss}')
            else:
                fold_val_loss = running_loss / test_size
                val_losses.append(fold_val_loss)
                print(f'\t\tval loss: {fold_val_loss}')

        if epoch < warmup_epochs:
            warmup_scheduler.step()
        else:
            main_scheduler.step()

        if val_losses[epoch] < best_loss:
            best_loss = val_losses[epoch]
            torch.save(model.state_dict(), best_model_params_path)

    torch.save(model.state_dict(), best_model_params_path)

    time_elapsed = time.time() - since
    print()
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best mean val loss: {best_loss:.4f}')

    model.load_state_dict(torch.load(best_model_params_path))

    train_losses = np.array(train_losses)
    val_losses = np.array(val_losses)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.show()

    return model

In [None]:
huge_autoencoder = smp.Unet(
    encoder_name="efficientnet-b2",
    encoder_weights="imagenet",
    in_channels=3,
    classes=3,
)

In [None]:
last_autoencoder = train_autoencoder(100)

In [None]:
img_shape = dataset[0][0].shape
autoencoder  = ConvAutoencoder(img_shape[1], img_shape[0], 1, 1)
autoencoder.load_state_dict(torch.load('autoencoder_best_model_params.pt'))
autoencoder  = autoencoder .to(device)

In [None]:
def plot_random_autoencoder(ds : Dataset, autoenc):
    idx = random.randint(0, len(ds)-1)
    image, label = ds[idx]
    x = image.unsqueeze(0).to(device)

    autoenc.eval()
    with torch.no_grad():
        restored_image = autoenc(x).squeeze(0).cpu().detach().numpy()

    fig, axs = plt.subplots(ncols=2, figsize=(8, 8), layout='tight')

    image = np.swapaxes(image, 0, -1)
    restored_image = np.swapaxes(restored_image, -1, 0)

    axs[0].imshow(image)
    axs[0].set_title('Original image')
    axs[1].imshow(restored_image)
    axs[1].set_title('Faked image')

In [None]:
plot_random_autoencoder(test_dataset, last_autoencoder)

getting embeddings

In [None]:
def get_embeddings(ds : Dataset, autoenc, idxs=None):
    embeddings = []
    labels = []

    if idxs is None:
        for image, label in ds:
            labels.append(label)

            x = image.unsqueeze(0).to(device)
            embed = autoenc.encoder(x).squeeze(0).cpu().detach().numpy()
            embeddings.append(embed)
    else:
        for i in idxs:
            image, label = ds[i]
            labels.append(label)

            x = image.unsqueeze(0).to(device)
            embed = autoenc.encoder(x).squeeze(0).cpu().detach().numpy()
            embeddings.append(embed)
    return embeddings, labels

In [None]:
embeddings, labels = get_embeddings(test_dataset, autoencoder)

In [None]:
pca = PCA(n_components=2)

In [None]:
embed_matrix = np.array(embeddings)
labels = np.array(labels)

pca_matrix = pca.fit_transform(embed_matrix)

In [None]:
fake_examples = np.where(labels == 1)[0]
normal_examples = np.where(labels == 0)[0]

plt.scatter(pca_matrix[fake_examples, 0], pca_matrix[fake_examples, 1], color='red', label='fake')
plt.scatter(pca_matrix[normal_examples, 0], pca_matrix[normal_examples, 1], color='blue', label='normal')
plt.legend()
plt.title('Train embeddings Normal/Fake')

In [None]:
fishing_idxs = []
true_idxs = []

In [None]:
for i, (_, label) in enumerate(train_dataset):
    if label == 0:
        true_idxs.append(i)
    else:
        fishing_idxs.append(i)

In [None]:
fishing_idxs = np.array(fishing_idxs)
true_idxs = np.array(true_idxs)

In [None]:
def get_single_embedding(ds : Dataset, autoenc, idx=None):
    if idx is None:
        idx = random.randint(0, len(ds)-1)
    image, label = ds[idx]
    x = image.unsqueeze(0).to(device)
    embed = autoenc.encoder(x).squeeze(0).cpu().detach().numpy()
    faked_image = autoenc(x).squeeze(0).cpu().detach().numpy()
    return image, faked_image, embed, label

def cos_distance(v1, v2):
    return 1 - (np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))

def analyse_embeds(ds : dataset, autoenc, relative_idx, k=30):
    global true_idxs

    image, restored_image, fake_embed, label = get_single_embedding(ds, autoenc, relative_idx)
    all_embeddings, labels = get_embeddings(ds, autoenc)

    distances = np.zeros(len(all_embeddings))
    for i, v in enumerate(all_embeddings):
        distances[i] = cos_distance(v, fake_embed)

    sorted_idxs = np.argsort(distances)
    distances = distances[sorted_idxs]

    labels = np.array(labels)[sorted_idxs]
    print('Distances', distances)
    print()
    print('True Labels', labels)

    pca = PCA(n_components=2)
    embed_matrix = np.array(all_embeddings)
    labels = np.array(labels)
    pca_matrix = pca.fit_transform(embed_matrix)

    fig, axs = plt.subplots(ncols=2, figsize=(16, 10), layout='tight')

    image = np.swapaxes(image, 0, -1)
    restored_image = np.swapaxes(restored_image, -1, 0)

    axs[0].imshow(image)
    axs[0].set_title('Original image')

    fake_examples = np.where(labels == 1)[0]
    normal_examples = np.where(labels == 0)[0]

    axs[1].scatter(pca_matrix[fake_examples, 0], pca_matrix[fake_examples, 1], color='red', label='fake')
    axs[1].scatter(pca_matrix[normal_examples, 0], pca_matrix[normal_examples, 1], color='blue', label='normal')

    top_k = sorted_idxs[-1:-k-1:-1]
    coords = pca_matrix[top_k, :]
    for i, p1 in enumerate(coords):
        p2 = pca_matrix[relative_idx, 0], pca_matrix[relative_idx, 1]
        axs[1].plot([p1[0], p2[0]], [p1[1], p2[1]], '--', color='gray')
        axs[1].text((p1[0]+p2[0])/2, (p1[1]+p2[1])/2, f'{distances[top_k[i]]:.2f}', color='black')


    axs[1].scatter(pca_matrix[relative_idx, 0], pca_matrix[relative_idx, 1], color='black', label='picked fake')
    axs[1].legend()
    axs[1].set_title('PCA embeddings')

In [None]:
random_fishing_idx = np.random.choice(fishing_idxs)
analyse_embeds(train_dataset, autoencoder, random_fishing_idx)

In [None]:
for _ in range(10):
    random_fishing_idx = np.random.choice(fishing_idxs)
    analyse_embeds(train_dataset, autoencoder, random_fishing_idx)