In [1]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error

In [3]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error
import pandas as pd
import os

# ----------------------------- Data Generation ----------------------------- #
def create_data(noise_level, seed=1):
    """Simulate a tiled, rank-3 signal matrix observed under Gaussian noise.

    Both NumPy's and PyTorch's global generators are seeded with ``seed``.
    1000 points are drawn uniformly in the unit square and each one is mapped
    to one of three groups depending on which tile of a 3x3 grid (cuts at
    0.33 and 0.66) it falls in. Each group owns one row of a 3 x 200 factor
    matrix ``f``; the noiseless signal is ``L @ f`` where ``L`` is the one-hot
    group membership matrix.

    Args:
        noise_level: Multiplier applied to standard-normal noise added to the
            signal (i.e. the noise standard deviation).
        seed: Value passed to ``np.random.seed`` and ``torch.manual_seed``.

    Returns:
        Tuple ``(X, Z, Y_true)`` of float32 arrays: the (1000, 2) coordinates,
        the (1000, 200) noisy observations and the (1000, 200) noiseless signal.
    """
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Side information: coordinates in the unit square.
    x = np.random.rand(1000)
    y = np.random.rand(1000)
    X = np.stack((x, y)).T

    # Factors: each column gets two Bernoulli on/off switches; row 0 uses the
    # first one, rows 1 and 2 share the second. The draw order (two randint,
    # then three randn per column) is kept so seeded results stay identical.
    f = np.zeros((3, 200))
    for col in range(f.shape[1]):
        switch_a = np.random.randint(2)
        switch_b = np.random.randint(2)
        f[:, col] = (
            switch_a * np.random.randn(),
            switch_b * np.random.randn(),
            switch_b * np.random.randn(),
        )

    # Tile membership, computed with boolean masks instead of a Python loop.
    x_low, x_mid, x_high = x < 0.33, (0.33 < x) & (x < 0.66), x > 0.66
    y_low, y_mid, y_high = y < 0.33, (0.33 < y) & (y < 0.66), y > 0.66
    first_group = (x_low & y_low) | (x_mid & y_mid) | (x_high & y_high)
    second_group = (x_low & y_high) | (x_mid & y_low) | (x_high & y_mid)
    # Everything else (including points lying exactly on a cut) is group 2.
    group = np.select([first_group, second_group], [0, 1], default=2)
    L = np.eye(3)[group]

    Y_true = L @ f
    Z = Y_true + np.random.randn(*Y_true.shape) * noise_level
    return X.astype(np.float32), Z.astype(np.float32), Y_true.astype(np.float32)

