In [1]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_diabetes
from idfc.idfc import run_idfc
from idfc.utils import compute_variable_correlations

In [2]:
# --- Load example dataset ---
X_raw = load_diabetes(as_frame=True).frame.drop(columns='target')
X = (X_raw - X_raw.mean()) / X_raw.std()

In [45]:
from varclushi import VarClusHi

resclv = VarClusHi(X,maxeigval2=1,maxclus=None)
resclv.varclus()

print(resclv.rsquare)

# 2. Récupérer l'affectation des variables à leur cluster
cluster_assignments = resclv.rsquare.Cluster  # Series avec index = noms des variables

# 3. Convertir en dictionnaire de liste
partition = {}
for label in cluster_assignments.unique():
    features = cluster_assignments[cluster_assignments == label].index.tolist()
    partition[f"cluster_{label}"] = features
partition

   Cluster Variable    RS_Own     RS_NC  RS_Ratio
0        0      sex  0.407090  0.060259  0.630929
1        0       s3  0.808504  0.145330  0.224058
2        0       s4  0.777680  0.380896  0.359099
3        1       s1  0.948331  0.211875  0.065559
4        1       s2  0.948331  0.181641  0.063137
5        2      age  0.306923  0.060562  0.737758
6        2      bmi  0.491944  0.141952  0.592107
7        2       bp  0.526635  0.073862  0.511117
8        2       s5  0.570794  0.251140  0.573146
9        2       s6  0.550914  0.140378  0.522422


{'cluster_0': [0, 1, 2], 'cluster_1': [3, 4], 'cluster_2': [5, 6, 7, 8, 9]}

In [62]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA

def compute_cov2(x, c):
    """Covariance au carré entre une variable x et un vecteur c."""
    return np.cov(x, c)[0, 1] ** 2

def refine_partition(X, init_partition, rho=0.3, max_iter=100, tol=1e-5):
    """
    Raffinement de la partition via CLV avec stratégie K+1.
    
    Paramètres :
    ------------
    X : DataFrame (n échantillons x p variables)
    init_partition : dict[str, list[str]]
        Dictionnaire {cluster_k : [features]} initial (souvent issu de VARCLUS)
    rho : float
        Seuil de corrélation au carré pour affectation dans un vrai cluster. Sinon → bruit.
    max_iter : int
        Nombre maximal d’itérations
    tol : float
        Tolérance pour le critère de convergence
    
    Retourne :
    ----------
    clusters : dict[str, list[str]] avec un cluster spécial 'noise'
    components : dict[str, np.ndarray] : composantes principales normalisées de chaque cluster
    """
    p_names = X.columns
    X = X.copy()
    n_features = len(p_names)
    
    clusters = init_partition.copy()
    noise_cluster = "noise"
    clusters[noise_cluster] = []
    
    prev_assignments = None
    
    for key, clus in clusters.items():
        clusters[key] = p_names[clus]
    # print(p_names[clus])
        
    for iteration in range(max_iter):
        # Étape 1 : calcul des premières composantes pour chaque cluster
        components = {}
        for k, vars_k in clusters.items():
            if k == noise_cluster or len(vars_k) < 2:
                continue
            pca = PCA(n_components=1)
            # print(vars_k)
            components[k] = pca.fit(X[vars_k]).components_[0]
        # Étape 2 : assignation avec règle K+1
        new_clusters = {k: [] for k in clusters}
        for j in p_names:
            best_k = None
            best_score = -np.inf
            xj = X[j].values
            for k, comp in components.items():
                c_vec = X[clusters[k]].values @ comp
                score = compute_cov2(xj, c_vec)
                if score > best_score:
                    best_score = score
                    best_k = k
            var_j = np.var(xj)
            if best_score >= rho**2 * var_j:
                new_clusters[best_k].append(j)
            else:
                new_clusters[noise_cluster].append(j)
        # Vérifier convergence
        current_assignments = [sorted(v) for v in new_clusters.values()]
        if prev_assignments is not None:
            diffs = [set(a) != set(b) for a, b in zip(prev_assignments, current_assignments)]
            if not any(diffs):
                break
        prev_assignments = current_assignments
        clusters = new_clusters

        # Recalcul final des composantes (hors clusters de taille < 2 sauf "noise")
    final_components = {}
    filtered_clusters = {}

    for k, vars_k in clusters.items():
        if len(vars_k) == 0 and k != "noise":
            continue  # Supprimer les clusters vides (sauf "noise")
        if k != "noise" and len(vars_k) >= 2:
            pca = PCA(n_components=1)
            final_components[k] = pca.fit_transform(X[vars_k])
        filtered_clusters[k] = vars_k  # inclut aussi "noise", même vide

    return filtered_clusters, final_components


