Denoise cell search: training run that prints each cell's selected operations during every epoch

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Synthetic complex-valued dataset
num_samples = 1000
n = 150  # rows of every psi matrix
d = 512  # columns of every psi matrix, i.e. length of every estimate

# The draw order (y, psi, omega) is kept so a seeded run yields the same data.
y_complex = torch.randn((num_samples, n, 1), dtype=torch.cfloat)
psi_complex = torch.randn((num_samples, n, d), dtype=torch.cfloat)
omega_complex = torch.randn((num_samples, n, 1), dtype=torch.cfloat)

# Least-squares estimate h_LS = pinv(psi) @ (y - omega); the trailing singleton
# axis is dropped so each sample becomes a length-d vector.
h_LS_complex = torch.matmul(
    torch.linalg.pinv(psi_complex),
    y_complex - omega_complex,
).squeeze(-1)

# Real and imaginary parts are kept as two separate real-valued tensors.
h_LS_real, h_LS_imag = h_LS_complex.real, h_LS_complex.imag

# Every dataset item is (real_in, imag_in, real_target, imag_target).
# NOTE(review): the targets are the very same tensors as the inputs, so the
# training objective is a reconstruction of h_LS itself - confirm intended.
train_dataset = TensorDataset(h_LS_real, h_LS_imag, h_LS_real, h_LS_imag)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)

