In [1]:
import random
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchsummary import summary
from torch import optim
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision import transforms, utils
from PIL import Image

In [2]:
# Search space from which Encoding draws random layer hyper-parameters.

#Convolutional layers: candidate filter (kernel) sizes and filter counts
FSIZES = [2,3,4,5,6,7,8]
NFILTERS = [2,4,8,16,32]

#Pooling layers: candidate pooling kernel sizes and pooling types
PSIZES = [2,3,4,5]
# NOTE(review): PTYPE is not referenced by any visible cell; Encoding samples the
# pooling type from its own inline list ['max', 'avg', 'off'] instead.
PTYPE = ['max', 'avg']

#Fully connected layers: candidate neuron counts
NEURONS = [4,8,16,32,64,128]

In [3]:
def conv_out_size(W, K, P=1, S=1):
    """Compute the spatial output size of a square convolutional layer.

    Args:
        W: Input width/height (the feature map is assumed to be square).
        K: Kernel size.
        P: Zero padding added on each side. Defaults to 1, the padding that
            `decoding` uses for every convolution.
        S: Stride. Defaults to 1.

    Returns:
        floor((W - K + 2 * P) / S) + 1. With the default padding and stride
        this is W - K + 3.
    """
    return (W - K + 2 * P) // S + 1

def pool_out_size(W, K, S=2):
    """Compute the spatial output size of a square, unpadded pooling layer.

    Args:
        W: Input width/height (the feature map is assumed to be square).
        K: Pooling kernel size.
        S: Stride. Defaults to 2, the stride `decoding` uses for every
            pooling layer.

    Returns:
        floor((W - K) / S) + 1.
    """
    # Integer floor division avoids the float round-trip of math.floor(x / 2).
    return (W - K) // S + 1

In [4]:
class Encoding:
    """Randomly generated two-level description of a CNN architecture.

    Attributes:
        n_conv: Number of convolutional blocks, drawn uniformly from [minC, maxC].
        n_full: Number of fully connected layers, drawn uniformly from [minF, maxF].
        first_level: List with one dict per layer: the n_conv 'conv' entries
            first (keys 'type', 'nfilters', 'fsize', 'pool', 'psize'), followed
            by the n_full 'fc' entries (keys 'type', 'neurons').
        second_level: Flat list of 0/1 bits. Conv layers 0 and 1 contribute no
            bits; conv layer i (i >= 2) contributes i - 1 bits.
    """

    def __init__(self, minC, maxC, minF, maxF):
        self.n_conv = random.randint(minC, maxC)
        self.n_full = random.randint(minF, maxF)

        # First level: per-layer hyper-parameters, conv blocks before FC layers.
        self.first_level = []
        for _ in range(self.n_conv):
            # A pooling size is always drawn, even when pooling is 'off'.
            self.first_level.append(dict(
                type='conv',
                nfilters=random.choice(NFILTERS),
                fsize=random.choice(FSIZES),
                pool=random.choice(['max', 'avg', 'off']),
                psize=random.choice(PSIZES),
            ))
        for _ in range(self.n_full):
            self.first_level.append(dict(type='fc', neurons=random.choice(NEURONS)))

        # Second level: one random bit per slot, i - 1 slots for conv layer i >= 2.
        self.second_level = [
            random.choice([0, 1])
            for layer_idx in range(2, self.n_conv)
            for _ in range(layer_idx - 1)
        ]

