# Deep Learning-Based Compressed Sensing under IQ Imbalance For Channel Estimation of mmWave Phased Arrays
Institution: TU Delft
Faculty: Mechanical Engineering
Authors: Tom Lijding and Daan van Haasteren

### Python and Package Versions
Python: 3.11.11
Numpy: 2.2.1
Scipy: 1.15.0
Matplotlib: 3.10.0
Pytorch: 2.5.1

### Introduction
This code demonstrates the use of a autoencoder based compressed sensing algorithm in channel estimation under IQ imbalance. We refer the reader to the file "Data_Compression_Deep_Learning_based_CS_under_IQ_Imbalance.pdf" for a full report and nice visuals!

### Structure
The code is structured as follows. First, the dataset is built upon which we train the neural networks. Next we create the neural networks i.e. build the custom layer "ComplexLinearUnitary" which functions as the encoding layer, and build different models for different varying parameters (SNR, IRR, Encoding Dimension).

In [12]:
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import random
import cmath
import math
plt.rcParams.update(plt.rcParamsDefault)

# Define our buildDataSet function here!

In [13]:
def buildDataSet(max_amplitude, min_sparsity, max_sparsity, vector_size, data_set_size):
    sparse_data = np.zeros((vector_size, data_set_size), dtype=float)  # Ensure float type
    
    # Iterate over the columns of the sparse_data matrix to define the data samples
    for i in range(data_set_size):
        sparsity = random.randint(min_sparsity, max_sparsity)
        indices = random.sample(range(vector_size), sparsity)
        amps = np.random.uniform(-max_amplitude, max_amplitude, sparsity)  # Use negative and positive values
        sparse_data[indices, i] = amps
    
    # Define the DFT matrix and multiply our sparse_data vectors with it to find dense data
    DFT = sp.linalg.dft(vector_size) / np.sqrt(vector_size)
    dense_data = DFT @ sparse_data
    
    return dense_data, sparse_data


## Build the dataset

In [19]:
max_amplitude = 100
min_sparsity = 10
max_sparsity = 30
vector_size = 100
data_set_size = 10000
dense_data, sparse_data = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,data_set_size)


## Test the dataset (optional)

In [15]:
DFT = sp.linalg.dft(vector_size)/np.sqrt(vector_size)
iDFT = DFT.conj().T

# Check if the iDFT of the dense data is in fact sparse
print(iDFT@dense_data)


