In [None]:
import networkx as nx
import numpy as np
from sklearn.covariance import GraphicalLassoCV
from scipy.linalg import cho_factor, cho_solve
import warnings
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal

warnings.filterwarnings("ignore", category=RuntimeWarning)

def generate_weighted_adjacency_matrix():
    print("生成带权邻接矩阵 A...")
    # 构建随机图结构
    G = nx.grid_2d_graph(10, 10)
    A = np.zeros((100, 100))
    for (i, j) in G.edges():
        weight = np.random.uniform(2, 5)
        A[i,j] = weight
        A[j,i] = weight
    
    return A

In [None]:
# 生成底层精度矩阵 Theta
def generate_precision_matrix(A):
    delta = 1.05 * np.max(np.linalg.eigvals(A))
    Theta = delta * np.eye(100) - A
    return Theta

In [None]:
def get_Theta(A):
    def find_E(A):
        diag_tmp = np.diag(A)
        E = np.diag(1 / np.sqrt(diag_tmp))
        return E
    tmp = np.linalg.inv(A)
    E = find_E(tmp)
    E = np.linalg.inv(E)
    Theta = E @ A @ E
    return Theta

In [None]:
# 生成观测数据
def generate_observation_data(Theta, n):
    Theta_star = get_Theta(Theta)
    Theta_star_inv = np.linalg.inv(Theta_star)
    mean = np.zeros(100)
    samples = np.random.multivariate_normal(mean, Theta_star_inv, size=n)
    cov_matrix = np.cov(samples, rowvar=False)
    return samples, cov_matrix

In [None]:
# 定义权重更新函数 p_lambda (使用SCAD penalty)
def p_lambda(x):
    a = 3.7
    threshold = 0.8 * a
    return np.where(x <= threshold, a * x,  a/2 * (np.abs(x) - threshold))

In [None]:
# 定义目标函数
def objective_function(Theta, bSigma, lambda_func):
    n = Theta.shape[0]
    log_det = -np.log(np.linalg.det(Theta))
    trace_term = np.trace(Theta @ bSigma)
    penalty_term = np.sum(lambda_func * (Theta - np.diag(np.diag(Theta))))
    #print(log_det + trace_term - penalty_term)
    return log_det + trace_term - penalty_term

In [None]:
# 梯度计算
def compute_gradient(Theta, bSigma, lambda_func):
    gradient = -np.linalg.inv(Theta) + bSigma - lambda_func
    return gradient

In [None]:
def projection_matrix(X):
    projection = np.zeros_like(X)
    rows, cols = X.shape
    for i in range(rows):
        for j in range(cols):
            if i == j:
                projection[i,j] = X[i,j]
            else:
                projection[i,j] = min(0, X[i,j])
    return projection

In [None]:
def is_M_matrix(A):
    if A.shape[0] != A.shape[1]:
        return False
    for i in range(A.shape[0]):
        if A[i,i] > 0:
            return False
        for j in range(A.shape[1]):
            if i != j and A[i,j] < 0:
                return False
    return True

In [None]:
def perform_gradient_projection(Theta, bSigma, lambda_func, sigma, alpha, beta):
    # while True:
    for i in range(100):
        gradient = compute_gradient(Theta, bSigma, lambda_func)
        m = 0
        while True:
            tmp_matrix = Theta - sigma * (beta ** m) * gradient
            #print("Theta: ", Theta)
            Theta_new = projection_matrix(tmp_matrix)
            #print("Theta_new: ", Theta_new)
            # m += 1
            factor = 1 / sigma / (beta ** m) * (Theta - Theta_new)
            if np.all(np.linalg.eigvals(Theta_new) > 0) and \
                    objective_function(Theta_new, bSigma, lambda_func) <= \
                    objective_function(Theta, bSigma, lambda_func) - \
                    alpha * sigma * (beta ** m) * np.linalg.norm(factor, 'fro') ** 2:
                Theta = Theta_new
                break
            m += 1
        Theta = Theta_new
    return Theta

In [None]:
A = generate_weighted_adjacency_matrix()
Theta = generate_precision_matrix(A)
samples, bSigma = generate_observation_data(Theta, 100)
#print(bSigma)
print(Theta)

prev_Theta = np.identity(100)
sigma = 0.05
alpha = 0.003
beta = 0.5

for k in range(10):
    lambda_func = p_lambda(np.abs(Theta - np.diag(np.diag(Theta))))
    #print("lambda: ", lambda_func)
    Theta_new1 = perform_gradient_projection(Theta, bSigma, lambda_func, sigma, alpha, beta)
    #print(Theta_new1)
    Theta = Theta_new1

#print(Theta_new1)

