In [4]:
import numpy as np
import pandas as pd
from scipy.stats import norm

def make_latent_corr(kind="ar1", rho=0.4, custom=None, seed=0):
    """
    Return a 5x5 correlation matrix for the latent Gaussian copula.
    kind = 'ar1' or 'custom'
    - 'ar1': R_ij = rho^{|i-j|}
    - 'custom': supply a valid SPD correlation in 'custom'
    """
    d = 5
    if kind == "ar1":
        idx = np.arange(d)
        R = rho ** np.abs(idx[:, None] - idx[None, :])
        return R
    elif kind == "custom":
        if custom is None or custom.shape != (5, 5):
            raise ValueError("Provide a 5x5 custom correlation matrix.")
        # Basic checks
        if not np.allclose(np.diag(custom), 1.0, atol=1e-8):
            raise ValueError("Custom correlation must have ones on the diagonal.")
        # Symmetrize and nudge to SPD if needed
        R = 0.5 * (custom + custom.T)
        # Attempt a Cholesky to ensure SPD; if it fails, nudge eigenvalues
        try:
            np.linalg.cholesky(R)
        except np.linalg.LinAlgError:
            w, V = np.linalg.eigh(R)
            w = np.clip(w, 1e-6, None)
            R = (V * w) @ V.T
            R = R / np.sqrt(np.outer(np.diag(R), np.diag(R)))  # renormalize to correlation
        return R
    else:
        raise ValueError("kind must be 'ar1' or 'custom'.")

def make_thresholds_from_probs(category_probs):
    """
    Convert a list of category probability vectors into latent normal thresholds.
    For each variable j, category_probs[j] is a list summing to 1 (e.g., [0.2,0.5,0.3]).
    Returns a list of arrays of cutpoints including +/- inf for convenience.
    """
    thresholds = []
    for probs in category_probs:
        cdf = np.cumsum(probs)
        if not np.isclose(cdf[-1], 1.0):
            raise ValueError("Category probabilities must sum to 1 for each variable.")
        # internal cutpoints are inverse-normal of cumulative probs (excluding final 1.0)
        inner = norm.ppf(cdf[:-1])
        thr = np.concatenate(([-np.inf], inner, [np.inf]))
        thresholds.append(thr)
    return thresholds

def sample_categorical_gaussian_copula(n, R, category_probs, seed=123):
    """
    Generate n samples of 5 categorical variables via a Gaussian copula:
    1) Z ~ N(0, R)
    2) X_j = discretize(Z_j) using thresholds implied by category_probs[j]
    Returns a pandas DataFrame with categorical dtype.
    """
    d = 5
    if len(category_probs) != d:
        raise ValueError("Provide category_probs for exactly 5 variables.")
    # Cholesky
    L = np.linalg.cholesky(R)
    rng = np.random.default_rng(seed)
    Z = rng.standard_normal((n, d)) @ L.T

    thresholds = make_thresholds_from_probs(category_probs)
    X = np.zeros((n, d), dtype=int)
    for j in range(d):
        thr = thresholds[j]
        # np.digitize returns bin index where Z falls; use right=False so bins are (-inf, t1], (t1, t2], ...
        # but we constructed thr with +/-inf, so we use searchsorted on thr to find category.
        # Category k if thr[k] < Z <= thr[k+1]; equivalently searchsorted(thr, Z, side='right') - 1
        X[:, j] = np.searchsorted(thr, Z[:, j], side='right') - 1

    # Make a DataFrame with categorical dtype
    df = pd.DataFrame({f"V{j+1}": pd.Categorical(X[:, j]) for j in range(d)})
    return df

def one_hot_covariance(df):
    """
    Compute covariance of one-hot encodings for each column, concatenated.
    Returns covariance matrix and column labels for the one-hot matrix.
    """
    oh_blocks = []
    labels = []
    for col in df.columns:
        cats = df[col].cat.categories
        # get_dummies will produce uint8; cast to float
        O = pd.get_dummies(df[col], prefix=col).astype(float)
        oh_blocks.append(O)
        labels.extend(list(O.columns))
    O_all = pd.concat(oh_blocks, axis=1)
    cov = np.cov(O_all.values, rowvar=False)
    return cov, labels



def integer_covariance(df):
    """
    Covariance of integer-coded categories (0..K-1 per variable).
    """
    X_int = np.column_stack([df[c].cat.codes for c in df.columns])
    return np.cov(X_int, rowvar=False)