# ----------------------------- Models ----------------------------- #
class AE(nn.Module):
    """Plain fully connected autoencoder with one 64-unit hidden layer per side.

    Args:
        input_dim: Width of the input (and of the reconstruction).
        latent_dim: Width of the bottleneck code.
    """

    def __init__(self, input_dim, latent_dim):
        super().__init__()
        hidden_dim = 64
        # Encoder layers are built before decoder layers; keeping this order
        # keeps seeded parameter initialisation reproducible.
        encoder_layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        ]
        decoder_layers = [
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` to the latent code and return its reconstruction."""
        return self.decoder(self.encoder(x))

class CVAE(nn.Module):
    """Conditional variational autoencoder with 64-unit hidden layers.

    The condition ``c`` is concatenated to the input before encoding and to
    the latent sample before decoding.

    Args:
        input_dim: Width of the data vector being reconstructed.
        cond_dim: Width of the conditioning vector.
        latent_dim: Width of the latent Gaussian.
    """

    def __init__(self, input_dim, cond_dim, latent_dim):
        super().__init__()
        hidden_dim = 64
        # Construction order (encoder, mu head, log-variance head, decoder) is
        # kept so seeded parameter initialisation stays reproducible.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim + cond_dim, hidden_dim),
            nn.ReLU(),
        )
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim + cond_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def encode(self, x, c):
        """Return the posterior mean and log-variance for ``x`` given ``c``."""
        hidden = self.encoder(torch.cat([x, c], dim=1))
        mu = self.fc_mu(hidden)
        logvar = self.fc_logvar(hidden)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * std`` with ``eps`` standard normal."""
        std = (0.5 * logvar).exp()
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x, c):
        """Return ``(reconstruction, mu, logvar)`` for input ``x`` and condition ``c``."""
        mu, logvar = self.encode(x, c)
        # NOTE(review): a latent sample is drawn regardless of train/eval mode,
        # so reconstructions are stochastic at inference too - confirm intended.
        latent = self.reparameterize(mu, logvar)
        x_hat = self.decoder(torch.cat([latent, c], dim=1))
        return x_hat, mu, logvar

class DenoisingNCF(nn.Module):
    """Two-tower MLP that reconstructs a row from its noisy values and side info.

    One tower embeds the side information, the other embeds the noisy row;
    the concatenated embeddings are mapped back to ``input_dim`` outputs.

    Args:
        input_dim: Width of the noisy row (and of the prediction).
        side_dim: Width of the side-information vector.
        hidden_dim: Width of each tower's embedding and of the predictor's
            hidden layer.
    """

    def __init__(self, input_dim, side_dim, hidden_dim=64):
        super().__init__()
        # Towers first, predictor last: same construction order as before so
        # seeded parameter initialisation stays reproducible.
        self.user_mlp = nn.Sequential(nn.Linear(side_dim, hidden_dim), nn.ReLU())
        self.item_mlp = nn.Sequential(nn.Linear(input_dim, hidden_dim), nn.ReLU())
        predictor_layers = [
            nn.Linear(2 * hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.predictor = nn.Sequential(*predictor_layers)

    def forward(self, z_noisy, x_side):
        """Return the prediction for ``z_noisy`` given side information ``x_side``."""
        side_embedding = self.user_mlp(x_side)
        row_embedding = self.item_mlp(z_noisy)
        joint = torch.cat([side_embedding, row_embedding], dim=1)
        return self.predictor(joint)

# ----------------------------- Training Functions ----------------------------- #
def train_ae(model, dataloader, epochs=50):
    """Fit ``model`` to reconstruct its own input under an MSE loss.

    Args:
        model: Module called as ``model(batch)`` and returning a tensor shaped
            like the batch.
        dataloader: Iterable whose items are indexable; element ``[0]`` of each
            item is used as both input and target.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. ``model`` is updated in place and left in training mode.
    """
    criterion = nn.MSELoss()
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for _ in range(epochs):
        for batch in dataloader:
            inputs = batch[0]
            adam.zero_grad()
            reconstruction = model(inputs)
            criterion(reconstruction, inputs).backward()
            adam.step()

def train_cvae(model, dataloader, epochs=50):
    """Fit a conditional VAE with an MSE reconstruction term plus a KL term.

    Both terms are plain means (over all elements) and are added with equal
    weight.

    Args:
        model: Module called as ``model(x, c)`` and returning
            ``(x_hat, mu, logvar)``.
        dataloader: Iterable yielding ``(x, c)`` pairs.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. ``model`` is updated in place and left in training mode.
    """
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for _ in range(epochs):
        for targets, condition in dataloader:
            adam.zero_grad()
            reconstruction, mu, logvar = model(targets, condition)
            # KL divergence between N(mu, exp(logvar)) and N(0, 1), averaged.
            kl_term = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
            total = nn.functional.mse_loss(reconstruction, targets) + kl_term
            total.backward()
            adam.step()

def train_ncf(model, dataloader, epochs=50):
    """Fit a two-input model to reproduce its first input under an MSE loss.

    The regression target is the (noisy) first input itself, not a clean
    signal.

    Args:
        model: Module called as ``model(z, x)`` and returning a tensor shaped
            like ``z``.
        dataloader: Iterable yielding ``(z, x)`` pairs.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. ``model`` is updated in place and left in training mode.
    """
    criterion = nn.MSELoss()
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for _ in range(epochs):
        for noisy, side in dataloader:
            adam.zero_grad()
            prediction = model(noisy, side)
            criterion(prediction, noisy).backward()  # target is the noisy Z
            adam.step()

# ----------------------------- Benchmark Routine ----------------------------- #
def run_denoising_benchmark(noise_level=1.0, seed=1):
    """Train the AE, CVAE and NCF denoisers on one simulated data set and score them.

    Data comes from ``create_data(noise_level, seed)``. Each model is trained
    on the noisy matrix (with the coordinates as side information for CVAE and
    NCF), then applied to that same noisy matrix; its output is compared with
    the noiseless signal.

    Args:
        noise_level: Forwarded to ``create_data``.
        seed: Forwarded to ``create_data``.

    Returns:
        dict with the RMSE against the noiseless signal under keys ``"AE"``,
        ``"CVAE"`` and ``"NCF"``, plus ``"noise_level"`` and ``"seed"``.
    """
    X, Z, Y_true = create_data(noise_level, seed)
    side_info = torch.tensor(X)
    noisy = torch.tensor(Z)
    n_features = Z.shape[1]

    def make_loader(*tensors):
        # Shuffled mini-batches of 32 rows over the given aligned tensors.
        return DataLoader(TensorDataset(*tensors), batch_size=32, shuffle=True)

    def rmse(estimate, truth):
        return np.sqrt(mean_squared_error(estimate, truth))

    # Models are built and trained strictly one after another (AE, CVAE, NCF)
    # so the global torch RNG is consumed in the same order on every run.

    # AE: noisy rows only.
    ae = AE(input_dim=n_features, latent_dim=10)
    ae_loader = make_loader(noisy)
    train_ae(ae, ae_loader)
    ae.eval()
    with torch.no_grad():
        denoised_ae = ae(noisy).numpy()

    # CVAE: noisy rows conditioned on the coordinates.
    cvae = CVAE(input_dim=n_features, cond_dim=2, latent_dim=10)
    cvae_loader = make_loader(noisy, side_info)
    train_cvae(cvae, cvae_loader)
    cvae.eval()
    with torch.no_grad():
        denoised_cvae = cvae(noisy, side_info)[0].numpy()

    # NCF: noisy rows plus coordinates through separate towers.
    ncf = DenoisingNCF(input_dim=n_features, side_dim=X.shape[1])
    ncf_loader = make_loader(noisy, side_info)
    train_ncf(ncf, ncf_loader)
    ncf.eval()
    with torch.no_grad():
        denoised_ncf = ncf(noisy, side_info).numpy()

    scores = {
        "AE": rmse(denoised_ae, Y_true),
        "CVAE": rmse(denoised_cvae, Y_true),
        "NCF": rmse(denoised_ncf, Y_true),
    }
    scores["noise_level"] = noise_level
    scores["seed"] = seed
    return scores

# ----------------------------- Main Loop ----------------------------- #
# Destination of the benchmark table.
# NOTE(review): this is an absolute, machine-specific path - adjust before
# running elsewhere.
output_file = r"C:\Document\Serieux\Travail\python_work\cEBMF_additional_simulation_VAE\tiling_benchmark_results.csv"

# Create the target folder up front: the first write only happens after 100
# training runs, so a missing directory would otherwise discard all of them.
output_dir = os.path.dirname(output_file)
if output_dir:  # empty when the path has no directory component on this OS
    os.makedirs(output_dir, exist_ok=True)

all_results = []

for noise in [1, 2, 3, 5]:
    print(f"Running experiments for noise level {noise}...")
    for i in range(100):
        result = run_denoising_benchmark(noise_level=noise, seed=i + 1)
        all_results.append(result)
        print(f"  Seed {i+1} done.")

    # Checkpoint: rewrite the full table after each noise level so an
    # interrupted run keeps everything finished so far.
    df = pd.DataFrame(all_results)
    df.to_csv(output_file, index=False)

print(f"\n✅ All results saved to {output_file}")


Running experiments for noise level 1...
  Seed 1 done.
  Seed 2 done.
  Seed 3 done.
  Seed 4 done.
  Seed 5 done.
  Seed 6 done.
  Seed 7 done.
  Seed 8 done.
  Seed 9 done.
  Seed 10 done.
  Seed 11 done.
  Seed 12 done.
  Seed 13 done.
  Seed 14 done.
  Seed 15 done.
  Seed 16 done.
  Seed 17 done.
  Seed 18 done.
  Seed 19 done.
  Seed 20 done.
  Seed 21 done.
  Seed 22 done.
  Seed 23 done.
  Seed 24 done.
  Seed 25 done.
  Seed 26 done.
  Seed 27 done.
  Seed 28 done.
  Seed 29 done.
  Seed 30 done.
  Seed 31 done.
  Seed 32 done.
  Seed 33 done.
  Seed 34 done.
  Seed 35 done.
  Seed 36 done.
  Seed 37 done.
  Seed 38 done.
  Seed 39 done.
  Seed 40 done.
  Seed 41 done.
  Seed 42 done.
  Seed 43 done.
  Seed 44 done.
  Seed 45 done.
  Seed 46 done.
  Seed 47 done.
  Seed 48 done.
  Seed 49 done.
  Seed 50 done.
  Seed 51 done.
  Seed 52 done.
  Seed 53 done.
  Seed 54 done.
  Seed 55 done.
  Seed 56 done.
  Seed 57 done.
  Seed 58 done.
  Seed 59 done.
  Seed 60 done.
  Seed 6

In [None]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error
import pandas as pd
import os

# ----------------------------- Data Generation ----------------------------- #
def create_data(noise_level, seed=1):
    """Simulate a tiled, rank-3 signal matrix observed under Gaussian noise.

    Both NumPy's and PyTorch's global generators are seeded with ``seed``.
    1000 points are drawn uniformly in the unit square and each one is mapped
    to one of three groups depending on which tile of a 3x3 grid (cuts at
    0.33 and 0.66) it falls in. Each group owns one row of a 3 x 200 factor
    matrix ``f``; the noiseless signal is ``L @ f`` where ``L`` is the one-hot
    group membership matrix.

    Args:
        noise_level: Multiplier applied to standard-normal noise added to the
            signal (i.e. the noise standard deviation).
        seed: Value passed to ``np.random.seed`` and ``torch.manual_seed``.

    Returns:
        Tuple ``(X, Z, Y_true)`` of float32 arrays: the (1000, 2) coordinates,
        the (1000, 200) noisy observations and the (1000, 200) noiseless signal.
    """
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Side information: coordinates in the unit square.
    x = np.random.rand(1000)
    y = np.random.rand(1000)
    X = np.stack((x, y)).T

    # Factors: each column gets two Bernoulli on/off switches; row 0 uses the
    # first one, rows 1 and 2 share the second. The draw order (two randint,
    # then three randn per column) is kept so seeded results stay identical.
    f = np.zeros((3, 200))
    for col in range(f.shape[1]):
        switch_a = np.random.randint(2)
        switch_b = np.random.randint(2)
        f[:, col] = (
            switch_a * np.random.randn(),
            switch_b * np.random.randn(),
            switch_b * np.random.randn(),
        )

    # Tile membership, computed with boolean masks instead of a Python loop.
    x_low, x_mid, x_high = x < 0.33, (0.33 < x) & (x < 0.66), x > 0.66
    y_low, y_mid, y_high = y < 0.33, (0.33 < y) & (y < 0.66), y > 0.66
    first_group = (x_low & y_low) | (x_mid & y_mid) | (x_high & y_high)
    second_group = (x_low & y_high) | (x_mid & y_low) | (x_high & y_mid)
    # Everything else (including points lying exactly on a cut) is group 2.
    group = np.select([first_group, second_group], [0, 1], default=2)
    L = np.eye(3)[group]

    Y_true = L @ f
    Z = Y_true + np.random.randn(*Y_true.shape) * noise_level
    return X.astype(np.float32), Z.astype(np.float32), Y_true.astype(np.float32)

# ----------------------------- Models ----------------------------- #
class AE(nn.Module):
    """Plain fully connected autoencoder with one 64-unit hidden layer per side.

    Args:
        input_dim: Width of the input (and of the reconstruction).
        latent_dim: Width of the bottleneck code.
    """

    def __init__(self, input_dim, latent_dim):
        super().__init__()
        hidden_dim = 64
        # Encoder layers are built before decoder layers; keeping this order
        # keeps seeded parameter initialisation reproducible.
        encoder_layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        ]
        decoder_layers = [
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` to the latent code and return its reconstruction."""
        return self.decoder(self.encoder(x))

class CVAE(nn.Module):
    """Conditional variational autoencoder with 64-unit hidden layers.

    The condition ``c`` is concatenated to the input before encoding and to
    the latent sample before decoding.

    Args:
        input_dim: Width of the data vector being reconstructed.
        cond_dim: Width of the conditioning vector.
        latent_dim: Width of the latent Gaussian.
    """

    def __init__(self, input_dim, cond_dim, latent_dim):
        super().__init__()
        hidden_dim = 64
        # Construction order (encoder, mu head, log-variance head, decoder) is
        # kept so seeded parameter initialisation stays reproducible.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim + cond_dim, hidden_dim),
            nn.ReLU(),
        )
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim + cond_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def encode(self, x, c):
        """Return the posterior mean and log-variance for ``x`` given ``c``."""
        hidden = self.encoder(torch.cat([x, c], dim=1))
        mu = self.fc_mu(hidden)
        logvar = self.fc_logvar(hidden)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * std`` with ``eps`` standard normal."""
        std = (0.5 * logvar).exp()
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x, c):
        """Return ``(reconstruction, mu, logvar)`` for input ``x`` and condition ``c``."""
        mu, logvar = self.encode(x, c)
        # NOTE(review): a latent sample is drawn regardless of train/eval mode,
        # so reconstructions are stochastic at inference too - confirm intended.
        latent = self.reparameterize(mu, logvar)
        x_hat = self.decoder(torch.cat([latent, c], dim=1))
        return x_hat, mu, logvar

