Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 57 additions & 14 deletions tests/TestPC.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from causallearn.graph.SHD import SHD
from causallearn.utils.DAG2CPDAG import dag2cpdag
from causallearn.utils.TXT2GeneralGraph import txt2generalgraph
from utils_simulate_data import simulate_discrete_data, simulate_linear_continuous_data



Expand Down Expand Up @@ -173,19 +174,7 @@ def test_pc_simulate_linear_gaussian_with_fisher_z(self):
# Then by Meek rule 3: 1 -> 3.

###### Simulation configuration: code to generate "./TestData/test_pc_simulated_linear_gaussian_data.txt" ######
# np.random.seed(42)
# linear_weight_minabs, linear_weight_maxabs, linear_weight_netative_prob = 0.5, 0.9, 0.5
# sample_size = 10000
# adjacency_matrix = np.zeros((num_of_nodes, num_of_nodes))
# adjacency_matrix[tuple(zip(*truth_DAG_directed_edges))] = 1
# adjacency_matrix = adjacency_matrix.T
# weight_mask = np.random.uniform(linear_weight_minabs, linear_weight_maxabs, (num_of_nodes, num_of_nodes))
# weight_mask[np.unravel_index(np.random.choice(np.arange(weight_mask.size), replace=False,
# size=int(weight_mask.size * linear_weight_netative_prob)), weight_mask.shape)] *= -1.
# adjacency_matrix = adjacency_matrix * weight_mask
# mixing_matrix = np.linalg.inv(np.eye(num_of_nodes) - adjacency_matrix)
# exogenous_noise = np.random.normal(0, 1, (num_of_nodes, sample_size))
# data = (mixing_matrix @ exogenous_noise).T
# data = simulate_linear_continuous_data(num_of_nodes, 10000, truth_DAG_directed_edges, "gaussian", 42)
###### Simulation configuration: code to generate "./TestData/test_pc_simulated_linear_gaussian_data.txt" ######

data = np.loadtxt("./TestData/test_pc_simulated_linear_gaussian_data.txt", skiprows=1)
Expand All @@ -202,6 +191,61 @@ def test_pc_simulate_linear_gaussian_with_fisher_z(self):
# cg.draw_pydot_graph(labels=list(map(str, range(num_of_nodes))))
print('test_pc_simulate_linear_gaussian_with_fisher_z passed!\n')

# Simulate linear non-Gaussian data. Run PC with kci test with default parameters.
def test_pc_simulate_linear_nongaussian_with_kci(self):
print('Now start test_pc_simulate_linear_nongaussian_with_kci ...')
print('!! It will take around 17 mins to run this test (on M1 Max chip) ... !!')
print('!! You may also reduce the sample size (<2500), but the result will then not be totally correct ... !!')

# Graph specification.
num_of_nodes = 5
truth_DAG_directed_edges = {(0, 1), (0, 3), (1, 2), (1, 3), (2, 3), (2, 4), (3, 4)}
truth_CPDAG_directed_edges = {(0, 3), (1, 3), (2, 3), (2, 4), (3, 4)}
truth_CPDAG_undirected_edges = {(0, 1), (1, 2), (2, 1), (1, 0)}
# this simple graph is the same as in test_pc_simulate_linear_gaussian_with_fisher_z.

data = simulate_linear_continuous_data(num_of_nodes, 2500, truth_DAG_directed_edges, "exponential", 42)
# there is no randomness in data generation (with seed fixed for simulate_data).
# however, there still exists randomness in KCI (null_sample_spectral).
# for this simple test, we can assume that KCI always returns the correct result (despite randomness).