[[ 8.42236390e-15+5.59858656e-13j -1.78101656e-13+1.16678011e-13j
  -1.96207319e+01+4.47415950e-13j ... -2.66617990e-13-5.70777021e-14j
  -3.01962516e-13+4.10117185e-13j -1.75987889e-13+1.37646140e-13j]
 [-5.76536696e+01-7.06345817e-14j -5.09761094e-14+5.50332059e-13j
  -7.22462010e+00-5.66894773e-13j ... -3.36630425e-13-4.59112807e-14j
   1.92974157e-13+3.80782684e-13j -1.19477162e-13-7.46097795e-13j]
 [-9.07277805e+01-1.92495524e-14j  3.88790302e-13-2.82147707e-13j
  -2.00169755e-13+2.20476229e-13j ... -2.71859013e-13+1.87355447e-13j
   1.54941355e-13+4.96672535e-14j -1.18193726e-13-1.30254032e-13j]
 ...
 [-8.86623889e+00+1.90558045e-13j  1.14481112e-13-2.86399803e-13j
   1.63172946e-13-1.28146288e-13j ... -2.75127771e-13+7.04478073e-13j
   5.26689803e-13-2.63594079e-14j  2.68939306e-13-3.27366672e-13j]
 [ 7.38063978e-13+3.33911980e-13j  1.41576990e-13+2.34875898e-14j
  -2.00927326e-13-1.58520086e-13j ...  2.03568811e-13+2.25272065e-13j
  -4.20844184e-14-3.22162963e-13j -3.69575424e-

From the above results, we can see that our vectors are very sparse if we take the IDFT

## Setting up the dataset for Pytorch

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import torch.nn.init as init

from sklearn.model_selection import train_test_split

print(dense_data.shape)

X = np.concatenate((dense_data.real,dense_data.imag)).T
Y = np.concatenate((dense_data.real,dense_data.imag)).T

X_tensor = torch.tensor(X,dtype=torch.float)
Y_tensor = torch.tensor(Y,dtype=torch.float)
dataset = TensorDataset(X_tensor,Y_tensor)

dataloader = DataLoader(dataset,batch_size = 500,shuffle = True, )
print(X_tensor.shape)


(100, 10000)
torch.Size([10000, 200])


## Setting up the Neural Networks

In [17]:

def complex_xavier_init(tensor_real, tensor_imag, gain=1.0):
    # Apply Xavier initialization (using uniform variant) to both real and imaginary parts
    init.xavier_uniform_(tensor_real, gain=gain)
    init.xavier_uniform_(tensor_imag, gain=gain)

class ComplexLinearUnitary(nn.Module):
    # This class serves as the encoder layer. We are restricted to values which are of the form e^jq where q are trainable parameters
    # Notice that the input and output dimensions are half of what the actual vector size is! Because it is a complex value, our dimensions are twice as long
    def __init__(self,input_dim,output_dim):
        super(ComplexLinearUnitary,self).__init__()
        # Here we create the q-values of our unitary matrix. These are the parameters we are training such that each entry of our complex matrix to encode our data is |F_ij| = 1
        self.q_values = nn.Parameter(torch.randn(output_dim,input_dim))

    def forward(self,x):   
        # Compute unitary weights dynamically in each forward pass
        W_real = torch.cos(self.q_values)
        W_imag = torch.sin(self.q_values)
        W_top = torch.cat([W_real, -W_imag], dim=1)  # [W_real, -W_imag]
        W_bottom = torch.cat([W_imag, W_real], dim=1)  # [W_imag, W_real]
        W_total = torch.cat([W_top, W_bottom], dim=0)  # Stack rows to form the full matrix 
        out = torch.matmul(x,W_total.T)
        return out
    
class ComplexLinear(nn.Module):
    # Notice that the input and output dimensions are half of what the actual vector size is! Because it is a complex value, our dimensions are twice as long. This gets fixed because we make the matrix
    # W_total which multiplies [x_real;x_imag] and returns [y_real;y_imag]
    def __init__(self,input_dim,output_dim):
        super(ComplexLinear,self).__init__()
        # Here we create the complex matrix W
        #self.W_real = nn.Parameter(torch.randn(output_dim,input_dim))# eye(input_dim))
        #self.W_imag = nn.Parameter(torch.randn(output_dim,input_dim)) #zeros((input_dim,output_dim)))

        self.W_real = nn.Parameter(torch.empty(output_dim, input_dim))
        self.W_imag = nn.Parameter(torch.empty(output_dim, input_dim))
        self.reset_parameters()
    
    def reset_parameters(self):
        # Initialize both the real and imaginary parts using Xavier initialization.
        complex_xavier_init(self.W_real, self.W_imag)
        
    def forward(self,x):   
        # Compute unitary weights dynamically in each forward pass
        W_real = self.W_real
        W_imag = self.W_imag
        W_top = torch.cat([W_real, -W_imag], dim=1)  # [W_real, -W_imag]
        W_bottom = torch.cat([W_imag, W_real], dim=1)  # [W_imag, W_real]
        W_total = torch.cat([W_top, W_bottom], dim=0)  # Stack rows to form the full matrix 
        out = torch.matmul(x,W_total.T)
        return out

class FeedthroughEncoder(nn.Module):
    def __init__(self, input_dim, encoding_dim):
        super(FeedthroughEncoder, self).__init__()
        """ compression_factor = input_dim / encoding_dim
        layer_size_factor = int(encoding_dim * compression_factor / 4) """

        self.encoder = ComplexLinear(input_dim,encoding_dim)

        self.decoder = nn.Sequential(
            ComplexLinear(encoding_dim,input_dim)
        )

    def forward(self, x):
        encoder_out = self.encoder(x)
        return self.decoder(encoder_out)


class LearnedAutoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim,hidden_dims):
        super(LearnedAutoencoder, self).__init__()

        self.encoder = ComplexLinearUnitary(input_dim,encoding_dim)
        layers = []
        prev_dim = encoding_dim*2
        for dim in hidden_dims:
            layers.append(nn.Linear(prev_dim,dim*2))
            layers.append(nn.ReLU())
            prev_dim = dim*2
        self.decoder = nn.Sequential(
            *layers,
            nn.Linear(prev_dim,input_dim*2)
        )

    def forward(self, x):
        encoder_out = self.encoder(x)

        return self.decoder(encoder_out)
    
class LearnedAutoencoderWithNoise(nn.Module):
    def __init__(self, input_dim, encoding_dim,hidden_dims,variance):
        super(LearnedAutoencoderWithNoise, self).__init__()
        self.variance = variance
        self.encoder = ComplexLinearUnitary(input_dim,encoding_dim)
        self.encoding_dim = encoding_dim
        layers = []
        prev_dim = encoding_dim*2
        for dim in hidden_dims:
            layers.append(nn.Linear(prev_dim,dim*2))
            layers.append(nn.ReLU())
            prev_dim = dim*2
        self.decoder = nn.Sequential(
            *layers,
            nn.Linear(prev_dim,input_dim*2)
        )

    def forward(self,x):
        encoder_out = self.encoder(x)
        noise_np = np.random.normal(0,self.variance,size=self.encoding_dim*2)
        noise = torch.tensor(noise_np,dtype=torch.float)
        noisy_y = encoder_out + noise
        return self.decoder(noisy_y)


class LearnedAutoencoderWithIQImbalance(nn.Module):
    def __init__(self, input_dim, encoding_dim,hidden_dims,b, d,variance):
        super(LearnedAutoencoderWithIQImbalance, self).__init__()
        self.encoder = ComplexLinearUnitary(input_dim,encoding_dim)
        self.encoding_dim = encoding_dim
        self.variance = variance
        self.r = torch.tensor(0.5*(1+b*np.exp(1j*d)), dtype=torch.complex64)
        layers = []
        prev_dim = encoding_dim*2
        for dim in hidden_dims:
            layers.append(nn.Linear(prev_dim,dim*2))
            layers.append(nn.ReLU())
            prev_dim = dim*2
        self.decoder = nn.Sequential(
            *layers,
            nn.Linear(prev_dim,input_dim*2)
        )

    def forward(self,x):
        encoder_out = self.encoder(x)
        y_real = encoder_out[:, :self.encoding_dim]
        y_imag = encoder_out[:, self.encoding_dim:]
        y = torch.complex(y_real,y_imag)
        yiq = self.r * y + (1-self.r.conj()) * (y.conj())
        yiqr = yiq.real
        yiqi = yiq.imag
        yiqstack = torch.cat((yiqr,yiqi),dim=1)
        noise_np = np.random.normal(0,self.variance,size=self.encoding_dim*2)
        noise_tensor = torch.tensor(noise_np,dtype=torch.float)
        y_iq_stack_noisy = yiqstack + noise_tensor
        return self.decoder(y_iq_stack_noisy)


#### Training the feedthrough model

In [None]:
# Define the size of our "measurement" vector as encoding_dim. This needs to be larger than the sparsity of our matrix

# encoding_dim = max_sparsity
# encoding_dim = 100
# vector_size = 100
encoding_dim = vector_size
# Initialize model
feedthrough_model = FeedthroughEncoder(vector_size, encoding_dim)
optimizer = torch.optim.Adam(feedthrough_model.parameters(), lr=1E-2, weight_decay=1E-6)
loss_fn = nn.MSELoss()

# def complex_mse_loss(input, target):
#     return F.mse_loss(input, target)

# Training loop
losses = []
for epoch in range(20):
    for batch in dataloader:
        inputs, targets = batch  # Unpack the tuple
        optimizer.zero_grad()
        output = feedthrough_model(inputs)
        loss = loss_fn(output, targets)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

    print(f"Epoch {epoch+1}, Loss: {loss.item():.6f}")

plt.plot(losses)
plt.show()


#### Train the AutoEncoder Model

In [None]:
# Training variables: Beta_momentum, beta_variance, learning_rate, encoding_dim, sparsity, weight_decay

# Current Iteration: 1
# Beta_momentum: 0.95
# Beta_variance: 0.99
# Learning_rate: 1E-3
# weight_decay = 0
# sparsity = 3-5

encoding_dim = 50

# Initialize model
hidden_dims = np.array([60,80])
learned_autoencoder_model = LearnedAutoencoder(vector_size,encoding_dim,hidden_dims)
optimizer = torch.optim.Adam(learned_autoencoder_model.parameters(), lr=1E-3, betas=(0.9,0.999))
loss_fn = nn.MSELoss()

# def complex_mse_loss(input, target):
#     return F.mse_loss(input, target)

# Training loop
losses = []
lowest_loss = float("inf")
for epoch in range(5000):
    for batch in dataloader:
        inputs, targets = batch  # Unpack the tuple
        optimizer.zero_grad()
        output = learned_autoencoder_model(inputs)
        loss = loss_fn(output, targets)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
    if loss < lowest_loss:
        lowest_loss = loss.item()
        early_stopping_counter = 0
        best_model = learned_autoencoder_model
    else:
        early_stopping_counter += 1
        if early_stopping_counter > 100:
            learned_autoencoder_model = best_model
            print(f"stopped early after {epoch+1} epochs, with a loss of: {lowest_loss}")
            break

    print(f"Epoch {epoch+1}, Loss: {lowest_loss:.6f}")

plt.plot(losses)
plt.show()


Some conclusions that we have found so far:
- Batch size must be minimally greater than 100. 500 seems to work well.
- 2 hidden layers of size 50,70 leads to a loss of 0.3 MSE
- Decreasing variance below 0.999 does not seem to have a positive effect


## Train Noisy Models

In [20]:
# Training variables: Beta_momentum, beta_variance, learning_rate, encoding_dim, sparsity, weight_decay

# Current Iteration: 1
# Beta_momentum: 0.9
# Beta_variance: 0.999
# Learning_rate: 1E-3
# weight_decay = 0
# sparsity = 3-5
# variance = 1
noisy_models = []
noisy_losses = []

# We define our signal to noise ratio as ranging from 0 to 20 dB
db_list = [2,5,8,11,14,17,20]
dataloader, signal_variance = Generate_Dataloader(max_amplitude,min_sparsity,max_sparsity,vector_size,data_set_size)
SNR = {}

# Then define the absolute values of the SNR ratio
for db_ratio in db_list:
    SNR[db_ratio] = 10**(db_ratio/10)

for db,abs in SNR.items():
    encoding_dim = 50
    variance = signal_variance/abs
    # Initialize model
    hidden_dims = np.array([60,80])
    noisy_autoencoder_model = LearnedAutoencoderWithNoise(vector_size,encoding_dim,hidden_dims,variance)
    optimizer = torch.optim.Adam(noisy_autoencoder_model.parameters(), lr=1E-3, betas=(0.9,0.999))
    loss_fn = nn.MSELoss()
# def complex_mse_loss(input, target):
#     return F.mse_loss(input, target)
    # Training loop
    losses = []
    lowest_loss = float("inf")
    for epoch in range(10000):
        for batch in dataloader:
            inputs, targets = batch  # Unpack the tuple
            optimizer.zero_grad()
            output = noisy_autoencoder_model(inputs)
            loss = loss_fn(output, targets)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
        if loss.item() < lowest_loss:
            lowest_loss = loss.item()
            early_stopping_counter = 0
            best_model = noisy_autoencoder_model
        else:
            early_stopping_counter += 1
            if early_stopping_counter > 100:
                noisy_autoencoder_model = best_model
                print(f"Stopped early after {epoch+1} epochs, with loss {lowest_loss:.6f}")
                break

        print(f"SNR Ratio: {db}, Epoch {epoch+1}, Loss: {loss.item():.6f}")
    noisy_models.append(best_model)
    noisy_losses.append(lowest_loss)

# plt.plot(losses)
# plt.show()(noisy_losses


SNR Ratio: 2, Epoch 1, Loss: 380.523987
SNR Ratio: 2, Epoch 2, Loss: 344.328766
SNR Ratio: 2, Epoch 3, Loss: 338.364807
SNR Ratio: 2, Epoch 4, Loss: 335.327332
SNR Ratio: 2, Epoch 5, Loss: 335.813202
SNR Ratio: 2, Epoch 6, Loss: 327.324707
SNR Ratio: 2, Epoch 7, Loss: 337.990845
SNR Ratio: 2, Epoch 8, Loss: 332.402710
SNR Ratio: 2, Epoch 9, Loss: 338.335205
SNR Ratio: 2, Epoch 10, Loss: 330.727051
SNR Ratio: 2, Epoch 11, Loss: 339.765747
SNR Ratio: 2, Epoch 12, Loss: 328.184753
SNR Ratio: 2, Epoch 13, Loss: 329.413635
SNR Ratio: 2, Epoch 14, Loss: 331.271606
SNR Ratio: 2, Epoch 15, Loss: 335.002808
SNR Ratio: 2, Epoch 16, Loss: 328.249695
SNR Ratio: 2, Epoch 17, Loss: 333.738342
SNR Ratio: 2, Epoch 18, Loss: 327.551666
SNR Ratio: 2, Epoch 19, Loss: 334.903687
SNR Ratio: 2, Epoch 20, Loss: 323.854065
SNR Ratio: 2, Epoch 21, Loss: 326.332367
SNR Ratio: 2, Epoch 22, Loss: 333.337006
SNR Ratio: 2, Epoch 23, Loss: 330.342590
SNR Ratio: 2, Epoch 24, Loss: 341.089325
SNR Ratio: 2, Epoch 25, L

# Save the Noisy Models

In [23]:
for indx, (db, value) in enumerate(SNR.items()):
    torch.save(noisy_models[indx].state_dict(), f"Models/noisy_models/sparsity_{min_sparsity}-{max_sparsity}/noisy_model_{db}_{min_sparsity}-{max_sparsity}.pt")

## Initialize pre-trained noiseless model

In [None]:
# Instantiate your model architecture first
encoding_dim = 50
vector_size = 100
hidden_dims = np.array([50,70])
pretrained_model = LearnedAutoencoder(vector_size,encoding_dim,hidden_dims)
# Load the state dictionary
pretrained_model.load_state_dict(torch.load("model_state_hidden_layers_50_70.pt"))

pretrained_model.eval()  # Set the model to evaluation mode if needed

## Show the output for a given sparse input vector



In [None]:
# Here we generate a test vector from our buildDataSet function, put it through the model and look at the output
h, x = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,1)

