In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from scipy.ndimage import imread
import json
import gzip
import tarfile
import random
from hangul_utils import check_syllable, split_syllable_char, split_syllables, join_jamos
%matplotlib inline
plt.rcParams['image.cmap'] = 'Greys'

print("packs loaded")
%connect_info

In [None]:
# Non-Hangul classes, in label order: digits, lower-case, upper-case, punctuation.
en_chset = (
    list("0123456789")
    + list("abcdefghijklmnopqrstuvwxyz")
    + list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
    + list("()'\".,:;!?/@#$%^&*[]{}<>~-")
)

# Hangul jamo classes, one table per syllable position.
ko_chset_cho = list("ㄱㄲㄴㄷㄸㄹㅁㅂㅃㅅㅆㅇㅈㅉㅊㅋㅌㅍㅎ")
ko_chset_jung = list("ㅏㅐㅑㅒㅓㅔㅕㅖㅗㅘㅙㅚㅛㅜㅝㅞㅟㅠㅡㅢㅣ")
# The leading "X" entry is the class used by get_label for "no final consonant".
ko_chset_jong = ["X"] + list("ㄱㄲㄳㄴㄵㄶㄷㄹㄺㄻㄼㄽㄾㄿㅀㅁㅂㅄㅅㅆㅇㅈㅊㅋㅌㅍㅎ")

# Read training and test images from file
def get_image_index_from_file(indexf, dataf):
    index_data = []
    with gzip.open(indexf, 'rt') as arc:
        index_data.extend(json.load(arc))
        print("index loaded")

    # Read-stream mode r|*
    with tarfile.open(dataf, "r|*") as tar:
        print("tar opened")
        img_data = []
        for i, member in enumerate(index_data):
            if i%10000 == 1:
                print("%2.0f%% complete (%d / %d)" % (i / len(index_data) * 100, i, len(index_data)))
            ti = tar.next()
            if ti.name != member['path']:
                print("ERROR: order doesn't match")
                break;
            f = tar.extractfile(ti)
            img_data.append(1 - (imread(f)/255))
        img = np.array(img_data)
        del img_data
        print("image loaded")
    return (index_data, img)

def get_label(index_data):
    """Build the one-hot label matrix for a list of sample records.

    Each row is the concatenation of four one-hot groups, in this order:
    initial jamo (cho), medial jamo (jung), final jamo (jong), non-Hangul
    character. Every group has one extra trailing column that marks the
    group as 'invalid' for the sample: Hangul samples set the invalid column
    of the non-Hangul group, all other samples set the invalid column of the
    three jamo groups.

    Args:
        index_data: Sequence of records, each with a 'target' key holding the
            character shown in the image.

    Returns:
        Array with one row per record and
        ``len(ko_chset_cho) + len(ko_chset_jung) + len(ko_chset_jong)
        + len(en_chset) + 4`` columns.

    Raises:
        ValueError: If a target character (or one of its jamo) is not listed
            in the corresponding character table.
    """
    n_samples = len(index_data)
    # len + 1 for one 'invalid' label
    label_ko_cho = np.zeros([n_samples, len(ko_chset_cho)+1])
    label_ko_jung = np.zeros([n_samples, len(ko_chset_jung)+1])
    label_ko_jong = np.zeros([n_samples, len(ko_chset_jong)+1])
    label_en = np.zeros([n_samples, len(en_chset)+1])
    for i, member in enumerate(index_data):
        target = member['target'] # Target Character
        # Is Hangeul?
        if check_syllable(target):
            # The syllable to decompose must be passed in; calling the helper
            # without an argument raised TypeError for every Hangul sample.
            splited = split_syllable_char(target)
            label_en[i][len(en_chset)] = 1
            label_ko_cho[i][ko_chset_cho.index(splited[0])] = 1
            label_ko_jung[i][ko_chset_jung.index(splited[1])] = 1
            # A missing final consonant may show up as a short tuple or as a
            # None third element (assumed to vary by hangul_utils version);
            # both map to class 0 of the jong table.
            if len(splited) < 3 or splited[2] is None:
                label_ko_jong[i][0] = 1
            else:
                label_ko_jong[i][ko_chset_jong.index(splited[2])] = 1
        else:
            label_ko_cho[i][len(ko_chset_cho)] = 1
            label_ko_jung[i][len(ko_chset_jung)] = 1
            label_ko_jong[i][len(ko_chset_jong)] = 1
            label_en[i][en_chset.index(target)] = 1

    # Concatenate all labels
    label = np.concatenate((label_ko_cho, label_ko_jung, label_ko_jong, label_en), axis=1)
    print("label loaded")
    return label

