diff --git a/tests/TestPC.py b/tests/TestPC.py index 8871da8a..9450835a 100644 --- a/tests/TestPC.py +++ b/tests/TestPC.py @@ -9,6 +9,7 @@ from causallearn.graph.SHD import SHD from causallearn.utils.DAG2CPDAG import dag2cpdag from causallearn.utils.TXT2GeneralGraph import txt2generalgraph +from utils_simulate_data import simulate_discrete_data, simulate_linear_continuous_data @@ -173,19 +174,7 @@ def test_pc_simulate_linear_gaussian_with_fisher_z(self): # Then by Meek rule 3: 1 -> 3. ###### Simulation configuration: code to generate "./TestData/test_pc_simulated_linear_gaussian_data.txt" ###### - # np.random.seed(42) - # linear_weight_minabs, linear_weight_maxabs, linear_weight_netative_prob = 0.5, 0.9, 0.5 - # sample_size = 10000 - # adjacency_matrix = np.zeros((num_of_nodes, num_of_nodes)) - # adjacency_matrix[tuple(zip(*truth_DAG_directed_edges))] = 1 - # adjacency_matrix = adjacency_matrix.T - # weight_mask = np.random.uniform(linear_weight_minabs, linear_weight_maxabs, (num_of_nodes, num_of_nodes)) - # weight_mask[np.unravel_index(np.random.choice(np.arange(weight_mask.size), replace=False, - # size=int(weight_mask.size * linear_weight_netative_prob)), weight_mask.shape)] *= -1. - # adjacency_matrix = adjacency_matrix * weight_mask - # mixing_matrix = np.linalg.inv(np.eye(num_of_nodes) - adjacency_matrix) - # exogenous_noise = np.random.normal(0, 1, (num_of_nodes, sample_size)) - # data = (mixing_matrix @ exogenous_noise).T + # data = simulate_linear_continuous_data(num_of_nodes, 10000, truth_DAG_directed_edges, "gaussian", 42) ###### Simulation configuration: code to generate "./TestData/test_pc_simulated_linear_gaussian_data.txt" ###### data = np.loadtxt("./TestData/test_pc_simulated_linear_gaussian_data.txt", skiprows=1) @@ -202,6 +191,61 @@ def test_pc_simulate_linear_gaussian_with_fisher_z(self): # cg.draw_pydot_graph(labels=list(map(str, range(num_of_nodes)))) print('test_pc_simulate_linear_gaussian_with_fisher_z passed!\n') + # Simulate linear non-Gaussian data. Run PC with kci test with default parameters. + def test_pc_simulate_linear_nongaussian_with_kci(self): + print('Now start test_pc_simulate_linear_nongaussian_with_kci ...') + print('!! It will take around 17 mins to run this test (on M1 Max chip) ... !!') + print('!! You may also reduce the sample size (<2500), but the result will then not be totally correct ... !!') + + # Graph specification. + num_of_nodes = 5 + truth_DAG_directed_edges = {(0, 1), (0, 3), (1, 2), (1, 3), (2, 3), (2, 4), (3, 4)} + truth_CPDAG_directed_edges = {(0, 3), (1, 3), (2, 3), (2, 4), (3, 4)} + truth_CPDAG_undirected_edges = {(0, 1), (1, 2), (2, 1), (1, 0)} + # this simple graph is the same as in test_pc_simulate_linear_gaussian_with_fisher_z. + + data = simulate_linear_continuous_data(num_of_nodes, 2500, truth_DAG_directed_edges, "exponential", 42) + # there is no randomness in data generation (with seed fixed for simulate_data). + # however, there still exists randomness in KCI (null_sample_spectral). + # for this simple test, we can assume that KCI always returns the correct result (despite randomness). + + # Run PC with default parameters: stable=True, uc_rule=0 (uc_sepset), uc_priority=2 (prioritize existing colliders) + cg = pc(data, 0.05, kci) + returned_directed_edges = set(cg.find_fully_directed()) + returned_undirected_edges = set(cg.find_undirected()) + returned_bidirected_edges = set(cg.find_bi_directed()) + self.assertEqual(truth_CPDAG_directed_edges, returned_directed_edges, "Directed edges are not correct.") + self.assertEqual(truth_CPDAG_undirected_edges, returned_undirected_edges, "Undirected edges are not correct.") + self.assertEqual(0, len(returned_bidirected_edges), "There should be no bi-directed edges.") + print(f" pc(data, 0.05, kci)\treturns exactly the same CPDAG as the truth.") + # cg.draw_pydot_graph(labels=list(map(str, range(num_of_nodes)))) + print('test_pc_simulate_linear_nongaussian_with_kci passed!\n') + + # Simulate discrete data using forward sampling. Run PC with chisq test with default parameters. + def test_pc_simulate_discrete_with_chisq(self): + print('Now start test_pc_simulate_discrete_with_chisq ...') + + # Graph specification. + num_of_nodes = 5 + truth_DAG_directed_edges = {(0, 1), (0, 3), (1, 2), (1, 3), (2, 3), (2, 4), (3, 4)} + truth_CPDAG_directed_edges = {(0, 3), (1, 3), (2, 3), (2, 4), (3, 4)} + truth_CPDAG_undirected_edges = {(0, 1), (1, 2), (2, 1), (1, 0)} + # this simple graph is the same as in test_pc_simulate_linear_gaussian_with_fisher_z. + + data = simulate_discrete_data(num_of_nodes, 10000, truth_DAG_directed_edges, 42) + + # Run PC with default parameters: stable=True, uc_rule=0 (uc_sepset), uc_priority=2 (prioritize existing colliders) + cg = pc(data, 0.05, chisq) + returned_directed_edges = set(cg.find_fully_directed()) + returned_undirected_edges = set(cg.find_undirected()) + returned_bidirected_edges = set(cg.find_bi_directed()) + self.assertEqual(truth_CPDAG_directed_edges, returned_directed_edges, "Directed edges are not correct.") + self.assertEqual(truth_CPDAG_undirected_edges, returned_undirected_edges, "Undirected edges are not correct.") + self.assertEqual(0, len(returned_bidirected_edges), "There should be no bi-directed edges.") + print(f" pc(data, 0.05, chisq)\treturns exactly the same CPDAG as the truth.") + # cg.draw_pydot_graph(labels=list(map(str, range(num_of_nodes)))) + print('test_pc_simulate_discrete_with_chisq passed!\n') + # Load data from file "data_discrete_10.txt". Run PC with gsq or chisq test. def test_pc_load_discrete_10_with_gsq_chisq(self): print('Now start test_pc_load_discrete_10_with_gsq_chisq ...') @@ -258,7 +302,6 @@ def test_pc_load_linear_missing_10_with_mv_fisher_z(self): print('test_pc_load_linear_missing_10_with_mv_fisher_z passed!\n') - # Load data from data in bnlearn repository. Run PC with chisq. Test speed. def test_pc_load_bnlearn_discrete_datasets(self): print('Now start test_pc_load_bnlearn_discrete_datasets ...') diff --git a/tests/utils_simulate_data.py b/tests/utils_simulate_data.py new file mode 100644 index 00000000..18796d71 --- /dev/null +++ b/tests/utils_simulate_data.py @@ -0,0 +1,93 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +import numpy as np + +def simulate_discrete_data( + num_of_nodes, + sample_size, + truth_DAG_directed_edges, + random_seed=None): + from pgmpy.models.BayesianNetwork import BayesianNetwork + from pgmpy.factors.discrete import TabularCPD + from pgmpy.sampling import BayesianModelSampling + + def _simulate_cards(): + ''' + why we need this: to calculate cpd of a node with k parents, + the conditions to be enumerated is the production of these k parents' cardinalities + which will be exponentially slow w.r.t. k. + so we want that, if a node has many parents (large k), these parents' cardinalities should be small + + denote peers_num: peers_num[i, j] = k (where k>0), + means that there are k parents pointing to node i, and j is among these k parents. + max_peers = peers_num.max(axis=0): the larger max_peers[j], the smaller card[j] should be. + ''' + MAX_ENUMERATION_COMBINATION_NUM = 20 + in_degrees = adjacency_matrix.sum(axis=1) + peers_num = in_degrees[:, None] * adjacency_matrix + max_peers_num = peers_num.max(axis=0) + max_peers_num[max_peers_num == 0] = 1 # to avoid division by 0 (for leaf nodes) + cards = [np.random.randint(2, 1 + max(2, MAX_ENUMERATION_COMBINATION_NUM ** (1. / mpn))) + for mpn in max_peers_num] + return cards + + def _random_alpha(): + DIRICHLET_ALPHA_LOWER, DIRICHLET_ALPHA_UPPER = 1., 5. + return np.random.uniform(DIRICHLET_ALPHA_LOWER, DIRICHLET_ALPHA_UPPER) + + if random_seed is not None: + state = np.random.get_state() # save the current random state + np.random.seed(random_seed) # set the random state to 42 temporarily, just for the following lines + adjacency_matrix = np.zeros((num_of_nodes, num_of_nodes)) + adjacency_matrix[tuple(zip(*truth_DAG_directed_edges))] = 1 + adjacency_matrix = adjacency_matrix.T + + cards = _simulate_cards() + bn = BayesianNetwork(truth_DAG_directed_edges) # so isolating nodes will echo error + for node in range(num_of_nodes): + parents = np.where(adjacency_matrix[node])[0].tolist() + parents_card = [cards[prt] for prt in parents] + rand_ps = np.array([np.random.dirichlet(np.ones(cards[node]) * _random_alpha()) for _ in + range(int(np.prod(parents_card)))]).T.tolist() + + cpd = TabularCPD(node, cards[node], rand_ps, evidence=parents, evidence_card=parents_card) + bn.add_cpds(cpd) + inference = BayesianModelSampling(bn) + df = inference.forward_sample(size=sample_size, show_progress=False) + topo_order = list(map(int, df.columns)) + topo_index = [-1] * len(topo_order) + for ind, node in enumerate(topo_order): topo_index[node] = ind + data = df.to_numpy()[:, topo_index].astype(np.int64) + + if random_seed is not None: np.random.set_state(state) # restore the random state + return data + +def simulate_linear_continuous_data( + num_of_nodes, + sample_size, + truth_DAG_directed_edges, + noise_type='gaussian', # currently: 'gaussian' or 'exponential' + random_seed=None, + linear_weight_minabs=0.5, + linear_weight_maxabs=0.9, + linear_weight_netative_prob=0.5): + if random_seed is not None: + state = np.random.get_state() # save the current random state + np.random.seed(random_seed) # set the random state to 42 temporarily, just for the following lines + adjacency_matrix = np.zeros((num_of_nodes, num_of_nodes)) + adjacency_matrix[tuple(zip(*truth_DAG_directed_edges))] = 1 + adjacency_matrix = adjacency_matrix.T + weight_mask = np.random.uniform(linear_weight_minabs, linear_weight_maxabs, (num_of_nodes, num_of_nodes)) + weight_mask[np.unravel_index(np.random.choice(np.arange(weight_mask.size), replace=False, + size=int(weight_mask.size * linear_weight_netative_prob)), weight_mask.shape)] *= -1. + adjacency_matrix = adjacency_matrix * weight_mask + mixing_matrix = np.linalg.inv(np.eye(num_of_nodes) - adjacency_matrix) + if noise_type == 'gaussian': + exogenous_noise = np.random.normal(0, 1, (num_of_nodes, sample_size)) + elif noise_type == 'exponential': + exogenous_noise = np.random.exponential(1, (num_of_nodes, sample_size)) + else: + raise NotImplementedError + data = (mixing_matrix @ exogenous_noise).T # in shape (sample_size, num_of_nodes) + if random_seed is not None: np.random.set_state(state) # restore the random state + return data