In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.autograd import Variable
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from project_code.read_labels import *
import numpy as np
from torch.utils import data



# Render matplotlib figures inline in the notebook output.
%matplotlib inline
plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

# Image preprocessing transforms; `scaler` and `to_tensor` are used by the
# tensor-loading helpers defined in later cells.
# `scaler` resizes every image to 224x224.
scaler = transforms.Resize((224, 224))
# NOTE(review): these look like the commonly used ImageNet channel statistics
# (three channels). `normalize` is not applied anywhere in the visible cells —
# confirm whether normalisation was intended.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
# Converts a PIL image into a tensor.
to_tensor = transforms.ToTensor()


In [None]:
def get_well_specific_string_prefix(well_num):
    """Return the zero padding that precedes ``well_num`` in an image file name.

    The padding makes the well number four characters wide for values up to
    999: ``"000"`` for values <= 9, ``"00"`` for values <= 99, ``"0"`` for
    values <= 999, and the empty string for anything larger.
    """
    # Thresholds are checked in ascending order, so the first match wins.
    for upper_bound, padding in ((9, "000"), (99, "00"), (999, "0")):
        if well_num <= upper_bound:
            return padding
    return ""

def get_day_specific_string_prefix(day):
    """Return the ``_day`` marker that precedes ``day`` in an image file name.

    Days up to 9 get ``"_day0"`` so the day number is two characters wide;
    larger days get plain ``"_day"``.
    """
    return "_day0" if day <= 9 else "_day"

def make_list_of_image_names(min_well, max_well, min_day, max_day, prefix, suffix):
    """Build the image paths for every (well, day) pair in the given ranges.

    Each path has the form ``<prefix><padded well><day marker><day><suffix>``,
    where the padding comes from ``get_well_specific_string_prefix`` and the
    day marker from ``get_day_specific_string_prefix``.

    Parameters
    ----------
    min_well, max_well : int
        Inclusive range of well numbers.
    min_day, max_day : int
        Inclusive range of day numbers.
    prefix : str
        Text placed before the well number (typically a directory plus a
        file-name stem).
    suffix : str
        Text placed after the day number (typically the file extension).

    Returns
    -------
    list of str
        Paths ordered by well first and day second, i.e. all days of the
        first well, then all days of the next well, and so on.
    """
    # The caller-supplied prefix and suffix are used as given; they were
    # previously overwritten with hard-coded values, which made both
    # parameters ineffective.
    return [
        prefix
        + get_well_specific_string_prefix(well_num) + str(well_num)
        + get_day_specific_string_prefix(day) + str(day)
        + suffix
        for well_num in range(min_well, max_well + 1)
        for day in range(min_day, max_day + 1)
    ]

In [None]:
def read_images(min_well, max_well, min_day, max_day,
                prefix="/home/fahimtajwar/final_project/project_data/labelled_data/well",
                suffix="_well.png"):
    """Load the images for every (well, day) pair in the given ranges.

    Parameters
    ----------
    min_well, max_well : int
        Inclusive range of well numbers.
    min_day, max_day : int
        Inclusive range of day numbers.
    prefix, suffix : str
        Passed to ``make_list_of_image_names`` to build each file path.

    Returns
    -------
    list of PIL.Image.Image
        One fully loaded, in-memory image per path, in the order produced by
        ``make_list_of_image_names``.
    """
    image_names = make_list_of_image_names(min_well, max_well, min_day, max_day, prefix, suffix)
    images = []
    for name in image_names:
        # Image.open is lazy and keeps the file open until the pixel data is
        # read. Copying inside the context manager reads the pixels and lets
        # the file be closed straight away, so loading many images does not
        # accumulate open file handles.
        with Image.open(name) as img:
            images.append(img.copy())

    return images

In [None]:
def flatten(x):
    """Collapse all dimensions after the first into one.

    Returns a view of ``x`` with shape ``(x.shape[0], -1)``.
    """
    return x.view(x.shape[0], -1)

