In [24]:
import numpy as np
#from scipy.linalg import lu, solve_triangular, det, inv
from numpy.linalg import LinAlgError, inv
import SCM_data
import MEC
import os
print(os.getcwd())

# Soft-thresholding operator for matrices
def soft_threshold_matrix(A, threshold):
    return np.sign(A) * np.maximum(np.abs(A) - threshold, 0.0)

# Compute the gradient of f(A) = -2 log det A + trace(A^T R_hat A)
def compute_gradient(A, R_hat):
    epsilon = 1e-6
    try:
        A_inv = inv(A)
    except LinAlgError:
        print("Warning: singular matrix encountered. Adding epsilon * I.")
        A_inv = inv(A + epsilon * np.eye(A.shape[0]))
    return 2 * R_hat @ A - 2 * A_inv

# Objective function value
def objective(A, R_hat, lam):
    sign, logdet = np.linalg.slogdet(A)
    if sign <= 0:
        return np.inf
    trace_term = np.trace(A.T @ R_hat @ A)
    return -2 * logdet + trace_term + lam * np.sum(np.abs(A))

# Proximal gradient algorithm for matrix A
def nodag(R_hat, lam = 0.1, alpha=0.5, max_iter=100, tol=1e-5,init = None, verbose=False):
    p = R_hat.shape[0]
    if init is None:
        A = np.eye(p)  # Initialization
    else:
        A = init
    step = 1.0

    for k in range(max_iter):
        A_old = A.copy()
        grad = compute_gradient(A, R_hat)

        # Line search loop
        for _ in range(100):
            A_temp = soft_threshold_matrix(A - step * grad, step * lam)
            try:
                f_temp = objective(A_temp, R_hat, lam)
                f_curr = objective(A, R_hat, lam)
                g_temp = lam * np.sum(np.abs(A_temp))
                g_curr = lam * np.sum(np.abs(A))

                # Beck & Teboulle condition
                diff = A_temp - A
                v = np.sum((grad * diff)) + (np.linalg.norm(diff, 'fro') ** 2) / (2 * step)
                if f_temp <= f_curr + v and f_temp + g_temp <= f_curr + g_curr:
                    break
                else:
                    step *= alpha  # reduce step
            except np.linalg.LinAlgError:
                step *= alpha  # if det or inv fails, shrink step

        A = A_temp
        if verbose:
            if k % 1000 == 0:
                likelihood = -2 * np.log(np.linalg.det(A)) + np.trace(A.T @ R_hat @ A)
                print(f"Iteration {k}: likelihood = {likelihood}")

        # Convergence check
        if np.linalg.norm(A - A_old, ord='fro') < tol:
            if verbose == True:
                print(f"Iteration {k}: break")
            break
        
    likelihood = -2 * np.log(np.linalg.det(A)) + np.trace(A.T @ R_hat @ A)
    sparsity = lam * np.sum(np.abs(A))

    return A, likelihood, sparsity


c:\Users\super\DAG


In [25]:
def weight_to_adjacency(W, threshold=0.05):
    """
    Convert a weight matrix to an adjacency matrix.
    
    Parameters:
        W (np.ndarray): Weight matrix (square matrix).
        threshold (float): Values with absolute weight <= threshold are treated as 0.
    
    Returns:
        np.ndarray: Binary adjacency matrix of the same shape.
    """
    if not isinstance(W, np.ndarray):
        raise TypeError("Input W must be a numpy array.")
    if W.shape[0] != W.shape[1]:
        raise ValueError("Input W must be a square matrix.")
    
    G = (np.abs(W) > threshold).astype(int)
    return G

In [43]:
from scipy.linalg import sqrtm
# test
n = 10000
W_true = np.array([[0, 1, 0], [0, 0, 0], [0, 1, 0]])
Omega_true = np.eye(3)
Theta_true = (np.eye(3) - W_true) @ inv(Omega_true) @ (np.eye(3) - W_true.T)
print("Theta_true\n", Theta_true)
Sigma_true = inv(Theta_true)
print("Sigma_true\n",Sigma_true)

X, Y, Z, G_true, CPDAG = SCM_data.generate_scm_data(3, n_samples=n)

data = np.array([X, Y, Z]).T

#likelihood_true = np.linalg.slogdet(Theta_true)[1]-np.trace(data @ Theta_true @ data.T)/n

A_true = (np.eye(3) - W_true) @ inv(sqrtm(Omega_true))
likelihood_true_2 = - 2 * np.log(np.linalg.det(A_true)) + np.trace(A_true.T @ Sigma_true @ A_true)

init = np.array([[0, 0, 0], [0, 0, 0], [0, 0, 0]])

R_hat = np.cov(np.array([X, Y, Z]))
A_est, likelihood_est, sparsity_est = nodag(R_hat, max_iter=10000,init = init, verbose=True)
W_est = np.eye(3) - A_est
G_est = weight_to_adjacency(W_est, 0.1)
print(MEC.is_in_markov_equiv_class(G_true, G_est))
print("G_true = \n",G_true)
print("G_est = \n",G_est)
print("A_est = \n",A_est)
# print("likelihood_true = ",likelihood_true)
print("likelihood_true_2 = ",likelihood_true_2)
print("likelihood_est = ",likelihood_est)
print("sparsity_est = ",sparsity_est)
print("diff = ", likelihood_est - likelihood_true_2 )