H = np.concatenate((h.real,h.imag)).T

H_tensor = torch.tensor(H,dtype=torch.float)

H_hat = pretrained_model(H_tensor)

h_hat = np.array(H_hat.detach())

h_real,h_imag = np.split(h_hat,2,1)
h_hat = h_real + 1j*h_imag
h_hat = h_hat.reshape(-1,1)
DFT = sp.linalg.dft(vector_size)/np.sqrt(vector_size)
iDFT = DFT.conj().T


x_hat = iDFT@h_hat
indices = range(len(x_hat))


In [None]:
plt.vlines(indices,0,x,linewidth=3)
plt.vlines(indices,0,x_hat,colors="orange")

plt.legend(("$x$","$\hat{x}$"))
plt.show()


## Extract the encoding matrix, generate a (noisy) y and use the decoder to find $\hat{h}$, then plot $\hat{x}$

In [None]:
# Load the autoencoder

encoding_dim = 50
vector_size = 100
hidden_dims = np.array([60,80])
variance = 10
noisy_autoencoder = LearnedAutoencoderWithNoise(vector_size,encoding_dim,hidden_dims,variance)
noisy_autoencoder.load_state_dict(torch.load("noisy_autoencoder_10var_60_80.pt",))
noisy_autoencoder.eval()

# Build the W matrix
q_values = noisy_autoencoder.encoder.q_values

W_real = torch.cos(q_values)
W_imag = torch.sin(q_values)
W_top = torch.cat([W_real, -W_imag], dim=1)  # [W_real, -W_imag]
W_bottom = torch.cat([W_imag, W_real], dim=1)  # [W_imag, W_real]
W_total = torch.cat([W_top, W_bottom], dim=0)  # Stack rows to form the full matrix 

# Build the input vector

h, x = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,1)

H = np.concatenate((h.real,h.imag)).T

H_tensor = torch.tensor(H,dtype=torch.float)

y = torch.matmul(H_tensor,W_total.T)

# Make the noise

variance = 10

noise_np = np.random.normal(0,variance,size=100)
noise = torch.tensor(noise_np,dtype=torch.float)
noisy_y = y + noise

# Rebuild h, and x from the noisy y using the decoder

H_hat = noisy_autoencoder.decoder(noisy_y)

h_hat = np.array(H_hat.detach())

