In [1]:
# Optimization-level tag; it is interpolated into the profiling data directory
# name (f'data{OPT}-828/profile/...') when the profiling traces are loaded.
OPT = "-O3"
# The templates are built per coefficient, so the same templates are used for both
# Falcon-512 and Falcon-1024. The "512" below only sizes the profiling set
# (N_trace = Falcon_n_template * N_k_trans); do not read it as "Falcon-512 only".
Falcon_n_template = 512

In [2]:
import os
import gc
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

import importlib
from sklearn.model_selection import train_test_split
from sca_preprocess import trace_categorize
from sca_preprocess import trace_fft, trace_butter_lpf
from sca_preprocess import calc_snr, calc_ttest, calc_sod
from sca_preprocess import select_poi_max_rank, select_poi_threshold
from sca_preprocess import template_build, template_attack, template_attack_report
from TA_discriminative import classifier_train, classifier_inference, classifier_report

# Target clock used for the captures — presumably in Hz; TODO confirm against the acquisition setup.
MCU_freq = 30e6
# Multiplier from MCU_freq to the sampling rate (presumably ADC samples per clock cycle — TODO confirm).
adc_mul = 4
sample_freq = MCU_freq * adc_mul
# Cutoff handed to trace_butter_lpf together with sample_freq.
cutoff_freq = sample_freq / 8

# Shared seed for np.random, torch and train_test_split.
seed = 42
# Passed as test_size to train_test_split: fraction of selected traces held out for testing.
test_split = 0.5

In [3]:
def select_traces_accto_class(traces_dict, class_label_map):
    """Draw a fixed number of traces for every class and label them by class index.

    Parameters:
    traces_dict: mapping from a coefficient value to a 2D array of traces
        (one trace per row) recorded for that value.
    class_label_map: mapping {class_index: {"num_to_select": int, "labels": [values]}};
        class indices are expected to be 0 .. n_class-1 because they are visited
        with range(len(class_label_map)).

    Returns:
    (traces, label): the selected traces stacked class after class, and an
    integer array holding the class index of every selected row.

    The rows of each class are drawn without replacement through the global
    NumPy random state, so callers seed np.random beforehand for repeatability.
    """
    per_class_traces = []
    per_class_labels = []
    n_classes = len(class_label_map.keys())
    for cls in range(n_classes):
        spec = class_label_map[cls]
        n_pick = spec['num_to_select']
        # Pool the traces of every coefficient value that belongs to this class.
        pool = np.concatenate([traces_dict[value] for value in spec['labels']], axis=0)
        # The pool holds len(labels) x #trace rows; keep a random n_pick of them.
        picked_rows = np.random.choice(pool.shape[0], n_pick, replace=False)
        per_class_traces.append(pool[picked_rows])
        per_class_labels.extend([cls] * n_pick)
    return np.vstack(per_class_traces), np.array(per_class_labels)

In [4]:
def select_and_split(traces_dict, class_label_map, start_idx, end_idx, test_split,
                     seed=seed, sample_freq=sample_freq, cutoff_freq=cutoff_freq):
    """Select per-class traces, crop and low-pass filter them, then split train/test.

    Parameters:
    traces_dict, class_label_map: forwarded to select_traces_accto_class.
    start_idx, end_idx: sample window kept from every trace (columns start_idx:end_idx).
    test_split: passed to train_test_split as test_size.
    seed: seeds np.random before the selection and is used as random_state of the split.
    sample_freq, cutoff_freq: forwarded to trace_butter_lpf.

    Returns:
    (traces_train, traces_test, label_train, label_test), where the traces are
    the filtered ones and the split is stratified on the class label.
    """
    np.random.seed(seed)
    selected, label = select_traces_accto_class(traces_dict, class_label_map)
    traces = selected[:, start_idx:end_idx]
    traces_lpf = trace_butter_lpf(traces, sample_freq, cutoff_freq)
    # The reported shape is that of the cropped (unfiltered) traces.
    print(f"traces.shape: {traces.shape}")
    print(f"label.shape: {label.shape}")

    split_options = dict(test_size=test_split, stratify=label,
                         shuffle=True, random_state=seed)
    traces_train, traces_test, label_train, label_test = \
        train_test_split(traces_lpf, label, **split_options)
    print(f"traces_train.shape: {traces_train.shape}")
    print(f"traces_test.shape: {traces_test.shape}")

    return traces_train, traces_test, label_train, label_test

In [5]:
# (start, end) windows for the three leakage segments. In the visible notebook this
# table is only referenced from commented-out code; the window lengths (121, 22, 17)
# match the "* 4" trace lengths used when the profiling traces are loaded.
start_end_indices = {
    'scaled': (0, 121),
    'mul_shift': (41, 63),
    'mul_multiply': (22, 39)
}
# Appears in the profiling file names ('val{i}_{N_k_trans}traces.npy').
N_k_trans = 2
# Number of traces drawn per class when building the templates.
N_trace = Falcon_n_template * N_k_trans

In [6]:
# Sanity check: N_trace = Falcon_n_template * N_k_trans.
print(N_trace)

1024


In [7]:
# Classes of the fpr_scaled classifier: class index -> coefficient values merged into
# that class ("labels") and the number of profiling traces drawn for it
# ("num_to_select", N_trace for every class).

class_label_map_scaled = {
    0: {"num_to_select": N_trace,
        "labels": [0]},
    1: {"num_to_select": N_trace,
        "labels": [1]},
    2: {"num_to_select": N_trace,
        "labels": [-1]},
    3: {"num_to_select": N_trace,
        "labels": [2, 3]},
    4: {"num_to_select": N_trace,
        "labels": [-3, -2]},
    5: {"num_to_select": N_trace,
        "labels": [4, 5, 6, 7]},
    6: {"num_to_select": N_trace,
        "labels": [-7, -6, -5, -4]},
    7: {"num_to_select": N_trace,
        "labels": list(range(8, 16))},    # [8, ..., 15]
    8: {"num_to_select": N_trace,
        "labels": list(range(-15, -7))},  # [-15, ..., -8]
    # Disabled classes for |value| >= 16:
    # 9: {"num_to_select": N_trace,
    #     "labels": list(range(16, 23))},    # [16, ..., 22]
    # 10: {"num_to_select": N_trace,
    #     "labels": list(range(-22, -15))},  # [-22, ..., -16]
}
# N_class = len(class_label_map_scaled.keys())
# N_class = len(class_label_map_scaled.keys())

