###Preparation of Libraries

In [None]:
!pip install numpy==1.25.0

In [None]:
!pip install openai
!pip install lingam
!pip install factor_analyzer
!pip install igraph
!pip install pygam
!pip install causal-learn

In [None]:

import os
import numpy as np
import pandas as pd
import graphviz
import lingam
from sklearn.preprocessing import StandardScaler
from lingam.utils import print_causal_directions, print_dagc, make_dot, make_prior_knowledge
import hashlib
import matplotlib.pyplot as plt
import seaborn as sns
from causallearn.utils.GraphUtils import GraphUtils
import matplotlib.image as mpimg
import io
from scipy.stats import norm
from copy import deepcopy
from itertools import combinations
import numbers
from sklearn.utils import check_array, resample
from multiprocessing import Pool

from causallearn.search.ConstraintBased.PC import pc
from causallearn.utils.PCUtils.BackgroundKnowledge import BackgroundKnowledge
from causallearn.graph.GraphNode import GraphNode
from causallearn.search.ScoreBased.ExactSearch import bic_exact_search

print("NumPy",  "ver:", np.__version__)
print("Pandas", "ver:", pd.__version__)
print("Graphviz",   "ver:", graphviz.__version__)
print("LiNGAM", "ver:", lingam.__version__)

np.set_printoptions(precision=3, suppress=True)

np.random.seed(203)

NumPy ver: 1.25.0
Pandas ver: 1.5.3
Graphviz ver: 0.20.1
LiNGAM ver: 1.8.2


setting ground truth

In [None]:
GT_X_df = pd.read_csv('', header=None) #read the csv file of ground truth matrix
GT_X = GT_X_df.to_numpy()

functions for metrics calcuation

In [None]:
def create_0or1_causal_matrix(adjacency_matrix):
    num_nodes = adjacency_matrix.shape[0]
    causal_0or1_matrix = np.empty(adjacency_matrix.shape, dtype = object)

    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix[i, j] == 0:
                causal_0or1_matrix[i, j] = 0
            else:
                causal_0or1_matrix[i, j] = 1

    return causal_0or1_matrix

#SHD
def calc_SHD(True_matrix, adjacency_matrix):
    num_nodes = adjacency_matrix.shape[0]
    adjacency_matrix_0or1 = create_0or1_causal_matrix(adjacency_matrix)
    A = 0 # total number of edge additions
    for i in range(num_nodes):
        for j in range(num_nodes):
            if True_matrix[i, j]==0 and True_matrix[j, i]==0 and adjacency_matrix_0or1[i, j] == 1:
                A = A + 1
            else:
                continue

    D = 0 # total number of edge deletions
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==0 and adjacency_matrix_0or1[j, i]==0 and True_matrix[i, j] == 1:
                D = D + 1
            else:
                continue

    R = 0 # total number of edge reversals
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==0 and adjacency_matrix_0or1[j, i]==1 and True_matrix[i, j] == 1 and True_matrix[j, i] == 0:
                R = R + 1
            else:
                continue
    return A + D + R

#False Positive Rate
def calc_FPR(True_matrix, adjacency_matrix):
    num_nodes = adjacency_matrix.shape[0]
    adjacency_matrix_0or1 = create_0or1_causal_matrix(adjacency_matrix)
    TN = 0 # total number of true negative
    for i in range(num_nodes):
        for j in range(num_nodes):
            if True_matrix[i, j]==0 and adjacency_matrix_0or1[i, j] == 0:
                TN = TN + 1
            else:
                continue

    FP = 0 # total number of false positive
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==1 and True_matrix[i, j] == 0:
                FP = FP + 1
            else:
                continue

    return FP/(TN+FP)

#False Negative Rate
def calc_FNR(True_matrix, adjacency_matrix):
    num_nodes = adjacency_matrix.shape[0]
    adjacency_matrix_0or1 = create_0or1_causal_matrix(adjacency_matrix)
    TP = 0 # total number of true positive
    for i in range(num_nodes):
        for j in range(num_nodes):
            if True_matrix[i, j]==1 and adjacency_matrix_0or1[i, j] == 1:
                TP = TP + 1
            else:
                continue

    FN = 0 # total number of false negative
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==0 and True_matrix[i, j] == 1:
                FN = FN + 1
            else:
                continue

    return FN/(TP+FN)

#Precision
def calc_precision(True_matrix, adjacency_matrix):
    num_nodes = adjacency_matrix.shape[0]
    adjacency_matrix_0or1 = create_0or1_causal_matrix(adjacency_matrix)
    TP = 0 # total number of true positive
    for i in range(num_nodes):
        for j in range(num_nodes):
            if True_matrix[i, j]==1 and adjacency_matrix_0or1[i, j] == 1:
                TP = TP + 1
            else:
                continue

    FP = 0 # total number of false positive
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==1 and True_matrix[i, j] == 0:
                FP = FP + 1
            else:
                continue

    return TP/(TP+FP)

#F1 score
def calc_F1score(True_matrix, adjacency_matrix):
    num_nodes = adjacency_matrix.shape[0]
    adjacency_matrix_0or1 = create_0or1_causal_matrix(adjacency_matrix)
    TP = 0 # total number of true positive
    for i in range(num_nodes):
        for j in range(num_nodes):
            if True_matrix[i, j]==1 and adjacency_matrix_0or1[i, j] == 1:
                TP = TP + 1
            else:
                continue

    FP = 0 # total number of false positive
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==1 and True_matrix[i, j] == 0:
                FP = FP + 1
            else:
                continue

    FN = 0 # total number of false negative
    for i in range(num_nodes):
        for j in range(num_nodes):
            if adjacency_matrix_0or1[i, j]==0 and True_matrix[i, j] == 1:
                FN = FN + 1
            else:
                continue

    return 2*TP/(2*TP+FN+FP)