In [None]:
import scarches as sca
import scanpy as sc
import numpy as np

# Dataset name, model family, and the directory holding the saved model
dataset = "forebrain"
model = "expimap"
model_path = f"{dataset}/model"

# Read the AnnData file (this is the data container, not the model itself)
adata_path = f"{dataset}/{model}_{dataset}.h5ad"
adata = sc.read_h5ad(adata_path)

# The "I" mask is stacked twice along axis 0 before it is stored on the new
# AnnData below.
# NOTE(review): presumably "MuMs" holds two feature blocks per gene, which is
# why the mask is duplicated — confirm against the training notebook.
mask = adata.varm["I"]
doubled_mask = np.concatenate((mask, mask))

# Wrap the "MuMs" matrix in a fresh AnnData that carries the same cell
# metadata and the same "terms" entry as the source object
data = adata.obsm["MuMs"]
adata_expimap = sc.AnnData(X=data)
adata_expimap.obs = adata.obs.copy()
adata_expimap.uns["terms"] = adata.uns["terms"].copy()
adata_expimap.varm["I"] = doubled_mask

# Give every cell the same placeholder condition label in the 'study' column
adata_expimap.obs["study"] = "0"

# Restore the saved EXPIMAP model against the rebuilt AnnData; later cells
# use `intr_cvae` for inspection and reconstruction
intr_cvae = sca.models.EXPIMAP.load(model_path, adata=adata_expimap)


In [None]:
import numpy as np

# Split configuration: fraction of cells that goes to the training set
train_frac = 0.9

# Total number of observations and the resulting training-set size (floored)
n_cells = adata_expimap.n_obs
n_train = int(n_cells * train_frac)

# Seed NumPy's global RNG, then shuffle the cell positions in place.
# NOTE(review): 2020 is assumed to be the seed used during training, and the
# plain shuffle + floor split is assumed to mirror the trainer's own
# train/validation split — confirm both before treating these as held-out cells.
np.random.seed(2020)
all_indices = np.arange(n_cells)
np.random.shuffle(all_indices)

# The first n_train shuffled positions are the training set; the rest is test
train_indices, test_indices = all_indices[:n_train], all_indices[n_train:]

print(f"Training indices: {train_indices}")
print(f"Test indices: {test_indices}")


In [None]:
import numpy as np
import torch

# Step 1: Extract the input data and condition labels for the test indices
input_data_test = adata_expimap.X[test_indices, :]
condition_test = adata_expimap.obs["study"].values[test_indices]

# Step 2: Determine the device (CPU or GPU) from the model's parameters
device = next(intr_cvae.model.parameters()).device

# Step 3: Encode the condition labels as integer codes.
# The decoder takes a tensor of integer condition indices, not the raw labels.
condition_to_label = {c: i for i, c in enumerate(intr_cvae.conditions_)}
condition_test_indices = np.array([condition_to_label[c] for c in condition_test])

# Step 4: Reconstruct the data.
# The decoder operates on latent codes, so the input must be encoded first;
# feeding the raw expression matrix straight into the decoder is not a
# reconstruction.
with torch.no_grad():  # Inference only: no gradients needed
    # mean=True returns the posterior mean instead of a random sample, which
    # keeps the reconstruction error deterministic across runs.
    z = intr_cvae.get_latent(x=input_data_test, c=condition_test, mean=True)
    z = torch.tensor(z).float().to(device)
    batch = torch.tensor(condition_test_indices, dtype=torch.long, device=device)

    # The decoder returns a pair; the first element is the reconstruction.
    reconstructed_data_test, _ = intr_cvae.model.decoder(z, batch=batch)
    reconstructed_data_test = reconstructed_data_test.cpu().numpy()

# Guard against silent NumPy broadcasting if the decoder width differs
assert reconstructed_data_test.shape == input_data_test.shape, (
    f"shape mismatch: reconstruction {reconstructed_data_test.shape} "
    f"vs input {input_data_test.shape}"
)

