In [1]:
from torch import nn, sqrt
import torch
from math import sqrt
from MBConv import MBConvBlock
from SelfAttention import ScaledDotProductAttention

import copy
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from skimage import io, transform
from sklearn.metrics import confusion_matrix
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision
from torchvision import transforms, utils, models

data_dir = r'/media/nghia/DATA/DATA/Bee/bee_imgs/'
PATH = r'/media/nghia/DATA/DATA/Bee/bee_imgs/bee_data.csv'
img_path = r'/media/nghia/DATA/DATA/Bee/bee_imgs/bee_imgs'

df = pd.read_csv(PATH)

In [2]:
df['health'] = df['health'].map({'healthy': 0,
                                 'few varrao, hive beetles': 1,
                                 'Varroa, Small Hive Beetles': 2,
                                 'ant problems': 3,
                                 'hive being robbed': 4,
                                 'missing queen': 5})

transform = {'train': transforms.Compose([transforms.Resize(256),
                                          transforms.CenterCrop(224),
                                          #torchvision.transforms.ColorJitter(hue=.05, saturation=.05),
                                          #transforms.RandomHorizontalFlip(),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406],
                                                               [0.229, 0.224, 0.225])]),

             'val': transforms.Compose([transforms.Resize(256),
                                        transforms.CenterCrop(224),
                                        transforms.ToTensor(),
                                        transforms.Normalize([0.485, 0.456, 0.406],
                                                             [0.229, 0.224, 0.225])]),

             'test': transforms.Compose([transforms.Resize(256),
                                         transforms.CenterCrop(224),
                                         transforms.ToTensor(),
                                         transforms.Normalize([0.485, 0.456, 0.406],
                                                              [0.229, 0.224, 0.225])])}

# Check for cuda
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [3]:
class HoneyBeeDataset(Dataset):
    # instance attributes
    def __init__(self, df, csv_file, root_dir, transform=None):
        self.data = df
        self.root_dir = root_dir
        self.labels = np.asarray(self.data.iloc[:, 6])
        self.transform = transform

    # length of the dataset passed to this class method
    def __len__(self):
        return len(self.data)

    # get the specific image and labels given the index
    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data.iloc[idx, 0])
        image = Image.open(img_name)
        image = image.convert('RGB')
        image_label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, image_label

In [4]:
dataset = HoneyBeeDataset(df=df,
                          csv_file=PATH,
                          root_dir=img_path)

validation_split = 0.2
te_split = 0.5
dataset_size = len(dataset)
indices = list(range(dataset_size))
np.random.shuffle(indices)
val_split = int(np.floor(validation_split * dataset_size))
test_split = int(np.floor(te_split * val_split))
train_indices = indices[val_split:]
rest_indices = indices[:val_split]
val_indices, test_indices = rest_indices[test_split:], rest_indices[:test_split]

In [5]:
dataset_sizes = {'train': len(train_indices), 'val': len(val_indices), 'test': len(test_indices)}

train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)
test_sampler = SubsetRandomSampler(test_indices)

train_dataset = HoneyBeeDataset(df=df,
                                csv_file=PATH,
                                root_dir=img_path,
                                transform=transform['train'])

val_dataset = HoneyBeeDataset(df=df,
                              csv_file=PATH,
                              root_dir=img_path,
                              transform=transform['val'])

test_dataset = HoneyBeeDataset(df=df,
                               csv_file=PATH,
                               root_dir=img_path,
                               transform=transform['test'])

In [6]:
dataloaders = {'train': torch.utils.data.DataLoader(train_dataset, batch_size=4, sampler=train_sampler),
               'val': torch.utils.data.DataLoader(val_dataset, batch_size=4, sampler=valid_sampler),
               'test': torch.utils.data.DataLoader(test_dataset, batch_size=1, sampler=test_sampler)}

In [7]:
img, lab = train_dataset.__getitem__(2)
img.shape

torch.Size([3, 224, 224])

