In [None]:
from google.colab import drive
drive.mount('/content/gdrive/')

In [None]:
# MODIFY HERE TO YOUR PATH IN GOOGLE DRIVE
%cd /content/gdrive/MyDrive/CryptoML

In [None]:
import speck as sp
import numpy as np
from pickle import dump
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, Conv1D, Input, Reshape, Permute, Add, Flatten, BatchNormalization, Activation
from tensorflow.keras import backend as K
from tensorflow.keras.regularizers import l2

bs = 5000;
dir = './models/'

def cyclic_lr(num_epochs, high_lr, low_lr):
  res = lambda i: low_lr + ((num_epochs-1) - i % num_epochs)/(num_epochs-1) * (high_lr - low_lr)
  return res

def make_checkpoint(datei):
  assert datei[:9] == "./models/"
  assert datei[-3:] == ".h5"
    
  res = ModelCheckpoint(datei, monitor='val_loss', save_best_only = True)
  return res

# make residual tower of convolutional blocks
def make_resnet(num_blocks=2, num_filters=32, num_outputs=1, d1=64, d2=64, word_size=16, ks=3, depth=5, reg_param=0.0001, final_activation='sigmoid'):
  assert num_outputs == 1
  assert num_blocks in [2,4]

  # Input and preprocessing layers
  inp = Input(shape=(num_blocks * word_size * 2))
  rs = Reshape((2 * num_blocks, word_size))(inp)
  perm = Permute((2,1))(rs)
  
  # add a single residual layer that will expand the data to num_filters channels
  # this is a bit-sliced layer
  conv0 = Conv1D(num_filters, kernel_size=1, padding='same', kernel_regularizer=l2(reg_param))(perm)
  conv0 = BatchNormalization()(conv0)
  conv0 = Activation('relu')(conv0)

  # add residual blocks
  shortcut = conv0
  for _ in range(depth):
    conv1 = Conv1D(num_filters, kernel_size=ks, padding='same', kernel_regularizer=l2(reg_param))(shortcut)
    conv1 = BatchNormalization()(conv1)
    conv1 = Activation('relu')(conv1)
    conv2 = Conv1D(num_filters, kernel_size=ks, padding='same',kernel_regularizer=l2(reg_param))(conv1)
    conv2 = BatchNormalization()(conv2)
    conv2 = Activation('relu')(conv2)
    shortcut = Add()([shortcut, conv2])

  # add prediction head
  flat1 = Flatten()(shortcut)
  dense1 = Dense(d1,kernel_regularizer=l2(reg_param))(flat1)
  dense1 = BatchNormalization()(dense1)
  dense1 = Activation('relu')(dense1)
  dense2 = Dense(d2, kernel_regularizer=l2(reg_param))(dense1)
  dense2 = BatchNormalization()(dense2)
  dense2 = Activation('relu')(dense2)
  out = Dense(num_outputs, activation=final_activation, kernel_regularizer=l2(reg_param))(dense2)
  model = Model(inputs=inp, outputs=out)

  return model

def train_speck_distinguisher(num_epochs, num_rounds=7, depth=1, num_blocks=2, diffa=(0x0040,0), diffb=(0x0020,0)):
    assert num_epochs > 0
    assert num_rounds > 0
    assert depth > 0
    assert num_blocks in [2, 4]
    assert type(diffa) is tuple and type(diffb) is tuple

    # create the network
    net = make_resnet(num_blocks=num_blocks, depth=depth, reg_param=10**-5)
    net.compile(optimizer='adam',loss='mse',metrics=['acc'])

    # generate training and validation data
    if num_blocks == 2:
      X, Y = sp.make_train_data_2pt(10**7, num_rounds, diff=diffa)
      X_eval, Y_eval = sp.make_train_data_2pt(10**6, num_rounds, diff=diffa)
    
    else:
      X, Y = sp.make_train_data_4pt(10**7, num_rounds, diffa=diffa, diffb=diffb)
      X_eval, Y_eval = sp.make_train_data_4pt(10**6, num_rounds, diffa=diffa, diffb=diffb)

    # set up model checkpoint
    check = make_checkpoint(dir+'best'+str(num_rounds)+'depth'+str(depth)+'.h5')

    # create learnrate schedule
    lr = LearningRateScheduler(cyclic_lr(10,0.002, 0.0001))

    # train and evaluate
    h = net.fit(X,Y,epochs=num_epochs,batch_size=bs,validation_data=(X_eval, Y_eval), callbacks=[lr,check])
    np.save(dir+'h'+str(num_rounds)+'r_depth'+str(depth)+'.npy', h.history['val_acc'])
    np.save(dir+'h'+str(num_rounds)+'r_depth'+str(depth)+'.npy', h.history['val_loss'])
    dump(h.history,open(dir+'hist'+str(num_rounds)+'r_depth'+str(depth)+'.p','wb'))
    print("Best validation accuracy: ", np.max(h.history['val_acc']))

    return net, h

In [None]:
# MODIFY HERE: 

# number of epochs, number of Speck rounds, depth, number of plaintexts (2 or 4), bit difference #1, bit difference #2 (if 4 plaintexts)
train_speck_distinguisher(1, num_rounds=5, depth=10, num_blocks=4, diffa=(0x0040,0), diffb=(0x0020,0))