Theta_tmp = generate_precision_matrix(A)
theta_star = get_Theta(Theta_tmp)
estimation_error = np.linalg.norm(Theta_new1 - theta_star, 'fro') / np.linalg.norm(theta_star, 'fro')
print(estimation_error)

In [None]:
def calculate_evaluation_metrics(Theta_hat, Theta_star, threshold=0.1):
    # 二值化估计的和真实的精度矩阵
    estimated_binary = (np.abs(Theta_hat) > threshold).astype(int)
    true_binary = (np.abs(Theta_star) > threshold).astype(int)

    # 计算真正例（TP）、假正例（FP）、真负例（TN）和假负例（FN）
    TP = np.sum(np.logical_and(estimated_binary == 1, true_binary == 1))
    FP = np.sum(np.logical_and(estimated_binary == 1, true_binary == 0))
    TN = np.sum(np.logical_and(estimated_binary == 0, true_binary == 0))
    FN = np.sum(np.logical_and(estimated_binary == 0, true_binary == 1))

    # 计算真正例率（TPR）、假正例率（FPR）和 F-score
    TPR = TP / (TP + FN)
    FPR = FP / (FP + TN)
    F_score = 2 * TP / (2 * TP + FP + FN)

    return TPR, FPR, F_score

In [None]:
def sltp_precision(samples, alpha = 0.5):
    cov_estimator = GraphicalLassoCV()
    cov_estimator.fit(samples)
    cov_matrix = cov_estimator.covariance_
    n_samples = samples.shape[0]
    L, lower = cho_factor(cov_matrix)
    sltp_matrix = cho_solve((L, lower), np.eye(n_samples)) / alpha
    return sltp_matrix

Theta_SLTP = sltp_precision(samples)
Theta_GLasso = GraphicalLassoCV().fit(samples).precision_
TPR, FPR, F_score = calculate_evaluation_metrics(Theta_SLTP, theta_star)
estimation_error = np.linalg.norm(Theta_SLTP - theta_star, 'fro') / np.linalg.norm(theta_star, 'fro')
print("SLTP Method")
print("error:", estimation_error)
print("TPR:", TPR)
print("FPR:", FPR)
print("F-score:", F_score)
print('\n')
TPR_g, FPR_g, F_score_g = calculate_evaluation_metrics(Theta_GLasso, theta_star)
estimation_error_g = np.linalg.norm(Theta_GLasso - theta_star, 'fro') / np.linalg.norm(theta_star, 'fro')
print("GLasso Method")
print("error:", estimation_error_g)
print("TPR:", TPR_g)
print("FPR:", FPR_g)
print("F-score:", F_score_g)
TPR_p, FPR_p, F_score_p = calculate_evaluation_metrics(Theta_new1, theta_star)
estimation_error_p = np.linalg.norm(Theta_new1 - theta_star, 'fro') / np.linalg.norm(theta_star, 'fro')
print("Proposed Method")
print("error:", estimation_error_p)
print("TPR:", TPR_p)
print("FPR:", FPR_p)
print("F-score:", F_score_p)

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# 创建一个10x10的grid网络
grid_graph = nx.grid_2d_graph(10, 10)

# 创建一个line网络
line_graph = nx.path_graph(100)

# 创建一个Barabasi-Albert模型网络
barabasi_albert_graph = nx.barabasi_albert_graph(100, 1)

# 创建一个Stochastic Block Model网络
n = 100
p = [[0.1, 0.02, 0.02, 0.02],  # 第一个块内连接概率
     [0.02, 0.1, 0.02, 0.02],  # 第二个块内连接概率
     [0.02, 0.02, 0.1, 0.02],  # 第三个块内连接概率
     [0.02, 0.02, 0.02, 0.1]]  # 第四个块内连接概率

