In [1]:
# Necessary imports
import numpy as np

def histogram_discretize(target, num_bins=20):
    """
    Discretization based on histograms.
    Converts continuous representations into discrete bins.

    Args:
    - target (np.ndarray): The continuous representation array to discretize.
    - num_bins (int): The number of bins to use for discretization.

    Returns:
    - discretized (np.ndarray): Discretized version of the input array.
    """
    discretized = np.zeros_like(target)
    # Iterate over each dimension of the input array and apply histogram-based discretization
    for i in range(target.shape[0]):
        # Use np.digitize to assign bin numbers
        discretized[i, :] = np.digitize(target[i, :], np.histogram(target[i, :], num_bins)[1][:-1])

    return discretized

# # Test the function with a sample input
# test_array = np.random.rand(5, 100)  # Random array for testing purposes
# discretized_array = histogram_discretize(test_array, num_bins=10)

# discretized_array.shape, discretized_array  # Display the shape and content of the discretized array


In [2]:
import numpy as np
from sklearn.metrics import mutual_info_score

def discrete_mutual_info(mus, ys):
    """
    Compute discrete mutual information.
    """
    num_codes = mus.shape[0]
    num_factors = ys.shape[0]
    m = np.zeros([num_codes, num_factors])
    for i in range(num_codes):
        for j in range(num_factors):
            m[i, j] = mutual_info_score(ys[j, :], mus[i, :])
    return m

In [3]:
import numpy as np
from sklearn.metrics import mutual_info_score

def discrete_entropy(ys):
    """
    Compute discrete mutual information.
    """
    num_factors = ys.shape[0]
    h = np.zeros(num_factors)
    for j in range(num_factors):
        h[j] = mutual_info_score(ys[j, :], ys[j, :])
    return h


In [4]:
def compute_mig(mus_train, ys_train, num_bins=20):
    """ Computes the Mutual Information Gap (MIG) score.
    Args:
    - mus_train: Latent representations, numpy array of shape (num_latents, num_samples).
    - ys_train: True factors, numpy array of shape (num_factors, num_samples).
    - num_bins: Number of bins for discretization (default is 20).
    Returns:
    - mig_score: The computed MIG score.
    """
    # Discretize the latent representations
    discretized_mus = histogram_discretize(mus_train, num_bins=num_bins)

    # Compute mutual information matrix
    mutual_info_matrix = discrete_mutual_info(discretized_mus, ys_train)

    # Compute entropy for each factor
    entropy_values = discrete_entropy(ys_train)

    # Compute the MIG score
    mig_scores = np.zeros(mutual_info_matrix.shape[1])
    for k in range(mutual_info_matrix.shape[1]):
        mi_k = mutual_info_matrix[:, k]
        top_mi = np.max(mi_k)
        top_mi_idx = np.argmax(mi_k)
        second_mi = np.max(mi_k[np.arange(len(mi_k)) != top_mi_idx])
        mig_scores[k] = (top_mi - second_mi) / entropy_values[k]

    mig_score = np.mean(mig_scores)
    return mig_score

In [5]:
import pandas as pd

# Load the dataset from the GitHub repository
url = 'https://raw.githubusercontent.com/gregversteeg/LinearCorex/master/tests/data/test_big5.csv'
df = pd.read_csv(url)

# Display basic information about the dataset
print("Number of instances in the dataset:", df.shape[0])
print("Number of columns in the dataset:", df.shape[1])
print("\nFirst 5 rows of the dataset:")
print(df.head())

# Display additional information
print("\nData Types and Non-Null Counts:")
print(df.info())

print("\nSummary Statistics of the Dataset:")
print(df.describe())

Number of instances in the dataset: 2000
Number of columns in the dataset: 50

First 5 rows of the dataset:
   blue_q0  red_q1  green_q2  purple_q3  q4  blue_q5  red_q6  green_q7  \
0        2       0         3          1   4        1       4         1   
1        2       0         1          2   2        1       4         3   
2        3       0         2          1   3        1       4         3   
3        2       0         1          1   1        0       4         1   
4        2       0         1          1   3        0       4         3   

   purple_q8  q9  ...  blue_q40  red_q41  green_q42  purple_q43  q44  \