In [8]:
# Based on the key distribution, it is virtually impossible to obtain a coefficient
# with absolute value larger than 15 in Falcon1024, so the label lists stop at 15.
# The two shift classes split the absolute values 2..15 into two groups of 7.
class_label_map_shift = {
    0: {"num_to_select": N_trace * 7,  # b.c. class 0 has 7 elements
        "labels":  # make zu<2^55
        # (previous, wider list) [2, 4, 5, 8, 9, 10, 11, 16, 17, 18, 19, 20, 21, 22]},
    [2, 4, 5, 8, 9, 10, 11]},
    1: {"num_to_select": N_trace * 7,  # b.c. class 1 has 7 elements
        "labels":  # make zu>=2^55
        # (previous, wider list) [3, 6, 7, 12, 13, 14, 15, 23, 24, 25, 26, 27, 28, 29, 30, 31]},
        [3, 6, 7, 12, 13, 14, 15]},
}

# One-value-per-class maps used to train the classifiers that separate the values
# inside {4, 5}, {6, 7}, {8..11} and {12..15}.
class_label_map_4_5 = {
    0: {"num_to_select": N_trace, "labels": [4]},
    1: {"num_to_select": N_trace, "labels": [5]},
}
class_label_map_6_7 = {
    0: {"num_to_select": N_trace, "labels": [6]},
    1: {"num_to_select": N_trace, "labels": [7]},
}
class_label_map_8_11 = {
    0: {"num_to_select": N_trace, "labels": [8]},
    1: {"num_to_select": N_trace, "labels": [9]},
    2: {"num_to_select": N_trace, "labels": [10]},
    3: {"num_to_select": N_trace, "labels": [11]},
}
class_label_map_12_15 = {
    0: {"num_to_select": N_trace, "labels": [12]},
    1: {"num_to_select": N_trace, "labels": [13]},
    2: {"num_to_select": N_trace, "labels": [14]},
    3: {"num_to_select": N_trace, "labels": [15]},
}
# Disabled map for the values 16..22:
# class_label_map_16_22 = {
#     0: {"num_to_select": N_trace, "labels": [16]},
#     1: {"num_to_select": N_trace, "labels": [17]},
#     2: {"num_to_select": N_trace, "labels": [18]},
#     3: {"num_to_select": N_trace, "labels": [19]},
#     4: {"num_to_select": N_trace, "labels": [20]},
#     5: {"num_to_select": N_trace, "labels": [21]},
#     6: {"num_to_select": N_trace, "labels": [22]},
# }

In [9]:
def labels_from_csv(file_name, target_row, repeat_times, class_label_map):
    
    value_to_key = {v: k for k, values in class_label_map.items() for v in values['labels']}
    
    df = pd.read_csv(file_name, header=None, skiprows=target_row, nrows=1)
    
    labels_2d = df.map(lambda x: value_to_key[x]).to_numpy()  # shape (1, 512)
    
    labels_repeated_2d = np.tile(labels_2d, (1, repeat_times))  # shape (1, 512*repeat_times)
    
    labels_scaled_key = labels_repeated_2d.flatten()
    return labels_scaled_key

In [10]:
class NaiveModel(nn.Module):
    """Small 1D CNN (two conv blocks) followed by a three-layer MLP classifier.

    Parameters:
    N_class: width of the final linear layer, i.e. the number of classes.

    forward() feeds x through conv1 -> bn1 -> sigmoid -> avg-pool, the same with
    conv2/bn2, flattens everything but the first dimension and applies the MLP.
    conv1 is built for a single input channel, so x presumably has shape
    (batch, 1, n_samples) — TODO confirm against classifier_train.
    The output of fc3 is returned as is; no softmax is applied in this module.
    """
    def __init__(self, N_class):
        super().__init__()
        k = 5
        p = k // 2  # "same" padding
        N_channel = 1
        # CNN
        self.conv1 = nn.Conv1d(N_channel, 4, kernel_size=k, padding=p)
        self.conv2 = nn.Conv1d(4, 4, kernel_size=k, padding=p)
        self.bn1 = nn.BatchNorm1d(4)
        self.bn2 = nn.BatchNorm1d(4)
        self.act = nn.Sigmoid()
        # Each pooling step halves the length of the sample axis.
        self.pool = nn.AvgPool1d(kernel_size=2, stride=2)
        # MLP
        # LazyLinear infers its input width on the first forward pass, so the
        # same model definition works for every trace length used here.
        self.fc1 = nn.LazyLinear(128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, N_class)
        self.dropout = nn.Dropout(0.1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.act(self.bn1(self.conv1(x))))
        x = self.pool(self.act(self.bn2(self.conv2(x))))
        # Keep the batch dimension, merge channels and samples.
        x = torch.flatten(x, 1)
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.dropout(self.relu(self.fc2(x)))
        x = self.fc3(x)
        return x