block_sizes = [n // 4, n // 4, n // 4, n // 4]  # 四个块的节点数量

sbm_graph = nx.stochastic_block_model(block_sizes, p, seed=0)

# 创建一个2x2的图
plt.figure(figsize=(10, 10))

# 绘制Grid Network
plt.subplot(2, 2, 1)
pos = nx.spring_layout(grid_graph)
nx.draw(grid_graph, pos, with_labels=False, node_size=50, node_color='skyblue', edge_color='gray', linewidths=0.5)
plt.title("Grid Network")

# 绘制Line Network
plt.subplot(2, 2, 2)
pos = nx.spring_layout(line_graph)
nx.draw(line_graph, pos, with_labels=False, node_size=50, node_color='lightgreen', edge_color='gray', linewidths=0.5)
plt.title("Line Network")

# 绘制Barabasi-Albert Network
plt.subplot(2, 2, 3)
pos = nx.spring_layout(barabasi_albert_graph)
nx.draw(barabasi_albert_graph, pos, with_labels=False, node_size=50, node_color='lightcoral', edge_color='gray', linewidths=0.5)
plt.title("Barabasi-Albert Network")

# 绘制Stochastic Block Model Network
plt.subplot(2, 2, 4)
pos = nx.spring_layout(sbm_graph)
nx.draw(sbm_graph, pos, with_labels=False, node_size=50, node_color='lightblue', edge_color='gray', linewidths=0.5)
plt.title("Stochastic Block Model Network")

plt.tight_layout()
plt.show()

In [None]:
def generate_line_adjacency_matrix():
    print("生成带权邻接矩阵 A...")
    # 构建随机图结构
    G = nx.path_graph(100)
    A = np.zeros((100, 100))
    for (i, j) in G.edges():
        weight = np.random.uniform(2, 5)
        A[i,j] = weight
        A[j,i] = weight
    
    return A

A_line = generate_line_adjacency_matrix()
Theta_line = generate_precision_matrix(A_line)
samples_line, bSigma_line = generate_observation_data(Theta_line, 40)

prev_Theta = np.identity(100)
sigma = 0.05
alpha = 0.01
beta = 0.5

for k in range(10):
    lambda_func_line = p_lambda(np.abs(Theta_line - np.diag(np.diag(Theta_line))))
    #print("lambda: ", lambda_func)
    Theta_new1_line = perform_gradient_projection(Theta_line, bSigma_line, lambda_func_line, sigma, alpha, beta)
    #print(Theta_new1)
    Theta_line = Theta_new1_line

Theta_tmp_line = generate_precision_matrix(A_line)
theta_star_line = get_Theta(Theta_tmp_line)
estimation_error_line = np.linalg.norm(Theta_new1_line - theta_star_line, 'fro') / np.linalg.norm(theta_star_line, 'fro')
print(estimation_error_line)

In [None]:
def generate_barabasi_adjacency_matrix():
    print("生成带权邻接矩阵 A...")
    # 构建随机图结构
    G = nx.barabasi_albert_graph(100, 1)
    A = np.zeros((100, 100))
    for (i, j) in G.edges():
        weight = np.random.uniform(2, 5)
        A[i,j] = weight
        A[j,i] = weight
    
    return A

A_barabasi = generate_barabasi_adjacency_matrix()
Theta_barabasi = generate_precision_matrix(A_barabasi)
samples_barabasi, bSigma_barabasi = generate_observation_data(Theta_barabasi, 100)

prev_Theta = np.identity(100)
sigma = 0.05
alpha = 0.01
beta = 0.5

for k in range(10):
    lambda_func_barabasi = p_lambda(np.abs(Theta_barabasi - np.diag(np.diag(Theta_barabasi))))
    #print("lambda: ", lambda_func)
    Theta_new1_barabasi = perform_gradient_projection(Theta_barabasi, bSigma_barabasi, lambda_func_barabasi, sigma, alpha, beta)
    #print(Theta_new1)
    Theta_barabasi = Theta_new1_barabasi

Theta_tmp_barabasi = generate_precision_matrix(A_barabasi)
theta_star_barabasi = get_Theta(Theta_tmp_barabasi)
estimation_error_barabasi = np.linalg.norm(Theta_new1_barabasi - theta_star_barabasi, 'fro') / np.linalg.norm(theta_star_barabasi, 'fro')
print(estimation_error_barabasi)

In [None]:
def generate_stock_adjacency_matrix():
    print("生成带权邻接矩阵 A...")
    # 构建随机图结构
    G = sbm_graph
    A = np.zeros((100, 100))
    for (i, j) in G.edges():
        weight = np.random.uniform(2, 5)
        A[i,j] = weight
        A[j,i] = weight
    
    return A

A_stock = generate_stock_adjacency_matrix()
Theta_stock = generate_precision_matrix(A_stock)
samples_stock, bSigma_stock = generate_observation_data(Theta_stock, 100)

prev_Theta = np.identity(100)
sigma = 0.05
alpha = 0.01
beta = 0.5

for k in range(10):
    lambda_func_stock = p_lambda(np.abs(Theta_stock - np.diag(np.diag(Theta_stock))))
    Theta_new1_stock = perform_gradient_projection(Theta_stock, bSigma_stock, lambda_func_stock, sigma, alpha, beta)
    #print(Theta_new1)
    Theta_stock = Theta_new1_stock

Theta_tmp_stock = generate_precision_matrix(A_stock)
theta_star_stock = get_Theta(Theta_tmp_stock)
estimation_error_stock = np.linalg.norm(Theta_new1_stock - theta_star_stock, 'fro') / np.linalg.norm(theta_star_stock, 'fro')
print(estimation_error_stock)