In [23]:
import pandas as pd
import plotly.graph_objects as go
import numpy as np

from sklearn.preprocessing import StandardScaler
from sklearn.cross_decomposition import CCA
from sklearn.datasets import load_diabetes

import os
import sys

notebook_dir = os.path.dirname(os.path.abspath("__file__"))
src_path = os.path.join(notebook_dir, "..", "code")
if src_path not in sys.path:
    sys.path.append(src_path)

src_path = os.path.join(notebook_dir, "..", "..")
if src_path not in sys.path:
    sys.path.append(src_path)

from missing import introduce_missing_values
from CCA import EM_for_PCCA, EM_for_PCCA_missing

In [24]:
# Load the diabetes dataset
data = load_diabetes()
X = pd.DataFrame(data.data, columns=data.feature_names)# Split data into two sets: Demographic (age, sex) and medical indicators (BMI, BP, etc.)
X1 = X[['age', 'sex']]  # Set 1: Demographic variables
X2 = X[['bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6']]  # Set 2: Medical variables# Standardizing the features
scaler = StandardScaler()
X1 = scaler.fit_transform(X1)
X2 = scaler.fit_transform(X2)

print(data.feature_names)

['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6']


In [25]:
nb_samples = 50
z = np.random.randn(nb_samples, 1)
W1 = np.random.randn(1, 10)
W2 = np.random.randn(1, 15)
X1 = z @ W1
X2 = z @ W2
X = np.concatenate((X1, X2), axis=1)


In [26]:
# Create a CCA instance
cca = CCA(n_components=1)  # We’ll find 2 canonical components for simplicity# Fit the model
cca.fit(X1, X2)# Transform the data based on the CCA
X1_c, X2_c = cca.transform(X1, X2)# Display the canonical correlations
print("Canonical correlations:")
print(cca.score(X1, X2))

Canonical correlations:
0.9663264951887851


In [27]:
# Get the canonical weights for each variable
print("Canonical weights for X1 (demographics):", cca.x_weights_)
print("Canonical weights for X2 (medical indicators):", cca.y_weights_)

Canonical weights for X1 (demographics): [[ 0.31622777]
 [ 0.31622777]
 [ 0.31622777]
 [-0.31622777]
 [ 0.31622777]
 [-0.31622777]
 [ 0.31622777]
 [ 0.31622777]
 [-0.31622777]
 [ 0.31622777]]
Canonical weights for X2 (medical indicators): [[ 0.25819889]
 [-0.25819889]
 [ 0.25819889]
 [-0.25819889]
 [-0.25819889]
 [ 0.25819889]
 [-0.25819889]
 [-0.25819889]
 [ 0.25819889]
 [ 0.25819889]
 [ 0.25819889]
 [ 0.25819889]
 [-0.25819889]
 [-0.25819889]
 [-0.25819889]]


In [28]:
def plot_CCA(X1_c, X2_c, save_path=None):
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=X1_c[:, 0],
        y=X2_c[:, 0],
        mode='markers',
        marker=dict(size=10, color='mediumblue'), 
        name='Points 2D'
    ))
    fig.update_layout(
        xaxis=dict(title= r"Projection of XA",range=[-1, 1]),
        yaxis=dict(title= r"Projection of XB",range=[-1, 1]),
        template="plotly_white",
        width=500,
        height=500,
        font=dict(size=15)
    )
    fig.show()
    
    if save_path:
        fig.write_image(save_path, scale=3)

In [29]:
X1_c /= np.linalg.norm(X1_c)
X2_c /= np.linalg.norm(X2_c)
plot_CCA(X1_c, X2_c, save_path="CCA.png")

In [30]:
d_A, d_B = 10, 15
W_0 = np.random.rand(d_A+d_B, 1)
phi_0 = np.eye(d_A+d_B)

W, phi, M = EM_for_PCCA(X, d_A, d_B, nb_components=1, W_0=W_0, phi_0=phi_0, max_iter = 10)

100%|██████████| 10/10 [00:00<00:00, 4429.98it/s]


In [31]:
def transform(X, M, nb_components, eps=1e-10):
    X1 = X[:, :d_A]
    X2 = X[:, d_A:]
    mu_1 = np.mean(X1, axis=0)
    mu_2 = np.mean(X2, axis=0)
    sigma_1 = np.cov((X1-mu_1).T)
    sigma_2 = np.cov((X2-mu_2).T)
    eigval1, eigvec1 = np.linalg.eigh(sigma_1)
    eigval2, eigvec2 = np.linalg.eigh(sigma_2)
    U1 = np.diag(1/np.sqrt(eigval1 + eps)) @ eigvec1
    U2 = np.diag(1/np.sqrt(eigval2+eps)) @ eigvec2
    # print(eigval1)
    # print(eigval2)
    M1, M2 = M, M

    proj1, proj2 = [], []
    for i in range(X.shape[0]):
        proj1.append(M1.T @ U1[:, :nb_components].T @ (X1[i] - mu_1))
        proj2.append(M2.T @ U2[:, :nb_components].T @ (X2[i] - mu_2))

    return proj1, proj2

In [32]:
X1_c, X2_c = transform(X, M, nb_components=1)
X1_c = np.array(X1_c)
X2_c = np.array(X2_c)

In [33]:
d_A, d_B = 10, 15
W_0 = np.random.rand(d_A+d_B, 1)
phi_0 = np.eye(d_A+d_B)

W, phi, M = EM_for_PCCA_missing(X, d_A, d_B, nb_components=1, W_0=W_0, phi_0=phi_0, max_iter = 10)

100%|██████████| 10/10 [00:00<00:00, 1286.20it/s]


In [34]:
X1_c, X2_c = transform(X, M, nb_components=1)
X1_c = np.array(X1_c) / np.linalg.norm(X1_c)
X2_c = np.array(X2_c) / np.linalg.norm(X2_c)
plot_CCA(X1_c, X2_c, save_path="PCCA.png")

In [35]:
missing_ratio = 0.70
X_missing = introduce_missing_values(X, missing_ratio)

d_A, d_B = 10, 15
W_0 = np.random.rand(d_A+d_B, 1)
phi_0 = np.eye(d_A+d_B)

W, phi, M = EM_for_PCCA_missing(X_missing, d_A, d_B, nb_components=1, W_0=W_0, phi_0=phi_0, max_iter = 10)

X1_c, X2_c = transform(X, M, nb_components=1)
X1_c = np.array(X1_c) / np.linalg.norm(X1_c)
X2_c = np.array(X2_c) / np.linalg.norm(X2_c)

plot_CCA(X1_c, X2_c, save_path=f"PCCA{missing_ratio*100:.0f}missing.png")

100%|██████████| 10/10 [00:00<00:00, 807.05it/s]


In [36]:
X_missing_filled = X_missing.copy()
X_missing_filled[np.isnan(X_missing_filled)] = np.mean(X_missing_filled[~np.isnan(X_missing_filled)], axis=0)
cca = CCA(n_components=1) 
cca.fit(X_missing_filled[:d_A].T, X_missing_filled[d_A:].T)# Transform the data based on the CCA
X1_c, X2_c = cca.transform(X_missing_filled[:d_A].T, X_missing_filled[d_A:].T)# Display the canonical correlations
X1_c /= np.linalg.norm(X1_c)
X2_c /= np.linalg.norm(X2_c)
plot_CCA(X1_c, X2_c, save_path=f"CCA{missing_ratio*100:.0f}missing.png")