# Feature Extraction Layers
class FeatureExtractor(nn.Module):
    """Lift a single-channel signal to 32 feature channels.

    Two 3-tap convolutions (1 -> 16 -> 32 channels), each followed by ReLU.
    ``padding=1`` with ``kernel_size=3`` leaves the last dimension unchanged.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1)

    def forward(self, x):
        """Return the ReLU-activated output of ``conv2(relu(conv1(x)))``."""
        for layer in (self.conv1, self.conv2):
            x = F.relu(layer(x))
        return x

# NAS Candidate Operations
# Each factory takes the channel count C and returns a module whose output has
# the same shape as its input, so any candidate can sit on any edge.
OPS = {
    'conv_3x3': lambda C: nn.Conv1d(C, C, kernel_size=3, padding=1, bias=False),
    'conv_5x5': lambda C: nn.Conv1d(C, C, kernel_size=5, padding=2, bias=False),
    'identity': lambda C: nn.Identity(),
    'skip_connection': lambda C: nn.Sequential(nn.Conv1d(C, C, 1, bias=False), nn.BatchNorm1d(C)),
    # NOTE(review): 'zero' is a pass-through placeholder here, not an op that
    # outputs zeros - confirm that this is the intended search space.
    'zero': lambda C: nn.Identity(),  # Replacing ZeroPad1d to avoid errors
}


# Denoise Cell with DAG Structure
class DenoiseCell(nn.Module):
    """Cell with two input nodes (0, 1) and three computed nodes (2, 3, 4).

    On every edge the candidate with the largest architecture weight is
    applied and its result is summed into the destination node; node 4 is
    returned.

    Args:
        C: channel count passed to every candidate operation.
        cell_id: identifier used in the printed summary.
        verbose: when True (default) the selected operations are printed on
            every forward pass; pass False to silence the printout.
    """

    # (source node, destination node); sources are always filled before use.
    EDGES = ((0, 2), (0, 3), (1, 3), (1, 4), (2, 4), (3, 4))

    def __init__(self, C, cell_id, verbose=True):
        super().__init__()
        self.cell_id = cell_id
        self.verbose = verbose
        # Cached once so forward() does not rebuild the name list per edge.
        self.op_names = list(OPS)
        self.ops = nn.ModuleList([OPS[name](C) for name in self.op_names])
        # One column of architecture logits per edge.
        self.alphas = nn.Parameter(torch.randn(len(self.ops), len(self.EDGES)))

    def forward(self, inputs):
        """Run the cell on ``inputs`` (a pair of equally shaped tensors)."""
        assert len(inputs) == 2, "Each denoise cell must take two inputs."
        node_outputs = [inputs[0], inputs[1], None, None, None]
        selected_ops = []

        for edge_idx, (src, dest) in enumerate(self.EDGES):
            weights = F.softmax(self.alphas[:, edge_idx], dim=0)
            best_op_idx = int(torch.argmax(weights))
            selected_ops.append((src, dest, self.op_names[best_op_idx]))

            # argmax alone is not differentiable, which left `alphas` without
            # any gradient. The gate below is exactly 1.0 in the forward pass
            # but carries the gradient of the selected softmax weight, so the
            # architecture parameters can be optimised.
            best_weight = weights[best_op_idx]
            gate = (best_weight - best_weight.detach()) + 1.0
            contribution = self.ops[best_op_idx](node_outputs[src]) * gate

            # Out-of-place accumulation: never mutate a tensor that autograd
            # may still need for the backward pass.
            if node_outputs[dest] is None:
                node_outputs[dest] = contribution
            else:
                node_outputs[dest] = node_outputs[dest] + contribution

        if self.verbose:
            print(f"Denoise Cell {self.cell_id}: {selected_ops}")
        return node_outputs[4]  # Output of last node

# Sequence of Denoise Cells (10 by default)
class DenoiseModule(nn.Module):
    """Chain of denoise cells where each cell consumes the two previous outputs.

    Args:
        C: channel count handed to every cell.
        num_cells: number of cells in the chain (default 10).
    """

    def __init__(self, C, num_cells=10):
        super().__init__()
        self.cells = nn.ModuleList([DenoiseCell(C, i) for i in range(num_cells)])

    def forward(self, x):
        """Feed ``x`` through all cells and return the last cell's output.

        The first cell receives ``x`` for both inputs. With no cells at all
        ``x`` is returned unchanged.
        """
        older, newer = x, x
        # Iterating the ModuleList keeps the loop in sync with however many
        # cells were actually registered.
        for cell in self.cells:
            older, newer = newer, cell([older, newer])
        return newer

# Decoder Module
class Decoder(nn.Module):
    """Map feature maps back to a single-channel signal of the same length.

    Args:
        in_channels: channels of the incoming feature map (default 32). Must
            be divisible by 16 because ``sep_conv1`` uses ``groups=16``.
        length: size of the last dimension (default 512); the linear layer
            acts along that dimension.
    """

    def __init__(self, in_channels=32, length=512):
        super().__init__()
        self.fc = nn.Linear(length, length)
        self.sep_conv1 = nn.Conv1d(in_channels, 16, kernel_size=3, padding=1, groups=16)
        self.sep_conv2 = nn.Conv1d(16, 1, kernel_size=3, padding=1, groups=1)
        self.final_conv = nn.Conv1d(1, 1, kernel_size=3, padding=1)

    def forward(self, x):
        """Apply fc -> sep_conv1 -> sep_conv2 (ReLU after each) -> final_conv."""
        x = F.relu(self.fc(x))
        x = F.relu(self.sep_conv1(x))
        x = F.relu(self.sep_conv2(x))
        # No activation on the last layer so the output can take any sign.
        return self.final_conv(x)

# Complete Model
class FullModel(nn.Module):
    """End-to-end network: feature extractor, denoise module, then decoder."""

    def __init__(self):
        super().__init__()
        self.feature_extractor = FeatureExtractor()
        # 32 is the channel count produced by the feature extractor.
        self.denoise_module = DenoiseModule(32)
        self.decoder = Decoder()

    def forward(self, x):
        """Pass ``x`` through the three stages in order and return the result."""
        pipeline = (self.feature_extractor, self.denoise_module, self.decoder)
        for stage in pipeline:
            x = stage(x)
        return x

# Training Function
def train(model, train_loader, arch_optimizer, model_optimizer, criterion):
    """Run one epoch over ``train_loader`` and return the mean batch loss.

    Args:
        model: network applied separately to the real and imaginary inputs.
        train_loader: iterable of ``(real_in, imag_in, real_out, imag_out)``
            batches; a channel axis is inserted at position 1 of each tensor.
        arch_optimizer: optimizer for the architecture parameters.
        model_optimizer: optimizer for the network weights.
        criterion: loss callable ``criterion(prediction, target)``.

    Returns:
        Mean of the per-batch losses as a float (0.0 for an empty loader).
    """
    model.train()

    # Use the device the model lives on instead of relying on a global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    num_batches = 0
    for real_in, imag_in, real_out, imag_out in train_loader:
        real_in, imag_in, real_out, imag_out = (
            t.unsqueeze(1).to(target_device)
            for t in (real_in, imag_in, real_out, imag_out)
        )

        model_optimizer.zero_grad()
        arch_optimizer.zero_grad()

        loss = criterion(model(real_in), real_out) + criterion(model(imag_in), imag_out)
        loss.backward()

        # Both optimizers step on the gradients of this loss. Stepping the
        # architecture optimizer directly after zero_grad() would give it
        # nothing to apply.
        model_optimizer.step()
        arch_optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    return total_loss / num_batches if num_batches else 0.0

# Hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 20

# Initialize model
model = FullModel().to(device)

# Each optimizer owns a disjoint parameter set: the architecture logits
# ("alphas") go to arch_optimizer, everything else to model_optimizer.
# Building both over model.parameters() would let two Adam instances with
# different learning rates update the same tensors.
arch_params = [p for name, p in model.named_parameters() if name.endswith("alphas")]
weight_params = [p for name, p in model.named_parameters() if not name.endswith("alphas")]
arch_optimizer = optim.Adam(arch_params, lr=0.003)
model_optimizer = optim.Adam(weight_params, lr=0.01)
criterion = nn.MSELoss()

# Train model
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    train(model, train_loader, arch_optimizer, model_optimizer, criterion)
    print(f"Epoch {epoch+1} completed.\n")

# Saves the weights as they are after the final epoch.
torch.save(model.state_dict(), "best_model.pth")
print("Training completed.")

Estimating h with the trained model

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Synthetic complex-valued dataset
num_samples = 1000
n = 150  # rows of every psi matrix
d = 512  # columns of every psi matrix, i.e. length of every estimate

# The draw order (y, psi, omega) is kept so a seeded run yields the same data.
y_complex = torch.randn((num_samples, n, 1), dtype=torch.cfloat)
psi_complex = torch.randn((num_samples, n, d), dtype=torch.cfloat)
omega_complex = torch.randn((num_samples, n, 1), dtype=torch.cfloat)

# Least-squares estimate h_LS = pinv(psi) @ (y - omega); the trailing singleton
# axis is dropped so each sample becomes a length-d vector.
h_LS_complex = torch.matmul(
    torch.linalg.pinv(psi_complex),
    y_complex - omega_complex,
).squeeze(-1)

# Real and imaginary parts are kept as two separate real-valued tensors.
h_LS_real, h_LS_imag = h_LS_complex.real, h_LS_complex.imag

# Every dataset item is (real_in, imag_in, real_target, imag_target).
# NOTE(review): the targets are the very same tensors as the inputs, so the
# training objective is a reconstruction of h_LS itself - confirm intended.
train_dataset = TensorDataset(h_LS_real, h_LS_imag, h_LS_real, h_LS_imag)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)

# Feature Extraction Layers
class FeatureExtractor(nn.Module):
    """Lift a single-channel signal to 32 feature channels.

    Two 3-tap convolutions (1 -> 16 -> 32 channels), each followed by ReLU.
    ``padding=1`` with ``kernel_size=3`` leaves the last dimension unchanged.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1)

    def forward(self, x):
        """Return the ReLU-activated output of ``conv2(relu(conv1(x)))``."""
        for layer in (self.conv1, self.conv2):
            x = F.relu(layer(x))
        return x

