In [1]:
import io
import re
import zipfile
from collections import OrderedDict

from PIL import Image
import tensorflow as tf
from tqdm import tqdm
import numpy as np
from IPython.display import display

In [2]:
print(tf.__version__)

0.12.0


In [3]:
tf.set_random_seed(0)

In [4]:
def extract_images_bytes(path_64='train_64.zip', path='../CatDog/train.zip'):
    labels_images = OrderedDict()
    # load 64
    z = zipfile.ZipFile(path_64, 'r')
    for file in z.filelist:
        m = re.match('.*(cat|dog).*', file.filename)
        if m:
            category = m.groups()[0]
            if category != 'cat': continue
            img = Image.open(io.BytesIO((z.open(file.filename).read())))
            label = re.sub('[^/]+/', '', file.filename)
            labels_images[label] = [img]
    # load origin
    z = zipfile.ZipFile(path, 'r')
    for file in z.filelist:
        m = re.match('.*(cat|dog).*', file.filename)
        if m:
            category = m.groups()[0]
            if category != 'cat': continue
            label = re.sub('[^/]+/', '', file.filename)
            if label not in labels_images: continue
            img = Image.open(io.BytesIO((z.open(file.filename).read())))
            labels_images[label].append(img)
    return labels_images

In [5]:
train = extract_images_bytes()

In [6]:
X_train = np.array([np.array(x[0]) for x in train.values()])
y_train = np.array([np.array(x[1]) for x in train.values()])

In [7]:
test = extract_images_bytes('test_64.zip', '../CatDog/test.zip')

In [8]:
X_test = np.array([np.array(x[0]) for x in test.values()])
y_test = np.array([np.array(x[1]) for x in test.values()])

In [9]:
mean_of_train = np.mean(X_train)
std_of_train = np.std(X_train)
print(mean_of_train, std_of_train)

114.567369759 66.4041757717


In [10]:
X_train = (X_train - mean_of_train) / std_of_train
X_test = (X_test - mean_of_train) / std_of_train
y_train = (y_train - mean_of_train) / std_of_train
y_test = (y_test - mean_of_train) / std_of_train

In [11]:
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)

(10000, 64, 64, 3) (10000, 128, 128, 3) (2500, 64, 64, 3) (2500, 128, 128, 3)


In [12]:
batch_size = 8
learning_rate = 0.001
stddev = 1.0

In [13]:
X = tf.placeholder(tf.float32, [batch_size, 64, 64, 3], name='X')
y = tf.placeholder(tf.float32, [batch_size, 128, 128, 3], name='y')

In [14]:
with tf.variable_scope('dconv'):
    pitch_1 = tf.Variable(tf.random_normal([1, 1, 3, 3], stddev=stddev))
    pitch_1_bias = tf.Variable(tf.random_normal([3], stddev=stddev))
    dconv = tf.nn.bias_add(
        tf.nn.conv2d_transpose(
            X, pitch_1,
            (batch_size, 128, 128, 3),
            (1, 2, 2, 1),
            padding='SAME',
            data_format='NHWC'
        ),
        pitch_1_bias
    )
    print(dconv.get_shape())

(8, 128, 128, 3)


In [15]:
with tf.variable_scope('conv_1'):
    pitch_2 = tf.Variable(tf.random_normal([9, 9, 3, 64], stddev=stddev))
    pitch_2_bias = tf.Variable(tf.random_normal([64], stddev=stddev))
    conv_1 = tf.nn.relu(
        tf.nn.bias_add(
            tf.nn.conv2d(
                dconv, pitch_2, strides=[1, 1, 1, 1], padding='SAME'
            ),
            pitch_2_bias
        )
    )
    print(conv_1.get_shape())

(8, 128, 128, 64)


In [16]:
with tf.variable_scope('conv_2'):
    pitch_3 = tf.Variable(tf.random_normal([3, 3, 64, 32], stddev=stddev))
    pitch_3_bias = tf.Variable(tf.random_normal([32], stddev=stddev))
    conv_2 = tf.nn.relu(
        tf.nn.bias_add(
            tf.nn.conv2d(
                conv_1, pitch_3, strides=[1, 1, 1, 1], padding='SAME'
            ),
            pitch_3_bias
        )
    )
    print(conv_2.get_shape())

(8, 128, 128, 32)