In [5]:
def decoding(encoding, input_channels=1, input_size=256, n_classes=3):
    """Translate an encoding into the layers of a CNN.

    Args:
        encoding: Object exposing `n_conv`, `n_full`, `first_level` and
            `second_level` (see `Encoding`).
        input_channels: Number of channels of the input image. Defaults to 1.
        input_size: Width/height of the (square) input image. Defaults to 256.
        n_classes: Number of output units of the final linear layer.
            Defaults to 3.

    Returns:
        A tuple `(features, classifier, o_sizes)`:
            features: list with one entry per conv block; each entry is the
                list of modules of that block (Conv2d, BatchNorm2d, ReLU and,
                unless pooling is 'off', an AvgPool2d/MaxPool2d with stride 2).
            classifier: flat list of Linear/ReLU modules ending in a
                Linear(..., n_classes).
            o_sizes: list of `[spatial_size, channels]`, one per conv block,
                describing that block's output.

    Raises:
        ValueError: If a conv layer's 'pool' entry is not 'off', 'avg' or 'max'.
    """
    n_conv = encoding.n_conv
    n_full = encoding.n_full
    first_level = encoding.first_level
    second_level = encoding.second_level

    pool_layers = {'avg': nn.AvgPool2d, 'max': nn.MaxPool2d}
    pad = 1

    features = []
    o_sizes = []
    in_channels = input_channels
    out_size = input_size
    pos = 0  # read cursor into second_level
    for i in range(n_conv):
        layer = first_level[i]
        n_filters = layer['nfilters']
        f_size = layer['fsize']
        pool_type = layer['pool']
        if pool_type != 'off' and pool_type not in pool_layers:
            raise ValueError("Unknown pooling type: %r" % (pool_type,))

        # Shrink kernels that would not fit in the incoming feature map.
        if f_size > out_size:
            f_size = out_size - 1

        # Conv layers 0 and 1 take no skip connections. Layer i >= 2 owns the
        # next i - 1 bits of second_level; bit c == 1 means the output of conv
        # layer c is concatenated to this layer's input, widening its channels.
        if i >= 2:
            n_bits = i - 1
            for c, bit in enumerate(second_level[pos:pos + n_bits]):
                if bit == 1:
                    in_channels += o_sizes[c][1]
            pos += n_bits

        operation = [
            nn.Conv2d(in_channels=in_channels, out_channels=n_filters, kernel_size=f_size, padding=pad),
            nn.BatchNorm2d(n_filters),
            nn.ReLU(inplace=True)]

        if pool_type == 'off':
            out_size = conv_out_size(out_size, f_size)
        else:
            p_size = layer['psize']
            # NOTE: the clamp compares against the block's input size (before
            # the convolution), exactly as the original implementation did.
            if p_size > out_size:
                p_size = out_size - 1
            operation.append(pool_layers[pool_type](kernel_size=p_size, stride=2))
            out_size = conv_out_size(out_size, f_size)
            out_size = pool_out_size(out_size, p_size)

        in_channels = n_filters
        o_sizes.append([out_size, in_channels])
        features.append(operation)

    classifier = []
    in_size = out_size * out_size * in_channels
    for i in range(n_conv, n_conv + n_full):
        n_neurons = first_level[i]['neurons']
        classifier += [nn.Linear(in_size, n_neurons), nn.ReLU(inplace=True)]
        in_size = n_neurons

    # Output layer. Using in_size (instead of the last loop variable) keeps
    # this valid when the encoding has no fully connected layers.
    classifier.append(nn.Linear(in_size, n_classes))

    return features, classifier, o_sizes

