# Clustering-Based Regime Identification Methods

This notebook implements four clustering approaches for macroeconomic regime detection:

1. Fuzzy C-Means Clustering
2. Modified K-Means (Oliveira et al., 2025)
3. Vanilla K-Means with Probabilistic Assignment
4. Gaussian Mixture Model (GMM)

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from scipy.spatial.distance import cdist
import seaborn as sns
import warnings
# NOTE(review): this silences *every* warning for the rest of the session,
# including deprecation warnings raised by the libraries used below —
# consider narrowing the filter to specific categories.
warnings.filterwarnings('ignore')

# Seed NumPy's global RNG so cells that draw from it are repeatable.
np.random.seed(42)
# Default style and figure size for all figures created below.
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

## Load and Preprocess Data

In [None]:
def transform_series(x, tcode):
    """
    Apply the transformation selected by ``tcode`` to a series.

    Parameters:
    -----------
    x : pandas Series (anything exposing ``diff`` / ``pct_change``)
        Series to transform.
    tcode : int
        1 = level (returned unchanged), 2 = first difference,
        3 = second difference, 4 = log, 5 = first difference of log,
        6 = second difference of log, 7 = percent change.

    Returns the transformed series. Raises ValueError for any other code.
    """
    # NOTE(review): if these are FRED-MD transformation codes, code 7 is
    # usually defined as the first difference of the percent change —
    # confirm against the data source before relying on this branch.
    if tcode == 7:
        return x.pct_change()

    # Codes 1-3 difference the level, codes 4-6 difference the log; the
    # position inside each group is the number of differences to take.
    if tcode in (1, 2, 3):
        out, n_diffs = x, tcode - 1
    elif tcode in (4, 5, 6):
        out, n_diffs = np.log(x), tcode - 4
    else:
        raise ValueError(f"Unknown tcode: {tcode}")

    for _ in range(int(n_diffs)):
        out = out.diff()
    return out

def load_data(filepath='../data/macro_dataset.csv', start_date='1962-07-01'):
    """
    Load the macro panel, transform each series, and standardize it.

    The CSV is read with the series names in the header row, one row of
    transformation codes directly below the header, and the observations
    after that (first column used as the index).

    Parameters:
    -----------
    filepath : str
        Path to the CSV file.
    start_date : str
        First date kept in the returned sample. Series that are missing at
        this date (after transformation) are dropped entirely.

    Returns:
    --------
    pandas.DataFrame
        Standardized (zero mean, unit variance per column) data indexed by
        month, restricted to dates on or after ``start_date``.
    """
    # Observations: skip the transformation-code row (file row 1).
    data = pd.read_csv(filepath, skiprows=[1], index_col=0)
    data.columns = [c.upper() for c in data.columns]
    data = data.loc[pd.notna(data.index), :]
    # The file's own index is replaced: rows are treated as consecutive
    # months starting in January 1959.
    data.index = pd.date_range(start="1959-01-01", freq="MS", periods=len(data))

    # Transformation codes: the single row directly under the header.
    tcodes = pd.read_csv(filepath, nrows=1, index_col=0)
    tcodes.columns = [c.upper() for c in tcodes.columns]

    data = data.apply(lambda x: transform_series(x, tcodes[x.name].item()))
    # Drop series that have no value at the start of the sample.
    data = data.dropna(axis=1, subset=[pd.Timestamp(start_date)])
    # Forward-fill gaps, then drop the leading rows that are still missing.
    # `DataFrame.ffill()` replaces the deprecated `fillna(method='ffill')`.
    data = data.ffill().dropna()
    data = data[data.index >= start_date]

    scaler = StandardScaler()
    data_std = pd.DataFrame(scaler.fit_transform(data), index=data.index, columns=data.columns)
    return data_std

# Standardized panel plus the array / index views used by every method below.
df = load_data()
dates, X = df.index, df.to_numpy()
T, p = df.shape  # T = number of periods, p = number of series

print(f"Data loaded: {T} observations x {p} features")
print(f"Date range: {dates[0].strftime('%Y-%m')} to {dates[-1].strftime('%Y-%m')}")

In [None]:
# Number of regimes (clusters); passed to every clustering method fitted below.
K = 4
print(f"Target number of regimes: K = {K}")

## 1. Fuzzy C-Means Clustering

Fuzzy C-Means allows each observation to belong to multiple clusters with varying degrees of membership.

**Objective function:**
$$\min_{\{c_i\}, \{w_{i,t}\}} \sum_{t=1}^{T} \sum_{i=1}^{K} w_{i,t}^m \|x_t - c_i\|^2$$

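subject to $\sum_{i=1}^{K} w_{i,t} = 1$ and $w_{i,t} \geq 0$, where $w_{i,t}$ is the membership of period $t$ in cluster $i$, $c_i$ is the center of cluster $i$, and $m > 1$ is the fuzziness exponent.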

In [None]:
class FuzzyCMeans:
    """
    Fuzzy C-Means clustering implementation.

    Parameters:
    -----------
    n_clusters : int
        Number of clusters
    m : float
        Fuzziness parameter (m > 1). Higher values = softer assignments.
    max_iter : int
        Maximum number of center/membership update sweeps.
    tol : float
        Stop once the norm of the change in the membership matrix between
        two consecutive sweeps falls below this value.
    random_state : int or None
        Seed for the random initial membership matrix.

    Attributes (set by ``fit``):
    ----------------------------
    centers_ : ndarray, shape (n_clusters, n_features)
        Cluster centers.
    membership_ : ndarray, shape (n_samples, n_clusters)
        Membership weights of the training data; each row sums to 1.
    n_iter_ : int
        Number of sweeps actually run.
    """

    def __init__(self, n_clusters=4, m=2.0, max_iter=300, tol=1e-6, random_state=42):
        self.n_clusters = n_clusters
        self.m = m
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
        self.centers_ = None
        self.membership_ = None
        self.n_iter_ = 0

    def _memberships(self, X):
        """Membership weights of the rows of X given the current centers."""
        distances = cdist(X, self.centers_, metric='euclidean')
        # Floor the distances so a point lying exactly on a center does not
        # cause a division by zero.
        distances = np.maximum(distances, 1e-10)

        power = 2 / (self.m - 1)
        U = 1 / (distances ** power)
        return U / U.sum(axis=1, keepdims=True)

    def fit(self, X):
        """
        Alternate center and membership updates until convergence.

        Raises ValueError if ``m`` is not strictly greater than 1.
        Returns ``self``.
        """
        # m == 1 makes the membership exponent undefined and m < 1 inverts
        # the distance weighting, so reject both up front.
        if not self.m > 1:
            raise ValueError(f"Fuzziness parameter m must be > 1, got {self.m}")

        # A private generator yields the same draws as seeding the global
        # NumPy RNG with this seed, without resetting the global state for
        # the rest of the session.
        rng = np.random.RandomState(self.random_state)
        n_samples = X.shape[0]

        # Initialize membership randomly
        U = rng.rand(n_samples, self.n_clusters)
        U = U / U.sum(axis=1, keepdims=True)

        for iteration in range(self.max_iter):
            U_old = U

            # Update centroids
            Um = U ** self.m
            self.centers_ = (Um.T @ X) / Um.sum(axis=0, keepdims=True).T

            # Update membership
            U = self._memberships(X)

            self.n_iter_ = iteration + 1

            # Check convergence
            if np.linalg.norm(U - U_old) < self.tol:
                print(f"Converged after {self.n_iter_} iterations")
                break

        self.membership_ = U
        return self

    def predict_proba(self, X):
        """Return soft membership weights"""
        if self.centers_ is None:
            raise ValueError("FuzzyCMeans is not fitted yet; call fit() first.")
        return self._memberships(X)

    def predict(self, X):
        """Return hard labels"""
        return np.argmax(self.predict_proba(X), axis=1)

In [None]:
# Fit Fuzzy C-Means (fit() returns the fitted estimator)
print("Fitting Fuzzy C-Means...")
fcm = FuzzyCMeans(n_clusters=K, m=2.0, random_state=42).fit(X)

# Soft memberships and the corresponding hard labels
fcm_soft = fcm.predict_proba(X)
fcm_hard = fcm.predict(X)

print(f"\nMembership matrix shape: {fcm_soft.shape}")
print(f"\nRegime distribution:")
# Share of months assigned to each regime
for r, c in zip(*np.unique(fcm_hard, return_counts=True)):
    print(f"  Regime {r}: {c} months ({100*c/T:.1f}%)")

In [None]:
# Inspect the membership weights of the first ten periods
print("Sample soft assignments (first 10 periods):")
regime_cols = [f'R{i}' for i in range(K)]
fcm_sample = pd.DataFrame(fcm_soft[:10], columns=regime_cols)
print(fcm_sample.round(3))

## 2. Modified K-Means (Oliveira et al., 2025)

Two-step approach:
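1. Identify "atypical" periods (outliers): fit an initial k-means and flag every period whose distance to its assigned cluster center exceeds the mean distance by more than `atypical_threshold` standard deviations.
2. Refit k-means on the typical periods only, then assign all periods (including the atypical ones) probabilistically, using inverse distances to the refitted centers.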

In [None]:
class ModifiedKMeans:
    """
    Modified K-Means with atypical period detection.
    Based on Oliveira et al. (2025).

    Parameters:
    -----------
    n_clusters : int
        Number of clusters
    atypical_threshold : float
        A period is flagged as atypical when its distance to its assigned
        center exceeds mean + atypical_threshold * std of those distances.
    random_state : int
        Seed passed to both k-means fits.

    Attributes (set by ``fit``):
    ----------------------------
    kmeans_ : KMeans
        The k-means model fitted on typical periods only.
    centers_ : ndarray, shape (n_clusters, n_features)
        Cluster centers of ``kmeans_``.
    atypical_mask_ : boolean ndarray, shape (n_samples,)
        True for the training periods flagged as atypical.
    """

    def __init__(self, n_clusters=4, atypical_threshold=2.0, random_state=42):
        self.n_clusters = n_clusters
        self.atypical_threshold = atypical_threshold
        self.random_state = random_state
        self.kmeans_ = None
        self.centers_ = None
        self.atypical_mask_ = None

    def fit(self, X):
        """Flag atypical periods, then fit k-means on the rest. Returns self."""
        n_samples = X.shape[0]

        print("  Step 1: Initial k-means for outlier detection...")
        initial_km = KMeans(n_clusters=self.n_clusters, random_state=self.random_state, n_init=10)
        initial_labels = initial_km.fit_predict(X)

        # Distance from each observation to the center of its own cluster,
        # computed for all rows at once instead of a Python-level loop.
        own_centers = initial_km.cluster_centers_[initial_labels]
        distances = np.linalg.norm(X - own_centers, axis=1)

        # Flag atypical periods
        mean_d, std_d = distances.mean(), distances.std()
        self.atypical_mask_ = distances > (mean_d + self.atypical_threshold * std_d)
        n_atypical = self.atypical_mask_.sum()
        print(f"  Found {n_atypical} atypical periods ({100*n_atypical/n_samples:.1f}%)")

        # Re-fit on typical periods only
        print("  Step 2: Re-fitting on typical periods...")
        X_typical = X[~self.atypical_mask_]
        self.kmeans_ = KMeans(n_clusters=self.n_clusters, random_state=self.random_state, n_init=10)
        self.kmeans_.fit(X_typical)
        self.centers_ = self.kmeans_.cluster_centers_

        return self

    def predict_proba(self, X):
        """Inverse-distance based probability assignment"""
        if self.centers_ is None:
            raise ValueError("ModifiedKMeans is not fitted yet; call fit() first.")
        distances = cdist(X, self.centers_, metric='euclidean')
        # Floor the distances so a point lying exactly on a center does not
        # cause a division by zero.
        distances = np.maximum(distances, 1e-10)
        inv_dist = 1 / distances
        return inv_dist / inv_dist.sum(axis=1, keepdims=True)

    def predict(self, X):
        """Return hard labels (the cluster with the largest weight)"""
        return np.argmax(self.predict_proba(X), axis=1)

    def get_atypical_periods(self, dates):
        """Return dates of atypical periods"""
        # Without this guard an unfitted model would index with None and
        # silently return a reshaped copy of `dates`.
        if self.atypical_mask_ is None:
            raise ValueError("ModifiedKMeans is not fitted yet; call fit() first.")
        return dates[self.atypical_mask_]

In [None]:
# Fit Modified K-Means (fit() returns the fitted estimator)
print("Fitting Modified K-Means...")
mkm = ModifiedKMeans(n_clusters=K, atypical_threshold=2.0, random_state=42).fit(X)

# Soft weights and hard labels for every period, atypical ones included
mkm_soft = mkm.predict_proba(X)
mkm_hard = mkm.predict(X)

# List the flagged periods: the first ten by name, the rest as a count
atypical_dates = mkm.get_atypical_periods(dates)
n_not_shown = len(atypical_dates) - 10
print(f"\nAtypical periods detected:")
for d in atypical_dates[:10]:
    print(f"  {d.strftime('%Y-%m')}")
if n_not_shown > 0:
    print(f"  ... and {len(atypical_dates)-10} more")

In [None]:
# Share of months assigned to each regime by Modified K-Means
print("\nRegime distribution (Modified K-Means):")
for r, c in zip(*np.unique(mkm_hard, return_counts=True)):
    print(f"  Regime {r}: {c} months ({100*c/T:.1f}%)")

## 3. Vanilla K-Means with Probabilistic Assignment

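Standard k-means, but we derive soft assignments from inverse distances:
$$w_{i,t} = \frac{1/d_{i,t}}{\sum_j 1/d_{j,t}}$$
where $d_{i,t} = \|x_t - c_i\|$ is the Euclidean distance from observation $x_t$ to cluster center $c_i$.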

In [None]:
class VanillaKMeansProb:
    """
    Plain K-Means whose soft assignments are normalized inverse distances.

    Parameters:
    -----------
    n_clusters : int
        Number of clusters
    random_state : int
        Seed passed to the underlying KMeans.
    """

    def __init__(self, n_clusters=4, random_state=42):
        self.n_clusters, self.random_state = n_clusters, random_state
        # Populated by fit()
        self.kmeans_ = None
        self.centers_ = None

    def fit(self, X):
        """Fit k-means on X, keep its centers, report the inertia. Returns self."""
        self.kmeans_ = KMeans(
            n_clusters=self.n_clusters,
            random_state=self.random_state,
            n_init=10,
        )
        # KMeans.fit returns the estimator itself
        self.centers_ = self.kmeans_.fit(X).cluster_centers_
        print(f"K-Means converged. Inertia: {self.kmeans_.inertia_:.2f}")
        return self

    def predict_proba(self, X):
        """Return per-cluster weights proportional to 1 / distance; rows sum to 1."""
        # Floor at 1e-10 so a point lying exactly on a center stays finite.
        floored = np.maximum(cdist(X, self.centers_, metric='euclidean'), 1e-10)
        weights = np.reciprocal(floored)
        row_totals = weights.sum(axis=1, keepdims=True)
        return weights / row_totals

    def predict(self, X):
        """Return hard labels from the fitted k-means model."""
        return self.kmeans_.predict(X)

In [None]:
# Fit Vanilla K-Means (fit() returns the fitted estimator)
print("Fitting Vanilla K-Means...")
vkm = VanillaKMeansProb(n_clusters=K, random_state=42).fit(X)

# Inverse-distance weights and k-means hard labels
vkm_soft = vkm.predict_proba(X)
vkm_hard = vkm.predict(X)

# Share of months assigned to each regime
print("\nRegime distribution (Vanilla K-Means):")
for r, c in zip(*np.unique(vkm_hard, return_counts=True)):
    print(f"  Regime {r}: {c} months ({100*c/T:.1f}%)")

## 4. Gaussian Mixture Model (GMM)

Model-based clustering assuming data comes from a mixture of Gaussians:
$$p(x_t) = \sum_{i=1}^{K} \pi_i \phi(x_t; \mu_i, \Sigma_i)$$

In [None]:
# Fit GMM
# Use diagonal covariance for numerical stability with high-dimensional data
print("Fitting Gaussian Mixture Model...")
gmm = GaussianMixture(
    n_components=K,
    covariance_type='diag',  # diagonal covariance for stability
    random_state=42,
    n_init=10,
    max_iter=200,
    verbose=0
)
gmm.fit(X)

# Posterior regime probabilities and the most likely regime per period
gmm_soft = gmm.predict_proba(X)
gmm_hard = gmm.predict(X)

print(f"GMM converged: {gmm.converged_}")
# `lower_bound_` is averaged over samples, so it is labelled as such; the
# total is the per-sample average from `score` scaled by the sample size.
print(f"Mean log-likelihood per sample (EM lower bound): {gmm.lower_bound_:.2f}")
print(f"Total log-likelihood: {T * gmm.score(X):.2f}")
print(f"Mixing proportions: {gmm.weights_.round(3)}")

# Share of months assigned to each regime
print("\nRegime distribution (GMM):")
unique, counts = np.unique(gmm_hard, return_counts=True)
for r, c in zip(unique, counts):
    print(f"  Regime {r}: {c} months ({100*c/T:.1f}%)")

## Compare Methods

In [None]:
# Collect the outputs of all four methods under a common set of names;
# the order of `methods` fixes the order used in the comparisons below.
methods = ['Fuzzy C-Means', 'Modified K-Means', 'Vanilla K-Means', 'GMM']

# Soft (probabilistic) and hard (label) assignments, keyed by method name
soft_assignments = dict(zip(methods, (fcm_soft, mkm_soft, vkm_soft, gmm_soft)))
hard_assignments = dict(zip(methods, (fcm_hard, mkm_hard, vkm_hard, gmm_hard)))

print("All clustering methods fitted successfully.")

In [None]:
# Visualize regime assignments over time
colors = plt.cm.Set1(np.linspace(0, 1, K))

# squeeze=False keeps `axes` an array even when only one method is plotted
fig, axes = plt.subplots(len(methods), 1, figsize=(14, 3*len(methods)),
                         sharex=True, squeeze=False)
axes = axes.ravel()

# Left edge of every period plus one extra edge closing the final month, so
# the last observation gets a visible span instead of a zero-width one.
# The index is built at month-start frequency in load_data.
span_edges = dates.append(pd.DatetimeIndex([dates[-1] + pd.DateOffset(months=1)]))

for ax, method in zip(axes, methods):
    hard = np.asarray(hard_assignments[method])

    # Draw one span per run of consecutive identical labels rather than one
    # patch per month.
    run_starts = np.flatnonzero(np.r_[True, hard[1:] != hard[:-1]])
    run_ends = np.r_[run_starts[1:], len(hard)]
    for start, end in zip(run_starts, run_ends):
        ax.axvspan(span_edges[start], span_edges[end],
                   color=colors[hard[start]], alpha=0.7)

    ax.set_ylabel(method, fontsize=10)
    ax.set_yticks([])

    # Add regime distribution annotation
    regime_counts = pd.Series(hard).value_counts().sort_index()
    dist_text = ', '.join([f'R{i}:{100*c/len(hard):.0f}%' for i, c in regime_counts.items()])
    ax.text(0.02, 0.85, dist_text, transform=ax.transAxes, fontsize=8,
            bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

axes[-1].set_xlabel('Date')

# Legend
legend_elements = [plt.Rectangle((0,0),1,1, color=colors[i], label=f'Regime {i}')
                   for i in range(K)]
fig.legend(handles=legend_elements, loc='upper right', ncol=K, bbox_to_anchor=(0.98, 0.98))

plt.suptitle('Regime Assignments by Clustering Method', fontsize=14, y=1.01)
plt.tight_layout()
plt.show()

In [None]:
# Cross-method agreement (Adjusted Rand Index)
from sklearn.metrics import adjusted_rand_score

# Pairwise ARI between the hard labels of every pair of methods;
# rows and columns follow the order of `methods`.
ari_matrix = np.array([
    [adjusted_rand_score(hard_assignments[m1], hard_assignments[m2]) for m2 in methods]
    for m1 in methods
])

plt.figure(figsize=(8, 6))
sns.heatmap(
    ari_matrix,
    annot=True,
    fmt='.3f',
    cmap='YlOrRd',
    xticklabels=methods,
    yticklabels=methods,
)
plt.title('Cross-Method Agreement (Adjusted Rand Index)')
plt.tight_layout()
plt.show()

In [None]:
# Gather every method's hard label and per-regime soft weights into one
# date-indexed table: for each method, a `_hard` column followed by K
# `_soft_R{k}` columns.
result_columns = {}
for method in methods:
    result_columns[f'{method}_hard'] = hard_assignments[method]
    result_columns.update(
        {f'{method}_soft_R{k}': soft_assignments[method][:, k] for k in range(K)}
    )
results_df = pd.DataFrame(result_columns, index=dates)

# Optional export, disabled by default:
# results_df.to_csv('../data/clustering_results.csv')