In [None]:
import wandb
from dotenv import load_dotenv
import os
import numpy as np
import json
from copy import deepcopy
import tensorflow as tf
import keras_tuner as kt
from wandb.integration.keras import WandbMetricsLogger, WandbModelCheckpoint
from sklearn.model_selection import train_test_split
import sys
sys.path.append(os.path.abspath('../'))
import utils.dataset_loader as dataset_loader
import utils.my_model as model_builder
# Load the .env file
load_dotenv()

In [6]:
# Instantiate the model from `build_cnn_model` for 50-sample inputs and 16 output
# classes, then print its layer summary as a quick sanity check of the architecture.
simpler_model = model_builder.build_cnn_model(trace_length=50 , num_classes=16)
simpler_model.summary()

In [5]:
# Instantiate the model from `build_cnn_model_range` for 16 output classes and print
# its layer summary.
# NOTE(review): no input length is passed here, whereas the training code passes one
# positionally — presumably the builder has a default; confirm in utils/my_model.py.
complex_model = model_builder.build_cnn_model_range(num_classes=16)
complex_model.summary()

In [16]:
# Report how many GPU devices TensorFlow detects on this machine.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  0


In [17]:
# Display the devices currently visible to the TensorFlow runtime.
tf.config.get_visible_devices()

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

In [18]:
# Look up the Weights & Biases API key in the process environment
# (populated from the .env file by load_dotenv()).
wandb_api_key = os.environ.get("WANDB_API_KEY")

# Log in only when a key is present; otherwise just tell the user it is missing.
if wandb_api_key is not None:
    wandb.login(key=wandb_api_key)
else:
    print("WANDB_API_KEY not found in environment variables.")

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: C:\Users\Miro\_netrc


In [19]:
# Root folder of the datasets, relative to this notebook.
DATASETS_PATH = '../datasets'

# Load the random dataset through the project loader, pointing it at the pickle cache file.
random_dataset = dataset_loader.load_dataset_files_with_cache(
    f"{DATASETS_PATH}/random_dataset",
    cache_path=f"{DATASETS_PATH}/cache/random_dataset_cache.pkl",
)

Checking for cache at: ../datasets/cache/random_dataset_cache.pkl
Loading datasets from cache: ../datasets/cache/random_dataset_cache.pkl


In [20]:
# Extract the trace matrix from the loaded dataset and print its shape.
# `traces` is a notebook-level variable that the training sweep reads later on.
traces = dataset_loader.get_trace_matrix(random_dataset)  # shape (n_traces, n_samples)
print(f"Loaded traces: {traces.shape}")


Loaded traces: (10000, 5000)


In [21]:
def _scale_augment(batch):
    """Multiply every trace in `batch` by its own random gain drawn from [0.9, 1.1)."""
    # One gain per trace (first axis), broadcast over all remaining axes.
    gain_shape = (batch.shape[0],) + (1,) * (batch.ndim - 1)
    gains = np.random.uniform(0.9, 1.1, size=gain_shape)
    if np.issubdtype(batch.dtype, np.floating):
        # Keep the input precision (e.g. float32) instead of promoting to float64.
        gains = gains.astype(batch.dtype)
    return batch * gains


