This is an implementation of the [FractalNet](http://arxiv.org/abs/1605.07648) paper.

In [1]:
import os
import numpy as np
from six.moves import cPickle as pickle
from sklearn.preprocessing import OneHotEncoder, StandardScaler
import time

# --- Configuration ---------------------------------------------------------
# Image geometry and number of target classes of CIFAR-10.
height, width, channels = 32, 32, 3
noClasses = 10
batch_size = 50

# TensorBoard log directory, and how often (in training steps) the test set
# is evaluated and logged.
logdir = "/tmp/FractalNet/test1"
test_log_steps = 10

# Directory holding the extracted "CIFAR-10 python version" batches.
cifar10root = os.path.expanduser('~/Downloads/cifar-10-batches-py')


# --- Training set: five pickled batches --------------------------------------
train_data = []
train_labels = []

# NOTE: pickle.load can execute arbitrary code; only point cifar10root at an
# archive obtained from a trusted source.
for i in range(1, 6):
    with open(os.path.join(cifar10root, "data_batch_%d" % i), 'rb') as f:
        data_batch = pickle.load(f, encoding='latin1')
    train_data.append(data_batch['data'])
    # Column vector of integer class ids, as expected by OneHotEncoder.
    train_labels.append(np.array(data_batch['labels']).reshape(-1, 1))

train_data = np.vstack(train_data)
train_labels = np.vstack(train_labels)
train_labels = OneHotEncoder(noClasses, sparse=False).fit_transform(train_labels)

# --- Test set ------------------------------------------------------------------
with open(os.path.join(cifar10root, "test_batch"), 'rb') as f:
    test = pickle.load(f, encoding='latin1')
test_data = test['data']
test_label = np.array(test['labels']).reshape(-1, 1)
test_label = OneHotEncoder(noClasses, sparse=False).fit_transform(test_label)

# --- Standardise and reshape to NHWC -------------------------------------------
# The scaler is fitted on the flat training rows only and then applied to both
# splits, so no test-set statistics leak into preprocessing.
scale = StandardScaler().fit(train_data)

# Each CIFAR-10 row is stored channel-major: all red values first, then green,
# then blue, each plane in row-major order.  A direct reshape to
# (N, height, width, channels) would interleave those planes incorrectly, so
# reshape to (N, channels, height, width) first and then move the channel axis
# last to get the NHWC layout the network's placeholder expects.
train_data = (scale.transform(train_data)
              .reshape(-1, channels, height, width)
              .transpose(0, 2, 3, 1))
test_data = (scale.transform(test_data)
             .reshape(-1, channels, height, width)
             .transpose(0, 2, 3, 1))

print(train_data.shape, train_labels.shape)
print(test_data.shape, test_label.shape)



(50000, 32, 32, 3) (50000, 10)
(10000, 32, 32, 3) (10000, 10)




In [2]:
#!/usr/bin/env python

import numpy as np
import tensorflow as tf
import tflearn    
from tflearn.layers.normalization import batch_normalization

class FractalNet(object):
    """A fractal block F_n whose columns are merged by a weighted-sum join.

    F_1 is a single "atom" (conv -> bias -> ReLU -> batch norm).  For n > 1
    the block consists of one atom applied to the input, in parallel with two
    stacked F_{n-1} blocks; the atom output and the column tensors of the
    second F_{n-1} block are combined in the join.

    The join weights are non-trainable variables (`is_active`), initialised
    to 1/n each.  The `gen*` methods build ops that overwrite those weights to
    implement drop-path regularisation or to restore plain averaging.

    NOTE: every `gen*` call adds new assign/group ops to the graph, so
    calling them repeatedly (e.g. once per training step) grows the graph.
    """

    def __init__(self, input, n, f_height = 2, f_width = 2, out_chanell=None):
        """Build the block's ops in the current default graph.

        Args:
            input: 4-D tensor whose last dimension is the channel count.
            n: number of columns of this block (recursion depth), >= 1.
            f_height: convolution filter height.
            f_width: convolution filter width.
            out_chanell: number of output channels; defaults to the number
                of input channels when None.
        """
        _, _, _, c = input.get_shape()
        in_channel = int(c)
        if out_chanell is None:
            out_chanell = in_channel

        self.n = n
        self.children = []
        with tf.name_scope("F%d" % n):
            with tf.name_scope("atom"):
                # Single computational "atom" of FractalNet.
                self.filter = tf.Variable(
                    tf.truncated_normal(
                        [f_height, f_width, in_channel, out_chanell],
                        stddev=0.35
                    ),
                    name="filter"
                )
                self.bias = tf.Variable([0]*out_chanell, dtype=tf.float32, name="bias")

                atom = tf.nn.conv2d(input, self.filter, [1,1,1,1], 'SAME')
                atom = tf.nn.relu(tf.nn.bias_add(atom, self.bias))
                # NOTE(review): batch norm is applied after the ReLU here;
                # the paper appears to use conv -> BN -> ReLU -- confirm
                # that this ordering is intentional.
                atom = batch_normalization(atom)

            # Column tensors entering this block's join; column 0 is the atom.
            self.__tensors = [atom]
            if n > 1:
                # First sub-block maps the input to an intermediate channel
                # count halfway between input and output.
                Fp = FractalNet(input, n - 1, f_height, f_width, int((out_chanell + in_channel)/2))
                self.children.append(Fp)
                # Second sub-block consumes the joined output of the first.
                Fp = FractalNet(Fp.get_tensor(), n - 1, f_height, f_width, out_chanell)
                self.children.append(Fp)
                # Only the second sub-block's columns reach this join; the
                # first one contributes through the second one's input.
                self.__tensors.extend(Fp.__tensors)

            with tf.name_scope("join"):
                # Join weights, one per column.  For a mean join they are
                # all equal and sum to 1.
                self.is_active = [
                    tf.Variable(1.0/n, trainable=False, name="a%d"%i)
                    for i in range(n)
                ]

                self.__tensor = tf.add_n(
                    [tf.mul(m, x) for m, x in zip(self.is_active, self.__tensors)],
                    name="Average_pool_join"
                )

    def get_tensor(self):
        """Return the output tensor of this block's join."""
        return self.__tensor

    def genAssignJoinValues(self, values):
        """Build an op assigning `values[i]` to join weight `is_active[i]`.

        Only this block's own join is touched; children are left as they are.
        """
        return tf.group(*[
            var.assign(val)
            for val, var in zip(values, self.is_active)
        ])

    def genColumn(self, column):
        """Build an op that activates only the given column (global drop-path).

        This block's join gets weight 1 on `column` and 0 elsewhere.  When
        `column > 0` the children are recursively set to `column - 1`; when
        `column == 0` only the atom is active and the children are not
        modified.
        """
        assert 0 <= column < self.n

        values = np.zeros(len(self.is_active))
        values[column] = 1

        return tf.group(
            self.genAssignJoinValues(values),
            *[fp.genColumn(column - 1) for fp in self.children if column > 0]
        )

    def genRandomColumn(self):
        """Build an op activating one uniformly sampled column (see genColumn)."""
        return self.genColumn(np.random.randint(self.n))

    def genLocalDropPath(self, dropout_prob):
        """Build an op applying local drop-path to this block and its children.

        Each join input is kept when a uniform sample exceeds `dropout_prob`;
        the mask is resampled until at least one input survives, and the
        surviving weights are normalised to sum to 1.  Every child draws its
        own independent mask.
        """
        values = np.zeros(self.n)
        while np.sum(values) < 0.5: # i.e. == 0, with a floating point margin
            values = (np.random.random(self.n) > dropout_prob).astype(np.float32)
        values /= np.sum(values) # normalise so the weights sum to 1
        return tf.group(
            self.genAssignJoinValues(values),
            *[fp.genLocalDropPath(dropout_prob) for fp in self.children]
        )

    def genTestMode(self):
        """Build an op that clears every drop-path in this block.

        Resets the join weights of this block AND of all nested children to
        a plain average (1/n each), so that masks left behind by
        `genColumn` / `genLocalDropPath` in inner joins do not leak into
        evaluation.
        """
        return tf.group(
            self.genAssignJoinValues(np.ones(self.n, dtype=np.float32)/self.n),
            *[fp.genTestMode() for fp in self.children]
        )
    
g = tf.Graph()

with g.as_default():
    # Network inputs: a batch of NHWC images and their one-hot labels.
    X = tf.placeholder(tf.float32, [None, height, width, channels], name="input")
    Y = tf.placeholder(tf.float32, [None, noClasses], name="labels")

    # Stack of fractal blocks, each followed by 2x2 max pooling that halves
    # the spatial resolution.  FF keeps the block objects so their join
    # weights can be driven from the training loop.
    FF = []
    net = X
    block_channels = [16, 32, 64, 128, 128]
    for i, channel_no in enumerate(block_channels):
        with tf.name_scope("block_%d" % (i + 1)):
            fractal = FractalNet(net, 1, out_chanell=channel_no)
        FF.append(fractal)
        net = tf.nn.max_pool(
            fractal.get_tensor(),
            ksize=[1, 2, 2, 1],
            strides=[1, 2, 2, 1],
            padding='SAME'
        )
        print(i, net.get_shape())

    # Classifier head: logits, class probabilities and cross-entropy loss.
    net = tflearn.fully_connected(net, noClasses)
    yp = tf.nn.softmax(net)
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(net, Y))

    # Momentum SGD with a learning rate that can be halved by running
    # `halve_lr`.
    learning_rate = tf.Variable(0.01, trainable=False)
    halve_lr = learning_rate.assign(tf.mul(learning_rate, 0.5))
    train = tf.train.MomentumOptimizer(learning_rate, 0.9).minimize(loss)

    # Top-1 accuracy: fraction of samples whose true class is the arg-max
    # of the predicted probabilities.
    acc = tf.reduce_mean(
        tf.cast(tf.nn.in_top_k(yp, tf.argmax(Y, 1), k=1), tf.float32)
    )

    # Summaries logged on every training step.
    learning_rate_summ = tf.scalar_summary("Learning rate", learning_rate)
    train_acc_summ = tf.scalar_summary("Train accuracy", acc)
    train_loss_summ = tf.scalar_summary("Train loss", loss)
    train_summ = tf.merge_summary(
        [learning_rate_summ, train_acc_summ, train_loss_summ],
        name="Training_summary"
    )

    # Summaries logged when the test set is evaluated; they read the same
    # `acc` / `loss` tensors but are written under separate tags.
    test_acc_summ = tf.scalar_summary("Test accuracy", acc)
    test_loss_summ = tf.scalar_summary("Test loss", loss)
    test_summ = tf.merge_summary(
        [test_acc_summ, test_loss_summ],
        name="Test_summary"
    )