In [None]:
def get_tensors_one(min_well, max_well, min_day, max_day,
                    prefix="/home/fahimtajwar/final_project/project_data/labelled_data/well",
                    suffix="_well.png"):
    """Load every image in the given well/day ranges into one stacked tensor.

    Each image is resized with ``scaler`` and converted with ``to_tensor``;
    the per-image tensors are then stacked along a new leading dimension.

    Parameters
    ----------
    min_well, max_well : int
        Inclusive range of well numbers.
    min_day, max_day : int
        Inclusive range of day numbers.
    prefix, suffix : str
        Passed to ``make_list_of_image_names`` to build each file path.

    Returns
    -------
    torch.Tensor
        One entry per image along dimension 0, in the order produced by
        ``make_list_of_image_names``.
    """
    image_names = make_list_of_image_names(min_well, max_well, min_day, max_day, prefix, suffix)
    tensors = []
    for name in image_names:
        # Convert while the file is open, then close it immediately so that
        # file handles do not pile up across hundreds of images.
        with Image.open(name) as img:
            # No Variable wrapper: since PyTorch 0.4 a Variable is just a
            # Tensor, so wrapping is a no-op.
            tensors.append(to_tensor(scaler(img)))

    return torch.stack(tensors)

In [None]:
def get_tensors(min_well, max_well, min_day, max_day,
                prefix="/home/fahimtajwar/final_project/project_data/labelled_data/well",
                suffix="_well.png",
                group_size=5):
    """Load images and stack every ``group_size`` consecutive ones into a sample.

    Images are read in the order produced by ``make_list_of_image_names``
    (all days of one well, then the next well). Each image is resized with
    ``scaler`` and converted with ``to_tensor``. Every run of ``group_size``
    consecutive image tensors is stacked along dimension 1, and the resulting
    samples are stacked along a new leading dimension.

    Parameters
    ----------
    min_well, max_well : int
        Inclusive range of well numbers.
    min_day, max_day : int
        Inclusive range of day numbers.
    prefix, suffix : str
        Passed to ``make_list_of_image_names`` to build each file path.
    group_size : int, optional
        Number of consecutive images combined into one sample (default 5).

    Returns
    -------
    torch.Tensor
        One entry per complete group along dimension 0. Images left over
        when the total is not a multiple of ``group_size`` are dropped.

    Raises
    ------
    ValueError
        If ``group_size`` is smaller than 1.
    """
    if group_size < 1:
        raise ValueError("group_size must be at least 1")

    image_names = make_list_of_image_names(min_well, max_well, min_day, max_day, prefix, suffix)
    samples = []
    current_group = []
    for name in image_names:
        # Convert while the file is open, then close it immediately so that
        # file handles do not pile up across hundreds of images.
        with Image.open(name) as img:
            current_group.append(to_tensor(scaler(img)))

        if len(current_group) == group_size:
            # ToTensor yields (C, H, W); stacking on dim=1 gives
            # (C, group_size, H, W).
            stacked = torch.stack(current_group, dim=1)
            # squeeze(0) only removes the leading dimension when it has
            # size 1 (single-channel images); otherwise it is a no-op.
            samples.append(stacked.squeeze(0))
            current_group = []

    return torch.stack(samples)
        

