In [None]:
import os
import swat
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from PIL import Image
from collections import defaultdict


class Dataset(torch.utils.data.Dataset):
	"""
	Image-classification dataset backed by a CSV annotation file.

	The CSV must contain the columns 'IMAGE_FILENAME' and ' LABEL' (the label
	column name has a leading space). Labels are converted to their integer
	position in `all_labels`.

	:param root: folder that contains the image files
	:param labels_path: path to the CSV annotation file
	:param transform: callable applied to the PIL image before it is returned
	:param all_labels: list of label strings; the list position is the class index
	"""

	def __init__(self, root, labels_path, transform, all_labels):
		self.all_labels = all_labels
		self.root = root
		self.transform = transform
		self.data = self.make_data(labels_path)
		self.data_dict = self.create_dict()
	
	def __len__(self):
		return len(self.data)
	
	def get_labels(self):
		"""Return the list of label strings this dataset was built with."""
		# The original returned `self.labels`, an attribute that is never set.
		return self.all_labels
	
	def create_dict(self):
		"""Map every integer class index to the list of full image paths of that class."""
		data_dict = defaultdict(list)
		for filename, label in zip(self.data['IMAGE_FILENAME'], self.data[' LABEL']):
			data_dict[label].append(os.path.join(self.root, filename))
		return data_dict
	
	def make_data(self, labels_path):
		"""
		Read the annotation CSV and replace each label string by its index in `all_labels`.

		:raises ValueError: if the CSV contains a label that is not in `all_labels`
		"""
		raw = pd.read_csv(labels_path)
		return pd.DataFrame({
			'IMAGE_FILENAME': raw['IMAGE_FILENAME'],
			' LABEL': raw[' LABEL'].map(self.all_labels.index),
		})
	
	def __getitem__(self, index):
		"""Return the `(transformed_image, class_index)` pair stored at row `index`."""
		image_path = self.data['IMAGE_FILENAME'][index]
		label = self.data[' LABEL'][index]
		image = Image.open(os.path.join(self.root, image_path))  # only load the grayscale image
		image = self.transform(image)
		return image, label

# Read data and visualise

In [None]:
import umap
import cv2
import torch 
import os
import swat
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
from collections import defaultdict
from PIL import Image


In [None]:
# Folder with the raw images and the CSV annotation file (paths relative to the working directory)
root = 'data/images'
labels_path = 'data/gicsd_labels.csv'
# The cells below read the columns 'IMAGE_FILENAME' and ' LABEL' (note the leading space) from this frame
data = pd.read_csv(labels_path)

In [None]:
# Pull both columns straight from the frame instead of walking it twice with iterrows()
all_image_files = data['IMAGE_FILENAME'].tolist()
all_image_labels = data[' LABEL'].tolist()


# Load every image with OpenCV's default read mode; all_images[i] corresponds to all_image_files[i]
all_images = [cv2.imread(os.path.join(root, file)) for file in all_image_files]


Check if all the images are the same size

In [None]:

# Collect the distinct array shapes; a single entry means every image has the same size
image_shapes = {image.shape for image in all_images}
print(image_shapes)

In [None]:

# Group the full image paths by their label string
data_dict = defaultdict(list)
for filename, label in zip(data['IMAGE_FILENAME'], data[' LABEL']):
    data_dict[label].append(os.path.join(root, filename))


In [None]:
# Print how many images each label has (shows the class balance)
for key, images in data_dict.items():
    print(key, len(images))

Visualise a few examples

In [None]:
# Show the first five images of every class side by side.
# NOTE: cv2.imread returns channels in BGR order while plt.imshow interprets them as RGB.
for key, images in data_dict.items():
    frames = [cv2.imread(im) for im in images[:5]]
    print(key)
    plt.imshow(np.asarray(np.hstack(frames)))
    plt.show()  # the original cell was missing this closing parenthesis (SyntaxError)

In [None]:
# Plot colours in the same order as the channels returned by cv2.split on a cv2.imread image (B, G, R)
colours = ['b', 'g', 'r']

