In [1]:
#standard library imports
import os

#data processing imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm

#model imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [2]:
pcos_data = pd.read_csv("C:/Users/Mariana/UCSD/DSC180B/Causal-Discovery-on-Gut-Microbial-Data-for-Disease-Risk-Prediction/data/clean.csv")

In [None]:
pcos_data.head()

In [4]:
df = pcos_data.drop(columns=["Unnamed: 0", "region"])

In [5]:
X = df.drop(columns=["group"]).values

In [6]:
y = df['group'].values

Model Components

In [7]:
# Encoder f_enc: maps X → [Z1, Z2]
class Encoder(nn.Module):
    """Linear Gaussian encoder that samples with the reparameterisation trick.

    Two linear heads read the same input: ``fc_mu`` produces the latent mean
    and ``fc_sigma`` produces a value that is converted to a standard
    deviation through ``exp(0.5 * value)``.
    """

    def __init__(self, input_dim, latent_dim):
        super().__init__()
        # Keep this creation order (mean head first) so that seeded weight
        # initialisation stays the same.
        self.fc_mu = nn.Linear(input_dim, latent_dim)
        self.fc_sigma = nn.Linear(input_dim, latent_dim)

    def forward(self, x):
        """Encode ``x`` and draw one latent sample.

        Returns a tuple ``(z, z_mu, z_sigma)`` where ``z_sigma`` is the
        standard deviation (always positive, not a log-variance) and
        ``z = z_mu + z_sigma * eps`` with ``eps`` drawn from a standard normal.
        Noise is drawn on every call, including in eval mode.
        """
        mean = self.fc_mu(x)
        std = (0.5 * self.fc_sigma(x)).exp()
        noise = torch.randn_like(std)
        sample = mean + std * noise
        return sample, mean, std

In [8]:
# Decoder f_dec: maps [Z1, Z2] → X'
class Decoder(nn.Module):
    """Single linear layer that maps a latent vector back to input space."""

    def __init__(self, latent_dim, output_dim):
        super().__init__()
        self.fc = nn.Linear(in_features=latent_dim, out_features=output_dim)

    def forward(self, z):
        """Return the reconstruction produced from the latent vector ``z``."""
        reconstruction = self.fc(z)
        return reconstruction