# Run PC with default parameters: stable=True, uc_rule=0 (uc_sepset), uc_priority=2 (prioritize existing colliders)
cg = pc(data, 0.05, kci)
returned_directed_edges = set(cg.find_fully_directed())
returned_undirected_edges = set(cg.find_undirected())
returned_bidirected_edges = set(cg.find_bi_directed())
self.assertEqual(truth_CPDAG_directed_edges, returned_directed_edges, "Directed edges are not correct.")
self.assertEqual(truth_CPDAG_undirected_edges, returned_undirected_edges, "Undirected edges are not correct.")
self.assertEqual(0, len(returned_bidirected_edges), "There should be no bi-directed edges.")
print(f" pc(data, 0.05, kci)\treturns exactly the same CPDAG as the truth.")
# cg.draw_pydot_graph(labels=list(map(str, range(num_of_nodes))))
print('test_pc_simulate_linear_nongaussian_with_kci passed!\n')

# Simulate discrete data using forward sampling. Run PC with chisq test with default parameters.
def test_pc_simulate_discrete_with_chisq(self):
print('Now start test_pc_simulate_discrete_with_chisq ...')

# Graph specification.
num_of_nodes = 5
truth_DAG_directed_edges = {(0, 1), (0, 3), (1, 2), (1, 3), (2, 3), (2, 4), (3, 4)}
truth_CPDAG_directed_edges = {(0, 3), (1, 3), (2, 3), (2, 4), (3, 4)}
truth_CPDAG_undirected_edges = {(0, 1), (1, 2), (2, 1), (1, 0)}
# this simple graph is the same as in test_pc_simulate_linear_gaussian_with_fisher_z.

data = simulate_discrete_data(num_of_nodes, 10000, truth_DAG_directed_edges, 42)

# Run PC with default parameters: stable=True, uc_rule=0 (uc_sepset), uc_priority=2 (prioritize existing colliders)
cg = pc(data, 0.05, chisq)
returned_directed_edges = set(cg.find_fully_directed())
returned_undirected_edges = set(cg.find_undirected())
returned_bidirected_edges = set(cg.find_bi_directed())
self.assertEqual(truth_CPDAG_directed_edges, returned_directed_edges, "Directed edges are not correct.")
self.assertEqual(truth_CPDAG_undirected_edges, returned_undirected_edges, "Undirected edges are not correct.")
self.assertEqual(0, len(returned_bidirected_edges), "There should be no bi-directed edges.")
print(f" pc(data, 0.05, chisq)\treturns exactly the same CPDAG as the truth.")
# cg.draw_pydot_graph(labels=list(map(str, range(num_of_nodes))))
print('test_pc_simulate_discrete_with_chisq passed!\n')

# Load data from file "data_discrete_10.txt". Run PC with gsq or chisq test.
def test_pc_load_discrete_10_with_gsq_chisq(self):
print('Now start test_pc_load_discrete_10_with_gsq_chisq ...')
Expand Down Expand Up @@ -258,7 +302,6 @@ def test_pc_load_linear_missing_10_with_mv_fisher_z(self):

print('test_pc_load_linear_missing_10_with_mv_fisher_z passed!\n')


# Load data from data in bnlearn repository. Run PC with chisq. Test speed.
def test_pc_load_bnlearn_discrete_datasets(self):
print('Now start test_pc_load_bnlearn_discrete_datasets ...')
Expand Down
93 changes: 93 additions & 0 deletions tests/utils_simulate_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import numpy as np

def simulate_discrete_data(
num_of_nodes,
sample_size,
truth_DAG_directed_edges,
random_seed=None):
from pgmpy.models.BayesianNetwork import BayesianNetwork
from pgmpy.factors.discrete import TabularCPD
from pgmpy.sampling import BayesianModelSampling

