# Setup

In [None]:
!pip install -q opendatasets

In [None]:
# Kaggle username: hienliu
# Kaggle Key: fafc0c1746c7bbcf0ff2058a5e40bab4
import opendatasets as od
od.download('https://www.kaggle.com/competitions/aptos2019-blindness-detection')

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: hienliu
Your Kaggle Key: ··········
Downloading aptos2019-blindness-detection.zip to ./aptos2019-blindness-detection


100%|██████████| 9.51G/9.51G [01:32<00:00, 111MB/s]



Extracting archive ./aptos2019-blindness-detection/aptos2019-blindness-detection.zip to ./aptos2019-blindness-detection


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import cv2
import torch
import numpy as np
import pandas as pd
from torch import nn
from PIL import Image
from pathlib import Path
from tqdm.auto import tqdm
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torchvision.models import EfficientNet_V2_S_Weights, efficientnet_v2_s

In [None]:
# @title Dataset
class Blindness_dataset(Dataset):
  def __init__(self,
               image_folder = None,
               data_csv_path = None,
               hist_feature=[0, 1, 2],
               transform = None,
               image_size=384):

    self.image_folder = Path(image_folder)
    self.data_csv = pd.read_csv(data_csv_path) if isinstance(data_csv_path, str) else data_csv_path
    self.hist_feature = hist_feature
    self.transform = transform
    self.size = image_size

  def __repr__(self) -> str:
    body = f"""Dataset {self.__class__.__name__}
    Number of datapoints: {len(self.data_csv)}
    Image location: {self.image_folder}
    Transform: {self.transform}
    Using Hist Feature: {self.hist_feature}"""
    return body

  @staticmethod
  def crop_image_from_gray(img, tol=7):
      if img.ndim ==2:
        mask = img > tol
        return img[np.ix_(mask.any(1),mask.any(0))]
      elif img.ndim==3:
        gray_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        mask = gray_img > tol

        check_shape = img[:,:,0][np.ix_(mask.any(1),mask.any(0))].shape[0]
        if (check_shape == 0): # image is too dark so that we crop out everything,
          return img # return original image
        else:
          img1=img[:,:,0][np.ix_(mask.any(1),mask.any(0))]
          img2=img[:,:,1][np.ix_(mask.any(1),mask.any(0))]
          img3=img[:,:,2][np.ix_(mask.any(1),mask.any(0))]
          img = np.stack([img1,img2,img3],axis=-1)
        return img

  @staticmethod
  def TinhHist(img, channel):
      hist = cv2.calcHist(img,[channel], None, [256], [0,256])
      size = img.shape[0]*img.shape[1]
      hist = hist / size
      return hist

  def __len__(self):
      return len(self.data_csv)

  def extract_hist_feature(self, image_path, channels=[0, 1, 2]):

      image = cv2.imread(image_path, 0) if len(channels) == 1 else cv2.imread(image_path)
      image = Blindness_dataset.crop_image_from_gray(image) # Circle crop
      image = cv2.resize(image, (self.size, self.size))

      features = 0
      for channel in channels:
        features += Blindness_dataset.TinhHist(image, channel).flatten()

      return features / len(channels) # Mean

  def __getitem__(self, idx):

      # Load and Map Image
      image_id = self.data_csv.id_code.loc[idx]
      image_path = self.image_folder / (image_id + '.png')
      label = self.data_csv.diagnosis.loc[idx]

      if self.hist_feature:
        image = self.extract_hist_feature(str(image_path), self.hist_feature)
        return image_id, label, image

      image = Blindness_dataset.crop_image_from_gray(cv2.imread(str(image_path)))
      image = Image.fromarray(image, 'RGB')
      sample = {
          'image_id' : image_id,
          'label' : label,
          'image' : self.transform(image),
      }

      return sample

In [None]:
# @title Config
image_folder = '/content/aptos2019-blindness-detection/train_images'
data_csv_path = '/content/aptos2019-blindness-detection/train.csv'
test_size = 0.3
device = 'cuda' if torch.cuda.is_available() else 'cpu'
random_state = 42
batch_size = 256
image_size = 384
mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

transform = transforms.Compose([
    transforms.Resize((image_size, image_size), interpolation=transforms.InterpolationMode.BILINEAR),
    transforms.CenterCrop(image_size),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=mean,
        std=std
    )
])

In [None]:
# @title utils
def save_data(path, data, overwrite=False):

  if os.path.exists(path) and overwrite == False:
    print(f'{path} already exist!')
    return

  np.save(path, np.asarray(data, dtype="object"))
  print('Save successfully!')

def save_train_test(train_path, train_data, test_path, test_data):
  save_data(path=train_path, data=train_data)
  save_data(path=test_path, data=test_data)