def get_all():
    """Load the English and Korean datasets and merge them.

    Returns:
        ``(index_data, img, label)``: the English records followed by the
        Korean ones, the images concatenated in the same order, and the
        label matrix produced by ``get_label`` for the merged records.
    """
    sources = (
        ('data/en/index.json.gz', 'data/en/data.tar.gz'),
        ('data/ko/index.json.gz', 'data/ko/data.tar.gz'),
    )
    index_data = []
    image_parts = []
    for index_path, archive_path in sources:
        part_index, part_img = get_image_index_from_file(index_path, archive_path)
        index_data = index_data + part_index
        image_parts.append(part_img)
    img = np.concatenate(image_parts)
    label = get_label(index_data)
    return (index_data, img, label)
        
# Load everything when this cell runs; get_all reads both archive pairs, so
# this is the expensive step of the notebook. Later cells rebind these names.
index_data, img, label = get_all()

In [None]:
def getIndex(l, indexes):
    """Return a new list holding ``l[k]`` for every ``k`` in ``indexes``, in order."""
    picked = []
    for position in indexes:
        picked.append(l[position])
    return picked

def shuffle(n, *lists):
    """Reorder several parallel sequences with one shared random permutation.

    Args:
        n: Length of the permutation to draw (``np.random.permutation(n)``).
        *lists: Sequences to reorder. Objects exposing a ``shape`` attribute
            are indexed directly with the permutation; anything else is
            reordered element by element through ``getIndex``.

    Returns:
        A tuple with the reordered sequences, in the order they were passed.
    """
    order = np.random.permutation(n)
    return tuple(
        seq[order] if hasattr(seq, "shape") else getIndex(seq, order)
        for seq in lists
    )

# Reorder images, labels and index records with one shared permutation so the
# three stay aligned. The names are rebound in place, so re-running this cell
# shuffles again; no random seed is set in the visible cells, so the order
# differs between runs.
img, label, index_data = shuffle(img.shape[0], img, label, index_data)

print("shuffled")

In [None]:
print(img.shape)
# Show the first two samples next to their index record and label vector.
for sample in (0, 1):
    plt.figure()
    plt.imshow(img[sample], interpolation='none')
    print(index_data[sample])
    print(label[sample])

In [None]:
# `img_ko` / `label_ko` were never assigned at notebook scope (the loader cell
# only exposes `img`, `label` and `index_data`), so this cell raised NameError
# on a fresh kernel. They are rebuilt here from the combined arrays.
# NOTE(review): reconstruction assumes the intended training set is the
# Hangul-only samples with labels laid out as [cho | jung | jong] and no
# 'invalid' columns, which is the layout the model cell slices - confirm.
n_cho = len(ko_chset_cho)
n_jung = len(ko_chset_jung)
n_jong = len(ko_chset_jong)

# get_label sets the 'invalid' column of the cho group (index n_cho) only for
# non-Hangul samples, so a zero there marks a Hangul sample.
is_ko = label[:, n_cho] == 0

# Column offsets inside the get_label layout; each group is followed by its
# own 'invalid' column, which is skipped here.
jung_start = n_cho + 1
jong_start = jung_start + n_jung + 1
ko_columns = np.r_[0:n_cho,
                   jung_start:jung_start + n_jung,
                   jong_start:jong_start + n_jong]

img_ko = img[is_ko]
label_ko = label[is_ko][:, ko_columns]

# The data was shuffled earlier, so the last 5000 samples act as the test split.
trainimg = img_ko[:-5000]
trainlabel = label_ko[:-5000]
testimg = img_ko[-5000:]
testlabel = label_ko[-5000:]
randidx = np.random.randint(trainimg.shape[0], size=2)