def _simulate_cards():
'''
why we need this: to calculate cpd of a node with k parents,
the conditions to be enumerated is the production of these k parents' cardinalities
which will be exponentially slow w.r.t. k.
so we want that, if a node has many parents (large k), these parents' cardinalities should be small

denote peers_num: peers_num[i, j] = k (where k>0),
means that there are k parents pointing to node i, and j is among these k parents.
max_peers = peers_num.max(axis=0): the larger max_peers[j], the smaller card[j] should be.
'''
MAX_ENUMERATION_COMBINATION_NUM = 20
in_degrees = adjacency_matrix.sum(axis=1)
peers_num = in_degrees[:, None] * adjacency_matrix
max_peers_num = peers_num.max(axis=0)
max_peers_num[max_peers_num == 0] = 1 # to avoid division by 0 (for leaf nodes)
cards = [np.random.randint(2, 1 + max(2, MAX_ENUMERATION_COMBINATION_NUM ** (1. / mpn)))
for mpn in max_peers_num]
return cards

def _random_alpha():
DIRICHLET_ALPHA_LOWER, DIRICHLET_ALPHA_UPPER = 1., 5.
return np.random.uniform(DIRICHLET_ALPHA_LOWER, DIRICHLET_ALPHA_UPPER)

if random_seed is not None:
state = np.random.get_state() # save the current random state
np.random.seed(random_seed) # set the random state to 42 temporarily, just for the following lines
adjacency_matrix = np.zeros((num_of_nodes, num_of_nodes))
adjacency_matrix[tuple(zip(*truth_DAG_directed_edges))] = 1
adjacency_matrix = adjacency_matrix.T

cards = _simulate_cards()
bn = BayesianNetwork(truth_DAG_directed_edges) # so isolating nodes will echo error
for node in range(num_of_nodes):
parents = np.where(adjacency_matrix[node])[0].tolist()
parents_card = [cards[prt] for prt in parents]
rand_ps = np.array([np.random.dirichlet(np.ones(cards[node]) * _random_alpha()) for _ in
range(int(np.prod(parents_card)))]).T.tolist()

cpd = TabularCPD(node, cards[node], rand_ps, evidence=parents, evidence_card=parents_card)
bn.add_cpds(cpd)
inference = BayesianModelSampling(bn)
df = inference.forward_sample(size=sample_size, show_progress=False)
topo_order = list(map(int, df.columns))
topo_index = [-1] * len(topo_order)
for ind, node in enumerate(topo_order): topo_index[node] = ind
data = df.to_numpy()[:, topo_index].astype(np.int64)

if random_seed is not None: np.random.set_state(state) # restore the random state
return data

def simulate_linear_continuous_data(
num_of_nodes,
sample_size,
truth_DAG_directed_edges,
noise_type='gaussian', # currently: 'gaussian' or 'exponential'
random_seed=None,
linear_weight_minabs=0.5,
linear_weight_maxabs=0.9,
linear_weight_netative_prob=0.5):
if random_seed is not None:
state = np.random.get_state() # save the current random state
np.random.seed(random_seed) # set the random state to 42 temporarily, just for the following lines
adjacency_matrix = np.zeros((num_of_nodes, num_of_nodes))
adjacency_matrix[tuple(zip(*truth_DAG_directed_edges))] = 1
adjacency_matrix = adjacency_matrix.T
weight_mask = np.random.uniform(linear_weight_minabs, linear_weight_maxabs, (num_of_nodes, num_of_nodes))
weight_mask[np.unravel_index(np.random.choice(np.arange(weight_mask.size), replace=False,
size=int(weight_mask.size * linear_weight_netative_prob)), weight_mask.shape)] *= -1.
adjacency_matrix = adjacency_matrix * weight_mask
mixing_matrix = np.linalg.inv(np.eye(num_of_nodes) - adjacency_matrix)
if noise_type == 'gaussian':
exogenous_noise = np.random.normal(0, 1, (num_of_nodes, sample_size))
elif noise_type == 'exponential':
exogenous_noise = np.random.exponential(1, (num_of_nodes, sample_size))
else:
raise NotImplementedError
data = (mixing_matrix @ exogenous_noise).T # in shape (sample_size, num_of_nodes)
if random_seed is not None: np.random.set_state(state) # restore the random state
return data