In [None]:
# load data
import numpy as np
import os
def get_Sq2D_and_input_data(folder, parameters):

    all_kappa, all_f, all_gL = [], [], []  # force related
    all_Sq2D = []
    qB = []
    for L, run_num in parameters:
        filename = f"{folder}/obs_L{L}_random_run{run_num}.csv"
        if not os.path.exists(filename):
            print(f"File not found: {filename}")
            continue
        data = np.genfromtxt(filename, delimiter=',', skip_header=1)
        kappa, f, g = data[0, 2:5]
        all_kappa.append(kappa)
        all_f.append(f)
        all_gL.append(g*L)

        qB = data[2, 21:]
        Sq2D = data[6:, 21:]
        all_Sq2D_flatten.append(Sq2D.flatten())

    all_feature = np.array([all_kappa, all_f, all_gL, all_R2, all_Rg2, all_Sxx, all_Syy, all_Szz, all_Sxy, all_Sxz, all_Syz])
    all_feature_name = ["kappa", "f", "gL", "R2", "Rg2", "Sxx", "Syy", "Szz", "Sxy", "Sxz", "Syz"]
    all_Sq2D_flatten = np.array(all_Sq2D_flatten)
    qB = np.array(qB)
    return all_feature.T, all_feature_name, all_Sq2D_flatten, qB



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Encoder using 2 convolutional layers.
# The input image is assumed to be of size (nq, nq)
class Encoder(nn.Module):
    def __init__(self, latent_dim, nq):
        super(Encoder, self).__init__()
        self.nq = nq
        self.conv = nn.Sequential(
            # Input: [B, 1, nq, nq] -> Output: [B, 32, nq/2, nq/2]
            nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            # Output: [B, 32, nq/2, nq/2] -> [B, 64, nq/4, nq/4]
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU()
        )
        # Assuming nq is divisible by 4, the spatial size after conv layers is:
        d2 = nq // 4
        self.flatten_dim = 64 * d2 * d2
        self.fc_mu = nn.Linear(self.flatten_dim, latent_dim)
        self.fc_logvar = nn.Linear(self.flatten_dim, latent_dim)

    def forward(self, x):
        batch_size = x.size(0)
        x = self.conv(x)  # shape: [B, 64, nq/4, nq/4]
        x = x.view(batch_size, -1)
        return self.fc_mu(x), self.fc_logvar(x)

# Decoder using 2 transposed convolutional layers.
# It maps the latent vector back to an image of size (nq, nq)
class Decoder(nn.Module):
    def __init__(self, latent_dim, nq):
        super(Decoder, self).__init__()
        d2 = nq // 4
        self.flatten_dim = 64 * d2 * d2
        self.fc = nn.Linear(latent_dim, self.flatten_dim)
        self.deconv = nn.Sequential(
            # Reshape from [B, 64, d2, d2] -> Upsample: [B, 32, 2*d2, 2*d2]
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            # Upsample: [B, 32, 2*d2, 2*d2] -> [B, 1, 4*d2, 4*d2] which equals [B, 1, nq, nq]
            nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid()
        )

    def forward(self, z):
        batch_size = z.size(0)
        d2 = int((self.flatten_dim // 64) ** 0.5)  # should equal nq//4
        x = self.fc(z)
        x = x.view(batch_size, 64, d2, d2)
        x = self.deconv(x)
        return x

# VAE that combines the encoder and decoder.
class VAE(nn.Module):
    def __init__(self, latent_dim, nq):
        super(VAE, self).__init__()
        self.encoder = Encoder(latent_dim, nq)
        self.decoder = Decoder(latent_dim, nq)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z), mu, logvar

# Converter module that maps three model parameters (kappa, f, gL) into the latent space.
class Converter(nn.Module):
    def __init__(self, latent_dim):
        super(Converter, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(3, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )

    def forward(self, params):
        return self.net(params)

# Example training functions

def train_vae(vae, dataloader, num_epochs=50, lr=1e-3, device='cuda'):
    optimizer = optim.Adam(vae.parameters(), lr=lr)
    vae.to(device)
    vae.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in dataloader:  # batch: [B, 1, nq, nq]
            batch = batch.to(device)
            optimizer.zero_grad()
            recon, mu, logvar = vae(batch)
            recon_loss = F.mse_loss(recon, batch, reduction='sum')
            kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
            loss = recon_loss + kl_loss
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader.dataset):.4f}")

def train_converter(converter, decoder, dataloader, num_epochs=30, lr=1e-3, device='cuda'):
    # dataloader yields pairs (params, target_scattering) where:
    #   params: [B, 3] and target_scattering: [B, 1, nq, nq]
    optimizer = optim.Adam(converter.parameters(), lr=lr)
    converter.to(device)
    decoder.to(device)
    decoder.eval()  # freeze the decoder
    for epoch in range(num_epochs):
        total_loss = 0
        for params, target in dataloader:
            params, target = params.to(device), target.to(device)
            optimizer.zero_grad()
            z = converter(params)
            output = decoder(z)
            loss = F.mse_loss(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"[Converter] Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")

def generate_scattering(converter, decoder, polymer_params, device='cuda'):
    # polymer_params: [B, 3] for (kappa, f, gL)
    converter.to(device)
    decoder.to(device)
    converter.eval()
    decoder.eval()
    with torch.no_grad():
        z = converter(polymer_params.to(device))
        return decoder(z)

# Example usage:
if __name__ == '__main__':
    latent_dim = 64
    nq = 64  # Set the input image size (nq x nq); can be any value divisible by 4.
    vae = VAE(latent_dim, nq)
    converter = Converter(latent_dim)

    # Assume you have:
    # - vae_dataloader that provides [B, 1, nq, nq] scattering images.
    # - reg_dataloader that provides (polymer_params, target_scattering) pairs.
    #
    # To train the VAE:
    # train_vae(vae, vae_dataloader, num_epochs=50, lr=1e-3, device='cuda')
    #
    # Freeze the decoder from the pretrained VAE:
    # pretrained_decoder = vae.decoder
    #
    # Train the converter:
    # train_converter(converter, pretrained_decoder, reg_dataloader, num_epochs=30, lr=1e-3, device='cuda')
    #
    # To generate a scattering image for new polymer parameters:
    # polymer_params = torch.tensor([[0.8, 1.2, 0.5]])  # shape: [1, 3] for (kappa, f, gL)
    # scattering_image = generate_scattering(converter, pretrained_decoder, polymer_params, device='cuda')
    # The scattering_image will have shape [1, 1, nq, nq].
