In [None]:
import pandas as pd
import numpy as np
import os
from math import ceil
import seaborn as sns
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.utils as vutils
from PIL import Image

#Autoencoder for producing images
#Assumes 19 input features by default; the hidden size is configurable and the default latent_dim is kept small (8)
class CSVEncoder(nn.Module):
    """Fully connected autoencoder for flat feature rows.

    The encoder squeezes ``input_size`` features through one hidden layer into
    a ``latent_dim``-wide code (ReLU after each layer). The decoder expands
    that code through a hidden layer of the same width into ``output_size``
    values and finishes with a Sigmoid, so every output lies in [0, 1].

    Args:
        input_size: Number of values in each input row.
        hidden_size: Width of the hidden layer used by both halves.
        latent_dim: Width of the bottleneck code.
        output_size: Number of values produced per row. When ``None`` the
            decoder mirrors the encoder and emits ``input_size`` values.
    """

    def __init__(self, input_size=19, hidden_size=128, latent_dim=8, output_size=None):
        super().__init__()

        # Fall back to a symmetric autoencoder when no output width is given.
        if output_size is None:
            output_size = input_size
        self.output_size = output_size

        # Layers are instantiated encoder-first, then decoder, in one pass.
        compress_layers = [
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, latent_dim),
            nn.ReLU(),
        ]
        expand_layers = [
            nn.Linear(latent_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, self.output_size),
            nn.Sigmoid(),
        ]

        self.encoder = nn.Sequential(*compress_layers)
        self.decoder = nn.Sequential(*expand_layers)

    def forward(self, x):
        """Encode ``x`` to the latent code and decode it to ``output_size`` values."""
        latent_code = self.encoder(x)
        return self.decoder(latent_code)

#Autoencoder for decoding the images
#Only one channel needed - greyscale
class IMGAutoencoder(nn.Module):
    """Small convolutional autoencoder for single-channel images.

    The encoder applies two 3x3 convolutions (1 -> 16 -> 8 channels), each
    followed by ReLU and a 2x2 max-pool, so the spatial size is halved twice.
    The decoder applies two stride-2 transposed convolutions that double the
    spatial size twice and ends with a Sigmoid, so outputs lie in [0, 1].

    Shape notes below are for a 28x28 input.

    Args:
        output_channels: Number of channels in the reconstructed image.
    """

    def __init__(self, output_channels=1):
        super().__init__()

        # Compression path: 28x28 -> 14x14 -> 7x7.
        down_path = [
            nn.Conv2d(1, 16, 3, padding=1),   # [batch, 16, 28, 28]
            nn.ReLU(),
            nn.MaxPool2d(2, 2),               # [batch, 16, 14, 14]
            nn.Conv2d(16, 8, 3, padding=1),   # [batch, 8, 14, 14]
            nn.ReLU(),
            nn.MaxPool2d(2, 2),               # [batch, 8, 7, 7]
        ]

        # Expansion path: 7x7 -> 14x14 -> 28x28.
        up_path = [
            nn.ConvTranspose2d(8, 16, 3, stride=2, padding=1, output_padding=1),               # [batch, 16, 14, 14]
            nn.ReLU(),
            nn.ConvTranspose2d(16, output_channels, 3, stride=2, padding=1, output_padding=1), # [batch, output_channels, 28, 28]
            nn.Sigmoid(),
        ]

        self.encoder = nn.Sequential(*down_path)
        self.decoder = nn.Sequential(*up_path)

    def forward(self, x):
        """Compress ``x`` with the encoder and reconstruct it with the decoder."""
        return self.decoder(self.encoder(x))

#Globals
source_dir = "./../dataset/reduced_only_full/"   # folder scanned for input CSV files
output_dir = "./../dataset/visuals/"             # folder that receives the generated images
chunk_size = 100                                 # CSV rows rendered into one stitched image
components = 19                                  # feature count fed to the CSV encoder
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#Seed the RNG so the randomly initialised weights (and therefore the generated
#images) are the same after every "Restart & Run All"
SEED = 0
torch.manual_seed(SEED)

#Every CSV row is rendered as one 28x28 greyscale tile, so the decoder has to
#emit 28*28 = 784 values per row. With the default output_size (= input_size)
#the model only emits 19 values and the later reshape to (10, 10, 28, 28) fails.
csv_encoder = CSVEncoder(
    input_size=components,
    hidden_size=128,
    latent_dim=8,
    output_size=28 * 28,
).to(DEVICE)
img_autoencoder = IMGAutoencoder(output_channels=1).to(DEVICE)

#NOTE(review): the optimizer and loss are created here, but no training step is
#visible in this part of the notebook; until one runs, both models produce
#outputs from their random initial weights.
optimizer = optim.Adam(csv_encoder.parameters(), lr=1e-3)
criterion = nn.MSELoss()



[[0.48353836 0.45350114 0.51018274 ... 0.4955537  0.4879785  0.49731788]
 [0.48353907 0.45350015 0.5101837  ... 0.4955537  0.4879778  0.49731725]
 [0.48353076 0.4535226  0.5101875  ... 0.4955384  0.4879921  0.49732003]
 ...
 [0.4835331  0.45350906 0.51017696 ... 0.4955568  0.48797157 0.49733052]
 [0.48352614 0.45351452 0.510182   ... 0.49555522 0.48798802 0.4973084 ]
 [0.4835704  0.45347157 0.510227   ... 0.49550757 0.48795727 0.4973656 ]]