0          2   2  ...         3        3          3           2    3   
1          3   1  ...         2        3          2           2    3   
2          3   0  ...         4        4          2           1    4   
3          3   1  ...         1        2          2           1    3   
4          2   0  ...         3        4          1           3    4   

   blue_q45  r

In [6]:
# Identify column prefixes for each true factor
factor_columns = {
    'Factor1': [col for col in df.columns if col.startswith('blue')],
    'Factor2': [col for col in df.columns if col.startswith('green')],
    'Factor3': [col for col in df.columns if col.startswith('purple')],
    'Factor4': [col for col in df.columns if col.startswith('red')],
    'Factor5': [col for col in df.columns if col.startswith('q')]
}

# Calculate true factors by summing the respective columns
true_factors = pd.DataFrame()
for factor_name, columns in factor_columns.items():
    true_factors[factor_name] = df[columns].sum(axis=1)

# Display the first few rows of the calculated true factors
print(true_factors.head())

   Factor1  Factor2  Factor3  Factor4  Factor5
0       20       21       21       22       28
1       21       20       21       26       23
2       23       20       17       22       25
3       17       15       11       22       15
4       20       14       24       23       24


In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define the Autoencoder architecture
class Autoencoder(nn.Module):
    def __init__(self, input_dim=50, latent_dim=5):
        super(Autoencoder, self).__init__()

        # Encoder: Compress the input to the latent space
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 32),  # First hidden layer
            nn.ReLU(),  # Activation function
            nn.Linear(32, 16),  # Second hidden layer
            nn.ReLU(),
            nn.Linear(16, latent_dim)  # Latent layer
        )

        # Decoder: Reconstruct the input from the latent space
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 16),  # First hidden layer
            nn.ReLU(),
            nn.Linear(16, 32),  # Second hidden layer
            nn.ReLU(),
            nn.Linear(32, input_dim),  # Output layer
            nn.Sigmoid()  # Output activation to bring the reconstructed values between 0 and 1
        )

    def forward(self, x):
        # Encode input
        latent = self.encoder(x)
        # Decode input from latent representation
        reconstructed = self.decoder(latent)
        return reconstructed

# Create an instance of the Autoencoder model
autoencoder = Autoencoder()

# Display the architecture
print(autoencoder)


Autoencoder(
  (encoder): Sequential(
    (0): Linear(in_features=50, out_features=32, bias=True)
    (1): ReLU()
    (2): Linear(in_features=32, out_features=16, bias=True)
    (3): ReLU()
    (4): Linear(in_features=16, out_features=5, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=5, out_features=16, bias=True)
    (1): ReLU()
    (2): Linear(in_features=16, out_features=32, bias=True)
    (3): ReLU()
    (4): Linear(in_features=32, out_features=50, bias=True)
    (5): Sigmoid()
  )
)


In [8]:
df = df / 4.0
print(df.head())

   blue_q0  red_q1  green_q2  purple_q3    q4  blue_q5  red_q6  green_q7  \
0     0.50     0.0      0.75       0.25  1.00     0.25     1.0      0.25   
1     0.50     0.0      0.25       0.50  0.50     0.25     1.0      0.75   
2     0.75     0.0      0.50       0.25  0.75     0.25     1.0      0.75   
3     0.50     0.0      0.25       0.25  0.25     0.00     1.0      0.25   
4     0.50     0.0      0.25       0.25  0.75     0.00     1.0      0.75   

   purple_q8    q9  ...  blue_q40  red_q41  green_q42  purple_q43   q44  \
0       0.50  0.50  ...      0.75     0.75       0.75        0.50  0.75   
1       0.75  0.25  ...      0.50     0.75       0.50        0.50  0.75   
2       0.75  0.00  ...      1.00     1.00       0.50        0.25  1.00   
3       0.75  0.25  ...      0.25     0.50       0.50        0.25  0.75   
4       0.50  0.00  ...      0.75     1.00       0.25        0.75  1.00   

   blue_q45  red_q46  green_q47  purple_q48   q49  
0      0.25     1.00       1.00        0

In [9]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Convert the DataFrame to a NumPy array
data_array = df.to_numpy()

# Convert the data to a PyTorch tensor
data_tensor = torch.tensor(data_array, dtype=torch.float32)

# Create a PyTorch dataset
dataset = TensorDataset(data_tensor)