In [11]:
def build_fpr_templates_ml(traces_dict, clip_range,
                           test_split, class_label_map, num_epochs=50):
    """Train and evaluate one NaiveModel classifier for the given class map.

    Parameters:
    traces_dict: coefficient value -> profiling traces (see select_traces_accto_class).
    clip_range: sample indices forwarded to the classifier helpers; its last
        element + 1 is also used as the end of the window kept from each trace.
    test_split: fraction of the selected traces held out for the report.
    class_label_map: class definition; its number of keys is the number of classes.
    num_epochs: NOTE(review): accepted but never forwarded to classifier_train
        in this function, so the values passed by callers currently have no
        effect here — confirm against classifier_train's signature and wire it
        through if that is the intent.

    Returns:
    (model, scaler, y_prob): the trained model, the scaler returned by
    classifier_train, and the inference output on the held-out traces.
    """
    # Seed NumPy and torch so that trace selection and weight init are repeatable.
    np.random.seed(seed)
    torch.manual_seed(seed)

    # start_idx = start_end_idx[0]
    # end_idx   = start_end_idx[1]
    # clip_range = list(range((end_idx - start_idx) * 4))

    traces_train, traces_test, label_train, label_test = \
        select_and_split(traces_dict, class_label_map,
                         0, clip_range[-1]+1, test_split)
    N_class = len(class_label_map.keys())
    model = NaiveModel(N_class)
    model, scaler = classifier_train(traces_train, label_train,\
                                     clip_range, model)
    y_prob = classifier_inference(traces_test,
                             clip_range, model, scaler)
    classifier_report(traces_test, label_test, clip_range, model, scaler,
                     verbose=True)
    # y_prob is not needed for the attack itself; it is returned only to help debugging.
    return model, scaler, y_prob

In [12]:
# One label per profiling trace: each of the 31 coefficient values -15..15 is
# repeated N_trace times, in ascending order of the value.
label_all = np.repeat(np.arange(-15, 16), N_trace)

In [13]:
# Trace length of the fpr_scaled segment: 121 matches the 'scaled' window (0, 121)
# in start_end_indices; the factor 4 is presumably adc_mul — TODO confirm.
len_scaled = 121 * 4
# Load the profiling traces of every coefficient value -15..15 (same order as
# label_all) and flatten them into one (n_traces, len_scaled) matrix.
traces_scaled = np.array([
    np.load(f'data{OPT}-828/profile/scaled/'
            f'val{value}_{N_k_trans}traces.npy').reshape(-1, len_scaled)
    for value in range(-15, 16)
]).reshape(-1, len_scaled)
traces_scaled_dict = trace_categorize(traces_scaled, label_all)

In [14]:
# Train the 9-class fpr_scaled classifier on the full trace (all len_scaled samples).
# model_scaled / scaler_scaled are read as globals by attack_key_repeat.
model_scaled, scaler_scaled, y_prob = \
    build_fpr_templates_ml(traces_scaled_dict, range(len_scaled),
                           test_split, class_label_map_scaled)

traces.shape: (9216, 484)
label.shape: (9216,)
traces_train.shape: (4608, 484)
traces_test.shape: (4608, 484)
Layer (type:depth-idx)                   Param #
NaiveModel                               --
├─Conv1d: 1-1                            24
├─Conv1d: 1-2                            84
├─BatchNorm1d: 1-3                       8
├─BatchNorm1d: 1-4                       8
├─Sigmoid: 1-5                           --
├─AvgPool1d: 1-6                         --
├─LazyLinear: 1-7                        --
├─Linear: 1-8                            16,512
├─Linear: 1-9                            1,161
├─Dropout: 1-10                          --
├─ReLU: 1-11                             --
Total params: 17,797
Trainable params: 17,797
Non-trainable params: 0
Epoch 01 | train loss 1.8701 acc 0.4502 | val loss 1.4627 acc 0.8460
Epoch 02 | train loss 0.5782 acc 0.9265 | val loss 0.1398 acc 0.9892
Epoch 03 | train loss 0.1120 acc 0.9819 | val loss 0.0500 acc 0.9913
Epoch 04 | train loss 0.0591 ac

In [15]:
# Trace length of the shift segment: 22 matches the 'mul_shift' window (41, 63)
# in start_end_indices; the factor 4 is presumably adc_mul — TODO confirm.
len_shift = 22 * 4
# Load the profiling traces of every coefficient value -15..15 (same order as
# label_all) and flatten them into one (n_traces, len_shift) matrix.
traces_shift = np.array([
    np.load(f'data{OPT}-828/profile/shift/'
            f'val{value}_{N_k_trans}traces.npy').reshape(-1, len_shift)
    for value in range(-15, 16)
]).reshape(-1, len_shift)
traces_shift_dict = trace_categorize(traces_shift, label_all)

In [16]:
# Train the 2-class shift classifier (class_label_map_shift).
# model_shift / scaler_shift are read as globals by attack_key_repeat.
# NOTE(review): the trailing 100 binds to num_epochs, which build_fpr_templates_ml
# does not forward to classifier_train in the visible code.
model_shift, scaler_shift, y_prob = \
    build_fpr_templates_ml(traces_shift_dict, range(len_shift),
                           test_split, class_label_map_shift, 100)

traces.shape: (14336, 88)
label.shape: (14336,)
traces_train.shape: (7168, 88)
traces_test.shape: (7168, 88)
Layer (type:depth-idx)                   Param #
NaiveModel                               --
├─Conv1d: 1-1                            24
├─Conv1d: 1-2                            84
├─BatchNorm1d: 1-3                       8
├─BatchNorm1d: 1-4                       8
├─Sigmoid: 1-5                           --
├─AvgPool1d: 1-6                         --
├─LazyLinear: 1-7                        --
├─Linear: 1-8                            16,512
├─Linear: 1-9                            258
├─Dropout: 1-10                          --
├─ReLU: 1-11                             --
Total params: 16,894
Trainable params: 16,894
Non-trainable params: 0
Epoch 01 | train loss 0.6393 acc 0.6477 | val loss 0.5628 acc 0.7155
Epoch 02 | train loss 0.5030 acc 0.7583 | val loss 0.4691 acc 0.7880
Epoch 03 | train loss 0.4396 acc 0.8005 | val loss 0.4285 acc 0.8159
Epoch 04 | train loss 0.4085 acc 0

