In [1]:
import keras
import tensorflow as tf

Using TensorFlow backend.


In [2]:
print("TensorFlow version: ", tf.__version__)
print("Keras version", keras.__version__)

TensorFlow version:  1.3.0
Keras version 2.1.2


In [1]:
import os
from os.path import join
import json
import random
import itertools
import re
import datetime
import numpy as np
from scipy import ndimage
import pylab
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from keras import backend as K
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.layers import Input, Dense, Activation
from keras.layers import Reshape, Lambda
from keras.layers.merge import add, concatenate
from keras.models import Model, load_model
from keras.layers.recurrent import GRU
from keras.optimizers import SGD
from keras.utils.data_utils import get_file
from keras.preprocessing import image
import keras.callbacks
import cv2

Using TensorFlow backend.


In [4]:
sess = tf.Session()
K.set_session(sess)

# Get Alphabet

In [5]:
import string
from collections import Counter
letters = sorted(list(set(Counter(string.printable).keys())))
print(letters)

['\t', '\n', '\x0b', '\x0c', '\r', ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '<', '=', '>', '?', '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', '\\', ']', '^', '_', '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '{', '|', '}', '~']


# Input Data Generator

In [7]:
# lambda funtions are used as a functional programming technique
# returns an undelimited string of labels that were inputted
def records_to_text(records):
    artists = [record.artist for record in records]
    albums = [record.album for record in records]   
    artist = ''.join(list(map(lambda x: letters[int(x)], artists)))
    album = ''.join(list(map(lambda x: letters[int(x)], albums)))
    return artist, album

def text_to_record(artist, album):
    artist_list = list(map(lambda x: letters.index(x), artist))
    album_list = list(map(lambda x: letters.index(x), album))
    return Record(artist, album)

def text_to_label(text, length):
    label = list(map(lambda x: letters.index(x), text))
    for i in range(len(label), 100):
        label.append(5)
    print(label)
    return label

def labels_to_text(labels):
    return ''.join(list(map(lambda x: letters[int(x)], labels)))

def is_valid_string(s):
    for ch in s:
        if not ch in letters:
            return False
        return True

class TextImageGenerator:
    
    def __init__(self, dirpath, img_width, img_height, batch_size, downsample_factor):
        self.img_width = img_width
        self.img_height = img_height
        
        # batch size determines the # of samples that go through the network per epoch
        self.batch_size = batch_size 
        self.downsample_factor = downsample_factor
        
        img_dirpath = dirpath + 'img/'
        json_dirpath = dirpath + 'json/'
        
        self.samples = []
        for filename in os.listdir(img_dirpath):
            # separate the file extension from the filename
            name, extension = os.path.splitext(filename)
            img_filepath = img_dirpath + filename
            json_filepath = json_dirpath + (name + '.json')
            
            # obtain correct answers for each sample
            artist = json.load(open(json_filepath, 'r'))['artist']
            album = json.load(open(json_filepath, 'r'))['album']
            self.samples.append([img_filepath, artist + '\n' + album])
        
        # establish the number of samples
        # make the current sample 0 (the first sample)
        self.n = len(self.samples)
        self.indexes = list(range(self.n))
        self.current_index = 0
        
    def build_data(self):
        # create an array for every image in the dataset
        self.imgs = np.zeros((self.n, self.img_height, self.img_width))
        self.records = []
        
        # add counter to list of samples
        # for each sample store the image and record that corresponds to it
        for i, (img_filepath, record) in enumerate(self.samples):
            # print('Getting data for sample at ', img_filepath)
            img = cv2.imread(img_filepath)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img = cv2.resize(img, (self.img_width, self.img_height))
            img = img.astype(np.float32)
            # normalize sample
            img /= 255
            
            # width and height are backwards from typical Keras convention
            # because width is the time dimension when it gets fed into the RNN
            self.imgs[i, :, :] = img
            self.records.append(record)
            
    def get_output_size(self):
        return len(letters) + 1
    
    def next_sample(self):
        self.current_index += 1
        if self.current_index >= self.n:
            self.current_index = 0
            random.shuffle(self.indexes)
        return self.imgs[self.indexes[self.current_index]], self.records[self.indexes[self.current_index]]
    
    def next_batch(self):
        # width and height are backwards from typical Keras convention
        # because width is the time dimension when it gets fed into the RNN
        while True:
            # there is only one channel because sample is grayscale
            # batch_size is the number of samples to a batch
            if K.image_data_format() == 'channels_first':
                X_data = np.ones([self.batch_size, 1, self.img_width, self.img_height, 1])
            else:
                X_data = np.ones([self.batch_size, self.img_width, self.img_height, 1])
            
            # the 200 here is hardcoded and might need to be played with
            Y_data = np.ones([self.batch_size, 100])
            input_length = np.ones((self.batch_size, 1)) * (self.img_width // self.downsample_factor - 2)
            artist_length = np.zeros((self.batch_size, 1))
            album_length = np.zeros((self.batch_size, 1))
            label_length = np.zeros((self.batch_size, 1))
            source_artist_string = []
            source_album_string = []
            
            for i in range(self.batch_size):
                img, record = self.next_sample()
                img = img.T
                if K.image_data_format() == 'channels_first':
                    img = np.expand_dims(img, 0)
                else:
                    img = np.expand_dims(img, -1)
                    
                X_data[i] = img
                Y_data[i] = text_to_label(record.artist + ',' + record.album, 100)
                source_artist_string.append(record.artist)
                source_album_string.append(record.album)
                artist_length[i] = len(record.artist)
                album_length[i] = len(record.album)
                label_length[i] = len(record.artist + ',' + record.album)
                
                inputs = {
                         'input': X_data,
                         'labels': Y_data,
                         'input_length': input_length,
                         'artist_length': artist_length,
                         'album_length' : album_length,
                         'label_length' : label_length,
                         'source_artist_string': source_artist_string,
                         'source_album_string' : source_album_string
                         }
                outputs = {'ctc': np.zeros([self.batch_size])}
                yield(inputs, outputs)
                
            
    

    
    

In [8]:
model = TextImageGenerator('dataset/train/', 256, 192, 8, 4)
model.build_data()

In [9]:
for inp, out in model.next_batch():
    print('Text Generator output (data which will be fed into convolutional neural network)')
    print('1. the_input (image)')
    if K.image_data_format() == 'channels_first':
        img = inp['input'][0,0,:,:]
    else:
        img = inp['input'][0,:,:,0]
        
    plt.imshow(img.T, cmap='gray')
    plt.show()
    
    print('2. the_labels (artist, album): %s is encoded as %s' % 
          (labels_to_text(inp['records'][0]), list(map(int, inp['records'][0]))))
    print('3. input_length (width of image that is fed to the loss function): %d == %d / 4 - 2' % 
          (inp['input_length'][0], model.img_width))
    print('4. artist_length (length of artist name): %d' % inp['artist_length'][0])
    print('5. album_length (length of album name): %d' % inp['album_length'][0])
    break

AttributeError: 'str' object has no attribute 'artist'

# Convolutional Neural Network Architecture With Loss & Train Functions

In [None]:
# fix this to work with records
def ctc_lambda_func(args):
    y_pred, labels, input_length, label_length = args
    
    # the 2 is critical here since the first couple outputs of the RNN are garabage
    y_pred = y_pred[:, 2:, :]
    print(y_pred)
    return K.ctc_batch_cost(labels, y_pred, input_length, label_length)

def train(img_width, load=False):
    # training parameters
    img_height = 192
    
    # network parameters
    conv_filters = 16
    kernel_size = (3, 3)
    pool_size = 2
    time_dense_size = 32
    rnn_size = 512
    batch_size = 32
    downsample_factor = pool_size ** 2
    
    if K.image_data_format() == 'channels_first':
        input_shape = (1, img_width, img_height)
    else:
        input_shape = (img_width, img_height, 1)
    
    model_train = TextImageGenerator('dataset/train/', img_width, img_height, batch_size, downsample_factor)
    model_train.build_data()
    model_val = TextImageGenerator('dataset/train/', img_width, img_height, batch_size, downsample_factor)
    model_val.build_data()
    
    # CNN feature extraction (preprocessing)
    act = 'relu'
    
    # keras.layers.Input is used to create a Keras tensor
    print(input_shape)
    input_data = Input(input_shape, name='input', dtype='float32')
    print('Input: %s' % (input_data))
    # conv_filters: # of outputs from the convolutional layer
    # kernel_size: size of convolution window
    # activation: activation function used. 
    inner = Conv2D(conv_filters, kernel_size, padding='same', activation=act, 
                   kernel_initializer='he_normal', name='conv1')(input_data)
    print('Conv1 Output: %s' % (inner))
    # pool_size: factors by which to downscale
    inner = MaxPooling2D(pool_size=(pool_size, pool_size), name='max1')(inner)
    print('Max1 Output: %s' % (inner))
    inner = Conv2D(conv_filters, kernel_size, padding='same', activation=act, 
                   kernel_initializer='he_normal', name='conv2')(inner)
    print('Conv2 Output: %s' % (inner))
    inner = MaxPooling2D(pool_size=(pool_size, pool_size), name='max2')(inner)
    print('Max2 Output: %s' % (inner))
    # reshape & cut down tensor for input into RNN
    conv_to_rnn_dims = (img_width // (pool_size ** 2), (img_height // (pool_size ** 2)) * conv_filters)
    inner = Reshape(target_shape=conv_to_rnn_dims, name='reshape')(inner)
    print('Reshape Output: %s' % (inner))
    print("Time dense size: %s" % (time_dense_size))
    inner = Dense(time_dense_size, activation=act, name='dense1')(inner)
    print('Dense1 Output: %s' % (inner))
    
    # two layers of bidirectional GRUs
    # gated recurrent unit is a variant of long short-term memory (LSTM)
    # LSTM units/blocks are responsible for 'remembering' values in an RNN
    gru_1 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru1')(inner)
    gru_1b = GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', name='gru1_b')(inner)
    gru1_merged = add([gru_1, gru_1b])
    gru_2 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru2')(gru1_merged)
    gru_2b = GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', name='gru2_b')(gru1_merged)
    gru2_concat = concatenate([gru_2, gru_2b])
    
    # transform RNN output to character activations
    inner = Dense(model_train.get_output_size(), kernel_initializer='he_normal', name='dense2')(gru2_concat)
    y_pred = Activation('softmax', name='softmax')(inner)
    
    # prints a summary representation of model
    Model(inputs=input_data, outputs=y_pred).summary()
    
    labels = Input(name='labels', shape=[100], dtype='float64')
    input_length = Input(name='input_length', shape=[1], dtype='int64')
    artist_length = Input(name='artist_length', shape=[1], dtype='int64')
    album_length = Input(name='album_length', shape=[1], dtype='int64')
    label_length = Input(name='label_length', shape=[1], dtype='int64')
    
    # keras does not currently support loss functions with extra parameters
    # so CTC loss is implemented in a lambda layer
    loss_out = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')([y_pred, labels, input_length, label_length])
   
    print('Loading Model')
    
    # clipnorm seems to speed up convergence
    sgd = SGD(lr=0.02, decay=1e-6, momentum=0.9, nesterov=True, clipnorm=5)
    
    if load:
        model = load_model('./tmp_model.h5', compile=False)
    else:
        
        model = Model(inputs=[input_data, labels, input_length, artist_length, album_length,label_length], outputs=loss_out)
        
    # loss calculation occurs elsewhere, so use a dummy lambda function for loss
    model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=sgd)
    
    if not load:
        # captures output of softmax so we can decode the output during the visualization
        test_func = K.function([input_data], [y_pred])
        
        model.fit_generator(generator=model_train.next_batch(),
                        steps_per_epoch=model_train.n,
                        epochs=1)
        
#         model.fit_generator(generator=model_train.next_batch(),
#                            steps_per_epoch=model_train.n,
#                            epochs=1,
#                            validation_data = model_val.next_batch(),
#                            validation_steps = model_val.n)

       # model.fit_generator(generator=model_train.next_batch(), steps_per_epoch=model_train.n, epochs=1)
    return model

In [None]:
model = train(256, load=False)

In [None]:
def decode_batch(out):
    ret = []
    for j in range(out.shape[0]):
        out_best = list(np.argmax(out[j, 2:], 1))
        out_best = [k for k, g in itertools.groupby(out_best)]
        outstr = ''
        for c in out_best:
            if c < len(letters):
                outstr += letters[c]
        ret.append(outstr)
    return ret

In [None]:
model_test = TextImageGenerator('dataset/test/', 256, 192, 8, 4)
model_test.build_data()

net_inp = model.get_layer(name='input').input
net_out = model.get_layer(name='softmax').output

print(net_inp)
print(net_out)

for inp_value, _ in model_test.next_batch():
    bs = inp_value['input'].shape[0]
    X_data = inp_value['input']
    print(X_data)
    tf.global_variables_initializer().run(session=sess)
    net_out_value = sess.run(net_out, feed_dict={net_inp:X_data})
    pred_texts = decode_batch(net_out_value)
    labels = inp_value['labels']
    for label in labels:
        text = ''.join(list(map(lambda x: letters[int(x)], label)))
        texts.append(text)
    
    for i in range(bs):
        print('Predicted: %s\nTrue: %s' % (pred_texts[i], texts[i]))
    break
    