In [2]:
import tensorflow as tf

#Block of layers with bool for if grad = true
#Grad = not true, no flatten and FCL

class GroupedLayer(tf.keras.layers.Layer):
    def __init__(self, filters, kernel_size, final_output):
        super(GroupedLayer, self).__init__()
        self.hidden_1 = tf.keras.layers.Conv2D(filters, kernel_size, strides=(1,1), padding="same", activation='ReLU')
        self.hidden_2 = tf.keras.layers.Conv2D(filters, kernel_size, strides=(1,1), padding="same", activation='ReLU')
        self.hidden_output = tf.keras.layers.Dense(units=final_output, activation='ReLU')
        
    def call(self, inputs):
        inputs = self.hidden_1(inputs)
        inputs = self.hidden_2(inputs)
        return inputs
    
    def call_output(self, inputs):
          return self.hidden_output(tf.keras.layers.Flatten()(self.call(inputs)))

class GroupedLayerInput(tf.keras.layers.Layer):
    def __init__(self, filters, kernel_size, final_output):
        super(GroupedLayerInput, self).__init__()
        self.input_block = tf.keras.Sequential(
            [
                tf.keras.Input(shape=(32, 32, 3)),
                tf.keras.layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same", activation="ReLU"),
                tf.keras.layers.MaxPooling2D(pool_size=2),

            ],
            name="input",
        )
        self.hidden = GroupedLayer(64, (3,3), 6)
        
    def call(self, inputs):
        inputs = self.input_block(inputs)
        inputs = self.hidden.call(inputs)
        return inputs
    
    def call_output(self, inputs):
          return self.hidden(tf.keras.layers.Flatten()(self.call(inputs)))

grouped_block = GroupedLayer(64,3,10)

In [None]:
loss_tracker = tf.keras.metrics.Mean(name="loss")
class ForcedLearnerClass(tf.keras.Model):
    
    def __init__(self, output_size):
        super(ForcedLearnerClass, self).__init__()
        self.input_block = GroupedLayerInput(filters=16,kernel_size=(3,3), final_output=output_size)
        
        self.training_block_1 = [GroupedLayer(16, (3,3), final_output=output_size) for _ in range(3)]
        self.training_block_2 = [GroupedLayer(32, (3,3), final_output=output_size) for _ in range(3)]
        self.training_block_3 = [GroupedLayer(64, (3,3), final_output=output_size) for _ in range(3)]
    
    def compile(self, optimizer, loss_fn):
        super(ForcedLearnerClass, self).compile()
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        
    def call(self, x):
        x = self.input_block(x)
        for i in range(3):
            x = self.training_block_1[i].call(x)
        for i in range(3):
            x = self.training_block_2[i].call(x)
        for i in range(3):
            x = self.training_block_3[i].call(x)
        return x
        
    def train_step(self, data):
        pred_output, y = data
        
        with tf.GradientTape(persistent=True) as tape:
            pred_output_back_input = self.input_block.call_output(pred_output)
            pred_output = self.input_block.call(pred_output)
            
            pred_output_back_1 = []
            for i in range(3):
                pred_output_back_1.append(self.training_block_1[i].call_output(pred_output))
                pred_output = self.training_block_1[i].call(pred_output)
                
            pred_output_back_2 = []
            for i in range(3):
                pred_output_back_2.append(self.training_block_2[i].call_output(pred_output))
                pred_output = self.training_block_2[i].call(pred_output)
                
            pred_output_back_3 = []
            for i in range(3):
                pred_output_back_3.append(self.training_block_3[i].call_output(pred_output))
                pred_output = self.training_block_3[i].call(pred_output)
                
            loss_input = self.loss_fn(y, pred_output_back_input)
            loss_1 = []
            loss_2 = []
            loss_3 = []
            
            for i in range(3):
                loss_1[i].append(self.loss_fn(y, pred_output_back_1[i]))
                loss_2[i].append(self.loss_fn(y, pred_output_back_2[i]))
                loss_3[i].append(self.loss_fn(y, pred_output_back_3[i]))
                
        #mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
        gradients_input = tape.gradient(loss_input, self.input_block.trainable_variables)
        gradients_1 = []
        gradients_2 = []
        gradients_3 = []
        
        for i in range(3):
            gradients_1.append(tape.gradient(loss_1[i], self.training_block_1[i].trainable_variables))
            gradients_2.append(tape.gradient(loss_2[i], self.training_block_2[i].trainable_variables))
            gradients_3.append(tape.gradient(loss_3[i], self.training_block_3[i].trainable_variables))
        
        #Update weights
        self.optimizer.apply_gradients(zip(gradients_input, self.input_block.trainable_variables))
        for i in range(3):
            self.optimizer.apply_gradients(zip(gradients_1[i], self.training_block_1[i].trainable_variables))
            self.optimizer.apply_gradients(zip(gradients_2[i], self.training_block_2[i].trainable_variables))
            self.optimizer.apply_gradients(zip(gradients_3[i], self.training_block_3[i].trainable_variables))
         
        #Compute our own metrics
        loss_tracker.update_state(loss_4)
        #mae_metric.update_state(y, y_pred)
        #"mae": mae_metric.result()
        return {"loss": loss_tracker.result()}