h_real,h_imag = np.split(h_hat,2,1)
h_hat = h_real + 1j*h_imag
h_hat = h_hat.reshape(-1,1)
DFT = sp.linalg.dft(vector_size)/np.sqrt(vector_size)
iDFT = DFT.conj().T

x_hat = iDFT@h_hat
indices = range(len(x_hat))


## Plotting

In [None]:
plt.rcParams['text.usetex'] = False

plt.vlines(indices,0,x,linewidth=3)
plt.vlines(indices,0,x_hat,colors="orange")

plt.legend((r"$x$",r"$\hat{x}$"))
plt.show()

## Show the weights

In [None]:
# THIS DOESN'T WORK. TO BE FIXED

for idx, layer in enumerate(pretrained_model.decoder):
    # Check if the layer is an instance of ComplexLinear
    if isinstance(layer, nn.Linear):
        print(f"Decoder layer {idx} (ComplexLinear) W_real:")
        print(layer.weight)
        print(f"Decoder layer {idx} (ComplexLinear) W_imag:")
        print(layer.weight)


In [None]:
imbalance_levels = [0, 0.04, 0.1, 0.3, 0.6, 1]
IRR_ratios = {}
db_IRR_ratios = []

for level in imbalance_levels:
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    r = 0.5*(1+b*np.exp(1j*d))
    IRR_ratio = (np.abs(r)**2)/(np.abs(1-r)**2)
    IRR_ratios[level] = (10*np.log10(IRR_ratio))


In [None]:
imbalanced_models = []
imbalanced_losses = []

# Looping over 
for level, db_ratio in IRR_ratios.items():
    encoding_dim = 50
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    variance = 0
    # Initialize model
    hidden_dims = np.array([60,80])
    imbalanced_autoencoder_model = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
    optimizer = torch.optim.Adam(imbalanced_autoencoder_model.parameters(), lr=1E-3, betas=(0.9,0.999))
    loss_fn = nn.MSELoss()

    # def complex_mse_loss(input, target):
    #     return F.mse_loss(input, target)

    # Training loop
    losses = []
    lowest_loss = float("inf")
    for epoch in range(10000):
        for batch in dataloader:
            inputs, targets = batch  # Unpack the tuple
            optimizer.zero_grad()
            output = imbalanced_autoencoder_model(inputs)
            loss = loss_fn(output, targets)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
        if loss< lowest_loss:
            lowest_loss = loss
            early_stopping_counter = 0
            best_model = imbalanced_autoencoder_model
        else:
            early_stopping_counter += 1
            if early_stopping_counter > 100:
                imbalanced_autoencoder_model = best_model
                print(f"Stopped early after {epoch+1} epochs, with loss of {lowest_loss:.6f}")
                break
        print(f"IRR Ratio:{db_ratio}, Epoch {epoch+1}, Loss: {loss.item():.6f}")
    imbalanced_models.append(best_model)
    imbalanced_losses.append(lowest_loss)

# plt.plot(losses)
# plt.show()


# Train the IQ Imbalance model

In [None]:

imbalance_levels = [0, 0.04, 0.1, 0.3, 0.6, 1]
IRR_ratios = {}

# Calculate the dB imbalance levels, we store them in the dictionary IRR_ratios such that we can extract it with the corresponding level imbalance
for level in imbalance_levels:
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    r = 0.5*(1+b*np.exp(1j*d))
    IRR_ratio = (np.abs(r)**2)/(np.abs(1-r)**2)
    IRR_ratios[level] = 10*np.log10(IRR_ratio)

# Save the IQ-Imbalance Models


In [None]:
for indx, items in enumerate(IRR_ratios):
    torch.save(imbalanced_models[indx].state_dict(), f"imbalanced_model_{items:.3f}_{min_sparsity}-{max_sparsity}.pt")

# Train various measurement matrix size

In [None]:
measurement_sizes = [5, 10, 20, 30, 40, 50]
measurement_models = []
measurement_losses = []

for encoding_dim in measurement_sizes:
    level = 0.6 # Set the IQ imbalance level to around 33 dB IRR
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    variance = signal_variance/SNR[17] # Set the SNR to 17 dB
    # Initialize model
    hidden_dims = np.array([60,80])
    measurement_autoencoder_model = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
    optimizer = torch.optim.Adam(measurement_autoencoder_model.parameters(), lr=1E-3, betas=(0.9,0.999))
    loss_fn = nn.MSELoss()

    # def complex_mse_loss(input, target):
    #     return F.mse_loss(input, target)

    # Training loop
    losses = []
    lowest_loss = float("inf")
    for epoch in range(10000):
        for batch in dataloader:
            inputs, targets = batch  # Unpack the tuple
            optimizer.zero_grad()
            output = measurement_autoencoder_model(inputs)
            loss = loss_fn(output, targets)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
        if loss< lowest_loss:
            lowest_loss = loss
            early_stopping_counter = 0
            best_model = measurement_autoencoder_model
        else:
            early_stopping_counter += 1
            if early_stopping_counter > 100:
                measurement_autoencoder_model = best_model
                print(f"Stopped early after {epoch+1} epochs, with loss of {lowest_loss:.6f}")
                break
        print(f"Encoding dimension:{encoding_dim}, Epoch {epoch+1}, Loss: {loss.item():.6f}")
    measurement_models.append(best_model)
    measurement_losses.append(lowest_loss)


# Save the measurement models

In [None]:
measurement_sizes = [5, 10, 20, 30, 40, 50]
for indx, encoding_dim in enumerate(measurement_sizes):
    torch.save(measurement_models[indx].state_dict(), f"measurement_model_{encoding_dim}_{min_sparsity}-{max_sparsity}.pt")


# Plotting

In [None]:
plt.rcParams['text.usetex'] = False
SNRkeys = SNR.keys()
plt.plot(SNRkeys, noisy_losses)
plt.title("Noisy Autoencoder Loss vs SNR")
plt.xlabel("SNR")
plt.ylabel("RMSE")
plt.show()


In [None]:
imbalanced_losses_np = []
for loss in imbalanced_losses:
    imbalanced_losses_np.append(torch.Tensor.detach(loss))

db_IRR_ratios = []

for level in imbalance_levels:
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    r = 0.5*(1+b*np.exp(1j*d))
    IRR_ratio = (np.abs(r)**2)/(np.abs(1-r)**2)
    IRR_ratios[level] = 10*np.log10(IRR_ratio)
    db_IRR_ratios.append(10*np.log10(IRR_ratio))

print(db_IRR_ratios)
print(imbalanced_losses_np)
print(len(db_IRR_ratios))
plt.figure()
plt.plot(db_IRR_ratios, imbalanced_losses_np)
plt.title("Imbalanced Autoencoder")
plt.xlabel("IRR Ratio ($dB$)")
plt.ylabel("RMSE Loss")
plt.show()

In [None]:
# Test IQ Imbalance model

# Generate a vector
h, x = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,1)

H = np.concatenate((h.real,h.imag)).T

H_tensor = torch.tensor(H,dtype=torch.float)

imbalanced_autoencoder_model(H_tensor)