def get_batch(i, batch_size, input_var):
    """Return the ``i``-th mini-batch of ``input_var``, wrapping around the end.

    Args:
        i: Batch counter; the batch starts at ``(i * batch_size) % len``.
        batch_size: Number of rows per batch.
        input_var: Array indexed along its first axis.

    Returns:
        ``input_var`` itself when ``batch_size`` exceeds its length; otherwise
        ``batch_size`` consecutive rows, continuing from the first row when
        the batch runs past the last one.
    """
    n_rows = input_var.shape[0]
    if batch_size > n_rows:
        return input_var
    first = (i * batch_size) % n_rows
    last = first + batch_size
    if last <= n_rows:
        return input_var[first:last]
    # Batch straddles the end: tail of the array followed by its head.
    return np.concatenate((input_var[first:], input_var[:last - n_rows]), axis=0)
    
def flatten_cnn(layer):
    """Reshape ``layer`` to two dimensions, keeping the first axis free.

    The second dimension is the product of the static sizes at positions
    1, 2 and 3 of the layer's shape.
    """
    dims = layer.get_shape().as_list()
    flat_width = dims[1] * dims[2] * dims[3]
    return tf.reshape(layer, [-1, flat_width])

def build_nn(shape, X):
    """Create a fully connected layer on top of ``X``.

    Args:
        shape: Number of output units.
        X: Input tensor; the size of its second dimension is used as the
            input width of the weight matrix.

    Returns:
        ``tf.matmul(X, W) + b``. No activation is applied here.
    """
    in_width = int(X.get_shape()[1])
    weights = tf.Variable(tf.truncated_normal([in_width, shape], stddev=0.1))
    bias = tf.Variable(tf.constant(0.1, shape=[shape]))
    affine = tf.matmul(X, weights) + bias
    return affine

def build_cnn(cnn_shape, patch_shape, X):
    """Create a convolution + ReLU + 2x2 max-pool stage on top of ``X``.

    Args:
        cnn_shape: Number of output channels.
        patch_shape: Two-element sequence with the kernel height and width.
        X: Input tensor; the size of its fourth dimension is used as the
            number of input channels.

    Returns:
        The pooled activation tensor.
    """
    in_channels = int(X.get_shape()[3])
    kernel_shape = [patch_shape[0], patch_shape[1], in_channels, cnn_shape]
    kernel = tf.Variable(tf.truncated_normal(kernel_shape, stddev=0.1))
    bias = tf.Variable(tf.constant(0.1, shape=[cnn_shape]))
    conv = tf.nn.conv2d(X, kernel, strides=[1, 1, 1, 1], padding='SAME')
    activated = tf.nn.relu(conv + bias)
    return tf.nn.max_pool(activated, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

print("function loaded")

In [None]:
# Inputs: 32x32 images, one-hot labels laid out as [cho | jung | jong], and the
# dropout keep probability.
X = tf.placeholder(tf.float32, [None, 32, 32])
Y = tf.placeholder(tf.float32, [None, label_ko.shape[1]])
keep_prob = tf.placeholder(tf.float32)

# Column offsets of the three jamo groups inside Y and the network output.
n_cho = len(ko_chset_cho)
n_jung = len(ko_chset_jung)
n_jong = len(ko_chset_jong)

Y_cho = tf.slice(Y, [0, 0], [-1, n_cho])
Y_jung = tf.slice(Y, [0, n_cho], [-1, n_jung])
Y_jong = tf.slice(Y, [0, n_cho + n_jung], [-1, n_jong])

# Two conv/pool stages followed by two dense layers with dropout.
cnn_1 = build_cnn(32, [5,5], tf.reshape(X, [-1, 32, 32, 1]))
cnn_2 = build_cnn(64, [5,5], cnn_1)

dense_1 = tf.nn.relu(build_nn(512, flatten_cnn(cnn_2)))
dropout_1 = tf.nn.dropout(dense_1, keep_prob)

dense_2 = tf.nn.relu(build_nn(256, dropout_1))
dropout_2 = tf.nn.dropout(dense_2, keep_prob)

# One shared output layer, split into three independent softmax heads.
dense_3 = build_nn(label_ko.shape[1], dropout_2)
logits_cho = tf.slice(dense_3, [0, 0], [-1, n_cho])
logits_jung = tf.slice(dense_3, [0, n_cho], [-1, n_jung])
logits_jong = tf.slice(dense_3, [0, n_cho + n_jung], [-1, n_jong])
h_cho = tf.nn.softmax(logits_cho)
h_jung = tf.nn.softmax(logits_jung)
h_jong = tf.nn.softmax(logits_jong)

learning_rate = tf.placeholder(tf.float32)
# Cross-entropy is computed from the raw logits: the previous
# -sum(Y * log(softmax)) form turns into NaN as soon as a predicted
# probability underflows to 0, which then poisons every weight update.
cost_cho = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits_cho, labels=Y_cho))
cost_jung = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits_jung, labels=Y_jung))
cost_jong = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits_jong, labels=Y_jong))
# The medial-vowel head is weighted twice as much as the other two.
cost = cost_cho + cost_jung * 2 + cost_jong

