In [1]:
# piptorchvisionll albumentations==0.4.6
from torch.nn import Module, AvgPool2d, MaxPool2d, BatchNorm2d, Conv2d, Linear as L, ReLU as R, Sigmoid as S, Sequential, NLLLoss as nll_loss
from torch.utils.data import DataLoader, Dataset
from torchvision.utils import make_grid
from torch.autograd import Variable
import matplotlib.pyplot as plt
from torch.functional import F
import albumentations as A
import pandas as pd
import numpy as np
import torch
import cv2
import os

### Datatset

In [2]:
class PneumoniaDataset(Dataset):
    
    def __init__(self, path="chest_xray_20210414/",sub_path='train', img_size=256,device='CPU', is_train=False):
        self.path = path
        self.img_size = img_size
        self.sub_path = sub_path
        self.device = device
        self.is_train = is_train
        #-------------------------------------------------------------#
        self.samples = self.sampler()
        self.transforms = self.transformer()
        
    def sampler(self):
        """CHARGEMENT DYNAMIQUE DE TOUTES LES IMAGES: NORMALE | PNEUMONIA"""
        new_path = os.path.join(self.path, self.sub_path)
        return [ (os.path.join(new_path,etat,i), etat) for etat in os.listdir(new_path) for i in os.listdir(new_path+"/"+etat)]
        return [(self.transforms(i),j) for i,j in images]
    
    def transformer(self):
        """TRANSFORMATION DE NOTRE IMAGE AFIN DE REDUIRE LE BIAIS"""
        if self.is_train:
            transform = A.Compose([
                A.CLAHE(),
                A.RandomRotate90(),
                A.DualTransform(), # adding
                A.Transpose(),
                A.Resize(height=self.img_size, width=self.img_size, interpolation=cv2.INTER_AREA), # RESIZE
                A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.50, rotate_limit=45, p=.75),
                A.Blur(blur_limit=3),
                A.OpticalDistortion(),
                A.GridDistortion(),
                A.HueSaturationValue(),
            ])
        else:
            transform = A.Compose([
                A.Resize(height=self.img_size, width=self.img_size, interpolation=cv2.INTER_AREA), # RESIZE
            ])
            
        return transform
    
    def __len__(self):
        return len(self.sampler())
        
    def __getitem__(self, index):
        # GET ONE IMAGE
        #try:
        path_to_img, class_type =  self.samples[index]
        #except Exception as ie:
        #    print("Index n'existe pas; Taille maximale = {}".format(self.__len__()))
        #    path_to_img, class_type =  self.samples[-1]
            
        
        # TANSFORM LABEL
        class_type = 0 if class_type == "NORMAL" else 1
        
        # LOAD IMAGE
        img = cv2.imread(path_to_img)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # RESIZE IMAGE
        img_resize = cv2.resize(img, (self.img_size, self.img_size), interpolation=cv2.INTER_AREA)
        #print(img_resize.shape)
        
        
        # AUGMENT IMAGE
        import random
        random.seed(42) 
        augmented_image = self.transforms(image=img_resize)['image']
        ## CONVERT ARRAY NUMPY TO TENSOR
        img_tensor = torch.from_numpy(augmented_image).float()
        img_tensor = img_tensor.permute(2,0,1)        
        
        #print(img_tensor.size(),label_tensor.size())
        # print(img_tensor.size())
        #print(type(img_tensor))
        
        # SHOW IMAGE
        #print(path_to_img,' ---- ',class_type)
        # plt.axis('off')
        # plt.title("PERSONNE {}".format(str(self.samples[index][1])))
        # plt.imshow(img_tensor)
        
        
        return img_tensor, class_type
    
    
        

In [3]:
train_dataset = PneumoniaDataset(sub_path='train')
test_dataset = PneumoniaDataset(sub_path='test', is_train=True)
val_dataset = PneumoniaDataset(sub_path='val')

In [4]:
len(test_dataset)

624

### Dataloader

In [5]:
def dataload(data, bSize=3, sFle=True, nWkr=1):
    dataloader = DataLoader(
                    data,
                    batch_size=bSize,
                    shuffle=sFle,
                    num_workers=nWkr
                )
    return dataloader

In [6]:
train_dataload = dataload(train_dataset)
test_dataload = dataload(test_dataset, sFle=True)
val_dataload = dataload(val_dataset, bSize=2, sFle=False)