In [17]:
# Trace length of the multiply segment: 17 matches the 'mul_multiply' window (22, 39)
# in start_end_indices; the factor 4 is presumably adc_mul — TODO confirm.
len_mul = 17 * 4
# Load the profiling traces of every coefficient value -15..15 (same order as
# label_all) and flatten them into one (n_traces, len_mul) matrix.
traces_mul = np.array([
    np.load(f'data{OPT}-828/profile/mul/'
            f'val{value}_{N_k_trans}traces.npy').reshape(-1, len_mul)
    for value in range(-15, 16)
]).reshape(-1, len_mul)
traces_mul_dict = trace_categorize(traces_mul, label_all)

In [18]:
# Earlier variant that trained the shift classifier on the multiply traces (disabled):
# model_shift, scaler_shift = \
#     build_fpr_templates_ml(traces_mul_dict, start_end_indices['mul_shift'],
#                            test_split, class_label_map_shift, 100)
# One classifier per value group, all trained on the multiply-segment traces.
# Only model_4_5 / model_6_7 (and their scalers) are used by attack_key_repeat in
# this notebook; the 8..11 and 12..15 models are trained but not used there.
# NOTE(review): the trailing 100 / 200 bind to num_epochs, which
# build_fpr_templates_ml does not forward to classifier_train in the visible code.
model_4_5, scaler_4_5, y_prob = \
    build_fpr_templates_ml(traces_mul_dict, range(len_mul),
                           test_split, class_label_map_4_5, 100)
model_6_7, scaler_6_7, y_prob = \
    build_fpr_templates_ml(traces_mul_dict, range(len_mul),
                           test_split, class_label_map_6_7, 100)
model_8_11, scaler_8_11, y_prob  = \
    build_fpr_templates_ml(traces_mul_dict, range(len_mul),
                           test_split, class_label_map_8_11, 200)
model_12_15, scaler_12_15, y_prob = \
    build_fpr_templates_ml(traces_mul_dict, range(len_mul),
                           test_split, class_label_map_12_15, 100)
# model_16_22, scaler_16_22, y_prob = \
#     build_fpr_templates_ml(traces_mul_dict, range(len_mul),
#                            test_split, class_label_map_16_22, 200)

traces.shape: (2048, 68)
label.shape: (2048,)
traces_train.shape: (1024, 68)
traces_test.shape: (1024, 68)
Layer (type:depth-idx)                   Param #
NaiveModel                               --
├─Conv1d: 1-1                            24
├─Conv1d: 1-2                            84
├─BatchNorm1d: 1-3                       8
├─BatchNorm1d: 1-4                       8
├─Sigmoid: 1-5                           --
├─AvgPool1d: 1-6                         --
├─LazyLinear: 1-7                        --
├─Linear: 1-8                            16,512
├─Linear: 1-9                            258
├─Dropout: 1-10                          --
├─ReLU: 1-11                             --
Total params: 16,894
Trainable params: 16,894
Non-trainable params: 0
Epoch 01 | train loss 0.6562 acc 0.7611 | val loss 0.6770 acc 0.8058
Epoch 02 | train loss 0.5186 acc 0.9805 | val loss 0.5908 acc 0.9709
Epoch 03 | train loss 0.2900 acc 0.9848 | val loss 0.3438 acc 0.9806
Epoch 04 | train loss 0.1090 acc 0.9

In [19]:
# fpr_scaled class index -> coefficient values in that class (same grouping as
# class_label_map_scaled). record_sign reads the first value of each list to
# derive the sign of a coefficient from its predicted class.
new_class_map = {
    0: [0],
    1: [1],
    2: [-1],
    3: [2, 3],
    4: [-2, -3],
    5: [4, 5, 6, 7],
    6: [-4, -5, -6, -7],
    7: list(range(8, 16)),
    8: list(range(-15, -7)),
    # 9: list(range(16, 32)),
    # 10: list(range(-31, -15)),
}

In [20]:
# Absolute-value group index (as produced by record_abs_table_index) -> candidate
# absolute values of the coefficient.
new_class_abs_map = {
    0: [0],
    1: [1],
    2: [2, 3],
    3: [4, 5, 6, 7],
    4: list(range(8, 16)),
    # 5: list(range(16, 32)),
}

In [21]:
def mle_choose_value(prob_dict):
    """Pick, per key, the class with the highest joint log-likelihood.

    Parameters:
    prob_dict: mapping key -> 2D array of class probabilities, one row per
        observation and one column per class.

    Returns:
    dict mapping every key to the index of the column whose summed
    log-probability over all observations is largest.

    Probabilities are clipped to the smallest positive float64 before the
    logarithm. Without the clip a single exact zero turns the column sum into
    -inf (with a RuntimeWarning), and when every class has a zero in some
    observation all sums are -inf and argmax silently falls back to class 0.
    """
    # Work in float64 so that the clip bound is representable for float32 input too.
    tiny = np.finfo(np.float64).tiny
    mle_tmp_f_dict = {}
    for k, prob_array in prob_dict.items():
        probs = np.asarray(prob_array, dtype=np.float64)
        log_prob_array = np.log(np.clip(probs, tiny, None))
        # Independent observations: the joint log-likelihood is the column sum.
        col_sum = np.sum(log_prob_array, axis=0)
        mle_tmp_f_dict[k] = np.argmax(col_sum)
    return mle_tmp_f_dict

In [22]:
def record_sign(guess_attack_scaled,falcon_n):
    """Derive the sign of each coefficient from its predicted fpr_scaled class.

    Parameters:
    guess_attack_scaled: indexable by 0 .. falcon_n-1, giving the predicted
        class index (a key of the global new_class_map) of every coefficient.
    falcon_n: number of coefficients to process.

    Returns:
    dict coefficient index -> 1, -1 or 0, the sign of the first value listed
    for the predicted class in new_class_map.
    """
    sign_dict = {}
    for idx in range(falcon_n):
        # All values of a class share one sign, so the first one is representative.
        representative = new_class_map[guess_attack_scaled[idx]][0]
        if representative > 0:
            sign_dict[idx] = 1
        elif representative < 0:
            sign_dict[idx] = -1
        elif representative == 0:
            sign_dict[idx] = 0
    return sign_dict