# NAS Candidate Operations
# Each factory takes the channel count C and returns a module whose output has
# the same shape as its input, so the candidates can be mixed by a weighted sum.
OPS = {
    'conv_3x3': lambda C: nn.Conv1d(C, C, kernel_size=3, padding=1, bias=False),
    'conv_5x5': lambda C: nn.Conv1d(C, C, kernel_size=5, padding=2, bias=False),
    'identity': lambda C: nn.Identity(),
    'skip_connection': lambda C: nn.Sequential(nn.Conv1d(C, C, 1, bias=False), nn.BatchNorm1d(C)),
    # nn.ZeroPad1d(0) pads by zero elements, i.e. it passes the values through,
    # but the class is missing from older PyTorch releases. nn.Identity yields
    # the same values on every version.
    # NOTE(review): despite its name this candidate does not output zeros.
    'zero': lambda C: nn.Identity(),
}


# Denoise Cell
class DenoiseCell(nn.Module):
    """Cell mixing all candidate operations on each of its three edges.

    Nodes 0 and 1 are the two inputs, node 2 is a hidden node and node 3 is
    the output. Every edge applies the softmax-weighted sum of all candidate
    operations to its source node.

    Args:
        C: channel count passed to every candidate operation.
    """

    # (source node, destination node). Both inputs feed the hidden node;
    # a pure 0 -> 1 -> 2 -> 3 chain would overwrite and drop the second input.
    EDGES = ((0, 2), (1, 2), (2, 3))

    def __init__(self, C):
        super().__init__()
        self.ops = nn.ModuleList([op(C) for op in OPS.values()])
        # One column of architecture logits per edge (3 edges per cell).
        self.alphas = nn.Parameter(torch.randn(len(self.ops), len(self.EDGES)))

    def _mixed_op(self, x, edge_idx):
        """Return the softmax-weighted sum of all candidates applied to ``x``."""
        weights = F.softmax(self.alphas[:, edge_idx], dim=0)
        return sum(w * op(x) for w, op in zip(weights, self.ops))

    def forward(self, inputs):
        """Combine both inputs into the hidden node and return the output node."""
        assert len(inputs) == 2, "Each denoise cell must take two inputs."
        hidden = self._mixed_op(inputs[0], 0) + self._mixed_op(inputs[1], 1)
        return self._mixed_op(hidden, 2)

    def print_best_operations(self):
        """Print, for every edge, the candidate with the largest logit."""
        best_ops = torch.argmax(self.alphas, dim=0).tolist()
        ops_list = list(OPS)
        for i, best in enumerate(best_ops):
            print(f"Edge {i+1}: Best operation = {ops_list[best]}")