# Step 5: Compute MSE across features for each cell.
# NOTE(review): comparing the decoder output directly with the input assumes
# the model reconstructs on the same scale as X (e.g. an MSE reconstruction
# loss) — confirm.
mse_per_cell_test = np.mean((input_data_test - reconstructed_data_test) ** 2, axis=1)

# Print or store the MSE for further analysis
print("MSE per cell:", mse_per_cell_test)


In [None]:
# Sanity check: print the shape of the test matrix next to the input
# dimension reported by the loaded model so the two can be compared by eye.
print(f"Input data shape: {input_data_test.shape}")
print(f"Model input dimension: {intr_cvae.model.input_dim}")


In [None]:
# Inspect the per-variable annotation arrays stored on the source AnnData
adata.varm

In [None]:
# Compare the mask stored on the source AnnData with the mask held by the
# loaded model. The two lines carry distinct labels so the output shows which
# shape belongs to which object.
print(f"AnnData mask shape: {adata.varm['I'].shape}")
print(f"Model mask shape: {intr_cvae.model.mask.shape}")


In [None]:
import pickle

# Read the pickled attribute dictionary stored in the model directory.
# NOTE: pickle.load can execute arbitrary code; only open files you trust.
attr_file = f"{model_path}/attr.pkl"
with open(attr_file, "rb") as handle:
    attr_dict = pickle.load(handle)

# Show the condition labels recorded in that dictionary
print("Expected conditions:", attr_dict['conditions_'])


In [None]:
import scarches as sca
import scanpy as sc
import numpy as np
import torch
import pandas as pd

# --- Rebuild the AnnData the saved model is loaded against -------------------
dataset = "forebrain"
model_path = f"{dataset}/model"
adata_path = f"{dataset}/expimap_{dataset}.h5ad"
adata = sc.read_h5ad(adata_path)  # the data container, not the model

data = adata.obsm["MuMs"]
adata_expimap = sc.AnnData(X=data)
adata_expimap.uns["terms"] = adata.uns["terms"].copy()
adata_expimap.obs = adata.obs.copy()
# The mask is stacked twice along axis 0.
# NOTE(review): presumably because "MuMs" holds two feature blocks per gene — confirm.
mask = adata.varm["I"]
adata_expimap.varm["I"] = np.concatenate([mask, mask], axis=0)

# Every cell gets the string label "0" as its condition. The value is already
# a str, so no extra cast is needed.
adata_expimap.obs["study"] = "0"

# --- Load the saved EXPIMAP model --------------------------------------------
intr_cvae = sca.models.EXPIMAP.load(model_path, adata=adata_expimap)

# --- Recompute the train/test split ------------------------------------------
# NOTE(review): assumes a 90/10 split with NumPy seed 2020 and a plain shuffle
# matches how the model was trained — confirm before calling these held-out.
n_cells = adata_expimap.n_obs
train_frac = 0.9
np.random.seed(2020)
all_indices = np.arange(n_cells)
np.random.shuffle(all_indices)
n_train = int(n_cells * train_frac)
train_indices = all_indices[:n_train]
test_indices = all_indices[n_train:]

# --- Gather test inputs -------------------------------------------------------
input_data_test = adata_expimap.X[test_indices, :]
condition_test = adata_expimap.obs["study"].values[test_indices]

# Device the model's parameters live on (CPU or GPU)
device = next(intr_cvae.model.parameters()).device

# The decoder takes integer condition codes, so map each label to its position
# in the model's condition list.
condition_to_label = {c: i for i, c in enumerate(intr_cvae.conditions_)}
condition_test_indices = np.array([condition_to_label[c] for c in condition_test])

# --- Encode, then decode ------------------------------------------------------
with torch.no_grad():
    # mean=True returns the posterior mean instead of a random latent sample.
    # Without it the per-cell MSE below changes on every run, because the
    # sampling noise is not seeded anywhere in this notebook.
    z = intr_cvae.get_latent(x=input_data_test, c=condition_test, mean=True)

    # get_latent hands back a NumPy array; the decoder needs tensors on `device`
    z = torch.tensor(z).float().to(device)
    batch = torch.tensor(condition_test_indices, dtype=torch.long, device=device)

    # The decoder returns a pair; the first element is the reconstruction
    reconstructed_data_test, _ = intr_cvae.model.decoder(z, batch=batch)
    reconstructed_data_test = reconstructed_data_test.cpu().numpy()