In [8]:
def train_model(model, criterion, optimizer, scheduler, num_epochs):
    #copy the best model weights
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print("Epoch: {}/{}".format(epoch, num_epochs-1))
        print("="*10)

        for phase in ['train', 'val']:
            if phase == 'train':
                scheduler.step()
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for data in dataloaders[phase]:
                inputs, labels = data
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                with torch.set_grad_enabled(phase=='train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase=='train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item()*inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

# CoAtNet

In [13]:
class CoAtNet(nn.Module):
    def __init__(self,in_ch,image_size,out_chs=[64,96,192,384,768], output_dim=6):
        super().__init__()
        self.out_chs=out_chs
        self.maxpool2d=nn.MaxPool2d(kernel_size=2,stride=2)
        self.maxpool1d = nn.MaxPool1d(kernel_size=2, stride=2)

        self.s0=nn.Sequential(
            nn.Conv2d(in_ch,in_ch,kernel_size=3,padding=1),
            nn.ReLU(),
            nn.Conv2d(in_ch,in_ch,kernel_size=3,padding=1)
        )
        self.mlp0=nn.Sequential(
            nn.Conv2d(in_ch,out_chs[0],kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(out_chs[0],out_chs[0],kernel_size=1)
        )

        self.s1=MBConvBlock(ksize=3,input_filters=out_chs[0],output_filters=out_chs[0],image_size=image_size//2)
        self.mlp1=nn.Sequential(
            nn.Conv2d(out_chs[0],out_chs[1],kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(out_chs[1],out_chs[1],kernel_size=1)
        )

        self.s2=MBConvBlock(ksize=3,input_filters=out_chs[1],output_filters=out_chs[1],image_size=image_size//4)
        self.mlp2=nn.Sequential(
            nn.Conv2d(out_chs[1],out_chs[2],kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(out_chs[2],out_chs[2],kernel_size=1)
        )

        self.s3=ScaledDotProductAttention(out_chs[2],out_chs[2]//8,out_chs[2]//8,8)
        self.mlp3=nn.Sequential(
            nn.Linear(out_chs[2],out_chs[3]),
            nn.ReLU(),
            nn.Linear(out_chs[3],out_chs[3])
        )

        self.s4=ScaledDotProductAttention(out_chs[3],out_chs[3]//8,out_chs[3]//8,8)
        self.mlp4=nn.Sequential(
            nn.Linear(out_chs[3],out_chs[4]),
            nn.ReLU(),
            nn.Linear(out_chs[4],out_chs[4])
        )
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(14,43008),
            nn.ReLU(inplace=True),
            nn.Linear(43008, output_dim)
        )


    def forward(self, x) :
        B,C,H,W=x.shape
        #stage0
        y=self.mlp0(self.s0(x))
        y=self.maxpool2d(y)
        #stage1
        y=self.mlp1(self.s1(y))
        y=self.maxpool2d(y)
        #stage2
        y=self.mlp2(self.s2(y))
        y=self.maxpool2d(y)
        #stage3
        y=y.reshape(B,self.out_chs[2],-1).permute(0,2,1) #B,N,C
        y=self.mlp3(self.s3(y,y,y))
        y=self.maxpool1d(y.permute(0,2,1)).permute(0,2,1)
        #stage4
        y=self.mlp4(self.s4(y,y,y))
        y=self.maxpool1d(y.permute(0,2,1))
        N=y.shape[-1]
        y=y.reshape(B,self.out_chs[4],int(sqrt(N)),int(sqrt(N)))
        #h = y.view(y.shape[0], -1)
        out = self.classifier(y)

        return out

In [14]:
model = CoAtNet(3,224)
model

CoAtNet(
  (maxpool2d): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (maxpool1d): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (s0): Sequential(
    (0): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
  (mlp0): Sequential(
    (0): Conv2d(3, 64, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
  )
  (s1): MBConvBlock(
    (_depthwise_conv): Conv2dStaticSamePadding(
      64, 64, kernel_size=(3, 3), stride=(1, 1), groups=64, bias=False
      (static_padding): ZeroPad2d((1, 1, 1, 1))
    )
    (_bn1): BatchNorm2d(64, eps=0.01, momentum=0.1, affine=True, track_running_stats=True)
    (_se_reduce): Conv2dStaticSamePadding(
      64, 16, kernel_size=(1, 1), stride=(1, 1)
      (static_padding): Identity()
    )
    (_se_expand): Conv2dStaticSamePadding(
      

In [15]:
model = model.to(device)
# loss function
criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 10 epochs
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [16]:
EPOCHS = 35
# train
model_pre = train_model(model, criterion, optimizer, exp_lr_scheduler, num_epochs=EPOCHS)

Epoch: 0/34




RuntimeError: only batches of spatial targets supported (3D tensors) but got targets of size: : [4]