In [13]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets



In [15]:
for i in range(10):
    numpy_array = np.load(f'./output_arrays/epoch_{i}_channel_{i}.npy')
    print("min: ",np.max(numpy_array)," min: ", np.min(numpy_array))

min:  -18.589511615102026  min:  -40.45791106355474
min:  -18.141677057558486  min:  -37.03979895407709
min:  -17.801624276995216  min:  -37.930389259090475
min:  -17.20333912216591  min:  -36.67659142088034
min:  -18.59312671640936  min:  -37.44207857842168
min:  -18.138089153708023  min:  -39.91782140840885
min:  -17.691925088135985  min:  -36.60745044993907
min:  -17.488590011527666  min:  -42.34308809155709
min:  -17.734666662892383  min:  -36.03803250016985
min:  -19.544066415420303  min:  -41.97847545258237


In [5]:
n_freqs,n_times=numpy_array.shape
n_freqs,n_times

(140, 7680)

# Data loader

In [16]:
class NpyImageDataset(Dataset):
    def __init__(self, image_shape, root_dir='./output_arrays/', transform=None):
        """
        root_dir (string): Folder con archivos .npy.
        transform (callable, optional)
        """
        self.root_dir = root_dir
        self.n_freqs,self.n_times=image_shape
        self.transform = transform
        self.image_files_names = [f for f in os.listdir(root_dir)]# if f.endswith('.npy')]

    def __len__(self):
        return len(self.image_files_names)

    def __getitem__(self, idx):
        # Load .npy file
        img_path = os.path.join(self.root_dir, self.image_files_names[idx])
        image = np.load(img_path)
        
        image = image.reshape(1, self.n_freqs, self.n_times)
        # Convert image to a PyTorch tensor
        image = torch.from_numpy(image).float()
        
        # Apply transformations if any
        if self.transform:
            image = self.transform(image)
        
        return image # Retorna (input, ) pair for autoencoder
        
class MinMaxNormalize:
    def __call__(self, image):
        min_val = image.min()
        max_val = image.max()
        return (image - min_val) / (max_val - min_val + 1e-8)


In [17]:
image_transforms = transforms.Compose([
    #transforms.resize((n_freqs, n_times))
    #transforms.Normalize((0.5,), (0.5,))
    MinMaxNormalize()
])

dataset = NpyImageDataset((n_freqs,n_times), root_dir='./output_arrays/', transform=image_transforms) #input_shape=(n_freqs,n_times),
dataloader = DataLoader(dataset,
                        batch_size=32,
                        num_workers=4)

In [16]:
dataset[0][0].storage

<bound method Tensor.storage of tensor([[[-40.7905, -40.7911, -40.7943,  ..., -38.1793, -38.1790, -38.1810],
         [-40.8971, -40.8965, -40.8983,  ..., -38.3108, -38.3074, -38.3065],
         [-40.9999, -40.9976, -40.9976,  ..., -38.4492, -38.4425, -38.4382],
         ...,
         [-46.4949, -46.5135, -46.5351,  ..., -42.9284, -42.9197, -42.9135],
         [-46.3609, -46.3745, -46.3907,  ..., -42.9573, -42.9482, -42.9416],
         [-46.1945, -46.2021, -46.2122,  ..., -42.9789, -42.9695, -42.9626]]])>

# VAE

In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self, input_shape, latent_dim=16,variational=False):
        super(Encoder, self).__init__()
        self.variational=variational
        n_freqs, n_times = input_shape

        # Capas convolucionales
        self.conv1 = nn.Conv2d(1, 12, kernel_size=(3,5), stride=2)
        self.conv2 = nn.Conv2d(12, 20, kernel_size=(3,5), stride=2)
        self.conv3 = nn.Conv2d(20, 30, kernel_size=(3,5), stride=2)
        
        # Determinación de shape luego de convolución
        self.conv_c_out,self.conv_h_out,self.conv_w_out=self._get_conv_output(input_shape)
        self.conv_output_size = self.conv_c_out*self.conv_h_out*self.conv_w_out
        
        # Capas fully connected para espacio latente
        self.fc_mu = nn.Linear(self.conv_output_size, latent_dim)
        if self.variational:
            self.fc_logvar = nn.Linear(self.conv_output_size, latent_dim)

    def _get_conv_output(self, input_shape):
        """Función auxiliar en cálculo de tamaño tras convoluciones
        Retorna (C_out,H_out,W_out): numero de canales, altura y ancho de salida."""
        with torch.no_grad():
            x = torch.zeros(1, 1, *input_shape)  # Create a dummy input with batch size 1
            x = self.conv1(x)
            x = self.conv2(x)
            x = self.conv3(x)
            #print("Numel",x.numel(),  "vs shape1xshape2xshape3: ",x.shape[1]*x.shape[2]*x.shape[3],"shape", x.shape)
            return x.shape[1],x.shape[2],x.shape[3]
            
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        # Flatten the output
        x = torch.flatten(x, start_dim=1)
        print("Encoder Forward: Flatten alcanzado. x.hape: ",x.shape, "Predicted output size: ", self.conv_output_size)
    

        # Obtener parámetros mu, logvarianza
        mu = self.fc_mu(x)
        print("Mu alcanzado",x.shape)
        if self.variational:
            logvar = self.fc_logvar(x)
            return mu,logvar
        return mu