In [None]:
import tensorflow as tf
training_block_1 = tf.keras.Sequential(
            [
                GroupedLayer(64, (64,64), 6) for _ in range(2)
            ]
        )

training_block_1.build((16,224,224,3))

training_block_1.summary()
training_block_1.trainable_variables
training_block_1.compile()

In [None]:
network_block = GroupedLayerInput(2, (3,3), 6)

network_block.build((16,224,224,3))

In [None]:
forced_learner = ForcedLearnerClass(output_size=10)
forced_learner.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=tf.keras.losses.Huber(),
)

In [126]:
batch_size = 10
(x_train, Y_train), (x_test, Y_test) = tf.keras.datasets.cifar10.load_data()

In [127]:
import numpy as np
all_digits = np.concatenate([x_train, x_test])
all_output = np.concatenate([Y_train, Y_test])
all_output_one_hot = tf.one_hot(np.squeeze(all_output), 10)
all_digits = all_digits.astype("float32") / 255.0

In [128]:
all_digits = np.reshape(all_digits, (-1, 32, 32, 3))
dataset = tf.data.Dataset.from_tensor_slices((all_digits, all_output_one_hot))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(batch_size).prefetch(1)

In [129]:
DATASET_SIZE=60000
train_size = int(0.7 * DATASET_SIZE)
val_size = int(0.15 * DATASET_SIZE)
test_size = int(0.15 * DATASET_SIZE)

full_dataset = dataset
train_dataset = full_dataset.take(train_size)
test_dataset = full_dataset.skip(train_size)
val_dataset = test_dataset.skip(test_size)
test_dataset = test_dataset.take(test_size)

In [None]:
history = forced_learner.fit(train_dataset, epochs=10)

In [None]:
model_temp = tf.keras.applications.ResNet50V2(weights=None, input_shape=(32,32,3), classes=10)

opt = tf.keras.optimizers.Nadam(learning_rate=3e-4)

callback = [tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', patience=7)]
model_temp.compile(loss="Huber", optimizer=opt, metrics=['mse', 'mae', 'mape', 'accuracy'])
history = model_temp.fit(train_dataset, batch_size=16, epochs=100, callbacks=[callback],
                    validation_data=(val_dataset))

In [149]:
for line in dataset.take(1):
    print(line[1])

tf.Tensor(
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]], shape=(10, 10), dtype=float32)


In [None]:
all_output_one_hot[0]

In [155]:
dense_16 = tf.keras.layers.Dense(10, activation="softmax")
inputs = tf.keras.Input(shape=(32,32,3))

#block_64_1 = [tf.keras.layers.Conv2D(64, 3, padding='same',activation='relu') for _ in range(2)]
#block_64_2 = [tf.keras.layers.Conv2D(64, 3, padding='same',activation='relu') for _ in range(2)]
#block_64_3 = [tf.keras.layers.Conv2D(64, 3, padding='same',activation='relu') for _ in range(2)]

avg_pool_16 = tf.keras.layers.AveragePooling2D(pool_size=(32,32), strides=(1, 1), padding='valid')

def lambda_layer(tensor):
    return tf.squeeze(tensor, [1,2])
model_16 = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(32,32,3)),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
        tf.keras.layers.Conv2D(16, 3, padding='same',activation='relu'),
    ], name = "block_1"
)