In [23]:
def record_abs_table_index(guess_attack_scaled, falcon_n):
    """Map each predicted class index to its absolute-value group index.

    Parameters:
    guess_attack_scaled: indexable by 0 .. falcon_n-1, giving the predicted
        class index of every coefficient.
    falcon_n: number of coefficients to process.

    Returns:
    dict coefficient index -> group index: class 0 -> 0, classes 1/2 -> 1,
    3/4 -> 2, 5/6 -> 3, 7/8 -> 4, 9/10 -> 5. Coefficients whose class is
    outside 0..10 get no entry.
    """
    # The positive and the negative class of a magnitude share one group.
    class_to_abs = {0: 0,
                    1: 1, 2: 1,
                    3: 2, 4: 2,
                    5: 3, 6: 3,
                    7: 4, 8: 4,
                    9: 5, 10: 5}
    abs_dict = {}
    for idx in range(falcon_n):
        cls = guess_attack_scaled[idx]
        if cls in class_to_abs:
            abs_dict[idx] = class_to_abs[cls]
    return abs_dict

In [24]:
import numpy as np

def reorder_traces(A):
    """
    Regroup the rows of A: all even-numbered rows first, then all odd-numbered rows.

    For rows recorded in the order [128, 384, 128, 384, 129, 385, ...] this gives
    [128, 128, 129, 129, ..., 384, 384, ..., 511, 511].
    """
    # Even rows belong to the small indices (128..255), odd rows to the large
    # indices (384..511); put the small-index block in front of the large one.
    even_rows = A[0::2, :]
    odd_rows = A[1::2, :]
    return np.concatenate((even_rows, odd_rows), axis=0)

In [25]:
def generate_diffset_label(co_labels, target_filtered, set0):
    """Label each target coefficient by whether its magnitude falls into set0.

    Parameters:
    co_labels: indexable by coefficient index, giving the coefficient value.
    target_filtered: iterable of coefficient indices to label.
    set0: container of values; a coefficient gets label 0 when its value or
        the negated value is in set0, and label 1 otherwise.

    Returns:
    NumPy array of 0/1 labels in the order of target_filtered.
    """
    def in_set0(value):
        return value in set0 or -value in set0

    return np.array([0 if in_set0(co_labels[c]) else 1 for c in target_filtered])

In [26]:
import numpy as np

def select_rows_by_labels(M, labels, subset):
    """
    Pick the rows of a 2D array M that carry the requested labels.

    Parameters:
    M: numpy.array, shape = (n, m)
    labels: list or array, length = n, the label of each row in M
    subset: list or set, the labels whose rows are wanted

    Returns:
    numpy.array, shape = (len(subset), m); row i is the first row of M whose
    label equals the i-th element of subset. A label that does not occur in
    `labels` raises IndexError.
    """
    label_arr = np.array(labels)
    row_idx = []
    # Iterate over a list copy so the input order of subset is kept.
    for wanted in list(subset):
        matches = np.where(label_arr == wanted)[0]
        row_idx.append(matches[0])
    return M[row_idx, :]


In [27]:
import numpy as np
def fill_dict(to_fill_dict, prob_dict):
    """Accumulate the arrays of prob_dict into to_fill_dict, in place.

    A key seen for the first time stores the array as is; for a key that is
    already present the new array is stacked below the stored one.
    Returns None.
    """
    for key, probs in prob_dict.items():
        if key in to_fill_dict:
            to_fill_dict[key] = np.vstack([to_fill_dict[key], probs])
            continue
        to_fill_dict[key] = probs

In [28]:
def dict_process(fpr_diffset_mle_prob_list):
    """Return a plain dict with every value of the input converted to a NumPy array."""
    as_arrays = {}
    for key_coeff, val_list in dict(fpr_diffset_mle_prob_list).items():
        as_arrays[key_coeff] = np.array(val_list)
    return as_arrays

In [29]:
import numpy as np

def cal_per_mul_prob(sets_dict, model_p,scaler_p,list_range):
    """
    Run one batched inference over all traces in `sets_dict` and split the result back per key.

    Parameters:
    `sets_dict`: dict[int, np.ndarray]
        Traces per coefficient index, one trace per row (the caller in this
        notebook stores 2 rows per key).
    `model_p`, `scaler_p`: model and scaler forwarded to `classifier_inference`.
    `list_range`: sample indices forwarded to `classifier_inference`.

    Returns:
    `new_dict`: dict[int, np.ndarray]
        For every key, the rows of the inference result that belong to that
        key's traces, in the original row order. An empty `sets_dict` gives an
        empty dict without calling the classifier (np.vstack would otherwise
        raise ValueError on an empty list).
    """
    keys = list(sets_dict.keys())
    if not keys:
        return {}

    # 1. Concatenate the per-key traces, in key order, into one large matrix.
    blocks = [np.atleast_2d(sets_dict[k]) for k in keys]
    big_array = np.vstack(blocks)

    # 2. One inference call for all keys.
    result = classifier_inference(big_array, list_range, model_p, scaler_p)

    # 3. Split back into a dictionary, using each key's own row count instead of
    #    assuming exactly two rows per key.
    new_dict = {}
    start = 0
    for key, block in zip(keys, blocks):
        end = start + block.shape[0]
        new_dict[key] = result[start:end, :]
        start = end

    return new_dict

In [30]:
# The key database holds 1000 keys; attack traces were collected only for the first 100.
# check_key_right reads row k of each file as the ground truth of the k-th key.
filename_f = "./data_k_828/Falcon_f_1024_1000.csv"
filename_g = "./data_k_828/Falcon_g_1024_1000.csv"