In [7]:
class PneumoniaFakeModel(Module):
    def __init__(self):
        super(PneumoniaFakeModel, self).__init__()
        
        # Image size [3, 3, 256, 256]

        
        self.conv1 = Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)
        """  # Shape = (3,12,256,256)

        self.bn1 = BatchNorm2d(num_features=12)
        # Shape= (3,12,256,256)

        self.relu1 = R()
        # Shape= (3,12,256,256)

        self.pool1 = MaxPool2d(kernel_size=2) # Reduce the image size be factor 2
        # Shape= (3,12,128,128)

        
        self.conv2 = Conv2d(in_channels=12, out_channels=6, kernel_size=2, stride=1, padding=1)
        # Shape= (3,6,128,128)

        self.relu2= R()
        # Shape= (3,6,128,128)
        
        self.pool2 = MaxPool2d(kernel_size=2) # Reduce the image size be factor 2
        # Shape= (3,12,64,64)


        self.conv3 = Conv2d(in_channels=6, out_channels=2, kernel_size=2, stride=1, padding=1)
        # Shape= (3,2,64,64)

        self.bn3 = BatchNorm2d(num_features=2)
        # Shape= (256,2,64,64)

        self.relu3= R()
        # Shape= (256,2,64,64)
        """


        #self.fc = L(in_features=64 * 64 * 2,out_features=2)

        # self.fc1 = L(in_features=200,out_features=100)
        self.fc2 = L(256, 50)
        self.fc3 = L(50, 2)
        self.pool1 = MaxPool2d(kernel_size=2) # Reduce the image size be factor 2
        self.relu2 = R()
        self.sig = S()
        self.bn1 = BatchNorm2d(num_features=12)
        self.pool2 = MaxPool2d(kernel_size=2) # Reduce the image size be factor 2
    
    
    def forward(self, x):
        x = self.conv1(x)
        """x = self.bn1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        
        x = self.conv2(x)
        x = self.relu2(x)
        
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu3(x)
        
        output = x.view(-1,64*64*2)
        output = self.fc(output)
        print(x.size())"""
        
        
        # x = self.fc1(x)
        x = self.fc2(x)
        x = self.bn1(x)
        x = self.relu2(x)
        
        x = self.sig(x)
        
        x = self.fc3(x)
        x = self.pool1(x)
        
        
        bSz, _, _, _ = x.shape
        x = x.view(bSz, -1)
        
        # print(x.size())
        
        return F.log_softmax(x, dim=1)
        
        # bSz, _, _, _ = x.shape
        # y_pred = x.view(bSz, -1)

        # x = self.fc(y_pred)
        
        return x # F.log_softmax(y_pred, dim=1)


### Modele

In [8]:
myDevice = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using {} device'.format(myDevice))

Using cpu device


In [9]:
mm = PneumoniaFakeModel()
mm.to(myDevice)

# [N, C, W, H]
X = torch.randn([2, 3, 256,256])


m = mm(X)
m

tensor([[-7.3328, -7.3406, -7.3769,  ..., -7.2592, -7.3410, -7.3651],
        [-7.2998, -7.3777, -7.3192,  ..., -7.2863, -7.3309, -7.3137]],
       grad_fn=<LogSoftmaxBackward>)

### Train

In [10]:
from torch.optim import Adam, RMSprop
from torch.nn import KLDivLoss, CrossEntropyLoss

In [11]:
modellls = PneumoniaFakeModel()
modellls.to(myDevice)

PneumoniaFakeModel(
  (conv1): Conv2d(3, 12, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc2): Linear(in_features=256, out_features=50, bias=True)
  (fc3): Linear(in_features=50, out_features=2, bias=True)
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (relu2): ReLU()
  (sig): Sigmoid()
  (bn1): BatchNorm2d(12, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
)

In [12]:
optimizer = Adam(modellls.parameters(),lr=0.001, weight_decay=0.0001)
loss_function = CrossEntropyLoss(size_average=0.5)



In [13]:
epochs = 7

import torch
for i in range(4):
    for epoch in range(epochs):
        modellls.train()
        train_accuracy=0.0
        train_loss=0.0
        
        for j, (inputs, labels) in enumerate(test_dataload):
            images = inputs.to(myDevice)
            labels = labels.to(myDevice)

            optimizer.zero_grad()
            
            outputs=modellls(images)
            loss=loss_function(outputs,labels)
            
            loss.backward()
            optimizer.step()
            
            train_loss += loss
            _, prediction = torch.max(outputs.data, dim=1)
            
            train_accuracy += int(torch.sum(prediction==labels.data))
            
        train_accuracy=train_accuracy/624
        train_loss=train_loss/624
        
        print("ITER: {} | EPOCH: {}  | LOSS = {}  | ACC = {}".format(i, epoch, train_loss, train_accuracy))


ITER: 0 | EPOCH: 0  | LOSS = 1.6311827898025513  | ACC = 0.18269230769230768
ITER: 0 | EPOCH: 1  | LOSS = 0.9519787430763245  | ACC = 0.2644230769230769
ITER: 0 | EPOCH: 2  | LOSS = 0.7122829556465149  | ACC = 0.35096153846153844
ITER: 0 | EPOCH: 3  | LOSS = 0.5724525451660156  | ACC = 0.40064102564102566
ITER: 0 | EPOCH: 4  | LOSS = 0.5195545554161072  | ACC = 0.40224358974358976
ITER: 0 | EPOCH: 5  | LOSS = 0.47556474804878235  | ACC = 0.40544871794871795
ITER: 0 | EPOCH: 6  | LOSS = 0.4468417465686798  | ACC = 0.41185897435897434
ITER: 1 | EPOCH: 0  | LOSS = 0.4253743886947632  | ACC = 0.4375
ITER: 1 | EPOCH: 1  | LOSS = 0.4118540287017822  | ACC = 0.4310897435897436
ITER: 1 | EPOCH: 2  | LOSS = 0.4015834331512451  | ACC = 0.45032051282051283
ITER: 1 | EPOCH: 3  | LOSS = 0.38234731554985046  | ACC = 0.4599358974358974
ITER: 1 | EPOCH: 4  | LOSS = 0.3778650164604187  | ACC = 0.48717948717948717
ITER: 1 | EPOCH: 5  | LOSS = 0.3510117530822754  | ACC = 0.5
ITER: 1 | EPOCH: 6  | LOSS = 

### Test