# Copula Synthesis

In [76]:
import numpy as np
import pandas as pd
from statsmodels.distributions.copula.api import (
    GaussianCopula, StudentTCopula, ClaytonCopula,
    CopulaDistribution)
from scipy.stats import norm
from sklearn.preprocessing import QuantileTransformer
from statsmodels.stats.correlation_tools import corr_nearest
from numpy.linalg import eigh, cholesky, LinAlgError, slogdet, solve
import scipy.stats as st
from scipy.special import gammaln
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.metrics import roc_auc_score, mean_squared_error
from pathlib import Path
from sklearn.model_selection import train_test_split

In [None]:
# Data ingestion and marginal uniformisation

# 1. Columns containing string identifiers or metadata that are NOT
#    relevant for dependence modelling. We drop them before fitting the copula.
drop_cols = ['uid',          # unique identifier (string)
             'dataset',      # data-set source label
             'group']        # FI-based categorical group label

# 2. Read the numeric source table and remove the non-numeric columns in a single chained operation.
# Save test set data individually
df_full = (pd.read_excel("data/merged_data/fi_pool.xlsx")
      .drop(columns=drop_cols)                       # keep only numeric features
)
df, df_test = train_test_split(    
    df_full,
    test_size=0.20,
    random_state=42,
    shuffle=True
)
test_out = Path("data/merged_data/test_set.csv")
test_out.parent.mkdir(parents=True, exist_ok=True)
df_test.to_csv(test_out, index=False)
print(f"Hold-out test set saved to: {test_out}")

train_out = Path("data/merged_data/train_set.csv")
train_out.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(train_out, index=False)
print(f"Hold-out test set and train set are saved.")

# 3. Convert each marginal distribution to U(0,1) via the
#    probability-integral transform (a.k.a. "pseudo-observations").
qt = QuantileTransformer(
        output_distribution='uniform',
     )

# fit_transform():
# fit: estimate the empirical CDF of each column.
# transform: map every x_ij to u_ij = F_j(x_ij) to (0,1).
u = pd.DataFrame(
        qt.fit_transform(df),
        columns=df.columns        # preserve original feature names
)


Hold-out test set saved to: data/merged_data/test_set.csv
Hold-out test set and train set are saved.




In [103]:
# Build a numerically-stable Gaussian-copula correlation matrix rho (with shrinkage + PD repair)

# 1. Guard-band the pseudo-observations 
eps = 1e-3                                  # 0.1% safety margin
u_clip = u.clip(lower=eps, upper=1 - eps)   # keep inside (eps, 1-eps)

# 2. Rank-correlation 
tau = u_clip.corr(method='kendall').to_numpy()         # d × d matrix
rho = np.sin(0.5 * np.pi * tau)                        # same shape

# 3. shrinkage toward the identity 
# Small samples or highly-collinear features can make rho nearly singular. 
# Shrinkage lifts all eigen-values and stabilises log(rho) in the log-likelihood.
lam  = 0.05                                            # 5% shrinkage
rho  = (1 - lam) * rho + lam * np.eye(rho.shape[0])    # convex blend

# 4. Ensure ρ is strictly positive-definite 
try:
    _ = cholesky(rho)                 # PD quick-check
except np.linalg.LinAlgError:
    # Spectral repair: floor eigen-values to a small positive constant.
    w, v = eigh(rho)                  
    w[w < 1e-6] = 1e-6                # eigen-value floor
    rho = v @ np.diag(w) @ v.T        # nearest PD matrix


In [104]:
# Log-likelihood of the Gaussian copula  and the AIC score

# 1. Preparations 
d = u.shape[1]  # dimensionality of the feature space

# Transform the clipped uniform pseudo-observations back to the standard-normal space
z = norm.ppf(u_clip.to_numpy())             # shape: n × d (n = row-count)

# Pre-compute the inverse and the log-determinant of the correlation
# matrix  Sigma  (here, Sigma ≡ rho).
Sigma_inv = solve(rho, np.eye(d))          # Sigma^{-1}
sign, logdet_Sigma = slogdet(rho)          # log |Sigma|
quad = (z @ Sigma_inv * z).sum(axis=1)     # shape: (n,)