# Generate new dataset, initialize pretrained models, evaluate the loss and plot

In [None]:
noisy_pretrained_models = {}
imbalanced_pretrained_models = {}
measurement_pretrained_models = {}

# Define the SNR dictionary for use
db_list = [2,5,8,11,14,17,20]
signal_variance = 133 # Found by measuring empirically what the variance of the signal is once transformed from sparse signal
SNR = {}

# Then define the absolute values of the SNR ratio
for db_ratio in db_list:
    SNR[db_ratio] = 10**(db_ratio/10)

#Initialize pretrained noisy models
for db,abs in SNR.items():
    encoding_dim = 50
    variance = signal_variance/abs
    # Initialize model
    hidden_dims = np.array([60,80])
    noisy_pretrained_models[db] = LearnedAutoencoderWithNoise(vector_size,encoding_dim,hidden_dims,variance)
    noisy_pretrained_models[db].load_state_dict(torch.load(f"noisy_model_{db}.pt", weights_only=True))

# Initialize pretrained imbalanced_models
for level, db  in IRR_ratios.items():
    encoding_dim = 50
    variance = 0
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    # Initialize model
    hidden_dims = np.array([60,80])
    imbalanced_pretrained_models[level] = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
    imbalanced_pretrained_models[level].load_state_dict(torch.load(f"imbalanced_model_{db:.3f}.pt", weights_only=True))

# Initialize pretrained imbalanced_models
for encoding_dim  in measurement_sizes:
    variance = signal_variance/SNR[17]
    level = 0.6
    b = 1 - (0.2 * level)
    d = level * np.pi/8
    # Initialize model
    hidden_dims = np.array([60,80])
    measurement_pretrained_models[encoding_dim] = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
    measurement_pretrained_models[encoding_dim].load_state_dict(torch.load(f"measurement_model_{encoding_dim}.pt", weights_only=True))


In [None]:
# Generate dataset
max_amplitude = 100
min_sparsity = 7
max_sparsity = 9
vector_size = 100
data_set_size = 10000
val_dense_data, val_sparse_data = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,data_set_size)

print(val_sparse_data[0,:])

X_val = np.concatenate((val_dense_data.real,val_dense_data.imag)).T
Y_val = np.concatenate((val_dense_data.real,val_dense_data.imag)).T

X_val_tensor = torch.tensor(X_val,dtype=torch.float)
Y_val_tensor = torch.tensor(Y_val,dtype=torch.float)
val_dataset = TensorDataset(X_val_tensor,Y_val_tensor)

dataloader_val = DataLoader(val_dataset,batch_size = 500,shuffle = True, )

loss_fn = nn.MSELoss()


In [None]:

# Validating the noisy models
noisy_val_losses = []
noisy_model_losses = []

with torch.no_grad():
    for db, noisy_model in noisy_pretrained_models.items():
        noisy_model.eval()
        for batch in dataloader_val:
            inputs, targets = batch  # Unpack the tuple
            output = noisy_model(inputs)
            loss = loss_fn(output, targets)
            noisy_model_losses.append(loss.item())
        noisy_val_losses.append(np.average(noisy_model_losses))
        noisy_model_losses = []

noisy_val_losses = np.array(noisy_val_losses)
normalized_noisy_val_losses = noisy_val_losses/signal_variance
print(noisy_val_losses)


In [None]:
# Validating the IQ imbalance models
imbalance_model_losses = []
imbalance_val_losses = []

with torch.no_grad():
    for level, imbalance_model in imbalanced_pretrained_models.items():
        imbalance_model.eval()
        for batch in dataloader_val:
            inputs, targets = batch  # Unpack the tuple
            output = noisy_model(inputs)
            loss = loss_fn(output, targets)
            imbalance_model_losses.append(loss.item())
        imbalance_val_losses.append(np.average(imbalance_model_losses))
        imbalance_model_losses = []

imbalance_val_losses = np.array(imbalance_val_losses)
normalized_imbalance_val_losses = imbalance_val_losses/signal_variance
print(imbalance_val_losses)


In [None]:
# Validating the varying measurement dimensions models
measurement_model_losses = []
measurement_val_losses = []

with torch.no_grad():
    for encoding_dim, measurement_model in measurement_pretrained_models.items():
        measurement_model.eval()
        for batch in dataloader_val:
            inputs, targets = batch  # Unpack the tuple
            output = measurement_model(inputs)
            loss = loss_fn(output, targets)
            measurement_model_losses.append(loss.item())
        measurement_val_losses.append(np.average(measurement_model_losses))
        print(f"Encoding dimenson:{encoding_dim} Losses:{measurement_model_losses}")
        measurement_model_losses = []

measurement_val_losses = np.array(measurement_val_losses)
normalized_measurement_val_losses = measurement_val_losses/signal_variance


In [None]:
noiseless_loss = normalized_imbalance_val_losses[0]

plt.style.use('ggplot')
fig1, (ax1, ax2, ax3) = plt.subplots(ncols=3,nrows = 1,figsize=(18, 6))
ax1.plot(SNR.keys(),normalized_noisy_val_losses,color= "blue", marker="o")
ax1.plot(SNR.keys(), [noiseless_loss for i in SNR.keys()])
ax1.set_xlabel("SNR $(dB)$")
ax1.set_ylabel("NMSE")
ax1.set_title("Noisy Model Performance")
ax1.grid(True)

ax2.plot(IRR_ratios.values(),normalized_imbalance_val_losses,color= 'red',marker='s')
ax2.plot(IRR_ratios.values(), [noiseless_loss for i in IRR_ratios.values()])
ax2.set_xlabel("IRR $(dB)$")
ax2.set_title("IQ Imbalanced Model Performance")

ax1.plot(SNR.keys(), [noiseless_loss for i in SNR.keys()])
ax2.grid(True)

ax3.plot(measurement_sizes,normalized_measurement_val_losses,color= 'green',marker='^')
ax3.plot(measurement_sizes, [noiseless_loss for i in measurement_sizes], color='green')
ax3.set_xlabel("Measurement Dimension")
ax3.set_title("Measurement Model Performance")
ax3.grid(True)


## Create a new loss function to drive the values of the measurement matrix to discrete values $\{-\pi,-\frac{1}{2}\pi,0,\frac{1}{2}\pi,\pi\}$

In [None]:
def discreteLossPoly(qweights,scaleFactor):
    loss = 0
    pi = torch.tensor(math.pi)
    # Note that we need to flatten the weights so that our iteration does not result in us iterating over the rows instead of the weights
    qVec = qweights.flatten()
    # Efficient implementation of the loss function, by doing vector operations, saves a lot of time in training
    loss += torch.linalg.vector_norm(qVec*(qVec-1/2*pi)*(qVec-1*pi)*(qVec+pi)*(qVec+1/2*pi),1)
    loss = loss*scaleFactor # Scale the resulting loss
    return loss

