In [1]:
# Basics
import trsfile
import numpy as np
import pandas as pd
import random
from tqdm import tqdm
from keras.models import Sequential
from keras.layers import Dense, Dropout, BatchNormalization, Activation, Conv1D, MaxPool1D, Conv2D, MaxPool2D, Flatten
from tensorflow.keras.optimizers import SGD, Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.regularizers import L1, L2, L1L2
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Custom
import sys
sys.path.insert(0, '../../src/utils')
from data_loader import DataLoader, SplitDataLoader
import constants
import aes
# sys.path.insert(0, '../../src/modeling')
# from network import Network

# Suppress TensorFlow messages
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1' # 1 for INFO, 2 for INFO & WARNINGs, 3 for INFO & WARNINGs & ERRORs

In [29]:
def CNN(hp, input_shape):
    model = Sequential()
    
    model.add(Conv1D(filters=hp['filters'], 
                     kernel_size=hp['kernel_size'], 
                     activation='relu',
                     # kernel_initializer='random_uniform',
                     input_shape=input_shape))
    model.add(Dropout(0.2))
    model.add(MaxPool1D(pool_size=hp['pool_size']))
    
    for _ in range(hp['hidden_conv']):
        model.add(Conv1D(filters=hp['hidden_conv_filters'], 
                         kernel_size=hp['hidden_conv_kernel_size'], 
                         # kernel_initializer='random_uniform',
                         activation='relu'))
        model.add(Dropout(0.2))
        model.add(MaxPool1D(pool_size=hp['hidden_conv_pool_size']))
    
    model.add(Flatten())
    
    for _ in range(hp['hidden_dense']):
        model.add(Dense(hp['hidden_dense_neurons'], activation='relu'))
        
    model.add(Dense(256, activation='softmax'))
    
    model.compile(optimizer=Adam(learning_rate=1e-4),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    
    return model



def CNN_images(hp, input_shape):
    model = Sequential()
    
    model.add(Conv2D(filters=hp['filters'], 
                     kernel_size=hp['kernel_size'], 
                     activation='relu',
                     # kernel_initializer='random_uniform',
                     input_shape=input_shape))
    # model.add(Dropout(0.2))
    model.add(MaxPool2D(pool_size=hp['pool_size']))
    
    # for _ in range(hp['hidden_conv']):
    #     model.add(Conv2D(filters=hp['hidden_conv_filters'], 
    #                      kernel_size=hp['hidden_conv_kernel_size'], 
    #                      # kernel_initializer='random_uniform',
    #                      activation='relu'))
    #     # model.add(Dropout(0.2))
    #     model.add(MaxPool2D(pool_size=hp['hidden_conv_pool_size']))
    
    model.add(Flatten())
    
    for _ in range(hp['hidden_dense']):
        model.add(Dense(hp['hidden_dense_neurons'], activation='relu'))
        
    model.add(Dense(256, activation='softmax'))
    
    model.compile(optimizer=Adam(learning_rate=1e-4),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    
    return model

In [30]:
# HP = {
#     'filters': 16,
#     'kernel_size': 8,
#     'pool_size': 2,
#     'hidden_conv': 3,
#     'hidden_conv_filters': 8,
#     'hidden_conv_kernel_size': 8,
#     'hidden_conv_pool_size': 2,
#     'hidden_dense': 3,
#     'hidden_dense_neurons': 40,
#     'batch_size': 100
# }

HP = {
    'filters': 16,
    'kernel_size': 8,
    'pool_size': 2,
    'hidden_conv': 2,
    'hidden_conv_filters': 8,
    'hidden_conv_kernel_size': 8,
    'hidden_conv_pool_size': 2,
    'hidden_dense': 3,
    'hidden_dense_neurons': 40,
    'batch_size': 128
}

In [31]:
# Basics
import trsfile
import numpy as np
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import StandardScaler
from pyts.image import GramianAngularField
import random



class DataLoader():


    def __init__(self, configs, n_tot_traces, target, byte_idx=None):
        
        self.trace_files = [f'{constants.CURR_TRACES_PATH}/{c}_500MHz + Resampled.trs' 
                            for c in configs]
                            
        self.n_tr_per_config = int(n_tot_traces / len(configs))
        
        self.byte_idx = byte_idx
        
        self.target = target
        self.n_classes = constants.N_CLASSES[target]
        
        self.gaf = GramianAngularField(image_size=64, method='s')
        
        # self.scaler = StandardScaler()
        
    
    def _retrieve_metadata(self, tr):

        p = np.array(tr.get_input()) # int list
        k = np.array(tr.get_key()) # int list
        l = aes.labels_from_key(p, k, self.target) # Compute the set of 16 labels

        if self.byte_idx is not None:
            l = l[self.byte_idx]
            p = p[self.byte_idx]
            k = k[self.byte_idx]

        l = to_categorical(l, self.n_classes)
        
        return l, p, k
        
        
    @staticmethod
    def _shuffle(x, y, pbs, tkbs):
        
        to_shuffle = list(zip(x, y, pbs, tkbs))
        random.shuffle(to_shuffle)
        x, y, pbs, tkbs = zip(*to_shuffle)
        
        x = np.array(x)
        y = np.array(y)
        pbs = np.array(pbs)
        tkbs = np.array(tkbs)
        
        return x, y, pbs, tkbs
        
        
    def load(self):
    
        samples = []
        labels = []
        pltxt_bytes = []
        true_key_bytes = []
        
        for tfile in self.trace_files:
            with trsfile.open(tfile, 'r') as traces:
                for tr in traces[:self.n_tr_per_config]:
                    s = tr.samples
                    l, p, k = self._retrieve_metadata(tr)
                    samples.append(s)
                    labels.append(l)
                    pltxt_bytes.append(p)
                    true_key_bytes.append(k)
                    
        x = np.array(samples) # (n_tot_traces x trace_len)
        # x = self.scaler.fit_transform(x)
        y = np.array(labels) # (n_tot_traces x n_classes)
        pbs = np.array(pltxt_bytes) # (n_tot_traces x 1)
        tkbs = np.array(true_key_bytes) # (n_tot_traces x 1)
        
        x, y, pbs, tkbs = self._shuffle(x, y, pbs, tkbs)
        
        if len(self.trace_files) == 1:
            tkbs = tkbs[0] # All true_key_bytes are equal because the config is unique 
        
        x = self.gaf.fit_transform(x)
        
        return x, y, pbs, tkbs
        
        
class SplitDataLoader(DataLoader):
   
    def __init__(self, configs, n_tot_traces, train_size, target, byte_idx=None):
        
        super().__init__(configs, n_tot_traces, target, byte_idx)
        
        self.n_train_tr_per_config = int(train_size * self.n_tr_per_config)
        
        
    def load(self):
    
        x_train = []
        y_train = []
        pbs_train = []
        tkbs_train = []
        
        x_val = []
        y_val = []
        pbs_val = []
        tkbs_val = []
        
        for tfile in self.trace_files:
            
            config_s = []
            config_l = []
            config_p = []
            config_k = []
            
            with trsfile.open(tfile, 'r') as traces:
                for tr in traces[:self.n_tr_per_config]:
                    s = tr.samples
                    l, p, k = self._retrieve_metadata(tr)
                    
                    config_s.append(s)
                    config_l.append(l)
                    config_p.append(p)
                    config_k.append(k)
            
            x_train.append(config_s[:self.n_train_tr_per_config])
            x_val.append(config_s[self.n_train_tr_per_config:])
            
            y_train.append(config_l[:self.n_train_tr_per_config])
            y_val.append(config_l[self.n_train_tr_per_config:])
            
            pbs_train.append(config_p[:self.n_train_tr_per_config])
            pbs_val.append(config_p[self.n_train_tr_per_config:])
            
            tkbs_train.append(config_k[:self.n_train_tr_per_config])
            tkbs_val.append(config_k[self.n_train_tr_per_config:])
        
        # Reduce the lists of arrays to a single np.ndarray
        x_train = np.concatenate(x_train)
        y_train = np.concatenate(y_train)
        pbs_train = np.concatenate(pbs_train)
        tkbs_train = np.concatenate(tkbs_train)
        
        x_val = np.concatenate(x_val)
        y_val = np.concatenate(y_val)
        pbs_val = np.concatenate(pbs_val)
        tkbs_val = np.concatenate(tkbs_val)
        
        # Scale the whole train-set (train + val) with the same scaler
        n_tot_train = x_train.shape[0] # train_size * n_tot_traces
        x_tot = np.concatenate([x_train, x_val])
        # x_tot = self.scaler.fit_transform(x_tot)
        x_train = x_tot[:n_tot_train]
        x_val = x_tot[n_tot_train:]
        
        # Shuffle the sets
        x_train, y_train, pbs_train, tkbs_train = self._shuffle(x_train, y_train, pbs_train, tkbs_train)
        x_val, y_val, pbs_val, tkbs_val = self._shuffle(x_val, y_val, pbs_val, tkbs_val) 
            
        x_train = self.gaf.fit_transform(x_train)
        x_val = self.gaf.fit_transform(x_val)
            
        # Create train and test packages
        train_data = (x_train, y_train, pbs_train, tkbs_train)
        val_data = (x_val, y_val, pbs_val, tkbs_val)
            
        return train_data, val_data

In [32]:
callbacks = [
            EarlyStopping(
                monitor='val_loss', 
                patience=15
            ),
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.2,
                patience=7,
                min_lr=1e-7),
        ]

In [35]:
BYTE_IDX = 0
MAX_N_KEYS = 10
TEST_CONFIG = ['D3-K0']
TOT_TRACES = 100000
# CNN_INPUT_SHAPE = (1183, 1)
CNN_INPUT_SHAPE = (64, 64, 1)


for N_KEYS in range(10, MAX_N_KEYS+1):
    
    TRAIN_DEVS = ['D1', 'D2']
    TRAIN_CONFIGS = [f'{dev}-K{k}' for k in range(1, N_KEYS+1) for dev in TRAIN_DEVS]

    train_dl = SplitDataLoader(
        TRAIN_CONFIGS, 
        TOT_TRACES, 
        train_size=0.9,
        byte_idx=BYTE_IDX, 
        target='SBOX_OUT'
    )
    train_data, val_data = train_dl.load()
    x_train, y_train, _, _ = train_data
    x_val, y_val, _, _ = val_data


    model = CNN_images(HP, CNN_INPUT_SHAPE)

    history = model.fit(
        x_train, 
        y_train, 
        validation_data=(x_val, y_val),
        epochs=300,
        batch_size=HP['batch_size'],
        callbacks=callbacks,
        verbose=1
    ).history
    
    # Retrieve test data here in order to always have random data
    test_dl = DataLoader(
        TEST_CONFIG, 
        5000,
        byte_idx=BYTE_IDX, 
        target='SBOX_OUT'
    )
    x_test, y_test, pltxts_test, tkbs_test = test_dl.load()
    
    test_preds = model.predict(x_test)
    test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
    print(f'test_loss = {test_loss}')
    print(f'test_acc = {test_acc}')
    
    print()
    print()

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
Epoch 73/300
Epoch 74/300
Epoch 75/300
Epoch 76/300
Epoch 77/300
Epoch 78

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

plt.plot(history['loss'], label='train_loss')
plt.plot(history['val_loss'], label='val_loss')
plt.legend()
plt.show()