class DenoisingNCF(nn.Module):
    """Two-tower MLP that reconstructs a row from its noisy values and side info.

    One tower embeds the side information, the other embeds the noisy row;
    the concatenated embeddings are mapped back to ``input_dim`` outputs.

    Args:
        input_dim: Width of the noisy row (and of the prediction).
        side_dim: Width of the side-information vector.
        hidden_dim: Width of each tower's embedding and of the predictor's
            hidden layer.
    """

    def __init__(self, input_dim, side_dim, hidden_dim=64):
        super().__init__()
        # Towers first, predictor last: same construction order as before so
        # seeded parameter initialisation stays reproducible.
        self.user_mlp = nn.Sequential(nn.Linear(side_dim, hidden_dim), nn.ReLU())
        self.item_mlp = nn.Sequential(nn.Linear(input_dim, hidden_dim), nn.ReLU())
        predictor_layers = [
            nn.Linear(2 * hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.predictor = nn.Sequential(*predictor_layers)

    def forward(self, z_noisy, x_side):
        """Return the prediction for ``z_noisy`` given side information ``x_side``."""
        side_embedding = self.user_mlp(x_side)
        row_embedding = self.item_mlp(z_noisy)
        joint = torch.cat([side_embedding, row_embedding], dim=1)
        return self.predictor(joint)

# ----------------------------- Training Functions ----------------------------- #
def train_ae(model, dataloader, epochs=50):
    """Fit ``model`` to reconstruct its own input under an MSE loss.

    Args:
        model: Module called as ``model(batch)`` and returning a tensor shaped
            like the batch.
        dataloader: Iterable whose items are indexable; element ``[0]`` of each
            item is used as both input and target.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. ``model`` is updated in place and left in training mode.
    """
    criterion = nn.MSELoss()
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for _ in range(epochs):
        for batch in dataloader:
            inputs = batch[0]
            adam.zero_grad()
            reconstruction = model(inputs)
            criterion(reconstruction, inputs).backward()
            adam.step()

def train_cvae(model, dataloader, epochs=50):
    """Fit a conditional VAE with an MSE reconstruction term plus a KL term.

    Both terms are plain means (over all elements) and are added with equal
    weight.

    Args:
        model: Module called as ``model(x, c)`` and returning
            ``(x_hat, mu, logvar)``.
        dataloader: Iterable yielding ``(x, c)`` pairs.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. ``model`` is updated in place and left in training mode.
    """
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for _ in range(epochs):
        for targets, condition in dataloader:
            adam.zero_grad()
            reconstruction, mu, logvar = model(targets, condition)
            # KL divergence between N(mu, exp(logvar)) and N(0, 1), averaged.
            kl_term = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
            total = nn.functional.mse_loss(reconstruction, targets) + kl_term
            total.backward()
            adam.step()

def train_ncf(model, dataloader, epochs=50):
    """Fit a two-input model to reproduce its first input under an MSE loss.

    The regression target is the (noisy) first input itself, not a clean
    signal.

    Args:
        model: Module called as ``model(z, x)`` and returning a tensor shaped
            like ``z``.
        dataloader: Iterable yielding ``(z, x)`` pairs.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. ``model`` is updated in place and left in training mode.
    """
    criterion = nn.MSELoss()
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for _ in range(epochs):
        for noisy, side in dataloader:
            adam.zero_grad()
            prediction = model(noisy, side)
            criterion(prediction, noisy).backward()  # target is the noisy Z
            adam.step()

# ----------------------------- Benchmark Routine ----------------------------- #
def run_denoising_benchmark(noise_level=1.0, seed=1):
    """Train the AE, CVAE and NCF denoisers on one simulated data set and score them.

    Data comes from ``create_data(noise_level, seed)``. Each model is trained
    on the noisy matrix (with the coordinates as side information for CVAE and
    NCF), then applied to that same noisy matrix; its output is compared with
    the noiseless signal.

    Args:
        noise_level: Forwarded to ``create_data``.
        seed: Forwarded to ``create_data``.

    Returns:
        dict with the RMSE against the noiseless signal under keys ``"AE"``,
        ``"CVAE"`` and ``"NCF"``, plus ``"noise_level"`` and ``"seed"``.
    """
    X, Z, Y_true = create_data(noise_level, seed)
    side_info = torch.tensor(X)
    noisy = torch.tensor(Z)
    n_features = Z.shape[1]

    def make_loader(*tensors):
        # Shuffled mini-batches of 32 rows over the given aligned tensors.
        return DataLoader(TensorDataset(*tensors), batch_size=32, shuffle=True)

    def rmse(estimate, truth):
        return np.sqrt(mean_squared_error(estimate, truth))

    # Models are built and trained strictly one after another (AE, CVAE, NCF)
    # so the global torch RNG is consumed in the same order on every run.

    # AE: noisy rows only.
    ae = AE(input_dim=n_features, latent_dim=10)
    ae_loader = make_loader(noisy)
    train_ae(ae, ae_loader)
    ae.eval()
    with torch.no_grad():
        denoised_ae = ae(noisy).numpy()

    # CVAE: noisy rows conditioned on the coordinates.
    cvae = CVAE(input_dim=n_features, cond_dim=2, latent_dim=10)
    cvae_loader = make_loader(noisy, side_info)
    train_cvae(cvae, cvae_loader)
    cvae.eval()
    with torch.no_grad():
        denoised_cvae = cvae(noisy, side_info)[0].numpy()

    # NCF: noisy rows plus coordinates through separate towers.
    ncf = DenoisingNCF(input_dim=n_features, side_dim=X.shape[1])
    ncf_loader = make_loader(noisy, side_info)
    train_ncf(ncf, ncf_loader)
    ncf.eval()
    with torch.no_grad():
        denoised_ncf = ncf(noisy, side_info).numpy()

    scores = {
        "AE": rmse(denoised_ae, Y_true),
        "CVAE": rmse(denoised_cvae, Y_true),
        "NCF": rmse(denoised_ncf, Y_true),
    }
    scores["noise_level"] = noise_level
    scores["seed"] = seed
    return scores


In [None]:
# Single-run check at noise level 1, seed 1: the body of
# run_denoising_benchmark unrolled at top level so the fitted models and their
# outputs stay available in the notebook namespace for inspection.
X, Z, Y_true = create_data(1, 1)
batch_size = 32

tensor_X = torch.tensor(X)
tensor_Z = torch.tensor(Z)

# AE
ae = AE(input_dim=Z.shape[1], latent_dim=10)
loader_ae = DataLoader(TensorDataset(tensor_Z), batch_size=batch_size, shuffle=True)
train_ae(ae, loader_ae)
ae.eval()
with torch.no_grad():
    Z_ae = ae(tensor_Z).numpy()

# CVAE
cvae = CVAE(input_dim=Z.shape[1], cond_dim=2, latent_dim=10)
loader_cvae = DataLoader(TensorDataset(tensor_Z, tensor_X), batch_size=batch_size, shuffle=True)
train_cvae(cvae, loader_cvae)
cvae.eval()
with torch.no_grad():
    Z_cvae, _, _ = cvae(tensor_Z, tensor_X)
    Z_cvae = Z_cvae.numpy()

# NCF
ncf = DenoisingNCF(input_dim=Z.shape[1], side_dim=X.shape[1])
loader_ncf = DataLoader(TensorDataset(tensor_Z, tensor_X), batch_size=batch_size, shuffle=True)
train_ncf(ncf, loader_ncf)
ncf.eval()
with torch.no_grad():
    Z_ncf = ncf(tensor_Z, tensor_X).numpy()

rmse = lambda A, B: np.sqrt(mean_squared_error(A, B))

# Cell output: NCF RMSE against the noiseless signal. The former trailing
# comma turned this into a 1-tuple instead of a scalar.
rmse(Z_ncf, Y_true)
 