In [None]:
sparsity_ranges = [(3, 5), (5, 7), (7, 9), (10, 30)]
measurement_sizes = [5, 10, 20, 30, 40, 50]
# Define the SNR dictionary for use
db_list = [2,5,8,11,14,17,20]
signal_variance = 133 # Found by measuring empirically what the variance of the signal is once transformed from sparse signal
SNR = {}

# Then define the absolute values of the SNR ratio
for db_ratio in db_list:
    SNR[db_ratio] = 10**(db_ratio/10)

#empty dicts to store models in
noisy_pretrained_models = {}
imbalanced_pretrained_models = {}
measurement_pretrained_models = {}

for i, (min_spars, max_spars) in enumerate(sparsity_ranges):

    #Initialize pretrained noisy models
    for db,abs in SNR.items():
        encoding_dim = 50
        variance = signal_variance/abs
        # Initialize model
        hidden_dims = np.array([60,80])
        noisy_pretrained_models[(i, db)] = LearnedAutoencoderWithNoise(vector_size,encoding_dim,hidden_dims,variance)
        noisy_pretrained_models[(i, db)].load_state_dict(torch.load(f"noisy_model_{db}_{min_spars}-{max_spars}.pt", weights_only=True))

    # Initialize pretrained imbalanced_models
    for level, db  in IRR_ratios.items():
        encoding_dim = 50
        variance = 0
        b = 1 - (0.2 * level)
        d = level * np.pi/8
        # Initialize model
        hidden_dims = np.array([60,80])
        imbalanced_pretrained_models[(i, level)] = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
        imbalanced_pretrained_models[(i, level)].load_state_dict(torch.load(f"imbalanced_model_{level:.3f}_{min_spars}-{max_spars}.pt", weights_only=True))

    # Initialize pretrained measurement models
    for encoding_dim  in measurement_sizes:
        variance = signal_variance/SNR[17]
        level = 0.6
        b = 1 - (0.2 * level)
        d = level * np.pi/8
        # Initialize model
        hidden_dims = np.array([60,80])
        measurement_pretrained_models[(i, encoding_dim)] = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
        measurement_pretrained_models[(i, encoding_dim)].load_state_dict(torch.load(f"measurement_model_{encoding_dim}_{min_spars}-{max_spars}.pt", weights_only=True))




In [18]:

# Generate dataset
def Generate_Dataloader(max_amplitude,min_sparsity,max_sparsity,vector_size,data_set_size):
    val_dense_data, val_sparse_data = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,data_set_size)

    X_val = np.concatenate((val_dense_data.real,val_dense_data.imag)).T
    Y_val = np.concatenate((val_dense_data.real,val_dense_data.imag)).T

    X_val_tensor = torch.tensor(X_val,dtype=torch.float)
    Y_val_tensor = torch.tensor(Y_val,dtype=torch.float)
    val_dataset = TensorDataset(X_val_tensor,Y_val_tensor)

    dataloader_val = DataLoader(val_dataset,batch_size = 500,shuffle = True, )
    variance = np.var(Y_val)
    return dataloader_val, variance



In [None]:

all_noisy_losses = []
all_imbalanced_losses = []
all_measurement_losses = []


for i, (min_spars, max_spars) in enumerate(sparsity_ranges):
    dataloader_val, signal_variance = Generate_Dataloader(max_amplitude, min_spars, max_spars, vector_size, data_set_size)
    # Evaluate noisy models
    noisy_val_losses = []
    noisy_model_losses = []
    with torch.no_grad():
        for db, abs in SNR.items():
            noisy_model = noisy_pretrained_models[(i, db)]
            noisy_model.eval()
            for batch in dataloader_val:
                inputs, targets = batch  # Unpack the tuple
                output = noisy_model(inputs)
                loss = loss_fn(output, targets)
                noisy_model_losses.append(loss.item())
            noisy_val_losses.append(np.average(noisy_model_losses))
            noisy_model_losses = []

    noisy_val_losses = np.array(noisy_val_losses)
    normalized_noisy_val_losses = noisy_val_losses/signal_variance
    all_noisy_losses.append(normalized_noisy_val_losses)

    # Evaluate Imbalanced models
    imbalance_model_losses = []
    imbalance_val_losses = []

    with torch.no_grad():
        for level, db in IRR_ratios.items():
            imbalance_model = imbalanced_pretrained_models[(i, level)]
            imbalance_model.eval()
            for batch in dataloader_val:
                inputs, targets = batch  # Unpack the tuple
                output = imbalance_model(inputs)
                loss = loss_fn(output, targets)
                imbalance_model_losses.append(loss.item())
            imbalance_val_losses.append(np.average(imbalance_model_losses))
            imbalance_model_losses = []

    imbalance_val_losses = np.array(imbalance_val_losses)
    normalized_imbalance_val_losses = imbalance_val_losses/signal_variance
    all_imbalanced_losses.append(normalized_imbalance_val_losses)

    # Evaluate models with varying measurement sizes
    measurement_model_losses = []
    measurement_val_losses = []

    with (torch.no_grad()):
        for encoding_dim in measurement_sizes:
            measurement_model = measurement_pretrained_models[(i, encoding_dim)]
            measurement_model.eval()
            for batch in dataloader_val:
                inputs, targets = batch  # Unpack the tuple
                output = measurement_model(inputs)
                loss = loss_fn(output, targets)
                measurement_model_losses.append(loss.item())
            measurement_val_losses.append(np.average(measurement_model_losses))
            measurement_model_losses = []

    measurement_val_losses = np.array(measurement_val_losses)
    normalized_measurement_val_losses = measurement_val_losses/signal_variance
    all_measurement_losses.append(normalized_measurement_val_losses)



In [None]:
plt.style.use('ggplot')
fig1, (ax1, ax2, ax3) = plt.subplots(ncols=3,nrows = 1,figsize=(18, 6))

for i in range(4):
    if i==0:
        noiseless_loss = all_imbalanced_losses[0][0]
        ax1.plot(SNR.keys(), [noiseless_loss for i in SNR.keys()])
        ax2.plot(IRR_ratios.values(), [noiseless_loss for i in IRR_ratios.values()])
        ax3.plot(measurement_sizes, [noiseless_loss for i in measurement_sizes])

    ax1.plot(SNR.keys(), all_noisy_losses[i], marker="o")
    ax1.set_xlabel("SNR $(dB)$")
    ax1.set_ylabel("NMSE")
    ax1.set_title("Noisy Model Performance")
    ax1.grid(True)
    ax1.legend(["baseline model", "sparsity 3-5", "sparsity 5-7","sparsity 7-9", "sparsity 10-30"])

    ax2.plot(IRR_ratios.values(),all_imbalanced_losses[i],marker='s')
    ax2.set_xlabel("IRR $(dB)$")
    ax2.set_title("IQ Imbalanced Model Performance")
    ax2.legend(["baseline model", "sparsity 3-5","sparsity 5-7","sparsity 7-9", "sparsity 10-30"])
    ax2.grid(True)

    ax3.plot(measurement_sizes, all_measurement_losses[i],marker='^')
    ax3.set_xlabel("Measurement Dimension")
    ax3.set_title("Measurement Model Performance")
    ax3.legend(["baseline model", "sparsity 3-5","sparsity 5-7","sparsity 7-9", "sparsity 10-30"])
    ax3.grid(True)

