In [None]:
import sys

In [None]:
!{sys.executable} -m pip install git+https://github.com/buguroo/pyknow/

In [1]:
from pyknow import *

In [2]:
class Crosswalk(KnowledgeEngine):    
    @Rule(Fact(action = "Проверить наличие пешеходов"), Fact(pedestrians = "Да"))
    def wait_for_pedestrian(self):
        self.declare(Fact(final_action = "Остановиться и дождаться, пока пешеход пройдет"))
        
    @Rule(Fact(action = "Проверить наличие пешеходов"), NOT(Fact(pedestrians = "Да")))
    def no_pedestrian(self):
        self.declare(Fact(final_action = "Ехать дальше без изменения скорости"))
        
    @Rule(Fact("Пешеходный переход"), OR(Fact(traffic_lights = "Нет"), NOT((Fact(traffic_lights = W())))))
    def crosswalk(self):        
        self.declare(Fact(final_action = "Притормозить"))
        self.declare(Fact(action = "Проверить наличие пешеходов"))
    
    @Rule(Fact(final_action = MATCH.action), salience=1)
    def maneuver(self, action):
        print(action)

In [3]:
test = Crosswalk()
test.reset()
test.declare(Fact("Пешеходный переход"), Fact(pedestrians = "Да"))
test.run()
test.facts

Притормозить
Остановиться и дождаться, пока пешеход пройдет


FactList([(0, InitialFact()),
          (1, Fact('Пешеходный переход')),
          (2, Fact(pedestrians='Да')),
          (3, Fact(final_action='Притормозить')),
          (4, Fact(action='Проверить наличие пешеходов')),
          (5,
           Fact(final_action='Остановиться и дождаться, пока пешеход пройдет'))])

In [4]:
test = Crosswalk()
test.reset()
test.declare(Fact("Пешеходный переход"), Fact(pedestrians = "Нет"))
test.run()
test.facts

Притормозить
Ехать дальше без изменения скорости


FactList([(0, InitialFact()),
          (1, Fact('Пешеходный переход')),
          (2, Fact(pedestrians='Нет')),
          (3, Fact(final_action='Притормозить')),
          (4, Fact(action='Проверить наличие пешеходов')),
          (5, Fact(final_action='Ехать дальше без изменения скорости'))])

In [7]:
import re
import inspect

declaration_re = re.compile(r"(?<=self\.declare\(Fact\().*?(?=\)\)\n)")
constants = [('FOLLOW_TRAFFIC_LIGHTS', 'Следовать сигналам светофора'),
('NOT_MAIN_ROAD', 'Мы на второстепенной дороге'),
('MAIN_ROAD', 'Мы на главной дороге'),
('CHECK_RIGHT', 'Воспользоваться правилом \"помеха справа\"'),
('ACTION_GO_LEFT', 'Повернуть налево, иначе остановиться и ждать'),
('ACTION_GO_RIGHT', 'Повернуть направо, иначе остановиться и ждать'),
('ACTION_GO', 'Можно ехать'),
('ACTION_STOP', 'Остановиться и ждать'),
('ACTION_CHECK_AND_GO', 'Уступить дорогу, если есть другие ТС, иначе можно ехать'),
('ACTION_TURN_AROUND', 'Развернуться'),
('ACTION_CONTINUE_DRIVING_STRAIGHT', 'Продолжать движение вперёд по правилам'),
('ACTION_CONTINUE_DRIVING_CROSSROAD', 'Следуем правилам проезда на перекрёстке'),
('ACTION_END', 'Мы доехали'),
('ACTION_STRAIGHT_CROSSROAD_TURN', 'Ехать до ближайшего перекрёстка и повернуть в направлении пункта назначения'),
('ACTION_CROSSROAD_TURN', 'Развернуться на ближайшем перекрёстке'),
('YES', 'Да'),
('NO', 'Нет')]

def replace_constants(s):
    for (key, value) in constants:
        s = s.replace(key, value)
    return s