# Extract Feature

## Extract Histogram Grayscale

In [None]:
data = Blindness_dataset(image_folder=image_folder,
                         data_csv_path=data_csv_path,
                         hist_feature=[0])
data

Dataset Blindness_dataset
    Number of datapoints: 3662
    Image location: /content/aptos2019-blindness-detection/train_images
    Transform: None
    Using Hist Feature: [0]

In [None]:
# Split data
train_data, test_data = train_test_split(data,
                                         test_size=test_size,
                                         random_state=random_state,
                                         shuffle=True,
                                         stratify=data.data_csv.diagnosis.tolist())

# Save data
save_train_test(train_path='/content/drive/MyDrive/UIT/CS/datasets/histogram_grayscale/train_data.npy',
                train_data=train_data,
                test_path='/content/drive/MyDrive/UIT/CS/datasets/histogram_grayscale/test_data.npy',
                test_data=test_data)

## Extract Histogram RGB

In [None]:
data = Blindness_dataset(image_folder=image_folder,
                         data_csv_path=data_csv_path,
                         hist_feature=[0, 1, 2])
data

Dataset Blindness_dataset
    Number of datapoints: 3662
    Image location: /content/aptos2019-blindness-detection/train_images
    Transform: None
    Using Hist Feature: [0, 1, 2]

In [None]:
# Split data
train_data, test_data = train_test_split(data,
                                         test_size=test_size,
                                         random_state=random_state,
                                         shuffle=True,
                                         stratify=data.data_csv.diagnosis.tolist())

# Save data
save_train_test(train_path='/content/drive/MyDrive/UIT/CS/datasets/histogram_rgb/train_data.npy',
                train_data=train_data,
                test_path='/content/drive/MyDrive/UIT/CS/datasets/histogram_rgb/test_data.npy',
                test_data=test_data)

## Extract EfficientNet-V2

In [None]:
data_csv = pd.read_csv('/content/aptos2019-blindness-detection/train.csv')
train_data, test_data = train_test_split(data_csv, test_size=test_size, random_state=random_state)

train_data.reset_index(drop=True, inplace=True)
test_data.reset_index(drop=True, inplace=True)

In [None]:
train_dataset = Blindness_dataset(image_folder=image_folder,
                                  data_csv_path=train_data,
                                  transform=transform,
                                  hist_feature=None)

test_dataset = Blindness_dataset(image_folder=image_folder,
                                data_csv_path=test_data,
                                transform=transform,
                                hist_feature=None)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class Custom_Model(nn.Module):
  def __init__(self, efficient_model):
    super(Custom_Model, self).__init__()
    self.feature_extractor = efficient_model.features
    self.pooler = efficient_model.avgpool
    self.flatten = nn.Flatten()

    self.freeze_weight()

  def freeze_weight(self):
    for param in self.feature_extractor.parameters():
      param.requires_grad = False

  def forward(self, sample, device):
    image = sample['image'].to(device)

    feature = self.feature_extractor(image)
    pooled_feature = self.pooler(feature) # batch_size, 1280, 1, 1
    output = self.flatten(pooled_feature)

    return output

In [None]:
weights = EfficientNet_V2_S_Weights.DEFAULT
efficient_model = efficientnet_v2_s(weights=weights)
model = Custom_Model(efficient_model = efficient_model)

Downloading: "https://download.pytorch.org/models/efficientnet_v2_s-dd5fe13b.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_v2_s-dd5fe13b.pth
100%|██████████| 82.7M/82.7M [00:02<00:00, 39.8MB/s]


In [None]:
def extract_feature(model, dataloader, device):

  res_features = []
  res_labels = []
  res_id = []

  model.to(device)
  model.eval()
  with torch.no_grad():
    for samples in tqdm(dataloader):
      feature = model(samples, device)
      res_features.extend(feature.detach().cpu().numpy())
      res_labels.extend(samples['label'].numpy())
      res_id.extend(samples['image_id'])

  return np.array(res_id), np.array(res_features), np.array(res_labels)

In [None]:
train_id, train_features, train_labels = extract_feature(model, train_dataloader, device)
test_id, test_features, test_labels = extract_feature(model, test_dataloader, device)

  0%|          | 0/11 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s]

In [None]:
np.savez("drive/MyDrive/UIT/CS/datasets/efficientnet_v2/train_data.npz",
         train_id=train_id,
         train_labels=train_labels,
         train_features=train_features)

np.savez("drive/MyDrive/UIT/CS/datasets/efficientnet_v2/test_data.npz",
         test_id=test_id,
         test_labels=test_labels,
         test_features=test_features)