plt.show()



## Train neural network using new loss function

In [None]:
vector_size = 100
encoding_dim = 50
hidden_dims = np.array([60,80])
discrete_autoencoder_model = LearnedAutoencoder(vector_size,encoding_dim,hidden_dims)
optimizer = torch.optim.Adam(discrete_autoencoder_model.parameters(), lr=1E-3, betas=(0.9,0.999))
MSELossfn = nn.MSELoss()
scaleFactor = 0.05 # Hyperparameter, setting this too high causes the problem to not converge to low loss, due to the problem converging to discrete values too early, setting too low causes the
# values to not converge to discrete values. Empirical testing showed that scaleFactor of 0.05 was nice

# Training loop
losses = []
lowest_loss = float("inf")
for epoch in range(5000):
    for batch in dataloader:
        inputs, targets = batch  # Unpack the tuple
        optimizer.zero_grad()
        output = discrete_autoencoder_model(inputs)
        qweights = discrete_autoencoder_model.encoder.q_values
        loss = discreteLossPoly(qweights,scaleFactor) + MSELossfn(output,targets)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
    if loss < lowest_loss:
        lowest_loss = loss.item()
        early_stopping_counter = 0
        best_model = discrete_autoencoder_model
    else:
        early_stopping_counter += 1
        if early_stopping_counter > 100:
            discrete_autoencoder_model = best_model
            print(f"stopped early after {epoch+1} epochs, with a loss of: {lowest_loss}")
            break

    print(f"Epoch {epoch+1}, Loss: {lowest_loss:.6f}")

plt.plot(losses)
plt.show()

## Show the values of the weights, they should be $\{0,\pm1.57079,\pm3.14159\}$

In [None]:
print(discrete_autoencoder_model.encoder.q_values[0:20,0:20])
discreteLossPoly(X_tensor[0,:],X_tensor[0,:],qweights,MSELossfn,scaleFactor)

## Now we map all the values to actual discrete values

In [None]:
def mapToDiscreteValues(weights,discrete_values):
    # Input is a tensor (possibly a matrix) of weights, and a np array of discrete values
    discrete_values = discrete_values.flatten()
    weights_np = weights.detach().cpu().numpy() # Convert to numpy array
    shape = weights_np.shape
    weights_vector = np.reshape(weights_np,(-1,1)) # flatten the matrix to a vector such that subtracting from the discrete values results in a matrix!
    
    # Create a matrix of distances, then make a vector of indices from this matrix. Each value of the vector is the index of the closest discrete value
    distances = np.abs(weights_vector - discrete_values)
    indices = np.argmin(distances,1)

    # Map the weights to the closest discrete values and reshape into original matrix, and turn into a nn.Parameter object
    mappedWeights = discrete_values[indices]
    mappedWeights = np.reshape(mappedWeights,shape)
    mappedWeights = np.float32(mappedWeights) # Notice we map it to a float because that is what is used for our model
    mappedWeights = nn.Parameter(torch.from_numpy(mappedWeights))
    return mappedWeights

# Testing

qweights = discrete_autoencoder_model.encoder.q_values
discrete_values = np.array([-np.pi, -0.5*np.pi,0,0.5*np.pi,np.pi])
mapped_q_weights = mapToDiscreteValues(qweights,discrete_values)


mapped_discrete_autoencoder_model = discrete_autoencoder_model
mapped_discrete_autoencoder_model.encoder.q_values = mapped_q_weights


## Save the model

In [None]:
torch.save(mapped_discrete_autoencoder_model.state_dict(), f"discrete_model.pt")

## Load discrete model

In [None]:
discrete_model = LearnedAutoencoder(vector_size,encoding_dim,hidden_dims)
# Load the state dictionary
discrete_model.load_state_dict(torch.load("discrete_model.pt"))

## Finally we test on a single data point and map the result!

In [None]:
# Here we generate a test vector from our buildDataSet function, put it through the model and look at the output
h, x = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,1)

H = np.concatenate((h.real,h.imag)).T

H_tensor = torch.tensor(H,dtype=torch.float)

H_hat = discrete_model(H_tensor)

h_hat = np.array(H_hat.detach())

h_real,h_imag = np.split(h_hat,2,1)
h_hat = h_real + 1j*h_imag
h_hat = h_hat.reshape(-1,1)
DFT = sp.linalg.dft(vector_size)/np.sqrt(vector_size)
iDFT = DFT.conj().T


x_hat = iDFT@h_hat
indices = range(len(x_hat))

plt.vlines(indices,0,x,linewidth=3)
plt.vlines(indices,0,x_hat,colors="orange")

plt.legend(("$x$","$\hat{x}$"))
plt.show()

## And then we test on the validation data set!

In [None]:
discrete_model_losses = []
with torch.no_grad():
    discrete_model.eval()
    for batch in dataloader_val:
        inputs, targets = batch  # Unpack the tuple
        output = discrete_model(inputs)
        loss = loss_fn(output, targets)
        discrete_model_losses.append(loss.item())
    discrete_loss = np.average(discrete_model_losses)
signal_variance = 133
normalized_discrete_val_loss = discrete_loss/signal_variance
print(normalized_discrete_val_loss)

## Code works, so we train the model for the two best case scenario's since at a certain point the model does not work anymore.

In [None]:
def trainModelsForDiscreteSet(dataloader,SNR_values,imb_percentages,encoding_dims,signal_variance = 133,hidden_dims=[60,80],scale_factor=0.05):
    # Function takes as inputs:
    # dataloader: The dataloader object of the training data set
    # SNR_values: Signal to noise ratios
    # imb_percentages: imbalance percentages
    # encoding_dims: Encoding dimensions
    # signal_variance: The variance of the original signal
    # hidden_dims: Hidden dimensions for the neural network
    # scale_factor: Hyperparameter for the discretization step
    models = []
    discrete_values = np.array([-np.pi, -0.5*np.pi,0,0.5*np.pi,np.pi])
    for model_num,(SNR,imb_percentage,encoding_dim) in enumerate(zip(SNR_values,imb_percentages,encoding_dims)):
        abs_noise_ratio = 10**(SNR/10)
        variance = signal_variance/abs_noise_ratio
        b = 1 - (0.2 * imb_percentage)
        d = imb_percentage * np.pi/8
        hidden_dims = np.array([60,80])
        current_training_model = LearnedAutoencoderWithIQImbalance(vector_size,encoding_dim,hidden_dims,b,d,variance)
        optimizer = torch.optim.Adam(current_training_model.parameters(), lr=1E-3, betas=(0.9,0.999))
        MSEloss_fn = nn.MSELoss()

        # Training loop
        losses = []
        lowest_loss = float("inf")
        for epoch in range(10000):
            for batch in dataloader:
                inputs, targets = batch  # Unpack the tuple
                optimizer.zero_grad()
                output = current_training_model(inputs)
                qweights = current_training_model.encoder.q_values
                loss = discreteLossPoly(qweights,scale_factor) + MSEloss_fn(output, targets)
                loss.backward()
                optimizer.step()
                losses.append(loss.item())
            if loss< lowest_loss:
                lowest_loss = loss
                early_stopping_counter = 0
                best_model = current_training_model
            else:
                early_stopping_counter += 1
                if early_stopping_counter > 100:
                    current_training_model = best_model
                    print(f"Stopped early after {epoch+1} epochs, with loss of {lowest_loss:.6f}")
                    break
            print(f"SNR:{SNR}, Imbalance Percentage:{imb_percentage}, Encoding dimension:{encoding_dim}, Epoch {epoch+1}, Loss: {loss.item():.6f}")
        best_qvalues = best_model.encoder.q_values
        mapped_best_qvalues = mapToDiscreteValues(best_qvalues,discrete_values)
        best_model.encoder.q_values = mapped_best_qvalues
        models.append(best_model)
        losses.append(lowest_loss)
    return models

