Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
190 lines (168 sloc) 7 KB
from untrusted_kernel_manager import UntrustedMultiKernelManager
import zmq
from zmq import ssh
import sys
from misc import Timer
# Receiver holds a ZeroMQ context, records whether Sage could be set up
# (self.sage_mode), and keeps the given filename and a Timer.
# NOTE(review): three statements below look truncated by text extraction --
# the chained assignment after zmq.Context(), the dangling `"tcp://%s" % ip)`
# expression, and the unterminated UntrustedMultiKernelManager( call. The
# missing attribute names and arguments cannot be recovered from this copy.
class Receiver(object):
# filename: forwarded to UntrustedMultiKernelManager and kept on self.filename.
# ip: interpolated into a "tcp://<ip>" address and forwarded to the manager.
def __init__(self, filename, ip):
self.context = zmq.Context() = self.context.socket(zmq.DEALER)
self.port ="tcp://%s" % ip)
# The port and the Sage-mode flag are written to stdout and then flushed;
# presumably a parent process reads them -- TODO confirm.
print self.port
self.sage_mode = self.setup_sage()
print self.sage_mode
sys.stdout.flush() = UntrustedMultiKernelManager(filename, ip,
self.filename = filename
# The timer is interpolated into the "Finished handler" debug log in start().
self.timer = Timer("", reset=True)
# Message loop: while self.listen is true, take one message, pick the handler
# method named by its "type" entry, call it with the message "content", and
# log the handler name together with self.timer.
# NOTE(review): the right-hand sides of `source =` and `msg =` and the start
# of the final `..., zmq.SNDMORE)` statement are missing from this copy, so
# how messages are received and how `response` is sent back is not shown.
def start(self):
self.listen = True
while self.listen:
source =
msg =
# Default to the invalid_message handler; it is replaced only when the
# message carries a "type" that names an existing attribute of self.
msg_type = "invalid_message"
if msg.get("type") is not None:
msgtype = msg["type"]
# NOTE(review): hasattr() accepts any attribute name, not only the
# handler methods -- confirm that message senders are trusted.
if hasattr(self, msgtype):
msg_type = msgtype
# Handlers read keys from the content, so a missing "content" becomes {}.
if msg.get("content") is None:
msg["content"] = {}
# NOTE(review): `logging` is imported only inside the `__main__` block in
# the visible part of this file.
logging.debug("Start %s"%(msg_type,))
handler = getattr(self, msg_type)
response = handler(msg["content"])
logging.debug("Finished handler %s: %s"%(msg_type, self.timer)), zmq.SNDMORE)
def _form_message(self, content, error=False):
    """Wrap *content* in the reply envelope returned by the handlers.

    The envelope is a dict holding the payload under "content" and a
    "type" of "error" when *error* is truthy, "success" otherwise.
    """
    if error:
        kind = "error"
    else:
        kind = "success"
    envelope = {"content": content}
    envelope["type"] = kind
    return envelope
# Try to import Sage and prepare self.sage_dict for later use.
# Returns True after sage_code has been executed in self.sage_dict; on
# ImportError sets self.sage_dict = {} and returns False.
# NOTE(review): this copy shows two `except` clauses but no matching `try:`
# lines, the triple-quoted sage_code string has no visible closing quotes,
# and two `from import ...` lines have lost their module names. All of this
# looks like extraction damage; the original nesting cannot be determined
# from what is shown here.
def setup_sage(self):
import sage
import sage.all
sage.misc.misc.EMBEDDED_MODE = {'frontend': 'sagecell'}
import StringIO
# The first plot takes about 2 seconds to generate (presumably
# because lots of things, like matplotlib, are imported). We plot
# something here so that worker processes don't have this overhead
sage.all.plot(lambda x: x, (0,1)).save(StringIO.StringIO())
# Presumably guards the warm-up plot above: the exception is only logged
# at debug level and setup continues -- confirm against the original file.
except Exception as e:
logging.debug('plotting exception: %s'%e)
self.sage_dict = {'sage': sage}
sage_code = """
from sage.all import *
from sage.calculus.predefined import x
from sage.misc.html import html
from import help
from import automatic_names
exec sage_code in self.sage_dict
return True
except ImportError as e:
self.sage_dict = {}
return False
# Set up a kernel namespace: attach a `_sage_` helper object to `sys`, set the
# stdout/stderr flush_interval to 0.0, exec sage_code in user_ns, store the
# sagecell interact function under user_ns["interact"], and expose
# interact_sagecell.update_interact as sys._sage_.update_interact.
# ka: object providing .session and .iopub_socket, both passed to
#     interact_sagecell.interact_func.
# NOTE(review): the body of TempClass, the right-hand side of `user_ns =`,
# the statement following `if self.sage_mode:` and the contents/closing
# quotes of the sage_code string are missing from this copy (extraction
# damage), so where user_ns comes from is not shown here.
def update_dict_with_sage(self, ka):
import misc
class TempClass(object):
_sage_ = TempClass()
_sage_.display_message = misc.display_message
_sage_.kernel_timeout = 0.0
# Maps file path -> modification time recorded by new_files().
_sage_.sent_files = {}
# Walks `root`, updates sys._sage_.sent_files for files that are unseen or
# have a newer mtime, writes a {"new_files": ...} payload through the
# payload_manager of the object returned by user_ns['get_ipython'](), and
# returns ''.
# NOTE(review): nothing visible appends to the local `new_files` list, so
# the payload as shown is always an empty list -- likely a dropped line.
def new_files(root='./'):
import os
import sys
new_files = []
for top,dirs,files in os.walk(root):
for nm in files:
path = os.path.join(top,nm)
# Drop a leading './' before the path is used as a sent_files key.
if path.startswith('./'):
path = path[2:]
mtime = os.stat(path).st_mtime
if path not in sys._sage_.sent_files or sys._sage_.sent_files[path] < mtime:
sys._sage_.sent_files[path] = mtime
# user_ns is the enclosing function's variable, assigned further down.
ip = user_ns['get_ipython']()
ip.payload_manager.write_payload({"new_files": new_files})
return ''
_sage_.new_files = new_files
sys._sage_ = _sage_
user_ns =
# TODO: maybe we don't want to cut down the flush interval?
sys.stdout.flush_interval = sys.stderr.flush_interval = 0.0
if self.sage_mode:'sage.misc.sage_extension')
sage_code = """
# Ensure unique random state after forking
exec sage_code in user_ns
import interact_sagecell
import interact_compatibility
# overwrite Sage's interact command with our own
user_ns["interact"] = interact_sagecell.interact_func(ka.session, ka.iopub_socket)
sys._sage_.update_interact = interact_sagecell.update_interact
# Message Handlers
def invalid_message(self, msg_content):
    """Handler for unsupported messages.

    *msg_content* is not inspected; the reply is always an error message
    whose content carries the status "Invalid message!".
    """
    rejection = {"status": "Invalid message!"}
    return self._form_message(rejection, error=True)