In [6]:
class CNN(nn.Module):
    """CNN with optional skip connections, assembled from a decoded encoding.

    Args:
        encoding: Object exposing `second_level`, the flat list of 0/1 skip
            connection bits (conv layer i >= 2 owns i - 1 consecutive bits).
        features: List of conv blocks; each block is a list of modules.
        classifier: Flat list of modules forming the classification head.
        sizes: List of `[spatial_size, channels]` per conv block, as returned
            by `decoding`. Stored on the instance; the forward pass reads the
            real tensor shapes instead.
        init_weights: Accepted for interface compatibility; not used.
    """

    def __init__(self, encoding, features, classifier, sizes, init_weights=True):
        super().__init__()
        extraction = []
        for layer in features:
            extraction += layer
        # Registering the flattened layers here is what exposes their
        # parameters to .parameters(), .to() and state_dict().
        self.extraction = nn.Sequential(*extraction)
        self.classifier = nn.Sequential(*classifier)
        self.features = features
        self.second_level = encoding.second_level
        self.sizes = sizes
        # Per-block callables, built once instead of on every forward pass.
        # Kept in a plain list on purpose: the underlying layers are already
        # registered through self.extraction, so state_dict keys do not change.
        self._blocks = [nn.Sequential(*layer) for layer in features]

    @staticmethod
    def _match_size(skip, req_size):
        """Resize a (square) skip tensor so its spatial size equals req_size."""
        diff = req_size - skip.shape[2]
        if diff < 0:
            # Skip map is larger: shrink it with a stride-1 max pooling.
            return F.max_pool2d(skip, kernel_size=1 - diff, stride=1)
        if diff == 0:
            return skip
        if diff == 1:
            # Padded stride-1 max pooling grows the map by exactly one pixel.
            return F.max_pool2d(skip, kernel_size=2, stride=1, padding=1)
        # diff >= 2: zero-pad. For diff == 2 this is one pixel on every side;
        # for odd differences the extra pixel goes to the right/bottom.
        before = diff // 2
        after = diff - before
        return F.pad(skip, (before, after, before, after))

    def forward(self, x):
        """Run feature extraction with skip connections, then the classifier.

        Args:
            x: Input batch; feature maps are assumed to be square, since only
                dimension 2 is inspected when matching skip sizes.

        Returns:
            Log-probabilities (log_softmax over dimension 1).
        """
        outputs = []  # output of every conv block, indexed by block number
        pos = 0       # read cursor into self.second_level
        for i, block in enumerate(self._blocks):
            if i >= 2:
                n_bits = i - 1
                connections = self.second_level[pos:pos + n_bits]
                req_size = x.shape[2]  # unchanged by channel-wise concatenation
                for c, bit in enumerate(connections):
                    if bit == 1:
                        x2 = self._match_size(outputs[c], req_size)
                        x = torch.cat((x, x2), dim=1)
                pos += n_bits

            x = block(x)
            outputs.append(x)

        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return F.log_softmax(x, dim=1)

In [7]:
'Los bloques convolucionales tienen la forma: (no. de filtros, tamaño del filtro, tipo de pooling, tamaño de pooling)'
'Los bloques fully-connected tienen la forma: (no. de neuronas)'
# The two string literals above are notes: convolutional blocks are described by
# (number of filters, filter size, pooling type, pooling size) and fully connected
# blocks by (number of neurons).
e = Encoding(4,4,2,2) # Create a random encoding; min == max fixes the layer counts at 4 convolutional layers and 2 fully connected layers
# Next, manually overwrite the random genes with the network data taken from the Excel sheet
e.first_level = [{'fsize': 2, 'nfilters': 5, 'pool': 'avg', 'psize': 3, 'type': 'conv'},
                {'fsize': 8, 'nfilters': 2, 'pool': 'max', 'psize': 4, 'type': 'conv'},
                {'fsize': 4, 'nfilters': 6, 'pool': 'off', 'psize': 4, 'type': 'conv'},
                {'fsize': 4, 'nfilters': 4, 'pool': 'off', 'psize': 5, 'type': 'conv'},
                {'neurons': 8, 'type': 'fc'},
                {'neurons': 8, 'type': 'fc'}]
# Skip-connection bits as sliced by decoding: conv layer 2 owns the first bit
# (layer 0 -> off); conv layer 3 owns the next two (layer 0 -> on, layer 1 -> off).
e.second_level = [0,1,0]

In [8]:
# Decode the encoding: builds the feature-extractor blocks, the classifier
# layers and the per-block output sizes automatically.
cnn = decoding(e)
# Build the CNN and move it to the GPU when one is available; fall back to the
# CPU instead of crashing on machines without CUDA.
model = CNN(e, *cnn).to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