In [None]:
# TEST
import unittest

class TestTrainModelIPYNB(unittest.TestCase):
    
    def test_cyclic_lr(self):
        
        # Test cyclic_lr(10,0.002,0.0001)'s correctness and border cases
        small_epochs = [1,5,9,10]
        answer_small = [0.001788888888888889, 0.0009444444444444445, 0.0001, 0.002]
        self.assertEqual(answer_small, list(map(cyclic_lr(10, 0.002, 0.0001), small_epochs)))
        
        big_epochs = [10,20,30,50,100,200,1000]
        answer_big = [0.002 for _ in range(7)]
        self.assertEqual(answer_big, list(map(cyclic_lr(10, 0.002, 0.0001), big_epochs)))
        
        # Test cyclic_lr(20,0.002,0.0001)'s correctness and border cases
        num_epochs = [1,10,15,19,50,200]
        answer = [0.0019, 0.001, 0.0005, 0.0001, 0.001, 0.002]
        self.assertEqual(answer, list(map(cyclic_lr(20, 0.002, 0.0001), num_epochs)))
        
        # Test with num_epochs = 0 as parameter
        with self.assertRaises(ZeroDivisionError):
            list(map(cyclic_lr(0, 0.002, 0.0001), num_epochs))
        
    def test_make_checkpoint(self):
        # Check if model is saving in ./models/
        with self.assertRaises(AssertionError):
            make_checkpoint("./model/best5depth10.h5")
        
        # Check model in correct file format
        with self.assertRaises(AssertionError):
            make_checkpoint("./models/best5depth1.com")
        
        # Able to save models with correct format
        self.assertTrue(make_checkpoint("./models/best10depth10.h5"))
        self.assertTrue(make_checkpoint("./models/best6depth10.h5"))
        
    def test_make_resnet(self):
        
        # Invalid num_blocks or num_outputs
        with self.assertRaises(AssertionError):
            make_resnet(num_blocks=1)
            
        with self.assertRaises(AssertionError):
            make_resnet(num_outputs=2)
            
        with self.assertRaises(ValueError):
            make_resnet(final_activation='wrong')
        
        # Check number of layers affected by depth
        for i in range(10):
            model = make_resnet(depth=i)
            self.assertEqual(14+(i*7), len(model.layers))
            
        # Model used will be depth=1 
        # (increasing in depth only increase number of loops in the same residual block I tested)
        
        # Check input_layer affected by num_blocks
        model = make_resnet(num_blocks=2, depth=1)
        self.assertEqual([(None,64)], model.layers[0].input_shape)
        
        model = make_resnet(num_blocks=4, depth=1)
        self.assertEqual([(None,128)], model.layers[0].input_shape)
        
        # Check filters in the Conv1D layers
        model = make_resnet(num_filters=32, depth=1)
        self.assertEqual(32, model.layers[3].filters)
        self.assertEqual(32, model.layers[6].filters)
        self.assertEqual(32, model.layers[9].filters)
        
        # Check kernel size in the Conv1D layers
        model = make_resnet(depth=1, ks=6)
        self.assertEqual((1,), model.layers[3].kernel_size)
        self.assertEqual((6,), model.layers[6].kernel_size)
        self.assertEqual((6,), model.layers[9].kernel_size)
        
        model = make_resnet(depth=1, ks=3)
        self.assertEqual((1,), model.layers[3].kernel_size)
        self.assertEqual((3,), model.layers[6].kernel_size)
        self.assertEqual((3,), model.layers[9].kernel_size)
        
        # Check units in Dense layers
        model = make_resnet(depth=1, num_outputs=1, d1=64, d2=32)
        self.assertEqual(64, model.layers[-7].units)
        self.assertEqual(32, model.layers[-4].units)
        self.assertEqual(1, model.layers[-1].units)
        
        model = make_resnet(depth=1, num_outputs=1, d1=64, d2=64)
        self.assertEqual(64, model.layers[-7].units)
        self.assertEqual(64, model.layers[-4].units)
        self.assertEqual(1, model.layers[-1].units)
        
        
        
    def test_train_speck_distinguisher(self):
        
        # Didn't input num_epochs
        with self.assertRaises(TypeError):
            train_speck_distinguisher()
        
        # Invalid num_epochs
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=0)
  
        # Invalid num_rounds
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=200, num_rounds=0)
        
        # Invalid depth
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=200, depth=0)
        
        # Invalid num_blocks
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=200, num_blocks=1)
            
        # Invalid num_blocks
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=200, num_blocks=10)
            
        # Invalid diffa: not tuple
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=200, diffa=10)

        # Invalid diffa: not hexadecimal
        with self.assertRaises(SyntaxError):
            eval('train_speck_distinguisher(num_epochs=200, diffa=(0x00fz,0))')

        # Invalid diffb: not tuple
        with self.assertRaises(AssertionError):
            train_speck_distinguisher(num_epochs=200, diffb=0)
                
        # Invalid diffb: not hexadecimal
        with self.assertRaises(SyntaxError):
            eval('train_speck_distinguisher(num_epochs=200, diffb=(0x0040,0xfffg))')
            
        
if __name__ == '__main__': 
    unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)