# 2. Row-wise log-density of the Gaussian copula 
log_joint = -0.5 * quad - 0.5 * logdet_Sigma          # log phi_{Sigma}(z_i)
log_marg  = norm.logpdf(z).sum(axis=1)                # Sigma_j log phi(z_{ij})
logpdf    = log_joint - log_marg                      # log c(u_i) for each row

# 3. Total maximised log-likelihood
log_like = logpdf.sum()                               #  l_max  = Sigma_i log c(u_i)

# 4. Akaike Information Criterion
# AIC = 2*k  −  2*l_max,
k_params = d * (d - 1) / 2
AIC = 2 * k_params - 2 * log_like

print(f"Gaussian Copula  AIC = {AIC:,.2f}")


Gaussian Copula  AIC = 1,712.19


In [105]:
# Draw synthetic samples from the fitted Gaussian copula and map them back to the original measurement scale

# 1. Instantiate the copula for sampling.
gc = GaussianCopula(corr=rho, k_dim=d)

# 2. Random-variates generation (n = 5 000 rows).
u_syn = gc.rvs(5_000, random_state=42)                 # reproducible RNG
u_syn = u_syn.clip(eps, 1 - eps)                       # keep inside (sigma, 1-sigma)

# 3. Reverse the probability-integral transform to recover the original physical units for every feature.  
x_syn = pd.DataFrame(
            qt.inverse_transform(u_syn),
            columns=df.columns                         # restore feature names
        )

# 4. Persist the “coarse” synthetic data set to disk.
out_path = "synthetic_gauss.csv"
x_syn.to_csv(out_path, index=False)
print(f"Synthetic data written to:  {out_path}")

Synthetic data written to:  synthetic_gauss.csv




In [None]:

# Student-t Copula (with correlation-matrix shrinkage already applied)

# 1. Maximum-likelihood search over degrees-of-freedom v
best_ll  = -np.inf          # best log-likelihood so far
best_df  = None             # v that achieves it
z_cache  = {}               # reuse t.ppf(U) per v (n × d arrays)

for nu in [3, 4, 6, 8, 12, 20]:
    # 1a. Map U(0,1) pseudo-obs back to t_v quantiles
    if nu not in z_cache:
        z_cache[nu] = st.t.ppf(u_clip, df=nu)      # ndarray (n, d)
    z = z_cache[nu]

    # 1b. Pre-compute pieces used in every row
    Sigma_inv = solve(rho, np.eye(d))      
    _, logdet_Sigma = slogdet(rho)               
    quad = (z @ Sigma_inv * z).sum(axis=1) 

    # 1c. Multivariate-t normalising constant (log-space)
    log_const = (
        gammaln((nu + d) / 2)           
        - gammaln(nu / 2)               
        - 0.5 * logdet_Sigma
        - (d / 2) * np.log(nu * np.pi)
    )

    # 1d. Row-wise joint log-density of the copula
    log_joint = log_const - (nu + d) / 2 * np.log1p(quad / nu)

    # 1e. Sum of 1-D marginal t log-densities
    log_marg  = st.t.logpdf(z, df=nu).sum(axis=1)        # (n,)

    # 1f. Copula log-pdf = joint − marginal -> total log-likelihood
    ll = (log_joint - log_marg).sum()

    if ll > best_ll:          
        best_ll, best_df = ll, nu


# 2. Information criterion
# k = d(d−1)/2  (ρ parameters)  + 1  (v)
k_t   = d * (d - 1) // 2 + 1
AIC_t = 2 * k_t - 2 * best_ll
print(f"Student-t Copula  (v = {best_df})   AIC = {AIC_t:,.2f}")


# 3. Generate synthetic data and map back to real units
tc      = StudentTCopula(rho, df=best_df, k_dim=d)
u_syn   = tc.rvs(5_000, random_state=42).clip(eps, 1 - eps)
x_syn_t = pd.DataFrame(qt.inverse_transform(u_syn), columns=df.columns)

