In [2]:
#%%writefile ../tests/test_algorithm/test_id_alg.py
# -*- coding: utf-8 -*-


"""Tests for the identify algorithm."""

import unittest

from bel2scm import causal_graph as cg
from y0.parser.craig.grammar import grammar
from y0.dsl import Expression, P, Sum, X, Y, Z
from y0.graph import NxMixedGraph
#MixedGraph, 
from y0.algorithm.identify import identify #does not exist yet

P_XY = P(X, Y, Z)
P_XYZ = P(X, Y, Z)



def nxmixedgraph_to_craig(graph:NxMixedGraph):
    di_edges = list(self.directed.edges())
    bi_edges = list(self.undirected.edges())
    vertices = list(self.directed)  # could be either since they're maintained together
    str_list = [f'{U} => {V}' for U, V in di_edges]
    type_dict = dict([(U, "continuous") for U in vertices])
    graph_craig = cg.str_graph(str_list,'SCM',type_dict)
    graph_craig.add_confound([[U,V] for U,V in bi_edges])
    return graph_craig

def cause_effect_from_query(query:Expression):
    return ['X'],['Y']

class TestIdentify(unittest.TestCase):
    """Test cases from https://github.com/COVID-19-Causal-Reasoning/Y0/blob/master/ID_whittemore.ipynb."""

    def assert_identify(self, expression: Expression, graph: NxMixedGraph, query: Expression):
        """Assert that the graph returns the same."""
        self.assertEqual(expression, identify(graph, query))

    def test_figure_2a(self):
        """Test Figure 2A."""
        graph = NxMixedGraph()
        graph.add_directed_edge('X', 'Y')
        print(identify(graph,Y@X).to_text())
        expr = '[ sum_{} P(Y|X) ]'
        #self.assert_identify(P_XY / Sum[Y](P_XY), graph, Y@X)
        self.assert_identify(grammar.parseString(expr)[0], graph, Y@X)
        

    def test_figure_2b(self):
        """Test Figure 2B."""
        graph = NxMixedGraph()
        graph.add_directed_edge('X', 'Y')
        graph.add_directed_edge('X', 'Z')
        graph.add_directed_edge('Z', 'Y')
        graph.add_undirected_edge('Y', 'Z')
        print(identify(graph,Y@X).to_text())
        expr = '[ sum_{Z} P(Z|X) P(Y|X,Z) ]'
        #self.assertEqual(
        #    Sum[Z](Sum[Y](P_XY) / (Sum[Z](Sum[Y](P_XY))) * (P_XY / Sum[Y](P_XY))),
        #    identify(graph, Y@X),
        #)
        self.assert_identify(grammar.parseString(expr)[0], graph, Y@X)

    def test_figure_2c(self):
        """Test Figure 2C."""
        graph = NxMixedGraph()
        graph.add_directed_edge('X', 'Y')
        graph.add_directed_edge('Z', 'X')
        graph.add_directed_edge('Z', 'Y')
        graph.add_undirected_edge('Y', 'Z')
        print(identify(graph,Y@X).to_text())
        expr = '[ sum_{Z} P(Z) P(Y|X,Z) ]'

        #self.assertEqual(
        #    Sum[Z](Sum[X, Y](P_XYZ) / (Sum[Z](Sum[X, Y](P_XYZ))) * (P_XYZ / Sum[Y](P_XYZ))),
        #    identify(graph, Y@X),
        #)
        self.assert_identify(grammar.parseString(expr)[0], graph, Y@X)

    def test_figure_2d(self):
        """Test Figure 2D."""
        graph = NxMixedGraph()
        graph.add_directed_edge('X', 'Y')
        graph.add_directed_edge('Z', 'X')
        graph.add_directed_edge('Z', 'Y')
        graph.add_undirected_edge('X', 'Z')
        print(identify(graph,Y@X).to_text())
        expr = '[ sum_{Z} [ sum_{} P(Y|X,Z) ] [ sum_{} [ sum_{X,Y} P(X,Y,Z) ] ] ]'

#         self.assertEqual(
#             Sum[Z](Sum[X, Y](P_XYZ) * P_XYZ / Sum[Y](P_XYZ)),
#             identify(graph, Y@X),
#         )
        self.assert_identify(grammar.parseString(expr)[0], graph, Y@X)

    def test_figure_2e(self):
        """Test Figure 2E."""
        graph = NxMixedGraph()
        graph.add_directed_edge('X', 'Z')
        graph.add_directed_edge('Z', 'Y')
        graph.add_undirected_edge('X', 'Y')
        print('ID output:', identify(graph,Y@X).to_text())
        #expr = '[ sum_{Z} [ sum_{} P(Z|X) ] [ sum_{} [ sum_{X} P(Y|X,Z) P(X) ] ] ]'
        
        expr = '[ sum_{Z} [ sum_{} P(Z|X) ] [ sum_{} [ sum_{X} P(X) P(Y|X,Z) ] ] ]'
        print('Expected:',grammar.parseString(expr)[0])