def ke2dot(engine):
    '''Генератор схемы связей между фактами и правилами'''
    
    def gen_node(name, label, shape):
        return "\t\"{}\" [label=\"{}\", shape={}]\n".format(name, label, shape)

    def gen_edge(name1, name2):
        return "\t\"{}\" -> \"{}\"\n".format(name1, name2)
    
    def get_id(name):
        nonlocal facts
        nonlocal id_
        
        res = facts.get(name)
        if res is None:
            facts[name] = id_
            id_ += 1
            return str(name), True
        return str(name), False

    def iterate_over_tree(node: tuple, node_id: str):
        nonlocal result
        nonlocal id_

        for child in node:
            node_type = str(type(child)).rsplit('.', 1)[-1][:-2]
            if node_type == 'Fact':
                fact = str(child).rsplit("Fact", 1)[-1][1:-1].replace('"', "'")
                child_id, is_new = get_id(fact)
                if is_new: result += gen_node(child_id, fact, "rectangle")
                result += gen_edge(child_id, node_id)
            else:
                child_id = str(id_)
                id_ += 1
                result += gen_node(child_id, node_type, "circle")
                result += gen_edge(child_id, node_id)
                iterate_over_tree(child, child_id)

    result = "digraph KnowledgeEngine {\n\tgraph [splines=line]\n"
    facts = dict()
    id_ = 0

    for rule in engine.get_rules():
        top_node = ""
        if len(rule) > 1:
            top_node = str(id_)
            id_ += 1
            result += gen_node(top_node, "AND", "circle")
            iterate_over_tree(conditionalelement.AND(*rule), top_node)
        else:
            node_type = str(type(rule[0])).rsplit('.', 1)[-1][:-2]
            if node_type == "Fact":
                fact = str(rule[0]).rsplit("Fact", 1)[-1][1:-1].replace('"', "'")
                top_node, is_new = get_id(fact)
                if is_new: result += gen_node(top_node, fact, "rectangle")
            else:
                top_node = str(id_)
                id_ += 1
                result += gen_node(top_node, node_type, "circle")
                iterate_over_tree(rule[0], top_node)

        funcname = rule._wrapped.__name__
        result += gen_node(funcname, funcname, "parallelogram")
        result += gen_edge(top_node, funcname)

        declarations = declaration_re.findall(inspect.getsource(rule._wrapped))
        for declaration in map(replace_constants, declarations):
            declaration = declaration.replace('"', "'").replace(" = ", "=")
            node_id, is_new = get_id(declaration)
            if is_new: result += gen_node(node_id, declaration, "rectangle")
            result += gen_edge(funcname, node_id)

    result += "}\n"
    return result

In [8]:
print(ke2dot(test))

digraph KnowledgeEngine {
	graph [splines=line]
	"0" [label="AND", shape=circle]
	"'Пешеходный переход'" [label="'Пешеходный переход'", shape=rectangle]
	"'Пешеходный переход'" -> "0"
	"2" [label="OR", shape=circle]
	"2" -> "0"
	"traffic_lights='Нет'" [label="traffic_lights='Нет'", shape=rectangle]
	"traffic_lights='Нет'" -> "2"
	"4" [label="NOT", shape=circle]
	"4" -> "2"
	"traffic_lights=W()" [label="traffic_lights=W()", shape=rectangle]
	"traffic_lights=W()" -> "4"
	"crosswalk" [label="crosswalk", shape=parallelogram]
	"0" -> "crosswalk"
	"final_action='Притормозить'" [label="final_action='Притормозить'", shape=rectangle]
	"crosswalk" -> "final_action='Притормозить'"
	"action='Проверить наличие пешеходов'" [label="action='Проверить наличие пешеходов'", shape=rectangle]
	"crosswalk" -> "action='Проверить наличие пешеходов'"
	"final_action=W('action')" [label="final_action=W('action')", shape=rectangle]
	"maneuver" [label="maneuver", shape=parallelogram]
	"final_action=W('action')" ->

In [None]:
# def ask(question, answers):
#     print(question, '(' + ', '.join(answers) + '): ', end='')
#     ans = input()
#     while ans not in answers:
#         ans = input()
#     return ans

# def read_event():
#     facts = []
#     event = input()
#     facts.append(Fact(event))
#     if (event == "пешеходный переход"):
#         facts.append(Fact(pedestrians = ask("Пешеходы?", ["да", "нет"])))
#     else:
#         return None
#     return facts

In [None]:
# # main loop
# facts = read_event()
# while (facts):
#     engine.reset()
#     engine.declare_facts(facts)
#     engine.run()
#     facts = read_event()