In [5]:
# ------------------ Example usage ------------------
if __name__ == "__main__":
    n = 10000

    # Choose how to "specify the covariance matrix" of the latent Gaussian:
    # Option A: AR(1) with rho
    R = make_latent_corr(kind="ar1", rho=0.6)

    # Option B: Custom correlation (uncomment to use)
    # custom_R = np.array([
    #     [1.00, 0.35, 0.10, 0.00, -0.20],
    #     [0.35, 1.00, 0.25, 0.05,  0.00],
    #     [0.10, 0.25, 1.00, 0.40,  0.10],
    #     [0.00, 0.05, 0.40, 1.00,  0.30],
    #     [-0.20, 0.00, 0.10, 0.30, 1.00]
    # ])
    # R = make_latent_corr(kind="custom", custom=custom_R)

    # Specify category counts and (optionally) marginal probabilities per variable.
    # Here: V1 has 3 categories, V2 has 4, others have 2/3 as examples.
    # You can set any probs; they must sum to 1 per variable.
    category_probs = [
        [0.2, 0.5, 0.3],        # V1 (3 categories)
        [0.1, 0.2, 0.3, 0.4],   # V2 (4 categories)
        [0.6, 0.4],             # V3 (2 categories)
        [0.25, 0.25, 0.5],      # V4 (3 categories)
        [0.45, 0.55]            # V5 (2 categories)
    ]

    # Sample categorical data
    df_cat = sample_categorical_gaussian_copula(n=n, R=R, category_probs=category_probs, seed=123)

    # Empirical covariance of integer-coded categories (0..K-1)
    cov_int = integer_covariance(df_cat)

    # Empirical covariance of the one-hot representation (block structure)
    cov_oh, oh_labels = one_hot_covariance(df_cat)

    # Quick summaries
    print("Latent correlation (target) R:")
    np.set_printoptions(precision=3, suppress=True)
    print(R)

    print("\nEmpirical covariance (integer-coded categories):")
    print(cov_int)

    print("\nEmpirical covariance (one-hot encoding) shape:", cov_oh.shape)
    # If you want to see the one-hot cov, uncomment:
    # print(pd.DataFrame(cov_oh, index=oh_labels, columns=oh_labels).round(3))

Latent correlation (target) R:
[[1.    0.6   0.36  0.216 0.13 ]
 [0.6   1.    0.6   0.36  0.216]
 [0.36  0.6   1.    0.6   0.36 ]
 [0.216 0.36  0.6   1.    0.6  ]
 [0.13  0.216 0.36  0.6   1.   ]]

Empirical covariance (integer-coded categories):
[[0.493 0.339 0.088 0.093 0.029]
 [0.339 0.979 0.202 0.233 0.078]
 [0.088 0.202 0.241 0.169 0.055]
 [0.093 0.233 0.169 0.679 0.185]
 [0.029 0.078 0.055 0.185 0.247]]

Empirical covariance (one-hot encoding) shape: (14, 14)


In [6]:
df_cat

Unnamed: 0,V1,V2,V3,V4,V5
0,0,1,1,2,1
1,2,2,1,1,0
2,1,1,1,1,1
3,1,3,1,1,1
4,0,1,1,2,1
...,...,...,...,...,...
9995,2,3,1,2,1
9996,1,2,0,2,1
9997,0,2,0,2,1
9998,0,0,1,2,1


In [7]:
import numpy as np
from scipy.stats import kurtosis, chi2
# If you prefer a mixture instead of t, you don't need scipy

def make_covariance(d=15, kind="ar1", rho=0.5, seed=0):
    rng = np.random.default_rng(seed)
    if kind == "ar1":
        i = np.arange(d)
        R = rho ** np.abs(i[:,None] - i[None,:])  # correlation
        stds = rng.uniform(0.8, 1.6, size=d)
        S = np.diag(stds)
        return S @ R @ S
    elif kind == "random_spd":
        A = rng.standard_normal((d,d))
        Q, _ = np.linalg.qr(A)
        eigs = (0.7 ** np.arange(d))
        eigs = eigs / np.mean(eigs)
        target_var = 1.2
        eigs = eigs * target_var
        Sig = Q @ np.diag(eigs) @ Q.T
        return 0.5*(Sig+Sig.T)
    else:
        raise ValueError

def sample_gaussian(n, mu, cov, rng):
    return rng.multivariate_normal(mu, cov, size=n)

def sample_student_t(n, mu, cov, df, rng):
    # Draw Z ~ N(0, cov), W ~ chi2_df; T = mu + Z / sqrt(W/df)
    L = np.linalg.cholesky(cov)
    Z = rng.standard_normal((n, cov.shape[0])) @ L.T
    W = rng.chisquare(df, size=n)
    return mu + Z / np.sqrt(W/df)[:,None]

def sample_gaussian_mixture(n, mu, cov, rng, n_comp=3, shift_scale=2.5):
    # Same base covariance in each component, different means to create multimodality
    d = cov.shape[0]
    L = np.linalg.cholesky(cov)
    means = rng.standard_normal((n_comp, d)) * shift_scale
    pi = rng.dirichlet(np.ones(n_comp))
    comps = rng.choice(n_comp, size=n, p=pi)
    X = np.empty((n,d))
    for k in range(n_comp):
        idx = np.where(comps==k)[0]
        if idx.size:
            Z = rng.standard_normal((idx.size, d)) @ L.T
            X[idx] = means[k] + Z
    return X

