[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/salmanaff/gambar-outlier/blob/main/deteksi-gambar-outlier.ipynb)

# Code

## General

In [None]:
!pip install -q glasbey
import glasbey

import re
import os
from tqdm.notebook import trange, tqdm

import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual
from IPython.display import display, clear_output
from traitlets import traitlets

class LoadedButton(widgets.Button):
    """A button that can hold an arbitrary payload in its ``value`` attribute.

    The payload is registered as a dynamically added ``traitlets.Any`` trait,
    so click handlers can store results on the button for later cells to read.
    """

    def __init__(self, value=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Register ``value`` as a trait whose default is the given payload.
        payload_trait = traitlets.Any(value)
        self.add_traits(value=payload_trait)

import torch
from torch import nn, optim
import torch.nn.functional as F
import torchvision.transforms as t
from torchvision import models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

# Working directories, relative to the current working directory.
model_dir = 'models'
data_dir = 'dataset'

# Detect Google Colab by trying to import its runtime package. Only an
# ImportError means "not running in Colab"; any other failure should surface
# instead of being swallowed by a bare ``except``.
try:
    import google.colab  # noqa: F401
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    # ``exist_ok`` keeps the cell re-runnable: the directories may already
    # exist from an earlier execution in the same session.
    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(data_dir, exist_ok=True)

def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
device = get_default_device()

## Dataset

In [None]:
def to_device(data, device):
    """Transfer a tensor, or a (nested) list/tuple of tensors, to ``device``.

    Lists and tuples are processed recursively and always come back as lists;
    anything else is moved with a non-blocking ``.to(device)`` call.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Iterate over a wrapped dataloader, moving every batch to a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        """Return the number of batches in the wrapped dataloader."""
        return len(self.dl)

    def __iter__(self):
        """Yield each batch of the wrapped dataloader after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

## Model

In [None]:
def gen_models():
    """List the lowercase names in ``torchvision.models`` that start with 'resnet'.

    Filters for the densenet, inception, mobilenet, googlenet and vgg
    families are currently disabled; only resnet names are returned.
    """
    return [
        name
        for name in dir(models)
        if name.islower() and name.startswith('resnet')
    ]
model_list = gen_models()

def accuracy(outputs, labels):
    """Return, as a 0-d tensor, the fraction of rows whose top-scoring class equals the label."""
    preds = torch.max(outputs, dim=1).indices
    n_correct = (preds == labels).sum().item()
    return torch.tensor(n_correct / len(preds))

class ImageClassificationBase(nn.Module):
    """Training/validation helpers shared by image classifiers.

    Subclasses supply ``forward``; batches are ``(images, labels)`` pairs.
    """

    def training_step(self, batch):
        "return the cross-entropy loss for a batch of training data"
        images, labels = batch
        return F.cross_entropy(self(images), labels)

    def validation_step(self, batch):
        "return the detached loss and the accuracy for a batch of validation data"
        images, labels = batch
        logits = self(images)
        return {
            'val_loss': F.cross_entropy(logits, labels).detach(),
            'val_acc': accuracy(logits, labels),
        }

    def validation_epoch_end(self, outputs):
        "average the per-batch losses and accuracies into plain floats"
        def mean_of(key):
            return torch.stack([step[key] for step in outputs]).mean().item()
        return {'val_loss': mean_of('val_loss'), 'val_acc': mean_of('val_acc')}

    def epoch_end(self, epoch, result):
        "print a one-line summary of the epoch's metrics"
        print("Epoch [{}], train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
            epoch, result['train_loss'], result['val_loss'], result['val_acc']))

class base_net(ImageClassificationBase):
    """Classifier wrapping a backbone whose final layer is exposed as ``fc``."""

    def __init__(self, model, num_classes):
        super().__init__()
        # Keep the backbone handed in by the caller.
        self.network = model
        # Swap its final fully connected layer for one sized to our classes.
        in_features = self.network.fc.in_features
        self.network.fc = nn.Linear(in_features, num_classes)

    def forward(self, xb):
        return self.network(xb)

## Training