In [None]:
# source: https://gist.github.com/soply/f3eec2e79c165e39c9d540e916142ae1
def show_images(images, cols = 1, titles = None):
    """Display a list of images in a single figure with matplotlib.

    Parameters
    ---------
    images: List of np.arrays compatible with plt.imshow.

    cols (Default = 1): First grid dimension handed to ``add_subplot``; the
                        second is ``ceil(n_images / cols)``.
                        NOTE: matplotlib interprets the first grid argument
                        as the number of rows, so ``cols`` effectively sets
                        the row count. The layout is left unchanged here.

    titles: List of titles corresponding to each image. Must have
            the same length as images.
    """
    assert (titles is None) or (len(images) == len(titles))
    n_images = len(images)
    if titles is None:
        titles = ['Image (%d)' % i for i in range(1, n_images + 1)]

    # np.ceil returns a float, but add_subplot requires integer grid
    # dimensions (recent matplotlib versions reject floats).
    other_dim = int(np.ceil(n_images / float(cols)))

    fig = plt.figure()
    for n, (image, title) in enumerate(zip(images, titles)):
        a = fig.add_subplot(cols, other_dim, n + 1)
        # Draw on the axes just created rather than on pyplot's implicit
        # "current axes".
        a.imshow(image)
        a.set_title(title)
    # Scale the figure with the number of images so each subplot stays legible.
    fig.set_size_inches(np.array(fig.get_size_inches()) * n_images)
    plt.show()

In [None]:
# Path of the .xlsx file holding the class labels; it is handed to Label_Reader
# in the cells below.
label_file_name = "/home/fahimtajwar/final_project/project_data/classes_800.xlsx"

In [None]:
def get_label_map(label_file_name):
    """Return the label map of a ``Label_Reader`` built on ``label_file_name``."""
    return Label_Reader(label_file_name).get_label_map()

In [None]:
# Load the day-5 image of wells 0-799; get_tensors stacks every 5 consecutive
# images into one sample.
# NOTE(review): because only a single day is requested per well, each sample
# appears to combine 5 different wells rather than 5 days of one well —
# confirm this is intended.
X_data = get_tensors(0, 799, 5, 5)
print(X_data.shape)

In [None]:
label_reader = Label_Reader(label_file_name)
label_to_label_id = label_reader.label_to_label_id
Y_label = get_label_map(label_file_name)

# Replace every label in the map with its integer id so it can be used as a
# classification target.
for key in Y_label:
    Y_label[key] = label_to_label_id[Y_label[key]]

# range() excludes its end point. X_data is built from 800 images in groups
# of 5, i.e. 160 samples with indices 0-159, so the split is 0-119 for
# training and 120-159 for validation. The previous bounds (119 and 159)
# silently dropped samples 119 and 159.
label_id_train = range(0, 120)
label_id_val = range(120, 160)