#         self.assertEqual(
#             (
#                 Sum[Z](Sum[Y](P_XYZ) / Sum[Z](Sum[Y](P_XYZ)))
#                 * Sum[X](P_XYZ * Sum[Y, Z](P_XYZ) / Sum[Y](P_XYZ) / Sum[X](Sum[Y, Z](P_XYZ)))
#             ),
#             identify(graph, Y@X),
#         )
        self.assert_identify(grammar.parseString(expr)[0], graph, Y@X)


if __name__ == '__main__':
    unittest.main()


E
ERROR: /Users/zuck016/Library/Jupyter/runtime/kernel-1f84011b-44af-470d-aa54-744b67053ec1 (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/Users/zuck016/Library/Jupyter/runtime/kernel-1f84011b-44af-470d-aa54-744b67053ec1'

----------------------------------------------------------------------
Ran 1 test in 0.002s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [5]:
from y0.dsl import Expression, P, Sum, X, Y, Z, Variable
from y0.graph import NxMixedGraph

from y0.algorithm.identify import identify #does not exist yet

P_XY = P(X, Y, Z)
P_XYZ = P(X, Y, Z)

P(*[Variable('X'), Variable('Y')])

P(Distribution(children=(Variable(name='X'), Variable(name='Y')), parents=()))

In [1]:
import sys, test_id_alg

suite = unittest.TestLoader().loadTestsFromTestCase(TestIdentify)
unittest.TextTestRunner(verbosity=4,stream=sys.stderr).run(suite)

ModuleNotFoundError: No module named 'test_id_alg'

In [None]:
graph_2b = NxMixedGraph()
graph_2b.add_directed_edge('X', 'Y')
graph_2b.add_directed_edge('X', 'Z')
graph_2b.add_directed_edge('Z', 'Y')
graph_2b.add_undirected_edge('Y', 'Z')

In [4]:
for U,V in graph_2b.directed.edges():
    print(U,V)

X Y
X Z
Z Y


In [14]:
from y0.identify import _get_treatments, _get_outcomes
expr = P(Y@X,Z@X)
print(_get_treatments(expr.get_variables()))
print(_get_outcomes(expr.get_variables()))
    

['X']
['Z', 'Y']


In [9]:
from y0.parser.craig.grammar import grammar
expr='[sum_{W,D,Z,V} [sum_{}P(W|X)][sum_{} [sum_{X,W,Z,Y,V} P(X,W,D,Z,Y,V)]][sum_{}P(Z|D,V)][sum_{} [sum_{X} P(X)P(Y|X,W,Z,D,V)]][sum_{} [sum_{X,W,D,Z,Y} P(X,W,D,Z,Y,V)]]]'
grammar.parseString(expr)

([Sum(expression=Product(expressions=(Sum(expression=P(Distribution(children=(Variable(name='W'),), parents=(Variable(name='X'),))), ranges=()), Sum(expression=Sum(expression=P(Distribution(children=(Variable(name='X'), Variable(name='W'), Variable(name='D'), Variable(name='Z'), Variable(name='Y'), Variable(name='V')), parents=())), ranges=(Variable(name='X'), Variable(name='W'), Variable(name='Z'), Variable(name='Y'), Variable(name='V'))), ranges=()), Sum(expression=P(Distribution(children=(Variable(name='Z'),), parents=(Variable(name='D'), Variable(name='V')))), ranges=()), Sum(expression=Sum(expression=Product(expressions=(P(Distribution(children=(Variable(name='X'),), parents=())), P(Distribution(children=(Variable(name='Y'),), parents=(Variable(name='X'), Variable(name='W'), Variable(name='Z'), Variable(name='D'), Variable(name='V')))))), ranges=(Variable(name='X'),)), ranges=()), Sum(expression=Sum(expression=P(Distribution(children=(Variable(name='X'), Variable(name='W'), Variab

In [16]:
P_XY = P(X, Y, Z)
P_XYZ = P(X, Y, Z)
expr = (P_XY / Sum[Y](P_XY)).to_text()
expr


'frac_{P(X,Y,Z)}{[ sum_{Y} P(X,Y,Z) ]}'

In [17]:
grammar.parseString(expr)

([Fraction(numerator=P(Distribution(children=(Variable(name='X'), Variable(name='Y'), Variable(name='Z')), parents=())), denominator=Sum(expression=P(Distribution(children=(Variable(name='X'), Variable(name='Y'), Variable(name='Z')), parents=())), ranges=(Variable(name='Y'),)))], {})

In [18]:
grammar.parseString(expr)[0]

Fraction(numerator=P(Distribution(children=(Variable(name='X'), Variable(name='Y'), Variable(name='Z')), parents=())), denominator=Sum(expression=P(Distribution(children=(Variable(name='X'), Variable(name='Y'), Variable(name='Z')), parents=())), ranges=(Variable(name='Y'),)))