In [9]:
#Mounting Google Drive
# Colab-only step: google.colab is available only inside a Google Colab runtime.
# The image folders configured below live under this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [10]:
# One label array per class folder; its length (250) is the number of images the
# dataset will read from that folder. The classifier has three outputs, so each
# class needs its own index -- previously the last two classes both used 0.
c_labels = np.ones(250, dtype = np.int8) #benigno (benign) -> class 1
n_labels = np.zeros(250, dtype = np.int8) #cancer_mama (breast cancer) -> class 0
p_labels = np.full(250, 2, dtype = np.int8) #saludable (healthy) -> class 2
#Images path (file-name prefix; the dataset appends '<index>.jpg')
c_root = '/content/gdrive/My Drive/ImagesADP2/benigno/benignop'
n_root = '/content/gdrive/My Drive/ImagesADP2/cama/camap'
p_root = '/content/gdrive/My Drive/ImagesADP2/salud/saludp'

In [11]:
class CovidDataset(Dataset):
  """Dataset of single-channel images stored as '<root><k>.jpg', k = 1..len(labels).

  Args:
    root: Path prefix of the image files (folder plus file-name prefix).
    labels: Array-like of labels; item `idx` belongs to image number `idx + 1`.
    transform: Optional callable applied to the sample dict.

  Each item is a dict `{'image': HxWx1 array, 'label': label}` (or whatever the
  transform returns).
  """
  def __init__(self, root, labels, transform = None):
    self.root = root #The folder path
    self.labels = labels #Labels array
    self.transform = transform #Transform composition

  def __len__(self):
    return len(self.labels)

  def __getitem__(self, idx):
    if torch.is_tensor(idx):
      idx = idx.tolist()
    # File names are 1-based while dataset indices are 0-based.
    img_name = self.root + str(idx + 1) + '.jpg'
    # The context manager closes the file handle as soon as the pixels are read.
    with Image.open(img_name) as img:
      image = np.array(img)
      if image.ndim != 2:
        # Multi-channel file (e.g. RGB): collapse to one grayscale channel
        # instead of failing when unpacking the shape below.
        image = np.array(img.convert('L'))
    height, width = image.shape
    image = image.reshape((height, width, 1))
    sample = {'image': image, 'label': self.labels[idx]}

    if self.transform:
      sample = self.transform(sample)

    return sample

#Sample transform: converts the image of a sample dict into a torch tensor
class ToTensor(object):
  """Callable turning {'image': HxWxC array, 'label': ...} into a CxHxW tensor sample."""
  def __call__(self, sample):
    hwc_image = sample['image']
    label = sample['label']
    # Channels last -> channels first, the axis order Conv2d layers consume.
    chw_image = hwc_image.transpose((2, 0, 1))
    return dict(image=torch.from_numpy(chw_image), label=label)

def loading_data(batch_size=24, train_fraction=0.7, seed=None):
    """Build the train and test dataloaders from the three class folders.

    Args:
        batch_size: Batch size of both dataloaders. Defaults to 24.
        train_fraction: Fraction of the merged dataset used for training; the
            remainder becomes the test split. Defaults to 0.7.
        seed: Optional integer. When given, the random split and the loader
            shuffling use a dedicated generator seeded with it, so runs are
            repeatable. When None (default) the global torch RNG is used.

    Returns:
        `(train_dl, test_dl)`, both shuffling DataLoaders.
    """
    to_tensor = transforms.Compose([ToTensor()])

    #Loading one dataset per class and merging them
    class_sources = [(c_root, c_labels), (n_root, n_labels), (p_root, p_labels)]
    dataset = torch.utils.data.ConcatDataset(
        [CovidDataset(root = root, labels = labels, transform = to_tensor)
         for root, labels in class_sources])

    # The test split takes whatever the integer-truncated train split leaves,
    # so the two lengths always add up to len(dataset).
    n_train = int(len(dataset) * train_fraction)
    lengths = [n_train, len(dataset) - n_train]

    generator = None
    split_kwargs = {}
    if seed is not None:
        generator = torch.Generator().manual_seed(seed)
        split_kwargs['generator'] = generator
    train_ds, test_ds = torch.utils.data.random_split(dataset = dataset, lengths = lengths, **split_kwargs)

    #Creating Dataloaders (generator=None is the DataLoader default)
    train_dl = DataLoader(train_ds, batch_size = batch_size, shuffle = True, generator = generator)
    test_dl = DataLoader(test_ds, batch_size = batch_size, shuffle = True, generator = generator)

    return train_dl, test_dl