# Sequence of Denoise Cells (10 by default)
class DenoiseModule(nn.Module):
    """Chain of denoise cells where each cell consumes the two previous outputs.

    Args:
        C: channel count handed to every cell.
        num_cells: number of cells in the chain (default 10).
    """

    def __init__(self, C, num_cells=10):
        super().__init__()
        self.cells = nn.ModuleList([DenoiseCell(C) for _ in range(num_cells)])

    def forward(self, x):
        """Feed ``x`` through all cells and return the last cell's output.

        The first cell receives ``x`` for both inputs. With no cells at all
        ``x`` is returned unchanged.
        """
        older, newer = x, x  # First cell takes the same tensor twice
        # Iterating the ModuleList keeps the loop in sync with however many
        # cells were actually registered.
        for cell in self.cells:
            older, newer = newer, cell([older, newer])
        return newer

    def print_best_architecture(self):
        """Print the highest-scoring operation of every edge in every cell."""
        print("Best Architecture for Denoise Cells:")
        for number, cell in enumerate(self.cells, start=1):
            print(f"Denoise Cell {number}:")
            cell.print_best_operations()

# Decoder Module
class Decoder(nn.Module):
    """Map feature maps back to a single-channel signal of the same length.

    Args:
        in_channels: channels of the incoming feature map (default 32). Must
            be divisible by 16 because ``sep_conv1`` uses ``groups=16``.
        length: size of the last dimension (default 512); the linear layer
            acts along that dimension.
    """

    def __init__(self, in_channels=32, length=512):
        super().__init__()
        self.fc = nn.Linear(length, length)
        self.sep_conv1 = nn.Conv1d(in_channels, 16, kernel_size=3, padding=1, groups=16)
        self.sep_conv2 = nn.Conv1d(16, 1, kernel_size=3, padding=1, groups=1)
        self.final_conv = nn.Conv1d(1, 1, kernel_size=3, padding=1)

    def forward(self, x):
        """Apply fc -> sep_conv1 -> sep_conv2 (ReLU after each) -> final_conv."""
        x = F.relu(self.fc(x))
        x = F.relu(self.sep_conv1(x))
        x = F.relu(self.sep_conv2(x))
        # No activation on the last layer so the output can take any sign.
        return self.final_conv(x)