# Create a DataLoader for the dataset
batch_size = 64  # You can adjust the batch size as needed
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Display the shape of the tensor to verify
print(f"Data tensor shape: {data_tensor.shape}")
print(f"Number of batches: {len(dataloader)}")


Data tensor shape: torch.Size([2000, 50])
Number of batches: 32


In [10]:
# Define the loss function and optimizer for training
loss_function = nn.MSELoss()  # Mean Squared Error loss for reconstruction
optimizer = optim.Adam(autoencoder.parameters(), lr=1e-3)  # Adam optimizer with a learning rate of 0.001

# Training loop for the autoencoder
num_epochs = 400  # You can adjust the number of epochs as needed

for epoch in range(num_epochs):
    epoch_loss = 0.0  # To accumulate the loss for the current epoch

    for data in dataloader:
        inputs = data[0]  # Get the input data

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass: Encode and decode the input data
        outputs = autoencoder(inputs)

        # Compute the reconstruction loss
        loss = loss_function(outputs, inputs)

        # Backward pass: Compute gradients
        loss.backward()

        # Update the weights
        optimizer.step()

        # Accumulate the loss for the current epoch
        epoch_loss += loss.item()

    # Print the loss for each epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss / len(dataloader):.4f}")

print("Training complete!")

Epoch [1/400], Loss: 0.1125
Epoch [2/400], Loss: 0.0945
Epoch [3/400], Loss: 0.0873
Epoch [4/400], Loss: 0.0836
Epoch [5/400], Loss: 0.0809
Epoch [6/400], Loss: 0.0794
Epoch [7/400], Loss: 0.0756
Epoch [8/400], Loss: 0.0682
Epoch [9/400], Loss: 0.0638
Epoch [10/400], Loss: 0.0609
Epoch [11/400], Loss: 0.0595
Epoch [12/400], Loss: 0.0594
Epoch [13/400], Loss: 0.0588
Epoch [14/400], Loss: 0.0586
Epoch [15/400], Loss: 0.0582
Epoch [16/400], Loss: 0.0579
Epoch [17/400], Loss: 0.0578
Epoch [18/400], Loss: 0.0573
Epoch [19/400], Loss: 0.0572
Epoch [20/400], Loss: 0.0566
Epoch [21/400], Loss: 0.0564
Epoch [22/400], Loss: 0.0559
Epoch [23/400], Loss: 0.0552
Epoch [24/400], Loss: 0.0539
Epoch [25/400], Loss: 0.0529
Epoch [26/400], Loss: 0.0527
Epoch [27/400], Loss: 0.0523
Epoch [28/400], Loss: 0.0519
Epoch [29/400], Loss: 0.0516
Epoch [30/400], Loss: 0.0515
Epoch [31/400], Loss: 0.0513
Epoch [32/400], Loss: 0.0514
Epoch [33/400], Loss: 0.0510
Epoch [34/400], Loss: 0.0514
Epoch [35/400], Loss: 0

In [11]:
# Function to get latent representations from input data
def get_latent_representation(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    latent_representations = []

    # Disable gradient calculation for inference
    with torch.no_grad():
        for data in dataloader:
            inputs = data[0]  # Get the input data

            # Encode the input data to get the latent representation
            latent = model.encoder(inputs)

            # Collect the latent representations
            latent_representations.append(latent)

    # Concatenate all latent representations into a single tensor
    latent_representations = torch.cat(latent_representations, dim=0)

    return latent_representations

# Get the latent representations using the trained autoencoder
latent_representations = get_latent_representation(autoencoder, dataloader)

# Convert to NumPy array and transpose for MIG calculation
latent_representations_np = latent_representations.numpy().T  # Shape will be (latent_dim, num_samples)

print("Latent representations shape:", latent_representations_np.shape)

Latent representations shape: (5, 2000)


In [12]:
# Convert true factors to numpy array and transpose to match shape (num_factors, num_samples)
true_factors_np = true_factors.to_numpy().T  # Shape will be (num_factors, num_samples)

# Compute MIG score between latent representations and true factors
mig_score = compute_mig(latent_representations_np, true_factors_np)

# Print the MIG score
print(f"MIG Score between latent representations and true factors: {mig_score:.4f}")

MIG Score between latent representations and true factors: 0.0014