In [17]:
with tf.variable_scope('conv_3'):
    pitch_4 = tf.Variable(tf.random_normal([5, 5, 32, 3], stddev=stddev))
    pitch_4_bias = tf.Variable(tf.random_normal([3], stddev=stddev))
    conv_3 = tf.nn.bias_add(
        tf.nn.conv2d(
            conv_2, pitch_4, strides=[1, 1, 1, 1], padding='SAME'
        ),
        pitch_4_bias
    )
    print(conv_3.get_shape())

(8, 128, 128, 3)


In [18]:
pred = conv_3

In [19]:
slice_begin = (int(y.get_shape()[1]) - int(pred.get_shape()[1]))
slice_end = int(pred.get_shape()[1]) + slice_begin

In [20]:
delta = pred - y[:, slice_begin: slice_end, slice_begin: slice_end, :]
delta *= [[[[0.11448, 0.58661, 0.29891]]]]

In [21]:
loss = tf.pow(delta, 2.0)
cost = tf.reduce_mean(tf.reduce_sum(loss, axis=[1, 2, 3]))

In [22]:
# opt = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)
opt = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(cost)

In [23]:
init = tf.global_variables_initializer()

In [24]:
def batch_flow(inputs, targets, batch_size):
    """流动数据流"""
    flowed = 0
    total = len(inputs)
    while True:
        X_ret = []
        y_ret = []
        for i in range(total):
            X_ret.append(inputs[i])
            y_ret.append(targets[i])
            if len(X_ret) == batch_size:
                flowed += batch_size
                X, y = np.array(X_ret), np.array(y_ret)
                yield X, y
                X_ret = []
                y_ret = []
            if flowed >= total:
                break
        if flowed >= total:
            break

In [25]:
for batch_x, batch_y in batch_flow(X_train, y_train, batch_size):
    print(batch_x.shape, batch_y.shape)
    break

(8, 64, 64, 3) (8, 128, 128, 3)


In [26]:
n_epoch = 30

In [27]:
with tf.Session() as sess:
    sess.run(init)
    total = None
    for epoch in range(n_epoch):
        costs = []
        for batch_x, batch_y in tqdm(batch_flow(X_train, y_train, batch_size), total=total):
            _, c = sess.run([opt, cost], feed_dict={X: batch_x, y: batch_y})
            costs.append(c)
        print('epoch: {}, loss: {:.4f}'.format(epoch, np.mean(costs)))
        if total is None:
            total = len(costs)
    print('calculate train accuracy')
    costs = []
    train_result = []
    for batch_x, batch_y in tqdm(batch_flow(X_train, y_train, batch_size)):
        c, p = sess.run([cost, pred], feed_dict={X: batch_x, y: batch_y})
        costs.append(c)
        train_result += list(p)
    print('test loss: {:.4f}'.format(np.mean(costs)))
    print('calculate test accuracy')
    costs = []
    test_result = []
    for batch_x, batch_y in tqdm(batch_flow(X_test, y_test, batch_size)):
        c, p = sess.run([cost, pred], feed_dict={X: batch_x, y: batch_y})
        costs.append(c)
        test_result += list(p)
    print('test loss: {:.4f}'.format(np.mean(costs)))
    print('Done')

1250it [00:27, 45.40it/s]:00,  1.49it/s]
  0%|          | 5/1250 [00:00<00:28, 44.41it/s]

epoch: 0, loss: nan


100%|██████████| 1250/1250 [00:26<00:00, 45.82it/s]1%|          | 15/1250 [00:00<00:27, 44.71it/s]
  0%|          | 5/1250 [00:00<00:28, 43.90it/s]

epoch: 1, loss: nan


100%|██████████| 1250/1250 [00:25<00:00, 48.73it/s]1%|          | 15/1250 [00:00<00:27, 44.55it/s]
  0%|          | 5/1250 [00:00<00:26, 46.88it/s]

epoch: 2, loss: nan


 97%|█████████▋| 1216/1250 [00:24<00:00, 48.99it/s]1%|          | 15/1250 [00:00<00:26, 46.88it/s]

KeyboardInterrupt: 

In [None]:
def disp(n, mean, std):
    display(
        Image.fromarray(np.uint8(X_train[n] * std + mean)).resize((128, 128), Image.ANTIALIAS),
        Image.fromarray(np.uint8(train_result[n] * std + mean)),
        Image.fromarray(np.uint8(y_train[n] * std + mean))
    )

In [None]:
disp(0, mean_of_train, std_of_train)