out_path = f"synthetic_t{best_df}.csv"
x_syn_t.to_csv(out_path, index=False)
print(f"Synthetic data written to:  {out_path}")

Student-t Copula  (v = 8)   AIC = 1,345.29
Synthetic data written to:  synthetic_t8.csv




In [107]:
# Clayton Copula 

# 1. One–parameter estimation
theta_hat = ClaytonCopula().fit_corr_param(u_clip) 
print(f"Clayton theta (from Kendall tau) = {theta_hat:.3f}")

# 2. Build the copula object
# k_dim = d  (number of marginals); 
clay = ClaytonCopula(theta=theta_hat, k_dim=d)

# 3. Log-likelihood and AIC
log_like_clay = clay.logpdf(u_clip).sum()      # l_max (scalar)
k_clay   = 1                               
AIC_clay = 2 * k_clay - 2 * log_like_clay
print(f"Clayton Copula  AIC = {AIC_clay:,.2f}")

# 4. Sampling + inverse marginal transform
u_syn = clay.rvs(5_000, random_state=42).clip(eps, 1 - eps)
x_syn_clay = pd.DataFrame(
                 qt.inverse_transform(u_syn),
                 columns=df.columns
              )

out_path = "synthetic_clayton.csv"
x_syn_clay.to_csv(out_path, index=False)
print(f"Synthetic data written to:  {out_path}")

Clayton theta (from Kendall tau) = 0.274
Clayton Copula  AIC = 535.20
Synthetic data written to:  synthetic_clayton.csv




In [108]:
# Comparison :  Gaussian  vs.  Student-t  vs.  Clayton

# 1. Gather raw numbers
metrics = {
    "model"        : ["Gaussian",      f"Student-t(nu={best_df})", "Clayton"],
    "AIC_total"    : [AIC,             AIC_t,                     AIC_clay],
    "loglike_total": [log_like,        best_ll,                   log_like_clay],
    "k_params"     : [k_params,        k_t,                       k_clay]
}

# 2. Scale-free normalisation (per training row)
n_train = len(u_clip)                 # number of rows used for fitting
metrics["AIC_per_row"]   = [v / n_train for v in metrics["AIC_total"]]
metrics["LL_per_row"]    = [v / n_train for v in metrics["loglike_total"]]

tbl = pd.DataFrame(metrics)\
         .sort_values("AIC_per_row")\
         .reset_index(drop=True)

print("\n--- Copula model comparison (sorted by per-row AIC) ---")
print(tbl[["model", "AIC_total", "AIC_per_row", "LL_per_row", "k_params"]])

# 3. Save comparison table to disk 
out_cmp = Path("data/Copula_data/copula_comparison.csv")
tbl.to_csv(out_cmp, index=False)
print(f"\nComparison table saved to: {out_cmp}")



--- Copula model comparison (sorted by per-row AIC) ---
             model    AIC_total  AIC_per_row  LL_per_row  k_params
0          Clayton   535.204910     2.560789   -1.275610       1.0
1  Student-t(nu=8)  1345.290369     6.436796   -3.079642      29.0
2         Gaussian  1712.185518     8.192275   -3.962166      28.0

Comparison table saved to: data/Copula_data/copula_comparison.csv


## Conclusion
Basing on the result, **Clayton Copula** perform much better in our circumstance. We can do a data quality check on the generated data of Clayton Copula.


# Data Quality Checking of Copula Synthetic data

In [109]:
"""
Full QC pipeline for Gaussian-copula synthetic data.
Assumes you already have:
    df_real   : pandas DataFrame of cleaned real data (numeric only)
    x_syn     : pandas DataFrame of synthetic data from Gaussian copula
Both must have identical columns in the same order.
"""
# Basic settings
plt.rcParams["figure.dpi"] = 120
out_dir = "qc_reports/"
Path(out_dir).mkdir(exist_ok=True)

