In [620]:
'''
This script incorporates an autoencoder, leveraging input data for the autoencoder (AE) and subsequently conducting a principal component analysis to evaluate the outcomes.
'''
import torch
import torchvision
import pandas as pd
import numpy as np
import sys
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms
from sklearn.model_selection import train_test_split  # Import train_test_split
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

In [621]:
#class CustomVariableInputDataset_2(Dataset):
#    def __init__(self, csv_file):
#        # Skip the first row containing non-numeric values
#        data = pd.read_csv(csv_file, skiprows=[0], header=None)
#        self.data = data.values.astype(np.float32)
#        non_zero_indices = self.data > 0
#        self.data[non_zero_indices] = np.log10(self.data[non_zero_indices])
#        self.scaler = MinMaxScaler()
#        self.data = self.scaler.fit_transform(self.data)
#        self.data = self.scaler.inverse_transform(self.data)
#        self.data = 10 ** self.data
#
#    def __len__(self):
#        return len(self.data)
#
#    def __getitem__(self, idx):
#        sample = self.data[idx]
#        return sample

In [622]:
class CustomVariableInputDataset(Dataset):
    def __init__(self, csv_file):
        # Skip the first row containing non-numeric values
        data = pd.read_csv(csv_file, skiprows=[0], header=None)
        self.data = data.values.astype(np.float32)
        self.data = np.log10(self.data)
        self.scaler = MinMaxScaler()
        self.data = self.scaler.fit_transform(self.data)
        

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        return sample

In [623]:
# Step 2: Define Data Transformations # not aplicable for this project 
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [624]:
# Step 3: Instantiate Your Custom Variable Input Dataset
csv_file_path = '/Users/alancangas/Xnet/test/x_log_training_output.csv'
custom_variable_input_dataset = CustomVariableInputDataset(csv_file_path)
#custom_variable_input_dataset_2 = CustomVariableInputDataset_2(csv_file_path)

In [625]:
print("Number of samples:", len(custom_variable_input_dataset))
print("First few samples:")
for i in range(1): 
    print('log and scaled')
    print(custom_variable_input_dataset[i])
    #print(custom_variable_input_dataset_2[i])

