In [0]:
import sys
from numpy import load
from matplotlib import pyplot
from sklearn.model_selection import train_test_split
from keras import backend
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Dense
from keras.layers import Flatten
from keras.optimizers import SGD
from keras.layers import Dropout


# Amazon Dataset - preparation

In [0]:
from os import listdir
from numpy import zeros
from numpy import asarray
from numpy import savez_compressed
from pandas import read_csv
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array


In [3]:
# Mount Google Drive under /content/gdrive/ so the training archive stored
# there can be read from this Colab session.
# NOTE(review): force_remount=True presumably re-mounts even when the drive is
# already mounted -- confirm against the Colab documentation.
from google.colab import drive
drive.mount('/content/gdrive/', force_remount = True)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive/


In [0]:
# CSV file read into `csvmapping` in the next cell. The path is relative, so
# it is resolved against the notebook's current working directory.
filename='train_v2.csv'

In [0]:
# Load the CSV; the helper functions below read its 'image_name' and 'tags'
# columns.
csvmapping = read_csv(filename)


In [0]:
# create a mapping of tags to integers given the loaded mapping file
def tag_mapping(csvmapping):
	"""Build lookup tables between tag names and integer indices.

	Args:
		csvmapping: table with a 'tags' column whose entries are
			space-separated tag strings (e.g. the loaded training CSV).

	Returns:
		A pair ``(labeltagsmap, inv_labeltagsmap)``: the first maps each tag
		to its position in the alphabetically sorted list of distinct tags,
		the second maps each position back to its tag.
	"""
	labeltags = set()
	# Walk the column's values directly instead of looking rows up by the
	# labels 0..n-1, so a frame whose index is not the default range
	# (filtered, shuffled, re-indexed) is handled as well.
	for tags in csvmapping['tags']:
		# split the space-separated string and add every tag to the known set
		labeltags.update(tags.split(' '))
	# sort alphabetically so the tag -> index assignment is deterministic
	labeltags = sorted(labeltags)
	# forward (tag -> index) and inverse (index -> tag) lookups
	labeltagsmap = {tag: i for i, tag in enumerate(labeltags)}
	inv_labeltagsmap = {i: tag for i, tag in enumerate(labeltags)}
	return labeltagsmap, inv_labeltagsmap


In [0]:
# create a mapping of filename to a list of tag
def file_mapping(csvmapping):
	"""Map every image name to its list of tags.

	Args:
		csvmapping: table with an 'image_name' column and a 'tags' column of
			space-separated tag strings.

	Returns:
		dict mapping image name -> list of tag strings. If an image name
		occurs more than once, the last occurrence wins.
	"""
	# Pair the two columns by position rather than by the labels 0..n-1, so a
	# frame with a non-default index does not raise KeyError.
	return {
		name: tags.split(' ')
		for name, tags in zip(csvmapping['image_name'], csvmapping['tags'])
	}

In [0]:
# Build the tag -> integer index lookup. tag_mapping also returns the inverse
# (index -> tag) lookup, which is discarded here.
tagmapping, _ = tag_mapping(csvmapping)

In [0]:
# Build the image name -> list of tag strings lookup; the image loader uses it
# to find the tags for each file.
filemapping = file_mapping(csvmapping)

In [0]:
# Extract the training images from the archive on the mounted Drive.
# NOTE(review): no destination is given, so the files presumably land in the
# current working directory; the next cell expects them under
# /content/train-jpg/ -- confirm the working directory is /content.
!unzip '/content/gdrive/My Drive/Deep Learning/Assignment/Assignment 3/train-jpg.zip'

In [0]:
# Directory holding the extracted JPEG images; passed to the image loader as
# its `path` argument further down.
folder = '/content/train-jpg/'

In [0]:
# create a one hot encoding for one list of tags
def onehotencode(tag, mapping):
	"""Return a uint8 vector with a 1 at the index of every given tag.

	Args:
		tag: iterable of tag names belonging to one image.
		mapping: dict mapping tag name -> integer index; its size sets the
			length of the returned vector.

	Returns:
		numpy uint8 array of length ``len(mapping)``.

	Raises:
		KeyError: if a tag is not present in ``mapping``.
	"""
	vector_length = len(mapping)
	enc = zeros(vector_length, dtype='uint8')
	for label in tag:
		position = mapping[label]
		enc[position] = 1
	return enc