In [31]:
def fill_guess_dict(guess_part_list,f_final_set_p,f_sign_dict_p):
    """Merge the {4,5} / {6,7} guesses into the candidate sets and apply the signs.

    Parameters:
    guess_part_list: list of dicts {coefficient index: guessed class}; the first
        dict holds the guesses for the group {4, 5}, the second those for {6, 7}.
        Further entries are ignored.
    f_final_set_p: dict coefficient index -> set of candidate absolute values.
        It is updated in place: a guessed coefficient gets the single value
        guess + 4 (first dict) or guess + 6 (second dict).
    f_sign_dict_p: dict coefficient index -> sign (negative, zero or positive).

    Returns:
    dict coefficient index -> candidate set, negated element-wise for
    coefficients with a negative sign.
    """
    base_of_part = {0: 4, 1: 6}
    for part_idx, guess_p in enumerate(guess_part_list):
        base = base_of_part.get(part_idx)
        for coeff, guess_v in guess_p.items():
            if base is not None:
                f_final_set_p[coeff] = {guess_v + base}

    ordered_sets = dict(sorted(f_final_set_p.items()))

    def signed(coeff, sign):
        candidates = ordered_sets[coeff]
        return candidates if sign >= 0 else {-x for x in candidates}

    return {coeff: signed(coeff, sign) for coeff, sign in f_sign_dict_p.items()}

In [32]:
from collections import defaultdict
def attack_key_repeat(attack_scaled, attack_shift, attack_mul, mle_N):
    """Recover candidate values for the 1024 coefficients of one key polynomial.

    The attack runs in four stages, each combining mle_N repeated observations
    through mle_choose_value:
      1. fpr_scaled traces -> coarse class of every coefficient (sign + magnitude group);
      2. shift traces of fpr_mul -> which of the two shift groups (set0 / set1)
         a still-ambiguous coefficient belongs to;
      3. intersection of magnitude group and shift group -> candidate sets;
      4. multiply traces of fpr_mul -> exact value inside {4, 5} and {6, 7}.

    Parameters:
    attack_scaled: array reshaped here to (n_obs, 1024, 484); only the first
        mle_N observations are used.
    attack_shift, attack_mul: arrays indexed as [observation, row, sample]; the
        rows are presumably interleaved as described in reorder_traces and hold
        two traces for each coefficient 256..511 and 768..1023 — TODO confirm
        against the acquisition script.
    mle_N: number of observations combined per coefficient.

    Returns:
    dict coefficient index -> set of signed candidate values (a single-element
    set when the coefficient is fully determined).

    Relies on the module-level models/scalers (model_scaled, model_shift,
    model_4_5, model_6_7 and their scalers), sample_freq and cutoff_freq.
    """
    # Problem sizes (Falcon-1024, trace lengths of the three segments).
    n_coeff = 1024
    scaled_range = list(range(484))
    shift_range = list(range(88))
    mul_range = list(range(68))
    # Coefficients that go through the fpr_mul multiplication, and those that do not.
    mul_w = list(range(256, 512)) + list(range(768, 1024))
    no_mul_w = list(range(0, 256)) + list(range(512, 768))
    # Absolute values separated by the shift classifier (class 0 / class 1).
    set0 = {2, 4, 5, 8, 9, 10, 11}
    set1 = {3, 6, 7, 12, 13, 14, 15}

    ######### Stage 1: coarse class from the fpr_scaled traces.
    attack_traces_scaled = attack_scaled.reshape(-1, 484)
    attack_traces_scaled_lpf = trace_butter_lpf(attack_traces_scaled, sample_freq, cutoff_freq)
    attack_traces_scaled_lpf = attack_traces_scaled_lpf.reshape(-1, n_coeff, 484)

    fpr_scaled_mle_prob_list = defaultdict(list)
    for obs in range(mle_N):
        prob_fpr_scaled = classifier_inference(attack_traces_scaled_lpf[obs, :, :],
                                               scaled_range, model_scaled, scaler_scaled)
        for j in range(n_coeff):
            fpr_scaled_mle_prob_list[j].append(prob_fpr_scaled[j])
    scaled_dict = mle_choose_value(dict_process(fpr_scaled_mle_prob_list))

    # Sign and magnitude group of every coefficient.
    sign_dict = record_sign(scaled_dict, n_coeff)
    abs_dict = record_abs_table_index(scaled_dict, n_coeff)

    ######### Stage 2: shift group from the normalization step of fpr_mul.
    # Coefficients already classified as 0, 1 or -1 need no further analysis,
    # so their rows are dropped before inference.
    keys_012 = {k for k in mul_w if scaled_dict[k] in (0, 1, 2)}
    target_filtered = [c for c in mul_w if c not in keys_012]
    keep_rows = [row for row, c in enumerate(mul_w) if c not in keys_012]

    fpr_shift_mle_prob_list = defaultdict(list)
    for obs in range(mle_N):
        attack_traces_shift = reorder_traces(attack_shift[obs, :, :])
        attack_traces_shift_lpf = trace_butter_lpf(attack_traces_shift, sample_freq, cutoff_freq)
        # After reordering, consecutive rows are the two traces of one coefficient.
        shift_first = attack_traces_shift_lpf[0::2][keep_rows, :]
        shift_second = attack_traces_shift_lpf[1::2][keep_rows, :]

        prob_fpr_shift_1 = classifier_inference(shift_first, shift_range, model_shift, scaler_shift)
        prob_fpr_shift_2 = classifier_inference(shift_second, shift_range, model_shift, scaler_shift)

        for row, coeff in enumerate(target_filtered):
            fpr_shift_mle_prob_list[coeff].append(prob_fpr_shift_1[row])
            fpr_shift_mle_prob_list[coeff].append(prob_fpr_shift_2[row])
    # Maximum-likelihood shift class per coefficient.
    shift_dict = mle_choose_value(dict_process(fpr_shift_mle_prob_list))

    ######### Stage 3: candidate absolute values per coefficient.
    final_set = {}
    sure_set = []
    insure_set = []
    for key, value in abs_dict.items():
        if value == 0 or value == 1:
            # Magnitude 0 or 1 is already unique.
            sure_set.append(key)
            final_set[key] = set(new_class_abs_map[value])
        else:
            insure_set.append(key)

    # Multiplied coefficients: intersect the magnitude group with the shift group.
    shift_targets = set(target_filtered)
    for insure_idx in insure_set:
        if insure_idx in shift_targets:
            shift_group = set0 if shift_dict[insure_idx] == 0 else set1
            final_set[insure_idx] = shift_group & set(new_class_abs_map[abs_dict[insure_idx]])

    # Non-multiplied coefficients keep the whole magnitude group.
    sure_lookup = set(sure_set)
    for d in no_mul_w:
        if d not in sure_lookup:
            final_set[d] = set(new_class_abs_map[abs_dict[d]])

    ######### Stage 4: resolve {4, 5} and {6, 7} from the multiply traces.
    mul_idx_list = [d for d in mul_w if len(final_set[d]) != 1]
    fpr_mul_mle_prob_list_45 = defaultdict(list)
    fpr_mul_mle_prob_list_67 = defaultdict(list)

    for obs in range(mle_N):
        attack_traces_mul = reorder_traces(attack_mul[obs, :, :])
        mul_first = trace_butter_lpf(attack_traces_mul[0::2], sample_freq, cutoff_freq)
        mul_second = trace_butter_lpf(attack_traces_mul[1::2], sample_freq, cutoff_freq)
        mul_first = select_rows_by_labels(mul_first, mul_w, mul_idx_list)
        mul_second = select_rows_by_labels(mul_second, mul_w, mul_idx_list)

        sets_4_5 = defaultdict(list)
        sets_6_7 = defaultdict(list)
        # A dedicated loop variable is used here so the observation index of
        # the enclosing loop is not overwritten.
        for row, coeff in enumerate(mul_idx_list):
            if final_set[coeff] == {4, 5}:
                sets_4_5[coeff].append(mul_first[row])
                sets_4_5[coeff].append(mul_second[row])
            elif final_set[coeff] == {6, 7}:
                sets_6_7[coeff].append(mul_first[row])
                sets_6_7[coeff].append(mul_second[row])

        sets_4_5 = dict_process(sets_4_5)
        sets_6_7 = dict_process(sets_6_7)

        # A key may have no coefficient in one of the groups; skip the inference
        # then instead of stacking an empty list of traces.
        prob_fpr_mul_4_5 = cal_per_mul_prob(sets_4_5, model_4_5, scaler_4_5, mul_range) if sets_4_5 else {}
        prob_fpr_mul_6_7 = cal_per_mul_prob(sets_6_7, model_6_7, scaler_6_7, mul_range) if sets_6_7 else {}

        fill_dict(fpr_mul_mle_prob_list_45, prob_fpr_mul_4_5)
        fill_dict(fpr_mul_mle_prob_list_67, prob_fpr_mul_6_7)

    guess_45 = mle_choose_value(dict_process(fpr_mul_mle_prob_list_45))
    guess_67 = mle_choose_value(dict_process(fpr_mul_mle_prob_list_67))

    part_guess_list = [guess_45, guess_67]
    final_guess_dict = fill_guess_dict(part_guess_list, final_set, sign_dict)

    return final_guess_dict
    
    