In [None]:
class Dataset(data.Dataset):
    """Map-style PyTorch dataset pairing sample tensors with labels.

    Parameters
    ----------
    list_IDs : sequence
        Sample ids; ``len(dataset)`` is the length of this sequence.
    labels : mapping
        Maps a sample id to its label.
    features : indexable, optional
        Container indexed by sample id that yields the sample itself. When
        omitted, the notebook-level ``X_data`` is looked up at access time,
        which is the original behaviour.
    """

    def __init__(self, list_IDs, labels, features=None):
        """Store the ids, the label mapping and the optional sample container."""
        self.labels = labels
        self.list_IDs = list_IDs
        self.features = features

    def __len__(self):
        """Return the total number of samples."""
        return len(self.list_IDs)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair at position ``index``."""
        # Translate the positional index into a sample id first.
        ID = self.list_IDs[index]
        y = self.labels[ID]
        # Fall back to the global X_data only when no container was supplied,
        # so existing two-argument constructions keep working.
        source = X_data if self.features is None else self.features
        X = source[ID]
        return X, y


In [None]:
# DataLoader settings shared by the two generators below.
params = dict(batch_size=64, shuffle=True, num_workers=6)

# Training and validation datasets share the label map and differ only in
# the sample ids they cover.
training_set = Dataset(label_id_train, Y_label)
validation_set = Dataset(label_id_val, Y_label)

training_generator = data.DataLoader(training_set, **params)
validation_generator = data.DataLoader(validation_set, **params)


In [None]:
class LogisticRegression(nn.Module):
    """Single linear layer applied to flattened inputs.

    The forward pass returns the raw output of the linear layer; no
    activation is applied here.
    """

    def __init__(self, input_size, num_classes):
        """Create the linear layer mapping ``input_size`` features to ``num_classes`` outputs."""
        super().__init__()
        self.linear = nn.Linear(input_size, num_classes)

    def forward(self, x):
        """Flatten ``x`` per sample with ``flatten`` and apply the linear layer."""
        return self.linear(flatten(x))

In [None]:
# Model and optimisation settings read by the cells below.
input_size = 4 * 224 * 224 * 5
num_classes = 6
num_epochs = 10
batch_size = 10
learning_rate = 0.001

# Training batches of `batch_size` samples, reshuffled on every pass.
train_loader = data.DataLoader(training_set, batch_size=batch_size, shuffle=True)

# Evaluation goes one sample at a time, in a fixed order.
test_loader = data.DataLoader(validation_set, batch_size=1, shuffle=False)

In [None]:
def train(model, train_loader, verbose = True):
    """Train ``model`` with SGD on cross-entropy loss and plot the loss history.

    The number of epochs and the learning rate are read from the
    notebook-level ``num_epochs`` and ``learning_rate`` settings. After each
    epoch the loss of its last mini-batch is recorded; the recorded values
    are plotted once training finishes.

    Parameters
    ----------
    model : torch.nn.Module
        Model to optimise in place.
    train_loader : iterable with ``len``
        Yields ``(images, labels)`` mini-batches.
    verbose : bool, optional
        When true (default), print one progress line per epoch.

    Raises
    ------
    ValueError
        If ``train_loader`` yields no batches.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
    loss_table = []

    # Ensure layers such as batch norm are in training mode, even if the
    # model was switched to eval mode earlier.
    model.train()

    # Training the Model
    for epoch in range(num_epochs):
        steps_done = 0
        last_loss = None
        for images, labels in train_loader:
            # Forward + Backward + Optimize
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Keep a plain float: storing the tensor would keep its autograd
            # graph alive, and matplotlib cannot plot tensors that require
            # gradients.
            last_loss = loss.item()
            steps_done += 1

        if last_loss is None:
            raise ValueError("train_loader yielded no batches")

        if verbose:
            # len(train_loader) is the real number of batches per epoch,
            # including a final partial batch.
            print ('Epoch: [%d/%d], Step: [%d/%d], Loss: %.4f'
                    % (epoch+1, num_epochs, steps_done, len(train_loader), last_loss))
        loss_table.append(last_loss)

    plt.plot(loss_table)
    plt.xlabel('Number of epochs')
    plt.ylabel('Cross entropy training loss')
    plt.title('Training loss history')
    plt.show()

In [None]:
# train the model
# Builds the logistic-regression model from the `input_size` and `num_classes`
# settings and fits it on the training loader.
log_reg_on_day_5_images = LogisticRegression(input_size, num_classes)
train(log_reg_on_day_5_images, train_loader)

In [None]:
# Test the Model
correct = 0
total = 0
# Gradients are not needed for evaluation, so skip building the autograd graph.
with torch.no_grad():
    for images, labels in test_loader:
        outputs = log_reg_on_day_5_images(images)
        # The index of the largest output per sample is the predicted class.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        # .item() keeps `correct` a plain Python int rather than a tensor.
        correct += (predicted == labels).sum().item()

# Report the real number of evaluated samples instead of a hard-coded count.
print('Accuracy of the model on the %d validation images: %d %%' % (total, 100 * correct / total))

In [None]:
# error_analysis
# first find the number of members in each class

labels_map = get_label_map(label_file_name)
label_to_label_id = label_reader.label_to_label_id
label_id_to_label = label_reader.label_id_to_label

# Count how many entries of the label map carry each label.
map_count = {}
for key in labels_map:
    label = labels_map[key]
    map_count[label] = map_count.get(label, 0) + 1