hdf5 not supported (please install/reinstall h5py)
0 (?, 16, 16, 16)
1 (?, 8, 8, 32)
2 (?, 4, 4, 64)
3 (?, 2, 2, 128)
4 (?, 1, 1, 128)


In [3]:
with tf.Session(graph=g) as sess:
    sess.run(tf.initialize_all_variables())
    merged = tf.merge_all_summaries()  # not used by this loop
    writer = tf.train.SummaryWriter(logdir, sess.graph)

    # The test-mode ops are deterministic, so build them once instead of
    # adding a fresh copy to the graph at every evaluation.
    test_mode_ops = [F.genTestMode() for F in FF]

    n_train = train_data.shape[0]
    step = 0
    # Epochs at the start of which the learning rate is halved.
    reduce_learning = [200, 300, 350, 375]

    # Wall-clock timer and step counter for the steps/sec report; both are
    # reset after every evaluation so that only training time is measured.
    t = time.perf_counter()
    steps_since_log = 0

    try:
        for epoh in range(400):
            if epoh in reduce_learning:
                sess.run(halve_lr)
            tflearn.is_training(True)
            batch_idxs = np.random.permutation(n_train)
            # Slice the shuffled indices; unlike np.split this also works
            # when batch_size does not divide the dataset size (the last
            # batch is then simply smaller).
            for start in range(0, n_train, batch_size):
                batch = batch_idxs[start:start + batch_size]
                step += 1
                steps_since_log += 1

                # Alternate regularisers across blocks: local drop-path on
                # even-indexed blocks, a single random column on odd ones.
                # NOTE: these calls build new assign ops on every step, so
                # the graph keeps growing during training.
                for F in FF[::2]:
                    sess.run(F.genLocalDropPath(0.15))
                for F in FF[1::2]:
                    sess.run(F.genRandomColumn())

                summ, _ = sess.run([train_summ, train], feed_dict={
                        X: train_data[batch],
                        Y: train_labels[batch]
                    })
                writer.add_summary(summ, step)

                if step % test_log_steps == 0 or step == 1:
                    tn = time.perf_counter()

                    # Evaluate with batch norm in inference mode and all
                    # joins averaging their inputs.
                    tflearn.is_training(False)
                    sess.run(test_mode_ops)
                    summ, top1, avg_loss = sess.run([test_summ, acc, loss], feed_dict={
                            X: test_data,
                            Y: test_label
                        })
                    writer.add_summary(summ, step)
                    print("[{:10d}] step, acc: {:2.5f}%, loss: {:2.7f} steps/sec{:5.2f}"
                          .format(step, 100 * top1, avg_loss,
                                  steps_since_log / max(tn - t, 1e-9)))

                    # Switch back to training mode right away; otherwise
                    # the rest of the epoch would train with batch norm
                    # frozen in inference mode.
                    tflearn.is_training(True)
                    t = time.perf_counter()
                    steps_since_log = 0
    finally:
        # Flush and close the event file even when training is interrupted.
        writer.close()

KeyboardInterrupt: 