diff --git a/causallearn/graph/GraphClass.py b/causallearn/graph/GraphClass.py index f65211e5..af5bf32b 100644 --- a/causallearn/graph/GraphClass.py +++ b/causallearn/graph/GraphClass.py @@ -18,6 +18,7 @@ from causallearn.graph.Node import Node from causallearn.utils.GraphUtils import GraphUtils from causallearn.utils.PCUtils.Helper import list_union, powerset +from causallearn.utils.cit import CIT class CausalGraph: @@ -34,10 +35,7 @@ def __init__(self, no_of_var: int, node_names: List[str] | None = None): for i in range(no_of_var): for j in range(i + 1, no_of_var): self.G.add_edge(Edge(nodes[i], nodes[j], Endpoint.TAIL, Endpoint.TAIL)) - - self.data = None # store the data - self.test = None # store the name of the conditional independence test - self.corr_mat = None # store the correlation matrix of the data + self.test: CIT | None = None self.sepset = np.empty((no_of_var, no_of_var), object) # store the collection of sepsets self.definite_UC = [] # store the list of definite unshielded colliders self.definite_non_UC = [] # store the list of definite unshielded non-colliders @@ -47,35 +45,17 @@ def __init__(self, no_of_var: int, node_names: List[str] | None = None): self.nx_skel = nx.Graph() # store the undirected graph self.labels = {} self.prt_m = {} # store the parents of missingness indicators - self.mvpc = False - self.cardinalities = None # only works when self.data is discrete, i.e. self.test is chisq or gsq - self.is_discrete = False - self.citest_cache = dict() - self.data_hash_key = None - self.ci_test_hash_key = None - - def set_ind_test(self, indep_test, mvpc=False): + + + def set_ind_test(self, indep_test): """Set the conditional independence test that will be used""" - # assert name_of_test in ["Fisher_Z", "Chi_sq", "G_sq"] - self.mvpc = mvpc self.test = indep_test - self.ci_test_hash_key = hash(indep_test) def ci_test(self, i: int, j: int, S) -> float: """Define the conditional independence test""" # assert i != j and not i in S and not j in S - if self.mvpc: - return self.test(self.data, self.nx_skel, self.prt_m, i, j, S) - - i, j = (i, j) if (i < j) else (j, i) - ijS_key = (i, j, frozenset(S), self.data_hash_key, self.ci_test_hash_key) - if ijS_key in self.citest_cache: - return self.citest_cache[ijS_key] - # if discrete, assert self.test is chisq or gsq, pass into cardinalities - pValue = self.test(self.data, i, j, S, self.cardinalities) if self.is_discrete \ - else self.test(self.data, i, j, S) - self.citest_cache[ijS_key] = pValue - return pValue + if self.test.method == 'mc_fisherz': return self.test(i, j, S, self.nx_skel, self.prt_m) + return self.test(i, j, S) def neighbors(self, i: int): """Find the neighbors of node i in adjmat""" diff --git a/causallearn/search/ConstraintBased/CDNOD.py b/causallearn/search/ConstraintBased/CDNOD.py index bede8eba..b6f93444 100644 --- a/causallearn/search/ConstraintBased/CDNOD.py +++ b/causallearn/search/ConstraintBased/CDNOD.py @@ -10,11 +10,12 @@ from causallearn.utils.PCUtils.BackgroundKnowledge import BackgroundKnowledge from causallearn.utils.PCUtils.BackgroundKnowledgeOrientUtils import orient_by_background_knowledge from causallearn.utils.cit import * +from causallearn.search.ConstraintBased.PC import get_parent_missingness_pairs, skeleton_correction -def cdnod(data: ndarray, c_indx: ndarray, alpha: float = 0.05, indep_test=fisherz, stable: bool = True, - uc_rule: int = 0, uc_priority: int = 2, mvcdnod: bool = False, correction_name: str = 'MV_Crtn_Fisher_Z', - background_knowledge: Optional[BackgroundKnowledge] = None, 
verbose: bool = False, +def cdnod(data: ndarray, c_indx: ndarray, alpha: float=0.05, indep_test: str=fisherz, stable: bool=True, + uc_rule: int=0, uc_priority: int=2, mvcdnod: bool=False, correction_name: str='MV_Crtn_Fisher_Z', + background_knowledge: Optional[BackgroundKnowledge]=None, verbose: bool=False, show_progress: bool = True) -> CausalGraph: """ Causal discovery from nonstationary/heterogeneous data @@ -43,7 +44,7 @@ def cdnod(data: ndarray, c_indx: ndarray, alpha: float = 0.05, indep_test=fisher show_progress=show_progress) -def cdnod_alg(data: ndarray, alpha: float, indep_test, stable: bool, uc_rule: int, uc_priority: int, +def cdnod_alg(data: ndarray, alpha: float, indep_test: str, stable: bool, uc_rule: int, uc_priority: int, background_knowledge: Optional[BackgroundKnowledge] = None, verbose: bool = False, show_progress: bool = True) -> CausalGraph: """ @@ -84,6 +85,7 @@ def cdnod_alg(data: ndarray, alpha: float, indep_test, stable: bool, uc_rule: in """ start = time.time() + indep_test = CIT(data, indep_test) cg_1 = SkeletonDiscovery.skeleton_discovery(data, alpha, indep_test, stable) # orient the direction from c_indx to X, if there is an edge between c_indx and X @@ -124,7 +126,7 @@ def cdnod_alg(data: ndarray, alpha: float, indep_test, stable: bool, uc_rule: in return cg -def mvcdnod_alg(data: ndarray, alpha: float, indep_test, correction_name: str, stable: bool, uc_rule: int, +def mvcdnod_alg(data: ndarray, alpha: float, indep_test: str, correction_name: str, stable: bool, uc_rule: int, uc_priority: int, verbose: bool, show_progress: bool) -> CausalGraph: """ :param data: data set (numpy ndarray) @@ -154,9 +156,9 @@ def mvcdnod_alg(data: ndarray, alpha: float, indep_test, correction_name: str, s """ start = time.time() - + indep_test = CIT(data, indep_test) ## Step 1: detect the direct causes of missingness indicators - prt_m = get_prt_mpairs(data, alpha, indep_test, stable) + prt_m = get_parent_missingness_pairs(data, alpha, indep_test, stable) # print('Finish detecting the parents of missingness indicators. 
') ## Step 2: @@ -204,257 +206,3 @@ def mvcdnod_alg(data: ndarray, alpha: float, indep_test, correction_name: str, s cg.PC_elapsed = end - start return cg - - -####################################################################################################################### -## *********** Functions for Step 1 *********** -def get_prt_mpairs(data: ndarray, alpha: float, indep_test, stable: bool = True) -> Dict[str, list]: - """ - Detect the parents of missingness indicators - If a missingness indicator has no parent, it will not be included in the result - :param data: data set (numpy ndarray) - :param alpha: desired significance level in (0, 1) (float) - :param indep_test: name of the test-wise deletion independence test being used - - "MV_Fisher_Z": Fisher's Z conditional independence test - - "MV_G_sq": G-squared conditional independence test (TODO: under development) - :param stable: run stabilized skeleton discovery if True (default = True) - :return: - cg: a CausalGraph object - """ - prt_m = {'prt': [], 'm': []} - - ## Get the index of missingness indicators - m_indx = get_mindx(data) - - ## Get the index of parents of missingness indicators - # If the missingness indicator has no parent, then it will not be collected in prt_m - for r in m_indx: - prt_r = detect_parent(r, data, alpha, indep_test, stable) - if isempty(prt_r): - pass - else: - prt_m['prt'].append(prt_r) - prt_m['m'].append(r) - return prt_m - - -def isempty(prt_r: ndarray) -> bool: - """Test whether the parent of a missingness indicator is empty""" - return len(prt_r) == 0 - - -def get_mindx(data: ndarray) -> List[int]: - """Detect the parents of missingness indicators - :param data: data set (numpy ndarray) - :return: - m_indx: list, the index of missingness indicators - """ - - m_indx = [] - _, ncol = np.shape(data) - for i in range(ncol): - if np.isnan(data[:, i]).any(): - m_indx.append(i) - return m_indx - - -def detect_parent(r: int, data_: ndarray, alpha: float, indep_test, stable: bool = True) -> ndarray: - """Detect the parents of a missingness indicator - :param r: the missingness indicator - :param data_: data set (numpy ndarray) - :param alpha: desired significance level in (0, 1) (float) - :param indep_test: name of the test-wise deletion independence test being used - - "MV_Fisher_Z": Fisher's Z conditional independence test - - "MV_G_sq": G-squared conditional independence test (TODO: under development) - :param stable: run stabilized skeleton discovery if True (default = True) - : return: - prt: parent of the missingness indicator, r - """ - ## TODO: in the test-wise deletion CI test, if test between a binary and a continuous variable, - # there can be the case where the binary variable only take one value after deletion. - # It is because the assumption is violated. - - ## *********** Adaptation 0 *********** - # For avoid changing the original data - data = data_.copy() - ## *********** End *********** - - assert type(data) == np.ndarray - assert 0 < alpha < 1 - - ## *********** Adaptation 1 *********** - # data - ## Replace the variable r with its missingness indicator - ## If r is not a missingness indicator, return []. 
- data[:, r] = np.isnan(data[:, r]).astype(float) # True is missing; false is not missing - if sum(data[:, r]) == 0 or sum(data[:, r]) == len(data[:, r]): - return np.empty(0) - ## *********** End *********** - - no_of_var = data.shape[1] - cg = CausalGraph(no_of_var) - cg.data = data - cg.set_ind_test(indep_test) - cg.corr_mat = np.corrcoef(data, rowvar=False) if indep_test == fisherz else [] - - node_ids = range(no_of_var) - pair_of_variables = list(permutations(node_ids, 2)) - - depth = -1 - while cg.max_degree() - 1 > depth: - depth += 1 - edge_removal = [] - for (x, y) in pair_of_variables: - - ## *********** Adaptation 2 *********** - # the skeleton search - ## Only test which variable is the neighbor of r - if x != r: - continue - ## *********** End *********** - - Neigh_x = cg.neighbors(x) - if y not in Neigh_x: - continue - else: - Neigh_x = np.delete(Neigh_x, np.where(Neigh_x == y)) - - if len(Neigh_x) >= depth: - for S in combinations(Neigh_x, depth): - p = cg.ci_test(x, y, S) - if p > alpha: - if not stable: # Unstable: Remove x---y right away - edge1 = cg.G.get_edge(cg.G.nodes[x], cg.G.nodes[y]) - if edge1 is not None: - cg.G.remove_edge(edge1) - edge2 = cg.G.get_edge(cg.G.nodes[y], cg.G.nodes[x]) - if edge2 is not None: - cg.G.remove_edge(edge2) - else: # Stable: x---y will be removed only - edge_removal.append((x, y)) # after all conditioning sets at - edge_removal.append((y, x)) # depth l have been considered - Helper.append_value(cg.sepset, x, y, S) - Helper.append_value(cg.sepset, y, x, S) - break - - for (x, y) in list(set(edge_removal)): - edge1 = cg.G.get_edge(cg.G.nodes[x], cg.G.nodes[y]) - if edge1 is not None: - cg.G.remove_edge(edge1) - - ## *********** Adaptation 3 *********** - ## extract the parent of r from the graph - cg.to_nx_skeleton() - cg_skel_adj: ndarray = nx.to_numpy_array(cg.nx_skel).astype(int) - prt = get_parent(r, cg_skel_adj) - ## *********** End *********** - - return prt - - -def get_parent(r: int, cg_skel_adj: ndarray) -> ndarray: - """Get the neighbors of missingness indicators which are the parents - :param r: the missingness indicator index - :param cg_skel_adj: adjacancy matrix of a causal skeleton - :return: - prt: list, parents of the missingness indicator r - """ - num_var = len(cg_skel_adj[0, :]) - indx = np.array([i for i in range(num_var)]) - prt = indx[cg_skel_adj[r, :] == 1] - return prt - - -## *********** END *********** -####################################################################################################################### - -def skeleton_correction(data: ndarray, alpha: float, test_with_correction_name: str, - init_cg: CausalGraph, prt_m: dict, stable: bool = True) -> CausalGraph: - """Perform skeleton discovery - :param data: data set (numpy ndarray) - :param alpha: desired significance level in (0, 1) (float) - :param test_with_correction_name: name of the independence test being used - - "MV_Crtn_Fisher_Z": Fisher's Z conditional independence test - - "MV_Crtn_G_sq": G-squared conditional independence test - :param stable: run stabilized skeleton discovery if True (default = True) - :return: - cg: a CausalGraph object - """ - - assert type(data) == np.ndarray - assert 0 < alpha < 1 - assert test_with_correction_name in ["MV_Crtn_Fisher_Z", "MV_Crtn_G_sq"] - - ## *********** Adaption 1 *********** - no_of_var = data.shape[1] - - ## Initialize the graph with the result of test-wise deletion skeletion search - cg = init_cg - - cg.data = data - if test_with_correction_name in ["MV_Crtn_Fisher_Z", "MV_Crtn_G_sq"]: - 
cg.set_ind_test(mc_fisherz, True) - # No need of the correlation matrix if using test-wise deletion test - cg.corr_mat = np.corrcoef(data, rowvar=False) if test_with_correction_name == "MV_Crtn_Fisher_Z" else [] - cg.prt_m = prt_m - ## *********** Adaption 1 *********** - - node_ids = range(no_of_var) - pair_of_variables = list(permutations(node_ids, 2)) - - depth = -1 - while cg.max_degree() - 1 > depth: - depth += 1 - edge_removal = [] - for (x, y) in pair_of_variables: - Neigh_x = cg.neighbors(x) - if y not in Neigh_x: - continue - else: - Neigh_x = np.delete(Neigh_x, np.where(Neigh_x == y)) - - if len(Neigh_x) >= depth: - for S in combinations(Neigh_x, depth): - p = cg.ci_test(x, y, S) - if p > alpha: - if not stable: # Unstable: Remove x---y right away - edge1 = cg.G.get_edge(cg.G.nodes[x], cg.G.nodes[y]) - if edge1 is not None: - cg.G.remove_edge(edge1) - edge2 = cg.G.get_edge(cg.G.nodes[y], cg.G.nodes[x]) - if edge2 is not None: - cg.G.remove_edge(edge2) - else: # Stable: x---y will be removed only - edge_removal.append((x, y)) # after all conditioning sets at - edge_removal.append((y, x)) # depth l have been considered - Helper.append_value(cg.sepset, x, y, S) - Helper.append_value(cg.sepset, y, x, S) - break - - for (x, y) in list(set(edge_removal)): - edge1 = cg.G.get_edge(cg.G.nodes[x], cg.G.nodes[y]) - if edge1 is not None: - cg.G.remove_edge(edge1) - - return cg - - -####################################################################################################################### - -# *********** Evaluation util *********** - -def get_adjacancy_matrix(g: CausalGraph): - return nx.to_numpy_array(g.nx_graph).astype(int) - - -def matrix_diff(cg1: CausalGraph, cg2: CausalGraph): - adj1 = get_adjacancy_matrix(cg1) - adj2 = get_adjacancy_matrix(cg2) - count = 0 - diff_ls = [] - for i in range(len(adj1[:, ])): - for j in range(len(adj2[:, ])): - if adj1[i, j] != adj2[i, j]: - diff_ls.append((i, j)) - count += 1 - return count / 2, diff_ls diff --git a/causallearn/search/ConstraintBased/FCI.py b/causallearn/search/ConstraintBased/FCI.py index 51d2064d..2705a485 100644 --- a/causallearn/search/ConstraintBased/FCI.py +++ b/causallearn/search/ConstraintBased/FCI.py @@ -12,14 +12,13 @@ from causallearn.graph.Node import Node from causallearn.utils.ChoiceGenerator import ChoiceGenerator from causallearn.utils.cit import * -from causallearn.utils.Fas import citest_cache, fas +from causallearn.utils.Fas import fas from causallearn.utils.PCUtils.BackgroundKnowledge import BackgroundKnowledge class SepsetsPossibleDsep: def __init__(self, data: ndarray, graph: Graph, independence_test, alpha: float, - knowledge: BackgroundKnowledge | None, depth: int, maxPathLength: int, verbose: bool, - cache_variables_map=None): + knowledge: BackgroundKnowledge | None, depth: int, maxPathLength: int, verbose: bool): def _unique(column): return np.unique(column, return_inverse=True)[1] @@ -36,18 +35,6 @@ def _unique(column): self.maxPathLength = maxPathLength self.verbose = verbose - if cache_variables_map is None: - if independence_test == chisq or independence_test == gsq: - cardinalities = np.max(data, axis=0) + 1 - else: - cardinalities = None - cache_variables_map = {"data_hash_key": hash(str(data)), - "ci_test_hash_key": hash(independence_test), - "cardinalities": cardinalities} - - self.data_hash_key = cache_variables_map["data_hash_key"] - self.ci_test_hash_key = cache_variables_map["ci_test_hash_key"] - self.cardinalities = cache_variables_map["cardinalities"] def traverseSemiDirected(self, node: 
Node, edge: Edge) -> Node | None: if node == edge.get_node1(): @@ -214,15 +201,7 @@ def get_cond_set(self, node_1: Node, node_2: Node, max_path_length: int): choice = cg.next() X, Y = self.graph.get_node_map()[node_1], self.graph.get_node_map()[node_2] - X, Y = (X, Y) if (X < Y) else (Y, X) - XYS_key = (X, Y, frozenset(condSet), self.data_hash_key, self.ci_test_hash_key) - if XYS_key in citest_cache: - p_value = citest_cache[XYS_key] - else: - p_value = self.independence_test(self.data, X, Y, tuple(condSet)) if self.cardinalities is None \ - else self.independence_test(self.data, X, Y, tuple(condSet), self.cardinalities) - - citest_cache[XYS_key] = p_value + p_value = self.independence_test(X, Y, tuple(condSet)) independent = p_value > self.alpha if independent and noEdgeRequired: @@ -465,43 +444,22 @@ def getPath(node_c: Node, previous) -> List[Node]: def doDdpOrientation(node_d: Node, node_a: Node, node_b: Node, node_c: Node, previous, graph: Graph, data, independence_test_method, alpha: float, sep_sets: Dict[Tuple[int, int], Set[int]], - change_flag: bool, bk, cache_variables_map, verbose: bool = False) -> (bool, bool): + change_flag: bool, bk, verbose: bool = False) -> (bool, bool): if graph.is_adjacent_to(node_d, node_c): raise Exception("illegal argument!") path = getPath(node_d, previous) X, Y = graph.get_node_map()[node_d], graph.get_node_map()[node_c] - X, Y = (X, Y) if (X < Y) else (Y, X) condSet = tuple([graph.get_node_map()[nn] for nn in path]) - - data_hash_key = cache_variables_map["data_hash_key"] - ci_test_hash_key = cache_variables_map["ci_test_hash_key"] - cardinalities = cache_variables_map["cardinalities"] - - XYS_key = (X, Y, frozenset(condSet), data_hash_key, ci_test_hash_key) - if XYS_key in citest_cache: - p_value = citest_cache[XYS_key] - else: - p_value = independence_test_method(data, X, Y, condSet) if cardinalities is None \ - else independence_test_method(data, X, Y, condSet, cardinalities) - - citest_cache[XYS_key] = p_value + p_value = independence_test_method(X, Y, condSet) ind = p_value > alpha path2 = list(path) path2.remove(node_b) X, Y = graph.get_node_map()[node_d], graph.get_node_map()[node_c] - X, Y = (X, Y) if (X < Y) else (Y, X) condSet = tuple([graph.get_node_map()[nn2] for nn2 in path2]) - XYS_key = (X, Y, frozenset(condSet), data_hash_key, ci_test_hash_key) - if XYS_key in citest_cache: - p_value2 = citest_cache[XYS_key] - else: - p_value2 = independence_test_method(data, X, Y, condSet) if cardinalities is None \ - else independence_test_method(data, X, Y, condSet, cardinalities) - - citest_cache[XYS_key] = p_value2 + p_value2 = independence_test_method(X, Y, condSet) ind2 = p_value2 > alpha if not ind and not ind2: @@ -559,7 +517,7 @@ def doDdpOrientation(node_d: Node, node_a: Node, node_b: Node, node_c: Node, pre def ddpOrient(node_a: Node, node_b: Node, node_c: Node, graph: Graph, maxPathLength: int, data: ndarray, independence_test_method, alpha: float, sep_sets: Dict[Tuple[int, int], Set[int]], change_flag: bool, - bk: BackgroundKnowledge | None, cache_variables_map, verbose: bool = False) -> bool: + bk: BackgroundKnowledge | None, verbose: bool = False) -> bool: Q = Queue() V = set() e = None @@ -599,8 +557,7 @@ def ddpOrient(node_a: Node, node_b: Node, node_c: Node, graph: Graph, maxPathLen if not graph.is_adjacent_to(node_d, node_c) and node_d != node_c: res, change_flag = \ doDdpOrientation(node_d, node_a, node_b, node_c, previous, graph, data, - independence_test_method, alpha, sep_sets, change_flag, bk, - cache_variables_map, verbose) 
+ independence_test_method, alpha, sep_sets, change_flag, bk, verbose) if res: return change_flag @@ -614,7 +571,6 @@ def ddpOrient(node_a: Node, node_b: Node, node_c: Node, graph: Graph, maxPathLen def ruleR4B(graph: Graph, maxPathLength: int, data: ndarray, independence_test_method, alpha: float, sep_sets: Dict[Tuple[int, int], Set[int]], change_flag: bool, bk: BackgroundKnowledge | None, - cache_variables_map, verbose: bool = False) -> bool: nodes = graph.get_nodes() @@ -631,7 +587,7 @@ def ruleR4B(graph: Graph, maxPathLength: int, data: ndarray, independence_test_m continue change_flag = ddpOrient(node_a, node_b, node_c, graph, maxPathLength, data, independence_test_method, - alpha, sep_sets, change_flag, bk, cache_variables_map, verbose) + alpha, sep_sets, change_flag, bk, verbose) return change_flag @@ -772,9 +728,8 @@ def get_color_edges(graph: Graph) -> List[Edge]: return edges -def fci(dataset: ndarray, independence_test_method=fisherz, alpha: float = 0.05, depth: int = -1, - max_path_length: int = -1, verbose: bool = False, background_knowledge: BackgroundKnowledge | None = None, - cache_variables_map=None) -> Tuple[Graph, List[Edge]]: +def fci(dataset: ndarray, independence_test_method: str=fisherz, alpha: float = 0.05, depth: int = -1, + max_path_length: int = -1, verbose: bool = False, background_knowledge: BackgroundKnowledge | None = None) -> Tuple[Graph, List[Edge]]: """ Perform Fast Causal Inference (FCI) algorithm for causal discovery @@ -782,7 +737,7 @@ def fci(dataset: ndarray, independence_test_method=fisherz, alpha: float = 0.05, ---------- dataset: data set (numpy ndarray), shape (n_samples, n_features). The input data, where n_samples is the number of samples and n_features is the number of features. - independence_test_method: the function of the independence test being used + independence_test_method: str, name of the function of the independence test being used [fisherz, chisq, gsq, kci] - fisherz: Fisher's Z conditional independence test - chisq: Chi-squared conditional independence test @@ -793,8 +748,6 @@ def fci(dataset: ndarray, independence_test_method=fisherz, alpha: float = 0.05, max_path_length: the maximum length of any discriminating path, or -1 if unlimited. verbose: True is verbose output should be printed or logged background_knowledge: background knowledge - cache_variables_map: This variable a map which contains the variables relate with cache. If it is not None, - it should contain 'data_hash_key' 、'ci_test_hash_key' and 'cardinalities'. 
Returns ------- @@ -813,11 +766,7 @@ def fci(dataset: ndarray, independence_test_method=fisherz, alpha: float = 0.05, if dataset.shape[0] < dataset.shape[1]: warnings.warn("The number of features is much larger than the sample size!") - def _unique(column): - return np.unique(column, return_inverse=True)[1] - - if independence_test_method == chisq or independence_test_method == gsq: - dataset = np.apply_along_axis(_unique, 0, dataset).astype(np.int64) + independence_test_method = CIT(dataset, method=independence_test_method) ## ------- check parameters ------------ if (depth is None) or type(depth) != int: @@ -828,14 +777,6 @@ def _unique(column): raise TypeError("'max_path_length' must be 'int' type!") ## ------- end check parameters ------------ - if cache_variables_map is None: - if independence_test_method == chisq or independence_test_method == gsq: - cardinalities = np.max(dataset, axis=0) + 1 - else: - cardinalities = None - cache_variables_map = {"data_hash_key": hash(str(dataset)), - "ci_test_hash_key": hash(independence_test_method), - "cardinalities": cardinalities} nodes = [] for i in range(dataset.shape[1]): @@ -845,8 +786,7 @@ def _unique(column): # FAS (“Fast Adjacency Search”) is the adjacency search of the PC algorithm, used as a first step for the FCI algorithm. graph, sep_sets = fas(dataset, nodes, independence_test_method=independence_test_method, alpha=alpha, - knowledge=background_knowledge, depth=depth, verbose=verbose, - cache_variables_map=cache_variables_map) + knowledge=background_knowledge, depth=depth, verbose=verbose) # reorient all edges with CIRCLE Endpoint ori_edges = graph.get_graph_edges() @@ -857,7 +797,7 @@ def _unique(column): graph.add_edge(ori_edge) sp = SepsetsPossibleDsep(dataset, graph, independence_test_method, alpha, background_knowledge, depth, - max_path_length, verbose, cache_variables_map=cache_variables_map) + max_path_length, verbose) rule0(graph, nodes, sep_sets, background_knowledge, verbose) @@ -901,7 +841,7 @@ def _unique(column): len(background_knowledge.tier_map.keys()) > 0): change_flag = ruleR4B(graph, max_path_length, dataset, independence_test_method, alpha, sep_sets, change_flag, - background_knowledge, cache_variables_map, verbose) + background_knowledge, verbose) first_time = False diff --git a/causallearn/search/ConstraintBased/PC.py b/causallearn/search/ConstraintBased/PC.py index b6080df9..17cdeddf 100644 --- a/causallearn/search/ConstraintBased/PC.py +++ b/causallearn/search/ConstraintBased/PC.py @@ -6,6 +6,7 @@ from typing import Dict, List, Tuple import networkx as nx +import numpy as np from numpy import ndarray from causallearn.graph.GraphClass import CausalGraph @@ -48,11 +49,11 @@ def pc( def pc_alg( data: ndarray, - node_names: List[str] | None, - alpha: float, - indep_test, - stable: bool, - uc_rule: int, + node_names: List[str] | None, + alpha: float, + indep_test: str, + stable: bool, + uc_rule: int, uc_priority: int, background_knowledge: BackgroundKnowledge | None = None, verbose: bool = False, @@ -66,12 +67,12 @@ def pc_alg( data : data set (numpy ndarray), shape (n_samples, n_features). The input data, where n_samples is the number of samples and n_features is the number of features. node_names: Shape [n_features]. 
The name for each feature (each feature is represented as a Node in the graph, so it's also the node name) alpha : float, desired significance level of independence tests (p_value) in (0, 1) - indep_test : the function of the independence test being used - [fisherz, chisq, gsq, kci] - - fisherz: Fisher's Z conditional independence test - - chisq: Chi-squared conditional independence test - - gsq: G-squared conditional independence test - - kci: Kernel-based conditional independence test + indep_test : str, the name of the independence test being used + ["fisherz", "chisq", "gsq", "kci"] + - "fisherz": Fisher's Z conditional independence test + - "chisq": Chi-squared conditional independence test + - "gsq": G-squared conditional independence test + - "kci": Kernel-based conditional independence test stable : run stabilized skeleton discovery if True (default = True) uc_rule : how unshielded colliders are oriented 0: run uc_sepset @@ -97,6 +98,7 @@ def pc_alg( """ start = time.time() + indep_test = CIT(data, indep_test) cg_1 = SkeletonDiscovery.skeleton_discovery(data, alpha, indep_test, stable, background_knowledge=background_knowledge, verbose=verbose, show_progress=show_progress, node_names=node_names) @@ -135,14 +137,14 @@ def pc_alg( def mvpc_alg( - data: ndarray, - node_names: List[str] | None, - alpha: float, - indep_test, - correction_name: str, - stable: bool, + data: ndarray, + node_names: List[str] | None, + alpha: float, + indep_test: str, + correction_name: str, + stable: bool, uc_rule: int, - uc_priority: int, + uc_priority: int, background_knowledge: BackgroundKnowledge | None = None, verbose: bool = False, show_progress: bool = True, @@ -155,8 +157,8 @@ def mvpc_alg( data : data set (numpy ndarray), shape (n_samples, n_features). The input data, where n_samples is the number of samples and n_features is the number of features. node_names: Shape [n_features]. The name for each feature (each feature is represented as a Node in the graph, so it's also the node name) alpha : float, desired significance level of independence tests (p_value) in (0,1) - indep_test : name of the test-wise deletion independence test being used - [mv_fisherz, mv_g_sq] + indep_test : str, name of the test-wise deletion independence test being used + ["mv_fisherz", "mv_g_sq"] - mv_fisherz: Fisher's Z conditional independence test - mv_g_sq: G-squared conditional independence test (TODO: under development) correction_name : correction_name: name of the missingness correction @@ -190,7 +192,7 @@ def mvpc_alg( """ start = time.time() - + indep_test = CIT(data, indep_test) ## Step 1: detect the direct causes of missingness indicators prt_m = get_parent_missingness_pairs(data, alpha, indep_test, stable) # print('Finish detecting the parents of missingness indicators. 
') @@ -329,9 +331,7 @@ def detect_parent(r: int, data_: ndarray, alpha: float, indep_test, stable: bool no_of_var = data.shape[1] cg = CausalGraph(no_of_var) - cg.data = data - cg.set_ind_test(indep_test) - cg.corr_mat = np.corrcoef(data, rowvar=False) if indep_test == fisherz else [] + cg.set_ind_test(CIT(data, indep_test.method)) node_ids = range(no_of_var) pair_of_variables = list(permutations(node_ids, 2)) @@ -427,11 +427,9 @@ def skeleton_correction(data: ndarray, alpha: float, test_with_correction_name: ## Initialize the graph with the result of test-wise deletion skeletion search cg = init_cg - cg.data = data if test_with_correction_name in ["MV_Crtn_Fisher_Z", "MV_Crtn_G_sq"]: - cg.set_ind_test(mc_fisherz, True) + cg.set_ind_test(CIT(data, "mc_fisherz")) # No need of the correlation matrix if using test-wise deletion test - cg.corr_mat = np.corrcoef(data, rowvar=False) if test_with_correction_name == "MV_Crtn_Fisher_Z" else [] cg.prt_m = prt_m ## *********** Adaption 1 *********** diff --git a/causallearn/utils/Fas.py b/causallearn/utils/Fas.py index 6341bd3e..5ac7edfa 100644 --- a/causallearn/utils/Fas.py +++ b/causallearn/utils/Fas.py @@ -13,7 +13,6 @@ from causallearn.utils.cit import * from causallearn.utils.PCUtils.BackgroundKnowledge import BackgroundKnowledge -citest_cache = dict() def possible_parents(node_x: Node, adjx: List[Node], knowledge: BackgroundKnowledge | None = None) -> List[Node]: @@ -52,21 +51,9 @@ def forbiddenEdge(node_x: Node, node_y: Node, knowledge: BackgroundKnowledge | N def searchAtDepth0(data: ndarray, nodes: List[Node], adjacencies: Dict[Node, Set[Node]], sep_sets: Dict[Tuple[int, int], Set[int]], - independence_test_method=fisherz, alpha: float = 0.05, - verbose: bool = False, knowledge: BackgroundKnowledge | None = None, pbar=None, - cache_variables_map=None) -> bool: + independence_test_method: CIT | None=None, alpha: float = 0.05, + verbose: bool = False, knowledge: BackgroundKnowledge | None = None, pbar=None) -> bool: empty = [] - if cache_variables_map is None: - data_hash_key = hash(str(data)) - ci_test_hash_key = hash(independence_test_method) - if independence_test_method == chisq or independence_test_method == gsq: - cardinalities = np.max(data, axis=0) + 1 - else: - cardinalities = None - else: - data_hash_key = cache_variables_map["data_hash_key"] - ci_test_hash_key = cache_variables_map["ci_test_hash_key"] - cardinalities = cache_variables_map["cardinalities"] show_progress = pbar is not None if show_progress: @@ -79,13 +66,7 @@ def searchAtDepth0(data: ndarray, nodes: List[Node], adjacencies: Dict[Node, Set print(nodes[i + 1].get_name()) for j in range(i + 1, len(nodes)): - ijS_key = (i, j, frozenset(), data_hash_key, ci_test_hash_key) - if ijS_key in citest_cache: - p_value = citest_cache[ijS_key] - else: - p_value = independence_test_method(data, i, j, tuple(empty)) if cardinalities is None \ - else independence_test_method(data, i, j, tuple(empty), cardinalities) - citest_cache[ijS_key] = p_value + p_value = independence_test_method(i, j, tuple(empty)) independent = p_value > alpha no_edge_required = True if knowledge is None else \ ((not knowledge.is_required(nodes[i], nodes[j])) or knowledge.is_required(nodes[j], nodes[i])) @@ -104,10 +85,9 @@ def searchAtDepth0(data: ndarray, nodes: List[Node], adjacencies: Dict[Node, Set def searchAtDepth(data: ndarray, depth: int, nodes: List[Node], adjacencies: Dict[Node, Set[Node]], sep_sets: Dict[Tuple[int, int], Set[int]], - independence_test_method=fisherz, + independence_test_method: CIT | 
None = None, alpha: float = 0.05, - verbose: bool = False, knowledge: BackgroundKnowledge | None = None, pbar=None, - cache_variables_map=None) -> bool: + verbose: bool = False, knowledge: BackgroundKnowledge | None = None, pbar=None) -> bool: def edge(adjx: List[Node], i: int, adjacencies_completed_edge: Dict[Node, Set[Node]]) -> bool: for j in range(len(adjx)): node_y = adjx[j] @@ -124,16 +104,7 @@ def edge(adjx: List[Node], i: int, adjacencies_completed_edge: Dict[Node, Set[No choice = cg.next() Y = nodes.index(adjx[j]) - X, Y = (i, Y) if (i < Y) else (Y, i) - XYS_key = (X, Y, frozenset(cond_set), data_hash_key, ci_test_hash_key) - if XYS_key in citest_cache: - p_value = citest_cache[XYS_key] - else: - p_value = independence_test_method(data, X, Y, tuple(cond_set)) if cardinalities is None \ - else independence_test_method(data, X, Y, tuple(cond_set), cardinalities) - - citest_cache[XYS_key] = p_value - + p_value = independence_test_method(i, Y, tuple(cond_set)) independent = p_value > alpha no_edge_required = True if knowledge is None else ( @@ -168,18 +139,6 @@ def edge(adjx: List[Node], i: int, adjacencies_completed_edge: Dict[Node, Set[No return False return True - if cache_variables_map is None: - data_hash_key = hash(str(data)) - ci_test_hash_key = hash(independence_test_method) - if independence_test_method == chisq or independence_test_method == gsq: - cardinalities = np.max(data, axis=0) + 1 - else: - cardinalities = None - else: - data_hash_key = cache_variables_map["data_hash_key"] - ci_test_hash_key = cache_variables_map["ci_test_hash_key"] - cardinalities = cache_variables_map["cardinalities"] - count = 0 adjacencies_completed = deepcopy(adjacencies) @@ -208,10 +167,9 @@ def edge(adjx: List[Node], i: int, adjacencies_completed_edge: Dict[Node, Set[No def searchAtDepth_not_stable(data: ndarray, depth: int, nodes: List[Node], adjacencies: Dict[Node, Set[Node]], sep_sets: Dict[Tuple[int, int], Set[int]], - independence_test_method=fisherz, alpha: float = 0.05, verbose: bool = False, + independence_test_method: CIT | None=None, alpha: float = 0.05, verbose: bool = False, knowledge: BackgroundKnowledge | None = None, - pbar=None, - cache_variables_map=None) -> bool: + pbar=None) -> bool: def edge(adjx, i, adjacencies_completed_edge): for j in range(len(adjx)): node_y = adjx[j] @@ -228,16 +186,7 @@ def edge(adjx, i, adjacencies_completed_edge): choice = cg.next() Y = nodes.index(adjx[j]) - X, Y = (i, Y) if (i < Y) else (Y, i) - XYS_key = (X, Y, frozenset(cond_set), data_hash_key, ci_test_hash_key) - if XYS_key in citest_cache: - p_value = citest_cache[XYS_key] - else: - p_value = independence_test_method(data, X, Y, tuple(cond_set)) if cardinalities is None \ - else independence_test_method(data, X, Y, tuple(cond_set), cardinalities) - - citest_cache[XYS_key] = p_value - + p_value = independence_test_method(i, Y, tuple(cond_set)) independent = p_value > alpha no_edge_required = True if knowledge is None else \ @@ -269,18 +218,6 @@ def edge(adjx, i, adjacencies_completed_edge): return False return True - if cache_variables_map is None: - data_hash_key = hash(str(data)) - ci_test_hash_key = hash(independence_test_method) - if independence_test_method == chisq or independence_test_method == gsq: - cardinalities = np.max(data, axis=0) + 1 - else: - cardinalities = None - else: - data_hash_key = cache_variables_map["data_hash_key"] - ci_test_hash_key = cache_variables_map["ci_test_hash_key"] - cardinalities = cache_variables_map["cardinalities"] - count = 0 show_progress = pbar is not 
None @@ -306,9 +243,9 @@ def edge(adjx, i, adjacencies_completed_edge): return freeDegree(nodes, adjacencies) > depth -def fas(data: ndarray, nodes: List[Node], independence_test_method=fisherz, alpha: float = 0.05, +def fas(data: ndarray, nodes: List[Node], independence_test_method: CIT | None=None, alpha: float = 0.05, knowledge: BackgroundKnowledge | None = None, depth: int = -1, - verbose: bool = False, stable: bool = True, show_progress: bool = True, cache_variables_map=None) -> Tuple[ + verbose: bool = False, stable: bool = True, show_progress: bool = True) -> Tuple[ GeneralGraph, Dict[Tuple[int, int], Set[int]]]: """ Implements the "fast adjacency search" used in several causal algorithm in this file. In the fast adjacency @@ -336,9 +273,6 @@ def fas(data: ndarray, nodes: List[Node], independence_test_method=fisherz, alph verbose: True is verbose output should be printed or logged stable: run stabilized skeleton discovery if True (default = True) show_progress: whether to use tqdm to show progress bar - cache_variables_map: This variable a map which contains the variables relate with cache. If it is not None, - it should contain 'data_hash_key' 、'ci_test_hash_key' and 'cardinalities'. - Returns ------- graph: Causal graph skeleton, where graph.graph[i,j] = graph.graph[j,i] = -1 indicates i --- j. @@ -359,20 +293,6 @@ def fas(data: ndarray, nodes: List[Node], independence_test_method=fisherz, alph if depth is None or depth < 0: depth = 1000 - def _unique(column): - return np.unique(column, return_inverse=True)[1] - - if independence_test_method == chisq or independence_test_method == gsq: - data = np.apply_along_axis(_unique, 0, data).astype(np.int64) - - if cache_variables_map is None: - if independence_test_method == chisq or independence_test_method == gsq: - cardinalities = np.max(data, axis=0) + 1 - else: - cardinalities = None - cache_variables_map = {"data_hash_key": hash(str(data)), - "ci_test_hash_key": hash(independence_test_method), - "cardinalities": cardinalities} # ------- end initial variable --------- print('Starting Fast Adjacency Search.') @@ -381,14 +301,14 @@ def _unique(column): for d in range(depth): if d == 0: more = searchAtDepth0(data, nodes, adjacencies, sep_sets, independence_test_method, alpha, verbose, - knowledge, pbar=pbar, cache_variables_map=cache_variables_map) + knowledge, pbar=pbar) else: if stable: more = searchAtDepth(data, d, nodes, adjacencies, sep_sets, independence_test_method, alpha, verbose, - knowledge, pbar=pbar, cache_variables_map=cache_variables_map) + knowledge, pbar=pbar) else: more = searchAtDepth_not_stable(data, d, nodes, adjacencies, sep_sets, independence_test_method, alpha, - verbose, knowledge, pbar=pbar, cache_variables_map=cache_variables_map) + verbose, knowledge, pbar=pbar) if not more: break if show_progress: diff --git a/causallearn/utils/PCUtils/SkeletonDiscovery.py b/causallearn/utils/PCUtils/SkeletonDiscovery.py index 043e3a8c..6cfaa492 100644 --- a/causallearn/utils/PCUtils/SkeletonDiscovery.py +++ b/causallearn/utils/PCUtils/SkeletonDiscovery.py @@ -9,14 +9,14 @@ from causallearn.graph.GraphClass import CausalGraph from causallearn.utils.PCUtils.BackgroundKnowledge import BackgroundKnowledge -from causallearn.utils.cit import chisq, gsq from causallearn.utils.PCUtils.Helper import append_value +from causallearn.utils.cit import CIT def skeleton_discovery( data: ndarray, alpha: float, - indep_test, + indep_test: CIT, stable: bool = True, background_knowledge: BackgroundKnowledge | None = None, verbose: bool = False, 
@@ -31,7 +31,7 @@ def skeleton_discovery( data : data set (numpy ndarray), shape (n_samples, n_features). The input data, where n_samples is the number of samples and n_features is the number of features. alpha: float, desired significance level of independence tests (p_value) in (0,1) - indep_test : the function of the independence test being used + indep_test : class CIT, the independence test being used [fisherz, chisq, gsq, mv_fisherz, kci] - fisherz: Fisher's Z conditional independence test - chisq: Chi-squared conditional independence test @@ -58,22 +58,6 @@ def skeleton_discovery( no_of_var = data.shape[1] cg = CausalGraph(no_of_var, node_names) cg.set_ind_test(indep_test) - cg.data_hash_key = hash(str(data)) - if indep_test == chisq or indep_test == gsq: - # if dealing with discrete data, data is numpy.ndarray with n rows m columns, - # for each column, translate the discrete values to int indexs starting from 0, - # e.g. [45, 45, 6, 7, 6, 7] -> [2, 2, 0, 1, 0, 1] - # ['apple', 'apple', 'pear', 'peach', 'pear'] -> [0, 0, 2, 1, 2] - # in old code, its presumed that discrete `data` is already indexed, - # but here we make sure it's in indexed form, so allow more user input e.g. 'apple' .. - def _unique(column): - return np.unique(column, return_inverse=True)[1] - - cg.is_discrete = True - cg.data = np.apply_along_axis(_unique, 0, data).astype(np.int64) - cg.cardinalities = np.max(cg.data, axis=0) + 1 - else: - cg.data = data depth = -1 pbar = tqdm(total=no_of_var) if show_progress else None diff --git a/causallearn/utils/cit.py b/causallearn/utils/cit.py index e1671f40..dbd3acbd 100644 --- a/causallearn/utils/cit.py +++ b/causallearn/utils/cit.py @@ -7,436 +7,451 @@ from causallearn.utils.PCUtils import Helper CONST_BINCOUNT_UNIQUE_THRESHOLD = 1e5 +fisherz = "fisherz" +mv_fisherz = "mv_fisherz" +mc_fisherz = "mc_fisherz" +kci = "kci" +chisq = "chisq" +gsq = "gsq" + +class CIT(object): + def __init__(self, data, method='fisherz', **kwargs): + ''' + Parameters + ---------- + data: numpy.ndarray of shape (n_samples, n_features) + method: str, in ["fisherz", "mv_fisherz", "mc_fisherz", "kci", "chisq", "gsq"] + kwargs: placeholder for future arguments, or for KCI specific arguments now + ''' + self.data = data + self.data_hash = hash(str(data)) + self.sample_size, self.num_features = data.shape + self.method = method + self.pvalue_cache = {} + + if method == 'kci': + # parse kwargs contained in the KCI method + kci_kwargs = {k: v for k, v in kwargs.items() if k in + ['kernelX', 'kernelY', 'null_ss', 'approx', 'est_width', 'polyd', 'kwidthx', 'kwidthy']} + self.kci_ui = KCI_UInd(**kci_kwargs) + self.kci_ci = KCI_CInd(**kci_kwargs) + elif method in ['fisherz', 'mv_fisherz', 'mc_fisherz']: + self.correlation_matrix = np.corrcoef(data.T) + elif method in ['chisq', 'gsq']: + def _unique(column): + return np.unique(column, return_inverse=True)[1] + self.data = np.apply_along_axis(_unique, 0, self.data).astype(np.int64) + self.data_hash = hash(str(self.data)) + self.cardinalities = np.max(self.data, axis=0) + 1 + else: + raise NotImplementedError(f"CITest method {method} is not implemented.") + + self.named_caller = { + 'fisherz': self.fisherz, + 'mv_fisherz': self.mv_fisherz, + 'mc_fisherz': self.mc_fisherz, + 'kci': self.kci, + 'chisq': self.chisq, + 'gsq': self.gsq + } + + def kci(self, X, Y, condition_set): + if len(condition_set) == 0: + return self.kci_ui.compute_pvalue(self.data[:, [X]], self.data[:, [Y]])[0] + return self.kci_ci.compute_pvalue(self.data[:, [X]], self.data[:, [Y]], 
self.data[:, list(condition_set)])[0] + + def fisherz(self, X, Y, condition_set): + """ + Perform an independence test using Fisher-Z's test + Parameters + ---------- + data : data matrices + X, Y and condition_set : column indices of data -def kci(data, X, Y, condition_set=None, kernelX='Gaussian', kernelY='Gaussian', kernelZ='Gaussian', - est_width='empirical', polyd=2, kwidthx=None, kwidthy=None, kwidthz=None): - if condition_set is None or len(condition_set) < 1: - return kci_ui(data[np.ix_(range(data.shape[0]), [X])], data[np.ix_(range(data.shape[0]), [Y])], - kernelX, kernelY, est_width, polyd, kwidthx, kwidthy) - else: - return kci_ci(data[np.ix_(range(data.shape[0]), [X])], data[np.ix_(range(data.shape[0]), [Y])], - data[np.ix_(range(data.shape[0]), list(condition_set))], - kernelX, kernelY, kernelZ, est_width, polyd, kwidthx, kwidthy, kwidthz) - - -def kci_ui(X, Y, kernelX='Gaussian', kernelY='Gaussian', est_width='empirical', polyd=2, kwidthx=None, kwidthy=None): - """ - To test if x and y are unconditionally independent - Parameters - ---------- - kernelX: kernel function for input data x - 'Gaussian': Gaussian kernel - 'Polynomial': Polynomial kernel - 'Linear': Linear kernel - kernelY: kernel function for input data y - est_width: set kernel width for Gaussian kernels - 'empirical': set kernel width using empirical rules (default) - 'median': set kernel width using the median trick - polyd: polynomial kernel degrees (default=2) - kwidthx: kernel width for data x (standard deviation sigma) - kwidthy: kernel width for data y (standard deviation sigma) - """ - - kci_uind = KCI_UInd(kernelX, kernelY, est_width=est_width, polyd=polyd, kwidthx=kwidthx, kwidthy=kwidthy) - pvalue, _ = kci_uind.compute_pvalue(X, Y) - return pvalue - - -def kci_ci(X, Y, Z, kernelX='Gaussian', kernelY='Gaussian', kernelZ='Gaussian', est_width='empirical', polyd=2, - kwidthx=None, kwidthy=None, kwidthz=None): - """ - To test if x and y are conditionally independent given z - Parameters - ---------- - kernelX: kernel function for input data x - 'Gaussian': Gaussian kernel - 'Polynomial': Polynomial kernel - 'Linear': Linear kernel - kernelY: kernel function for input data y - kernelZ: kernel function for input data z - est_width: set kernel width for Gaussian kernels - 'empirical': set kernel width using empirical rules (default) - 'median': set kernel width using the median trick - polyd: polynomial kernel degrees (default=2) - kwidthx: kernel width for data x (standard deviation sigma, default None) - kwidthy: kernel width for data y (standard deviation sigma) - kwidthz: kernel width for data y (standard deviation sigma) - """ - - kci_cind = KCI_CInd(kernelX, kernelY, kernelZ, est_width=est_width, polyd=polyd, kwidthx=kwidthx, kwidthy=kwidthy, - kwidthz=kwidthz) - pvalue, _ = kci_cind.compute_pvalue(X, Y, Z) - return pvalue - - -def mv_fisherz(mvdata, X, Y, condition_set): - """ - Perform an independence test using Fisher-Z's test for data with missing values - - Parameters - ---------- - mvdata : data with missing values - X, Y and condition_set : column indices of data - - Returns - ------- - p : the p-value of the test - """ - var = list((X, Y) + condition_set) - test_wise_deletion_XYcond_rows_index = get_index_no_mv_rows(mvdata[:, var]) - assert len(test_wise_deletion_XYcond_rows_index) != 0, "A test-wise deletion fisher-z test appears no overlapping data of involved variables. Please check the input data." 
- test_wise_deleted_data = mvdata[test_wise_deletion_XYcond_rows_index,:] - return fisherz(test_wise_deleted_data, X, Y, condition_set) - - -def mc_fisherz(mdata, skel, prt_m, X, Y, condition_set): - """Perform an independent test using Fisher-Z's test with test-wise deletion and missingness correction - If it is not the case which requires a correction, then call function mvfisherZ(...) - :param prt_m: dictionary, with elements: - - m: missingness indicators which are not MCAR - - prt: parents of the missingness indicators - """ - - ## Check whether whether there is at least one common child of X and Y - if not Helper.cond_perm_c(X, Y, condition_set, prt_m, skel): - return mv_fisherz(mdata, X, Y, condition_set) - - ## *********** Step 1 *********** - # Learning generaive model for {X, Y, S} to impute X, Y, and S - - ## Get parents the {xyS} missingness indicators with parents: prt_m - # W is the variable which can be used for missingness correction - W_indx_ = Helper.get_prt_mvars(var=list((X, Y) + condition_set), prt_m=prt_m) - - if len(W_indx_) == 0: # When there is no variable can be used for correction - return mv_fisherz(mdata, X, Y, condition_set) - - ## Get the parents of W missingness indicators - W_indx = Helper.get_prt_mw(W_indx_, prt_m) - - ## Prepare the W for regression - # Since the XYS will be regressed on W, - # W will not contain any of XYS - var = list((X, Y) + condition_set) - W_indx = list(set(W_indx) - set(var)) - - if len(W_indx) == 0: # When there is no variable can be used for correction - return mv_fisherz(mdata, X, Y, condition_set) - - ## Learn regression models with test-wise deleted data - involve_vars = var + W_indx - tdel_data = Helper.test_wise_deletion(mdata[:, involve_vars]) - effective_sz = len(tdel_data[:, 0]) - regMs, rss = Helper.learn_regression_model(tdel_data, num_model=len(var)) - - ## *********** Step 2 *********** - # Get the data of the predictors, Ws - # The sample size of Ws is the same as the effective sample size - Ws = Helper.get_predictor_ws(mdata[:, involve_vars], num_test_var=len(var), effective_sz=effective_sz) - - ## *********** Step 3 *********** - # Generate the virtual data follows the full data distribution P(X, Y, S) - # The sample size of data_vir is the same as the effective sample size - data_vir = Helper.gen_vir_data(regMs, rss, Ws, len(var), effective_sz) - - if len(var) > 2: - cond_set_bgn_0 = np.arange(2, len(var)) - else: - cond_set_bgn_0 = [] - - return mv_fisherz(data_vir, 0, 1, tuple(cond_set_bgn_0)) - - -def fisherz(data, X, Y, condition_set, correlation_matrix=None): - """ - Perform an independence test using Fisher-Z's test - - Parameters - ---------- - data : data matrices - X, Y and condition_set : column indices of data - correlation_matrix : correlation matrix; - None means without the parameter of correlation matrix - - Returns - ------- - p : the p-value of the test - """ - if correlation_matrix is None: - correlation_matrix = np.corrcoef(data.T) - sample_size = data.shape[0] - var = list((X, Y) + condition_set) - sub_corr_matrix = correlation_matrix[np.ix_(var, var)] - inv = np.linalg.inv(sub_corr_matrix) - r = -inv[0, 1] / sqrt(inv[0, 0] * inv[1, 1]) - Z = 0.5 * log((1 + r) / (1 - r)) - X = sqrt(sample_size - len(condition_set) - 3) * abs(Z) - p = 2 * (1 - norm.cdf(abs(X))) - return p - - -def chisq(data, X, Y, conditioning_set, cardinalities=None): - # though cardinalities can be computed from data, here we pass it as argument, - # to prevent from repeated computation on each variable's vardinality - if 
cardinalities is None: cardinalities = np.max(data, axis=0) + 1 - indexs = list(conditioning_set) + [X, Y] - return chisq_or_gsq_test(data[:, indexs].T, cardinalities[indexs]) - - -def gsq(data, X, Y, conditioning_set, cardinalities=None): - if cardinalities is None: cardinalities = np.max(data, axis=0) + 1 - indexs = list(conditioning_set) + [X, Y] - return chisq_or_gsq_test(data[:, indexs].T, cardinalities[indexs], G_sq=True) - - -def chisq_or_gsq_test(dataSXY, cardSXY, G_sq=False): - """by Haoyue@12/18/2021 - Parameters - ---------- - dataSXY: numpy.ndarray, in shape (|S|+2, n), where |S| is size of conditioning set (can be 0), n is sample size - dataSXY.dtype = np.int64, and each row has values [0, 1, 2, ..., card_of_this_row-1] - cardSXY: cardinalities of each row (each variable) - G_sq: True if use G-sq, otherwise (False by default), use Chi_sq - """ - def _Fill2DCountTable(dataXY, cardXY): - """ - e.g. dataXY: the observed dataset contains 5 samples, on variable x and y they're - x: 0 1 2 3 0 - y: 1 0 1 2 1 - cardXY: [4, 3] - fill in the counts by index, we have the joint count table in 4 * 3: - xy| 0 1 2 - --|------- - 0 | 0 2 0 - 1 | 1 0 0 - 2 | 0 1 0 - 3 | 0 0 1 - note: if sample size is large enough, in theory: - min(dataXY[i]) == 0 && max(dataXY[i]) == cardXY[i] - 1 - however some values may be missed. - also in joint count, not every value in [0, cardX * cardY - 1] occurs. - that's why we pass cardinalities in, and use `minlength=...` in bincount + Returns + ------- + p : the p-value of the test """ - cardX, cardY = cardXY - xyIndexed = dataXY[0] * cardY + dataXY[1] - xyJointCounts = np.bincount(xyIndexed, minlength=cardX * cardY).reshape(cardXY) - xMarginalCounts = np.sum(xyJointCounts, axis=1) - yMarginalCounts = np.sum(xyJointCounts, axis=0) - return xyJointCounts, xMarginalCounts, yMarginalCounts - - def _Fill3DCountTableByBincount(dataSXY, cardSXY): - cardX, cardY = cardSXY[-2:] - cardS = np.prod(cardSXY[:-2]) - cardCumProd = np.ones_like(cardSXY) - cardCumProd[:-1] = np.cumprod(cardSXY[1:][::-1])[::-1] - SxyIndexed = np.dot(cardCumProd[None], dataSXY)[0] - - SxyJointCounts = np.bincount(SxyIndexed, minlength=cardS * cardX * cardY).reshape((cardS, cardX, cardY)) - SMarginalCounts = np.sum(SxyJointCounts, axis=(1, 2)) - SMarginalCountsNonZero = SMarginalCounts != 0 - SMarginalCounts = SMarginalCounts[SMarginalCountsNonZero] - SxyJointCounts = SxyJointCounts[SMarginalCountsNonZero] - - SxJointCounts = np.sum(SxyJointCounts, axis=2) - SyJointCounts = np.sum(SxyJointCounts, axis=1) - return SxyJointCounts, SMarginalCounts, SxJointCounts, SyJointCounts - - def _Fill3DCountTableByUnique(dataSXY, cardSXY): - # Sometimes when the conditioning set contains many variables and each variable's cardinality is large - # e.g. consider an extreme case where - # S contains 7 variables and each's cardinality=20, then cardS = np.prod(cardSXY[:-2]) would be 1280000000 - # i.e., there are 1280000000 different possible combinations of S, - # so the SxyJointCounts array would be of size 1280000000 * cardX * cardY * np.int64, - # i.e., ~3.73TB memory! (suppose cardX, cardX are also 20) - # However, samplesize is usually in 1k-100k scale, far less than cardS, - # i.e., not all (and actually only a very small portion of combinations of S appeared in data) - # i.e., SMarginalCountsNonZero in _Fill3DCountTable_by_bincount is a very sparse array - # So when cardSXY is large, we first re-index S (skip the absent combinations) and then count XY table for each. 
- # See https://github.com/cmu-phil/causal-learn/pull/37. - cardX, cardY = cardSXY[-2:] - cardSs = cardSXY[:-2] - - cardSsCumProd = np.ones_like(cardSs) - cardSsCumProd[:-1] = np.cumprod(cardSs[1:][::-1])[::-1] - SIndexed = np.dot(cardSsCumProd[None], dataSXY[:-2])[0] - - uniqSIndices, inverseSIndices, SMarginalCounts = np.unique(SIndexed, return_counts=True, return_inverse=True) - cardS_reduced = len(uniqSIndices) - SxyIndexed = inverseSIndices * cardX * cardY + dataSXY[-2] * cardY + dataSXY[-1] - SxyJointCounts = np.bincount(SxyIndexed, minlength=cardS_reduced * cardX * cardY).reshape((cardS_reduced, cardX, cardY)) - - SxJointCounts = np.sum(SxyJointCounts, axis=2) - SyJointCounts = np.sum(SxyJointCounts, axis=1) - return SxyJointCounts, SMarginalCounts, SxJointCounts, SyJointCounts - - def _Fill3DCountTable(dataSXY, cardSXY): - # about the threshold 1e5, see a rough performance example at: - # https://gist.github.com/MarkDana/e7d9663a26091585eb6882170108485e#file-count-unique-in-array-performance-md - if np.prod(cardSXY) < CONST_BINCOUNT_UNIQUE_THRESHOLD: return _Fill3DCountTableByBincount(dataSXY, cardSXY) - return _Fill3DCountTableByUnique(dataSXY, cardSXY) - - def _CalculatePValue(cTables, eTables): + var = list((X, Y) + condition_set) + sub_corr_matrix = self.correlation_matrix[np.ix_(var, var)] + inv = np.linalg.inv(sub_corr_matrix) + r = -inv[0, 1] / sqrt(inv[0, 0] * inv[1, 1]) + Z = 0.5 * log((1 + r) / (1 - r)) + X = sqrt(self.sample_size - len(condition_set) - 3) * abs(Z) + p = 2 * (1 - norm.cdf(abs(X))) + return p + + def mv_fisherz(self, X, Y, condition_set): """ - calculate the rareness (pValue) of an observation from a given distribution with certain sample size. + Perform an independence test using Fisher-Z's test for data with missing values - Let k, m, n be respectively the cardinality of S, x, y. if S=empty, k==1. Parameters ---------- - cTables: tensor, (k, m, n) the [c]ounted tables (reflect joint P_XY) - eTables: tensor, (k, m, n) the [e]xpected tables (reflect product of marginal P_X*P_Y) - if there are zero entires in eTables, zero must occur in whole rows or columns. - e.g. w.l.o.g., row eTables[w, i, :] == 0, iff np.sum(cTables[w], axis=1)[i] == 0, i.e. cTables[w, i, :] == 0, - i.e. in configuration of conditioning set == w, no X can be in value i. + mvdata : data with missing values + X, Y and condition_set : column indices of data - Returns: pValue (float in range (0, 1)), the larger pValue is (>alpha), the more independent. + Returns ------- + p : the p-value of the test """ - eTables_zero_inds = eTables == 0 - eTables_zero_to_one = np.copy(eTables) - eTables_zero_to_one[eTables_zero_inds] = 1 # for legal division - - if G_sq == False: - sum_of_chi_square = np.sum(((cTables - eTables) ** 2) / eTables_zero_to_one) + def _get_index_no_mv_rows(mvdata): + nrow, ncol = np.shape(mvdata) + bindxRows = np.ones((nrow,), dtype=bool) + indxRows = np.array(list(range(nrow))) + for i in range(ncol): + bindxRows = np.logical_and(bindxRows, ~np.isnan(mvdata[:, i])) + indxRows = indxRows[bindxRows] + return indxRows + var = list((X, Y) + condition_set) + test_wise_deletion_XYcond_rows_index = _get_index_no_mv_rows(self.data[:, var]) + assert len(test_wise_deletion_XYcond_rows_index) != 0, \ + "A test-wise deletion fisher-z test appears no overlapping data of involved variables. Please check the input data." 
+ test_wise_deleted_cit = CIT(self.data[test_wise_deletion_XYcond_rows_index], "fisherz") + assert not np.isnan(self.data[test_wise_deletion_XYcond_rows_index][:, var]).any() + return test_wise_deleted_cit(X, Y, condition_set) + # TODO: above is to be consistent with the original code; though below is more accurate (np.corrcoef issues) + # test_wise_deleted_data_var = self.data[test_wise_deletion_XYcond_rows_index][:, var] + # sub_corr_matrix = np.corrcoef(test_wise_deleted_data_var.T) + # inv = np.linalg.inv(sub_corr_matrix) + # r = -inv[0, 1] / sqrt(inv[0, 0] * inv[1, 1]) + # Z = 0.5 * log((1 + r) / (1 - r)) + # X = sqrt(self.sample_size - len(condition_set) - 3) * abs(Z) + # p = 2 * (1 - norm.cdf(abs(X))) + # return p + + def mc_fisherz(self, X, Y, condition_set, skel, prt_m): + """Perform an independent test using Fisher-Z's test with test-wise deletion and missingness correction + If it is not the case which requires a correction, then call function mvfisherZ(...) + :param prt_m: dictionary, with elements: + - m: missingness indicators which are not MCAR + - prt: parents of the missingness indicators + """ + ## Check whether whether there is at least one common child of X and Y + if not Helper.cond_perm_c(X, Y, condition_set, prt_m, skel): + return self.mv_fisherz(X, Y, condition_set) + + ## *********** Step 1 *********** + # Learning generaive model for {X, Y, S} to impute X, Y, and S + + ## Get parents the {xyS} missingness indicators with parents: prt_m + # W is the variable which can be used for missingness correction + W_indx_ = Helper.get_prt_mvars(var=list((X, Y) + condition_set), prt_m=prt_m) + + if len(W_indx_) == 0: # When there is no variable can be used for correction + return self.mv_fisherz(X, Y, condition_set) + + ## Get the parents of W missingness indicators + W_indx = Helper.get_prt_mw(W_indx_, prt_m) + + ## Prepare the W for regression + # Since the XYS will be regressed on W, + # W will not contain any of XYS + var = list((X, Y) + condition_set) + W_indx = list(set(W_indx) - set(var)) + + if len(W_indx) == 0: # When there is no variable can be used for correction + return self.mv_fisherz(X, Y, condition_set) + + ## Learn regression models with test-wise deleted data + involve_vars = var + W_indx + tdel_data = Helper.test_wise_deletion(self.data[:, involve_vars]) + effective_sz = len(tdel_data[:, 0]) + regMs, rss = Helper.learn_regression_model(tdel_data, num_model=len(var)) + + ## *********** Step 2 *********** + # Get the data of the predictors, Ws + # The sample size of Ws is the same as the effective sample size + Ws = Helper.get_predictor_ws(self.data[:, involve_vars], num_test_var=len(var), effective_sz=effective_sz) + + ## *********** Step 3 *********** + # Generate the virtual data follows the full data distribution P(X, Y, S) + # The sample size of data_vir is the same as the effective sample size + data_vir = Helper.gen_vir_data(regMs, rss, Ws, len(var), effective_sz) + + if len(var) > 2: + cond_set_bgn_0 = np.arange(2, len(var)) else: - div = np.divide(cTables, eTables_zero_to_one) - div[div == 0] = 1 # It guarantees that taking natural log in the next step won't cause any error - sum_of_chi_square = 2 * np.sum(cTables * np.log(div)) - - # array in shape (k,), zero_counts_rows[w]=c (0<=calpha), the more independent. 
+ ------- + """ + eTables_zero_inds = eTables == 0 + eTables_zero_to_one = np.copy(eTables) + eTables_zero_to_one[eTables_zero_inds] = 1 # for legal division + + if G_sq == False: + sum_of_chi_square = np.sum(((cTables - eTables) ** 2) / eTables_zero_to_one) + else: + div = np.divide(cTables, eTables_zero_to_one) + div[div == 0] = 1 # It guarantees that taking natural log in the next step won't cause any error + sum_of_chi_square = 2 * np.sum(cTables * np.log(div)) + + # array in shape (k,), zero_counts_rows[w]=c (0<=c no (1,3). ###### +# ########### otherwise #degree_of_freedom will add a spurious 1: (0-1)*(0-1) ####### +# if len(sub_data) == 0: continue ################################################# +# +# ################################################################################### +# +# # Step 2: Generate contingency table (applying Fienberg's method) +# def make_ctable(D, cat_size): +# x = np.array(D[:, 0], dtype=np.dtype(int)) +# y = np.array(D[:, 1], dtype=np.dtype(int)) +# bin_count = np.bincount(cat_size * x + y) # Perform linear transformation to obtain frequencies +# diff = (cat_size ** 2) - len(bin_count) +# if diff > 0: # The number of cells generated by bin_count can possibly be less than cat_size**2 +# bin_count = np.concatenate( +# (bin_count, np.zeros(diff))) # In that case, we concatenate some zeros to fit cat_size**2 +# ctable = bin_count.reshape(cat_size, cat_size) +# ctable = ctable[~np.all(ctable == 0, axis=1)] # Remove rows consisted entirely of zeros +# ctable = ctable[:, ~np.all(ctable == 0, axis=0)] # Remove columns consisted entirely of zeros +# +# return ctable +# +# ctable = make_ctable(sub_data, max_categories) +# +# # Step 3: Calculate chi-square statistic and degree of freedom from the contingency table +# row_sum = np.sum(ctable, axis=1) +# col_sum = np.sum(ctable, axis=0) +# expected = np.outer(row_sum, col_sum) / sub_data.shape[0] +# if G_sq == False: +# chi_sq_stat = np.sum(((ctable - expected) ** 2) / expected) +# else: +# div = np.divide(ctable, expected) +# div[div == 0] = 1 # It guarantees that taking natural log in the next step won't cause any error +# chi_sq_stat = 2 * np.sum(ctable * np.log(div)) +# df = (ctable.shape[0] - 1) * (ctable.shape[1] - 1) +# +# sum_of_chi_square += chi_sq_stat +# sum_of_df += df +# +# # Step 4: Compute p-value from chi-square CDF +# if sum_of_df == 0: +# return 1 +# else: +# return chi2.sf(sum_of_chi_square, sum_of_df) +# +# +# def cartesian_product(lists): +# "Return the Cartesian product of lists (List of lists)" +# result = [[]] +# for pool in lists: +# result = [x + [y] for x in result for y in pool] +# return result - Returns - ------- - p : the p-value of the test - """ - # Step 1: Subset the data - categories_list = [np.unique(data[:, i]) for i in - list(conditioning_set)] # Obtain the categories of each variable in conditioning_set - value_config_list = cartesian_product( - categories_list) # Obtain all the possible value configurations of the conditioning_set (e.g., [[]] if categories_list == []) - max_categories = int( - np.max(data)) + 1 # Used to fix the size of the contingency table (before applying Fienberg's method) - sum_of_chi_square = 0 # initialize a zero chi_square statistic - sum_of_df = 0 # initialize a zero degree of freedom - def recursive_and(L): - "A helper function for subsetting the data using the conditions in L of the form [(variable, value),...]" - if len(L) == 0: - return data - else: - condition = data[:, L[0][0]] == L[0][1] - i = 1 - while i < len(L): - new_conjunct = data[:, 
L[i][0]] == L[i][1] - condition = new_conjunct & condition - i += 1 - return data[condition] - - for value_config in range(len(value_config_list)): - L = list(zip(conditioning_set, value_config_list[value_config])) - sub_data = recursive_and(L)[:, [X, - Y]] # obtain the subset dataset (containing only the X, Y columns) with only rows specifed in value_config - - ############# Haoyue@12/18/2021 DEBUG: this line is a must: ##################### - ########### not all value_config in cartesian product occurs in data ############## - # e.g. S=(S0,S1), where S0 has categories {0,1}, S1 has {2,3}. But in combination,# - ##### (S0,S1) only shows up with value pair (0,2), (0,3), (1,2) -> no (1,3). ###### - ########### otherwise #degree_of_freedom will add a spurious 1: (0-1)*(0-1) ####### - if len(sub_data) == 0: continue ################################################# - - ################################################################################### - - # Step 2: Generate contingency table (applying Fienberg's method) - def make_ctable(D, cat_size): - x = np.array(D[:, 0], dtype=np.dtype(int)) - y = np.array(D[:, 1], dtype=np.dtype(int)) - bin_count = np.bincount(cat_size * x + y) # Perform linear transformation to obtain frequencies - diff = (cat_size ** 2) - len(bin_count) - if diff > 0: # The number of cells generated by bin_count can possibly be less than cat_size**2 - bin_count = np.concatenate( - (bin_count, np.zeros(diff))) # In that case, we concatenate some zeros to fit cat_size**2 - ctable = bin_count.reshape(cat_size, cat_size) - ctable = ctable[~np.all(ctable == 0, axis=1)] # Remove rows consisted entirely of zeros - ctable = ctable[:, ~np.all(ctable == 0, axis=0)] # Remove columns consisted entirely of zeros - - return ctable - - ctable = make_ctable(sub_data, max_categories) - - # Step 3: Calculate chi-square statistic and degree of freedom from the contingency table - row_sum = np.sum(ctable, axis=1) - col_sum = np.sum(ctable, axis=0) - expected = np.outer(row_sum, col_sum) / sub_data.shape[0] - if G_sq == False: - chi_sq_stat = np.sum(((ctable - expected) ** 2) / expected) - else: - div = np.divide(ctable, expected) - div[div == 0] = 1 # It guarantees that taking natural log in the next step won't cause any error - chi_sq_stat = 2 * np.sum(ctable * np.log(div)) - df = (ctable.shape[0] - 1) * (ctable.shape[1] - 1) - - sum_of_chi_square += chi_sq_stat - sum_of_df += df - - # Step 4: Compute p-value from chi-square CDF - if sum_of_df == 0: - return 1 - else: - return chi2.sf(sum_of_chi_square, sum_of_df) - - -def cartesian_product(lists): - "Return the Cartesian product of lists (List of lists)" - result = [[]] - for pool in lists: - result = [x + [y] for x in result for y in pool] - return result - - -def get_index_no_mv_rows(mvdata): - nrow, ncol = np.shape(mvdata) - bindxRows = np.ones((nrow,), dtype=bool) - indxRows = np.array(list(range(nrow))) - for i in range(ncol): - bindxRows = np.logical_and(bindxRows, ~np.isnan(mvdata[:, i])) - indxRows = indxRows[bindxRows] - return indxRows diff --git a/tests/TestFCI.py b/tests/TestFCI.py index 11cc6b34..522381aa 100644 --- a/tests/TestFCI.py +++ b/tests/TestFCI.py @@ -99,7 +99,7 @@ def test_bnlearn_discrete_datasets(self): "andes" ] - bnlearn_path = './TestData/bnlearn_discrete_10000' + bnlearn_path = './TestData/bnlearn_discrete_10000/data' for bname in benchmark_names: data = np.loadtxt(os.path.join(bnlearn_path, f'{bname}.txt'), skiprows=1) G, edges = fci(data, chisq, 0.05, verbose=False) diff --git 
a/tests/TestMVPC_mv_fisherz_test.py b/tests/TestMVPC_mv_fisherz_test.py index 88e857ed..5a934516 100644 --- a/tests/TestMVPC_mv_fisherz_test.py +++ b/tests/TestMVPC_mv_fisherz_test.py @@ -10,7 +10,7 @@ import unittest import numpy as np -from causallearn.utils.cit import fisherz, mv_fisherz +from causallearn.utils.cit import CIT class TestCIT_mv_fisherz(unittest.TestCase): @@ -34,11 +34,12 @@ def test_chain(self): # Z -->R_Y mdata[Z > 0, 1] = np.nan - - mv_pvalues_t1.append(mv_fisherz(mdata, 0, 1, ())) - pvalues_t1.append(fisherz(data, 0, 1, ())) - mv_pvalues_t2.append(mv_fisherz(mdata, 0, 1, (2,))) - pvalues_t2.append(fisherz(data, 0, 1, (2,))) + indep_test = CIT(data, method='fisherz') + mv_indep_test = CIT(mdata, method='mv_fisherz') + mv_pvalues_t1.append(mv_indep_test(0, 1, ())) + pvalues_t1.append(indep_test(0, 1, ())) + mv_pvalues_t2.append(mv_indep_test(0, 1, (2,))) + pvalues_t2.append(indep_test(0, 1, (2,))) print('mv_fisherz: X and Y are not independent, pvalue is mean {:.3f}'.format(np.mean(mv_pvalues_t1)) + ' std: {:.3f}'.format(np.std(mv_pvalues_t1))) print('fisherz: X and Y are not independent, pvalue is mean {:.3f}'.format(np.mean(pvalues_t1)) + ' std: {:.3f}'.format(np.std(pvalues_t1))) @@ -68,10 +69,13 @@ def test_confounder(self): mdata[Z > 0, 1] = np.nan - mv_pvalues_t1.append(mv_fisherz(mdata, 0, 1, ())) - pvalues_t1.append(fisherz(data, 0, 1, ())) - mv_pvalues_t2.append(mv_fisherz(mdata, 0, 1, (2,))) - pvalues_t2.append(fisherz(data, 0, 1, (2,))) + indep_test = CIT(data, method='fisherz') + mv_indep_test = CIT(mdata, method='mv_fisherz') + + mv_pvalues_t1.append(mv_indep_test(0, 1, ())) + pvalues_t1.append(indep_test(0, 1, ())) + mv_pvalues_t2.append(mv_indep_test(0, 1, (2,))) + pvalues_t2.append(indep_test(0, 1, (2,))) print('mv_fisherz: X and Y are not independent, pvalue is mean {:.3f}'.format(np.mean(mv_pvalues_t1)) + ' std: {:.3f}'.format(np.std(mv_pvalues_t1))) print('fisherz: X and Y are not independent, pvalue is mean {:.3f}'.format(np.mean(pvalues_t1)) + ' std: {:.3f}'.format(np.std(pvalues_t1))) @@ -101,10 +105,13 @@ def test_fork(self): mdata[Z > 0, 1] = np.nan - mv_pvalues_t1.append(mv_fisherz(mdata, 0, 1, ())) - pvalues_t1.append(fisherz(data, 0, 1, ())) - mv_pvalues_t2.append(mv_fisherz(mdata, 0, 1, (2,))) - pvalues_t2.append(fisherz(data, 0, 1, (2,))) + indep_test = CIT(data, method='fisherz') + mv_indep_test = CIT(mdata, method='mv_fisherz') + + mv_pvalues_t1.append(mv_indep_test(0, 1, ())) + pvalues_t1.append(indep_test(0, 1, ())) + mv_pvalues_t2.append(mv_indep_test(0, 1, (2,))) + pvalues_t2.append(indep_test(0, 1, (2,))) print('mv_fisherz: X and Y are independent, pvalue is mean {:.3f}'.format(np.mean(mv_pvalues_t1)) + ' std: {:.3f}'.format(np.std(mv_pvalues_t1))) print('fisherz: X and Y are independent, pvalue is mean {:.3f}'.format(np.mean(pvalues_t1)) + ' std: {:.3f}'.format(np.std(pvalues_t1))) @@ -133,10 +140,13 @@ def test_fork2(self): mdata[Y > 0, 2] = np.nan - mv_pvalues_t1.append(mv_fisherz(mdata, 0, 1, ())) - pvalues_t1.append(fisherz(data, 0, 1, ())) - mv_pvalues_t2.append(mv_fisherz(mdata, 0, 1, (2,))) - pvalues_t2.append(fisherz(data, 0, 1, (2,))) + indep_test = CIT(data, method='fisherz') + mv_indep_test = CIT(mdata, method='mv_fisherz') + + mv_pvalues_t1.append(mv_indep_test(0, 1, ())) + pvalues_t1.append(indep_test(0, 1, ())) + mv_pvalues_t2.append(mv_indep_test(0, 1, (2,))) + pvalues_t2.append(indep_test(0, 1, (2,))) print('mv_fisherz: X and Y are independent, pvalue is mean {:.3f}'.format(np.mean(mv_pvalues_t1)) + ' std: 
{:.3f}'.format(np.std(mv_pvalues_t1)))
         print('fisherz: X and Y are independent, pvalue is mean {:.3f}'.format(np.mean(pvalues_t1)) +
               ' std: {:.3f}'.format(np.std(pvalues_t1)))
diff --git a/tests/TestPC.py b/tests/TestPC.py
index 207e0fdd..351478f6 100644
--- a/tests/TestPC.py
+++ b/tests/TestPC.py
@@ -10,6 +10,8 @@
 from causallearn.utils.DAG2CPDAG import dag2cpdag
 from causallearn.utils.TXT2GeneralGraph import txt2generalgraph
 
+
+
 ######################################### Test Notes ###########################################
 # All the benchmark results of loaded files (e.g. "./TestData/benchmark_returned_results/")    #
 # are obtained from the code of causal-learn as of commit                                      #
@@ -81,7 +83,7 @@
     "./TestData/bnlearn_discrete_10000/benchmark_returned_results/win95pts_pc_chisq_0.05_stable_0_-1.txt": "1168e7c6795df8063298fc2f727566be",
 }
 INCONSISTENT_RESULT_GRAPH_ERRMSG = "Returned graph is inconsistent with the benchmark. Please check your code with the commit 94d1536."
-
+UNROBUST_RESULT_GRAPH_ERRMSG = "Returned graph is much too different from the benchmark. Please check the randomness in your algorithm."
 # verify files integrity first
 for file_path, expected_MD5 in BENCHMARK_TXTFILE_TO_MD5.items():
     with open(file_path, 'rb') as fin:
@@ -236,10 +238,22 @@ def test_pc_load_linear_missing_10_with_mv_fisher_z(self):
         truth_cpdag = dag2cpdag(truth_dag)
         num_edges_in_truth = truth_dag.get_num_edges()
 
-        cg = pc(data, 0.05, mv_fisherz, True, 0, 4, mvpc=True)
+        # Since there is randomness in mvpc (np.random.shuffle in get_predictor_ws of utils/PCUtils/Helper.py),
+        # we obtain two results:
+        #   - one with randomness, to ensure that the randomness does not hurt the end-to-end robustness of the algorithm;
+        #   - one without randomness (with a fixed seed, hence deterministic), to ensure that the logic of the algorithm
+        #     stays consistent after any further changes (i.e., that small differences in the results are caused by the
+        #     randomness, not by a logic change).
+        cg_with_randomness = pc(data, 0.05, mv_fisherz, True, 0, 4, mvpc=True)
+        state = np.random.get_state()   # save the current random state
+        np.random.seed(42)              # set the random seed to 42 temporarily, just for the following line
+        cg_without_randomness = pc(data, 0.05, mv_fisherz, True, 0, 4, mvpc=True)
+        np.random.set_state(state)      # restore the random state
+
         benchmark_returned_graph = np.loadtxt("./TestData/benchmark_returned_results/linear_missing_10_mvpc_fisherz_0.05_stable_0_4.txt")
-        assert np.all(cg.G.graph == benchmark_returned_graph), INCONSISTENT_RESULT_GRAPH_ERRMSG
-        shd = SHD(truth_cpdag, cg.G)
+        assert np.all(cg_without_randomness.G.graph == benchmark_returned_graph), INCONSISTENT_RESULT_GRAPH_ERRMSG
+        assert np.sum(cg_with_randomness.G.graph != benchmark_returned_graph) / benchmark_returned_graph.size < 0.02,\
+            UNROBUST_RESULT_GRAPH_ERRMSG  # 0.02 is an empirical threshold we found to work here
+        shd = SHD(truth_cpdag, cg_with_randomness.G)
         print(f"   pc(data, 0.05, mv_fisherz, True, 0, 4, mvpc=True)\tSHD: {shd.get_shd()} of {num_edges_in_truth}")
         print('test_pc_load_linear_missing_10_with_mv_fisher_z passed!\n')
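The test changes above all follow the same pattern introduced by this patch: construct a CIT object once per dataset and call it with column indices instead of passing the data into each test call. A short usage sketch follows; the synthetic data and variable layout are illustrative only, mirroring the chain setup used in TestMVPC_mv_fisherz_test.py.

```python
import numpy as np
from causallearn.utils.cit import CIT

rng = np.random.default_rng(0)
X = rng.normal(size=2000)
Y = X + rng.normal(size=2000)
Z = Y + rng.normal(size=2000)
data = np.column_stack([X, Y, Z])                     # chain X -> Y -> Z

mdata = data.copy()
mdata[Z > 0, 1] = np.nan                              # Y is missing wherever Z > 0

fisherz_test = CIT(data, method='fisherz')            # complete-data Fisher-Z
mv_fisherz_test = CIT(mdata, method='mv_fisherz')     # test-wise deletion Fisher-Z

print(fisherz_test(0, 1, ()))      # X vs Y, unconditional
print(fisherz_test(0, 1, (2,)))    # X vs Y given Z
print(mv_fisherz_test(0, 1, ()))   # same tests on the data with missing values
print(mv_fisherz_test(0, 1, (2,)))
```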