class Decoder(nn.Module):
    def __init__(self, input_shape, encoder_conv_out_shape,latent_dim=16,):
        super(Decoder, self).__init__()
        self.n_freqs, self.n_times = input_shape

        # Shape luego de convoluciones en el encoder para hacer reshaping
        self.conv_output_shape = encoder_conv_out_shape
        
        # Fully connected layer for reconstructing feature map shape
        self.fc = nn.Linear(latent_dim, self.conv_output_shape[0]*self.conv_output_shape[1]*self.conv_output_shape[2])#C*H*W
        
        # Deconvolution layers
        self.deconv1 = nn.ConvTranspose2d(30, 20, kernel_size=3, stride=2)
        self.deconv2 = nn.ConvTranspose2d(20, 12, kernel_size=3, stride=2)
        self.deconv3 = nn.ConvTranspose2d(12, 1, kernel_size=3, stride=2)

    def forward(self, z):
        # Decode fully connected to a feature map
        x = self.fc(z)
        x = x.view(-1, self.conv_output_shape[0], self.conv_output_shape[1], self.conv_output_shape[2])# C(canales), H, W
        
        # Apply deconvolutions
        x = F.relu(self.deconv1(x))
        
        x = F.relu(self.deconv2(x))
        print("Decoder 1",x.shape)
        x = torch.sigmoid(self.deconv3(x))  # Output in [0, 1] range
        
        return x

class VariationalAutoencoder(nn.Module):
    def __init__(self, input_shape, latent_dim=16,variational=False):
        super(VariationalAutoencoder, self).__init__()
        self.variational=variational
        self.encoder = Encoder(input_shape, latent_dim,variational=variational)
        encoder_conv_h_w_out=self.encoder.conv_c_out,self.encoder.conv_h_out,self.encoder.conv_w_out
        print("Instanciado encoder")
        self.decoder = Decoder(input_shape,encoder_conv_h_w_out, latent_dim)
        print("Instanciado decoder")
        
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)  # Standard deviation
        eps = torch.randn_like(std)  # Random noise
        return mu + eps * std
    
    def forward(self, x):
        if self.variational:# Encode
            mu, logvar = self.encoder(x)
            # Reparameterize
            z = self.reparameterize(mu, logvar)
            # Decode
            reconstructed = self.decoder(z)
            return reconstructed, mu, logvar
        else: 
            z=self.encoder(x)
            reconstructer=self.decoder(z)
            return reconstructed
# Define loss function
def loss_funct(reconstructed, original, mu=None, logvar=None, kld_weight=0.1,variational=False):
    """
    Compute VAE loss with weighted KL divergence
    """
    recon_loss = F.binary_cross_entropy(reconstructed, original, reduction='sum')
    if variational:
        kld_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        return recon_loss + kld_weight * kld_loss
    else:
        return recon_loss

In [57]:
# import CVA
input_shape = (n_freqs, n_times) 
latent_dim = 16
variational=True

if variational:
    model = VariationalAutoencoder(input_shape=input_shape, latent_dim=latent_dim,variational=True)
else: 
    model = VariationalAutoencoder(input_shape=input_shape, latent_dim=latent_dim,variational=False)    
learning_rate = 1e-3
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

Numel 459360 vs shape1xshape2xshape3:  459360 shape torch.Size([1, 30, 16, 957])
Instanciado encoder
Instanciado decoder


In [58]:
num_epochs = 10  

for epoch in range(num_epochs):#training epochs
    model.train()  
    total_loss = 0
    
    for images in dataloader:  # batch
        # Forward
        reconstructed, mu, logvar = model(images)     
        #
        #print(logvar,type(logvar))
          
        loss = loss_funct(reconstructed, images, mu, logvar,variational=True)
        # Backward pass and optimization step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Track the loss
        total_loss += loss.item()
    
    # Print average loss per epoch
    average_loss = total_loss / len(dataloader.dataset)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {average_loss:.4f}")


torch.Size([32, 12, 69, 3838])
torch.Size([32, 20, 34, 1917])
torch.Size([32, 30, 16, 957])
Flatten alcanzado torch.Size([32, 459360]) Predicted output size:  459360
Mu alcanzado torch.Size([32, 459360])
Decoder 1 torch.Size([32, 12, 67, 3831])
Epoch [1/10], Loss: 0.0000


In [55]:
logvar.shape

torch.Size([32, 16])