In [79]:
partition.items()

dict_items([('cluster_0', [0, 1, 2]), ('cluster_1', [3, 4]), ('cluster_2', [5, 6, 7, 8, 9])])

In [92]:
partition

{'cluster_0': [0, 1, 2], 'cluster_1': [3, 4], 'cluster_2': [5, 6, 7, 8, 9]}

In [64]:
clusters, pca_result = refine_partition(X=X, 
                 init_partition=partition, 
                 rho=0.3, 
                 max_iter=100, 
                 tol=1e-5)

In [66]:
pca_result['cluster_0'].shape

(442, 1)

In [35]:
import numpy as np
import pandas as pd

def compute_covariance_score(x, c):
    """
    Renvoie la covariance au carré entre une variable x et une composante c.
    """
    return np.cov(x, c)[0, 1] ** 2

# def compute_variable_correlations(X, clusters, components):
#     """
#     Calcule les corrélations (r²) entre chaque variable d’un cluster et la composante principale du cluster.
    
#     Paramètres :
#     ------------
#     X : pd.DataFrame, (n x p) jeu de données
#     clusters : dict[str, list[str]] : dictionnaire des clusters
#     components : dict[str, np.ndarray] : composante principale de chaque cluster
    
#     Retourne :
#     ----------
#     result : dict[str, pd.DataFrame] : clé = nom du cluster, valeur = tableau avec corrélations r²
#     """
#     results = {}
#     for k, features in clusters.items():
#         if k not in components or len(features) < 2:
#             continue
#         c_vec = X[features] @ components[k]
#         corr_data = {}
#         for f in features:
#             x = X[f]
#             r = np.corrcoef(x, c_vec)[0, 1]
#             corr_data[f] = r**2
#         df = pd.DataFrame.from_dict(corr_data, orient='index', columns=['r²'])
#         df.sort_values(by='r²', ascending=False, inplace=True)
#         results[k] = df
#     return results

def compute_variable_correlations(X, clusters, components):
    if components is None:
        raise ValueError("Erreur : 'components' est None. Assure-toi de récupérer la sortie complète de refine_partition() ou run_idfc().")

    results = {}
    for k, features in clusters.items():
        if k not in components or len(features) < 2:
            continue
        c_vec = X[features] @ components[k]
        corr_data = {}
        for f in features:
            x = X[f]
            r = np.corrcoef(x, c_vec)[0, 1]
            corr_data[f] = r**2
        df = pd.DataFrame.from_dict(corr_data, orient='index', columns=['r²'])
        df.sort_values(by='r²', ascending=False, inplace=True)
        results[k] = df
    return results

In [32]:
clusters

In [40]:
# --- Run IDFC with noise cluster (rho=0.3) ---
selected_features, pca_cluster = run_idfc(X, rho=0.3, verbose=True)

print("🎯 Selected representative features:")
print(selected_features)

# --- Compute correlation tables ---
# correlations = compute_variable_correlations(X, clusters_c, components=None)

# # --- Export tables to CSV ---
# for cluster, df in correlations.items():
#     filename = f"cluster_{cluster}_correlations.csv"
#     df.to_csv(filename)
#     print(f"✅ Cluster '{cluster}' saved to {filename}")


Étape 1 : Initialisation (VARCLUS)...
Étape 2 : Raffinement (CLV k+1)...
Étape 3 : Sélection des variables interprétables...
🎯 Selected representative features:
['sex', 's4', None]


In [41]:
pca_cluster

{'cluster_0': ['age', 'sex'],
 'cluster_2': ['bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6'],
 'noise': []}