Features - ( Real, Imaginary )

CIRs - 8000

Atoms - 16000, 128  
trainData - 12800, 128  
testData - 3200, 64

dictionary - 128, 12800

In [16]:
import numpy as np
from sklearn.linear_model import OrthogonalMatchingPursuit
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split 
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.decomposition import DictionaryLearning
from sklearn.linear_model import Lasso

np.set_printoptions(threshold=np.inf)
np.set_printoptions(suppress=True)

measurement = np.load('../../dataset/meas_symm_1.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_symm_2.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_symm_3.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_symm_4.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_symm_5.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_symm_nomove_1.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_symm_varspeed_1.npz', allow_pickle=False)

# measurement = np.load('../../dataset/meas_asymm_1.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_asymm_2.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_asymm_nomove_1.npz', allow_pickle=False)
# measurement = np.load('../../dataset/meas_asymm_reflector_1.npz', allow_pickle=False)

header, data = measurement['header'], measurement['data']
data_cir = data['cirs'][:8000]
trainCIR, testCIR = train_test_split(data_cir, test_size=0.2, random_state=42)
print(f'trainData - {trainCIR.shape}')
print(f'testData - {testCIR.shape}')

trainData - (6400, 15, 251, 2)
testData - (1600, 15, 251, 2)


In [17]:
def get64Samples(cirs):

    real = cirs[:, :, 0]
    imag = cirs[:, :, 1]
    
    # Number of signalsd
    num_signals = real.shape[0]  # 3 in this case
    # print(f'num_signals: {num_signals}')
    # Initialize lists to store the focused samples
    imp_real_parts = []
    imp_imag_parts = []
    img_mag_parts = []
    
    for i in range(num_signals):
        # Calculate the magnitude
        magnitude = np.abs(real[i] + 1j * imag[i])
        
        # find the peak index
        peak_index = np.argmax(magnitude)
        
        # Calculate the start and end indices for the focused part
        start_index = max(0, peak_index - 32)
        end_index = min(magnitude.shape[0], peak_index + 32)
        
        # Extract the part of the signal around the peak
        real_part_focus = real[i, start_index:end_index]
        imag_part_focus = imag[i, start_index:end_index]
        mag_part_focus = magnitude[start_index:end_index]
        
        imp_real_parts.append(real_part_focus)
        imp_imag_parts.append(imag_part_focus)
        img_mag_parts.append(mag_part_focus)
        

    # Convert lists back to arrays for further processing if needed
    imp_real_parts = np.array(imp_real_parts)
    imp_imag_parts = np.array(imp_imag_parts)
    img_mag_parts = np.array(img_mag_parts)

    return imp_real_parts, imp_imag_parts, img_mag_parts

In [None]:
def omp_manual(dictionary, signal, sparsity):
    n_features = dictionary.shape[1]
    residual = signal
    idx = []
    coefficients = np.zeros(n_features)

    for _ in range(sparsity):
        
        # Step 1: Find the best matching atom (column from the dictionary)
        projections = np.abs(dictionary.T @ residual)
        best_atom = np.argmax(projections)
        
        # Step 2: Add the index of the best atom to the list
        idx.append(best_atom)
        
        # Step 3: Solve least squares to find new coefficients
        selected_atoms = dictionary[:, idx]
        coefficients_ls, _, _, _ = np.linalg.lstsq(selected_atoms, signal, rcond=None)
        
        # Step 4: Update the residual
        residual = signal - selected_atoms @ coefficients_ls

    # Store the final coefficients
    for i, index in enumerate(idx):
        coefficients[index] = coefficients_ls[i]

    return coefficients


# Step 5: Sparse Coding Function
def find_sparse_coefficients(tSample, D, n_nonzero_coefs=15):
    omp = OrthogonalMatchingPursuit(n_nonzero_coefs=n_nonzero_coefs)
    omp.fit(D, tSample)
    return omp.coef_


def lasso_method(dictionary, signal, alpha=0.01):
    lasso = Lasso(alpha=alpha, fit_intercept=False)
    lasso.fit(dictionary, signal)
    return lasso.coef_

def testOMP(tSample, D, n_nonzero_coefs=15):
    
    # residual === test sample
    residual = tSample.copy()
    
    # List to keep track of which atoms are selected.
    idx_selected = []
    
    n_atoms = D.shape[1]
    
    coefficients = np.zeros(n_atoms)
    
    # We loop up to n_nonzero_coefs times to select the most relevant atoms.
    for _ in range(n_nonzero_coefs):
        # how similar each atom is to the current residual.
        correlations = D.T @ residual
        
        idx = np.argmax(np.abs(correlations))
        
        idx_selected.append(idx)
        
        # Extract the selected atoms
        D_selected = D[:, idx_selected]  # Shape: (n_features, len(idx_selected))
        
        coef_selected, _, _, _ = np.linalg.lstsq(D_selected, tSample, rcond=None)
        coefficients[idx_selected] = coef_selected
        residual = tSample - D_selected @ coef_selected

        if np.linalg.norm(residual) < 1e-6:
            break
    
    return coefficients


