**Задание 1.**

Реализуйте нейронную сеть с двумя сверточными слоями, и одним полносвязным с нейронами с кусочно-линейной функцией активации. Какова точность построенное модели?

**Задание 2.**

Замените один из сверточных слоев на слой, реализующий операцию пулинга (Pooling) с функцией максимума или среднего. Как это повлияло на точность классификатора?

**Задание 3.**

Реализуйте классическую архитектуру сверточных сетей LeNet-5 (http://yann.lecun.com/exdb/lenet/).

**Задание 4.**

Сравните максимальные точности моделей, построенных в лабораторных работах 1-3. Как можно объяснить полученные различия?


In [113]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

In [58]:
import numpy as np
import tensorflow as tf
import scipy.io
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.utils import resample
from sklearn.model_selection import train_test_split
import random
import string

In [3]:
tf.__version__

'1.14.0'

In [4]:
large_dataset_path = '../../lab_1/src/notMNIST_large_clean.mat'
small_dataset_path = '../../lab_1/src/notMNIST_small_uniq.mat'

In [5]:
chars = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
def prepare_dataset(dataset, records=None):
    data = list()
    labels = list()

    lb = preprocessing.LabelBinarizer()
    lb.fit(chars)
    
    if records:
        one = int(records / len(chars))
        #check
        for ch in chars:
            ch_len = len(dataset[ch])
            assert ch_len >= one, f'"{ch}" has {ch_len} items but required {one}'
        #print(one)
        for ch in chars:
            indexes = np.random.choice(len(dataset[ch]), one)
            picked_elements = dataset[ch][indexes] / 255
            data.extend(picked_elements)
            labels.extend(np.tile(lb.transform([ch])[0], (one, 1)))
    else:
        for ch in chars:
            data.extend(dataset[ch]/255)
            labels.extend(np.tile(lb.transform([ch])[0], (len(dataset[ch]), 1)))
    
    data = np.array(data)
    labels = np.array(labels)
    
    return resample(data, labels)

In [6]:
small_data = scipy.io.loadmat(small_dataset_path)
test_X, test_y = prepare_dataset(small_data, 2000)
test_X.shape

(2000, 784)

In [144]:
large_data = scipy.io.loadmat(large_dataset_path)
train_X, train_y = prepare_dataset(large_data, 10000)
train_X.shape

(10000, 784)

In [57]:
def rand_str(str_len=20):
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=str_len))