# Guard against silent NumPy broadcasting if the decoder width differs
assert reconstructed_data_test.shape == input_data_test.shape, (
    f"shape mismatch: reconstruction {reconstructed_data_test.shape} "
    f"vs input {input_data_test.shape}"
)

# --- Per-cell reconstruction error -------------------------------------------
# NOTE(review): comparing the decoder output directly with the input assumes
# the model reconstructs on the same scale as X (e.g. an MSE reconstruction
# loss) — confirm.
mse_per_cell_test = np.mean((input_data_test - reconstructed_data_test) ** 2, axis=1)
print("MSE per cell:", mse_per_cell_test)


In [None]:
# Inspect the per-observation (cell) metadata table of the source AnnData
adata.obs

In [None]:
# Display the `trainer` attribute of the loaded model object
intr_cvae.trainer

In [None]:
# Shape of the per-cell MSE array (the mean was taken over axis 1)
mse_per_cell_test.shape

In [None]:
# Display the reconstructed test matrix (a NumPy array after .cpu().numpy())
reconstructed_data_test

In [None]:
# Dimension check: print the test matrix shape, the shape of the mask stored
# on the rebuilt AnnData, and the input dimension reported by the model.
print(f"Input data shape: {input_data_test.shape}")  # author expected (N, 4898) — TODO confirm
print(f"Mask shape: {adata_expimap.varm['I'].shape}")  # Should align with features
print(f"Model input dimension: {intr_cvae.model.input_dim}")



In [None]:
# Forward pass through the full model, with the condition supplied as integer
# codes. The raw string labels in `condition_test` cannot be used as a batch
# tensor, so each label is mapped to its position in the model's condition list.
condition_to_label = {c: i for i, c in enumerate(intr_cvae.conditions_)}
condition_test_indices = np.array([condition_to_label[c] for c in condition_test])

with torch.no_grad():
    input_tensor = torch.tensor(input_data_test).float().to(device)
    batch = torch.tensor(condition_test_indices, dtype=torch.long, device=device)

    # NOTE(review): depending on the reconstruction loss the model was trained
    # with, the forward call may need further arguments (e.g. a size factor) —
    # confirm against the model's forward signature.
    output = intr_cvae.model(input_tensor, batch=batch)

    # Check what the output is by inspecting each element
    print(f"Output: {output}")

    # Report the shape of every tensor in the output; print non-tensors as-is
    for i, part in enumerate(output):
        print(f"Part {i} - shape: {part.shape if isinstance(part, torch.Tensor) else part}")


In [None]:
# Reconstruct the test cells: encode to the latent space, then decode.
# The decoder takes integer condition codes, so map each label to its position
# in the model's condition list.
condition_to_label = {c: i for i, c in enumerate(intr_cvae.conditions_)}
condition_test_indices = np.array([condition_to_label[c] for c in condition_test])

with torch.no_grad():
    # Pass the condition labels along with the data. mean=True returns the
    # posterior mean rather than a random sample, so the result is repeatable.
    z = intr_cvae.get_latent(x=input_data_test, c=condition_test, mean=True)

    # get_latent hands back a NumPy array; the decoder needs tensors on `device`
    z = torch.tensor(z).float().to(device)
    batch = torch.tensor(condition_test_indices, dtype=torch.long, device=device)

    # The decoder returns a pair; the first element is the reconstruction
    reconstructed_data_test, _ = intr_cvae.model.decoder(z, batch=batch)
    reconstructed_data_test = reconstructed_data_test.cpu().numpy()

# Guard against silent NumPy broadcasting if the decoder width differs
assert reconstructed_data_test.shape == input_data_test.shape, (
    f"shape mismatch: reconstruction {reconstructed_data_test.shape} "
    f"vs input {input_data_test.shape}"
)

# Compute MSE across features for each cell
mse_per_cell_test = np.mean((input_data_test - reconstructed_data_test) ** 2, axis=1)
print("MSE per cell:", mse_per_cell_test)