train = tf.train.AdamOptimizer(learning_rate).minimize(cost)

# Per-head correctness, plus "at least two heads right" and "all heads right".
correct_cho = tf.equal(tf.argmax(Y_cho,1), tf.argmax(h_cho,1))
correct_jung = tf.equal(tf.argmax(Y_jung,1), tf.argmax(h_jung,1))
correct_jong = tf.equal(tf.argmax(Y_jong,1), tf.argmax(h_jong,1))
correct_two = tf.logical_or(tf.logical_and(correct_cho, tf.logical_or(correct_jung, correct_jong)),
                           tf.logical_and(correct_jung, correct_jong))
correct_all = tf.logical_and(tf.logical_and(correct_cho, correct_jung), correct_jong)
accuracy = tf.reduce_mean(tf.cast(correct_all, tf.float32))
accuracy_two = tf.reduce_mean(tf.cast(correct_two, tf.float32))
accuracy_cho = tf.reduce_mean(tf.cast(correct_cho, tf.float32))
accuracy_jung = tf.reduce_mean(tf.cast(correct_jung, tf.float32))
accuracy_jong = tf.reduce_mean(tf.cast(correct_jong, tf.float32))

sess = tf.Session()
print("session loaded")

In [None]:
def init_session():
    """Run the variable initializer in the shared session and report it."""
    initializer = tf.initialize_all_variables()
    sess.run(initializer)
    print("session initialized")
    
def train_accuracy(batchsize=100):
    """Compute the all-jamo accuracy over the whole training split.

    The split is evaluated in consecutive chunks and the chunk accuracies are
    combined weighted by chunk size, so the trailing samples that do not fill
    a whole batch are counted and a split smaller than one batch no longer
    divides by zero.

    Args:
        batchsize: Maximum number of samples evaluated per ``sess.run`` call.

    Returns:
        Fraction of training samples for which ``accuracy`` counts a hit;
        ``0.0`` when the training split is empty.
    """
    trainsize = trainimg.shape[0]
    if trainsize == 0:
        return 0.0
    weighted_hits = 0.0
    for start in range(0, trainsize, batchsize):
        batch_x = trainimg[start:start + batchsize]
        batch_y = trainlabel[start:start + batchsize]
        batch_accuracy = sess.run(accuracy, feed_dict={X:batch_x, Y:batch_y, keep_prob:1})
        # Weight by the real chunk size; the last chunk may be shorter.
        weighted_hits += float(batch_accuracy) * batch_x.shape[0]
    return weighted_hits / trainsize
    
def print_accuracy():
    """Evaluate on the test split and print a six-line accuracy report.

    Prints overall test accuracy, training accuracy (via ``train_accuracy``),
    the two-of-three accuracy and the three per-jamo accuracies.
    """
    metrics = (accuracy, accuracy_cho, accuracy_jung, accuracy_jong, accuracy_two)
    test_feed = {X:testimg, Y:testlabel, keep_prob:1}
    acc_all, acc_cho, acc_jung, acc_jong, acc_two = sess.run(metrics, feed_dict=test_feed)
    print ("test accuracy = %.3f" % acc_all)
    # The training pass runs only now, after the test line has been printed.
    print ("train accuracy = %.3f" % train_accuracy())
    print ("two of three = %.3f" % acc_two)
    print ("cho = %.3f" % acc_cho)
    print ("jung = %.3f" % acc_jung)
    print ("jong = %.3f" % acc_jong)