In [19]:
# Define channels
alice_channel = 3  # Channel 3 is ALICE (legitimate)
eve_channel = 6  # Channel 6 is EVE (illegitimate)

# Extract data for ALICE and BOB channels
alice_train_CIRs = trainCIR[:, alice_channel, :, :]
eve_train_CIRs = trainCIR[:, eve_channel, :, :] # (100, 251, 2)

alice_train_real, alice_train_imag, alice_train_magnitude = get64Samples(alice_train_CIRs)
alice_train_features = np.hstack((alice_train_real, alice_train_imag))

eve_train_real, eve_train_imag, eve_train_magnitude = get64Samples(eve_train_CIRs)
eve_train_features = np.hstack((eve_train_real, eve_train_imag))

# Create labels for Alice and Eve for training
alice_train_labels = np.zeros(alice_train_features.shape[0])  # Label '0' for Alice.
eve_train_labels = np.ones(eve_train_features.shape[0])       # Label '1' for Eve.

# Combine data and labels for training
train_atoms = np.vstack((alice_train_features, eve_train_features))
train_labels = np.hstack((alice_train_labels, eve_train_labels))
print(f'Atoms - {train_atoms.shape}')
D = train_atoms.T
print(f'Dictionary - {D.shape}')

Atoms - (12800, 128)
Dictionary - (128, 12800)


In [20]:
# Step 3: Extract Features for Test Data
alice_test_CIRs = testCIR[:, alice_channel, :, :]
eve_test_CIRs = testCIR[:, eve_channel, :, :]

alice_test_real, alice_test_imag, alice_test_magnitude = get64Samples(alice_test_CIRs)
alice_test_features = np.hstack((alice_test_real, alice_test_imag))

eve_test_real, eve_test_imag, eve_test_magnitude = get64Samples(eve_test_CIRs)
eve_test_features = np.hstack((eve_test_real, eve_test_imag))

# Create labels for Alice and Eve for testing
alice_test_labels = np.zeros(alice_test_features.shape[0])  # Label '0' for Alice.
eve_test_labels = np.ones(eve_test_features.shape[0])       # Label '1' for Eve.

# Combine data and labels for testing
test_atoms = np.vstack((alice_test_features, eve_test_features))
test_labels = np.hstack((alice_test_labels, eve_test_labels))

In [None]:


# Step 6: Function to Calculate Residuals for Each Class
def calculate_residual(tSample, coefficients, class_indices, D):
    coef_class = np.zeros_like(coefficients)
    coef_class[class_indices] = coefficients[class_indices]  # Keep onltSample coefficients for the specified class
    reconstructed_signal = D @ coef_class
    residual = np.linalg.norm(tSample - reconstructed_signal)
    return residual

# Step 7: Classification Function
def classify_signal(tSample, D, trainLabel):
    sparsity = 15 # Number of non-zero coefficients
    # coefficients = find_sparse_coefficients(tSample, D)
    # print(f'coefficients - {coefficients}')
    # coefficients = omp_manual(D, tSample, sparsity)
    # print(f'coefficients - {coefficients}')
    # coefficients = lasso_method(D, tSample)
    coefficients = testOMP(tSample, D)
    
    # print(coefficients_old.shape)
    # print(f'coefficients_old - {coefficients_old}')
    
    # print(coefficients.shape)
    # print(f'coefficients - {coefficients}')
    
    
    residuals = []

    unique_classes = np.unique(trainLabel) # 0 and 1
    for class_label in unique_classes:
        class_indices = np.where(trainLabel == class_label)[0]  # Indices of columns in D belonging to this class
        residual = calculate_residual(tSample, coefficients, class_indices, D)
        residuals.append(residual)

    # Predict the class with the smallest residual
    predicted_class = unique_classes[np.argmin(residuals)]
    return predicted_class

In [22]:
# Step 8: Classifying Test Data and Evaluating the Model
predictions = []

for testSample in test_atoms:
    predicted_class = classify_signal(testSample, D, train_labels)
    predictions.append(predicted_class)


predictions = np.array(predictions)
# print(predictions.shape)

accuracy = np.mean(predictions == test_labels)
print(f"Classification Accuracy: {accuracy * 100:.2f}%")

Classification Accuracy: 64.72%


In [23]:
# Calculate confusion matrix
print(f"\nTotal testing channel: {test_labels.shape}")

tn, fp, fn, tp = confusion_matrix(test_labels, predictions, labels=[0, 1]).ravel()

print(f"tp: {tp}")
print(f"tn: {tn}")
print(f"fp: {fp}")
print(f"fn: {fn}")

# # Missed Detection Rate (MDR)
MDR = fp / (fp + tn)

# # False Alarm Rate (FAR)
FAR = fn / (fn + tp)

# # Gamma calculation
gamma = (tp + fn) / (tn + fp)

# # Authentication Rate (AR)
AR = (tp + gamma * tn) / ((tp + fn) + gamma * (tn + fp))

print(f"MDR: {MDR}")
print(f"FAR: {FAR}")
print(f"AR: {AR}")


Total testing channel: (3200,)
tp: 1330
tn: 741
fp: 859
fn: 270
MDR: 0.536875
FAR: 0.16875
AR: 0.6471875