def _mtime_or_none(path):
    """Return the modification time of `path`, or None when the file does not exist."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return None


def train_model_fixed(traces, algo, labels, model_out_path, augment=False, config=None, range=False):
    """
    Train a fixed model using given trace and label data.

    The data is split 80/20 into train/validation sets (fixed seed 42), a model is
    built with `model_builder`, and training is logged to Weights & Biases.

    Args:
        traces: Array of traces whose first axis indexes the traces. A 2-D array
            gets a trailing channel axis appended.
        algo: Algorithm name; "PRESENT" selects 16 output classes, anything else 256.
        labels: Array of labels; its first dimension must match `traces`.
        model_out_path: File path the model is written to. Its basename (without
            extension) is also used as the W&B run name.
        augment: When True, each *training* trace is scaled by its own random gain
            in [0.9, 1.1). Validation data is never augmented.
        config: Optional dict stored as the W&B run config.
        range: When True, build the model with `build_cnn_model_range` instead of
            `build_cnn_model`. (The name shadows the builtin inside this function;
            it is kept for backward compatibility with existing callers.)

    Raises:
        AssertionError: If `traces` and `labels` disagree on the number of examples.
    """
    if len(traces.shape) == 2:
        traces = np.expand_dims(traces, axis=-1)

    assert traces.shape[0] == labels.shape[0], "Mismatch between traces and labels!"

    x_train, x_val, y_train, y_val = train_test_split(
        traces, labels, test_size=0.2, shuffle=True, random_state=42
    )

    if augment:
        # Augment after the split so the validation set stays untouched, and use a
        # per-trace gain: a single global scalar merely rescales the whole dataset.
        x_train = _scale_augment(x_train)

    input_shape = x_train.shape[1:]
    num_classes = 16 if algo == "PRESENT" else 256

    print(f"Input shape: {input_shape}, Number of classes: {num_classes}")

    if range:
        model = model_builder.build_cnn_model_range(input_shape[0], num_classes)
    else:
        model = model_builder.build_cnn_model(input_shape[0], num_classes)

    # Make sure the destination folder exists before any callback tries to write to it.
    out_dir = os.path.dirname(model_out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=3,
        min_lr=1e-5,
        verbose=1
    )

    # Use the first GPU when one is present, otherwise pin to the CPU explicitly.
    device = '/gpu:0' if tf.config.list_physical_devices('GPU') else '/cpu:0'

    # Remember the state of the output file so we can tell whether the checkpoint
    # callback wrote a (best) model during this run.
    mtime_before = _mtime_or_none(model_out_path)

    run = wandb.init(
        entity="mt-thesis",
        project="aes-training",
        name=os.path.splitext(os.path.basename(model_out_path))[0],
        config=config,
        reinit=True
    )

    try:
        with tf.device(device):
            model.fit(
                x_train, y_train,
                epochs=50,
                batch_size=64,
                validation_data=(x_val, y_val),
                callbacks=[
                    WandbMetricsLogger(),
                    WandbModelCheckpoint(
                        filepath=model_out_path,
                        save_best_only=True,
                        monitor='val_accuracy',
                        mode='max'
                    ),
                    lr_scheduler
                ])

        # The checkpoint callback already stored the best-val_accuracy model at
        # `model_out_path`; saving unconditionally would overwrite it with the
        # last-epoch weights. Only save here if the callback wrote nothing.
        if _mtime_or_none(model_out_path) == mtime_before:
            model.save(model_out_path)
    except BaseException:
        # Also covers KeyboardInterrupt: close the run (marked as failed) so it does
        # not stay open in the kernel, then propagate the error.
        run.finish(exit_code=1)
        raise

    run.finish()


In [22]:
def load_trace_ranges(file_path):
    """
    Read trace ranges from a JSON file.

    The file must contain a "global" entry and a "per_nibble" mapping whose values
    are lists of ranges.

    Returns:
        A pair `(global_range, per_nibble)`: the "global" entry as a tuple, and a
        dict mapping each integer-converted key to its list of range tuples.
    """
    with open(file_path, 'r') as handle:
        payload = json.load(handle)

    overall = tuple(payload["global"])

    # JSON keys are strings and ranges are lists; normalise to int keys and tuples.
    by_index = {}
    for key, spans in payload["per_nibble"].items():
        by_index[int(key)] = list(map(tuple, spans))

    return overall, by_index

In [23]:
def get_traces_for_ranges(traces, ranges):
    """
    Select columns of `traces` described by `ranges`.

    `ranges` is either a single `(start, end)` tuple, giving one column slice, or a
    list of `(start, end)` pairs whose slices are concatenated along axis 1.

    Raises:
        ValueError: If `ranges` is neither a tuple nor a list.
    """
    if isinstance(ranges, list):
        # Several windows: cut each one out and stitch them side by side.
        pieces = []
        for lo, hi in ranges:
            pieces.append(traces[:, lo:hi])
        return np.concatenate(pieces, axis=1)

    if isinstance(ranges, tuple):
        # Single window.
        window = slice(ranges[0], ranges[1])
        return traces[:, window]

    raise ValueError("Invalid trace range format.")

In [24]:
def get_wandb_config(algo, use_pois, poi_type, trace_mode, nibble, augment):
    """
    Assemble the configuration dict for one training run.

    Only one of `poi_type` / `trace_mode` is recorded: `poi_type` when `use_pois`
    is truthy, `trace_mode` otherwise. The other one is stored as None.
    """
    if use_pois:
        recorded_poi_type, recorded_trace_mode = poi_type, None
    else:
        recorded_poi_type, recorded_trace_mode = None, trace_mode

    return dict(
        algorithm=algo,
        use_pois=use_pois,
        poi_type=recorded_poi_type,
        trace_mode=recorded_trace_mode,
        nibble=nibble,
        augmentation=augment,
    )

In [None]:
def run_all_trainings(base_dataset_dir, output_dir, trace_matrix=None):
    """
    Train one model per target index for every configured experiment variant.

    For each algorithm the sweep covers POI-based and range-based inputs, each with
    and without augmentation, and trains 16 models (one per label index) per
    combination via `train_model_fixed`.

    Files read below `<base_dataset_dir>/<algo lowercased>/`:
        ranges.json                      trace ranges (see `load_trace_ranges`)
        pois/<label>_<idx>_pois.npy      column indices used in the POI variant
        <label>_<idx>_labels.npy         labels for target index `idx`

    Models are written to `<output_dir>/<algo>/`.

    Args:
        base_dataset_dir: Directory containing one sub-folder per algorithm.
        output_dir: Directory that receives the trained models; created if missing.
        trace_matrix: 2-D array of traces to train on. Defaults to the notebook-level
            `traces` variable, which is what this function always used before.
    """
    if trace_matrix is None:
        # Backward-compatible fallback to the notebook global.
        trace_matrix = traces

    algorithms = ["AES"]
    use_pois_options = [True, False]
    poi_types = ["per_nibble"] # removed "global" -> There was no change
    augmentation_options = [True, False]

    for algo in algorithms:
        label = "nibble" if algo == "PRESENT" else "byte"
        dataset_path = os.path.join(base_dataset_dir, algo.lower())
        trace_ranges_path = os.path.join(dataset_path, "ranges.json")
        # The global range is currently unused (see the "removed global" note below).
        _global_trace_range, per_nibble_trace_ranges = load_trace_ranges(trace_ranges_path)

        # Ensure the per-algorithm output folder exists before the first model is saved.
        os.makedirs(os.path.join(output_dir, algo), exist_ok=True)

        for use_pois in use_pois_options:
            for augment in augmentation_options:
                if use_pois:
                    for poi_type in poi_types:
                        for idx in range(16):
                            print(f"Training {algo} | POIs={poi_type} | {label}={idx} | Aug={augment}")
                            pois_path = f"{dataset_path}/pois/{'global' if poi_type == 'global' else f'{label}_{idx}_pois'}.npy"
                            pois = np.load(pois_path)
                            # Index the columns directly; the matrix is only read, so
                            # copying all of it first is unnecessary.
                            poi_traces = trace_matrix[:, pois]
                            print(f"Loaded POIs: {poi_traces.shape}")  # (n_traces, n_pois)
                            labels = np.load(f"{dataset_path}/{label}_{idx}_labels.npy")

                            out_path = f"{output_dir}/{algo}/{poi_type}_POI_n{idx}_aug{int(augment)}.keras"
                            config = get_wandb_config(algo, use_pois, poi_type, None, idx, augment)
                            train_model_fixed(poi_traces, algo, labels, out_path, augment, config)

                else:
                    for trace_mode, trace_range in [ ("per_nibble", per_nibble_trace_ranges)]: # removed global -> didn't work
                        for idx in range(16):
                            print(f"Training {algo} | {trace_mode} trace | {label}={idx} | Aug={augment}")

                            if trace_mode == "per_nibble":
                                ranges = trace_range[idx]  # list of (start, end)
                            else:
                                ranges = trace_range          # single (start, end)

                            # Get the traces for the specified ranges (read-only access,
                            # so no defensive copy of the full matrix is needed).
                            trace_subset = get_traces_for_ranges(trace_matrix, ranges)
                            print(f"Subset shape: {trace_subset.shape}")
                            labels = np.load(f"{dataset_path}/{label}_{idx}_labels.npy")

                            out_path = f"{output_dir}/{algo}/{trace_mode}_range_n{idx}_aug{int(augment)}.keras"
                            config = get_wandb_config(algo, use_pois, None, trace_mode, idx, augment)
                            train_model_fixed(trace_subset, algo, labels, out_path, augment, config, range=True)


In [26]:
# Base directory handed to run_all_trainings(); it is expected to hold one
# sub-folder per algorithm.
# NOTE(review): this is "../dataset", while the trace matrix is loaded from
# DATASETS_PATH ('../datasets') — confirm the two different folders are intentional.
DATASET_DIR = "../dataset"

In [28]:
# Launch the full training sweep; models are written below the relative folder "trained".
run_all_trainings(DATASET_DIR, "trained")

Training AES | POIs=per_nibble | byte=0 | Aug=True
Loaded POIs: (10000, 55)
Input shape: (55, 1), Number of classes: 256


Epoch 1/50
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 10ms/step - accuracy: 0.0489 - loss: 3.8660 - val_accuracy: 0.0490 - val_loss: 4.4738 - learning_rate: 0.0010
Epoch 2/50
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 8ms/step - accuracy: 0.0587 - loss: 3.3213 - val_accuracy: 0.0780 - val_loss: 3.5799 - learning_rate: 0.0010
Epoch 3/50
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - accuracy: 0.0621 - loss: 3.3106 - val_accuracy: 0.0780 - val_loss: 3.3757 - learning_rate: 0.0010
Epoch 4/50
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 8ms/step - accuracy: 0.0680 - loss: 3.2752 - val_accuracy: 0.0915 - val_loss: 3.3705 - learning_rate: 0.0010
Epoch 5/50
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 8ms/step - accuracy: 0.0840 - loss: 3.2580 - val_accuracy: 0.0900 - val_loss: 3.3092 - learning_rate: 0.0010
Epoch 6/50
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1

KeyboardInterrupt: 