Number of samples: 3303106
First few samples:
log and scaled
[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [639]:
# Step 4: Split Data into Training and Testing Sets
X_train, X_test = train_test_split(custom_variable_input_dataset, test_size=0.2, random_state=42)

[0.9078262  0.9937367  0.9816858  0.96454465 0.9508282  0.9029752
 0.84939796 0.75768363 0.60696137 0.49136055 0.21487063 0.
 0.         0.        ]


In [627]:
# Define DataLoaders for both training and testing sets
batch_size = 32 #Control Batch size : 32
train_dataloader = DataLoader(X_train, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(X_test, batch_size=batch_size, shuffle=False)

In [628]:
# Step 5: Define the Autoencoder Model
class VariableInputAutoencoder(nn.Module):
    def __init__(self, input_size):
        super(VariableInputAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, 12),
            nn.ReLU(),
            nn.Linear(12, 8),
            nn.ReLU(),
            nn.Linear(8, 4),
            nn.ReLU(),
            nn.Linear(4, 2)
        )

        self.decoder = nn.Sequential(
            nn.Linear(2, 4),
            nn.ReLU(),
            nn.Linear(4, 8),
            nn.ReLU(),
            nn.Linear(8, 12),
            nn.ReLU(),
            nn.Linear(12, input_size),  # Output size must match input size
            nn.ReLU() # replace with ReLU for -log(x)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [629]:
# Step 6: Instantiate the Autoencoder Model with an Appropriate Input Size
input_size = len(custom_variable_input_dataset[0])  # Determine the input size dynamically
model = VariableInputAutoencoder(input_size)

In [630]:
# Step 7: Define Training Parameters and Train the Autoencoder
num_epochs = 10
learning_rate = 0.001 #control learning rate 0.001
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [631]:
for epoch in range(num_epochs):
    for batch in train_dataloader:
        optimizer.zero_grad()
        output = model(batch)
        loss = criterion(output, batch)
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {loss.item():.8f}')
    # Move this loop into the epochs to print both and compare
    # Testing loop
    test_loss = 0.0
    with torch.no_grad():
        for batch in test_dataloader:
            output = model(batch)
            loss = criterion(output, batch)
            test_loss += loss.item()

    print(f'Testing Loss: {test_loss / len(test_dataloader):.8f}')

Epoch [1/10], Training Loss: 0.28778756
Testing Loss: 0.28542992
Epoch [2/10], Training Loss: 0.24244355
Testing Loss: 0.23880636
Epoch [3/10], Training Loss: 0.17925207
Testing Loss: 0.16793109
Epoch [4/10], Training Loss: 0.15599847
Testing Loss: 0.15074926
Epoch [5/10], Training Loss: 0.14339384
Testing Loss: 0.14779282
Epoch [6/10], Training Loss: 0.13666169
Testing Loss: 0.14673367
Epoch [7/10], Training Loss: 0.14448139
Testing Loss: 0.14603965
Epoch [8/10], Training Loss: 0.14999712
Testing Loss: 0.14571310
Epoch [9/10], Training Loss: 0.15456361
Testing Loss: 0.14554310
Epoch [10/10], Training Loss: 0.14921314
Testing Loss: 0.14546254


In [632]:
for i in range(100,110):
    original_input = torch.from_numpy(custom_variable_input_dataset[i]) ## deprocess
    inverse_transformed_original_input = custom_variable_input_dataset.scaler.inverse_transform(original_input.detach().numpy().reshape([1,-1]))
    reconstructed_original_input = 10 ** inverse_transformed_original_input
    ae_reconstructed_input = model.forward(original_input)
    print('')
    print('---Processed Input---')
    print('')
    print(original_input)
    print('')
    print('---Reconstructed Processed Input---')
    print('')
    print(ae_reconstructed_input)
    #print('')
    #print('---Pre-processed Input---')
    #print('')
    #print(reconstructed_original_input)
   
    


---Processed Input---

tensor([0.7988, 0.8527, 1.0000, 0.8658, 0.8527, 0.8366, 0.7890, 0.7075, 0.5710,
        0.4677, 0.2131, 0.0000, 0.0000, 0.0000])

---Reconstructed Processed Input---

tensor([0.0000, 0.9590, 0.9908, 0.0000, 0.9488, 0.8881, 0.8264, 0.7289, 0.5629,
        0.0000, 0.1778, 0.0000, 0.0000, 0.0000], grad_fn=<ReluBackward0>)

---Processed Input---

tensor([0.7999, 0.8539, 1.0000, 0.8669, 0.8543, 0.8379, 0.7907, 0.7093, 0.5727,
        0.4697, 0.2149, 0.0000, 0.0000, 0.0000])

---Reconstructed Processed Input---

tensor([0.0000, 0.9587, 0.9908, 0.0000, 0.9495, 0.8890, 0.8278, 0.7308, 0.5651,
        0.0000, 0.1803, 0.0000, 0.0000, 0.0000], grad_fn=<ReluBackward0>)

---Processed Input---

tensor([0.8008, 0.8550, 1.0000, 0.8678, 0.8558, 0.8390, 0.7924, 0.7111, 0.5746,
        0.4718, 0.2169, 0.0000, 0.0000, 0.0000])

---Reconstructed Processed Input---

tensor([0.0000, 0.9584, 0.9909, 0.0000, 0.9502, 0.8899, 0.8292, 0.7328, 0.5673,
        0.0000, 0.1828, 0.0000, 0.0000,

In [648]:
# Desired lower dimensionality
n_components = 2 

# Extract the training data
X_train = [] # log # then standardize
for batch in train_dataloader:
    X_train.append(batch.numpy())

# Concatenate the training data batches into a single array
X_train = np.concatenate(X_train, axis=0)

# Extract the test data from DataLoader
X_test = [] # stay as it is
for batch in test_dataloader:
    X_test.append(batch.numpy())

# Concatenate the test data batches into a single array
X_test = np.concatenate(X_test, axis=0)
inverse_transformed_X_test = custom_variable_input_dataset.scaler.inverse_transform(X_test)
X_test = 10 ** inverse_transformed_X_test



# Create and fit PCA
pca = PCA(n_components=n_components)
pca.fit(X_train)

# Transform test data to the lower-dimensional space
data_pca = pca.transform(X_test)

# Reconstruct data from the lower-dimensional space
data_pca_reconstructed = pca.inverse_transform(data_pca)
# de standarized the reconstructed data and "de log it"
# Calculate PCA reconstruction error
pca_reconstruction_error = mean_squared_error(X_test, data_pca_reconstructed)
pca_rel_reconstruction_error = mean_squared_error(X_test, data_pca_reconstructed) / mean_squared_error(X_test, np.zeros(X_test.shape))
 
print(f"PCA Reconstruction Error: {pca_reconstruction_error:.8f}")
print(f"PCA Reconstruction Error: {pca_rel_reconstruction_error:.8f}")
# 2 test data, log , normal for comparizon
# log_X_test

PCA Reconstruction Error: 0.24286094
PCA Reconstruction Error: 7.02149324


In [646]:
# first step convert the data
# has to be normalized "standradize", why? 
# all my zeros in my data convert them into 10^-30
# log 10 
#column wise, 0 mean with standard deviation 
# Desired lower dimensionality 
# when interpreting the results "de-log it"
n_components = 2 

# Compute the covariance matrix
cov_matrix = np.cov(X_train, rowvar=False)

# Perform eigenvalue decomposition
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

# Sort eigenvalues and eigenvectors in descending order
sorted_indices = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[sorted_indices]
eigenvectors = eigenvectors[:, sorted_indices]

# Select the top N principal components
selected_eigenvectors = eigenvectors[:, :n_components]

# Project the data onto the selected principal components
projected_data = np.dot(X_test, selected_eigenvectors)

# Reconstruct data from the lower-dimensional space
reconstructed_data = np.dot(projected_data, selected_eigenvectors.T)

# Calculate the reconstruction error
reconstruction_error = np.mean(np.square(X_test - reconstructed_data))

print(f"Reconstruction Error using {n_components} Principal Components: {reconstruction_error:.8f}")


Reconstruction Error using 2 Principal Components: 0.03227447


In [1]:
pwd

'/Users/alancangas'