In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
Load_from_checkpoint = False
chkpt_path = 'drive/MyDrive/' +\
             'Checkpoints_sorted/65_perc_dm_100x50_main/epoch=2-step=164.ckpt'

batch_size = 64

Train = True
epochs = 4
chkpts_upload_dir = 'drive/MyDrive/checkpoints/100x50'

In [3]:
!cp -u /content/drive/MyDrive/777x20_colored.zip ./
!cp /content/drive/MyDrive/color_color_by_img.csv ./
!unzip -q -n -j 777x20_colored.zip -d setupdir/

In [4]:
!pip -q install torch
!pip -q install torchvision
!pip -q install pytorch-lightning

from torchvision import datasets, transforms, models

from pytorch_lightning import LightningDataModule, LightningModule, Trainer
from pytorch_lightning.metrics.functional import precision
from pytorch_lightning.callbacks import ModelCheckpoint

import torch
from torch.nn import functional
from torch.utils.data import DataLoader, Dataset, random_split

import csv
import os
from PIL import Image

In [5]:
class ColorDataset(Dataset):
    def __init__(self, setupdir, labels_csv):

        self.color_list = []

        raw_color_by_filename = dict()
        with open(labels_csv, 'r') as csvfile:
            reader = csv.reader(csvfile)
            for row in reader:
                raw_color_by_filename[row[0]] = row[1:]
                for color in row[1:]:
                    if color not in self.color_list:
                        self.color_list.append(color)

        print(raw_color_by_filename.keys())

        self.color_by_filename = dict()
        for img, raw_colors in raw_color_by_filename.items():
            colors = [int(color in raw_colors) for color in self.color_list]
            self.color_by_filename[img] = colors

        if setupdir[-1] != '/':
            setupdir += '/'

        self.images = []
        for filename in os.listdir(setupdir):
            self.images.append((datasets.folder.default_loader(setupdir + filename),
                               self.color_by_filename[filename]))

    def __len__(self):

        return len(self.images)
    
    def __getitem__(self, ind):

        return self.images[ind]

In [6]:
class RecognizeColorDM(LightningDataModule):
    def __init__(self, setupdir, labels_csv, train_frac=0.9, seed=0, batch_size=64):
        
        super().__init__()

        self.batch_size = batch_size
        self.setupdir = setupdir
        self.labels_csv = labels_csv
        self.train_frac = train_frac
        self.seed = seed
        self.batch_size = batch_size
        
        # for testing purposes only
        self.paths = []
        self.test_mode = False

        self.transform = transforms.Compose([
              transforms.Resize(size=256),
              transforms.CenterCrop(size=224),
              transforms.ToTensor(),
              transforms.Normalize([0.485, 0.456, 0.406],
                                   [0.229, 0.224, 0.225])
        ])


    def setup(self):
        
        torch.manual_seed(self.seed)
        
        dataset = ColorDataset(self.setupdir, self.labels_csv)

        self.num_colors = len(dataset.color_list)
        
        set_len = len(dataset)
        train_len = int(set_len * self.train_frac)
        val_len = int(set_len * (1 - self.train_frac) / 2)
        test_len = set_len - train_len - val_len
        
        self.train, self.val, self.test = random_split(dataset, 
                                                      [train_len,
                                                       val_len,
                                                       test_len])
        self.train.dataset.transform = self.transform
        
        self.val.dataset.transform = self.transform
        
        self.test.dataset.transform = self.transform
        
    def train_dataloader(self):
        return DataLoader(self.train, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.test, batch_size=self.batch_size)

In [7]:
class RecognizeColorModel(LightningModule):
    def __init__(self, input_shape, num_classes,
                 learning_rate = 1e-4, batch_size=64):
        
        super().__init__()

        self.batch_size = batch_size
        
        self.save_hyperparameters()
        self.learning_rate = learning_rate
        self.dim = input_shape
        self.num_classes = num_classes
        
        self.feature_extractor = models.resnet34(pretrained=True)
        self.feature_extractor.eval()
        
        n_sizes = self._get_conv_output(input_shape)
        self.classifier = torch.nn.Linear(n_sizes, num_classes)
        
        self.predictions = []

    def _get_conv_output(self, shape):
        
        batch_size = 1
        inp = torch.autograd.Variable(torch.rand(batch_size, *shape))
        
        features = self._forward_features(inp)
        n_size = features.data.view(batch_size, -1).size(1)
        return n_size
    
    def _forward_features(self, x):
        
        x = self.feature_extractor(x)
        return x
    
    def forward(self, x):

        x = self._forward_features(x)
        x = x.view(x.size(0), -1)
        x = functional.sigmoid(self.classifier(x))
        
        return x
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = functional.nll_loss(logits, y)

        prec = precision(logits, y)
        self.log('train_loss', loss, on_step=True, on_epoch=True, logger=True)
        self.log('train_acc', prec, on_step=True, on_epoch=True, logger=True)        

        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = functional.nll_loss(logits, y)

        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y)
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = functional.nll_loss(logits, y)

        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y)
        
        for i in range(len(y)):
            self.predictions.append((preds[i], y[i]))

        self.log('test_loss', loss, prog_bar=True)
        self.log('test_acc', acc, prog_bar=True)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