# Complete Model
class FullModel(nn.Module):
    """End-to-end network: feature extractor, denoise module, then decoder."""

    def __init__(self):
        super().__init__()
        self.feature_extractor = FeatureExtractor()
        # 32 is the channel count produced by the feature extractor.
        self.denoise_module = DenoiseModule(32)
        self.decoder = Decoder()

    def forward(self, x):
        """Pass ``x`` through the three stages in order and return the result."""
        pipeline = (self.feature_extractor, self.denoise_module, self.decoder)
        for stage in pipeline:
            x = stage(x)
        return x

    def print_best_model_structure(self):
        """Delegate to the denoise module's architecture printout."""
        denoiser = self.denoise_module
        denoiser.print_best_architecture()

# Function to get best h estimate
def get_best_h_estimate(model, h_real, h_imag):
    """Run the model on the real and imaginary parts without tracking gradients.

    Args:
        model: network applied separately to each part.
        h_real: real parts; a channel axis is inserted at position 1.
        h_imag: imaginary parts, handled the same way.

    Returns:
        Tuple ``(h_real_pred, h_imag_pred)`` with axis 1 squeezed out again.
        The tensors stay on the device the model lives on.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    model.eval()

    # Follow the model's own device instead of relying on a global; a model
    # without parameters leaves the inputs where they are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else h_real.device

    with torch.no_grad():
        h_real_pred = model(h_real.unsqueeze(1).to(target_device))
        h_imag_pred = model(h_imag.unsqueeze(1).to(target_device))
    return h_real_pred.squeeze(1), h_imag_pred.squeeze(1)

# Hyperparameters
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
num_epochs = 20

# Initialize model
model = FullModel().to(device)

# NOTE(review): both optimizers are constructed over the full parameter set
# (weights and architecture logits alike) - confirm that this is intended.
model_optimizer = optim.Adam(model.parameters(), lr=0.01)
arch_optimizer = optim.Adam(model.parameters(), lr=0.003)
criterion = nn.MSELoss()

# Training Function
def train(model, train_loader, arch_optimizer, model_optimizer, criterion):
    """Run one epoch over ``train_loader`` and return the mean batch loss.

    Args:
        model: network applied separately to the real and imaginary inputs.
        train_loader: iterable of ``(real_in, imag_in, real_out, imag_out)``
            batches; a channel axis is inserted at position 1 of each tensor.
        arch_optimizer: optimizer for the architecture parameters.
        model_optimizer: optimizer for the network weights.
        criterion: loss callable ``criterion(prediction, target)``.

    Returns:
        Mean of the per-batch losses as a float (0.0 for an empty loader).
    """
    model.train()

    # Use the device the model lives on instead of relying on a global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    num_batches = 0
    for real_in, imag_in, real_out, imag_out in train_loader:
        real_in, imag_in, real_out, imag_out = (
            t.unsqueeze(1).to(target_device)
            for t in (real_in, imag_in, real_out, imag_out)
        )

        model_optimizer.zero_grad()
        arch_optimizer.zero_grad()

        loss = criterion(model(real_in), real_out) + criterion(model(imag_in), imag_out)
        loss.backward()

        # Both optimizers step on the gradients of this loss. Stepping the
        # architecture optimizer directly after zero_grad() would give it
        # nothing to apply.
        model_optimizer.step()
        arch_optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    return total_loss / num_batches if num_batches else 0.0

# Train model: one pass over the loader per epoch, with a progress line after each
epoch = 0
while epoch < num_epochs:
    train(model, train_loader, arch_optimizer, model_optimizer, criterion)
    print(f"Epoch {epoch+1}/{num_epochs} completed.")
    epoch += 1

# Report the highest-scoring operation of every edge, then denoise the full
# set of least-squares estimates with the trained model.
model.print_best_model_structure()
h_real_pred, h_imag_pred = get_best_h_estimate(
    model,
    h_LS_real,
    h_LS_imag,
)
print("Best h estimate (real part):", h_real_pred)
print("Best h estimate (imaginary part):", h_imag_pred)