In [None]:
# Optimizer classes offered in the UI, keyed by the name shown in the dropdown.
optim_list = {
    optim_name: getattr(optim, optim_name)
    for optim_name in ('SGD', 'RMSprop', 'Adagrad', 'Adam', 'AdamW')
}

class EarlyStopping:
    """Signal when a monitored loss has stopped improving.

    Call the instance once per epoch with the latest loss and the model.
    ``early_stop`` becomes True after ``patience`` consecutive calls without
    a new best loss; at that point the best weights seen so far are loaded
    back into the model when ``restore_best_weights`` is set.

    Args:
        patience: number of consecutive non-improving calls tolerated.
        verbose: print a message on every call and on restore.
        restore_best_weights: snapshot the weights on each improvement and
            restore them when stopping is triggered.
    """

    def __init__(self, patience=5, verbose=False, restore_best_weights=True):
        self.patience = patience
        self.verbose = verbose
        self.best_loss = float('inf')
        self.counter = 0
        self.early_stop = False
        self.restore_best_weights = restore_best_weights
        self.best_model_weights = None

    def __call__(self, val_loss, model):
        """Record ``val_loss`` and update the stopping state for ``model``."""
        if val_loss < self.best_loss:
            if self.verbose:
                print(f'Validation loss improved by: {self.best_loss-val_loss:.4f}')
            self.best_loss = val_loss
            self.counter = 0
            if self.restore_best_weights:
                # state_dict() hands back references to the live tensors, and
                # dict.copy() is shallow, so each tensor is cloned here.
                # Otherwise later optimizer steps would overwrite the
                # snapshot and restoring would do nothing.
                self.best_model_weights = {
                    key: tensor.detach().clone()
                    for key, tensor in model.state_dict().items()
                }
            return

        self.counter += 1
        if self.verbose:
            print(f'Validation loss did not improve: {val_loss:.4f}')
        if self.counter >= self.patience:
            self.early_stop = True
            if self.restore_best_weights and self.best_model_weights is not None:
                model.load_state_dict(self.best_model_weights)
                if self.verbose:
                    print("Restored best model weights.")

@torch.no_grad()
def evaluate(model, val_loader):
    """Run the model's validation step over every batch and aggregate the results.

    The model is switched to eval mode first; gradients are disabled by the
    decorator.
    """
    model.eval()
    step_results = []
    for batch in val_loader:
        step_results.append(model.validation_step(batch))
    return model.validation_epoch_end(step_results)