In [0]:
# load all images into memory
def loaddataset(path, filemapping, tagmapping):
	"""Load every image in a directory together with its one-hot tag vector.

	Args:
		path: directory containing the image files.
		filemapping: dict mapping image name (file name without extension)
			to its list of tags.
		tagmapping: dict mapping tag name -> integer index.

	Returns:
		``(X, y)`` as uint8 numpy arrays: X stacks the images resized to
		128x128, y stacks the one-hot encoded tag vectors, in the same order.

	Raises:
		KeyError: if a file has no entry in ``filemapping`` or carries a tag
			missing from ``tagmapping``.
	"""
	import os  # local import: only `listdir` is imported from os at file level

	photos, targets = list(), list()
	# List the directory given by `path` (previously the notebook-level
	# `folder` global was listed, ignoring the argument). Sorting makes the
	# row order of X/y reproducible, since listdir order is arbitrary.
	for filename in sorted(listdir(path)):
		# load image, resized to the network input size
		photo = load_img(os.path.join(path, filename), target_size=(128,128))
		# convert to numpy array
		photo = img_to_array(photo, dtype='uint8')
		# look up the tags by file name without its extension
		tag = filemapping[os.path.splitext(filename)[0]]
		# one hot encode tags
		target = onehotencode(tag, tagmapping)
		# store
		photos.append(photo)
		targets.append(target)
	X = asarray(photos, dtype='uint8')
	y = asarray(targets, dtype='uint8')
	return X, y

In [0]:
# Read all images and their one-hot targets into memory as two arrays.
X, y = loaddataset(folder, filemapping, tagmapping)


In [32]:
# Sanity check: both arrays should have the same number of rows (one per image).
print(X.shape, y.shape)

(40479, 128, 128, 3) (40479, 17)


In [0]:
# Cache the arrays on disk. They are passed positionally, so they are read
# back below under the keys 'arr_0' (X) and 'arr_1' (y).
savez_compressed('planet_data.npz', X, y)

In [0]:
# load train and test dataset
def loaddataset(filename='planet_data.npz', test_size=0.3, random_state=1):
	"""Load the cached arrays and split them into train and test sets.

	NOTE: this definition reuses the name of the image-directory loader
	defined earlier in the notebook; once this cell has run, the earlier
	three-argument function is no longer reachable under this name.

	Args:
		filename: .npz archive holding the images under 'arr_0' and the
			targets under 'arr_1'.
		test_size: fraction of the samples placed in the test split.
		random_state: seed passed to train_test_split so the split is
			repeatable.

	Returns:
		``(trainX, trainY, testX, testY)``.
	"""
	# The object returned by load() keeps the archive open; the context
	# manager closes it as soon as both arrays have been read into memory.
	with load(filename) as data:
		X, y = data['arr_0'], data['arr_1']
	# separate into train and test datasets
	trainX, testX, trainY, testY = train_test_split(
		X, y, test_size=test_size, random_state=random_state)
	print(trainX.shape, trainY.shape, testX.shape, testY.shape)
	return trainX, trainY, testX, testY

In [0]:
# calculate fbeta score for multi-class/label classification
def fbeta(y_true, y_pred, beta=2):
	"""F-beta metric for multi-label predictions, built from Keras backend ops.

	Counts are summed along axis 1, so precision, recall and the F-beta value
	are computed per row and the returned scalar is the mean over rows.
	NOTE(review): assumes inputs are (samples, labels) -- confirm.

	Args:
		y_true: tensor of ground-truth 0/1 labels.
		y_pred: tensor of predicted scores; clipped to [0, 1] and rounded.
		beta: weight of recall relative to precision.

	Returns:
		Scalar tensor holding the mean F-beta score.
	"""
	eps = backend.epsilon()

	def _row_count(values):
		# clip to [0, 1], round to 0/1 and count the ones in every row
		return backend.sum(backend.round(backend.clip(values, 0, 1)), axis=1)

	# keep predictions inside [0, 1]
	y_pred = backend.clip(y_pred, 0, 1)
	# true positives, false positives and false negatives per row
	tp = _row_count(y_true * y_pred)
	fp = _row_count(y_pred - y_true)
	fn = _row_count(y_true - y_pred)
	# precision and recall; epsilon guards against division by zero
	p = tp / (tp + fp + eps)
	r = tp / (tp + fn + eps)
	# combine into F-beta and average
	bb = beta ** 2
	return backend.mean((1 + bb) * (p * r) / (bb * p + r + eps))

In [0]:
# define cnn model
def definemodel(in_shape=(128, 128, 3), out_shape=17):
	"""Build and compile the CNN used for multi-label classification.

	The network is three blocks of (Conv-Conv-MaxPool-Dropout) with 32, 64 and
	128 filters, followed by a 128-unit dense layer and a sigmoid output.

	Args:
		in_shape: shape of one input image.
		out_shape: number of output units (one per label).

	Returns:
		The compiled Sequential model (SGD, binary cross-entropy, fbeta metric).
	"""
	# settings shared by every convolutional layer
	conv_args = dict(activation='relu', kernel_initializer='he_uniform', padding='same')
	model = Sequential()
	for block_no, filters in enumerate((32, 64, 128)):
		if block_no == 0:
			# only the very first layer declares the input shape
			model.add(Conv2D(filters, (3, 3), input_shape=in_shape, **conv_args))
		else:
			model.add(Conv2D(filters, (3, 3), **conv_args))
		model.add(Conv2D(filters, (3, 3), **conv_args))
		model.add(MaxPooling2D((2, 2)))
		model.add(Dropout(0.2))
	# classifier head
	model.add(Flatten())
	model.add(Dense(128, activation='relu', kernel_initializer='he_uniform'))
	model.add(Dropout(0.5))
	model.add(Dense(out_shape, activation='sigmoid'))
	# compile model
	model.compile(
		optimizer=SGD(lr=0.01, momentum=0.9),
		loss='binary_crossentropy',
		metrics=[fbeta],
	)
	return model