**Ejemplo de loops de entrenamiento y prueba**

In [12]:
#Helper to compute top-k accuracy
def accuracy(output, target, topk=(1,)):
    """Return the top-k accuracy (in percent) for every k in `topk`.

    Args:
        output: (batch, classes) tensor of scores.
        target: (batch,) tensor with the true class indices.
        topk: Iterable of k values to evaluate.

    Returns:
        A list with one single-element tensor per requested k.
    """
    with torch.no_grad():
        n_samples = target.size(0)

        # Indices of the max(topk) best-scoring classes, one column per sample.
        _, top_idx = output.topk(max(topk), dim=1, largest=True, sorted=True)
        top_idx = top_idx.t()
        hits = top_idx.eq(target.view(1, -1).expand_as(top_idx))

        # A sample counts for k if its true class is among the first k rows.
        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(100.0 / n_samples)
            for k in topk
        ]

In [13]:
# Pick the compute device: the first CUDA GPU when available, otherwise the CPU
# (previously device1 stayed undefined without CUDA and the print raised NameError).
device1 = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device1)

cuda:0


In [None]:
# Example training loop.
# Builds the train/test dataloaders, then trains `model` with Adam.
train_loader, test_loader = loading_data()

epochs = 100
lr = 1e-3
opt = torch.optim.Adam(model.parameters(), lr=lr)
# The model already returns log-probabilities; CrossEntropyLoss applies
# log_softmax once more, which leaves log-probabilities unchanged.
criterion = nn.CrossEntropyLoss()
# Run on whatever device the model lives on instead of hard-coding CUDA.
device = next(model.parameters()).device
model.train()

for epoch in range(epochs):
  acc_sum = 0.0  # accuracy (percent) weighted by batch size
  n_seen = 0     # samples processed in this epoch
  for data in train_loader:
    x = data['image'].float().to(device)
    y = data['label'].long().to(device)

    #Forward pass
    pred = model(x)
    loss = criterion(pred, y)

    #Accumulate accuracy weighted by batch size, so that a smaller final
    #batch does not count as much as a full one
    batch_n = y.size(0)
    acc_sum += accuracy(pred, y)[0].item() * batch_n
    n_seen += batch_n

    #Backward pass
    loss.backward()

    #Optimization step
    opt.step()
    opt.zero_grad(set_to_none=True)

  #Compute accuracy for this epoch
  acc_train = acc_sum / n_seen
  print('Accuracy en epoch ', epoch, ' : ', acc_train)

In [None]:
# Example evaluation loop.
# Assumes `model`, `test_loader` and `accuracy` are already defined by earlier cells.
model.eval()
# Run on whatever device the model lives on instead of hard-coding CUDA.
device = next(model.parameters()).device
acc_sum = 0.0  # accuracy (percent) weighted by batch size
n_seen = 0     # samples processed
with torch.no_grad():  # no gradients are needed for evaluation
    for data in test_loader:
        # Cast like the training loop does: the convolution needs float input
        # and the images arrive in their on-disk dtype.
        x = data['image'].float().to(device)
        y = data['label'].long().to(device)

        #Forward pass
        pred = model(x)

        #accuracy() returns a list with one tensor per k; take top-1 as a float
        batch_n = y.size(0)
        acc_sum += accuracy(pred, y)[0].item() * batch_n
        n_seen += batch_n

#Computing mean accuracy across all test samples
acc_total = acc_sum / n_seen
print('Accuracy final de prueba: ', acc_total)