print(map_count)

D = map_count
# Take the tick labels from the same dict that supplies the bar heights, so
# each bar is guaranteed to carry its own label. A hard-coded list only
# matches if it happens to follow the dict's insertion order and length.
tick_labels = [str(label) for label in D]
plt.bar(range(len(D)), list(D.values()), align='center')
plt.xticks(range(len(D)), tick_labels)
plt.show()

In [None]:
def test_model(model, test_loader):
    correct = 0
    total = 0
    correct_map = {}
    for images, labels in test_loader:
        images = Variable(images)
        output = model(images)
        _, predicted = torch.max(output.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum()
        
        if labels[0].item() not in correct_map:
            if predicted[0] == labels[0]:
                correct_map[labels[0].item()] = 1
            else:
                correct_map[labels[0].item()] = 0
        
        else:
            if predicted[0] == labels[0]:
                correct_map[labels[0].item()] += 1
                
    accuracy = (100.0 * correct) / total
    return accuracy, correct_map

# Evaluate the logistic-regression model on the validation loader and report
# its overall accuracy.
accuracy, correct_map = test_model(log_reg_on_day_5_images, test_loader)
print("accuracy %f" % accuracy)

def get_correct_numbers_per_label(correct_map, label_id_to_label):
    """Re-key ``correct_map`` from label ids to labels.

    Each key of ``correct_map`` is looked up in ``label_id_to_label``; the
    values are carried over unchanged.
    """
    return {
        label_id_to_label[label_id]: count
        for label_id, count in correct_map.items()
    }
    
    
# Show the per-class correct counts keyed by label instead of label id.
print(get_correct_numbers_per_label(correct_map, label_id_to_label))
        

In [None]:
class ConvNet(nn.Module):
    """3-D convolutional classifier.

    One Conv3d / BatchNorm3d / ReLU / MaxPool3d stage followed by a linear
    layer that maps the flattened feature map to class scores.

    Parameters
    ----------
    num_classes : int, optional
        Number of output scores per sample (default 6).
    input_shape : tuple of int, optional
        ``(channels, depth, height, width)`` of one input sample, used to
        size the linear layer. The default ``(4, 5, 224, 224)`` matches the
        4-channel, 5-image, 224x224 samples this notebook builds.
    """

    def __init__(self, num_classes=6, input_shape=(4, 5, 224, 224)):
        super(ConvNet, self).__init__()
        in_channels, depth, height, width = input_shape

        self.layer1 = nn.Sequential(
            nn.Conv3d(in_channels, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm3d(16),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2, stride=2))

        # The convolution (kernel 5, stride 1, padding 2) keeps depth, height
        # and width unchanged; the pooling (kernel 2, stride 2) halves each
        # of them, rounding down. The linear layer is sized from that real
        # output shape; the previous hard-coded size did not match it.
        flat_features = 16 * (depth // 2) * (height // 2) * (width // 2)
        self.fc = nn.Linear(flat_features, num_classes)

    def forward(self, x):
        """Return class scores for a batch shaped ``(N, channels, depth, height, width)``."""
        out = self.layer1(x)
        # NOTE(review): the previous forward pass also called `self.layer2`,
        # which is never defined and raised AttributeError. If a second
        # convolutional stage was intended, it still needs to be added.
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

In [None]:
# Build the 3-D CNN and fit it with the same training routine and loader used
# for the logistic-regression model.
cnn_model = ConvNet(num_classes = 6)
train(cnn_model, train_loader, verbose = True)

In [None]:
# Evaluate the CNN on the validation loader and report its overall accuracy.
accuracy, correct_map = test_model(cnn_model, test_loader)
print("accuracy %f" % accuracy)

# get_correct_numbers_per_label is already defined in an earlier cell with an
# identical body, so it is reused here instead of being defined a second time.
print(get_correct_numbers_per_label(correct_map, label_id_to_label))
        