In [0]:
# plot diagnostic learning curves
def summarizediagnostics(history):
	"""Save the training/validation learning curves to a PNG file.

	Draws the loss in the upper panel and the fbeta metric in the lower panel,
	writes the figure to ``<script name>_plot.png`` (name taken from
	``sys.argv[0]``) and closes it, so nothing is displayed inline.

	Args:
		history: object whose ``history`` attribute is a dict with the keys
			'loss', 'val_loss', 'fbeta' and 'val_fbeta'.
	"""
	# upper panel: loss
	pyplot.subplot(211)
	pyplot.title('Cross Entropy Loss')
	pyplot.plot(history.history['loss'], color='blue', label='train')
	pyplot.plot(history.history['val_loss'], color='orange', label='test')
	# the curves carry labels, so draw the legend that displays them
	pyplot.legend()
	# lower panel: fbeta metric
	pyplot.subplot(212)
	pyplot.title('Fbeta')
	pyplot.plot(history.history['fbeta'], color='blue', label='train')
	pyplot.plot(history.history['val_fbeta'], color='orange', label='test')
	pyplot.legend()
	# keep the lower title from overlapping the upper axes
	pyplot.tight_layout()
	# save plot to file
	filename = sys.argv[0].split('/')[-1]
	pyplot.savefig(filename + '_plot.png')
	pyplot.close()

In [0]:
# run the test harness for evaluating a model
def runtestharness(epochs=200, batch_size=128):
	"""Load the data, train the CNN, print its test score and save the curves.

	Args:
		epochs: number of training epochs.
		batch_size: number of samples per batch for both iterators.
	"""
	# load dataset (the helpers in this notebook are named without
	# underscores; the underscored names used before raised NameError)
	trainX, trainY, testX, testY = loaddataset()
	# create data generator that scales pixel values by 1/255
	datagen = ImageDataGenerator(rescale=1.0/255.0)
	# prepare iterators
	train_it = datagen.flow(trainX, trainY, batch_size=batch_size)
	test_it = datagen.flow(testX, testY, batch_size=batch_size)
	# define model
	model = definemodel()
	# fit model
	history = model.fit_generator(train_it, steps_per_epoch=len(train_it),
		validation_data=test_it, validation_steps=len(test_it), epochs=epochs, verbose=0)
	# evaluate model; use a distinct local name so the metric function `fbeta`
	# is not confused with its value
	loss, fbeta_score = model.evaluate_generator(test_it, steps=len(test_it), verbose=0)
	print('> loss=%.3f, fbeta=%.3f' % (loss, fbeta_score))
	# learning curves
	summarizediagnostics(history)

In [42]:
# Load the cached arrays and split them into train and test sets. The steps of
# the test harness are then run one cell at a time below.
trainX, trainY, testX, testY = loaddataset()


(28335, 128, 128, 3) (28335, 17) (12144, 128, 128, 3) (12144, 17)


In [0]:
# Generator that multiplies every pixel value by 1/255. The images are stored
# as uint8, so this scales them into the range [0, 1].
datagen = ImageDataGenerator(rescale=1.0/255.0)


In [0]:
# Iterator over the training split in batches of 128.
trainit = datagen.flow(trainX, trainY, batch_size=128)


In [0]:
# Iterator over the test split in batches of 128; used both as validation data
# during training and for the final evaluation.
testit = datagen.flow(testX, testY, batch_size=128)

In [0]:
# Build and compile the CNN with its default input shape (128, 128, 3) and 17
# outputs, matching the array shapes printed above.
model = definemodel()


In [0]:
# Train for 10 epochs (runtestharness uses 200), validating on the test
# iterator after every epoch; `history` feeds the learning-curve plot below.
history = model.fit_generator(trainit, steps_per_epoch=len(trainit),
		validation_data=testit, validation_steps=len(testit), epochs=10, verbose=0)

In [49]:
# Evaluate on the test iterator. The metric value is bound to `fbeta_score`
# rather than `fbeta`: assigning to `fbeta` at notebook level would replace the
# metric function with a float, so a later re-run of definemodel() would
# compile the model with a number as its metric.
loss, fbeta_score = model.evaluate_generator(testit, steps=len(testit), verbose=0)
print('> loss=%.3f, fbeta=%.3f' % (loss, fbeta_score))

> loss=0.190, fbeta=0.733


In [0]:
# Write the learning curves to a PNG file. The figure is closed right after
# saving, so this cell produces no inline output.
summarizediagnostics(history)