def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns None when the optimizer has no parameter groups.
    """
    return next((group['lr'] for group in optimizer.param_groups), None)

def fit_fe(epochs, lr, model, train_loader, opt_func=torch.optim.SGD):
    """Train ``model`` on ``train_loader`` and return the per-epoch mean training loss.

    There is no validation set here: early stopping (patience 3) monitors the
    mean *training* loss of each epoch.

    Args:
        epochs: maximum number of epochs to run.
        lr: learning rate, passed positionally to ``opt_func``.
        model: module exposing ``training_step(batch)`` that returns a loss.
        train_loader: iterable of training batches.
        opt_func: optimizer class, called as ``opt_func(model.parameters(), lr)``.

    Returns:
        List with one float per completed epoch.
    """
    history = []
    optimizer = opt_func(model.parameters(), lr)
    early_stopping = EarlyStopping(patience=3, verbose=True)
    for epoch in range(epochs):
        model.train()
        train_losses = []
        for batch in tqdm(train_loader, desc=f"Epoch {epoch}", leave=False):
            loss = model.training_step(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            # Only the value is needed for the epoch average. Storing the
            # detached tensor avoids holding a reference to each batch's
            # autograd graph for the whole epoch.
            train_losses.append(loss.detach())
        train_loss = torch.stack(train_losses).mean().item()
        print(f"Epoch [{epoch}], train loss: {train_loss}")
        history.append(train_loss)
        early_stopping(train_loss, model)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break
    return history

def train(b):
  """Click handler: build the dataset and model from the UI widgets, then train.

  Reads the dataset and training options from the module-level widgets,
  trains with ``fit_fe`` while output is captured in ``output3``, and stores
  a dict with keys 'data', 'history' and 'model' on ``b.value``.
  """
  # Resize to a fixed square. Resize with a single int only fixes the shorter
  # edge, so images with different aspect ratios would come out with
  # different shapes and could not be stacked into one batch.
  side = image_dim.value
  DATA_TRANSFORM = t.Compose([t.Resize((side, side)),
                              t.ToTensor()])
  DATA_DS = ImageFolder(folder_path.value, DATA_TRANSFORM)
  TRAIN_DL = DataLoader(DATA_DS,
                        batch_size.value,
                        shuffle=True,
                        num_workers=num_workers.value)
  TRAIN_DL = DeviceDataLoader(TRAIN_DL, device)
  # NOTE(review): no ``weights`` argument is passed to get_model, so this
  # presumably builds an untrained backbone; confirm whether pretrained
  # weights were intended.
  trained = base_net(models.get_model(model_dd.value), len(DATA_DS.classes))
  trained = to_device(trained, device)

  history = []
  with output3:
    clear_output(wait=True)
    history += fit_fe(epoch.value,
                      lr.value,
                      trained,
                      TRAIN_DL,
                      optim_list[optimizer.value])
  # Stash the results on the button so later cells can read them.
  b.value = {
      'data': DATA_DS,
      'history': history,
      'model': trained
  }

## UMAP

In [None]:
from io import BytesIO
from PIL import Image
import pandas as pd
import base64
import umap

from bokeh.io import show, output_notebook
from bokeh.plotting import figure
from bokeh.models import HoverTool, ColumnDataSource, CategoricalColorMapper
from bokeh.palettes import Spectral10

# Configure Bokeh so that show() renders plots inline in the notebook.
output_notebook()

def get_embedding(model, dl, device):
    """Extract feature embeddings for every image yielded by ``dl``.

    The last child module of ``model`` (its classification layer) is dropped
    and the remaining modules are run as a feature extractor.

    Args:
        model: network whose last child is the classification layer.
        dl: iterable of ``(images, labels)`` batches supporting ``len()``.
        device: device the feature extractor is moved to.

    Returns:
        ``(embed, classes)``: nested list of per-image features and the list
        of the corresponding labels, in iteration order.
    """
    # Remove classification layer
    model_fe = torch.nn.Sequential(*(list(model.children())[:-1]))
    to_device(model_fe, device)

    embeds = []
    classes = []
    with torch.no_grad():
        # Letting tqdm drive the loop means the bar is finalised when the
        # loop finishes instead of relying on a manual close().
        for images, labels in tqdm(dl, total=len(dl)):
            outputs = model_fe(images)
            # Drop the two trailing size-1 dims and move the result to the
            # CPU right away so per-batch features do not pile up on the
            # accelerator.
            embeds.append(outputs.squeeze(dim=2).squeeze(dim=2).cpu())
            classes.append(labels.cpu())

    embed = torch.cat(embeds, dim=0).tolist()
    classes = torch.cat(classes, dim=0).tolist()
    return embed, classes

def embeddable_image(data):
  """Return a base64 PNG data URI containing a thumbnail (at most 64x64) of an image.

  ``data[0]`` is used as the image path, so an ``ImageFolder.imgs`` entry can
  be passed directly.
  """
  buffer = BytesIO()
  # The context manager releases the file handle deterministically instead of
  # leaving it to garbage collection.
  with Image.open(data[0]) as img:
    img.thumbnail((64, 64), Image.Resampling.BICUBIC)
    img.save(buffer, format='png')
  for_encoding = buffer.getvalue()
  return 'data:image/png;base64,' + base64.b64encode(for_encoding).decode()

def create_df(b):
  """Click handler: embed every image, reduce with UMAP, and build the plot DataFrame.

  Uses the dataset and model stored on ``train_btn.value``. The resulting
  DataFrame (columns x, y, label, name, image, color) is stored on
  ``b.value`` and the class dropdown ``iqr_dd`` is filled with the class names.
  """
  if train_btn.value is None:
    # Nothing has been trained yet; say so instead of failing on None['data'].
    with output4:
      print("No trained model found. Run training first.")
    return

  DATA_DS = train_btn.value['data']
  TEST_DL = DataLoader(DATA_DS, batch_size.value, shuffle=False, num_workers=num_workers.value)
  TEST_DL = DeviceDataLoader(TEST_DL, device)

  model = train_btn.value['model']
  model.eval()

  with output4:
    print("Getting image embedding:")
    embed, _ = get_embedding(model.network, TEST_DL, device)

    print("\nCreating UMAP embedding:")
    reducer = umap.UMAP(verbose=True)
    # Fit and project in one call rather than fit() followed by transform()
    # on the same data.
    embedding = reducer.fit_transform(embed)
    clear_output(wait=True)

  images_df = pd.DataFrame(embedding, columns=('x', 'y'))
  images_df['label'] = [DATA_DS.classes[x] for x in DATA_DS.targets]
  # os.path handles the platform's separator and strips only the final
  # extension, so names containing dots are kept intact.
  images_df['name'] = [os.path.splitext(os.path.basename(path))[0] for path, _ in DATA_DS.imgs]
  images_df['image'] = list(map(embeddable_image, DATA_DS.imgs))

  cmap = glasbey.create_palette(palette_size=len(DATA_DS.classes))
  images_df['color'] = [cmap[x] for x in DATA_DS.targets]
  # Publish the DataFrame before touching the dropdown: changing its value
  # fires the observer, which reads the DataFrame from the button.
  b.value = images_df
  iqr_dd.options = DATA_DS.classes
  iqr_dd.value = DATA_DS.classes[0]

def plot_umap(b):
  """Click handler: show an interactive Bokeh scatter plot of the UMAP embedding.

  Reads the DataFrame stored on ``embed_btn.value`` and draws one scatter
  glyph per class, with an image thumbnail and name shown on hover.
  """
  df = embed_btn.value
  class_names = df.label.unique()
  color_mapping = CategoricalColorMapper(
      factors=class_names,
      palette=df.color.unique()
  )

  plot = figure(
      width=800,
      height=600,
      title="UMAP Scatter Plot",
      toolbar_location="above",
      x_axis_label = 'FEATURE 1',
      y_axis_label = 'FEATURE 2',
      tools=('box_zoom, pan, wheel_zoom, reset, save')
  )

  # Hover tooltip: the thumbnail with the image name underneath.
  hover_template = """
  <div>
      <div >
          <img src='@image' style='display: block; margin: 3px auto 0px'/>
      </div>
      <div style='text-align: center; margin-bottom: 2px'>
          <span style='font-size: 14px'>@name</span>
      </div>
  </div>
  """
  plot.add_tools(HoverTool(tooltips=hover_template))

  # One glyph per class, so each class gets its own legend entry.
  for class_name in class_names:
      class_rows = df[df.label==class_name]
      plot.scatter('x', 'y',
                   source=ColumnDataSource(class_rows),
                   legend_label=class_name,
                   color=dict(field='label', transform=color_mapping),
                   line_alpha=0,
                   fill_alpha=0.5,
                   size=10)

  legend = plot.legend
  legend.title = "Classess"
  legend.location = "top_right"
  legend.click_policy="hide"

  show(plot)

## IQR

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
%matplotlib inline

def plot_iqr(df, label, color):
  """Show a scatter plot of one class's points with marginal box plots.

  The scatter of ``x`` against ``y`` sits in the lower-left cell; a
  horizontal box plot of ``x`` is above it and a vertical box plot of ``y``
  is to its right.
  """
  grid_layout = {'hspace': .01,
                 'wspace': .01,
                 'width_ratios': [5, 1],
                 'height_ratios': [1, 5]}
  fig, axs = plt.subplots(2, 2, figsize=(8, 6), gridspec_kw=grid_layout)
  top_ax, corner_ax = axs[0]
  scatter_ax, side_ax = axs[1]

  # Only the scatter cell keeps its axes and ticks.
  for hidden_ax in (top_ax, corner_ax, side_ax):
    hidden_ax.axis("off")

  sns.boxplot(data=df, x='x', y='label', ax=top_ax, orient='h', legend=False, color=color)
  sns.boxplot(data=df, y='y', x='label', ax=side_ax, orient='v', legend=False, color=color)
  sns.scatterplot(data=df, x="x", y="y", ax=scatter_ax, color=color)

  fig.suptitle(f"Scatter and Box Plot for {label}")
  scatter_ax.set_xlabel("Feature 1")
  scatter_ax.set_ylabel("Feature 2")
  plt.show()

In [None]:
def dynamic_plot(total, Cols=5):
    '''return (rows, columns) of a grid with ``Cols`` columns that fits ``total`` items'''
    full_rows, leftover = divmod(total, Cols)
    # A partially filled last row still needs a row of its own.
    Rows = full_rows + (1 if leftover != 0 else 0)
    return Rows, Cols

def decode_image(img):
  """Decode a base64-encoded image payload and open it as a PIL image."""
  raw_bytes = base64.b64decode(img)
  return Image.open(BytesIO(raw_bytes))

def plot_images(outlier, name, n):
  """Show the thumbnails of the first ``n`` rows of ``outlier`` in a grid.

  Each row's 'image' column holds a data URI whose base64 payload follows
  the last comma; the 'name' column is used as the caption. Fewer than ten
  images go on a single row, otherwise the grid is ten columns wide.
  """
  max_cols = 10
  scale = 1

  def thumbnail_at(idx):
    # Strip the data-URI prefix and decode the base64 payload.
    return decode_image(outlier['image'][idx].split(',')[-1])

  if n == 1:
    # A single subplot is returned as a bare Axes, not an array.
    fig, single_ax = plt.subplots(1, n, figsize=(n*scale, 1*scale+.5))
    single_ax.imshow(thumbnail_at(0))
    single_ax.set_xlabel(outlier['name'][0])
    plt.setp(single_ax, xticks=[], yticks=[])
    plt.suptitle(f"{name} outlier")
    plt.show()
    return

  if n >= max_cols:
    n_rows, n_cols = dynamic_plot(n, max_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(n_cols*scale, n_rows*scale))
  else:
    fig, axes = plt.subplots(1, n, figsize=(n*scale, 1*scale+.5))

  plt.setp(axes, xticks=[], yticks=[])
  for i, ax in enumerate(axes.flatten()):
    if i < n:
      img = thumbnail_at(i)
      ax.set_xlabel(outlier['name'][i])
      ax.imshow(img)
    else:
      # Hide the unused cells of the last row.
      ax.axis("off")
  plt.suptitle(f"{name} outlier")
  plt.tight_layout()
  plt.show()

In [None]:
def f(x):
  """Dropdown observer: plot the selected class and show its IQR outliers.

  A point is flagged when its ``x`` or ``y`` coordinate lies on or beyond
  1.5 * IQR below the first quartile or above the third quartile of that
  coordinate within the selected class.
  """
  selected = x.new
  all_points = embed_btn.value
  df = all_points[all_points.label==selected].reset_index()
  coords = df[['x', 'y']]

  q1, q3 = np.percentile(coords, [25, 75], axis=0)
  iqr = q3 - q1
  lower_bound = q1 - 1.5*iqr
  upper_bound = q3 + 1.5*iqr
  # Row positions where either coordinate is out of bounds, deduplicated.
  flagged_rows = np.where((coords<=lower_bound) | (coords>=upper_bound))[0]
  outlier_rows = np.unique(flagged_rows)
  n = len(outlier_rows)
  outlier = df.iloc[outlier_rows].reset_index()

  with output6:
    plot_iqr(df, selected, df['color'][0])
    clear_output(wait=True)

  with output7:
    if n != 0:
      plot_images(outlier, selected, n)
    else:
      print("No outlier found!")
    clear_output(wait=True)

## UI bits

In [None]:
# Text box for the dataset archive URL and the button that starts the download.
link = widgets.Text(placeholder='https://example.com/dataset.zip')
dl_btn = LoadedButton(description="Download", value=[])

def downloader(b):
  """Click handler: download the archive at ``link.value`` and unzip it.

  The file name is the last '/'-separated part of the URL. If no entry with
  that name exists in the current working directory, the archive is fetched
  with wget, unzipped, and the image-folder dropdown is refreshed from
  ``data_dir``. Otherwise only a message is printed. All output goes to
  ``output1``.
  """
  file = re.split('/', link.value)[-1]
  with output1:
    if file not in os.listdir():
      # NOTE(review): link.value is interpolated into a shell command as-is.
      !wget {link.value}
      clear_output(wait=False)
      print(f"Downloaded {file} from {link.value}")
      # NOTE(review): the unzip target is the absolute path /content/dataset,
      # while the dropdown below lists the relative ``data_dir``; these only
      # coincide when the working directory is /content. Confirm.
      !unzip -qq -o {file} -d "/content/dataset"
      print(f"Unzipped {file} to '/content/dataset'")
      folder_path.options = [f"{data_dir}/{f}" for f in os.listdir(data_dir)]
      folder_path.value = f"{data_dir}/{os.listdir(data_dir)[0]}"
    else:
      print("file already uploaded")

dl_btn.on_click(downloader)

# Heading and layout for the download section.
title1 = widgets.HTML(value="<h1>Download dataset</h1>")
ui1 = widgets.HBox([link, dl_btn])

In [None]:
# Dataset options: candidate image folders are the entries of ``data_dir``
# at the time this cell runs.
dataset = [f"{data_dir}/{f}" for f in os.listdir(data_dir)]
folder_path = widgets.Dropdown(description='Image folder:', options=dataset)
image_dim = widgets.IntText(value=224, description='Image dim:')
batch_size = widgets.IntText(value=32, description='Batch size:')
num_workers = widgets.IntText(value=2, description='Num worker:')

title2 = widgets.HTML(value="<h1>Dataset options</h1>")
ui2 = widgets.VBox([title2, folder_path, image_dim, batch_size, num_workers])

In [None]:
# Training options: model architecture, epoch count, learning rate, optimizer.
model_dd = widgets.Dropdown(options=model_list,
                            value='resnet18',
                            description='Model:')
epoch = widgets.IntText(value=2, description='Epoch:')
lr = widgets.FloatText(value=0.00001, description='Learn Rate:')
optimizer = widgets.Dropdown(options=list(optim_list.keys()),
                             value='Adam',
                             description='Optimizer:',)

# The Train button runs ``train`` on click; the handler stores its results on
# the button's ``value``.
train_btn = LoadedButton(description="Train")
train_btn.on_click(train)

title3 = widgets.HTML(value="<h1>Training options</h1>")
ui3 = widgets.VBox([title3, model_dd, epoch, lr, optimizer, train_btn])
# Output area used by ``train``.
# NOTE(review): ``output3`` is assigned again in a later cell; ``train`` looks
# the name up when it runs, so it uses whichever assignment ran last.
output3 = widgets.Output()

# User Interface

In [None]:
# @title
# Show the download section with its output area, then the stacked dataset
# and training option panels.
output1 = widgets.Output()
display(title1, ui1, output1)
ui23 = widgets.VBox([ui2, ui3])
display(ui23)

In [None]:
# @title
# Training log area. The Accordion is the cell's last expression, so the
# notebook renders it as the cell result.
output3 = widgets.Output()
widgets.Accordion([output3])

In [None]:
# @title
# Button that runs ``create_df`` on click; the handler stores the resulting
# DataFrame on the button's ``value``.
embed_btn = LoadedButton(description="Get embed")
embed_btn.on_click(create_df)

title4 = widgets.HTML(value="<h1>Outlier Detection</h1>")
# Progress messages from ``create_df`` are written here.
output4 = widgets.Output()
display(title4, embed_btn, output4)

In [None]:
# @title
# Button that runs ``plot_umap`` on click.
umap_btn = LoadedButton(description="Plot UMAP")
umap_btn.on_click(plot_umap)

display(umap_btn)

In [None]:
# @title
# Class selector for the IQR view. It starts empty; ``create_df`` assigns its
# options. ``f`` is called on every change of the selected value.
iqr_dd = widgets.Dropdown(options=[])
iqr_dd.observe(f, names='value')

# Output area that ``f`` draws the scatter/box plot into.
output6 = widgets.Output()
display(iqr_dd, output6)

In [None]:
# @title
# Output area that ``f`` uses for the outlier thumbnails (or the
# "No outlier found!" message).
output7 = widgets.Output()
display(output7)