In [269]:
class BaseModel():
    def __init__(self):
        self.init_basic_params()
        
        self.compile()
        
        self.tf_writer.add_graph(self.session.graph)
    
    def init_basic_params(self):
        tf.reset_default_graph()
        self.logs_path = './tf_board/' + self.__class__.__name__
        self.var_scope = rand_str()
        self.print_separator = '-' * 65
        self.session = None
        self.dropout_rate_tf = tf.placeholder(tf.float32)
        
        self.input_size = 784
        self.output_size = 10
        
        self.dropout_rate = 0.1
        
        self.start_learning_rate = 0.1
        
        self.tf_writer = tf.summary.FileWriter(self.logs_path)
    
    def __del__(self): 
        print('object del')
        if self.session:
            tf.reset_default_graph()
            self.session.close()
    
    def reset_internal_params(self):
        self.hidden_layers = {}
        self.hidden_layers_W = {}
        self.hidden_layers_b = {}
        self.history = {
            'acc_train': [],
            'acc_valid': [],
            'loss_train': [],
            'loss_valid': []
        }
        
        self.global_step = tf.Variable(0, trainable=False)
        self.decay_steps = tf.Variable(100000, trainable=False)
    
    def get_W(self, layer_id, shape):
        #with tf.variable_scope(self.var_scope, reuse=tf.AUTO_REUSE):
        W = tf.Variable(tf.truncated_normal(shape, stddev=0.01), name=f'W_{layer_id}')
        self.hidden_layers_W[layer_id] = W

        return self.hidden_layers_W[layer_id]

    def get_b(self, layer_id, shape):
        #with tf.variable_scope(self.var_scope, reuse=tf.AUTO_REUSE):
        self.hidden_layers_b[layer_id] = tf.Variable(tf.zeros(shape), name=f'b_{layer_id}')
        
        return self.hidden_layers_b[layer_id]
        
    def get_dense_layer(self, layer_id, prev_layer, units_count, activ=tf.nn.relu):
        input_size = prev_layer.get_shape().as_list()[1]
        
        W = self.get_W(layer_id, [input_size, units_count])
        b = self.get_b(layer_id, [units_count])
        
        layer = tf.matmul(prev_layer, W) + b
        
        if activ:
            layer = activ(layer, name=f'Lay_Dense_{layer_id}')
        
        return layer
    
    def get_conv2_layer(self, layer_id, prev_layer, kernel_size, output_channels, strides=1, padding='SAME', activ=tf.nn.relu):
        input_channels = prev_layer.get_shape().as_list()[3]
        
        # 5x5 conv, 1 input, 32 outputs
        # [filter_height, filter_width, in_channels, out_channels]
        filter_shape = [kernel_size[0], kernel_size[1], input_channels, output_channels]

        W = self.get_W(layer_id, filter_shape)
        b = self.get_b(layer_id, [output_channels])
        
        layer = tf.nn.conv2d(prev_layer, W, [1, strides, strides, 1], padding) + b
        
        if activ:
            layer = activ(layer, name=f'Lay_Conv2d_{layer_id}')  
        
        return layer
    
    def next_batch(self, x, y, batch_size, iteration):
        start = iteration * batch_size
        end = (iteration + 1) * batch_size
        
        return x[start:end], y[start:end]
    
    def flatten(self, input):
        shape = input.get_shape().as_list()
        shape = np.array(shape)
        size = shape[shape != None].prod()
        
        return tf.reshape(input, [-1, size], name='Flatten')
    
    def get_max_pooling(self, input, ksize, stride=1, padding='SAME'):
        return tf.nn.max_pool(input, [1, ksize, ksize, 1], [1, stride, stride, 1], padding, name='max_pool')

    def get_avg_pooling(self, input, ksize, stride=1, padding='SAME'):
        return tf.nn.avg_pool(input, [1, ksize, ksize, 1], [1, stride, stride, 1], padding, name='avg_pool')
    
    def pre_compile():
        print('precompile')
    
    def compile(self):
        
        self.lr = tf.placeholder(tf.float32, shape=[])
        
        self.pre_compile()
        
        self.prediction = tf.nn.softmax(self.layer_output, name='Output')
        self.accuracy = tf.reduce_mean(
            tf.cast(
                tf.equal(tf.argmax(self.prediction, 1), tf.argmax(self.expected_output, 1)),
                tf.float32,
            ),
            name='Accuracy'
        )
                
        self.session = tf.Session()
        self.vars = tf.global_variables_initializer()
        self.session.run(tf.global_variables_initializer())
        
    def fit(self, x=None, y=None, batch_size=64, epochs=1):
        
        valid_size = 0.3
        
        if len(y) * valid_size > 20_000:
            valid_size = 20_000
        
        x_train, x_valid, y_train, y_valid = train_test_split(x, y, test_size=valid_size, random_state=50)
        
        print(f'Train size: {len(y_train)},\t Valid size: {len(y_valid)}')
        
        iterations = int(len(y_train) / batch_size)
        
        display_info = int(iterations / 2)
        
        print(self.print_separator)
        print(f'Epochs: {epochs}\t| Iterations: {iterations}\t| Batch: {batch_size}')
        print(self.print_separator)
        
        self.session.run(self.decay_steps.assign(iterations))
        
        for epoch in range(epochs):
            x_train_epoch, y_train_epoch = resample(x_train, y_train)
            
            for iteration in range(iterations):
                x_batch, y_batch = self.next_batch(x_train_epoch, y_train_epoch, batch_size, iteration)

                feed_data = { 
                    self.input: x_batch, 
                    self.expected_output: y_batch,
                    self.dropout_rate_tf: self.dropout_rate
                }
                
                self.session.run(self.optimizer, feed_dict=feed_data)

            
            feed_data_train = { self.input: x_train, self.expected_output: y_train, self.dropout_rate_tf: 0}
            loss_train, acc_train = self.session.run([self.loss, self.accuracy], feed_dict=feed_data_train)
            
            # print(self.session.run([self.global_step, self.decay_steps, self.lr]))
            
            self.history['acc_train'].append(acc_train)
            self.history['loss_train'].append(loss_train)

            feed_data_valid = { self.input: x_valid, self.expected_output: y_valid, self.dropout_rate_tf: 0}
            loss_valid, acc_valid = self.session.run([self.loss, self.accuracy], feed_dict=feed_data_valid)

            self.history['acc_valid'].append(acc_valid)
            self.history['loss_valid'].append(loss_valid)

            print(f'Epoch {epoch + 1}: loss - Tr[{loss_train:.2f}] Va[{loss_valid:.2f}] \t acc - Tr[{acc_train:.01%}] Va[{acc_valid:.01%}]')
            print(self.print_separator)

In [230]:
class Model1CCD(BaseModel):
    def __init__(self):
        BaseModel.__init__(self)

    def pre_compile(self):
        self.reset_internal_params()
        
        self.input = tf.placeholder(tf.float32, shape=[None, self.input_size], name="Input")
        self.expected_output = tf.placeholder(tf.float32, shape=[None, self.output_size], name="Y_actual")
        
        # [batch, in_height, in_width, in_channels]
        layer_1_pre = tf.reshape(self.input, [-1, 28, 28, 1])
        
        layer_1 = self.get_conv2_layer(1, layer_1_pre, [3, 3], 16, strides=2)
        
        layer_2 = self.get_conv2_layer(2, layer_1, [3, 3], 16, strides=2)
                    
        layer_3_pre = self.flatten(layer_2)
        
        layer_3 = self.get_dense_layer(3, layer_3_pre, 50)
        layer_3_do = tf.nn.dropout(layer_3, rate=self.dropout_rate_tf)
        
        self.layer_output = self.get_dense_layer(4, layer_3_do, 10, activ=None)
        
        self.loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits_v2(labels=self.expected_output, logits=self.layer_output),
            name='Loss'
        )
        
        self.lr = tf.compat.v1.train.exponential_decay(self.start_learning_rate, self.global_step, self.decay_steps, 0.96)
        
        self.optimizer = tf.train.GradientDescentOptimizer(self.lr).minimize(self.loss, global_step=self.global_step)