In [33]:
def check_key_right(f_guess_dict_p, g_guess_dict_p, filename_f_p, filename_g_p, kk_idx, attempts,
                    out_dir="./data/data_k_guess_isd/Falcon1024"):
    """Check whether the guessed f||g contains 1024 correct, fully determined coefficients.

    A coefficient is "determined" when its candidate set has exactly one value.
    The determined coefficients of f and g are compared against row kk_idx of
    the ground-truth CSV files:

    * fewer than 1024 determined            -> fail;
    * exactly 1024 determined               -> all of them must be correct;
    * more than 1024 determined             -> first try all determined values in
      {0, ±1, ±2, ±3} (if there are at least 1024 of them); otherwise keep every
      value in {0, ±1} and repeatedly draw the missing ones at random from the
      remaining determined coefficients.

    On success of the last two strategies the verified positions are pickled to
    <out_dir>/f_<kk_idx>_guess.pkl and <out_dir>/g_<kk_idx>_guess.pkl (the
    directory is created when missing).

    Parameters:
    f_guess_dict_p, g_guess_dict_p: dict coefficient index -> set of candidate values.
    filename_f_p, filename_g_p: ground-truth CSV files, one key per row.
    kk_idx: row (key) index in the CSV files; also used in the output file names.
    attempts: number of random draws in the sampling strategy.
    out_dir: directory for the pickled guesses.

    Returns:
     1: exactly 1024 determined, all correct
     2: success after random sampling
     3: success with the values in {0, ±1, ±2, ±3}
    -1: fewer than 1024 determined
    -2: exactly 1024 determined, at least one wrong
    -3: more than 1024 values in {0, ±1}, or too few other candidates to reach 1024
    -4: all sampling attempts failed

    Raises:
    ValueError: if row kk_idx does not exist in a CSV file.
    """
    # pickle is imported here so the function does not depend on a later notebook cell.
    import csv, os, pickle, random

    f_determined = {k: list(v)[0] for k, v in f_guess_dict_p.items() if len(v) == 1}
    g_determined = {k: list(v)[0] for k, v in g_guess_dict_p.items() if len(v) == 1}
    total_determined = len(f_determined) + len(g_determined)

    if total_determined < 1024:
        return -1

    def read_row(csv_path, row_idx):
        with open(csv_path, newline="") as csvfile:
            reader = csv.reader(csvfile)
            for j, row in enumerate(reader):
                if j == row_idx:
                    return list(map(int, row))
        raise ValueError(f"Row {row_idx} not found in {csv_path}")

    truth = {"f": read_row(filename_f_p, kk_idx),
             "g": read_row(filename_g_p, kk_idx)}

    def positions_where(keep):
        # (source, index, value) triples of the determined coefficients that pass `keep`,
        # f first, then g.
        picked = [("f", k, v) for k, v in f_determined.items() if keep(v)]
        picked += [("g", k, v) for k, v in g_determined.items() if keep(v)]
        return picked

    def all_match(positions):
        # True when every listed position agrees with the ground truth.
        return all(truth[src][k] == v for src, k, v in positions)

    def save_guess(positions):
        # Persist the verified positions, split by polynomial.
        f_dict_p = {k: v for src, k, v in positions if src == "f"}
        g_dict_p = {k: v for src, k, v in positions if src == "g"}
        os.makedirs(out_dir, exist_ok=True)
        with open(f"{out_dir}/f_{kk_idx}_guess.pkl", "wb") as f_out:
            pickle.dump(f_dict_p, f_out)
        with open(f"{out_dir}/g_{kk_idx}_guess.pkl", "wb") as g_out:
            pickle.dump(g_dict_p, g_out)

    if total_determined == 1024:
        return 1 if all_match(positions_where(lambda v: True)) else -2

    # --- More than 1024 determined: try the values in {0, ±1, ±2, ±3} first.
    # This step is equivalent to the O0 level experiment.
    special_values = {0, 1, -1, 2, -2, 3, -3}
    priority_positions = positions_where(lambda v: v in special_values)

    if len(priority_positions) >= 1024:
        print("jinru")
        if all_match(priority_positions):
            save_guess(priority_positions)
            return 3

    # --- Otherwise keep every 0 / ±1 and sample the rest from the other determined values.
    special_positions = positions_where(lambda v: v in (0, 1, -1))
    candidates = positions_where(lambda v: v not in (0, 1, -1))

    need = 1024 - len(special_positions)
    print(f"need is {need}")
    print(f"len of candidates is {len(candidates)}")
    if need < 0 or len(candidates) < need:
        return -3

    # The fixed positions do not change between attempts, so verify them once.
    # The sample is still drawn on every attempt, which keeps the consumption of
    # the global random stream (and thus the results for later keys) unchanged.
    special_ok = all_match(special_positions)
    for _ in range(attempts):
        sample = random.sample(candidates, need)
        if special_ok and all_match(sample):
            save_guess(special_positions + sample)
            return 2

    return -4
    
    