def validateModels(dataloader,models,signal_variance=133):
    models_losses = []

    with torch.no_grad():
        for model in models:
            current_model_losses = []
            model.eval()
            for batch in dataloader:
                inputs, targets = batch  # Unpack the tuple
                output = model(inputs)
                loss = loss_fn(output, targets)
                current_model_losses.append(loss.item())
            models_losses.append(np.average(current_model_losses))

    models_losses = np.array(models_losses)
    normalized_models_losses = models_losses/signal_variance
    return normalized_models_losses,models_losses

def visualizeReconstruction(model,max_amplitude=100,min_sparsity=3,max_sparsity=5,vector_size=100):
    h, x = buildDataSet(max_amplitude,min_sparsity,max_sparsity,vector_size,1)

    H = np.concatenate((h.real,h.imag)).T

    H_tensor = torch.tensor(H,dtype=torch.float)

    H_hat = model(H_tensor)

    h_hat = np.array(H_hat.detach())

    h_real,h_imag = np.split(h_hat,2,1)
    h_hat = h_real + 1j*h_imag
    h_hat = h_hat.reshape(-1,1)
    DFT = sp.linalg.dft(vector_size)/np.sqrt(vector_size)
    iDFT = DFT.conj().T


    x_hat = iDFT@h_hat
    indices = range(len(x_hat))

    plt.vlines(indices,0,x,linewidth=3)
    plt.vlines(indices,0,x_hat,colors="orange")

    plt.legend(("x","x_hat"))

In [None]:
# We train six models
encoding_dims_list = [50, 40, 30, 20, 10, 5, 50, 50, 50, 50,50,50,50,50,50,50,50,50,50]
SNR_list = [np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, 20, 17, 14, 11, 8, 5, 2, np.inf, np.inf,np.inf, np.inf,np.inf, np.inf]
imb_percentage_list = [0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0.04, 0.1, 0.3, 0.6, 1]

print(len(encoding_dims_list))
print(len(SNR_list))
print(len(imb_percentage_list))

In [None]:

discrete_models = trainModelsForDiscreteSet(dataloader,SNR_list,imb_percentage_list,encoding_dims_list,scale_factor=0.01)


In [None]:

normalized_losses, unnormalized_losses = validateModels(dataloader_val,discrete_models)
print(normalized_losses)

visualizeReconstruction(discrete_models[0])

In [None]:
for iter,model in enumerate(discrete_models):
    torch.save(model.state_dict(),
               f'Models/discrete_models/discrete_model_SNR{SNR_list[iter]}_IRR{imb_percentage_list[iter]}_enc{encoding_dims_list[iter]}.pt')

In [None]:
imb_db_list = []
r_list = []

for perc in imb_percentage_list[13:19]:
    b = 1-0.2*perc
    d = np.pi/8*perc
    r = 0.5*(1+b*np.exp(1j*d))
    IRR_abs = np.abs(r)**2/np.abs(1-r)**2
    imb_db_list.append(10*np.log10(IRR_abs))


print(imb_db_list)

## Plotting!

In [None]:
plt.style.use('ggplot')
fig1, (ax1, ax2, ax3) = plt.subplots(ncols=3,nrows = 1,figsize=(18, 6))


ax1.plot(SNR_list[6:13], normalized_losses[6:13],marker="o",color="g")
ax2.plot(imb_db_list, normalized_losses[13:20],marker='s',color='b')
ax3.plot(encoding_dims_list[0:6], normalized_losses[0:6],marker='^',color='r')

ax1.set_xlabel("SNR $(dB)$")
ax1.set_ylabel("NMSE")
ax1.set_title("Noisy Model Performance")
ax1.grid(True)
ax1.legend(["Discrete Model"])

ax2.set_xlabel("IRR $(dB)$")
ax2.set_title("IQ Imbalanced Model Performance")
ax2.legend(["Discrete Model"])
ax2.grid(True)

ax3.set_xlabel("Measurement Dimension")
ax3.set_title("Measurement Model Performance")
ax3.legend(["Discrete Model"])
ax3.grid(True)
plt.savefig("Images/discrete_model_performance.pdf", format="pdf", bbox_inches="tight")
plt.show()


# Find RIP from Model

In [None]:
import itertools

# Create an array of 100 numbers (0 to 99)
numbers = list(range(100))
RIC = {}

# Generate all possible 3-length combinations
for model in discrete_models:
    max_RIC = 0
    print(model)
    # Get q-values and create the complex matrix W
    qvalues = model.encoder.q_values.data.numpy()
    W = np.e**(1j * qvalues)
    
    # Normalize each column so they have unit norm
    col_norms = np.linalg.norm(W, axis=0)
    diag_norm_matrix = np.diag(col_norms)
    W_normalized = W @ np.linalg.inv(diag_norm_matrix)
    
    for combo in itertools.combinations(numbers, 3):
        # Select the columns specified by the combination
        W_cols = W_normalized[:, combo]
        mod_mat = W_cols.T @ W_cols

        # Compute eigenvalues of the Gram matrix
        eig_vals, _ = np.linalg.eig(mod_mat)
        eigenvalues = np.abs(eig_vals)  # They should be real and close to 1
        
        # Compute deviations from 1
        lower_deviation = 1 - np.min(eigenvalues)
        upper_deviation = np.max(eigenvalues) - 1
        temp_RIC = max(lower_deviation, upper_deviation)
        
        if temp_RIC > max_RIC:
            max_RIC = temp_RIC

    RIC[model] = max_RIC

In [None]:
print(RIC.values())

## Calculate coherences

In [None]:
models = discrete_models

mu = {}
for model in discrete_models:
    qvalues = model.encoder.q_values.data.numpy()
    W = np.e**(1j * qvalues)
    
    # Normalize each column so they have unit norm
    col_norms = np.linalg.norm(W, axis=0)
    diag_norm_matrix = np.diag(col_norms)
    W_normalized = W @ np.linalg.inv(diag_norm_matrix)
    W_dotprod = np.abs(W_normalized.T@W_normalized)
    W_no_diag = W_dotprod - np.diag(np.diag(W_dotprod))
    mu[model] = np.max(W_no_diag)


print(mu.values())
