In [12]:
import torch
from torch import nn
import torch.nn.functional as F

import pytorch_lightning as pl
from pytorch_lightning import LightningDataModule
import torchmetrics


from sklearn.metrics import accuracy_score

import os
import cv2
import numpy as np

In [13]:
from sklearn.model_selection import train_test_split


In [14]:
import pandas as pd
import numpy as np
from pathlib import Path
import torch
from torch.utils.data import Subset, DataLoader, Dataset
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import random_split
import os
import cv2

import pytorch_lightning as pl

from pytorch_lightning import LightningDataModule

In [15]:
from src.baseline import BaselineClf

In [16]:
class SIFTClf(pl.LightningModule):
    def __init__(self, in_channels=3):
        super().__init__()
        self.accuracy = torchmetrics.Accuracy()
        self.in_channels = in_channels
        self.model = nn.Sequential(
            nn.Conv2d(self.in_channels, 16, kernel_size=3, stride=2, padding=1, bias=False), # 64x64
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1, bias=False), # 32x32
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1, bias=False), # 16x16
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1, bias=False), # 8x8
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1, bias=False), # 4x4
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1, bias=False), # 2x2
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(), 
            nn.Linear(512, 64, bias=False),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, 2),
            nn.Softmax(dim=-1),
        )
        
    def forward(self, x):
        x = self.model(x)
        return x

    def training_step(self, batch, batch_idx):
        images, labels = batch 
        labels = torch.squeeze(labels)
        out = self(images)         
        loss = F.cross_entropy(out, labels) 
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)
    
    def validation_step(self, batch, batch_idx):
        images, labels = batch 
        labels = torch.squeeze(labels)
        out = self(images)                    
        loss = F.cross_entropy(out, labels)   
        _, preds = torch.max(out, dim=1)
        self.accuracy(preds, labels)
        self.log('Validation Accuracy: ', self.accuracy)

    def test_step(self, batch, batch_idx):
        images, labels = batch 
        labels = torch.squeeze(labels)
        out = self(images)
        loss = F.cross_entropy(out, labels)   
        _, preds = torch.max(out, dim=1)
        self.accuracy(preds, labels)
        self.log('Test Accuracy: ', self.accuracy)      


In [17]:
class SIFTDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return self.data.__len__()

    def __getitem__(self, idx):
        label, feature = self.data[idx]
        feature = np.expand_dims(feature, axis=0)
        return torch.tensor(feature, dtype=torch.float), torch.tensor(label, dtype=torch.long)

In [18]:
class SIFTDataModule(LightningDataModule):
    def __init__(self, batch_size=32, num_workers=0):
        super().__init__()
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        print("SIFT DataModule: setup...")
        data_path = "../data/images"

        data = []
        for file in os.listdir(data_path + "/yes"):
            img = cv2.imread(data_path + "/yes/" + file)
            sift = cv2.SIFT_create()
            kp, desc = sift.detectAndCompute(img, None)
            features = np.resize(desc, (256, 256))
            label = np.array([1])
            data.append([label, features])

        for file in os.listdir(data_path + "/no"):
            img = cv2.imread(data_path + "/no/" + file)
            kp, desc = sift.detectAndCompute(img, None)
            features = np.resize(desc, (256, 256))
            label = np.array([0])
            data.append([label, features])

        train, test = train_test_split(data, test_size= 0.1)
        train, val = train_test_split(train, test_size=0.11)

        self.train_set = SIFTDataset(train)
        self.test_set = SIFTDataset(test)
        self.val_set = SIFTDataset(val)

        print("SIFT Datamodule: ...complete!")

    def train_dataloader(self):
        return DataLoader(self.train_set, batch_size=self.batch_size, num_workers=self.num_workers)

    def val_dataloader(self):
        return DataLoader(self.val_set, batch_size=self.batch_size, num_workers=self.num_workers)

    def test_dataloader(self):
        return DataLoader(self.test_set, batch_size=self.batch_size, num_workers=self.num_workers)

In [19]:
train_loader = SIFTDataModule()
trainer = pl.Trainer()
model = SIFTClf(in_channels=1)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [20]:
trainer.fit(model, train_dataloaders=train_loader)

SIFT DataModule: setup...



  | Name     | Type       | Params
----------------------------------------
0 | accuracy | Accuracy   | 0     
1 | model    | Sequential | 1.6 M 
----------------------------------------
1.6 M     Trainable params
0         Non-trainable params
1.6 M     Total params
6.426     Total estimated model params size (MB)


SIFT Datamodule: ...complete!
Epoch 22:   0%|          | 0/9 [00:00<?, ?it/s, loss=0.482, v_num=64]        