# 30分钟从零开始搞定手写汉字识别

In [None]:
import numpy as np
from keras.layers.core import Flatten, Dense, Dropout, Lambda
from keras.models import Sequential
from keras.optimizers import Adam, Nadam, SGD
from keras.preprocessing import image
from keras.datasets import mnist
from keras.utils.np_utils import to_categorical
from keras.layers import Conv2D, MaxPooling2D, ZeroPadding2D, Activation
from keras.layers.normalization import BatchNormalization
from keras.callbacks import TensorBoard

## 读取数据

这里读取的是MNIST数据，如果是手写汉字识别应该使用 ImageDataGenerator 类的flow_from_directory函数，为了不违背主办方的初衷，这里只给出[官网教程链接](https://keras-cn.readthedocs.io/en/latest/preprocessing/image/#_1)

In [None]:
# Load the MNIST train/test split through keras.datasets.mnist.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Show the shapes of all four arrays as the cell output.
(x_train.shape, y_train.shape, x_test.shape, y_test.shape)

可以看到，读取出来的数据是 6W 张 28*28 像素的二维图片数据，但是我们的模型一般接收的都是三维数据，即长、宽、颜色，所以我们需要补齐第三维。如果是使用 ImageDataGenerator 则不存在这个问题

In [None]:
# Give every sample an explicit trailing channel axis of size 1 and store the
# pixels as float32, so the arrays become (samples, 28, 28, 1).
x_train = x_train.reshape((x_train.shape[0], 28, 28, 1)).astype(np.float32)
x_test = x_test.reshape((x_test.shape[0], 28, 28, 1)).astype(np.float32)
# Show the resulting shapes as the cell output.
(x_train.shape, y_train.shape, x_test.shape, y_test.shape)

这里就是 keras 相较于 tensorflow 的优势，我们可以直接查看我们到底读取了什么数据

In [None]:
# Show the first five training labels as the cell output.
y_train[:5]

y_train 是我们的标签，在分类任务中，我们不能直接使用这样的标签，我们需要转换为 onehot 向量，什么是 onehot？其实就是将整数用向量表示，该向量的维度等于分类任务的类别数，如果是MNIST就是10，如果是HCCR就是3755，该向量只有第N个分量为1，其他都是0，N就是整数的数值

In [None]:
# One-hot encode the integer labels. The class count is passed explicitly so
# the train and test encodings always have the same width (the 10 MNIST
# classes, matching the Dense(10) output layers used below) instead of being
# inferred separately from the largest label present in each array.
y_train = to_categorical(y_train, num_classes=10)
y_test = to_categorical(y_test, num_classes=10)
# Show the first five encoded labels as the cell output.
y_train[:5]

将数据分割成 mini-batch。为什么要使用 batch？因为在训练类似于手写汉字识别这样的任务时，不可能一次性将所有的数据装入显存进行计算

In [None]:
# Mean and standard deviation over every value in x_train, kept as float32
# scalars; norm_input below reads both.
mean_px = x_train.mean().astype(np.float32)
std_px = x_train.std().astype(np.float32)


def norm_input(x):
    """Standardize *x* using the module-level ``mean_px`` and ``std_px``.

    Returns ``(x - mean_px) / std_px``.
    """
    centered = x - mean_px
    return centered / std_px

In [None]:
# Number of samples yielded per generator step.
batch_size = 128
# NOTE(review): featurewise_std_normalization relies on statistics computed by
# gen.fit(...), which is never called here -- confirm whether that is intended.
gen = image.ImageDataGenerator(featurewise_std_normalization=True, samplewise_std_normalization=True)
# The training cells below read `batches` / `test_batches`, so the iterators
# are bound to those names; without them those cells raise NameError.
batches = gen.flow(x_train, y_train, batch_size=batch_size)
test_batches = gen.flow(x_test, y_test, batch_size=batch_size)
# Keep the original spellings as aliases for the inspection cell that follows.
train_batchs = batches
test_batchs = test_batches

In [None]:
# Draw one batch from the training iterator and keep element 0 of what it
# returns (the inputs of the (inputs, labels) pair passed to gen.flow).
tmp = train_batchs.next()[0]
# Largest value in the first sample of that batch, viewed as a 28x28 array.
np.max(tmp[0].reshape(28,28))
# Smallest value in the same sample; being the last expression, this is the
# one shown as the cell output.
np.min(tmp[0].reshape(28,28))

## Linear Model

In [None]:
def get_lin_model():
    """Build and compile a linear softmax classifier.

    The network flattens a (28, 28, 1) input and feeds it to a single
    10-unit softmax layer. It is compiled with a default ``Adam`` optimizer,
    categorical cross-entropy loss and an accuracy metric.

    Returns:
        The compiled ``Sequential`` model.
    """
    linear = Sequential()
    linear.add(Flatten(input_shape=(28, 28, 1)))
    linear.add(Dense(10, activation='softmax'))
    linear.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return linear

In [None]:
# Build the linear model and train it for one epoch from the batch iterators.
lm = get_lin_model()
# steps_per_epoch / validation_steps are the iterator sample counts (.n)
# floor-divided by batch_size, so a trailing partial batch is not counted.
lm.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                 validation_data=test_batches, validation_steps=test_batches.n//batch_size)

## Single Dense Layer

In [None]:
def get_single_dense_model():
    """Build and compile a classifier with one hidden dense layer.

    Layers: input standardization (``norm_input``) -> flatten -> 512-unit
    ReLU hidden layer -> 10-unit softmax output. Compiled with a default
    ``Adam`` optimizer, categorical cross-entropy loss and an accuracy metric.

    Returns:
        The compiled ``Sequential`` model.
    """
    model = Sequential([
        # The input shape matches the (28, 28, 1) arrays produced by the
        # reshape cell; the previous (1, 28, 28) did not fit that data.
        Lambda(norm_input, input_shape=(28, 28, 1)),
        Flatten(),
        # Hidden layers use ReLU; softmax is reserved for the output layer
        # (get_model uses the same Dense(512, relu) hidden layer).
        Dense(512, activation='relu'),
        Dense(10, activation='softmax')
    ])
    model.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
# Instantiate the compiled single-hidden-layer model for the cells below.
single_dense_model = get_single_dense_model()

In [None]:
# Rebind the optimizer's `lr` attribute to 0.1 ahead of this training run.
single_dense_model.optimizer.lr = 0.1
# Train for 4 epochs; the step counts are the iterator sample counts (.n)
# floor-divided by batch_size.
single_dense_model.fit_generator(batches, steps_per_epoch=batches.n//batch_size,
                                 epochs=4, validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# Rebind the optimizer's `lr` attribute to 0.01 for a further 10 epochs.
# NOTE(review): on Keras 2.0.x, rebinding `lr` after a model has already been
# trained may not reach the already-built training function; updating the
# variable in place (keras.backend.set_value) is the usual approach -- confirm
# against the Keras version in use.
single_dense_model.optimizer.lr=0.01
single_dense_model.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=10, validation_data=test_batches, validation_steps=test_batches.n//batch_size)

## Vgg + Batchnorm + dropout + data augmentation

In [None]:
def get_vgg_improve():
    """Build and compile a small VGG-style CNN with batch norm and dropout.

    Layers: input standardization (``norm_input``), two 32-filter and two
    64-filter 3x3 convolutions with max pooling after each pair, then a
    512-unit dense layer with dropout and a 10-unit softmax output. Batch
    normalization is applied between layers. Compiled with a default
    ``Adam`` optimizer, categorical cross-entropy loss and an accuracy metric.

    The layout is channels-last, matching the (28, 28, 1) arrays produced by
    the reshape cell, so the convolutional BatchNormalization layers
    normalize over the last axis.

    Returns:
        The compiled ``Sequential`` model.
    """
    model = Sequential([
        # Was input_shape=(1, 28, 28), which does not fit the (28, 28, 1) data.
        Lambda(norm_input, input_shape=(28, 28, 1)),
        # Kernel size is an explicit tuple (same form as larger_model) rather
        # than the legacy Conv2D(n, 3, 3) positional call.
        Conv2D(32, (3, 3), activation='relu'),
        BatchNormalization(axis=-1),
        Conv2D(32, (3, 3), activation='relu'),
        MaxPooling2D(),
        BatchNormalization(axis=-1),
        Conv2D(64, (3, 3), activation='relu'),
        BatchNormalization(axis=-1),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(),
        Flatten(),
        BatchNormalization(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(10, activation='softmax')
    ])
    model.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model
def get_model():
    """Build and compile a single-convolution CNN.

    Layers: input standardization (``norm_input``) -> zero padding ->
    28-filter 3x3 ReLU convolution -> max pooling -> flatten -> 512-unit ReLU
    dense layer -> 10-unit softmax output. Compiled with a default ``Adam``
    optimizer, categorical cross-entropy loss and an accuracy metric.

    Returns:
        The compiled ``Sequential`` model.
    """
    model = Sequential([
        Lambda(norm_input, input_shape=(28, 28, 1)),
        ZeroPadding2D(),
        # `Convolution2D` is never imported in this notebook; use the imported
        # Conv2D and pass the 3x3 kernel as a tuple.
        Conv2D(28, (3, 3), activation='relu'),
        MaxPooling2D(),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(10, activation='softmax')
    ])
    model.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
# Build the single-convolution CNN and train it for 4 epochs; the step counts
# are the iterator sample counts (.n) floor-divided by batch_size.
model = get_model()
model.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=4, validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# Larger CNN for the MNIST Dataset
import numpy
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers.convolutional import Conv2D
from keras.layers.convolutional import MaxPooling2D
from keras.utils import np_utils
from keras import backend as K
# Select the 'th' image dimension ordering on the Keras backend, matching the
# (1, 28, 28) per-sample shape used by the reshape and by larger_model below.
K.set_image_dim_ordering('th')
# fix random seed for reproducibility
seed = 7
numpy.random.seed(seed)
# load data (NOTE: this rebinds y_train / y_test, which earlier cells also assign)
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# reshape to [samples][1][28][28]: a singleton axis is inserted right after the
# sample axis, and the values are stored as float32
X_train = X_train.reshape(X_train.shape[0], 1, 28, 28).astype('float32')
X_test = X_test.reshape(X_test.shape[0], 1, 28, 28).astype('float32')
# normalize inputs from 0-255 to 0-1
X_train = X_train / 255
X_test = X_test / 255
# one hot encode outputs
y_train = np_utils.to_categorical(y_train)
y_test = np_utils.to_categorical(y_test)
# number of classes = width of the one-hot encoding
num_classes = y_test.shape[1]
def larger_model():
    """Build and compile the larger two-convolution CNN.

    Layers: 30-filter 5x5 ReLU convolution on a (1, 28, 28) input -> 2x2 max
    pooling -> 15-filter 3x3 ReLU convolution -> 2x2 max pooling -> dropout
    (0.2) -> flatten -> dense 128 (ReLU) -> dense 50 (ReLU) -> softmax over
    the module-level ``num_classes``. Compiled with the 'adam' optimizer,
    categorical cross-entropy loss and an accuracy metric.

    Returns:
        The compiled ``Sequential`` model.
    """
    layers = [
        Conv2D(30, (5, 5), input_shape=(1, 28, 28), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(15, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.2),
        Flatten(),
        Dense(128, activation='relu'),
        Dense(50, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ]
    network = Sequential()
    for layer in layers:
        network.add(layer)
    network.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network

In [None]:
# NOTE(review): `VGG16` is neither defined nor imported in the visible cells of
# this notebook -- confirm where it is expected to come from before running.
model = VGG16()
# Train for 4 epochs; the step counts are the iterator sample counts (.n)
# floor-divided by batch_size.
model.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=4, validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# Build the larger CNN defined by larger_model().
model = larger_model()
# Train on the in-memory arrays for 10 epochs with batches of 200 samples,
# validating on the test split.
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=200)
# Final evaluation on the test split; scores[1] is the accuracy metric
# requested at compile time (scores[0] being the loss).
scores = model.evaluate(X_test, y_test, verbose=0)
# Report the error rate in percent (100 minus accuracy in percent).
print("Large CNN Error: %.2f%%" % (100-scores[1]*100))

## ResNet

In [None]:
#-*- coding: UTF-8 -*-
"""
Environment: Keras2.0.5，Python2.7
Model: ResNet
"""

from __future__ import division
from keras.models import Model
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from keras.layers import Input, Activation, Dense, Flatten
from keras.layers.merge import add
from keras.layers.normalization import BatchNormalization
from keras.regularizers import l2
from keras import backend as K
from keras.utils import plot_model
import six



def _handle_dim_ordering():
    """Set the module-level ROW_AXIS, COL_AXIS and CHANNEL_AXIS indices.

    With the backend's image dimension ordering equal to 'tf' the channel
    axis is last (rows=1, cols=2, channels=3); for any other ordering it comes
    first (channels=1, rows=2, cols=3).
    """
    global ROW_AXIS, COL_AXIS, CHANNEL_AXIS
    channels_last = K.image_dim_ordering() == 'tf'
    if channels_last:
        ROW_AXIS, COL_AXIS, CHANNEL_AXIS = 1, 2, 3
    else:
        CHANNEL_AXIS, ROW_AXIS, COL_AXIS = 1, 2, 3



def _get_block(identifier):
    """Resolve a block function given either the function or its name.

    Args:
        identifier: A string naming a module-level object, or any other
            object, which is returned unchanged.

    Returns:
        The object found in this module's globals for a string identifier,
        otherwise *identifier* itself.

    Raises:
        ValueError: If a string identifier is missing from the module globals
            or maps to a falsy value.
    """
    if not isinstance(identifier, six.string_types):
        return identifier
    resolved = globals().get(identifier)
    if not resolved:
        raise ValueError('Invalid {}'.format(identifier))
    return resolved



def _bn_relu(input):
    """Apply batch normalization over CHANNEL_AXIS followed by a ReLU.

    Args:
        input: The tensor to normalize and activate.

    Returns:
        The activated tensor.
    """
    normalized = BatchNormalization(axis=CHANNEL_AXIS)(input)
    relu = Activation("relu")
    return relu(normalized)



def _conv_bn_relu(**conv_params):
    """Return a function applying conv -> batch norm -> ReLU to a tensor.

    Keyword Args:
        filters: Number of convolution filters (required).
        kernel_size: Convolution kernel size (required).
        strides: Convolution strides; defaults to (1, 1).
        kernel_initializer: Defaults to "he_normal".
        padding: Defaults to "same".
        kernel_regularizer: Defaults to ``l2(1.e-4)``.

    Returns:
        A one-argument callable mapping an input tensor to the block output.
    """
    filters = conv_params["filters"]
    kernel_size = conv_params["kernel_size"]
    # setdefault also records each default in conv_params, as before.
    conv_kwargs = dict(
        filters=filters,
        kernel_size=kernel_size,
        strides=conv_params.setdefault("strides", (1, 1)),
        kernel_initializer=conv_params.setdefault("kernel_initializer", "he_normal"),
        padding=conv_params.setdefault("padding", "same"),
        kernel_regularizer=conv_params.setdefault("kernel_regularizer", l2(1.e-4)),
    )

    def f(input):
        convolved = Conv2D(**conv_kwargs)(input)
        return _bn_relu(convolved)

    return f



def _bn_relu_conv(**conv_params):
    """Return a function applying batch norm -> ReLU -> conv to a tensor.

    This is the pre-activation ordering proposed in
    http://arxiv.org/pdf/1603.05027v2.pdf

    Keyword Args:
        filters: Number of convolution filters (required).
        kernel_size: Convolution kernel size (required).
        strides: Convolution strides; defaults to (1, 1).
        kernel_initializer: Defaults to "he_normal".
        padding: Defaults to "same".
        kernel_regularizer: Defaults to ``l2(1.e-4)``.

    Returns:
        A one-argument callable mapping an input tensor to the block output.
    """
    filters = conv_params["filters"]
    kernel_size = conv_params["kernel_size"]
    # setdefault also records each default in conv_params, as before.
    conv_kwargs = dict(
        filters=filters,
        kernel_size=kernel_size,
        strides=conv_params.setdefault("strides", (1, 1)),
        kernel_initializer=conv_params.setdefault("kernel_initializer", "he_normal"),
        padding=conv_params.setdefault("padding", "same"),
        kernel_regularizer=conv_params.setdefault("kernel_regularizer", l2(1.e-4)),
    )

    def f(input):
        activated = _bn_relu(input)
        return Conv2D(**conv_kwargs)(activated)

    return f



def _shortcut(input, residual):
    """Merge *input* and *residual* by element-wise addition.

    When the two tensors differ along the row/column axes or in channel
    count, *input* is first passed through a 1x1 convolution whose strides
    and filter count bring it to the residual's shape; otherwise it is added
    unchanged.

    Args:
        input: Tensor entering the residual block.
        residual: Tensor produced by the residual branch.

    Returns:
        The sum of the (possibly projected) input and the residual.
    """
    in_shape = K.int_shape(input)
    res_shape = K.int_shape(residual)

    # Ratio of input to residual size per spatial axis; an integer whenever
    # the network architecture is configured correctly.
    row_stride = int(round(in_shape[ROW_AXIS] / res_shape[ROW_AXIS]))
    col_stride = int(round(in_shape[COL_AXIS] / res_shape[COL_AXIS]))
    channels_match = in_shape[CHANNEL_AXIS] == res_shape[CHANNEL_AXIS]

    # Identity shortcut when nothing needs to be adapted.
    if row_stride <= 1 and col_stride <= 1 and channels_match:
        return add([input, residual])

    # Otherwise project with a strided 1x1 convolution.
    projection = Conv2D(filters=res_shape[CHANNEL_AXIS], kernel_size=(1, 1), strides=(row_stride, col_stride), padding="valid", kernel_initializer="he_normal", kernel_regularizer=l2(0.0001))(input)
    return add([projection, residual])



def _residual_block(block_function, filters, repetitions, is_first_layer=False):
    """Return a function that stacks *repetitions* residual units.

    Args:
        block_function: Factory such as ``basic_block`` or ``bottleneck``.
        filters: Filter count passed to every unit.
        repetitions: Number of units to chain.
        is_first_layer: True for the first stage of the network. That stage
            keeps (1, 1) strides on its first unit and flags that unit as
            ``is_first_block_of_first_layer``.

    Returns:
        A one-argument callable mapping an input tensor to the stage output.
    """

    def f(input):
        tensor = input
        for index in range(repetitions):
            is_first_unit = index == 0
            # Only the first unit of a non-first stage uses stride 2.
            strides = (2, 2) if (is_first_unit and not is_first_layer) else (1, 1)
            unit = block_function(filters=filters, init_strides=strides, is_first_block_of_first_layer=(is_first_layer and is_first_unit))
            tensor = unit(tensor)
        return tensor

    return f




def basic_block(filters, init_strides=(1, 1), is_first_block_of_first_layer=False):
    """Return a residual unit made of two 3x3 convolutions.

    Intended for ResNets with at most 34 layers; follows the pre-activation
    scheme from http://arxiv.org/pdf/1603.05027v2.pdf

    Args:
        filters: Filter count of both convolutions.
        init_strides: Strides of the first convolution.
        is_first_block_of_first_layer: When True the first convolution is
            applied directly, without a preceding BN -> ReLU.

    Returns:
        A one-argument callable mapping an input tensor to the unit output.
    """

    def f(input):
        if not is_first_block_of_first_layer:
            first = _bn_relu_conv(filters=filters, kernel_size=(3, 3), strides=init_strides)(input)
        else:
            # BN -> ReLU -> max pooling has just been applied, so it is not repeated.
            first = Conv2D(filters=filters, kernel_size=(3, 3), strides=init_strides, padding="same", kernel_initializer="he_normal", kernel_regularizer=l2(1e-4))(input)
        second = _bn_relu_conv(filters=filters, kernel_size=(3, 3))(first)
        return _shortcut(input, second)

    return f



def bottleneck(filters, init_strides=(1, 1), is_first_block_of_first_layer=False):
    """Return a bottleneck residual unit (1x1 -> 3x3 -> 1x1 convolutions).

    Intended for ResNets deeper than 34 layers; follows the pre-activation
    scheme from http://arxiv.org/pdf/1603.05027v2.pdf

    Args:
        filters: Filter count of the first two convolutions; the final 1x1
            convolution uses ``filters * 4``.
        init_strides: Strides of the first 1x1 convolution.
        is_first_block_of_first_layer: When True the first convolution is
            applied directly, without a preceding BN -> ReLU.

    Returns:
        A one-argument callable mapping an input tensor to the unit output.
    """

    def f(input):
        if not is_first_block_of_first_layer:
            reduced = _bn_relu_conv(filters=filters, kernel_size=(1, 1), strides=init_strides)(input)
        else:
            # BN -> ReLU -> max pooling has just been applied, so it is not repeated.
            reduced = Conv2D(filters=filters, kernel_size=(1, 1), strides=init_strides, padding="same", kernel_initializer="he_normal", kernel_regularizer=l2(1e-4))(input)

        middle = _bn_relu_conv(filters=filters, kernel_size=(3, 3))(reduced)
        expanded = _bn_relu_conv(filters=filters * 4, kernel_size=(1, 1))(middle)
        return _shortcut(input, expanded)

    return f



class ResnetBuilder(object):
    """Factory of ResNet ``Model`` instances of various depths."""

    @staticmethod
    def build(input_shape, num_outputs, block_fn, repetitions):
        """Build a custom ResNet-like architecture.

        Args:
            input_shape: The input shape in the form
                (nb_channels, nb_rows, nb_cols), regardless of the backend's
                image dimension ordering.
            num_outputs: The number of outputs at the final softmax layer.
            block_fn: The block function to use, either `basic_block` or
                `bottleneck` (or the name of one as a string). The original
                paper used basic_block for layers < 50.
            repetitions: Number of repetitions of the block units, one entry
                per stage. At each stage the number of filters is doubled and
                the input size is halved.

        Returns:
            The keras `Model`.

        Raises:
            ValueError: If *input_shape* does not have exactly three entries,
                or if *block_fn* is a string that cannot be resolved.
        """
        _handle_dim_ordering()

        if len(input_shape) != 3:
            raise ValueError("Input shape should be a tuple (nb_channels, nb_rows, nb_cols)")

        # Permute to (rows, cols, channels) for channels-last backends. The
        # previous code rebuilt the tuple in its original order, so this step
        # did nothing and the channel count ended up on the row axis.
        if K.image_dim_ordering() == 'tf':
            input_shape = (input_shape[1], input_shape[2], input_shape[0])

        # Load function from str if needed.
        block_fn = _get_block(block_fn)

        input = Input(shape=input_shape)
        conv1 = _conv_bn_relu(filters=64, kernel_size=(7, 7), strides=(2, 2))(input)
        pool1 = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding="same")(conv1)

        # Residual stages: the filter count starts at 64 and doubles per stage.
        block = pool1
        filters = 64
        for i, r in enumerate(repetitions):
            block = _residual_block(block_fn, filters=filters, repetitions=r, is_first_layer=(i == 0))(block)
            filters *= 2

        # Last activation
        block = _bn_relu(block)

        # Classifier block: average over the whole remaining feature map,
        # then a softmax dense layer.
        block_shape = K.int_shape(block)
        pool2 = AveragePooling2D(pool_size=(block_shape[ROW_AXIS], block_shape[COL_AXIS]), strides=(1, 1))(block)
        flatten1 = Flatten()(pool2)
        dense = Dense(units=num_outputs, kernel_initializer="he_normal", activation="softmax")(flatten1)

        model = Model(inputs=input, outputs=dense)
        return model

    @staticmethod
    def build_resnet_18(input_shape, num_outputs):
        """Build a ResNet using basic blocks with repetitions [2, 2, 2, 2]."""
        return ResnetBuilder.build(input_shape, num_outputs, basic_block, [2, 2, 2, 2])

    @staticmethod
    def build_resnet_34(input_shape, num_outputs):
        """Build a ResNet using basic blocks with repetitions [3, 4, 6, 3]."""
        return ResnetBuilder.build(input_shape, num_outputs, basic_block, [3, 4, 6, 3])

    @staticmethod
    def build_resnet_50(input_shape, num_outputs):
        """Build a ResNet using bottleneck blocks with repetitions [3, 4, 6, 3]."""
        return ResnetBuilder.build(input_shape, num_outputs, bottleneck, [3, 4, 6, 3])

    @staticmethod
    def build_resnet_101(input_shape, num_outputs):
        """Build a ResNet using bottleneck blocks with repetitions [3, 4, 23, 3]."""
        return ResnetBuilder.build(input_shape, num_outputs, bottleneck, [3, 4, 23, 3])

    @staticmethod
    def build_resnet_152(input_shape, num_outputs):
        """Build a ResNet using bottleneck blocks with repetitions [3, 8, 36, 3]."""
        return ResnetBuilder.build(input_shape, num_outputs, bottleneck, [3, 8, 36, 3])




# Build ResNet-18 for a (channels, rows, cols) = (1, 28, 28) input and 10 classes.
model_18 = ResnetBuilder.build_resnet_18((1, 28, 28), 10)
# Optional inspection helpers:
# model_18.summary()
# plot_model(model_18, to_file='ResNet.png')

model_18.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# First epoch with the optimizer's default learning rate.
model_18.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

# Change the learning rate by updating the optimizer's existing variable in
# place. Rebinding `optimizer.lr` to a plain float after training has started
# does not reach the already-built training function on Keras 2.0.x.
K.set_value(model_18.optimizer.lr, 0.01)
model_18.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

K.set_value(model_18.optimizer.lr, 0.001)
model_18.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# Rebuild the batch iterators with a batch size of 64 and only sample-wise
# standard-deviation normalization, then train model_18 for one more epoch.
batch_size = 64
gen = image.ImageDataGenerator(samplewise_std_normalization=True)
batches = gen.flow(x_train, y_train, batch_size=batch_size)
test_batches = gen.flow(x_test, y_test, batch_size=batch_size)
# Step counts are the iterator sample counts (.n) floor-divided by batch_size.
model_18.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
             validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# Build ResNet-34 for a (channels, rows, cols) = (1, 28, 28) input and 10 classes.
model_34 = ResnetBuilder.build_resnet_34((1, 28, 28), 10)
# Optional inspection helpers:
# model_34.summary()
# plot_model(model_34, to_file='ResNet.png')

model_34.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# First epoch with the optimizer's default learning rate.
model_34.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

# Change the learning rate by updating the optimizer's existing variable in
# place. Rebinding `optimizer.lr` to a plain float after training has started
# does not reach the already-built training function on Keras 2.0.x.
K.set_value(model_34.optimizer.lr, 0.01)
model_34.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

K.set_value(model_34.optimizer.lr, 0.001)
model_34.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# Build ResNet-50 for a (channels, rows, cols) = (1, 28, 28) input and 10 classes.
model_50 = ResnetBuilder.build_resnet_50((1, 28, 28), 10)
# Optional inspection helpers:
# model_50.summary()
# plot_model(model_50, to_file='ResNet.png')

model_50.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# First epoch with the optimizer's default learning rate.
model_50.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

# Change the learning rate by updating the optimizer's existing variable in
# place. Rebinding `optimizer.lr` to a plain float after training has started
# does not reach the already-built training function on Keras 2.0.x.
K.set_value(model_50.optimizer.lr, 0.01)
model_50.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

K.set_value(model_50.optimizer.lr, 0.001)
model_50.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

主要试试steps_per_epoch能不能很快的打印日志，验证后发现可以提前终止epoch，但是旧版本的keras有bug，无法提前终止

In [None]:
# Build a fresh ResNet-50 to try shortened epochs (fixed step counts).
model_50 = ResnetBuilder.build_resnet_50((1, 28, 28), 10)
# Optional inspection helpers:
# model_50.summary()
# plot_model(model_50, to_file='ResNet.png')

model_50.compile(Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Run 1: 100 training steps, full validation pass.
model_50.fit_generator(batches, steps_per_epoch=100, epochs=1,
                       validation_data=test_batches, validation_steps=test_batches.n//batch_size)

# Change the learning rate by updating the optimizer's existing variable in
# place. Rebinding `optimizer.lr` to a plain float after training has started
# does not reach the already-built training function on Keras 2.0.x.
K.set_value(model_50.optimizer.lr, 0.01)
# Run 2: full training pass, 100 validation steps.
model_50.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                       validation_data=test_batches, validation_steps=100)

K.set_value(model_50.optimizer.lr, 0.001)
# Run 3: 100 training steps, 100 validation steps.
model_50.fit_generator(batches, steps_per_epoch=100, epochs=1,
                       validation_data=test_batches, validation_steps=100)

## ResNet20

In [None]:
# build_resnet comes from the project-local module python_code.resnet_20,
# which is not part of this notebook.
from python_code.resnet_20 import build_resnet
# Called with (28, 28, 1) and 10 -- presumably the input shape and the class
# count; verify against build_resnet's signature.
model = build_resnet((28, 28, 1), 10)
# Compile with SGD (lr 0.1, momentum 0.9, decay 0.001), categorical
# cross-entropy loss and an accuracy metric.
model.compile(SGD(lr=0.1, momentum=0.9, decay=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Rebuild the batch iterators with a batch size of 100 and an
# ImageDataGenerator created with no arguments (library defaults).
batch_size = 100
gen = image.ImageDataGenerator()
batches = gen.flow(x_train, y_train, batch_size=batch_size)
test_batches = gen.flow(x_test, y_test, batch_size=batch_size)

In [None]:
# Train the current `model` for one epoch; the step counts are the iterator
# sample counts (.n) floor-divided by batch_size.
model.fit_generator(batches, steps_per_epoch=batches.n//batch_size, epochs=1,
                 validation_data=test_batches, validation_steps=test_batches.n//batch_size)

In [None]:
# %load /tmp/tmp.py
import os
import numpy as np
import struct
import PIL.Image

train_data_dir = '/home/data/HWDB/train/'
test_data_dir = '/home/data/HWDB/test/'
log_dir = '/aiml/dfs/checkpoint/train/'


def read_from_gnt_dir(gnt_dir=train_data_dir):
    """Yield ``(image, tagcode)`` pairs from every ``.gnt`` file in *gnt_dir*.

    Each record starts with a 10-byte header: a 4-byte little-endian sample
    size, a 2-byte tag code read most-significant byte first, and 2-byte
    little-endian width and height. It is followed by ``width * height``
    bytes of pixel data.

    Reading of a file stops silently at the first truncated or inconsistent
    record. Files are visited in sorted name order so the output order is
    reproducible.

    Args:
        gnt_dir: Directory to scan for files whose names end in ``.gnt``.

    Yields:
        Tuples of a ``(height, width)`` uint8 array and the integer tag code.
    """
    header_size = 10

    def one_file(f):
        while True:
            header = np.fromfile(f, dtype='uint8', count=header_size)
            # An empty read is end-of-file; a short one is a truncated header.
            if header.size < header_size:
                break
            # Decode with struct rather than shifting uint8 scalars, which
            # overflows under NumPy's scalar promotion rules (NumPy >= 2).
            raw = header.tobytes()
            sample_size, = struct.unpack_from('<I', raw, 0)
            tagcode, = struct.unpack_from('>H', raw, 4)
            width, height = struct.unpack_from('<HH', raw, 6)
            if header_size + width * height != sample_size:
                break
            pixels = np.fromfile(f, dtype='uint8', count=width * height)
            # Stop instead of failing in reshape when the payload is cut short.
            if pixels.size != width * height:
                break
            yield pixels.reshape((height, width)), tagcode

    for file_name in sorted(os.listdir(gnt_dir)):
        if file_name.endswith('.gnt'):
            file_path = os.path.join(gnt_dir, file_name)
            with open(file_path, 'rb') as f:
                for image, tagcode in one_file(f):
                    yield image, tagcode

def resize_and_normalize_image(img):
    """Pad, resize and normalize a character image.

    The image is padded with white (255) along its shorter side towards a
    square, resized to 56x56 with bilinear interpolation, surrounded by a
    4-pixel white border to reach 64x64, flattened, and rescaled so the
    pixel range 0..255 maps to roughly -1..1.

    Args:
        img: 2-D uint8 array (as produced by ``read_from_gnt_dir``).

    Returns:
        A float32 array of length 64 * 64.
    """
    rows, cols = img.shape

    # Pad the shorter side on both ends. This and the following steps used to
    # sit inside the `else` branch, so wide images skipped them entirely.
    pad_size = abs(rows - cols) // 2
    if rows < cols:
        pad_dims = ((pad_size, pad_size), (0, 0))
    else:
        pad_dims = ((0, 0), (pad_size, pad_size))
    img = np.pad(img, pad_dims, mode='constant', constant_values=255)

    # Resize with Pillow (imported by this cell). The original called
    # scipy.misc.imresize, but scipy is never imported here and that function
    # has been removed from SciPy.
    side = 64 - 4 * 2
    resized = PIL.Image.fromarray(img).resize((side, side), PIL.Image.BILINEAR)
    img = np.pad(np.asarray(resized), ((4, 4), (4, 4)), mode='constant', constant_values=255)
    assert img.shape == (64, 64)

    # Convert to float before the arithmetic: on uint8 data `img - 128` wraps
    # around instead of producing negative values.
    img = img.flatten().astype(np.float32)
    # Pixel values in the range -1 to 1.
    img = (img - 128) / 128
    return img

# one hot
def convert_to_one_hot(char):
    """Return a one-hot vector for *char*.

    The vector has ``len(char_set)`` entries, all zero except for a 1 at the
    position of *char* in the module-level ``char_set``.
    """
    position = char_set.index(char)
    one_hot = np.zeros(len(char_set))
    one_hot[position] = 1
    return one_hot

# Training set: one preprocessed image vector and one one-hot label per sample.
train_data_x = []
train_data_y = []
# The loop variable is named `img` because `image` would rebind the
# keras.preprocessing `image` module that other cells use.
for img, tagcode in read_from_gnt_dir(gnt_dir=train_data_dir):
    # Pack the tag code into two bytes (high byte first) and decode it as GB2312.
    tagcode_unicode = struct.pack('>H', tagcode).decode('gb2312')
    train_data_x.append(resize_and_normalize_image(img))
    train_data_y.append(convert_to_one_hot(tagcode_unicode))

# Test set, built the same way. The appends previously targeted the undefined
# names text_data_x / text_data_y and raised NameError.
test_data_x = []
test_data_y = []
for img, tagcode in read_from_gnt_dir(gnt_dir=test_data_dir):
    tagcode_unicode = struct.pack('>H', tagcode).decode('gb2312')
    test_data_x.append(resize_and_normalize_image(img))
    test_data_y.append(convert_to_one_hot(tagcode_unicode))