In [9]:
#Flow model
class FlowModel(nn.Module):
    """Affine map ``forward(z) = z @ W.T + b`` with an explicit inverse.

    The layer is a plain ``nn.Linear``; nothing constrains ``W`` to stay
    non-singular, so ``inverse`` raises a linear-algebra error if training
    ever drives the weight matrix to be singular.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.fc = nn.Linear(latent_dim, latent_dim)  # invertible only while W is non-singular

    def forward(self, z_s):
        """Apply the affine map to ``z_s``."""
        return self.fc(z_s)

    def inverse(self, z_s):
        """Undo ``forward``: return ``x`` such that ``x @ W.T + b == z_s``.

        Args:
            z_s: tensor whose last dimension equals the layer width. Any
                number of leading dimensions (including none) is accepted.

        Returns:
            Tensor with the same shape as ``z_s``.

        Raises:
            ValueError: if the last dimension of ``z_s`` does not match the
                weight matrix.
        """
        weight = self.fc.weight
        bias = self.fc.bias

        if z_s.shape[-1] != weight.shape[0]:
            raise ValueError(f"z_s dimensions {z_s.shape[-1]} do not match weight dimensions {weight.shape[0]}")

        # Solve W x = (z_s - b) directly instead of forming W^-1 and then
        # multiplying: one factorisation, better numerical behaviour.
        # All leading dimensions are flattened into one so a single solve
        # handles every row as a separate right-hand side.
        rhs = (z_s - bias).reshape(-1, weight.shape[0])
        solution = torch.linalg.solve(weight, rhs.T).T
        return solution.reshape(z_s.shape)


In [10]:
# Classifier f_cls: maps the concatenation [z_c, z_s_tilde] → Y
class Classifier(nn.Module):
    """Linear classification head over two concatenated latent blocks.

    ``latent_dim`` is the input width of the linear layer, so it must equal
    the combined last-dimension width of ``z_c`` and ``z_s_tilde``.
    """

    def __init__(self, latent_dim, num_classes=2):
        super().__init__()
        self.fc = nn.Linear(latent_dim, num_classes)

    def forward(self, z_c, z_s_tilde):
        """Return unnormalised class scores (logits) for each row."""
        features = torch.cat((z_c, z_s_tilde), dim=-1)
        logits = self.fc(features)
        return logits

In [11]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Split BEFORE scaling so the held-out rows never influence the scaling
# statistics (fitting the scaler on all of X leaks test-set information).
# The same random_state selects the same rows as before.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit on the training rows only, then apply that same transform to the test rows.
# X itself is left untouched, so re-running this cell gives the same result.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)


In [12]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Convert the NumPy splits to tensors: float32 features, int64 (long) labels.
train_features = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.long)
test_features = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.long)

train_dataset = TensorDataset(train_features, train_labels)
test_dataset = TensorDataset(test_features, test_labels)

# Training batches are reshuffled every epoch; test batches keep their order.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)


In [13]:
#set dataset dimensions
input_dim = X_train.shape[1]

# The training loop slices the latent vector as z[:, :z_dim_c] and
# z[:, z_dim_c:], and the flow model is built with width z_dim_s, so the
# total latent width must be exactly the sum of the two parts. Deriving it
# keeps the three values from drifting apart when one of them is tuned.
z_dim_c = 10
z_dim_s = 40
latent_dim = z_dim_c + z_dim_s

In [14]:
#set training params
num_epochs = 20  # number of full passes over train_loader
alpha1 = 1e-2    # weight of the prediction-entropy term in the total loss
alpha2 = 1e-1    # weight of the (reconstruction + KL) term in the total loss

In [None]:
#Training loop
# Seed the global torch RNG so weight initialisation, batch shuffling and the
# encoder's sampling noise are repeatable under Restart & Run All.
torch.manual_seed(42)

encoder = Encoder(input_dim, latent_dim)
decoder = Decoder(latent_dim, input_dim)
flow_model = FlowModel(z_dim_s)
classifier = Classifier(latent_dim, num_classes=2)

# Optimizer and loss: one Adam optimizer updates all four modules jointly.
optimizer = torch.optim.Adam(
    list(encoder.parameters()) +
    list(decoder.parameters()) +
    list(flow_model.parameters()) +
    list(classifier.parameters()), lr=1e-3)

for epoch in range(num_epochs):
    total_loss = 0
    # Batch names are deliberately not `x`/`y`: using `y` here would overwrite
    # the notebook-level label array `y` and break a re-run of the split cell.
    for x_batch, y_batch in train_loader:
        # Encode; z_sigma is a standard deviation (the encoder returns
        # exp(0.5 * fc_sigma(x))), not a log-variance.
        z, z_mu, z_sigma = encoder(x_batch)
        # First z_dim_c columns feed the classifier directly; the remaining
        # columns go through the flow model's inverse first.
        z_c, z_s = z[:, :z_dim_c], z[:, z_dim_c:]

        # Decode from the full latent vector
        x_recon = decoder(z)

        # Flow model
        z_s_tilde = flow_model.inverse(z_s)

        # Classification
        y_pred = classifier(z_c, z_s_tilde)

        # Losses
        recon_loss = F.mse_loss(x_recon, x_batch)
        # KL(N(mu, sigma^2) || N(0, 1)) written in terms of the standard
        # deviation: log-variance = 2 * log(sigma), variance = sigma^2.
        kl_loss = -0.5 * torch.mean(1 + 2 * torch.log(z_sigma) - z_mu**2 - z_sigma**2)
        cls_loss = F.cross_entropy(y_pred, y_batch)
        # Mean of -p * log(p) over every (row, class) entry of the predictions.
        ent_loss = -torch.mean(torch.softmax(y_pred, dim=-1) * torch.log_softmax(y_pred, dim=-1))

        loss = cls_loss + alpha1 * ent_loss + alpha2 * (recon_loss + kl_loss)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the average per-batch loss for this epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader)}")


In [None]:
from sklearn.metrics import accuracy_score, roc_auc_score

encoder.eval()
flow_model.eval()
classifier.eval()

all_preds = []   # hard class predictions (argmax), used for accuracy
all_probs = []   # predicted probability of class 1, used for ROC AUC
all_labels = []

with torch.no_grad():
    for x_batch, y_batch in test_loader:
        # Use the latent mean rather than the sampled z: the encoder draws
        # fresh noise on every forward call, even in eval mode, which would
        # make the reported metrics change from run to run.
        _, z_mu, _ = encoder(x_batch)
        z_c, z_s = z_mu[:, :z_dim_c], z_mu[:, z_dim_c:]
        z_s_tilde = flow_model.inverse(z_s)
        y_pred = classifier(z_c, z_s_tilde)

        all_preds.append(torch.argmax(y_pred, dim=1).cpu().numpy())
        all_probs.append(torch.softmax(y_pred, dim=1)[:, 1].cpu().numpy())
        all_labels.append(y_batch.cpu().numpy())

all_preds = np.concatenate(all_preds)
all_probs = np.concatenate(all_probs)
all_labels = np.concatenate(all_labels)

accuracy = accuracy_score(all_labels, all_preds)
# ROC AUC needs a continuous score to rank samples; passing the 0/1 argmax
# predictions collapses the curve to a single operating point. Labels are
# class indices 0/1 (as required by the 2-logit cross-entropy used in
# training), so column 1 is the positive-class probability.
auc = roc_auc_score(all_labels, all_probs)

print(f"Accuracy: {accuracy:.4f}")
print(f"AUC: {auc:.4f}")