# One figure per image: intensity histogram of each colour channel, first five images of every class
for key, images in data_dict.items():
    print(key)
    frames = [cv2.imread(im) for im in images[:5]]
    for frame in frames:
        chans = cv2.split(frame)
        for (chan, color) in zip(chans, colours):
            # 256 bins over [0, 256) gives one bin per intensity value
            # (the original used 255 bins, which makes the bin width non-integer)
            hist = cv2.calcHist([chan], [0], None, [256], [0, 256])
            plt.plot(hist, color=color)
            plt.xlim([0, 256])
        plt.show()

In [None]:
# For every class, show the first five images one colour channel at a time
# (channel index 0, then 1, then 2 of the arrays returned by cv2.imread)
for key, images in data_dict.items():
    frames = [cv2.imread(im) for im in images[:5]]

    print(key)
    for channel in range(3):
        strip = np.hstack([frame[:, :, channel] for frame in frames])
        plt.imshow(np.asarray(strip))
        plt.show()

Save the blue channel of the data as greyscale images in the folder data/images_grayscale

In [None]:
new_root = 'data/images_grayscale'
os.makedirs(new_root, exist_ok=True)
# Keep only channel 0 of every image. The ndim guard makes the cell safe to re-run:
# after the first run all_images already holds 2-D arrays and `im[:, :, 0]` would raise.
all_images = [im[:, :, 0] if im.ndim == 3 else im for im in all_images]
for im, filename in zip(all_images, all_image_files):
    cv2.imwrite(os.path.join(new_root, filename), im)


