Permalink
Fetching contributors…
Cannot retrieve contributors at this time
executable file 970 lines (833 sloc) 32.6 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright © 2001, 2002, 2003, 2012 Progiciels Bourbeau-Pinard inc.
# François Pinard <pinard@iro.umontreal.ca>, 2001.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
"""\
Interface between Emacs Lisp and Python - Python part.
Emacs may launch this module as a stand-alone program, in which case it
acts as a server of Python facilities for that Emacs session, reading
requests from standard input and writing replies on standard output.
When used in this way, the program is called "the Pymacs helper".
This module may also be usefully imported by other Python modules.
See the Pymacs documentation (check `README') for more information.
"""
# Identification of version.
package = 'Pymacs'
version = '@VERSION@'
import os
import sys
if PYTHON3:
import collections
def callable(value):
return isinstance(value, collections.Callable)
basestring = str
from imp import reload
else:
__metaclass__ = type
def fixup_icanon():
# Otherwise sys.stdin.read hangs for large inputs in emacs 24.
# See comment in emacs source code sysdep.c.
import termios
a = termios.tcgetattr(1)
a[3] &= ~termios.ICANON
termios.tcsetattr(1, termios.TCSANOW, a)
try:
import signal
except ImportError:
# Jython does not have signal.
signal = None
## Python services for Emacs applications.
class Main:
debug_file = None
signal_file = None
def main(self, *arguments):
"""\
Execute Python services for Emacs, and Emacs services for Python.
This program is meant to be called from Emacs, using `pymacs.el'.
Debugging options:
-d FILE Debug the protocol to FILE.
-s FILE Trace received signals to FILE.
Arguments are added to the search path for Python modules.
"""
# Decode options.
arguments = (os.environ.get('PYMACS_OPTIONS', '').split()
+ list(arguments))
import getopt
options, arguments = getopt.getopt(arguments, 'fd:s:')
for option, value in options:
if option == '-d':
self.debug_file = value
elif option == '-s':
self.signal_file = value
elif option == '-f':
try:
fixup_icanon()
except:
pass
arguments.reverse()
for argument in arguments:
if os.path.isdir(argument):
sys.path.insert(0, argument)
# Inhibit signals. The Interrupt signal is temporary enabled, however,
# while executing any Python code received from the Lisp side.
if signal is not None:
if IO_ERRORS_WITH_SIGNALS:
# See the comment for IO_ERRORS_WITH_SIGNALS in ppppconfig.py.
self.original_handler = signal.signal(
signal.SIGINT, self.interrupt_handler)
else:
for counter in range(1, signal.NSIG):
if counter == signal.SIGINT:
self.original_handler = signal.signal(
counter, self.interrupt_handler)
else:
try:
signal.signal(counter, self.generic_handler)
except RuntimeError:
pass
self.inhibit_quit = True
if not PYTHON3:
# Re-open standard input and output in binary mode.
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb')
sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb')
# Start protocol and services.
lisp._protocol.send('version', '"%s"' % version)
lisp._protocol.loop()
def generic_handler(self, number, frame):
if self.signal_file:
handle = open(self.signal_file, 'a')
handle.write('%d\n' % number)
handle.close()
def interrupt_handler(self, number, frame):
if self.signal_file:
star = (' *', '')[self.inhibit_quit]
handle = open(self.signal_file, 'a')
handle.write('%d%s\n' % (number, star))
handle.close()
if not self.inhibit_quit:
self.original_handler(number, frame)
run = Main()
main = run.main
if OLD_EXCEPTIONS:
ProtocolError = 'ProtocolError'
ZombieError = 'ZombieError'
else:
class error(Exception):
pass
class ProtocolError(error):
pass
class ZombieError(error):
pass
class Protocol:
# All exec's and eval's triggered from the Emacs side are all executed
# within the "loop" method below, so all user context is kept as
# local variables within this single routine. Different instances
# of this Protocol class would yield independant evaluation contexts.
# But in the usual case, there is only one such instance kept within a
# Lisp_Interface instance, and the "lisp" global variable within this
# module holds such a Lisp_Interface instance.
def __init__(self):
self.freed = []
if PYTHON3:
def loop(self):
# The server loop repeatedly receives a request from Emacs and
# returns a response, which is either the value of the received
# Python expression, or the Python traceback if an error occurs
# while evaluating the expression.
# The server loop may also be executed, as a recursive invocation,
# in the context of Emacs serving a Python request. In which
# case, we might also receive a notification from Emacs telling
# that the reply has been transmitted, or that an error occurred.
# A reply notification from Emacs interrupts the loop: the result
# of this function is then the value returned from Emacs.
done = False
while not done:
try:
action, text = self.receive()
if action == 'eval':
action = 'return'
try:
run.inhibit_quit = False
value = eval(text)
finally:
run.inhibit_quit = True
elif action == 'exec':
action = 'return'
value = None
try:
run.inhibit_quit = False
exec(text)
finally:
run.inhibit_quit = True
elif action == 'return':
done = True
try:
run.inhibit_quit = False
value = eval(text)
finally:
run.inhibit_quit = True
elif action == 'raise':
action = 'raise'
value = 'Emacs: ' + text
else:
raise ProtocolError("Unknown action %r" % action)
except KeyboardInterrupt:
if done:
raise
action = 'raise'
value = '*Interrupted*'
except ProtocolError as exception:
sys.exit("Protocol error: %s\n" % exception)
except:
import traceback
action = 'raise'
if lisp.debug_on_error.value() is None:
value = traceback.format_exception_only(
sys.exc_info[0], sys.exc_info[1])
value = ''.join(value).rstrip()
else:
value = traceback.format_exc()
if not done:
fragments = []
print_lisp(value, fragments.append, True)
self.send(action, ''.join(fragments))
return value
else:
def loop(self):
# The server loop repeatedly receives a request from Emacs and
# returns a response, which is either the value of the received
# Python expression, or the Python traceback if an error occurs
# while evaluating the expression.
# The server loop may also be executed, as a recursive invocation,
# in the context of Emacs serving a Python request. In which
# case, we might also receive a notification from Emacs telling
# that the reply has been transmitted, or that an error occurred.
# A reply notification from Emacs interrupts the loop: the result
# of this function is then the value returned from Emacs.
done = False
while not done:
try:
action, text = self.receive()
if action == 'eval':
action = 'return'
try:
run.inhibit_quit = False
value = eval(text)
finally:
run.inhibit_quit = True
elif action == 'exec':
action = 'return'
value = None
try:
run.inhibit_quit = False
exec(text)
finally:
run.inhibit_quit = True
elif action == 'return':
done = True
try:
run.inhibit_quit = False
value = eval(text)
finally:
run.inhibit_quit = True
elif action == 'raise':
action = 'raise'
value = 'Emacs: ' + text
else:
if OLD_EXCEPTIONS:
raise ProtocolError, "Unknown action %r" % action
else:
raise ProtocolError("Unknown action %r" % action)
except KeyboardInterrupt:
if done:
raise
action = 'raise'
value = '*Interrupted*'
except ProtocolError, exception:
sys.exit("Protocol error: %s\n" % exception)
except:
import traceback
action = 'raise'
if lisp.debug_on_error.value() is None:
value = traceback.format_exception_only(
sys.exc_type, sys.exc_value)
value = ''.join(value).rstrip()
else:
value = traceback.format_exc()
if not done:
fragments = []
print_lisp(value, fragments.append, True)
self.send(action, ''.join(fragments))
return value
if PYTHON3:
def receive(self):
# Receive a Python expression from Emacs, return (ACTION, TEXT).
prefix = sys.stdin.buffer.read(3)
if not prefix or prefix[0] != ord(b'>'):
raise ProtocolError("`>' expected.")
while prefix[-1] != ord(b'\t'):
character = sys.stdin.buffer.read(1)
if not character:
raise ProtocolError("Empty stdin read.")
prefix += character
data = sys.stdin.buffer.read(int(prefix[1:-1]))
try:
text = data.decode('UTF-8')
except UnicodeDecodeError:
#assert False, ('***', data)
text = data.decode('ISO-8859-1')
if run.debug_file is not None:
handle = open(run.debug_file, 'a')
handle.write(prefix.decode('ASCII') + text)
handle.close()
return text.split(None, 1)
else:
def receive(self):
# Receive a Python expression from Emacs, return (ACTION, TEXT).
prefix = sys.stdin.read(3)
if not prefix or prefix[0] != '>':
if OLD_EXCEPTIONS:
raise ProtocolError, "`>' expected."
else:
raise ProtocolError("`>' expected.")
while prefix[-1] != '\t':
character = sys.stdin.read(1)
if not character:
if OLD_EXCEPTIONS:
raise ProtocolError, "Empty stdin read."
else:
raise ProtocolError("Empty stdin read.")
prefix += character
text = sys.stdin.read(int(prefix[1:-1]))
if run.debug_file is not None:
handle = open(run.debug_file, 'a')
handle.write(prefix + text)
handle.close()
return text.split(None, 1)
if PYTHON3:
def send(self, action, text):
# Send ACTION and its TEXT argument to Emacs.
if self.freed:
# All delayed Lisp cleanup is piggied back on the transmission.
text = ('(free (%s) %s %s)\n'
% (' '.join(map(str, self.freed)), action, text))
self.freed = []
else:
text = '(%s %s)\n' % (action, text)
data = text.encode('UTF-8')
prefix = '<%d\t' % len(data)
if run.debug_file is not None:
handle = open(run.debug_file, 'a')
handle.write(prefix + text)
handle.close()
sys.stdout.buffer.write(prefix.encode('ASCII'))
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
else:
def send(self, action, text):
# Send ACTION and its TEXT argument to Emacs.
if self.freed:
# All delayed Lisp cleanup is piggied back on the transmission.
text = ('(free (%s) %s %s)\n'
% (' '.join(map(str, self.freed)), action, text))
self.freed = []
else:
text = '(%s %s)\n' % (action, text)
prefix = '<%d\t' % len(text)
if run.debug_file is not None:
handle = open(run.debug_file, 'a')
handle.write(prefix + text)
handle.close()
sys.stdout.write(prefix + text)
sys.stdout.flush()
def pymacs_load_helper(file_without_extension, prefix, noerror=None):
# This function imports a Python module, then returns a Lisp expression
# which, when later evaluated, will install trampoline definitions
# in Emacs for accessing the Python module facilities. Module, given
# through FILE_WITHOUT_EXTENSION, may be a full path, yet without the
# `.py' or `.pyc' suffix, in which case the directory is temporarily
# added to the Python search path for the sole duration of that import.
# All defined symbols on the Lisp side have have PREFIX prepended,
# and have Python underlines in Python turned into dashes. If PREFIX
# is None, it then defaults to the base name of MODULE with underlines
# turned to dashes, followed by a dash.
directory, module_name = os.path.split(file_without_extension)
module_components = module_name.split('.')
if prefix is None:
prefix = module_components[-1].replace('_', '-') + '-'
try:
module = sys.modules.get(module_name)
if module:
reload(module)
else:
try:
if directory:
sys.path.insert(0, directory)
module = __import__(module_name)
finally:
if directory:
del sys.path[0]
# Whenever MODULE_NAME is of the form [PACKAGE.]...MODULE,
# __import__ returns the outer PACKAGE, not the module.
for component in module_components[1:]:
module = getattr(module, component)
except ImportError:
if noerror:
return None
else:
raise
load_hook = module.__dict__.get('pymacs_load_hook')
if load_hook:
load_hook()
interactions = module.__dict__.get('interactions', {})
if not isinstance(interactions, dict):
interactions = {}
arguments = []
for name, value in module.__dict__.items():
if callable(value) and value is not lisp:
arguments.append(allocate_python(value))
arguments.append(lisp[prefix + name.replace('_', '-')])
try:
interaction = value.interaction
except AttributeError:
interaction = interactions.get(value)
if callable(interaction):
arguments.append(allocate_python(interaction))
else:
arguments.append(interaction)
if arguments:
return [lisp.progn,
[lisp.pymacs_defuns, [lisp.quote, arguments]],
module]
return [lisp.quote, module]
def doc_string(function):
import inspect
return inspect.getdoc(function)
## Garbage collection matters.
# Many Python types do not have direct Lisp equivalents, and may not be
# directly returned to Lisp for this reason. They are rather allocated in
# a list of handles, below, and a handle index is used for communication
# instead of the Python value. Whenever such a handle is freed from the
# Lisp side, its index is added of a freed list for later reuse.
python = []
freed_list = []
def allocate_python(value):
assert not isinstance(value, str), (type(value), repr(value))
# Allocate some handle to hold VALUE, return its index.
if freed_list:
index = freed_list[-1]
del freed_list[-1]
python[index] = value
else:
index = len(python)
python.append(value)
return index
def free_python(indices):
# Return many handles to the pool.
for index in indices:
python[index] = None
freed_list.append(index)
def zombie_python(indices):
# Ensure that some handles are _not_ in the pool.
for index in indices:
while index >= len(python):
freed_list.append(len(python))
python.append(None)
python[index] = zombie
freed_list.remove(index)
# Merely to make `*Pymacs*' a bit more readable.
freed_list.sort()
def zombie(*arguments):
# This catch-all function is set as the value for any function which
# disappeared with a previous Pymacs helper process, so calling
# such a function from Emacs will trigger a decipherable diagnostic.
diagnostic = "Object vanished when the Pymacs helper was killed"
if lisp.pymacs_dreadful_zombies.value():
if OLD_EXCEPTIONS:
raise ZombieError, diagnostic
else:
raise ZombieError(diagnostic)
lisp.message(diagnostic)
## Emacs services for Python applications.
class Let:
def __init__(self, **keywords):
# The stack holds (METHOD, DATA) pairs, where METHOD is the expected
# unbound pop_* method, and DATA holds information to be restored.
# METHOD may not be bound to the instance, as this would induce
# reference cycles, and then, __del__ would not be called timely.
self.stack = []
if keywords:
self.push(**keywords)
def __del__(self):
self.pops()
if PYTHON3:
def __bool__(self):
# So stylistic `if let:' executes faster.
return True
else:
def __nonzero__(self):
# So stylistic `if let:' executes faster.
return True
def pops(self):
while self.stack:
self.stack[-1][0](self)
def push(self, **keywords):
data = []
for name, value in keywords.items():
data.append((name, getattr(lisp, name).value()))
setattr(lisp, name, value)
self.stack.append((Let.pop, data))
return self
def pop(self):
method, data = self.stack.pop()
assert method == Let.pop, (method, data)
for name, value in data:
setattr(lisp, name, value)
def push_excursion(self):
self.stack.append((Let.pop_excursion, (lisp.current_buffer(),
lisp.point_marker(),
lisp.mark_marker())))
return self
def pop_excursion(self):
method, data = self.stack.pop()
assert method == Let.pop_excursion, (method, data)
buffer, point_marker, mark_marker = data
lisp.set_buffer(buffer)
lisp.goto_char(point_marker)
lisp.set_mark(mark_marker)
lisp.set_marker(point_marker, None)
lisp.set_marker(mark_marker, None)
def push_match_data(self):
self.stack.append((Let.pop_match_data, lisp.match_data()))
return self
def pop_match_data(self):
method, data = self.stack.pop()
assert method == Let.pop_match_data, (method, data)
lisp.set_match_data(data)
def push_restriction(self):
self.stack.append((Let.pop_restriction, (lisp.point_min_marker(),
lisp.point_max_marker())))
return self
def pop_restriction(self):
method, data = self.stack.pop()
assert method == Let.pop_restriction, (method, data)
point_min_marker, point_max_marker = data
lisp.narrow_to_region(point_min_marker, point_max_marker)
lisp.set_marker(point_min_marker, None)
lisp.set_marker(point_max_marker, None)
def push_selected_window(self):
self.stack.append((Let.pop_selected_window, lisp.selected_window()))
return self
def pop_selected_window(self):
method, data = self.stack.pop()
assert method == Let.pop_selected_window, (method, data)
lisp.select_window(data)
def push_window_excursion(self):
self.stack.append((Let.pop_window_excursion,
lisp.current_window_configuration()))
return self
def pop_window_excursion(self):
method, data = self.stack.pop()
assert method == Let.pop_window_excursion, (method, data)
lisp.set_window_configuration(data)
class Symbol:
def __init__(self, text):
self.text = text
def __repr__(self):
return 'lisp[%s]' % repr(self.text)
def __str__(self):
return '\'' + self.text
def value(self):
return lisp._eval(self.text)
def copy(self):
return lisp._expand(self.text)
def set(self, value):
if value is None:
lisp._eval('(setq %s nil)' % self.text)
else:
fragments = []
write = fragments.append
write('(progn (setq %s ' % self.text)
print_lisp(value, write, True)
write(') nil)')
lisp._eval(''.join(fragments))
def __call__(self, *arguments):
fragments = []
write = fragments.append
write('(%s' % self.text)
for argument in arguments:
write(' ')
print_lisp(argument, write, True)
write(')')
return lisp._eval(''.join(fragments))
class Lisp:
def __init__(self, index):
self.index = index
def __del__(self):
lisp._protocol.freed.append(self.index)
def __repr__(self):
return ('lisp(%s)' % repr(lisp('(prin1-to-string %s)' % self)))
def __str__(self):
return '(aref pymacs-lisp %d)' % self.index
def value(self):
return self
def copy(self):
return lisp._expand(str(self))
class Buffer(Lisp):
pass
#def write(text):
# # So you could do things like
# # print >>lisp.current_buffer(), "Hello World"
# lisp.insert(text, self)
#def point(self):
# return lisp.point(self)
class List(Lisp):
def __call__(self, *arguments):
fragments = []
write = fragments.append
write('(%s' % self)
for argument in arguments:
write(' ')
print_lisp(argument, write, True)
write(')')
return lisp._eval(''.join(fragments))
def __len__(self):
return lisp._eval('(length %s)' % self)
def __getitem__(self, key):
value = lisp._eval('(nth %d %s)' % (key, self))
if value is None and key >= len(self):
if OLD_EXCEPTIONS:
raise IndexError, key
else:
raise IndexError(key)
return value
def __setitem__(self, key, value):
fragments = []
write = fragments.append
write('(setcar (nthcdr %d %s) ' % (key, self))
print_lisp(value, write, True)
write(')')
lisp._eval(''.join(fragments))
class Table(Lisp):
def __getitem__(self, key):
fragments = []
write = fragments.append
write('(gethash ')
print_lisp(key, write, True)
write(' %s)' % self)
return lisp._eval(''.join(fragments))
def __setitem__(self, key, value):
fragments = []
write = fragments.append
write('(puthash ')
print_lisp(key, write, True)
write(' ')
print_lisp(value, write, True)
write(' %s)' % self)
lisp._eval(''.join(fragments))
class Vector(Lisp):
def __len__(self):
return lisp._eval('(length %s)' % self)
def __getitem__(self, key):
return lisp._eval('(aref %s %d)' % (self, key))
def __setitem__(self, key, value):
fragments = []
write = fragments.append
write('(aset %s %d ' % (self, key))
print_lisp(value, write, True)
write(')')
lisp._eval(''.join(fragments))
class Lisp_Interface:
def __init__(self):
self.__dict__['_cache'] = {'nil': None}
self.__dict__['_protocol'] = Protocol()
def __call__(self, text):
return self._eval('(progn %s)' % text)
def _eval(self, text):
self._protocol.send('eval', text)
return self._protocol.loop()
def _expand(self, text):
self._protocol.send('expand', text)
return self._protocol.loop()
def __getattr__(self, name):
if name[0] == '_':
if OLD_EXCEPTIONS:
raise AttributeError, name
else:
raise AttributeError(name)
return self[name.replace('_', '-')]
def __setattr__(self, name, value):
if name[0] == '_':
if OLD_EXCEPTIONS:
raise AttributeError, name
else:
raise AttributeError(name)
self[name.replace('_', '-')] = value
def __getitem__(self, name):
try:
return self._cache[name]
except KeyError:
symbol = self._cache[name] = Symbol(name)
return symbol
def __setitem__(self, name, value):
try:
symbol = self._cache[name]
except KeyError:
symbol = self._cache[name] = Symbol(name)
symbol.set(value)
lisp = Lisp_Interface()
if PYTHON3:
print_lisp_quoted_specials = {
ord('"'): '\\"', ord('\\'): '\\\\', ord('\b'): '\\b',
ord('\f'): '\\f',
ord('\n'): '\\n', ord('\r'): '\\r', ord('\t'): '\\t'}
def print_lisp(value, write, quoted):
if value is None:
write('nil')
elif isinstance(bool, type) and isinstance(value, bool):
write(('nil', 't')[value])
elif isinstance(value, int):
write(repr(value))
elif isinstance(value, float):
write(repr(value))
elif isinstance(value, str):
try:
value.encode('ASCII')
except UnicodeError:
write('(decode-coding-string "')
for byte in value.encode('UTF-8'):
special = print_lisp_quoted_specials.get(byte)
if special is not None:
write(special)
elif 32 <= byte < 127:
write(chr(byte))
else:
write('\\%.3o' % byte)
write('" \'utf-8)')
else:
write('"')
for character in value:
special = print_lisp_quoted_specials.get(ord(character))
if special is not None:
write(special)
elif 32 <= ord(character) < 127:
write(character)
else:
write('\\%.3o' % ord(character))
write('"')
elif isinstance(value, list):
if quoted:
write("'")
if len(value) == 0:
write('nil')
elif len(value) == 2 and value[0] == lisp.quote:
write("'")
print_lisp(value[1], write, False)
else:
write('(')
print_lisp(value[0], write, False)
for sub_value in value[1:]:
write(' ')
print_lisp(sub_value, write, False)
write(')')
elif isinstance(value, tuple):
write('[')
if len(value) > 0:
print_lisp(value[0], write, False)
for sub_value in value[1:]:
write(' ')
print_lisp(sub_value, write, False)
write(']')
elif isinstance(value, Lisp):
write(str(value))
elif isinstance(value, Symbol):
if quoted:
write("'")
write(value.text)
elif callable(value):
write('(pymacs-defun %d nil)' % allocate_python(value))
else:
write('(pymacs-python %d)' % allocate_python(value))
else:
print_lisp_quoted_specials = {
'"': '\\"', '\\': '\\\\', '\b': '\\b', '\f': '\\f',
'\n': '\\n', '\r': '\\r', '\t': '\\t'}
def print_lisp(value, write, quoted):
if value is None:
write('nil')
elif isinstance(bool, type) and isinstance(value, bool):
write(('nil', 't')[value])
elif isinstance(value, int):
write(repr(value))
elif isinstance(value, float):
write(repr(value))
elif isinstance(value, basestring):
multibyte = False
if isinstance(value, unicode):
try:
value = value.encode('ASCII')
except UnicodeError:
value = value.encode('UTF-8')
multibyte = True
if multibyte:
write('(decode-coding-string ')
write('"')
for character in value:
special = print_lisp_quoted_specials.get(character)
if special is not None:
write(special)
elif 32 <= ord(character) < 127:
write(character)
else:
write('\\%.3o' % ord(character))
write('"')
if multibyte:
write(' \'utf-8)')
elif isinstance(value, list):
if quoted:
write("'")
if len(value) == 0:
write('nil')
elif len(value) == 2 and value[0] == lisp.quote:
write("'")
print_lisp(value[1], write, False)
else:
write('(')
print_lisp(value[0], write, False)
for sub_value in value[1:]:
write(' ')
print_lisp(sub_value, write, False)
write(')')
elif isinstance(value, tuple):
write('[')
if len(value) > 0:
print_lisp(value[0], write, False)
for sub_value in value[1:]:
write(' ')
print_lisp(sub_value, write, False)
write(']')
elif isinstance(value, Lisp):
write(str(value))
elif isinstance(value, Symbol):
if quoted:
write("'")
write(value.text)
elif callable(value):
write('(pymacs-defun %d nil)' % allocate_python(value))
else:
write('(pymacs-python %d)' % allocate_python(value))
if __name__ == '__main__':
main(*sys.argv[1:])