# Reads the optional "resource_limits" entry of msg_content and returns a
# success message built from reply_content; the `except` branch returns an
# error message with empty content instead.
# NOTE(review): the right-hand side of `reply_content =` and the `try:` line
# that the `except` belongs to are missing from this copy. The caught
# exception `e` is neither logged nor included in the reply.
def start_kernel(self, msg_content):
"""Handler for start_kernel messages."""
resource_limits = msg_content.get("resource_limits")
reply_content =
return self._form_message(reply_content)
except Exception as e:
return self._form_message({}, error=True)
# Builds a status reply for the kernel named by msg_content["kernel_id"]
# (a missing key raises KeyError). The reply is a success message when
# `success` is truthy, otherwise an error message with a "Could not kill"
# status.
# NOTE(review): the right-hand side of `success =` is missing from this copy,
# so the call that actually kills the kernel is not shown here.
def kill_kernel(self, msg_content):
"""Handler for kill_kernel messages."""
kernel_id = msg_content["kernel_id"]
success =
reply_content = {"status": "Kernel %s killed!"%(kernel_id)}
if not success:
reply_content["status"] = "Could not kill kernel %s!"%(kernel_id)
return self._form_message(reply_content, error=(not success))
# Builds a status reply from `failures`: a success message when it is empty,
# otherwise an error message whose status interpolates the failures.
# msg_content is not read by the visible code.
# NOTE(review): the right-hand side of `failures =` is missing from this
# copy, so the call that purges the kernels is not shown here.
def purge_kernels(self, msg_content):
"""Handler for purge_kernels messages."""
failures =
reply_content = {"status": "All kernels killed!"}
success = (len(failures) == 0)
if not success:
reply_content["status"] = "Could not kill kernels %s!"%(failures)
return self._form_message(reply_content, error=(not success))
# Reads content["kernel_id"] (a missing key raises KeyError) and returns a
# message built by self._form_message.
# NOTE(review): the argument of the final self._form_message( call is cut off
# in this copy, so what is returned for the restarted kernel is not shown.
def restart_kernel(self, content):
"""Handler for restart_kernel messages."""
kernel_id = content["kernel_id"]
return self._form_message(
# Builds a status reply for the kernel named by msg_content["kernel_id"]
# (a missing key raises KeyError). The reply is a success message when
# `success` is truthy, otherwise an error message with a "Could not
# interrupt" status.
# NOTE(review): the right-hand side of `success =` is missing from this copy,
# so the call that actually interrupts the kernel is not shown here.
def interrupt_kernel(self, msg_content):
"""Handler for interrupt_kernel messages."""
kernel_id = msg_content["kernel_id"]
reply_content = {"status": "Kernel %s interrupted!"%(kernel_id)}
success =
if not success:
reply_content["status"] = "Could not interrupt kernel %s!"%(kernel_id)
return self._form_message(reply_content, error=(not success))
def remove_computer(self, msg_content):
    """Handler for remove_computer messages.

    Clears ``self.listen`` first, which is the flag the receive loop in
    ``start`` tests, and then answers with the result of purging all kernels.
    """
    # Clear the flag before purging so it is already False if the purge raises.
    self.listen = False
    purge_reply = self.purge_kernels(msg_content)
    return purge_reply
# Script entry point. Command line: <ip> <filename> <comp_id>
#   sys.argv[1] -- ip, passed to Receiver
#   sys.argv[2] -- filename, used both as the logging output file and as the
#                  Receiver's filename argument
#   sys.argv[3] -- comp_id; its first four characters prefix every log line
# NOTE(review): no call such as receiver.start() is visible after the
# Receiver is constructed; the end of the script may be missing from this copy.
if __name__ == '__main__':
filename = sys.argv[2]
comp_id = sys.argv[3]
# `logging` is bound here as a module-level name; the Receiver methods above
# use it as a global, and no other import of it is visible in this file.
import logging
# NOTE(review): uuid is not used anywhere in the visible code.
import uuid
logging.basicConfig(filename=filename,format=comp_id[:4]+': %(asctime)s %(message)s',level=logging.DEBUG)
ip = sys.argv[1]
receiver = Receiver(filename, ip)
Something went wrong with that request. Please try again.