def recolor_to_target_cov(X, Sigma_target):
    """
    Linear transform X so that its *sample* covariance equals Sigma_target exactly.
    Let S = sample_cov(X). Then X' = A (X - mean) + m, with A = L_t @ inv(L_s),
    where L_t L_t^T = Sigma_target, L_s L_s^T = S.
    """
    n, d = X.shape
    m = X.mean(axis=0)
    Xc = X - m
    S = np.cov(Xc, rowvar=False, bias=False)  # d x d
    Ls = np.linalg.cholesky(S)
    Lt = np.linalg.cholesky(Sigma_target)
    A = Lt @ np.linalg.inv(Ls)
    X_new = Xc @ A.T + m  # add back mean; means can differ arbitrarily
    # Numerically, the sample covariance of X_new is ~ Sigma_target
    return X_new

def summarize(name, X):
    emp_cov = np.cov(X, rowvar=False)
    d = X.shape[1]
    tail = (np.linalg.norm((X - X.mean(0)) / X.std(0, ddof=1), axis=1) > 3).mean()
    krt = kurtosis(X, axis=0, fisher=True, bias=False)  # per-dim excess kurtosis
    print(f"\n{name}:")
    print("  mean variance (trace/d):", np.trace(emp_cov)/d)
    print("  average excess kurtosis:", float(np.mean(krt)))
    print("  fraction of samples with z-norm > 3:", float(tail))

In [8]:
if __name__ == "__main__":
    rng = np.random.default_rng(123)
    n, d = 8000, 15

    # 1) Choose a target covariance you want both datasets to share
    Sigma_target = make_covariance(d=d, kind="ar1", rho=0.5, seed=42)

    # 2) Create two *different* base datasets
    #    A: Gaussian
    Xg_base = sample_gaussian(n, mu=np.zeros(d), cov=np.eye(d), rng=rng)

    #    B: heavy-tailed t (df=5)  [Alternative: use sample_gaussian_mixture(...)]
    Xt_base = sample_student_t(n, mu=np.zeros(d), cov=np.eye(d), df=5, rng=rng)
    # Or try: Xt_base = sample_gaussian_mixture(n, np.zeros(d), np.eye(d), rng, n_comp=3)

    # 3) Recolor both to have the SAME sample covariance = Sigma_target
    Xg = recolor_to_target_cov(Xg_base, Sigma_target)
    Xt = recolor_to_target_cov(Xt_base, Sigma_target)

    # 4) Verify: the two sample covariances are (numerically) identical to Sigma_target
    cov_g = np.cov(Xg, rowvar=False)
    cov_t = np.cov(Xt, rowvar=False)

    print("Max |Cov_G - Cov_T|:", float(np.max(np.abs(cov_g - cov_t))))
    print("Max |Cov_G - Sigma_target|:", float(np.max(np.abs(cov_g - Sigma_target))))
    print("Max |Cov_T - Sigma_target|:", float(np.max(np.abs(cov_t - Sigma_target))))

    # 5) Show they differ substantially in other statistics
    summarize("Gaussian (recolored)", Xg)
    summarize("Student-t df=5 (recolored)", Xt)

Max |Cov_G - Cov_T|: 8.881784197001252e-16
Max |Cov_G - Sigma_target|: 8.881784197001252e-16
Max |Cov_T - Sigma_target|: 1.7763568394002505e-15

Gaussian (recolored):
  mean variance (trace/d): 1.707547709515343
  average excess kurtosis: 0.01933380811425452
  fraction of samples with z-norm > 3: 0.806125

Student-t df=5 (recolored):
  mean variance (trace/d): 1.7075477095153433
  average excess kurtosis: 3.615264183179276
  fraction of samples with z-norm > 3: 0.541875


In [19]:
Xg

array([[-1.423, -0.952,  1.069, ...,  0.477, -0.639,  0.752],
       [ 0.185,  1.579,  0.141, ...,  2.166,  1.674,  0.787],
       [-0.527, -1.439,  0.681, ...,  1.232,  0.456, -0.453],
       ...,
       [-0.578,  0.578, -0.086, ...,  0.583, -0.167, -1.04 ],
       [-0.498, -1.687, -2.929, ...,  0.008, -1.73 , -0.916],
       [-1.381,  1.661,  0.196, ...,  2.417,  0.235, -1.024]])

In [20]:
Xt

array([[ 0.721,  0.045,  1.113, ...,  0.643, -0.363,  0.048],
       [-0.751,  0.329,  0.387, ..., -0.04 , -0.039,  1.143],
       [ 2.933,  0.555, -0.73 , ..., -0.754, -1.22 , -2.189],
       ...,
       [-1.278,  0.265, -3.9  , ...,  0.853,  4.332,  0.331],
       [-1.555, -0.748, -0.485, ..., -0.087,  1.464,  1.892],
       [-1.614,  0.173, -1.048, ..., -1.295, -2.03 , -1.135]])

In [13]:
pd.DataFrame(Xg).mean()

0     0.023547
1     0.004922
2    -0.011737
3     0.021344
4     0.008978
5     0.006381
6    -0.022074
7     0.003173
8    -0.024061
9    -0.011829
10   -0.002408
11    0.001171
12    0.007928
13   -0.002583
14   -0.011554
dtype: float64

In [21]:
pd.DataFrame(Xt).to_csv('xt.csv',index=None)

In [22]:
pd.DataFrame(Xg).to_csv('xg.csv',index=None)