In [232]:
class Model2CPD(BaseModel):
    def __init__(self):
        BaseModel.__init__(self)

    def pre_compile(self):
        self.reset_internal_params()
        
        self.input = tf.placeholder(tf.float32, shape=[None, self.input_size], name="Input")
        self.expected_output = tf.placeholder(tf.float32, shape=[None, self.output_size], name="Y_actual")
        
        # [batch, in_height, in_width, in_channels]
        layer_1_pre = tf.reshape(self.input, [-1, 28, 28, 1])
        
        layer_1 = self.get_conv2_layer(1, layer_1_pre, [5, 5], 16, strides=2)
        
        layer_1_pool = self.get_max_pooling(layer_1, 3)
                    
        layer_2_pre = self.flatten(layer_1_pool)
        
        layer_2 = self.get_dense_layer(3, layer_2_pre, 50)
        layer_2_do = tf.nn.dropout(layer_2, rate=self.dropout_rate_tf)
        
        self.layer_output = self.get_dense_layer(4, layer_2_do, 10, activ=None)
        
        self.loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits_v2(labels=self.expected_output, logits=self.layer_output),
            name='Loss'
        )
        
        self.lr = tf.compat.v1.train.exponential_decay(self.start_learning_rate, self.global_step, self.decay_steps, 0.96)
        
        self.optimizer = tf.train.GradientDescentOptimizer(self.lr).minimize(self.loss, global_step=self.global_step)

In [268]:
class LeNet5(BaseModel):
    def __init__(self):
        BaseModel.__init__(self)
    
    def init_basic_params(self):
        super().init_basic_params()
        self.input_size = [32, 32, 1]
    
    def pad_x(self, x):
        temp = x.reshape(-1, 28, 28, 1)
        return np.pad(temp, ((0,0),(2,2),(2,2),(0,0)), 'constant')
    
    def fit(self, x=None, y=None, batch_size=64, epochs=1):
        x_pad = self.pad_x(x)
        super().fit(x=x_pad, y=y, batch_size=batch_size, epochs=epochs) 
    
    def pre_compile(self):
        self.reset_internal_params()

        self.input = tf.placeholder(tf.float32, shape=[None, *self.input_size], name="Input")
        self.expected_output = tf.placeholder(tf.float32, shape=[None, self.output_size], name="Y_actual")
   
        # Conv. out - 6, kernel - 5x5, stride = 1
        layer_1 = self.get_conv2_layer(1, self.input, [5, 5], 6, strides=1, padding='VALID', activ=tf.nn.tanh)
        
        # avg pool. filter = 2x2, stride = 2
        layer_2 = self.get_avg_pooling(layer_1, 2, stride=2, padding='VALID')
        
        # Conv. out - 16, kernel - 5x5, stride = 1
        layer_3 = self.get_conv2_layer(3, layer_2, [5, 5], 16, strides=1, padding='VALID', activ=tf.nn.tanh)
        
        # avg pool. filter = 2x2, stride = 2
        layer_4 = self.get_avg_pooling(layer_3, 2, stride=2, padding='VALID')
        
        # Conv. out - 16, kernel - 5x5, stride = 1
        layer_5 = self.get_conv2_layer(5, layer_4, [5, 5], 120, strides=1, padding='VALID', activ=tf.nn.tanh)
        
        # The fifth layer (C5) is a fully connected convolutional layer 
        # with 120 feature maps each of size 1×1. Each of the 120 units in C5 is connected to all the 400 nodes 
        # (5x5x16) in the fourth layer S4.
                    
        #layer_5_pre = self.flatten(layer_4)
        
        #layer_5 = self.get_dense_layer(5, layer_5_pre, 84)
        #layer_2_do = tf.nn.dropout(layer_2, rate=self.dropout_rate_tf)
        
        self.layer_output = self.get_dense_layer(4, layer_5, 10, activ=None)
        
        self.loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits_v2(labels=self.expected_output, logits=self.layer_output),
            name='Loss'
        )
        
        self.lr = tf.compat.v1.train.exponential_decay(self.start_learning_rate, self.global_step, self.decay_steps, 0.96)
        
        self.optimizer = tf.train.GradientDescentOptimizer(self.lr).minimize(self.loss, global_step=self.global_step)

m3 = LeNet5()
m3.fit(train_X, train_y, batch_size=128, epochs=1)

object del
Train size: 7000,	 Valid size: 3000
-----------------------------------------------------------------
Epochs: 1	| Iterations: 54	| Batch: 128
-----------------------------------------------------------------
Epoch 1: loss - Tr[2.24] Va[2.24] 	 acc - Tr[38.6%] Va[39.6%]
-----------------------------------------------------------------
