# Communicating with Lean

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Communicating-with-Lean" data-toc-modified-id="Communicating-with-Lean-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Communicating with Lean</a></span><ul class="toc-item"><li><span><a href="#Lean-Server" data-toc-modified-id="Lean-Server-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Lean Server</a></span></li><li><span><a href="#Testing-tactics-with-the-Lean-Server" data-toc-modified-id="Testing-tactics-with-the-Lean-Server-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Testing tactics with the Lean Server</a></span><ul class="toc-item"><li><span><a href="#Speed-and-robustness" data-toc-modified-id="Speed-and-robustness-1.2.1"><span class="toc-item-num">1.2.1&nbsp;&nbsp;</span>Speed and robustness</a></span></li><li><span><a href="#Stepping-through-a-full-proof" data-toc-modified-id="Stepping-through-a-full-proof-1.2.2"><span class="toc-item-num">1.2.2&nbsp;&nbsp;</span>Stepping through a full proof</a></span></li></ul></li><li><span><a href="#Creating-a-simple-Lean-solver-(with-breath-first-search)-in-Python" data-toc-modified-id="Creating-a-simple-Lean-solver-(with-breath-first-search)-in-Python-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Creating a simple Lean solver (with breath first search) in Python</a></span></li><li><span><a href="#Custom-printing-of-Lean-expressions" data-toc-modified-id="Custom-printing-of-Lean-expressions-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Custom printing of Lean expressions</a></span></li><li><span><a href="#Current-issues" data-toc-modified-id="Current-issues-1.5"><span class="toc-item-num">1.5&nbsp;&nbsp;</span>Current issues</a></span><ul class="toc-item"><li><span><a href="#Privately-defined-terms" data-toc-modified-id="Privately-defined-terms-1.5.1"><span class="toc-item-num">1.5.1&nbsp;&nbsp;</span>Privately defined terms</a></span></li><li><span><a href="#Term-blowup" data-toc-modified-id="Term-blowup-1.5.2"><span 
class="toc-item-num">1.5.2&nbsp;&nbsp;</span>Term blowup</a></span></li><li><span><a href="#Missing-@-symbols" data-toc-modified-id="Missing-@-symbols-1.5.3"><span class="toc-item-num">1.5.3&nbsp;&nbsp;</span>Missing @ symbols</a></span></li><li><span><a href="#Extra-@-sysmbols-(or-maybe-a-missing-case-in-the-Lean-parser)" data-toc-modified-id="Extra-@-sysmbols-(or-maybe-a-missing-case-in-the-Lean-parser)-1.5.4"><span class="toc-item-num">1.5.4&nbsp;&nbsp;</span>Extra <code>@</code> sysmbols (or maybe a missing case in the Lean parser)</a></span></li><li><span><a href="#Implicit-and-type-class-assumptions" data-toc-modified-id="Implicit-and-type-class-assumptions-1.5.5"><span class="toc-item-num">1.5.5&nbsp;&nbsp;</span>Implicit and type class assumptions</a></span></li><li><span><a href="#Let-expressions" data-toc-modified-id="Let-expressions-1.5.6"><span class="toc-item-num">1.5.6&nbsp;&nbsp;</span>Let expressions</a></span></li><li><span><a href="#Other" data-toc-modified-id="Other-1.5.7"><span class="toc-item-num">1.5.7&nbsp;&nbsp;</span>Other</a></span></li></ul></li><li><span><a href="#TODOs" data-toc-modified-id="TODOs-1.6"><span class="toc-item-num">1.6&nbsp;&nbsp;</span>TODOs</a></span></li><li><span><a href="#What-I-want-from-Lean-4" data-toc-modified-id="What-I-want-from-Lean-4-1.7"><span class="toc-item-num">1.7&nbsp;&nbsp;</span>What I want from Lean 4</a></span></li></ul></li></ul></div>

This is a hacky prototype showing how one can use Lean's server for communication between Lean and another program.  We have in mind implementing something like HOList for Lean. The main idea is that theorem proving is a puzzle, similar to a Rubik's Cube: one has states, and actions one can perform to move to other states.  The states are goals (or lists of goals) and the actions are tactics.  

What we build here is an interface where one says to Lean "apply this tactic to this goal" and Lean says whether that tactic succeeds and, if so, what the new list of goals is.  It is written in a Python notebook, but the ideas transcend languages.

In [1]:
import subprocess
import tempfile
import json
from pprint import pprint
import time
from datetime import datetime
import collections
from IPython.display import display

In [3]:
import subprocess
import json

def send_lsp_request():
    # Start the Lean server as a subprocess
    process = subprocess.Popen(['lean', '--server'],
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               text=True, bufsize=1)
    
    initialize_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "processId": None,
            "rootPath": None,
            "rootUri": None,
            "capabilities": {},
            "trace": "off"
        }
    }

    json_initialize = json.dumps(initialize_request) + '\n'
    process.stdin.write(json_initialize)
    process.stdin.flush()

    # Read from stderr to capture any errors
    stderr_output = process.stderr.readline()
    if stderr_output:
        print("Error from Lean server:", stderr_output)
    # Read and print initialization response
    initialize_response = process.stdout.readline()
    print("Initialization response:", initialize_response)

    # LSP request to open a document
    lsp_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "textDocument/didOpen",
        "params": {
            "textDocument": {
                "uri": "file:///tmp/fake_lean_file.lean",
                "languageId": "lean4",
                "version": 1,
                "text": "example (p: Prop) : p = p := begin sorry end"
            }
        }
    }

    # Send the request
    json_request = json.dumps(lsp_request) + '\n'
    process.stdin.write(json_request)
    process.stdin.flush()

    # Wait for and print the response
    try:
        # Read from stderr to capture any errors
        stderr_output = process.stderr.readline()
        if stderr_output:
            print("Error from Lean server:", stderr_output)

        # Read from stdout to capture normal responses
        stdout_output = process.stdout.readline()
        if stdout_output:
            print("Response from Lean server:", stdout_output)
    except Exception as e:
        print("Exception while reading server response:", str(e))

    # Cleanup
    process.terminate()
    process.wait()

send_lsp_request()

Error from Lean server: Watchdog error: Cannot read LSP request: Invalid header field: "{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"initialize\", \"params\": {\"processId\": null, \"rootPath\": null, \"rootUri\": null, \"capabilities\": {}, \"trace\": \"off\"}}\n"

Initialization response: 


BrokenPipeError: [Errno 32] Broken pipe
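The `Invalid header field` error above is consistent with how the LSP base protocol frames messages: each JSON body must be preceded by a `Content-Length` header and a blank line, rather than being sent as a bare newline-terminated line. A minimal sketch of that framing follows. `frame_lsp_message` and `read_lsp_message` are hypothetical helper names, and the streams are assumed to be opened in binary mode (no `text=True`), since the length counts bytes.

```python
import io
import json

def frame_lsp_message(payload: dict) -> bytes:
    """Encode a JSON-RPC payload with the LSP Content-Length header."""
    body = json.dumps(payload).encode("utf-8")
    header = "Content-Length: {}\r\n\r\n".format(len(body)).encode("ascii")
    return header + body

def read_lsp_message(stream) -> dict:
    """Read one framed message from a binary stream (e.g. proc.stdout)."""
    content_length = None
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before a complete header was read")
        line = line.strip()
        if not line:          # a blank line ends the header section
            break
        name, _, value = line.partition(b":")
        if name.strip().lower() == b"content-length":
            content_length = int(value.strip())
    if content_length is None:
        raise ValueError("missing Content-Length header")
    return json.loads(stream.read(content_length).decode("utf-8"))

# Round trip through an in-memory stream instead of a live server.
request = {"jsonrpc": "2.0", "id": 1, "method": "initialize",
           "params": {"processId": None, "rootUri": None, "capabilities": {}}}
framed = frame_lsp_message(request)
decoded = read_lsp_message(io.BytesIO(framed))
```

With a live server one would write `framed` to the process's stdin and flush it. Whether the rest of the session then succeeds is not tested here.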

## Lean Server
This class starts up a Lean server and communicates with it.  There are a number of requests one can make to the Lean server, but we only need one or two.  To understand the Lean server, it is important to know that it is not like a REPL, but more like an LSP server.  It is designed to partially compile and comment on files as they are being written.  The use here is more of a hack (and it is unfortunately quite slow, as we will see).

One communicates with Lean by sending JSON requests and getting back responses.  We only need two request types for our purposes.  The first is a "sync" request, which sends a "file" to Lean to be synced.  (The file doesn't actually need to exist.)  The second, an "info" request, asks Lean to provide more information about particular places in the code.  Unfortunately, the responses to these requests don't provide many direct answers.  Especially for the "sync" request, one has to wait for an "all_messages" response to get answers.  (And even then, it is difficult to determine whether the response you got is up to date with the current Lean file.  We've employed a lot of tricks to make sure we are getting the most up-to-date information from Lean.)  Our class will store any "all_messages" responses it sees into a queue for later processing.

Ultimately, the "info" requests turned out not to be as reliable as we hoped, so we rely on the "sync" request and use the "info" request only to prevent blocking of the channel.
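For concreteness, here is a sketch of what the two requests look like on the wire, based on the traffic logged later in this notebook: each request is a single line of JSON with a `command` field and a `seq_num` that the matching response echoes back. The helper names are hypothetical.

```python
import json

def make_sync_request(seq_num, file_name, content):
    """Build the JSON line for a "sync" request."""
    return json.dumps({
        "command": "sync",
        "file_name": file_name,
        "content": content,
        "seq_num": seq_num,
    })

def make_info_request(seq_num, file_name, line, column):
    """Build the JSON line for an "info" request at a file position."""
    return json.dumps({
        "command": "info",
        "file_name": file_name,
        "line": line,
        "column": column,
        "seq_num": seq_num,
    })

sync_line = make_sync_request(
    0, "/tmp/fake_lean_file.lean",
    "example (p: Prop) : p = p := begin sorry end")
info_line = make_info_request(1, "/tmp/fake_lean_file.lean", line=1, column=0)
print(sync_line)
print(info_line)
```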

In [22]:
class LeanServer:
    """
    Open up a Lean server and communicate with it.  
    
    Right now it only has support for the "sync" and "info" requests (and their
    corresponding responses).  It does, however, store all "all_messages" responses
    into a queue for later processing.
    """
    
    def __init__(self):
        self.cntr = -1
        self.cntr2 = -1
        self.all_messages = []
        self.all_messages_time = -1
        self.current_tasks = []
        self.current_tasks_time = -1
        self.proc = subprocess.Popen(['lean', '--server'], 
                    universal_newlines=True, 
                    stdin=subprocess.PIPE, # pipe STDIN and STDOUT to send and receive messages
                    stdout=subprocess.PIPE, 
                    #stderr=subprocess.PIPE
                )
    
    # make into a context manager so that it closes lean server automatically
    def __enter__(self):
        self.proc.__enter__()
        return self
    
    def __exit__(self, type, value, traceback):
        self.proc.__exit__(type, value, traceback)
    
    def seq_num(self):
        self.cntr += 1
        return self.cntr
    
    def send_request(self, request, expected_response, verbose=False, sleep=0.0):
        seq_num = self.seq_num()
        request1 = request.copy()
        request1['seq_num'] = seq_num
        j = json.dumps(request1)
        
        # TODO: Use logging instead
        if verbose:
            print()
            print("=>:", datetime.now().strftime('%H:%M:%S.%f'))
            pprint(j)
            print()
        
        # send
        print(j, file=self.proc.stdin, flush=True)
        
        # wait for response
        time.sleep(sleep)
        while True:
            self.cntr2 += 1
            raw_output = self.proc.stdout.readline()
            if verbose:
                print("raw_output:", raw_output)
            output = json.loads(raw_output)
            if verbose:
                print("<=:", datetime.now().strftime('%H:%M:%S.%f'))
                pprint(output)
                print()
            if 'response' in output:
                if output['response'] == expected_response and output['seq_num'] == seq_num:
                    return output
                # TODO: Handle error differently.  It means the JSON is bad
                elif output['response'] == 'error':
                    return output
                # record messages since they may point to errors in the lean code
                elif output['response'] == 'all_messages':
                    self.all_messages = output['msgs']
                    self.all_messages_time = self.cntr2
                # record tasks to know when Lean has stopped processing file
                elif output['response'] == 'current_tasks':
                    self.current_tasks = output['tasks']
                    self.current_tasks_time = self.cntr2
    
    def send_sync_request(self, file_name, content, verbose=False):
#         request = {
#             'command':'sync',
#             'file_name': file_name,
#             "content": content,
#         }

        request = {
            "jsonrpc": "2.0",
            "id": 1,  # Or another integer, if managing multiple concurrent requests
            "method": "textDocument/didOpen",
            "params": {
                "textDocument": {
                    "uri": "file://" + file_name,  # Ensure this is a proper URI to the file
                    "languageId": "lean4",  # This might need to be adjusted based on server expectations
                    "version": 1,  # Increment this with each change if managing versions
                    "text": content  # The content of the document being opened
                }
            }
        }
        
#         request = {
#             "jsonrpc":"2.0",
#             "method":"textDocument/didOpen",
#             "params":{
#                 "textDocument":{
#                     "uri":"file:///test.lean",
#                     "languageId":"lean",
#                     "version":1,
#                     "text":"import Init.System.IO\n\ndef n : Nat := 1234\n\n#check n\n\ndef s : String := \"abcd\"\n\n#check s\n\ndef hello : IO Unit := IO.println \"Hello world!\"\n\n#eval hello\n\ndef αβγ /- 😉😉😉😉 -/: Nat := \"NotANat\"\n\nnamespace MyNs\n\ndef u : Unit := ()\n\nend MyNs\n\n#check MyNs.u\n\n#print MyNs.u\n"
#                 }
#             }
#         }
        
        return self.send_request(request, expected_response='ok', verbose=verbose, sleep=0.0)
    
    def send_info_request(self, file_name, line, column, verbose=False):
        request = {            
            'command':'info',
            'file_name': file_name,
            'column': column,
            'line': line
        }
        
        return self.send_request(request, expected_response='ok', verbose=verbose, sleep=0.0)

In [23]:
# sync file (the "file" doesn't need to exist)
with LeanServer() as lean_server:
    print(1)
    response1 = lean_server.send_sync_request(
        file_name='/tmp/fake_lean_file.lean', 
        content='example (p: Prop) : p = p := begin sorry end',
        verbose=True)
    print(2)
    time.sleep(1)
    response2 = lean_server.send_info_request(
        file_name='/tmp/fake_lean_file.lean', 
        line=1, 
        column=0)
    
    # the responses themselves aren't always useful
    display(response1)
    display(response2)
    
    # the "all_messages" responses are more useful
    display(lean_server.all_messages)

1


Watchdog error: Cannot read LSP request: Stream was closed


AttributeError: 'str' object has no attribute 'copy'

Most of the methods take an optional `verbose=True` parameter, which prints the message traffic.

In [4]:
# sync file (the "file" doesn't need to exist)
with LeanServer() as lean_server:
    lean_server.send_sync_request(
        file_name='/tmp/fake_lean_file.lean', 
        content='example (p: Prop) : p = p := begin sorry end',
        verbose=True)
    lean_server.send_info_request(
        file_name='/tmp/fake_lean_file.lean', 
        line=1,
        column=0,
        verbose=True)


=>: 10:45:37.131428
('{"content": "example (p: Prop) : p = p := begin sorry end", "file_name": '
 '"/tmp/fake_lean_file.lean", "seq_num": 0, "command": "sync"}')

<=: 10:45:37.387175
{'is_running': False, 'response': 'current_tasks', 'tasks': []}

<=: 10:45:37.605599
{'is_running': False, 'response': 'current_tasks', 'tasks': []}

<=: 10:45:37.808850
{'is_running': False, 'response': 'current_tasks', 'tasks': []}

<=: 10:45:37.890468
{'message': 'file invalidated', 'response': 'ok', 'seq_num': 0}


=>: 10:45:37.890757
('{"line": 1, "column": 0, "file_name": "/tmp/fake_lean_file.lean", "seq_num": '
 '1, "command": "info"}')

<=: 10:45:37.891602
{'response': 'ok', 'seq_num': 1}
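
The log above shows why the reader loop cannot assume the next line is the answer: unsolicited `current_tasks` responses arrive before the `ok` that carries the matching `seq_num`. The sketch below replays canned response lines shaped like that log through a simplified version of the matching logic. `wait_for_response` is a hypothetical standalone function, not part of the class.

```python
import json

def wait_for_response(lines, seq_num, expected="ok"):
    """Scan response lines until the one answering seq_num appears.

    Returns (response, side_channel), where side_channel holds the most
    recent unsolicited "all_messages" and "current_tasks" payloads.
    """
    side_channel = {"all_messages": None, "current_tasks": None}
    for raw in lines:
        output = json.loads(raw)
        kind = output.get("response")
        if kind == "error":
            return output, side_channel
        if kind == expected and output.get("seq_num") == seq_num:
            return output, side_channel
        if kind == "all_messages":
            side_channel["all_messages"] = output["msgs"]
        elif kind == "current_tasks":
            side_channel["current_tasks"] = output["tasks"]
    raise EOFError("server closed before answering seq_num {}".format(seq_num))

# Canned lines shaped like the log above (not captured from a live server).
canned = [
    '{"is_running": false, "response": "current_tasks", "tasks": []}',
    '{"is_running": false, "response": "current_tasks", "tasks": []}',
    '{"message": "file invalidated", "response": "ok", "seq_num": 0}',
]
response, side = wait_for_response(canned, seq_num=0)
print(response["message"])   # file invalidated
```

Raising on end of input, rather than looping forever, is a design choice of this sketch; it makes a closed server visible as an error instead of a hang.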



## Testing tactics with the Lean Server

Again, tactic-based theorem proving is like a puzzle: you explore by moving from state to state.  What we need is a way to test whether a tactic can be applied to a goal and, if so, what the next goals are.  This is what `LeanTacticTestingInterface` does.  (Note that it is much faster to ask the Lean server about multiple goal-tactic pairs at once than one at a time.)

In [5]:
# For storing information about a Lean goal
LeanGoal = collections.namedtuple('LeanGoal', ['universes', 'assumptions', 'conclusion'])

In [6]:
inspection_tools = r"""
namespace inspection_tools

def join (sep : string) : list string → string
| [x]     := x
| []      := ""
| (x::xs) := x ++ sep ++ join xs

meta def expr_to_string (e : expr bool.tt) : tactic string :=
do
  o ← tactic.get_options,
  tactic.set_options (options.mk.set_bool `pp.all tt),
  f ← tactic.pp e,
  tactic.set_options o,  -- set back to before
  return $ to_string f
  
meta def local_cxt_to_string (v : expr bool.tt) : tactic string := 
do 
  tp ← tactic.infer_type v,
  v_str ← expr_to_string v,
  tp_str ← expr_to_string tp,
  return $ v_str ++ "\n\n" ++ tp_str

meta def goal_to_string (g : expr) : tactic string :=
do 
  tactic.set_goals [g],
  goal ← tactic.target,
  local_cxt ← tactic.local_context,
  let local_cxt_len := list.length local_cxt,
  goal_str ← expr_to_string goal,
  local_cxt_strs ← (list.mmap local_cxt_to_string local_cxt),
  let s1 := goal_str ++ "\n\n",
  let s2 := "Local Context Vars: " ++ (to_string local_cxt_len) ++ "\n\n",
  let s3 := join "\n\n" local_cxt_strs,
  return $ s1 ++ s2 ++ s3

meta def state_report : tactic string :=
do 
 gs ← tactic.get_goals,
 -- loop over all goals (has effect of resetting the goal each time)
 let gs_len := list.length gs,
 goal_strings ← gs.mmap goal_to_string,
 tactic.set_goals gs,  -- set goals back
 let s := "Goals: " ++ (to_string gs_len) ++ "\n\n" ++ (join "\n\n" goal_strings),
 return s
 
meta def trace_goal_state : tactic unit :=
do 
 s ← state_report,
 tactic.trace s,
 return ()

end inspection_tools
"""

default_custom_code = r"""
namespace custom
 
meta def trace_custom_state : tactic unit :=
do 
 tactic.trace "",  -- make more interesting
 return ()

end custom
"""

In [7]:
class LeanTacticTestingInterface:
    """
    Interface to test a tactic on a goal (or multiple tactics on multiple goals)
    """
    def __init__(self):
        self.lean_server = LeanServer()
        
        self.msg_cntr = 0
        self.file_name = 'dummy.lean'   # the name of the "file" being checked (not a real file)
        self.dummy_file = 'dummy.lean'  # a file always kept empty 
        self.eval_line_pos = None       # used for checking that a file is complete
        
        # set up a dummy file for later use
        self.lean_server.send_sync_request(file_name=self.dummy_file, content="")
        
        # one can implement their own custom state trace.  This is the default.
        self.custom_code = default_custom_code
    
     # make into a context manager so that it closes lean server automatically
    def __enter__(self):
        self.lean_server.__enter__()
        return self
    
    def __exit__(self, type, value, traceback):
        self.lean_server.__exit__(type, value, traceback)
        
    def parse_state(self, universes, goal_state):
        """
        Parses the trace string returned by trace_goal_state
        """
        lines = iter(' '.join(l.split()) for l in goal_state.split("\n\n"))
        
        goal_cnt = int(next(lines).split()[-1])
        
        goals = []
        for _ in range(goal_cnt):
            target = next(lines)
            assump_cnt = int(next(lines).split()[-1])
            assumptions = []
            for _ in range(assump_cnt):
                v = next(lines)
                tp = next(lines)
                assumptions.append(v + " : " + tp)
            goal = LeanGoal(universes, assumptions, target)
            goals.append(goal)

        return goals
    
    def check_content(self, content, verbose):
        """
        Have Lean check the contents of a Lean "file". 
        """
        self.msg_cntr += 1
        
        # empty out the previous file for good measure
        self.lean_server.send_sync_request(file_name=self.file_name, content="", verbose=verbose)
        
        # use a few different file names to help avoid old data in the messages
        self.file_name = 'fake_lean_file{}.lean'.format(self.msg_cntr % 10)
        
        # put an eval at the end to make sure the response is not from an old file
        extended_content = content + "\n#eval " + str(self.msg_cntr)
        self.eval_line_pos = extended_content.count("\n") + 1
        
        if verbose:
            print("-- {} --------------".format(self.file_name))
            for i, l in enumerate(extended_content.split('\n')):
                print("{:3}   {}".format(i+1, l))
            print("-------------------------")
            
        # send the file
        self.lean_server.send_sync_request(file_name=self.file_name, content=extended_content, verbose=verbose)
        
    def get_messages(self, verbose):
        """
        Get messages for the current content being checked.
        Return all the messages for that "file"
        
        This is really, really hard to make robust.  How do I know Lean has finished
        or, worse, that Lean isn't still providing information from an old
        version of the file?
        """
        
        # wait for tasks to stop running
        tasks_stopped_time = None
        for _ in range(10000):  # lazy time-out mechanism
            
            # keep sending requests to a dummy file to collect more "all_messages" responses
            self.lean_server.send_info_request(file_name=self.dummy_file, line=1, column=0, verbose=verbose)
            
            # check that tasks are complete for file_name
            task_running = False
            for task in self.lean_server.current_tasks:
                if task['file_name'] == self.file_name:
                    task_running = True
                    break
            if not task_running:
                tasks_stopped_time = self.lean_server.current_tasks_time
                break
        else:
            raise TimeoutError("Tasks still running for {}:\n{}".format(self.file_name, "\n".join(str(t) for t in self.lean_server.current_tasks)))
            
                
        # check that messages are complete by looking at eval at the end
        for _ in range(10000):  # lazy time-out mechanism
            
            # keep sending requests to a dummy file to collect more "all_messages" responses
            self.lean_server.send_info_request(self.dummy_file, line=1, column=0, verbose=verbose)
            
            # wait until messages are after current tasks
            #if self.lean_server.all_messages_time < tasks_stopped_time:
            #    continue
                
            # filter the messages to those for this "file" 
            # and also check for the final eval statement
            filtered_messages = []
            messages_are_complete = False
            for msg in self.lean_server.all_messages:
                if msg['file_name'] == self.file_name:
                    if msg['pos_line'] < self.eval_line_pos:
                        filtered_messages.append(msg)
                    elif msg['pos_line'] == self.eval_line_pos and msg['caption'] == "eval result":
                        if msg['text'] == str(self.msg_cntr):
                            messages_are_complete = True
                        else:
                            break

            # clear messages
            self.lean_server.all_messages = []

            if messages_are_complete:
                return filtered_messages
            
        raise TimeoutError("Expecting eval result on line {} of file {} to be {}.  Run with verbose=True to check for error messages.".format(
                self.eval_line_pos, self.file_name, self.msg_cntr
        ))
    
    def preamble(self, imports):
        if imports:
            import_line = "import " + " ".join(imports) + "\n"
        else:
            import_line = ""
        
        return import_line + inspection_tools + "\n" + self.custom_code + "\n"
        
    def example_block(self, goal, tactic):
        block = ""
        # line: preamble + 9 * n + 0
        block += "section example_block\n"
        # line: preamble + 9 * n + 1
        if goal.universes:
            block += "universes " + " ".join(goal.universes) + "\n"
        else:
            block += "--no universes\n"
        # line: preamble + 9 * n + 2  
        block += "example "
        block += " ".join("(" + l.replace("\n", " ").replace("…", "_redacted_code_too_long_") + ")" for l in goal.assumptions)
        block += " : "
        block += goal.conclusion.replace("\n", " ").replace("…", "_redacted_code_too_long_") + " :=\n"
        # line: preamble + 9 * n + 3
        block += "begin\n"  
        # line: preamble + 9 * n + 4
        block += tactic + ",\n"
        # line: preamble + 9 * n + 5
        block += 'inspection_tools.trace_goal_state,\n' # goal state
        # line: preamble + 9 * n + 6
        block += 'custom.trace_custom_state,\n'         # custom trace
        # line: preamble + 9 * n + 7
        block += "end\n"
        # line: preamble + 9 * n + 8
        block += "end example_block\n"

        return block
            
    def apply_tactic_multiple(self, lean_goals, tactics, imports=(), verbose=False):
        assert len(lean_goals) == len(tactics)
        
        # build content
        preamble = self.preamble(imports)
        
        # this is the line (indexed from 1) that the first block will be on
        preamble_length = preamble.count("\n") + 1
        
        content = preamble
        for goal, tactic in zip(lean_goals, tactics):
            content += self.example_block(goal, tactic)
        
        # start checking the file
        self.check_content(content, verbose=verbose)
            
        # positions in the block
        theorem_line = 2
        tactic_line = 4
        goals_line = 5
        custom_state_line = 6
        block_length = 9
        
        for _ in range(1000):  # lazy timeout method
            # start checking the file
            msgs = self.get_messages(verbose=verbose)
        
            # extract data from messages
            results = [[None, None, [], []] for _ in lean_goals]
            for msg in msgs:
                block_line_pos = (msg['pos_line'] - preamble_length) % block_length
                block_num = (msg['pos_line'] - preamble_length) // block_length

                if msg['pos_line'] < preamble_length:
                    continue
                elif block_line_pos == goals_line and msg['severity'] == 'information':
                    results[block_num][0] = msg['text']
                elif block_line_pos == custom_state_line and msg['severity'] == 'information':
                    results[block_num][1] = msg['text']
                elif block_line_pos == tactic_line and msg['severity'] == 'error':
                    results[block_num][2].append(msg['text'])
                elif block_line_pos == theorem_line and msg['severity'] == 'error':
                    results[block_num][3].append(msg['text'])

            # parse data and check for completeness
            final_results = []
            complete = True
            for i, ((goal_state, custom_state, tactic_errors, theorem_errors), prev_goal) in enumerate(zip(results, lean_goals)):
                if theorem_errors:
                    result = ('theorem_error', theorem_errors, '')
                elif tactic_errors:
                    result = ('tactic_failure', tactic_errors, '')
                elif goal_state is not None and custom_state is not None:
                    result = ('success', self.parse_state(prev_goal.universes, goal_state), custom_state.strip())
                else:
                    complete = False  # need to get messages again
                    break

                final_results.append(result)

            if complete:
                return final_results
        
        raise TimeoutError("Lean server messages are not as expected:\n{}".format("\n".join(str(m) for m in msgs)))
        
    def apply_tactic(self, lean_goal, tactic, imports=(), verbose=False):
        """
        Apply tactic to a goal and see what you get.
        """
        
        result = self.apply_tactic_multiple([lean_goal], [tactic], imports=imports, verbose=verbose)
        return result[0]

Applying `refl` to `p = p` solves all the goals, hence the empty list of remaining goals in the result.

In [8]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal([], ['p q : Prop'],  'p = p')
    display(lean_interface.apply_tactic(goal, 'refl'))

('success', [], '')

One can also add universe levels, e.g. `u`, `v`, as follows:

In [9]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u', 'v'], ['a b: Type u'],  'a = a')
    display(lean_interface.apply_tactic(goal, 'refl'))

('success', [], '')

Imports can also be added to the `apply_tactic` call:

In [10]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u'], ['α : Type u'],  'dlist.to_list (@dlist.empty α) = []')
    display(lean_interface.apply_tactic(goal, 'refl', imports=['data.dlist']))

('success', [], '')

To see what is going on, we show the logs with `verbose=True`.  This also prints the file being sent to Lean.  

The main idea is that we encode everything we want to check into a Lean file, send that via a "sync" request, and read off the response from the messages.

In [11]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u', 'v'], ['a b: Type u'],  'a = a')
    display(lean_interface.apply_tactic(goal, 'refl', verbose=True))


=>: 10:45:45.544907
'{"content": "", "file_name": "dummy.lean", "seq_num": 1, "command": "sync"}'

<=: 10:45:45.545603
{'message': 'file unchanged', 'response': 'ok', 'seq_num': 1}

-- fake_lean_file1.lean --------------
  1   
  2   namespace inspection_tools
  3   
  4   def join (sep : string) : list string → string
  5   | [x]     := x
  6   | []      := ""
  7   | (x::xs) := x ++ sep ++ join xs
  8   
  9   meta def expr_to_string (e : expr bool.tt) : tactic string :=
 10   do
 11     o ← tactic.get_options,
 12     tactic.set_options (options.mk.set_bool `pp.all tt),
 13     f ← tactic.pp e,
 14     tactic.set_options o,  -- set back to before
 15     return $ to_string f
 16     
 17   meta def local_cxt_to_string (v : expr bool.tt) : tactic string := 
 18   do 
 19     tp ← tactic.infer_type v,
 20     v_str ← expr_to_string v,
 21     tp_str ← expr_to_string tp,
 22     return $ v_str ++ "\n\n" ++ tp_str
 23   
 24   meta def goal_to_string (g : expr) : tactic string :=
 25  

('success', [], '')
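
Each goal-tactic pair ends up as one `example` block in the file sent to Lean. The following simplified sketch renders such a block outside the class so the layout is easy to inspect. `render_example_block` is a hypothetical name, it omits the `…` redaction that the class applies, and the Lean 3 text it emits is not checked against Lean here.

```python
import collections

LeanGoal = collections.namedtuple(
    'LeanGoal', ['universes', 'assumptions', 'conclusion'])

def render_example_block(goal, tactic):
    """Render one goal-tactic pair as a fixed-height (9-line) Lean block."""
    if goal.universes:
        universe_line = "universes " + " ".join(goal.universes)
    else:
        universe_line = "--no universes"   # placeholder keeps the line count fixed
    binders = " ".join("(" + a.replace("\n", " ") + ")" for a in goal.assumptions)
    lines = [
        "section example_block",
        universe_line,
        "example " + binders + " : " + goal.conclusion.replace("\n", " ") + " :=",
        "begin",
        tactic + ",",
        "inspection_tools.trace_goal_state,",
        "custom.trace_custom_state,",
        "end",
        "end example_block",
    ]
    return "\n".join(lines) + "\n"

block = render_example_block(
    LeanGoal(['u', 'v'], ['a b: Type u'], 'a = a'), 'refl')
print(block)
```

Keeping every block the same height is what lets the interface map a message's line number back to a block and to a role within it (theorem, tactic, goal trace, custom trace).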

In [12]:
# lots of examples
with LeanTacticTestingInterface() as lean_interface:
    goal1 = LeanGoal([], ['p q : Prop'],  'p = p')
    display(lean_interface.apply_tactic(goal1, 'refl'))
    display(lean_interface.apply_tactic(goal1, 'skip'))
    display(lean_interface.apply_tactic(goal1, 'done'))
    display(lean_interface.apply_tactic(goal1, 'abc123'))
    goal2 = LeanGoal([], ['n m : nat'],  'n = n')
    display(lean_interface.apply_tactic(goal2, 'induction n'))
    goal3 = LeanGoal([], [], 'abc = xyz')
    display(lean_interface.apply_tactic(goal3, 'skip'))
    display(lean_interface.apply_tactic(goal3, 'fgh'))

('success', [], '')

('success',
 [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop'], conclusion='@eq.{1} Prop p p')],
 '')

('tactic_failure',
 ['done tactic failed, there are unsolved goals\nstate:\np q : Prop\n⊢ p = p'],
 '')

('tactic_failure',
 ["unknown identifier 'abc123'",
  "don't know how to synthesize placeholder\ncontext:\np q : Prop\n⊢ Type ?"],
 '')

('success',
 [LeanGoal(universes=[], assumptions=['m : nat'], conclusion='@eq.{1} nat nat.zero nat.zero'),
  LeanGoal(universes=[], assumptions=['m : nat', 'n_n : nat', 'n_ih : @eq.{1} nat n_n n_n'], conclusion='@eq.{1} nat (nat.succ n_n) (nat.succ n_n)')],
 '')

('theorem_error',
 ["unknown identifier 'abc'",
  "unknown identifier 'xyz'",
  "don't know how to synthesize placeholder\ncontext:\n⊢ Sort ?"],
 '')

('theorem_error',
 ["unknown identifier 'abc'",
  "unknown identifier 'xyz'",
  "don't know how to synthesize placeholder\ncontext:\n⊢ Sort ?"],
 '')
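
The goal lists in the results above are recovered from the text printed by `trace_goal_state`, which separates every field with a blank line. The sketch below parses a hand-written trace in that layout (reconstructed from the `induction n` result above, not captured from Lean). `parse_goal_trace` is a hypothetical standalone version of `parse_state`.

```python
import collections

LeanGoal = collections.namedtuple(
    'LeanGoal', ['universes', 'assumptions', 'conclusion'])

def parse_goal_trace(universes, trace):
    """Parse "Goals: N", then per goal: target, var count, (name, type) pairs."""
    # Fields are separated by blank lines; collapse whitespace inside each one.
    fields = iter(" ".join(chunk.split()) for chunk in trace.split("\n\n"))
    goal_count = int(next(fields).split()[-1])
    goals = []
    for _ in range(goal_count):
        target = next(fields)
        var_count = int(next(fields).split()[-1])
        assumptions = []
        for _ in range(var_count):
            name = next(fields)
            tp = next(fields)
            assumptions.append(name + " : " + tp)
        goals.append(LeanGoal(universes, assumptions, target))
    return goals

# Hand-written trace in the layout produced by trace_goal_state.
trace = "\n\n".join([
    "Goals: 2",
    "@eq.{1} nat nat.zero nat.zero",
    "Local Context Vars: 1",
    "m", "nat",
    "@eq.{1} nat (nat.succ n_n) (nat.succ n_n)",
    "Local Context Vars: 3",
    "m", "nat",
    "n_n", "nat",
    "n_ih", "@eq.{1} nat n_n n_n",
])
goals = parse_goal_trace([], trace)
for g in goals:
    print(g)
```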

In [13]:
# examples of executing in batch
with LeanTacticTestingInterface() as lean_interface:
    goal1 = LeanGoal([], ['p q : Prop'],  'p = p')
    goal2 = LeanGoal([], ['n m : nat'],  'n = n')
    goal3 = LeanGoal([], [], 'abc = xyz')
    
    goals = [goal1, goal1, goal1, goal1, goal2, goal3, goal3]
    tactics = ['refl', 'skip', 'done', 'abc123', 'induction n', 'skip', 'fgh']
    
    display(lean_interface.apply_tactic_multiple(goals, tactics))

[('success', [], ''),
 ('success',
  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop'], conclusion='@eq.{1} Prop p p')],
  ''),
 ('tactic_failure',
  ['done tactic failed, there are unsolved goals\nstate:\np q : Prop\n⊢ p = p'],
  ''),
 ('tactic_failure',
  ["unknown identifier 'abc123'",
   "don't know how to synthesize placeholder\ncontext:\np q : Prop\n⊢ Type ?"],
  ''),
 ('success',
  [LeanGoal(universes=[], assumptions=['m : nat'], conclusion='@eq.{1} nat nat.zero nat.zero'),
   LeanGoal(universes=[], assumptions=['m : nat', 'n_n : nat', 'n_ih : @eq.{1} nat n_n n_n'], conclusion='@eq.{1} nat (nat.succ n_n) (nat.succ n_n)')],
  ''),
 ('theorem_error',
  ["unknown identifier 'abc'",
   "unknown identifier 'xyz'",
   "don't know how to synthesize placeholder\ncontext:\n⊢ Sort ?"],
  ''),
 ('theorem_error',
  ["unknown identifier 'abc'",
   "unknown identifier 'xyz'",
   "don't know how to synthesize placeholder\ncontext:\n⊢ Sort ?"],
  '')]

### Speed and robustness
Unfortunately, this is quite slow.

In [14]:
# time it
total_runs = 50
total_time = 0
goal = LeanGoal([], [],  'true')
with LeanTacticTestingInterface() as lean_interface:
    # give the system a chance to "warm up"
    lean_interface.apply_tactic(goal, 'refl')
    time.sleep(2)
    
    for i in range(total_runs):
        a = time.time()
        response = lean_interface.apply_tactic(goal, 'refl')
        b = time.time()
        total_time += b - a
"Each call to the Lean interface takes on average {} seconds".format(total_time / total_runs)

'Each call to the Lean interface takes on average 0.31146204471588135 seconds'

But batching several goal/tactic pairs into a single call to Lean speeds things up.

In [15]:
# time running multiple goal-tactics at a time 
total_runs = 50
batch_size = 32
total_time = 0
goals = [LeanGoal([], [],  'true')] * batch_size
tactics = ['refl'] * batch_size
with LeanTacticTestingInterface() as lean_interface:
    # give the system a chance to "warm up"
    lean_interface.apply_tactic(LeanGoal([], [],  'true'), 'refl')
    time.sleep(2)
    
    for i in range(total_runs):
        a = time.time()
        response = lean_interface.apply_tactic_multiple(goals, tactics)
        b = time.time()
        total_time += b - a
display("Each call to the Lean interface takes on average {} seconds".format(total_time / total_runs))
display("Each goal/tactic pair (w/ batch size {}) takes on average {} seconds".format(batch_size, total_time / total_runs / batch_size))

'Each call to the Lean interface takes on average 0.4297070026397705 seconds'

'Each goal/tactic pair (w/ batch size 32) takes on average 0.013428343832492829 seconds'
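To put the two measurements side by side, the short calculation below works out the per-pair speedup from batching and the relative cost of one batched call. It is plain arithmetic on the timings printed above; the variable names are ours and nothing here calls Lean.

```python
# Timings copied from the two outputs above (seconds).
single_call = 0.31146204471588135   # one goal/tactic pair per call
batch_call = 0.4297070026397705     # one call carrying 32 pairs
batch_size = 32

per_pair = batch_call / batch_size      # cost of one pair inside a batch
speedup = single_call / per_pair        # how much cheaper a pair becomes
overhead = batch_call / single_call     # cost of a batched call vs. a single call

print("per pair: {:.4f} s".format(per_pair))
print("speedup per pair: {:.1f}x".format(speedup))
print("batched call vs. single call: {:.2f}x".format(overhead))
```

On these numbers a pair is roughly 23 times cheaper inside a batch of 32, while the batched call itself takes only about 1.4 times as long as a single call. This suggests that most of the cost is fixed per-request overhead, at least for a goal as trivial as `true`.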

It is very difficult to make this process robust. If one is not careful, the results returned by `apply_tactic_multiple` could belong to a previous call to the method.  There are ways around this, but a naive fix can significantly increase Lean's memory usage over time.  (For example, using a new file name for every "sync" request would be bad, since Lean caches the results of each file.)  The following test checks that `apply_tactic_multiple` is robust.

In [16]:
# test robustness.  Make sure we are getting correct results back 
# and not results for a previous request.
rounds = 100
with LeanTacticTestingInterface() as lean_interface:
    for r in range(rounds):
        count = rounds - r
        goals = [LeanGoal([], ['p_{}_{} : Prop'.format(r, i)],  'p_{}_{}'.format(r, i)) for i in range(count)]
        
        if r % 3 == 0:
            tactics = ['skip' for _ in range(count)]
            results = lean_interface.apply_tactic_multiple(goals, tactics)
            assert len(results) == count
            for i, (response, new_goals, _) in enumerate(results):
                assert response == 'success'
                assert new_goals[0].conclusion == 'p_{}_{}'.format(r, i)
        else:
            tactics = ['simp' for _ in range(count)]
            results = lean_interface.apply_tactic_multiple(goals, tactics)
            assert len(results) == count
            for i, (response, _, _) in enumerate(results):
                assert response == 'tactic_failure', response
"Passed tests"

'Passed tests'
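One common way to guard against stale results is to tag every request with a fresh identifier and to accept only the response carrying that identifier. The sketch below illustrates the idea in isolation. It is not necessarily how `LeanTacticTestingInterface` does it: the class `SequencedClient`, the key `seq_num`, and the dictionary-shaped responses are all assumptions made for illustration.

```python
import itertools


class SequencedClient:
    """Hypothetical guard: accept only the response tagged with the latest request id."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.pending = None  # id of the request we are currently waiting for

    def make_request(self, payload):
        # every request gets a new id; older ids become stale from here on
        self.pending = next(self._ids)
        return {"seq_num": self.pending, "payload": payload}

    def pick_response(self, responses):
        # ignore anything that answers an earlier request
        for response in responses:
            if response.get("seq_num") == self.pending:
                return response
        return None


client = SequencedClient()
client.make_request("first batch")
client.make_request("second batch")

# a stale answer to the first request arrives together with the current one
incoming = [{"seq_num": 1, "result": "old"}, {"seq_num": 2, "result": "new"}]
print(client.pick_response(incoming))
```

Because the identifier lives in the request rather than in the file name, this kind of scheme would not by itself create a new cached file per request, which is the memory concern raised above.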

### Stepping through a full proof

In [17]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "induction n"))

('success',
 [LeanGoal(universes=[], assumptions=['m : nat'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add m nat.zero) (@has_add.add.{0} nat nat.has_add nat.zero m)'),
  LeanGoal(universes=[], assumptions=['m : nat', 'n_n : nat', 'n_ih : @eq.{1} nat (@has_add.add.{0} nat nat.has_add m n_n) (@has_add.add.{0} nat nat.has_add n_n m)'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add m (nat.succ n_n)) (@has_add.add.{0} nat nat.has_add (nat.succ n_n) m)')],
 '')

In [18]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(
        LeanGoal([], ['m n_n : ℕ','n_ih : m + n_n = n_n + m'], 'm + nat.succ n_n = nat.succ n_n + m'), 
        "induction m"))

('success',
 [LeanGoal(universes=[], assumptions=['n_n : nat', 'n_ih : @eq.{1} nat (@has_add.add.{0} nat nat.has_add nat.zero n_n) (@has_add.add.{0} nat nat.has_add n_n nat.zero)'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add nat.zero (nat.succ n_n)) (@has_add.add.{0} nat nat.has_add (nat.succ n_n) nat.zero)'),
  LeanGoal(universes=[], assumptions=['n_n : nat', 'm_n : nat', 'm_ih : @eq.{1} nat (@has_add.add.{0} nat nat.has_add m_n n_n) (@has_add.add.{0} nat nat.has_add n_n m_n) → @eq.{1} nat (@has_add.add.{0} nat nat.has_add m_n (nat.succ n_n)) (@has_add.add.{0} nat nat.has_add (nat.succ n_n) m_n)', 'n_ih : @eq.{1} nat (@has_add.add.{0} nat nat.has_add (nat.succ m_n) n_n) (@has_add.add.{0} nat nat.has_add n_n (nat.succ m_n))'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add (nat.succ m_n) (nat.succ n_n)) (@has_add.add.{0} nat nat.has_add (nat.succ n_n) (nat.succ m_n))')],
 '')

In [19]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(
        LeanGoal([], ['n_n : ℕ','n_ih : 0 + n_n = n_n + 0'],
         '0 + nat.succ n_n = nat.succ n_n + 0'), 
        "simp"))

('success', [], '')

In [20]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(
        LeanGoal([], ['n_n m_n : ℕ',
         'm_ih : m_n + n_n = n_n + m_n → m_n + nat.succ n_n = nat.succ n_n + m_n',
         'n_ih : nat.succ m_n + n_n = n_n + nat.succ m_n'],
         'nat.succ m_n + nat.succ n_n = nat.succ n_n + nat.succ m_n'), 
        "simp"))

('success', [], '')

## Creating a simple Lean solver (with breath first search) in Python

Here we put all of this together to create a (non-practical) BFS prover.  Some highlights:
- Some tactics, like `apply`, take a parameter.  We search the local context for all possible things to put in this parameter.  Note that we do no type checking before sending the tactic to Lean.  The user writes the tactic as `apply {}` to signify that something needs to be filled in from the context.
- Our setup assumes the tactic is only applied to the first goal.  This isn't true of all tactics, so we force it by "treeifying" our proof, that is, adding brackets `{ ... }` where necessary.
- Sending multiple calls to Lean at the same time makes it much faster.  This is the only difference between `proof_search` and `proof_search_fast`.
- We provide a way to verify a proof at the end (sending the whole proof as if it were a single tactic), and a way to walk through the proof to get information (which might be useful for machine-learning-style training).
- This is just a proof of concept and not very useful in itself.

In [21]:
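import collections  # used by the BFS queue below

def premises_from_goal(goal):
    # collect every variable name from assumptions such as 'p q : Prop'
    prems = []
    for l in goal.assumptions:
        # split on the first colon only; the type itself may contain colons
        a, _ = l.split(":", 1)
        for v in a.split():
            prems.append(v)
    return prems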

def tactic_applications(tactics, prems):
    tacs = []
    for t in tactics:
        if "{}" in t:
            for p in prems:
                tacs.append(t.replace("{}", p))
        else:
            tacs.append(t)
    return tacs

def treeify_proof(proof_list):
    s = ""
    goal_cnts = [1]  # open goal counts at each depth
    for tactic, new_goal_cnt in proof_list:
        while True:
            # take goals
            goal_cnt = goal_cnts.pop()

            # if no goals at this depth, end block and take off stack again
            if goal_cnt == 0:
                s += ", }"
                continue
            
            if goal_cnt == 1:
                if s: # don't put comma in front
                    s += ", "
                
            # if more than one goal, take one of them, start block and put rest back on stack
            if goal_cnt > 1:
                goal_cnts.append(goal_cnt - 1)
                s += ", { "
            
            break
            
        # apply tactic
        s += tactic
        
        # put remaining goals on stack
        goal_cnts.append(new_goal_cnt)
            
    return s 

class BreathFirstProofSearch(LeanTacticTestingInterface):
        
    def proof_search(self, lean_goal, tactics, verbose=False):
        starting_goal = lean_goal
        starting_goal_list = (starting_goal,)
        empty_tactics = tuple()
        goal_list_queue = collections.deque()
        goal_list_queue.append((starting_goal_list, empty_tactics))

        while goal_list_queue:
            goal_list, ts = goal_list_queue.popleft()
            goal = goal_list[0]
            prems = premises_from_goal(goal)
            all_tactics = tactic_applications(tactics, prems)
            for t in all_tactics:
                response, new_goals, _ = self.apply_tactic(goal, t, verbose=verbose)
                if response == 'success':
                    new_goal_list = tuple(new_goals) + goal_list[1:]
                    new_ts = ts + ((t, len(new_goals)),)
                    if not new_goal_list:
                        return treeify_proof(new_ts)

                    goal_list_queue.append((new_goal_list, new_ts))
        return None
    
    def proof_search_fast(self, lean_goal, tactics, verbose=False):
        starting_goal = lean_goal
        starting_goal_list = (starting_goal,)
        empty_tactics = tuple()
        goal_list_queue = collections.deque()
        goal_list_queue.append((starting_goal_list, empty_tactics))

        while goal_list_queue:
            goal_list, ts = goal_list_queue.popleft()
            goal = goal_list[0]
            prems = premises_from_goal(goal)
            all_tactics = tactic_applications(tactics, prems)
            
            # faster to batch process all apply_tactic calls
            results = self.apply_tactic_multiple([goal] * len(all_tactics), all_tactics, verbose=verbose)
            for t, (response, new_goals, _) in zip(all_tactics, results):
                if response == 'success':
                    new_goal_list = tuple(new_goals) + goal_list[1:]
                    new_ts = ts + ((t, len(new_goals)),)
                    if not new_goal_list:
                        return treeify_proof(new_ts)

                    goal_list_queue.append((new_goal_list, new_ts))
        return None
    
    def is_valid_proof(self, goal, proof, verbose=False):
        t = proof.replace("\n", "")  # treat proof as a single tactic
        response, goals, _ = self.apply_tactic(goal, t, verbose=verbose)
        if response == 'success' and goals == []:
            return True
        return goals
    
    def walk_through_proof(self, lean_goal, proof):
        ts = proof.replace(" {", "").replace("}, ", "").split(", ")
        goals = [lean_goal]
        for t in ts:
            goal = goals[0]
            print(" Primary Goal:  ", goal)
            print(" Tactic applied:", t)
            _, new_goals, _ = self.apply_tactic(goal, t)
            goals = new_goals + goals[1:]
            print(" New Goals: ", new_goals)
            print(" All Goals: ", goals)
            print()
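To see the "treeifying" step in isolation, the snippet below feeds a hand-written list of `(tactic, number_of_new_goals)` pairs through a condensed restatement of `treeify_proof`. The function is restated (under the name `treeify`, which is ours) only so that the snippet runs without a Lean server; the input list is the proof of `p ∧ q → q ∧ p` found in the first example run of this section.

```python
def treeify(proof_list):
    """Condensed restatement of `treeify_proof` so the snippet runs on its own."""
    s = ""
    goal_cnts = [1]  # open goal counts at each depth
    for tactic, new_goal_cnt in proof_list:
        goal_cnt = goal_cnts.pop()
        while goal_cnt == 0:       # nothing left at this depth: close the block
            s += ", }"
            goal_cnt = goal_cnts.pop()
        if goal_cnt > 1:           # focus the first goal, keep the rest for later
            goal_cnts.append(goal_cnt - 1)
            s += ", { "
        elif s:                    # single goal: plain separator (none at the start)
            s += ", "
        s += tactic
        goal_cnts.append(new_goal_cnt)
    return s


steps = [
    ("intro", 1),
    ("cases a", 1),
    ("split", 2),          # two goals: the first one gets its own { ... } block
    ("apply a_right", 0),
    ("apply a_left", 0),
]
print(treeify(steps))
```

Note that only the non-final goals at each depth are wrapped in braces; the last goal continues at the enclosing level, which is why the result ends with a bare `apply a_left`.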

In [22]:
def example(goal, tactics, verbose=False):
    with BreathFirstProofSearch() as bfs:
        print("Goal:", goal)
        print("Tactics:", tactics)
        print()
        start = time.time()
        proof = bfs.proof_search_fast(goal, tactics, verbose=verbose)
        end = time.time()
        print("Time to proof:", end-start, "seconds")
        print("Proof:", proof)
        print()
        print("Verified:", bfs.is_valid_proof(goal, proof, verbose=verbose))
        print()
        print("Walk through proof:")
        bfs.walk_through_proof(goal, proof)

In [23]:
example(
    goal = LeanGoal([], ['p q r : Prop'], 'p ∧ q → q ∧ p'), 
    tactics=['apply {}', 'cases {}', 'intro', 'split', 'left', 'right'],
)

Goal: LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='p ∧ q → q ∧ p')
Tactics: ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

Time to proof: 2.7585809230804443 seconds
Proof: intro, cases a, split, { apply a_right, }, apply a_left

Verified: True

Walk through proof:
 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='p ∧ q → q ∧ p')
 Tactic applied: intro
 New Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a : and p q'], conclusion='and q p')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a : and p q'], conclusion='and q p')]

 Primary Goal:   LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a : and p q'], conclusion='and q p')
 Tactic applied: cases a
 New Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a_left : p', 'a_right : q'], conclusion='and q p')]
 All Goals:  [LeanGoal(universes=[], a

In [24]:
example(
    goal = LeanGoal([], ['p q r : Prop'], '(p -> (q -> r)) <-> (p /\ q -> r)'), 
    tactics=['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']
)

Goal: LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(p -> (q -> r)) <-> (p /\\ q -> r)')
Tactics: ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

Time to proof: 7.318338871002197 seconds
Proof: split, { intro, intro, cases a_1, apply a, { apply a_1_left, }, apply a_1_right, }, intro, intro, intro, apply a, split, { apply a_1, }, apply a_2

Verified: True

Walk through proof:
 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(p -> (q -> r)) <-> (p /\\ q -> r)')
 Tactic applied: split
 New Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop'], conclusion='(p → q → r) → and p q → r'), LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop'], conclusion='(and p q → r) → p → q → r')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop'], conclusion='(p → q → r) → and p q → r'), LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop'], concl

In [25]:
example(
    goal = LeanGoal([], ['p q r : Prop'], '¬(p ↔ ¬p)'), 
    tactics=['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']
)

Goal: LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='¬(p ↔ ¬p)')
Tactics: ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

Time to proof: 45.084619998931885 seconds
Proof: intro, cases a, apply a_mp, { apply a_mpr, intro, apply a_mp, { apply a, }, apply a, }, apply a_mpr, intro, apply a_mp, { apply a, }, apply a

Verified: True

Walk through proof:
 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='¬(p ↔ ¬p)')
 Tactic applied: intro
 New Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a : iff p (not p)'], conclusion='false')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a : iff p (not p)'], conclusion='false')]

 Primary Goal:   LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', 'a : iff p (not p)'], conclusion='false')
 Tactic applied: cases a
 New Goals:  [LeanGoal(universes=[], assumptions=['p : Prop', 'q : Prop', 'r : Prop', '
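The three runs above took roughly 2.8, 7.3 and 45 seconds. Part of that growth comes from the templated tactics: every name in the local context yields one candidate per `{}` tactic, so the number of candidates tried at each search node grows as `intro` and `cases` add hypotheses. The small calculation below (a sketch with our own function name, mirroring what `tactic_applications` builds) counts the candidates for a given context.

```python
def count_candidates(tactics, context_names):
    """Number of concrete tactics produced from templates such as 'apply {}'."""
    templated = sum(1 for t in tactics if "{}" in t)
    plain = len(tactics) - templated
    return templated * len(context_names) + plain


tactics = ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

# initial goal: only p, q, r are in the context
print(count_candidates(tactics, ['p', 'q', 'r']))
# after `intro, cases a`: a_left and a_right have been added
print(count_candidates(tactics, ['p', 'q', 'r', 'a_left', 'a_right']))
```

Since breadth-first search keeps every successful candidate, this per-node count acts as an upper bound on the branching factor, and the search cost can grow quickly with proof depth.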

## Custom printing of Lean expressions

Our `apply_tactic` returns a list of goals in Lean's verbose `pp.all` format.  Hence, `m n : ℕ ⊢ m + n = n + m` is returned as `LeanGoal(['m n : nat'], "@eq.{1} nat (@has_add.add.{0} nat nat.has_add m n) (@has_add.add.{0} nat nat.has_add n m)")`.  On the one hand, this is not very readable for a human.  On the other hand, it is not as machine-parsable as it could be.  It is, however, possible to have `apply_tactic` output the goals in other formats (or print other relevant information about the goal state).

For example, here we write a simple piece of code which prints the Lean goals just as Lean does.

In [26]:
simple_custom_code = r"""
namespace custom
 
meta def trace_custom_state : tactic unit :=
do 
 tactic.trace_state,  -- default Lean pretty printed goals
 return ()

end custom
"""

In [27]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_code = simple_custom_code
    _, _, custom_msg = (lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "induction n"))
print(custom_msg)

2 goals
case nat.zero
m : ℕ
⊢ m + 0 = 0 + m

case nat.succ
m n_n : ℕ,
n_ih : m + n_n = n_n + m
⊢ m + nat.succ n_n = nat.succ n_n + m


At the other extreme, we may wish to print the goals as s-expressions.  Here is code to do that.  (It is basically the same code we are using in the `LeanTacticTestingInterface` above, except with s-expressions instead of `pp.all` formatting.)

In [28]:
sexp_custom_code = r"""
namespace custom

def join (sep : string) : list string → string
| [x]     := x
| []      := ""
| (x::xs) := x ++ sep ++ join xs

meta def expr_to_string (e : expr bool.tt) : tactic string :=
do
  return $ to_string e.to_raw_fmt
  
meta def local_cxt_to_string (v : expr bool.tt) : tactic string := 
do 
  tp ← tactic.infer_type v,
  v_str ← expr_to_string v,
  tp_str ← expr_to_string tp,
  return $ v_str ++ "\n\n" ++ tp_str

meta def goal_to_string (g : expr) : tactic string :=
do 
  tactic.set_goals [g],
  goal ← tactic.target,
  local_cxt ← tactic.local_context,
  let local_cxt_len := list.length local_cxt,
  goal_str ← expr_to_string goal,
  local_cxt_strs ← (list.mmap local_cxt_to_string local_cxt),
  let s1 := goal_str ++ "\n\n",
  let s2 := "Local Context Vars: " ++ (to_string local_cxt_len) ++ "\n\n",
  let s3 := join "\n\n" local_cxt_strs,
  return $ s1 ++ s2 ++ s3

meta def state_report : tactic string :=
do 
 gs ← tactic.get_goals,
 -- loop over all goals (has effect of resetting the goal each time)
 let gs_len := list.length gs,
 goal_strings ← gs.mmap goal_to_string,
 tactic.set_goals gs,  -- set goals back
 let s := "Goals: " ++ (to_string gs_len) ++ "\n\n" ++ (join "\n\n" goal_strings),
 return s
 
meta def trace_custom_state : tactic unit :=
do 
 s ← state_report,
 tactic.trace s,
 return ()

end custom
"""

In [29]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_code = sexp_custom_code
    _, _, custom_msg = (lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "induction n"))
print(custom_msg)

Goals: 2

(app (app (app (const eq [1]) (const nat [])) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (local_const 0._fresh.12.7660 m (const 1 []))) (const nat.zero []))) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (const nat.zero [])) (local_const 0._fresh.12.7660 m (const 1 []))))

Local Context Vars: 1

(local_const 0._fresh.12.7660 m (const 1 []))

(const nat [])

(app (app (app (const eq [1]) (const nat [])) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (local_const 0._fresh.12.7660 m (const 1 []))) (app (const nat.succ []) (local_const 0._fresh.11.2205 n_n (const 1 []))))) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (app (const nat.succ []) (local_const 0._fresh.11.2205 n_n (const 1 [])))) (local_const 0._fresh.12.7660 m (const 1 []))))

Local Context Vars: 3

(local_const 0._fresh.12.7660 m (const 1 []))

(const nat [])

(loc

Here is code which turns each goal into a single s-expression.  The format is `(goal <intro_cnt> <expr>)` where `<expr>` is the expression and `<intro_cnt>` is the number of `intro`s one has to do on the expression to get back to the goal.  For example, `m n : ℕ ⊢ m + n = n + m` is stored as `∀ m n : ℕ, m + n = n + m`, so `intro_cnt` is 2, since two `intro`s are needed to reproduce `m n : ℕ ⊢ m + n = n + m`.

In [30]:
sexp_custom_code = r"""
namespace custom

def join (sep : string) : list string → string
| [x]     := x
| []      := ""
| (x::xs) := x ++ sep ++ join xs

meta def expr_to_string (e : expr bool.tt) : tactic string :=
do
  return $ to_string e.to_raw_fmt

meta def goal_to_string (g : expr) : tactic string :=
do 
  n ← tactic.revert_all,
  goal ← tactic.target,
  tactic.iterate_exactly n (do tactic.intro1, return ()),
  goal_str ← expr_to_string goal,
  let s := "(goal " ++ (to_string n) ++ " " ++ goal_str ++ ")",
  tactic.rotate_left 1,
  return s

meta def state_report : tactic string :=
do 
 gs ← tactic.get_goals,
 let gs_len := list.length gs,
 goal_strings ← gs.mmap goal_to_string,
 let s := join "\n\n" goal_strings,
 return s
 
meta def trace_custom_state : tactic unit :=
do 
 s ← state_report,
 tactic.trace s,
 return ()

end custom
"""

In [31]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_code = sexp_custom_code
    _, _, custom_msg = (lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "induction n"))
print(custom_msg)

(goal 1 (pi m default (const nat []) (app (app (app (const eq [1]) (const nat [])) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (var 0)) (const nat.zero []))) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (const nat.zero [])) (var 0)))))

(goal 3 (pi m default (const nat []) (pi n_n default (const nat []) (pi n_ih default (app (app (app (const eq [1]) (const nat [])) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (var 1)) (var 0))) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (var 0)) (var 1))) (app (app (app (const eq [1]) (const nat [])) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (var 2)) (app (const nat.succ []) (var 1)))) (app (app (app (app (const has_add.add [0]) (const nat [])) (const nat.has_add [])) (app (const nat.succ []) (var 1))) (var 2)))))))


(See Lewis and Wu's [interface between Mathematica and Lean](https://robertylewis.com/leanmm/) for an example of parsing Lean expressions into Mathematica code.)
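On the Python side, output of the form `(goal <intro_cnt> <expr>)` can be read back with a small s-expression parser. The following is a minimal sketch, not part of the interface above. It assumes that tokens are separated by whitespace, that names contain no parentheses or brackets, and that the bracketed universe-level lists (such as `[]` or `[1]`) can be treated as a second kind of list; the name `parse_sexp` is ours.

```python
import re

# parentheses and brackets are their own tokens; everything else is an atom
TOKEN = re.compile(r"[()\[\]]|[^\s()\[\]]+")


def parse_sexp(text):
    """Parse one s-expression: (...) becomes a list, [...] becomes a tuple."""
    tokens = TOKEN.findall(text)
    pos = 0

    def parse():
        nonlocal pos
        if pos >= len(tokens):
            raise ValueError("unexpected end of input")
        tok = tokens[pos]
        pos += 1
        if tok in ("(", "["):
            close = ")" if tok == "(" else "]"
            items = []
            while pos < len(tokens) and tokens[pos] != close:
                items.append(parse())
            if pos >= len(tokens):
                raise ValueError("missing " + close)
            pos += 1  # consume the closing delimiter
            return items if tok == "(" else tuple(items)
        if tok in (")", "]"):
            raise ValueError("unexpected " + tok)
        return int(tok) if tok.isdigit() else tok

    tree = parse()
    if pos != len(tokens):
        raise ValueError("trailing tokens after the first expression")
    return tree


# a shortened goal in the format printed above
tree = parse_sexp("(goal 1 (pi m default (const nat []) (var 0)))")
intro_cnt, expr = tree[1], tree[2]
print(intro_cnt)
print(expr)
```

Keeping universe-level lists as tuples makes them easy to tell apart from applications when walking the tree afterwards.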

## Current issues

The current system relies on Lean's pretty printer (with `pp.all` set to true) to output the goals, which can then be fed back into Lean.  However, the pretty printer (even with `pp.all` set to true) may not always output valid Lean code.  Here are some known issues:

### Privately defined terms

This mostly comes up in proof recording, but some true theorems of Lean can't be entered into our system since they refer to privately defined terms.  However, in most cases those theorems are also marked as private (or they are automatically created theorems internal to Lean).

### Term blowup

Because of subexpression sharing, sometimes the terms can blow up in size (usually because of type classes).  If they get too big, the pretty printer will replace parts with `…`, making them unusable to Lean.  (Also, for some reason an expression with two `…`s will cause the Lean server to stop, so we replace `…` with `_redacted_code_too_long_`.)  As we can see, even the statement `12345678901234567890 - 12345678901234567890 = 0`, when expanded out, is too large for the pretty printer.

In [32]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal([], [], '12345678901234567890 - 12345678901234567890 = 0')
    result = (lean_interface.apply_tactic(goal, "skip"))
    goal2 = result[1][0]
    print("New goal:\n", goal2)
    print()
    print("Put that new goal back in...")
    result2 = (lean_interface.apply_tactic(goal2, "skip"))
    display(result2)

New goal:
 LeanGoal(universes=[], assumptions=[], conclusion='@eq.{1} nat (@has_sub.sub.{0} nat nat.has_sub (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit0.{0} nat nat.has_add (@bit0.{0} nat nat.has_add (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit0.{0} nat nat.has_add (@bit0.{0} nat nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit1.{0} nat nat.has_one nat.has_add (@bit0.{0} nat nat.has_add (@bit1.

('theorem_error',
 ["unknown identifier '_redacted_code_too_long_'",
  "unknown identifier '_redacted_code_too_long_'",
  "unknown identifier '_redacted_code_too_long_'",
  "unknown identifier '_redacted_code_too_long_'",
  "unknown identifier '_redacted_code_too_long_'",
  "unknown identifier '_redacted_code_too_long_'",
  "unknown identifier '_redacted_code_too_long_'"],
 '')
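The size of the printed term becomes plausible once the numeral is unfolded by hand. Lean 3 encodes numerals in binary with `bit0` and `bit1`, as the output above shows, so a 64-bit number turns into dozens of nested applications, each of which carries its type class arguments under `pp.all`. The sketch below reproduces only the nesting, in a simplified notation that omits the `@`, universe and instance arguments; the function name is ours.

```python
def numeral_to_bits(n):
    """Unfold a positive numeral into nested bit0/bit1 applications (simplified)."""
    if n < 1:
        raise ValueError("only positive numerals are handled in this sketch")
    if n == 1:
        return "has_one.one"
    head = "bit1" if n % 2 else "bit0"   # lowest binary digit picks the head
    return "{} ({})".format(head, numeral_to_bits(n // 2))


print(numeral_to_bits(6))

big = 12345678901234567890
expanded = numeral_to_bits(big)
print(expanded.count("bit"), "nested bit0/bit1 applications")
print(expanded[:40] + " ...")
```

The numeral appears twice in the goal, and each `bit0`/`bit1` is printed together with its instance arguments, which is how a one-line statement ends up exceeding the pretty printer's limit.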

### Missing @ symbols

`@` symbols are needed for some functions with implicit parameters.  With `pp.all` set to true, the pretty printer usually adds the `@` symbols as needed but can occasionally miss some cases.  Here is one such case.

In [33]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u'], [], '@id.{u} = @id.{u}')
    result = (lean_interface.apply_tactic(goal, "skip"))
    goal2 = result[1][0]
    print("New goal:\n", goal2)
    print()
    print("Put that new goal back in...")
    result2 = (lean_interface.apply_tactic(goal2, "skip"))
    display(result2)
    goal3 = LeanGoal(['u'], [], '@eq.{(imax (u+1) u)} (Π {α : Sort u}, α → α) @id.{u} @id.{u}')
    print("What the new goal should be:\n", goal3)
    print()
    print("Put that new goal back in...")
    result3 = (lean_interface.apply_tactic(goal3, "skip"))
    display(result3)

New goal:
 LeanGoal(universes=['u'], assumptions=[], conclusion='@eq.{(imax (u+1) u)} (Π {α : Sort u}, α → α) id.{u} id.{u}')

Put that new goal back in...


('theorem_error',
 ['type mismatch at application\n  eq id\nterm\n  id\nhas type\n  ?m_1 → ?m_1 : Sort u\nbut is expected to have type\n  Π {α : Sort u}, α → α : Sort (imax (u+1) u)'],
 '')

What the new goal should be:
 LeanGoal(universes=['u'], assumptions=[], conclusion='@eq.{(imax (u+1) u)} (Π {α : Sort u}, α → α) @id.{u} @id.{u}')

Put that new goal back in...


('success',
 [LeanGoal(universes=['u'], assumptions=[], conclusion='@eq.{(imax (u+1) u)} (Π {α : Sort u}, α → α) id.{u} id.{u}')],
 '')

### Extra `@` sysmbols (or maybe a missing case in the Lean parser)

Currently `@(λ {α : Type} (x : α), x)` is not valid Lean code even though `@id` is.  However, the pretty printer tries to add an `@` in this case (and even without the `@` it would still be incorrect, so it is likely a bug in the Lean parser).  Consider this example:
```lean
theorem id_defn : @id = (λ {α : Type} (x: α), x) := by refl
#check congr_fun id_defn bool -- @eq.{1} ((λ (x : Type), x → x) bool) (@id.{1} bool) (@(λ {α : Type} (x : α), x) bool)
```
If we try plugging that theorem into Lean as follows, we will get an error:

In [34]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal([], [], '@eq.{1} ((λ (x : Type), x → x) bool) (@id.{1} bool) (@(λ {α : Type} (x : α), x) bool)')
    result = (lean_interface.apply_tactic(goal, "skip"))
    print(result)

('theorem_error', ["invalid '@', identifier expected"], '')


### Implicit and type class assumptions

One can't directly enter type class and implicit assumptions into our framework.  We could easily add this functionality if it matters, but it shouldn't matter from the standpoint of proofs.

In [35]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u'], [], '∀ {α : Type u} [inhabited α], (∃ x : α, x = x)')
    result = (lean_interface.apply_tactic(goal, "intro, intro"))
    
    # the new goal should be 
    #  {α : Type u} [_inst_1 : inhabited.{u+1} α] ⊢ @Exists.{u+1} α (λ (x : α), @eq.{u+1} α x x)
    # but we record it as the slightly different
    #  (α : Type u) (_inst_1 : inhabited.{u+1} α) ⊢ @Exists.{u+1} α (λ (x : α), @eq.{u+1} α x x)
    goal2 = result[1][0]
    print(goal2)
    
    # however, the proof is the same
    result = (lean_interface.apply_tactic(goal2, "existsi _inst_1.default, refl"))
    print(result)

LeanGoal(universes=['u'], assumptions=['α : Type u', '_inst_1 : inhabited.{u+1} α'], conclusion='@Exists.{u+1} α (λ (x : α), @eq.{u+1} α x x)')
('success', [], '')


### Let expressions

The `intro` tactic can move `let` binders to the local context.  However, I don't know a way to either
1. figure out that the assumption in the local context is associated with a let binder and know what it is assigned to, or
2. enter this back into Lean.

Consider the following example.

In [36]:
with LeanTacticTestingInterface() as lean_interface:
    # This is solvable with this proof:
    goal = LeanGoal([], [], 'let x : ℕ := 0 in x = 0')
    proof = "intro, refl"
    result = (lean_interface.apply_tactic(goal, proof))
    print(result)
    
    # but if we do it in two steps, the assumption gets recorded incorrectly:
    goal = LeanGoal([], [], 'let x : ℕ := 0 in x = 0')
    result = (lean_interface.apply_tactic(goal, 'intro'))
    print(result)
    
    goal2 = result[1][0]
    result = (lean_interface.apply_tactic(goal2, 'refl'))
    print(result)
    
    # moreover, even if we enter this as `x : nat := 0` in the assumptions, it gets 
    # a different interpretation than we intend.  It is interpreted as `x : opt_param nat 0`.
    goal3 = LeanGoal([], ['x : nat := 0'], 'x = 0')
    result = (lean_interface.apply_tactic(goal3, 'skip'))
    print(result)

('success', [], '')
('success', [LeanGoal(universes=[], assumptions=['x : nat'], conclusion='@eq.{1} nat x (@has_zero.zero.{0} nat nat.has_zero)')], '')
('tactic_failure', ['invalid apply tactic, failed to unify\n  x = 0\nwith\n  ?m_2 = ?m_2\nstate:\nx : ℕ\n⊢ x = 0'], '')
('success', [LeanGoal(universes=[], assumptions=['x : opt_param.{1} nat (@has_zero.zero.{0} nat nat.has_zero)'], conclusion='@eq.{1} (opt_param.{1} nat (@has_zero.zero.{0} nat nat.has_zero)) x (@has_zero.zero.{0} (opt_param.{1} nat (@has_zero.zero.{0} nat nat.has_zero)) nat.has_zero)')], '')


### Other

There are other small issues that sometimes come up, and I'm sure more will arise as this is tested on all the theorems in Lean.  However, right now, about 95% of the theorems in the core library load correctly into Lean.  (I imagine that in `mathlib`, with its increased use of type classes, this number will go down.)

## TODOs

- Code:
    - Is there a way to make this faster?
    - Is there a way to make this more robust?  (It works for now, but still feels rickety.)
    - Make this production level code.  It is just a proof of concept and still quite hacky.
- Functionality
    - How should one enter definitions and notation?
    - Can we even enter all Lean theorems this way (assuming their definitions and notation are stored in mathlib)?
      - A next step would be to get a list of all theorems and see if we can apply the `skip` tactic to them.
    - How should one enter premises?  The easiest way is to add them as assumptions to the goal.
    - ~~What about imports, etc?~~
    - We only apply tactics to a single goal and not a goal stack
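The "apply `skip` to every theorem" step from the list above could be organized along the following lines. This is only a sketch: `round_trip_report` and `fake_apply_tactic` are hypothetical names, and the stand-in function merely imitates the `(status, new_goals, custom_message)` triples shown throughout this notebook so that the snippet runs without a Lean server.

```python
def round_trip_report(goals, apply_tactic):
    """Count goals that load with `skip`, and goals whose printed form loads again."""
    loaded, reloaded = 0, 0
    for goal in goals:
        status, new_goals, _ = apply_tactic(goal, "skip")
        if status != "success" or not new_goals:
            continue
        loaded += 1
        # feed the pretty-printed goal back in, as in the "Current issues" cells
        status2, _, _ = apply_tactic(new_goals[0], "skip")
        if status2 == "success":
            reloaded += 1
    return {"total": len(goals), "loaded": loaded, "reloaded": reloaded}


# Stand-in for a real interface, for illustration only: goals are plain
# strings, and any goal containing "bad" fails to load.
def fake_apply_tactic(goal, tactic):
    if "bad" in goal:
        return ("theorem_error", ["unknown identifier"], "")
    # pretend the pretty printer mangles goals containing "fragile"
    return ("success", [goal.replace("fragile", "bad")], "")


report = round_trip_report(["p = p", "bad goal", "fragile goal"], fake_apply_tactic)
print(report)
```

With the real interface one would pass `lean_interface.apply_tactic` instead of the stand-in; batching through `apply_tactic_multiple` would likely be needed to make a run over a whole library practical.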

## What I want from Lean 4

- Either a fast REPL-like interface,
- or a way to call Lean quickly (within a few milliseconds) with the text of a small file and get a direct (and unambiguous) response to my request.
- Better documentation of the communication interface
- Better documentation of the tactic commands