In [None]:
def _stitch_tiles(flat_tiles, grid=10, side=28):
    """Arrange flattened tiles into one square uint8 greyscale image.

    Args:
        flat_tiles: 2-D array of shape (rows, side*side) with values in [0, 1];
            row k becomes the tile at grid row ``k // grid``, column ``k % grid``.
        grid: Number of tiles per image row and per image column.
        side: Edge length of one tile in pixels.

    Returns:
        ``np.uint8`` array of shape (grid*side, grid*side).

    Raises:
        ValueError: If a row does not hold exactly ``side*side`` values or if
            there are more rows than the grid has cells.
    """
    tiles = np.asarray(flat_tiles, dtype=np.float32)
    if tiles.ndim != 2 or tiles.shape[1] != side * side:
        raise ValueError(
            f"Expected rows of {side * side} values ({side}x{side} tiles), got shape {tiles.shape}"
        )

    capacity = grid * grid
    if tiles.shape[0] > capacity:
        raise ValueError(f"{tiles.shape[0]} tiles do not fit into a {grid}x{grid} grid")

    #A final, shorter chunk is padded with black tiles so the image keeps its full size
    if tiles.shape[0] < capacity:
        filler = np.zeros((capacity - tiles.shape[0], side * side), dtype=tiles.dtype)
        tiles = np.concatenate([tiles, filler], axis=0)

    #Values are expected in [0, 1] (Sigmoid output); scale to 0..255
    pixels = (np.clip(tiles, 0.0, 1.0) * 255).astype(np.uint8)

    #(grid, grid, side, side) -> (grid, side, grid, side) -> one 2-D image, row-major tiles
    return (
        pixels.reshape(grid, grid, side, side)
        .transpose(0, 2, 1, 3)
        .reshape(grid * side, grid * side)
    )


#Tiles per image edge, derived from the chunk size (100 rows -> 10x10 grid)
grid_side = int(np.ceil(np.sqrt(chunk_size)))
tile_side = 28

#Sorted so files are processed in the same order on every run
for file in sorted(os.listdir(source_dir)):
    print(f"Found: {file}")
    if not file.endswith(".csv"):
        continue

    #Load the dataset
    file_path = os.path.join(source_dir, file)
    df = pd.read_csv(file_path)

    #Target folder for the autoencoder images produced from this file
    folder_base = os.path.splitext(file)[0]
    base_ae_path = os.path.join(output_dir, f"{folder_base}_AE")
    os.makedirs(base_ae_path, exist_ok=True)

    #NOTE: heatmap, pairplot and 3D scatter renderings were tried here earlier and
    #dropped; the heatmap could not be mapped back to data points and the
    #pairplot took a long time.

    #For each chunk
    for i in range(ceil(len(df) / chunk_size)):
        #iloc clips the slice at the end of the frame, so the last chunk may be shorter
        df_chunk = df.iloc[i * chunk_size:(i + 1) * chunk_size]

        #Autoencoder approach: one 28x28 tile per CSV row
        data = torch.tensor(df_chunk.values, dtype=torch.float32).to(DEVICE)
        with torch.no_grad():  #inference only, no autograd graph needed
            output_image = csv_encoder(data).cpu().numpy()

        grid_image = _stitch_tiles(output_image, grid=grid_side, side=tile_side)

        grid_image_pil = Image.fromarray(grid_image)
        grid_image_pil.save(os.path.join(base_ae_path, f"AE_PIL_{i+1}.png"))

        print(f"Chunk {i+1} visualization saved for {file}!")

print("All visualizations saved!")

In [None]:
ae_image_path = "../dataset/visuals/2023-12-15_15_41-results_AE/AE_PIL_1.png"

#Load the stitched grid as a 2-D greyscale array and release the file handle
with Image.open(ae_image_path) as grid_file:
    stiched_image = np.array(grid_file.convert("L"))

#The split below assumes a 10x10 grid of 28x28 tiles
if stiched_image.shape != (280, 280):
    raise ValueError(f"Expected a 280x280 stitched image, got {stiched_image.shape}")

#Split into 100 28x28 images in row-major order (tile k = grid row k // 10, column k % 10)
images = (
    stiched_image.reshape(10, 28, 10, 28)
    .transpose(0, 2, 1, 3)
    .reshape(100, 28, 28)
)

#PNG pixels are 0..255, while both networks end in a Sigmoid and work in [0, 1],
#so rescale before feeding the tiles in. Shape: [100, 1, 28, 28]
images_tensor = torch.from_numpy(images.astype(np.float32) / 255.0).unsqueeze(1).to(DEVICE)
img_autoencoder.eval()

#Create a new reverse encoder by flipping input & output size
#NOTE(review): this model is created here with random weights and is never
#trained in the visible code, and its Sigmoid head limits every output to
#(0, 1). That is why the printed rows are nearly identical and hover around
#0.5. Recovering the original CSV values would require training it (and
#presumably the other models too) - TODO confirm against the training code.
reverse_encoder = CSVEncoder(input_size=28*28, hidden_size=128, output_size=components).to(DEVICE)
reverse_encoder.eval()

with torch.no_grad():
    #Decode the images
    decoded_images = img_autoencoder(images_tensor)

    #Flatten the images to 28*28 dimensions
    flattened = decoded_images.view(decoded_images.size(0), -1)  # Shape: [100, 784]

    #Convert back to features
    final_features = reverse_encoder(flattened)

final_features = final_features.cpu().numpy()
print(final_features)

torch.Size([100, 1, 28, 28])