def do_training(is_console=False, n_epochs=5, batchsize=100, lr=0.0003):
    """Train the network with mini-batches and print progress along the way.

    Test accuracy is printed every 200 steps and a per-jamo breakdown at the
    start of every epoch. The breakdown is computed from an evaluation run at
    that same step; previously it reused whatever the last 200-step
    evaluation had produced, which could be up to 199 steps old.

    Args:
        is_console: When true, also print the step counter and learning rate
            on a single carriage-return-updated line.
        n_epochs: Number of passes over the training split.
        batchsize: Number of samples per training step.
        lr: Initial learning rate; multiplied by ``1 - 0.0003`` after every
            step.
    """
    trainsize = trainimg.shape[0]
    batch_per_epoch = int(trainsize/batchsize)
    print ("Training %d, mini-batch %d * %d" % (trainsize, batchsize, batch_per_epoch))

    eval_fetches = (accuracy, accuracy_cho, accuracy_jung, accuracy_jong, accuracy_two)
    eval_feed = {X:testimg, Y:testlabel, keep_prob:1}

    for i in range(batch_per_epoch*n_epochs):
        periodic_eval = i % 200 == 0
        epoch_start = i % batch_per_epoch == 0

        # Evaluate whenever either report is due so both print fresh numbers.
        if periodic_eval or epoch_start:
            taccuracy, taccuracy_cho, taccuracy_jung, taccuracy_jong, taccuracy_two = \
                sess.run(eval_fetches, feed_dict=eval_feed)

        if periodic_eval:
            print ("%6dth epoch : test accuracy = %.3f" % \
                   (i / batch_per_epoch, taccuracy))

        if epoch_start:
            print ("                 two of three = %.3f" % taccuracy_two)
            print ("                 cho = %.3f" % taccuracy_cho)
            print ("                 jung = %.3f" % taccuracy_jung)
            print ("                 jong = %.3f" % taccuracy_jong)

        if is_console:
            print ("%dth... lr = %f\r" % (i, lr), end="")

        batch_x = get_batch(i, batchsize, trainimg)
        batch_y = get_batch(i, batchsize, trainlabel)
        sess.run(train, feed_dict={X:batch_x, Y:batch_y, keep_prob:0.5, learning_rate:lr})
        # Exponential learning-rate decay, applied once per step.
        lr = lr * (1 - 0.0003)
    print("train complete")
    print_accuracy()
    
def error_check(pred_label_tuple, chset=None):
    """Print a per-class error report from predictions and one-hot labels.

    For every class the report shows the error rate, the error and sample
    counts, and the class it was most often confused with.

    Args:
        pred_label_tuple: Pair ``(h, y)`` of 2-D arrays with one row per
            sample and one column per class: predicted scores and one-hot
            ground truth.
        chset: Class names, indexed by column. Defaults to ``ko_chset_jung``.
    """
    h, y = pred_label_tuple
    if chset is None:
        chset = ko_chset_jung

    # The confusion matrix is indexed by class, so it is sized by the number
    # of columns. Sizing it by the number of samples wasted memory on large
    # inputs and raised IndexError when there were fewer samples than classes.
    n_class = y.shape[1]
    n_error = np.zeros([n_class, n_class])
    n_all = np.zeros(n_class)

    truths = np.argmax(y, axis=1)
    guesses = np.argmax(h, axis=1)
    for truth, guess in zip(truths, guesses):
        n_all[truth] += 1
        if guess != truth:
            n_error[truth][guess] += 1

    print ("Error rate")
    for i, ch in enumerate(chset):
        most_error = np.argmax(n_error[i])
        n_wrong = np.sum(n_error[i])
        # A class with no samples reports 0% instead of dividing by zero.
        rate = float(n_wrong) / n_all[i] * 100 if n_all[i] > 0 else 0.0
        print ("%s : %2.0f%% (%4d / %4d)" %
               (ch, rate, n_wrong, n_all[i]), end="")
        if n_error[i][most_error] > 0:
            print ("%6d errors with %s" % (n_error[i][most_error], chset[most_error]))
        else:
            print ("")

In [None]:
# Report test/train accuracy for the current state of the session.
# NOTE(review): no visible cell calls init_session() or do_training() before
# this one, so on a fresh kernel the variables would be uninitialized here -
# presumably training was driven from an attached console; confirm.
print_accuracy()

In [None]:
# Per-class error report for the medial-vowel (jung) head on the test split,
# with dropout disabled (keep_prob = 1).
# NOTE(review): depends on the session holding trained variables; no visible
# cell initializes or trains the model before this point - confirm.
error_check(sess.run((h_jung, Y_jung), feed_dict={X:testimg, Y:testlabel, keep_prob:1}))