In [8]:
# load model from a checkpoint
model = None
dm = None
if Load_from_checkpoint:
    dm = RecognizeColorDM(setupdir='setupdir', train_frac=0.7,
                  seed=0, batch_size=batch_size)
    dm.setup()
    dm.test_mode = True
    try:
        trainer = Trainer(gpus=[0])
    except:
        trainer = Trainer()
    model = RecognizeColorModel.load_from_checkpoint(chkpt_path)
    trainer.test(model, dm)
    dm.test_mode = False

In [None]:
# train model
if Train:
    if not dm:
        
        dm = RecognizeColorDM(setupdir='setupdir',
                              labels_csv='color_color_by_img.csv',
                              train_frac=0.7, seed=0,
                              batch_size=batch_size)
        dm.setup()

    if not model:
        model = ClassifyModel((3,224,224), dm.num_colors,
                              batch_size=batch_size, learning_rate=2e-4)


    checkpoint = ModelCheckpoint(dirpath=chkpts_upload_dir,
                                monitor='val_loss', save_top_k=1)

    trainer = Trainer(max_epochs=epochs,
                    progress_bar_refresh_rate=1,
                    gpus=[0],
                    callbacks = [checkpoint])

    trainer.fit(model, dm)

    trainer.test()

dict_keys(['2-M.jpg', '5-M.jpg', '11-M.jpg', '12-M.jpg', '14-M.jpg', '15-M.jpg', '17-M.jpg', '18-M.jpg', '20-M.jpg', '23-M.jpg', '24-M.jpg', '25-M.jpg', '26-M.jpg', '30-M.jpg', '31-M.jpg', '39-M.jpg', '41-M.jpg', '42-M.jpg', '43-M.jpg', '44-M.jpg', '46-M.jpg', '48-M.jpg', '49-M.jpg', '52-M.jpg', '53-M.jpg', '58-M.jpg', '59-M.jpg', '62-M.jpg', '63-M.jpg', '66-M.jpg', '67-M.jpg', '68-M.jpg', '69-M.jpg', '71-M.jpg', '74-M.jpg', '75-M.jpg', '76-M.jpg', '77-M.jpg', '84-M.jpg', '88-M.jpg', '89-M.jpg', '90-M.jpg', '91-M.jpg', '93-M.jpg', '100-M.jpg', '104-M.jpg', '106-M.jpg', '107-M.jpg', '114-M.jpg', '116-M.jpg', '122-M.jpg', '123-M.jpg', '125-M.jpg', '128-M.jpg', '131-M.jpg', '135-M.jpg', '141-M.jpg', '143-M.jpg', '144-M.jpg', '148-M.jpg', '153-M.jpg', '154-M.jpg', '157-M.jpg', '160-M.jpg', '163-M.jpg', '167-M.jpg', '168-M.jpg', '169-M.jpg', '170-M.jpg', '171-M.jpg', '174-M.jpg', '175-M.jpg', '176-M.jpg', '177-M.jpg', '181-M.jpg', '182-M.jpg', '183-M.jpg', '184-M.jpg', '185-M.jpg', '187-M.j

In [None]:
x,y = dm.test[:64]
model(x)

In [None]:
import csv
from PIL import Image
cat_names = dict()
with open('drive/MyDrive/100x50_img_amount.csv', 'r') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        cat_names[row[0]] = row[3]

def cat_name_by_path(path):
    beginning = path.find('/') + 1 
    ending = beginning + path[beginning:].find('/')
    return cat_names[path[beginning:ending]]

In [None]:
dm.test.dataset[1][0][0][0][0]

In [None]:
for i in range(10):
    print(dm.test[i][1])
    print(model.predictions[i].item())
    print(cat_name_by_path(dm.paths[i]))

In [None]:
import os
os.listdir('setupdir/1929/39220-M.jpg')