In [34]:
# Quick shape check on one attack file; the first axis is sliced with [:mle_N]
# by the attack loop, so it indexes the repeated observations.
aaaa = np.load(f"./data-O3-828/falcon1024_attack/fpr_scaled/0th_f_50traces.npy")
print(aaaa.shape)

(50, 1024, 484)


In [35]:
import pickle
import random
# Seed the stdlib RNG once: check_key_right draws its random samples from it, so
# the outcome for a key depends on how many draws the previous keys consumed.
random.seed(2025)
# Number of repeated observations combined per coefficient (first mle_N of the 50 stored).
mle_N = 10
# Number of random draws check_key_right may try per key.
try_random_num = 100000
succ_num_count=0
num_attack_key=100
# Indices of the keys that were recovered / not recovered.
key_key_idx_list=[]
error_list=[]
# aaa_list = [0,10,22,25,30,72,80,90]
# for key_th in range(num_attack_key):
for key_th in range(num_attack_key):
    # the attack trace segments corresponding to the fpr_scaled, normalization procedure and mantissa multiplication in fpr_mul for the f key
    attack_traces_scaled_f = np.load(f"./data-O3-828/falcon1024_attack/fpr_scaled/{key_th}th_f_50traces.npy")[:mle_N,:,:]
    attack_traces_diffset_f = np.load(f"./data-O3-828/falcon1024_attack/fpr_mul/{key_th}th_f_shift_50traces.npy")[:mle_N,:,:]
    attack_traces_multi_f = np.load(f"./data-O3-828/falcon1024_attack/fpr_mul/{key_th}th_f_multiply_50traces.npy")[:mle_N,:,:]
    # the attack trace segments corresponding to the fpr_scaled, normalization procedure and mantissa multiplication in fpr_mul for the g key
    attack_traces_scaled_g = np.load(f"./data-O3-828/falcon1024_attack/fpr_scaled/{key_th}th_g_50traces.npy")[:mle_N,:,:]
    attack_traces_diffset_g = np.load(f"./data-O3-828/falcon1024_attack/fpr_mul/{key_th}th_g_shift_50traces.npy")[:mle_N,:,:]
    attack_traces_multi_g = np.load(f"./data-O3-828/falcon1024_attack/fpr_mul/{key_th}th_g_multiply_50traces.npy")[:mle_N,:,:]
    
    # Candidate sets for every coefficient of f and of g.
    f_guess_dict = attack_key_repeat(attack_traces_scaled_f,attack_traces_diffset_f,attack_traces_multi_f,mle_N)
    g_guess_dict = attack_key_repeat(attack_traces_scaled_g,attack_traces_diffset_g,attack_traces_multi_g,mle_N)

    # Status code of check_key_right: positive means the key counts as recovered.
    re = check_key_right(f_guess_dict, g_guess_dict, filename_f, filename_g, key_th, try_random_num)
    print(f"the {key_th}th key guess's state is {re}")
    if re>0:
        succ_num_count = succ_num_count + 1
        key_key_idx_list.append(key_th)
    else:
        error_list.append(key_th)
print(f"the succ count is {succ_num_count}")
   

jinru
the 0th key guess's state is 3
jinru
need is 212
len of candidates is 614
the 1th key guess's state is 2
jinru
need is 205
len of candidates is 606
the 2th key guess's state is -4
jinru
the 3th key guess's state is 3
jinru
the 4th key guess's state is 3
jinru
the 5th key guess's state is 3
jinru
the 6th key guess's state is 3
jinru
the 7th key guess's state is 3
jinru
the 8th key guess's state is 3
jinru
need is 186
len of candidates is 577


KeyboardInterrupt: 