Theta_true
 [[ 2. -1.  1.]
 [-1.  1. -1.]
 [ 1. -1.  2.]]
Sigma_true
 [[1. 1. 0.]
 [1. 3. 1.]
 [0. 1. 1.]]
Iteration 0: likelihood = 20126660195616.137
Iteration 259: break
False
G_true = 
 [[0 1 0]
 [0 0 0]
 [0 1 0]]
G_est = 
 [[1 1 1]
 [1 1 1]
 [1 1 1]]
A_est = 
 [[ 1.22656673 -0.32772511  0.22399825]
 [-0.33520821  0.75186236 -0.33644307]
 [ 0.22424808 -0.32918123  1.21872634]]
likelihood_true_2 =  3.0
likelihood_est =  3.0392714692596976
sparsity_est =  0.49739593917754243
diff =  0.0392714692596976


In [36]:
def log_likelihood(Theta, X):
    n = X.shape[0]
    return np.linalg.slogdet(Theta)[1] - np.trace(X @ Theta @ X.T) / n

def train_multiple_nodag_trials(R_hat, X, Theta_true, n_trials=10, verbose = False):
    d = R_hat.shape[0]
    best_result = None
    best_diff = float('inf')
    results = []

   
    ll_true = log_likelihood(Theta_true, X)

    for i in range(n_trials):
        A_init = np.random.normal(scale=1, size=(d, d))
        np.fill_diagonal(A_init, 1.0)  

        A_est, likelihood_est, sparsity_est = nodag(R_hat, max_iter = 10000, init = A_init)

        diff = abs(likelihood_est - ll_true)
        results.append((i, diff, likelihood_est, A_init, A_est))    

        if verbose == True:
            print(f"[Trial {i}] LogLik = {likelihood_est:.4f}, Diff = {diff:.6f}")

        if diff < best_diff:
            best_result = (i, diff, likelihood_est, A_init, A_est)
            best_diff = diff

    best_i, best_diff, best_ll, best_init, best_A = best_result
    print("\n✅ Best Trial:")
    print(f"  Trial Index      = {best_i}")
    print(f"  Log-Likelihood   = {best_ll:.4f}")
    print(f"  LL Diff to Truth = {best_diff:.6f}")
    print(f" Best init matrix:\n", best_init)

    return best_A, best_ll, best_diff, results

In [37]:
n = 10000
W_true = np.array([[0, 1, 0], [0, 0, 0], [0, 1, 0]])
Omega_true = np.eye(3)
Theta_true = (np.eye(3) - W_true) @ inv(Omega_true) @ (np.eye(3) - W_true.T)
print("Theta_true\n", Theta_true)
Sigma_true = inv(Theta_true)
print("Sigma_true\n",Sigma_true)

X, Y, Z, G_true, CPDAG = SCM_data.generate_scm_data(3, n_samples=n)

data = np.array([X, Y, Z]).T
likelihood_true = np.linalg.slogdet(Theta_true)[1]-np.trace(data @ Theta_true @ data.T)/n
print("likelihood_true = ",likelihood_true)
A_true = (np.eye(3) - W_true) @ inv(sqrtm(Omega_true))
R_hat = np.cov(np.array([X, Y, Z]))
A_est, best_likelihood, best_diff, results = train_multiple_nodag_trials(R_hat, data, Theta_true, 100)
W_est = np.eye(3) - A_est
G_est = weight_to_adjacency(W_est, 0.1)
print("G_true = \n",G_true)
print("G_est = \n",G_est)
print("A_est = \n",A_est)

Theta_true
 [[ 2. -1.  1.]
 [-1.  1. -1.]
 [ 1. -1.  2.]]
Sigma_true
 [[1. 1. 0.]
 [1. 3. 1.]
 [0. 1. 1.]]


likelihood_true =  -3.008174905798895

✅ Best Trial:
  Trial Index      = 6
  Log-Likelihood   = 3.0731
  LL Diff to Truth = 6.081314
 Best init matrix:
 [[ 1.          0.17640832  0.56648302]
 [ 0.5118715   1.         -0.36525408]
 [ 0.56588891 -0.57684138  1.        ]]
G_true = 
 [[0 1 0]
 [0 0 0]
 [0 1 0]]
G_est = 
 [[1 1 1]
 [1 1 1]
 [1 1 1]]
A_est = 
 [[ 1.12094228 -0.3003065   0.233333  ]
 [-0.28321754  0.74835811 -0.33778562]
 [ 0.1743131  -0.33798551  1.22281997]]


In [46]:
A = np.array([
    [ 1.23109931, -0.50780174,  0.45404282],
    [-0.28575097,  0.83222012, -0.46476196],
    [ 0.14120805, -0.38342536,  1.34691221]
])
print(Sigma_true)
print( - 2 * np.log(np.linalg.det(A)) + np.trace(A.T @ Sigma_true @ A))

[[1. 1. 0.]
 [1. 3. 1.]
 [0. 1. 1.]]
3.0000957007924645