# Real data  
raw_path  = "data/merged_data/fi_pool.xlsx"          
drop_cols = ['uid', 'dataset', 'group']     

df_real = (
    pd.read_excel(raw_path)
      .drop(columns=drop_cols)                      
)

# Synthetic data  (already generated by Gaussian copula)
syn_path = "data/Copula_data/synthetic_clayton.csv"   
x_syn    = pd.read_csv(syn_path)

# Helper: save a figure then close
def save_fig(name): plt.tight_layout(); plt.savefig(f"{out_dir}/{name}", bbox_inches="tight"); plt.close()


# 1. Univariate KS / AD + overlay histogram
ks_results = {}
for col in df_real.columns:
    # KS statistic
    D, p = st.ks_2samp(df_real[col], x_syn[col])
    ks_results[col] = D
    
    # Overlay histogram
    plt.hist(df_real[col], bins=40, density=True, alpha=.4, label="real")
    plt.hist(x_syn[col] , bins=40, density=True, alpha=.4, label="synthetic")
    plt.title(f"Histogram - {col}  (KS={D:.3f})")
    plt.legend()
    save_fig(f"hist_{col}.png")

# Save KS table
pd.Series(ks_results, name="KS").to_csv(f"{out_dir}/ks_univariate.csv")

# 2. Correlation heat-map
tau_real = df_real.corr("kendall")
tau_syn  = x_syn.corr("kendall")

fig, ax = plt.subplots(1, 2, figsize=(10, 4))
sns.heatmap(tau_real, vmin=-1, vmax=1, cmap="coolwarm", ax=ax[0])
ax[0].set_title("Real Kendall τ")
sns.heatmap(tau_syn , vmin=-1, vmax=1, cmap="coolwarm", ax=ax[1])
ax[1].set_title("Synthetic Kendall τ")
save_fig("tau_heatmap.png")

# 3. Tail-dependence (upper & lower 1 %) for each variable pair
def empirical_tail_coef(a, b, tail=0.01, upper=True):
    if upper:
        q_a, q_b = np.quantile(a, 1-tail), np.quantile(b, 1-tail)
        return ((a > q_a) & (b > q_b)).mean() / tail
    else:
        q_a, q_b = np.quantile(a, tail), np.quantile(b, tail)
        return ((a < q_a) & (b < q_b)).mean() / tail

pairs = [(df_real.columns[i], df_real.columns[j]) 
         for i in range(len(df_real.columns)) for j in range(i+1, len(df_real.columns))]
records = []
for c1, c2 in pairs:
    lam_L_real = empirical_tail_coef(df_real[c1], df_real[c2], upper=False)
    lam_L_syn  = empirical_tail_coef(x_syn[c1] , x_syn[c2] , upper=False)
    records.append([c1, c2, lam_L_real, lam_L_syn])

pd.DataFrame(records, columns=["var1","var2","lambda_L_real","lambda_L_syn"])\
  .to_csv(f"{out_dir}/tail_coef_lower.csv", index=False)

# 4. PCA 2-D projection
pca = PCA(n_components=2, random_state=42)
pca_real = pca.fit_transform(df_real)
pca_syn  = pca.transform(x_syn)

plt.scatter(*pca_real.T, s=6, alpha=.4, label="real")
plt.scatter(*pca_syn.T , s=6, alpha=.4, label="synthetic")
plt.title("PCA - first 2 components")
plt.legend(); save_fig("pca_2d.png")

#  5.  Physcial rule check  (example: burden ≤ spacing)
ratio_col = "spacing_over_burden"        # name of the ratio column

# Count rows that violate  spacing_over_burden ≥ 1
violations = (x_syn[ratio_col] < 1).sum()
print(f"[RULE] {ratio_col} ≥ 1  violated in {violations} / {len(x_syn)} rows")

# Optional hard-filter: keep only rows that satisfy the rule
x_syn = x_syn.query(f"{ratio_col} >= 1").reset_index(drop=True)


[RULE] spacing_over_burden ≥ 1  violated in 0 / 5000 rows
