In [None]:
import datetime
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split

# Constants
Sampling_Interval = 1e-3  # Sampling interval
Count_Samples = 1000  # Number of samples
Sampling_Frequency = 1 / Sampling_Interval  # Sampling frequency (1MHz)
Frequency_Resolution = Sampling_Frequency / Count_Samples  # Frequency resolution (1)
Count_Input_Frequencies = 22
Count_Output_Frequencies = 202
Linearity_Factor = 0.3

# Carrier and modulation parameters
Carrier_Frequency = 100  # Carrier frequency in MHz
Frequency_Spacing = 1  # Frequency spacing for modulation
Count_Frequencies = 200  # Number of modulating frequencies

Time_Axis = np.arange(Count_Samples) * Sampling_Interval  # Time axis
Frequency_Vector = np.arange(Count_Samples // 2) * Frequency_Resolution  # Frequency vector
# AWG_Frequencies = np.array([99, 100, 101])  # frequencies in MHz
AWG_Frequencies = np.array([95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105])  # frequencies in MHz
AWG_Frequencies_Indices = (Count_Frequencies/2 + (AWG_Frequencies - Carrier_Frequency) / Frequency_Spacing).astype(int)  # indices in fm
Frequencies = Carrier_Frequency + np.arange(-Count_Frequencies / 2, Count_Frequencies / 2 + 1) * Frequency_Spacing

# Training
seed = 42
Count_Training_Samples = 10_000

In [None]:
def generate_awg_time_signal(time_axis, awg_frequencies_indices, frequencies, awg_freqs_real, awg_freqs_imag):
    awg_time_signal = np.zeros_like(time_axis)
    for index in awg_frequencies_indices:
        awg_time_signal += awg_freqs_real[index] * np.cos(2 * np.pi * frequencies[index] * time_axis) - awg_freqs_imag[index] * np.sin(2 * np.pi * frequencies[index] * time_axis)

    return awg_time_signal

def simulate_aom(awg_time_signal):
    return np.sin(2 * np.pi * awg_time_signal)

def fft(time_signal):
    spectrum = np.fft.fft(time_signal) / (Count_Samples / 2)
    real_part = np.real(spectrum[:Count_Samples // 2])
    imag_part = np.imag(spectrum[:Count_Samples // 2])
    return real_part, imag_part

def mask_fft(real_part, imag_part):
    f_min = 50  # MHz
    f_max = 150  # MHz
    mask = (Frequency_Vector >= f_min) & (Frequency_Vector <= f_max)
    real_part_masked = real_part[mask]
    imag_part_masked = imag_part[mask]
    return real_part_masked, imag_part_masked

def generate_input():
    vec = np.random.uniform(-0.5, 0.5, Count_Input_Frequencies)
    vec =  Linearity_Factor * vec / np.linalg.norm(vec)
    awg_input_a = vec[:(Count_Input_Frequencies // 2)]
    awg_input_b = vec[(Count_Input_Frequencies // 2):]
    return awg_input_a, awg_input_b

def generate_output(awg_input_a, awg_input_b):
    an = np.zeros_like(Frequencies)
    bn = np.zeros_like(Frequencies)

    an[AWG_Frequencies_Indices] = awg_input_a
    bn[AWG_Frequencies_Indices] = awg_input_b

    awg_time_signal = generate_awg_time_signal(Time_Axis, AWG_Frequencies_Indices, Frequencies, an, bn)
    aom_time_signal = simulate_aom(awg_time_signal)

    (real_part, imag_part) = fft(aom_time_signal)
    (real_part_masked, imag_part_masked) = mask_fft(real_part, imag_part)

    return real_part_masked, imag_part_masked

def generate_samples(number_samples = 10000):
    inputs = []
    outputs = []
    for _ in range(number_samples):
        (awg_input_a, awg_input_b) = generate_input()
        inputs.append(np.concatenate((awg_input_a, awg_input_b)))

        (real_part_masked, imag_part_masked) = generate_output(awg_input_a, awg_input_b)
        outputs.append(np.concatenate((real_part_masked, imag_part_masked)))

    return np.array(inputs), np.array(outputs)

In [None]:
# Dataset
class SpectrumDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y
    def __len__(self): return len(self.X)
    def __getitem__(self, i): return self.X[i], self.Y[i]


# Model
class SpectrumMLP(nn.Module):
    def __init__(self, layer_1_size = 256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(Count_Output_Frequencies, layer_1_size),
            nn.BatchNorm1d(layer_1_size),
            nn.LeakyReLU(),
            # nn.Dropout(0.4),

            # nn.Linear(layer_1_size, layer_1_size),
            # nn.BatchNorm1d(layer_1_size),
            # nn.LeakyReLU(),
            #
            # nn.Linear(layer_1_size, layer_1_size),
            # nn.BatchNorm1d(layer_1_size),
            # nn.LeakyReLU(),

            nn.Linear(layer_1_size, 128),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(),
            # nn.Dropout(0.4),

            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(),

            # nn.Linear(64, 6)
            nn.Linear(64, Count_Input_Frequencies)
        )
    def forward(self, x): return self.net(x)

# Train function
def train_model(model, train_loader, val_loader):
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)
    loss_fn = nn.MSELoss()
    # loss_fn = nn.L1Loss()

    train_losses = []
    val_losses = []

    for epoch in range(40):
        model.train()
        train_loss = 0
        for xb, yb in train_loader:
            preds = model(xb)
            loss = loss_fn(preds, yb)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item() * xb.size(0)

        train_loss /= len(train_loader.dataset)
        train_losses.append(train_loss)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                preds = model(xb)
                val_loss += loss_fn(preds, yb).item() * xb.size(0)
        val_loss /= len(val_loader.dataset)
        val_losses.append(val_loss)

        scheduler.step(val_loss)

        print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Epoch {epoch+1:2d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    plt.figure(figsize=(8, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Learning Curve')
    plt.legend()
    plt.grid()
    plt.show()

    return train_losses, val_losses

def predict_output(model, real_part_masked, imag_part_masked):
    input_vector = np.concatenate((real_part_masked, imag_part_masked))
    input_vector = input_vector.reshape(1, -1)  # reshape to (1, length)
    input_tensor = torch.tensor(input_vector, dtype=torch.float32)
    model.eval()  # set the model to evaluation mode
    with torch.no_grad():
        predicted = model(input_tensor).numpy().flatten()
    predicted_a = predicted[:(Count_Input_Frequencies // 2)]
    predicted_b = predicted[(Count_Input_Frequencies // 2):]
    return predicted_a, predicted_b

def set_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

def evaluate_layer_sizes(layer_sizes):
    train_losses = []
    val_losses = []

    for size in layer_sizes:
        set_seed(seed)
        torch.use_deterministic_algorithms(True)
        Y, X = generate_samples(10000)
        X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2)
        train_loader = DataLoader(SpectrumDataset(X_train, Y_train), batch_size=64, shuffle=True)
        val_loader = DataLoader(SpectrumDataset(X_val, Y_val), batch_size=64)
        model = SpectrumMLP(size)
        train_loss, val_loss = train_model(model, train_loader, val_loader)
        train_losses.append(train_loss[-1])
        val_losses.append(val_loss[-1])

    return train_losses, val_losses

In [None]:
(X, Y) = generate_samples(Count_Training_Samples)
X = torch.tensor(X, dtype=torch.float32)
Y = torch.tensor(Y, dtype=torch.float32)
torch.save((X, Y), f"{Count_Training_Samples}_{Linearity_Factor}.pt")

In [None]:
# set_seed(seed)
# torch.use_deterministic_algorithms(True)
Y, X = torch.load('100000_0.3.pt')

# Split
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2)

train_loader = DataLoader(SpectrumDataset(X_train, Y_train), batch_size=64, shuffle=True)
val_loader = DataLoader(SpectrumDataset(X_val, Y_val), batch_size=64)

model = SpectrumMLP(1024)
(train_losses, val_losses) = train_model(model, train_loader, val_loader)

In [None]:
layer_sizes = [256, 512, 1024, 2048, 4096, 8192]
(train_losses, val_losses) = evaluate_layer_sizes(layer_sizes)

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(layer_sizes, train_losses, marker='o', label='Train Loss')
plt.plot(layer_sizes, val_losses, marker='o', label='Validation Loss')
plt.xlabel('Layer Size')
plt.ylabel('Loss')
plt.title('Loss vs Layer Size')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# awg_input_a = [0, 0, 0, 0, 0.05, -0.05, 0.025, 0, 0, 0, 0]
# awg_input_b = [0, 0, 0, 0, -0.025, 0.05, -0.05, 0, 0, 0, 0]

# awg_input_a = [0, 0, 0, 0, 0.1, -0.5, 0.2, 0, 0, 0, 0]
# awg_input_b = [0, 0, 0, 0, -0.2, 0.5, -0.1, 0, 0, 0, 0]

(awg_input_a, awg_input_b) = generate_input()

(real_part_masked, imag_part_masked) = generate_output(awg_input_a, awg_input_b)

(awg_input_a_pred, awg_input_b_pred) = predict_output(model, real_part_masked, imag_part_masked)

(real_part_masked_pred, imag_part_masked_pred) = generate_output(awg_input_a_pred, awg_input_b_pred)

In [None]:
x_vals_input = np.arange(len(awg_input_a)) + 95
x_vals_output = np.arange(len(real_part_masked)) + 50


fig, axs = plt.subplots(2, 2, figsize=(14, 8))  # Create 2x2 layout

# First plot: AWG Input Spectrum
axs[0, 0].stem(x_vals_input, awg_input_a, label='Real', linefmt='b-', markerfmt='bo', basefmt=" ")
axs[0, 0].stem(x_vals_input, awg_input_b, label='Imag', linefmt='r-', markerfmt='ro', basefmt=" ")
axs[0, 0].set_title("AWG Input Spectrum")
axs[0, 0].set_xlabel("Frequency (MHz)")
axs[0, 0].set_ylabel("Amplitude")
axs[0, 0].set_xlim([94.5, 105.5])
axs[0, 0].legend()
axs[0, 0].grid(True)

# Second plot: AOM Output Spectrum
axs[0, 1].stem(x_vals_output, real_part_masked, label='Real', linefmt='b-', markerfmt='bo', basefmt=" ")
axs[0, 1].stem(x_vals_output, imag_part_masked, label='Imag', linefmt='r-', markerfmt='ro', basefmt=" ")
axs[0, 1].set_title("AOM Output Spectrum")
axs[0, 1].set_xlabel("Frequency (MHz)")
axs[0, 1].set_ylabel("Amplitude")
axs[0, 1].set_xlim([80, 120])
axs[0, 1].legend()
axs[0, 1].grid(True)

# Third plot: Predicted AWG Input Spectrum
axs[1, 0].stem(x_vals_input, awg_input_a_pred, label='Real', linefmt='b-', markerfmt='bo', basefmt=" ")
axs[1, 0].stem(x_vals_input, awg_input_b_pred, label='Imag', linefmt='r-', markerfmt='ro', basefmt=" ")
axs[1, 0].set_title("Predicted AWG Input Spectrum")
axs[1, 0].set_xlabel("Frequency (MHz)")
axs[1, 0].set_ylabel("Amplitude")
axs[1, 0].set_xlim([94.5, 105.5])
axs[1, 0].legend()
axs[1, 0].grid(True)

# Fourth plot: Predicted AOM Output Spectrum
axs[1, 1].stem(x_vals_output, real_part_masked_pred, label='Real', linefmt='b-', markerfmt='bo', basefmt=" ")
axs[1, 1].stem(x_vals_output, imag_part_masked_pred, label='Imag', linefmt='r-', markerfmt='ro', basefmt=" ")
axs[1, 1].set_title("Predicted AOM Output Spectrum")
axs[1, 1].set_xlabel("Frequency (MHz)")
axs[1, 1].set_ylabel("Amplitude")
axs[1, 1].set_xlim([80, 120])
axs[1, 1].legend()
axs[1, 1].grid(True)

# Adjust layout
plt.tight_layout()
plt.show()