In [1]:
# saturation prover environment can be created
# as any other OpenAI Gym environment
import gym
import os
from glob import glob
import sys
if sys.version_info.major == 3 and sys.version_info.minor >= 9:
    from importlib.resources import files
else:
    from importlib_resources import files

# the TPTP mock bundled with the package as a resource is used here;
# a path to an unpacked [TPTP](http://tptp.org/) library can be given instead
tptp_root = files("gym_saturation").joinpath("resources/TPTP-mock")
# the problem list is obligatory: here it holds the CNF problems from TPTP
cnf_pattern = os.path.join(tptp_root, "Problems/*/*-*.p")
problem_list = sorted(glob(cnf_pattern))
# the maximal number of steps of the saturation algorithm is obligatory too
wrapped_env = gym.make(
    "gym_saturation:saturation-v0",
    step_limit=100,
    problem_list=problem_list,
)
env = wrapped_env.unwrapped

In [2]:
# fix the seed so that the results are reproducible
seed_result = env.seed(2)
print(seed_result)

2


In [3]:
# when resetting the environment one may pass an optional argument:
# the name of an exact problem from TPTP
# (if omitted, a problem will be chosen at random)
# parsing is done with the `lark` package, which can sometimes be slow;
# reimplementing the parser in a faster language than Python might help
obs = env.reset()

In [4]:
# show the problem that was chosen at random during the reset
rendered_problem = env.render()
print(rendered_problem)

cnf(b_equals_bb, hypothesis, equal_sets(b, bb)).
cnf(element_of_b, hypothesis, member(element_of_b, b)).
cnf(prove_element_of_bb, hypothesis, ~member(element_of_b, bb)).
cnf(membership_in_subsets, hypothesis, ~member(X0, X1) | ~subset(X1, X2) | member(X0, X2)).
cnf(subsets_axiom1, hypothesis, subset(X3, X4) | member(member_of_1_not_of_2(X3,X4), X3)).
cnf(subsets_axiom2, hypothesis, ~member(member_of_1_not_of_2(X5,X6), X6) | subset(X5, X6)).
cnf(set_equal_sets_are_subsets1, hypothesis, ~equal_sets(X7, X8) | subset(X7, X8)).
cnf(set_equal_sets_are_subsets2, hypothesis, ~equal_sets(X9, X10) | subset(X10, X9)).
cnf(subsets_are_set_equal_sets, hypothesis, ~subset(X11, X12) | ~subset(X12, X11) | equal_sets(X12, X11)).


In [5]:
# an observation is a dict with two keys: "action_mask" and "real_obs"
# "action_mask" holds 1 for the actions an agent can choose and 0 for the rest
# "real_obs" is a list of dictionaries, each representing a logical clause
action_mask_head = obs["action_mask"][:20]
first_literal = obs["real_obs"][0]["literals"][0]
for shown in (obs.keys(), action_mask_head, first_literal):
    print(shown)

dict_keys(['real_obs', 'action_mask'])
[1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
{'class': 'Literal', 'negated': False, 'atom': {'class': 'Predicate', 'name': 'equal_sets', 'arguments': [{'class': 'Function', 'name': 'b', 'arguments': []}, {'class': 'Function', 'name': 'bb', 'arguments': []}]}}


In [6]:
# an action is the index of a not yet processed clause from the state
# formally, the action space is a large enough discrete space
print(env.action_space)
# yet the number of ones in the action mask
mask_total = obs["action_mask"].sum()
print(mask_total)
# equals the number of clauses which are not processed yet
unprocessed_total = sum(
    1.0 for clause in obs["real_obs"] if not clause["processed"]
)
print(unprocessed_total)

Discrete(1000)
9.0
9.0


In [7]:
# only the "ansi" and "human" rendering modes are implemented
# "human" (the default) returns the TPTP representation
# "ansi" returns a string with the clauses of the state serialised
# as nested dictionaries; only its first 1000 characters are shown
ansi_state = env.render("ansi")
print(ansi_state[:1000])

[{'class': 'Clause', 'literals': [{'class': 'Literal', 'negated': False, 'atom': {'class': 'Predicate', 'name': 'equal_sets', 'arguments': [{'class': 'Function', 'name': 'b', 'arguments': []}, {'class': 'Function', 'name': 'bb', 'arguments': []}]}}], 'label': 'b_equals_bb', 'birth_step': 0, 'processed': False, 'inference_parents': [], 'inference_rule': None}, {'class': 'Clause', 'literals': [{'class': 'Literal', 'negated': False, 'atom': {'class': 'Predicate', 'name': 'member', 'arguments': [{'class': 'Function', 'name': 'element_of_b', 'arguments': []}, {'class': 'Function', 'name': 'b', 'arguments': []}]}}], 'label': 'element_of_b', 'birth_step': 0, 'processed': False, 'inference_parents': [], 'inference_rule': None}, {'class': 'Clause', 'literals': [{'class': 'Literal', 'negated': True, 'atom': {'class': 'Predicate', 'name': 'member', 'arguments': [{'class': 'Function', 'name': 'element_of_b', 'arguments': []}, {'class': 'Function', 'name': 'bb', 'arguments': []}]}}], 'label': 'prov

In [8]:
# the package ships a simple episode function
# together with an example policy which selects the shortest clause
from gym_saturation.agent_testing import SizeAgent, episode

size_agent = SizeAgent()
set_problem = files("gym_saturation").joinpath(
    "resources/TPTP-mock/Problems/SET/SET001-1.p"
)
env.problem_list = [set_problem]
episode_memory = episode(env, size_agent)
# this policy manages to solve a simple problem from TPTP
final_transition = episode_memory[-1]
print(final_transition.done, final_transition.reward)

True 1.0


In [9]:
# the TSTP proof printed further below lists only four derived clauses
# but a naïve clause size policy finds it only after 15 steps
# a trained DL policy might have done better
episode_length = len(episode_memory)
print(episode_length)

15


In [10]:
# the TSTP proof found during the episode can now be inspected
found_proof = env.tstp_proof
print(found_proof)

cnf(_4, hypothesis, ~subset(b, X2) | member(element_of_b, X2), inference(resolution, [], [element_of_b, membership_in_subsets])).
cnf(_0, hypothesis, subset(b, bb), inference(resolution, [], [b_equals_bb, set_equal_sets_are_subsets1])).
cnf(_23, hypothesis, member(element_of_b, bb), inference(resolution, [], [_0, _4])).
cnf(_27, hypothesis, $false, inference(resolution, [], [prove_element_of_bb, _23])).


In [11]:
# paramodulation test
env.problem_list = [
    files("gym_saturation")
    .joinpath("resources/TPTP-mock/Problems/TST/TST002-1.p")
]
episode_memory = episode(env, size_agent)
# here the proof consists of four steps
# but a naïve policy needs 25
# `splitlines` counts one step per proof line and, unlike `split("\n")`,
# reports no phantom step for an empty proof or a trailing newline
proof_steps = len(env.tstp_proof.splitlines())
print(proof_steps, len(episode_memory))

4 25