model_32 = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(32,32,16)),
        tf.keras.layers.Conv2D(32, 3, strides=(2,2), padding="same", activation='relu'),
        tf.keras.layers.Conv2D(32, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(32, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(32, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(32, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(32, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(32, 3, padding="same", activation='relu'),
    ], name = "block_2"
)

model_64 = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(16,16,32)),
        tf.keras.layers.Conv2D(64, 3, strides=(2,2), padding="same", activation='relu'),
        tf.keras.layers.Conv2D(64, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(64, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(64, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(64, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(64, 3, padding="same", activation='relu'),
        tf.keras.layers.Conv2D(64, 3, padding="same", activation='relu'),
        tf.keras.layers.AveragePooling2D(pool_size=(8,8), strides=(1, 1), padding='valid'),
        tf.keras.layers.Lambda(lambda_layer, name="lambda_layer"),
        tf.keras.layers.Dense(10, activation="softmax")
    ], name = "block_3"
)
#(model_16.trainable_weights, dense_16.trainable_weights)

x = model_16(inputs)

x = avg_pool_16(x)
x = tf.keras.layers.Flatten()(x)
outputs = dense_16(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

model.summary()
#model.trainable_weights

Model: "model_16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_316 (InputLayer)      [(None, 32, 32, 3)]       0         
                                                                 
 block_1 (Sequential)        (None, 32, 32, 16)        14368     
                                                                 
 average_pooling2d_157 (Aver  (None, 1, 1, 16)         0         
 agePooling2D)                                                   
                                                                 
 flatten_8 (Flatten)         (None, 16)                0         
                                                                 
 dense_156 (Dense)           (None, 10)                170       
                                                                 
Total params: 14,538
Trainable params: 14,538
Non-trainable params: 0
______________________________________________________

In [182]:
class ForcedNet20(tf.keras.Model):
    def __init__(self, block_1, block_2, block_3):
        super(ForcedNet20, self).__init__()
        self.block_1 = block_1
        self.output_1 = tf.keras.Sequential(
            [
                tf.keras.layers.AveragePooling2D(pool_size=(32,32), strides=(1, 1), padding='valid'),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(10, activation="softmax")
            ]
        )
        
        self.block_2 = block_2
        self.output_2 = tf.keras.Sequential(
            [
                tf.keras.layers.AveragePooling2D(pool_size=(16,16), strides=(1, 1), padding='valid'),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(10, activation="softmax")
            ]
        )
        
        self.block_3 = block_3
    def compile(self, optimizer, loss_fn):
        super(ForcedNet20, self).compile()
        self.optimizer = optimizer
        self.loss_fn = loss_fn
    
    def train_step(self, data):
        images = data[0]
        labels = data[1]
        
        with tf.GradientTape(persistent=True) as tape:
            x = self.block_1(images)
            predictions_1 = self.output_1(x)
            
            x = self.block_2(x)
            predictions_2 = self.output_2(x)

            predictions_3 = self.block_3(x)
            
            loss_1 = self.loss_fn(labels, predictions_1)
            loss_2 = self.loss_fn(labels, predictions_2)
            loss_3 = self.loss_fn(labels, predictions_3)
        
        grads_1 = tape.gradient(loss_1, self.block_1.trainable_weights)
        grads_2 = tape.gradient(loss_2, self.block_2.trainable_weights)
        grads_3 = tape.gradient(loss_3, self.block_3.trainable_weights)
        
        grads_1_output = tape.gradient(loss_1, self.output_1.trainable_weights)
        grads_2_output = tape.gradient(loss_2, self.output_2.trainable_weights)
                                
        self.optimizer.apply_gradients(
            zip(grads_1, self.block_1.trainable_weights,)
        )
        self.optimizer.apply_gradients(
            zip(grads_1_output, self.output_1.trainable_weights)
        )
        
        self.optimizer.apply_gradients(
            zip(grads_2, self.block_2.trainable_weights)
        )
        
        self.optimizer.apply_gradients(
            zip(grads_2_output, self.output_2.trainable_weights)
        )
        
        self.optimizer.apply_gradients(
            zip(grads_3, self.block_3.trainable_weights)
        )
        
        return {"BLock_1_Loss": loss_1, "Block_2_Loss": loss_2, "Block_3_Loss": loss_3}

In [183]:
ForcedLearner = ForcedNet20(model_16, model_32, model_64)
ForcedLearner.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=tf.keras.losses.CategoricalCrossentropy(),
)

ForcedLearner.fit(train_dataset, epochs=10)

Epoch 1/10
 352/6000 [>.............................] - ETA: 1:43 - BLock_1_Loss: 2.2567 - Block_2_Loss: 2.1712 - Block_3_Loss: 2.1634

KeyboardInterrupt: 