In [None]:
def grayscale_channel(img_path):
    """
    Read an image with OpenCV and return its first channel as a 2-D array.

    :param img_path: path to the image file
    :return: channel 0 of the array returned by cv2.imread
    :raises FileNotFoundError: if OpenCV cannot read the file
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals a missing/unreadable file by returning None instead of raising
        raise FileNotFoundError('Could not read image: {}'.format(img_path))
    return img[:, :, 0]

Split the data into training, validation and test datasets:

In [None]:
# Split the dataframe by label. Rows keep their CSV order - nothing is shuffled here.
data_fv = data[data[' LABEL'] == ' FULL_VISIBILITY '].reset_index(drop=True)
data_pv = data[data[" LABEL"] == ' PARTIAL_VISIBILITY '].reset_index(drop=True)
data_nc = data[data[" LABEL"] == ' NO_VISIBILITY '].reset_index(drop=True)

# Hold out the first ~10% of the two larger classes and the first 10 NO_VISIBILITY rows.
# .loc slices are end-inclusive, hence the "+1" when the remainder is taken below.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
test = pd.concat(
    [
        data_fv.loc[:int(len(data_fv) * .1), :],
        data_pv.loc[:int(len(data_pv) * .1), :],
        data_nc.loc[:9, :],
    ],
    ignore_index=True,
)
test.to_csv('data/gicsd_labels_test.csv')

data_fv = data_fv.loc[int(len(data_fv) * .1) + 1:, :]
data_pv = data_pv.loc[int(len(data_pv) * .1) + 1:, :]
data_nc = data_nc.loc[10:, :]

print(test.shape)
print(data_fv.shape)
print(data_pv.shape)
print(data_nc.shape)

In [None]:
# Display the PARTIAL_VISIBILITY rows that remain after the test hold-out
data_pv

Augment data in preparation for training

In [None]:
# From here on `root` points at the single-channel images written to data/images_grayscale
root = 'data/images_grayscale'
from utils import rotate_data, flip_data, mirror_data

In [None]:
# Augment the NO_VISIBILITY rows: rotate, then flip the result, then mirror the result.
# Each helper returns its input rows plus one new row per input row.
data_nc = mirror_data(flip_data(rotate_data(data_nc, root), root), root)
data_nc.to_csv('data/data_nc2.csv')

print(data_nc.shape)

In [None]:

# Augment the PARTIAL_VISIBILITY rows with one rotated copy per image and save the enlarged table
data_pv = rotate_data(data_pv, root)
data_pv.to_csv('data/data_pv2.csv')


print(data_pv.shape)

In [None]:
# Merge the three classes and shuffle the rows.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
train = (
    pd.concat([data_fv, data_pv, data_nc], ignore_index=True)
    .sample(frac=1)
    .reset_index(drop=True)
)

# First ~10% of the shuffled rows become the validation set (.loc slices are end-inclusive)
n_val = int(len(train) * .1)
validation = train.loc[:n_val, :]
train = train.loc[n_val + 1:, :]


train.to_csv('data/gicsd_labels_train.csv')
validation.to_csv('data/gicsd_labels_val.csv')

In [None]:
from train import Model
from torchvision import transforms
from dataset import Dataset
# Instantiate the classifier imported from train.py and print its layer structure
model = Model()
print(model)

In [None]:

# map_location='cpu' lets the checkpoint load even if it was saved from another device
checkpoint = torch.load('resnet34.pth', map_location='cpu')
model.load_state_dict(checkpoint['state_dict'])
# Switch to inference mode so batch-norm layers use their running statistics during evaluation
model.eval()
# Position in this list == class index used by the Dataset
all_labels = [' FULL_VISIBILITY ', ' PARTIAL_VISIBILITY ', ' NO_VISIBILITY ']



In [None]:
def confusion_matrix(model, dataloader, all_labels):
    """
    Evaluate `model` on `dataloader` and return the confusion matrix.

    :param model: callable mapping a batch of inputs to per-class scores of shape (batch, n_classes)
    :param dataloader: iterable of `(inputs, class_indices)` batches
    :param all_labels: list of class labels; only its length is used
    :return: float tensor of shape (n_classes, n_classes); entry [t, p] counts samples
             of true class t that were predicted as class p
    """
    nb_classes = len(all_labels)
    matrix = torch.zeros(nb_classes, nb_classes)

    # Evaluate in inference mode (the original left the model in whatever mode it was in),
    # then restore the previous mode so the caller's model is not silently changed.
    is_module = isinstance(model, torch.nn.Module)
    was_training = model.training if is_module else False
    if is_module:
        model.eval()
    try:
        with torch.no_grad():
            for inputs, classes in dataloader:
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                for t, p in zip(classes.view(-1), preds.view(-1)):
                    matrix[t.long(), p.long()] += 1
    finally:
        if is_module:
            model.train(was_training)

    print("Confusion matrix:")
    print(matrix)
    print("Per class accuracy:")
    # Row sums are the number of samples per true class
    print(matrix.diag()/matrix.sum(1))
    return matrix

In [None]:
# Same preprocessing as used for training: resize, convert to tensor, normalise the single channel
transform = transforms.Compose([transforms.Resize(256), transforms.ToTensor(), transforms.Normalize(mean=[0.485], std=[0.229])])

# Relative paths (the folder and CSV written by the earlier cells) instead of a
# machine-specific absolute path, and the shared `all_labels` list instead of a second copy.
test_data = Dataset('data/images_grayscale', 'data/gicsd_labels_test.csv', transform, all_labels=all_labels)
# The confusion matrix does not depend on sample order, so no shuffling is needed
test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=32, shuffle=False)

confusion = confusion_matrix(model, test_dataloader, all_labels)


Additional data exploration

In [None]:
# Stack the images into one array and scale the pixel values by 1/255
data_np = np.asarray(all_images) / 255.0
# numeric_image_labels holds, for every image, the position of its label in unique_labels
unique_labels, _, numeric_image_labels = np.unique(
    all_image_labels, return_index=True, return_inverse=True
)

# NOTE(review): this prints the shape of the CSV frame `data`, not of `data_np` - confirm which was intended
print(data.shape)

In [None]:
import umap.umap_ as umap


# Flatten every image into one row of pixels and embed the rows with UMAP
reducer = umap.UMAP()
n_samples = data_np.shape[0]
pixels_per_image = data_np.shape[1] * data_np.shape[2]
flat_pixels = data_np.reshape((n_samples, pixels_per_image))
embedding = reducer.fit_transform(flat_pixels)
# One point per image, coloured by its numeric label
plt.scatter(embedding[:, 0], embedding[:, 1], c=numeric_image_labels)


In [None]:
from sklearn.manifold import TSNE
# Two-dimensional t-SNE embedding of the flattened images
tsne = TSNE(n_components=2, verbose=1, perplexity=40, n_iter=300)
tsne_input = data_np.reshape((data_np.shape[0], data_np.shape[1] * data_np.shape[2]))
tsne_results = tsne.fit_transform(tsne_input)
# One point per image, coloured by its numeric label
plt.scatter(tsne_results[:, 0], tsne_results[:, 1], c=numeric_image_labels)


Canny edge detection

In [None]:
import cv2
import numpy as np
from matplotlib import pyplot as plt

# Show the Canny edge map (thresholds 100 and 200) of the first five images
for img in all_images[:5]:
    edges = cv2.Canny(img,100,200)
    plt.imshow(edges,cmap = 'gray')
    plt.show()

In [None]:
from PIL import Image
import argparse
from torchvision import transforms
import torch
from train import Model

# Module-level parser; the `__main__` section below builds its own parser and rebinds this name.
parser = argparse.ArgumentParser(description='Process some images.')
parser.add_argument('-image_path', type=str, required=True, help='path to the image for inference')
# Default aligned with the `__main__` parser (the original default was the misspelled
# 'best_checkpointt.pth') and help text corrected (it was copied from -image_path).
parser.add_argument('-model_weights', type=str, default='resnet34.pth', help='path to the model checkpoint')


def predict(args):
	"""
	Classify a single image and print the predicted label.

	:param args: namespace with the attributes
		`image_path` (image to classify),
		`model_weights` (checkpoint file holding a 'state_dict' entry) and
		`all_labels` (list of label strings indexed by class id)
	"""
	img = Image.open(args.image_path)
	if img.mode == 'L':
		print("Provided image is already grayscale")
	else:
		if img.mode != 'RGB':
			# Other modes (e.g. RGBA, P) previously reached the model with the wrong
			# number of channels; bring them to RGB first.
			img = img.convert('RGB')
		# get only the grayscale image that is stored in the blue channel
		print("Only taking blue channel from RGB image")
		img = img.split()[2]
	
	transform = transforms.Compose([transforms.Resize(256), transforms.ToTensor(),
	                                transforms.Normalize(mean=[0.485], std=[0.229])])
	
	img_t = transform(img)
	batch_t = torch.unsqueeze(img_t, 0)  # add the batch dimension
	
	model = Model()
	
	checkpoint = torch.load(args.model_weights)
	model.load_state_dict(checkpoint['state_dict'])
	
	model.eval()
	# No gradients are needed for inference
	with torch.no_grad():
		out = model(batch_t)
	_, preds = torch.max(out, 1)
	
	print('Result: ', args.all_labels[preds.item()])


if __name__ == "__main__":
	# Command-line entry point: parse the arguments and classify one image
	parser = argparse.ArgumentParser(description='Predict the visibility class of an image.')
	parser.add_argument('-image_path', type=str, required=True, help='path to the image for inference')
	parser.add_argument('-model_weights', type=str, default='resnet34.pth', help='path to the model checkpoint')
	# nargs='+' collects the labels as a list of strings; the original `type=list`
	# would have split a single command-line value into individual characters.
	parser.add_argument('-all_labels', type=str, nargs='+',
	                    default=[' FULL_VISIBILITY ', ' PARTIAL_VISIBILITY ', ' NO_VISIBILITY '],
	                    help='The list of labels')
	args = parser.parse_args()
	
	predict(args)

In [None]:
import torch
from torchvision import models
from dataset import Dataset
from torchvision import transforms
from torch.autograd import Variable
import argparse


class Model(torch.nn.Module):
	"""
	ResNet-34 adapted to single-channel input and three output classes.

	The backbone is initialised from the ImageNet ResNet-34 weights via
	`load_weights_single_channel`; every parameter stays trainable.
	`forward` returns softmax probabilities of shape (batch, 3).
	"""

	def __init__(self):
		super(Model, self).__init__()
		
		self.resnet = models.resnet34(pretrained=False)
		self.resnet.conv1 = torch.nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3,
		                                    bias=False)  # change to single channel input
		
		# Loaded while `fc` is still the original head, so the checkpoint keys match
		self.resnet = load_weights_single_channel(self.resnet,
		                                          "https://download.pytorch.org/models/resnet34-333f7ec4.pth")
		
		for param in self.resnet.parameters():
			param.requires_grad = True
		
		self.resnet.fc = torch.nn.Linear(self.resnet.fc.in_features, 3)  # change to 3 class classification
		
		# Explicit class dimension; the implicit-dim form is deprecated and emits a warning.
		# NOTE(review): main() trains this output with CrossEntropyLoss, which applies
		# log-softmax itself - consider returning raw logits during training.
		self.softmax = torch.nn.Softmax(dim=1)
	
	def forward(self, x):
		"""Return class probabilities for a batch of single-channel images."""
		x = self.resnet(x)
		x = self.softmax(x)
		return x


class AverageMeter(object):
	"""Tracks the latest value, the weighted sum, the sample count and the running mean."""
	
	def __init__(self):
		self.reset()
	
	def reset(self):
		"""Set every statistic back to zero."""
		self.val = self.avg = self.sum = self.count = 0
	
	def update(self, val, n=1):
		"""Record `val` as observed `n` times and refresh the running mean."""
		self.val = val
		self.sum = self.sum + val * n
		self.count = self.count + n
		self.avg = self.sum / self.count


def set_parameter_requires_grad(model, feature_extracting):
	"""Freeze every parameter of `model` when `feature_extracting` is truthy; otherwise do nothing."""
	if not feature_extracting:
		return
	for weight in model.parameters():
		weight.requires_grad = False


def load_weights_single_channel(model, url):
	"""
	Load the state dict found at `url` into `model` after collapsing the first
	convolution's weights to a single input channel.

	:param model: network whose `conv1` takes one input channel
	:param url: location of the state dict to download
	:return: the same `model`, with the weights loaded
	"""
	pretrained = torch.utils.model_zoo.load_url(url)
	# Sum over the input-channel axis (dim 1), keeping that axis with size 1
	pretrained['conv1.weight'] = pretrained['conv1.weight'].sum(dim=1, keepdim=True)
	model.load_state_dict(pretrained)
	return model


def main(args):
	"""
	Train the model and save a checkpoint whenever validation accuracy improves.

	:param args: namespace with `root`, `annotations_train`, `annotations_val`,
		`all_labels`, `batch_size`, `epochs` and `checkpoint`
	"""
	transform = transforms.Compose([transforms.Resize(256), transforms.ToTensor(), transforms.Normalize(mean=[0.485], std=[0.229])])
	
	train_data = Dataset(args.root, args.annotations_train, transform, args.all_labels)
	val_data = Dataset(args.root, args.annotations_val, transform, args.all_labels)
	dataloader = torch.utils.data.DataLoader(train_data, batch_size=args.batch_size, shuffle=True)
	# Validation metrics are sample-weighted averages, so the order does not matter
	val_dataloader = torch.utils.data.DataLoader(val_data, batch_size=args.batch_size, shuffle=False)
	
	model = Model()
	# NOTE(review): the original built class weights [1.0, 2.0, 2.0] here but never passed
	# them to the loss; the unused tensor was removed. Pass `weight=` below if weighting is wanted.
	criterion = torch.nn.CrossEntropyLoss()
	optimiser = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
	lr_scheduler = torch.optim.lr_scheduler.StepLR(optimiser, step_size=7, gamma=0.1)
	
	best_acc = 0
	for epoch in range(args.epochs):
		train_loss, train_accuracy = train(model, criterion, optimiser, lr_scheduler, dataloader, epoch, args.epochs)
		
		validation_loss, acc = validation(model, criterion, optimiser, lr_scheduler, val_dataloader, epoch, args.epochs)
		# Keep only the best-performing weights on disk
		if acc > best_acc:
			best_acc = acc
			state = {
				'epoch': epoch,
				'state_dict': model.state_dict(),
				'optimizer': optimiser.state_dict(),
				'best_acc': best_acc
			}
			torch.save(state, args.checkpoint)


def train(model, criterion, optimiser, scheduler, dataloader, epoch, epochs):
	"""
	Run one training epoch.

	:param model: network to train
	:param criterion: loss function called as `criterion(outputs, labels)`
	:param optimiser: optimiser updating the model parameters
	:param scheduler: learning-rate scheduler, stepped once at the end of the epoch
	:param dataloader: iterable of `(inputs, labels)` batches
	:param epoch: index of the current epoch (used for logging only)
	:param epochs: total number of epochs (used for logging only)
	:return: `(average_loss, average_accuracy)` over the epoch, weighted by batch size
	"""
	losses = AverageMeter()
	accuracies = AverageMeter()
	
	model.train()  # Set model to training mode
	for batch, (inputs, labels) in enumerate(dataloader):
		# Tensors are used directly; the torch.autograd.Variable wrapper is a deprecated no-op
		outputs = model(inputs)
		loss = criterion(outputs, labels)
		
		# calculate accuracies
		acc = calculate_accuracy(outputs, labels)
		
		# statistics
		losses.update(loss.item(), inputs.size(0))
		accuracies.update(acc, inputs.size(0))
		
		optimiser.zero_grad()
		loss.backward()
		optimiser.step()
		
		if batch % 10 == 0:
			print('Epoch {}/{}:[{}]/[{}] Loss: {:.4f} Acc: {:.4f}'.format(epoch, epochs, batch, len(dataloader),
			                                                              losses.avg, accuracies.avg))
	
	# Step the scheduler once per epoch. The original stepped it after every batch,
	# so a StepLR with step_size=7 shrank the learning rate every 7 batches.
	scheduler.step()
	
	return losses.avg, accuracies.avg


def validation(model, criterion, optimiser, scheduler, dataloader, epoch, epochs):
	"""
	Evaluate the model on the validation set.

	:param model: network to evaluate
	:param criterion: loss function called as `criterion(outputs, labels)`
	:param optimiser: unused; kept so the signature mirrors `train`
	:param scheduler: unused; kept so the signature mirrors `train`
	:param dataloader: iterable of `(inputs, labels)` batches
	:param epoch: index of the current epoch (used for logging only)
	:param epochs: total number of epochs (used for logging only)
	:return: `(average_loss, average_accuracy)`, weighted by batch size
	"""
	losses = AverageMeter()
	accuracies = AverageMeter()
	model.eval()  # Set model to validation mode
	# Gradients are not needed for evaluation; skipping them saves memory and time
	with torch.no_grad():
		for batch, (inputs, labels) in enumerate(dataloader):
			outputs = model(inputs)
			loss = criterion(outputs, labels)
			
			# calculate accuracies
			acc = calculate_accuracy(outputs, labels)
			
			losses.update(loss.item(), inputs.size(0))
			accuracies.update(acc, inputs.size(0))
			if batch % 10 == 0:
				print('Val epoch {}/{}:[{}]/[{}] Loss: {:.4f} Acc: {:.4f}'.format(epoch, epochs, batch, len(dataloader), losses.avg, accuracies.avg))
	
	return losses.avg, accuracies.avg


def calculate_accuracy(outputs, targets):
	"""
	Fraction of samples whose highest-scoring class equals the target.

	:param outputs: per-class scores, one row per sample
	:param targets: class index per sample
	:return: number of correct predictions divided by `targets.size(0)`
	"""
	_, top1 = outputs.topk(1, 1, True)
	# Flatten both sides so they are compared element by element
	hits = top1.view(-1).eq(targets.view(-1))
	return hits.float().sum().item() / targets.size(0)


if __name__ == "__main__":
	# Command-line entry point for training
	all_labels = [' FULL_VISIBILITY ', ' PARTIAL_VISIBILITY ', ' NO_VISIBILITY ']
	
	parser = argparse.ArgumentParser(description='Train the visibility classifier.')
	parser.add_argument('-epochs', type=int, default=20, help='number of epoch for training')
	parser.add_argument('-batch_size', type=int, default=16, help='number of samples per batch')
	parser.add_argument('-checkpoint', type=str, default='resnet34.pth', help='path where to save checkpoint during training')
	parser.add_argument('-root', type=str, default='data/images_grayscale', help='path to the folder with grayscale images')
	parser.add_argument('-annotations_train', type=str, default='data/gicsd_labels_train.csv', help='path to the CSV file with the training annotations')
	parser.add_argument('-annotations_val', type=str, default='data/gicsd_labels_val.csv', help='path to the CSV file with the validation annotations')
	# nargs='+' collects the labels as a list of strings; the original `type=list`
	# would have split a single command-line value into individual characters.
	parser.add_argument('-all_labels', type=str, nargs='+', default=all_labels, help='The list of labels')
	args = parser.parse_args()
	main(args)
	print('Done.')

In [None]:
import math
import cv2
import pandas as pd
import random
import os
import numpy as np


def mirror_data(dataframe, root):
	"""
	Creates a left-right mirrored copy of every image in the dataframe, saves it in the
	same folder under '<name>m.png' and adds a row for it to the dataframe.
	:param dataframe: dataframe with the columns 'IMAGE_FILENAME' and ' LABEL'
	:param root: path to the folder with all the images
	:return: new dataframe holding the original rows followed by one row per mirrored image
	:raises FileNotFoundError: if an image listed in the dataframe cannot be read
	"""
	new_entries = []
	for index, datum in dataframe.iterrows():
		source_path = os.path.join(root, datum['IMAGE_FILENAME'])
		img = cv2.imread(source_path, cv2.IMREAD_UNCHANGED)
		if img is None:
			# cv2.imread returns None instead of raising when the file is missing/unreadable
			raise FileNotFoundError('Could not read image: {}'.format(source_path))
		new_image_name = datum['IMAGE_FILENAME'].split(".")[0] + 'm.png'
		flipped_image = np.fliplr(img)
		cv2.imwrite(os.path.join(root, new_image_name), flipped_image)
		new_entries.append({'IMAGE_FILENAME': new_image_name, ' LABEL': datum[' LABEL']})
	df = pd.DataFrame(new_entries)
	# pd.concat replaces DataFrame.append, which was removed in pandas 2.0
	return pd.concat([dataframe, df], ignore_index=True)


def flip_data(dataframe, root):
	"""
	Creates an upside-down copy of every image in the dataframe, saves it in the
	same folder under '<name>f.png' and adds a row for it to the dataframe.
	:param dataframe: dataframe with the columns 'IMAGE_FILENAME' and ' LABEL'
	:param root: path to the folder with all the images
	:return: new dataframe holding the original rows followed by one row per flipped image
	:raises FileNotFoundError: if an image listed in the dataframe cannot be read
	"""
	new_entries = []
	for index, datum in dataframe.iterrows():
		source_path = os.path.join(root, datum['IMAGE_FILENAME'])
		img = cv2.imread(source_path, cv2.IMREAD_UNCHANGED)
		if img is None:
			# cv2.imread returns None instead of raising when the file is missing/unreadable
			raise FileNotFoundError('Could not read image: {}'.format(source_path))
		new_image_name = datum['IMAGE_FILENAME'].split(".")[0] + 'f.png'
		
		flipped_image = np.flipud(img)
		
		cv2.imwrite(os.path.join(root, new_image_name), flipped_image)
		new_entries.append({'IMAGE_FILENAME': new_image_name, ' LABEL': datum[' LABEL']})
	df = pd.DataFrame(new_entries)
	# pd.concat replaces DataFrame.append, which was removed in pandas 2.0
	return pd.concat([dataframe, df], ignore_index=True)


def rotate_data(dataframe, root):
	"""
	Creates a randomly rotated copy (angle drawn uniformly from [-45, 45] degrees) of every
	image in the dataframe, saves it in the same folder under '<name>r.png' and adds a row
	for it to the dataframe.
	:param dataframe: dataframe with the columns 'IMAGE_FILENAME' and ' LABEL'
	:param root: path to the folder with all the images
	:return: new dataframe holding the original rows followed by one row per rotated image
	:raises FileNotFoundError: if an image listed in the dataframe cannot be read
	"""
	new_entries = []
	for index, datum in dataframe.iterrows():
		source_path = os.path.join(root, datum['IMAGE_FILENAME'])
		img = cv2.imread(source_path, cv2.IMREAD_UNCHANGED)
		if img is None:
			# cv2.imread returns None instead of raising when the file is missing/unreadable
			raise FileNotFoundError('Could not read image: {}'.format(source_path))
		new_image_name = datum['IMAGE_FILENAME'].split(".")[0] + 'r.png'
		angle = random.uniform(-45, 45)
		image_rotated_cropped = rotate_resize(img, angle)
		cv2.imwrite(os.path.join(root, new_image_name), image_rotated_cropped)
		new_entries.append({'IMAGE_FILENAME': new_image_name, ' LABEL': datum[' LABEL']})
	df = pd.DataFrame(new_entries)
	# pd.concat replaces DataFrame.append, which was removed in pandas 2.0
	return pd.concat([dataframe, df], ignore_index=True)


def rotate_resize(img, angle):
	"""
	Rotates the image, crops the black border out and resizes to the shape of the original image
	:param img: image array (2-D, or 3-D with a trailing channel axis)
	:param angle: rotation angle in degrees
	:return: rotated image with the same height and width as `img`
	"""
	# shape[:2] also works for images that carry a channel axis
	image_height, image_width = img.shape[:2]
	image_rotated = rotate_image(img, angle)
	image_rotated_cropped = crop_around_center(image_rotated,
	                                           *largest_rotated_rect(image_width, image_height, math.radians(angle)))
	# cv2.resize takes dsize as (width, height); passing img.shape (height, width)
	# transposed the output size of non-square images
	return cv2.resize(image_rotated_cropped, (image_width, image_height), interpolation=cv2.INTER_AREA)


def rotate_image(img, angle):
	"""
	Rotate the image by `angle` degrees around its centre, keeping the original canvas size
	:param img: image array (2-D, or 3-D with a trailing channel axis)
	:param angle: rotation angle in degrees
	:return: rotated image with the same height and width as `img`
	"""
	
	rows, cols = img.shape[:2]
	rotation_matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
	# cv2.warpAffine takes dsize as (width, height) == (cols, rows); the original passed (rows, cols)
	return cv2.warpAffine(img, rotation_matrix, (cols, rows))


def largest_rotated_rect(w, h, angle):
	"""
	For a w x h rectangle rotated by 'angle' (radians), return the (width, height)
	of an axis-aligned rectangle that is placed inside the rotated rectangle.
	Original JS code by 'Andri' and Magnus Hoff from Stack Overflow
	Converted to Python by Aaron Snoswell
	"""
	
	# Fold the angle into [0, pi) according to the quadrant it falls in
	quadrant = int(math.floor(angle / (math.pi / 2))) & 3
	if (quadrant & 1) == 0:
		sign_alpha = angle
	else:
		sign_alpha = math.pi - angle
	alpha = (sign_alpha % math.pi + math.pi) % math.pi
	
	sin_alpha = math.sin(alpha)
	cos_alpha = math.cos(alpha)
	
	# Size of the bounding box of the rotated rectangle
	bb_w = w * cos_alpha + h * sin_alpha
	bb_h = w * sin_alpha + h * cos_alpha
	
	# NOTE(review): the original chose between two branches on (w < h), but both branches
	# were the same expression, so the condition had no effect; behaviour is kept as-is.
	gamma = math.atan2(bb_w, bb_w)
	
	delta = math.pi - alpha - gamma
	
	if w < h:
		length = h
	else:
		length = w
	
	d = length * cos_alpha
	a = d * sin_alpha / math.sin(delta)
	
	y = a * math.cos(gamma)
	x = y * math.tan(gamma)
	
	return (bb_w - 2 * x, bb_h - 2 * y)


def crop_around_center(image, width, height):
	"""
	Crop a NumPy / OpenCV image to at most `width` x `height` pixels,
	centred on the middle of the image.
	"""
	
	full_height, full_width = image.shape[0], image.shape[1]
	center_x = int(full_width * 0.5)
	center_y = int(full_height * 0.5)
	
	# Never ask for more than the image provides
	width = full_width if width > full_width else width
	height = full_height if height > full_height else height
	
	half_w = width * 0.5
	half_h = height * 0.5
	
	left, right = int(center_x - half_w), int(center_x + half_w)
	top, bottom = int(center_y - half_h), int(center_y + half_h)
	
	return image[top:bottom, left:right]