Permalink
Browse files

added preliminary ssh tunneling support for clients

  • Loading branch information...
1 parent 9b5cebb commit 99528f8b277f5e3f27e8caf6768f09bd8fc1ff65 @minrk minrk committed Nov 10, 2010
Showing with 261 additions and 34 deletions.
  1. +3 −3 IPython/zmq/forward.py
  2. +62 −12 IPython/zmq/parallel/client.py
  3. +196 −19 IPython/zmq/tunnel.py
View
@@ -1,7 +1,7 @@
#!/usr/bin/env python
#
-# This file is adapted from a paramiko demo, and thus LGPL 2.1.
+# This file is adapted from a paramiko demo, and thus licensed under LGPL 2.1.
# Original Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
# Edits Copyright (C) 2010 The IPython Team
#
@@ -83,7 +83,7 @@ def handle(self):
self.request.send(data)
chan.close()
self.request.close()
- verbose('Tunnel closed from %r' % (self.request.getpeername(),))
+ verbose('Tunnel closed ')
def forward_tunnel(local_port, remote_host, remote_port, transport):
@@ -94,7 +94,7 @@ class SubHander (Handler):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
- ForwardServer(('', local_port), SubHander).serve_forever()
+ ForwardServer(('127.0.0.1', local_port), SubHander).serve_forever()
def verbose(s):
@@ -19,6 +19,7 @@
from zmq.eventloop import ioloop, zmqstream
from IPython.external.decorator import decorator
+from IPython.zmq import tunnel
import streamsession as ss
# from remotenamespace import RemoteNamespace
@@ -117,7 +118,33 @@ class Client(object):
addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
The address of the controller's registration socket.
-
+ [Default: 'tcp://127.0.0.1:10101']
+ context : zmq.Context
+ Pass an existing zmq.Context instance, otherwise the client will create its own
+ username : bytes
+ set username to be passed to the Session object
+ debug : bool
+ flag for lots of message printing for debug purposes
+
+ #-------------- ssh related args ----------------
+ # These are args for configuring the ssh tunnel to be used
+ # credentials are used to forward connections over ssh to the Controller
+ # Note that the ip given in `addr` needs to be relative to sshserver
+ # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
+ # and set sshserver as the same machine the Controller is on. However,
+ # the only requirement is that sshserver is able to see the Controller
+ # (i.e. is within the same trusted network).
+
+ sshserver : str
+ A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
+ If keyfile or password is specified, and this is not, it will default to
+ the ip given in addr.
+ keyfile : str; path to public key file
+ This specifies a key to be used in ssh login, default None.
+ Regular default ssh keys will be used without specifying this argument.
+ password : str;
+ Your ssh password to sshserver. Note that if this is left None,
+ you will be prompted for it if passwordless key based login is unavailable.
Attributes
----------
@@ -159,6 +186,7 @@ class Client(object):
_connected=False
+ _ssh=False
_engines=None
_addr='tcp://127.0.0.1:10101'
_registration_socket=None
@@ -173,18 +201,33 @@ class Client(object):
history = None
debug = False
- def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
+ def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
+ sshserver=None, keyfile=None, password=None, paramiko=None):
if context is None:
context = zmq.Context()
self.context = context
self._addr = addr
+ self._ssh = bool(sshserver or keyfile or password)
+ if self._ssh and sshserver is None:
+ # default to the same
+ sshserver = addr.split('://')[1].split(':')[0]
+ if self._ssh and password is None:
+ if tunnel.try_passwordless_ssh(sshserver, keyfile, paramiko):
+ password=False
+ else:
+ password = getpass("SSH Password for %s: "%sshserver)
+ ssh_kwargs = dict(keyfile=keyfile, password=password, paramiko=paramiko)
+
if username is None:
self.session = ss.StreamSession()
else:
self.session = ss.StreamSession(username)
- self._registration_socket = self.context.socket(zmq.PAIR)
+ self._registration_socket = self.context.socket(zmq.XREQ)
self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
- self._registration_socket.connect(addr)
+ if self._ssh:
+ tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
+ else:
+ self._registration_socket.connect(addr)
self._engines = {}
self._ids = set()
self.outstanding=set()
@@ -198,7 +241,7 @@ def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, de
}
self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
'apply_reply' : self._handle_apply_reply}
- self._connect()
+ self._connect(sshserver, ssh_kwargs)
@property
@@ -229,12 +272,19 @@ def _build_targets(self, targets):
targets = [targets]
return [self._engines[t] for t in targets], list(targets)
- def _connect(self):
+ def _connect(self, sshserver, ssh_kwargs):
"""setup all our socket connections to the controller. This is called from
__init__."""
if self._connected:
return
self._connected=True
+
+ def connect_socket(s, addr):
+ if self._ssh:
+ return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
+ else:
+ return s.connect(addr)
+
self.session.send(self._registration_socket, 'connection_request')
idents,msg = self.session.recv(self._registration_socket,mode=0)
if self.debug:
@@ -245,23 +295,23 @@ def _connect(self):
if content.queue:
self._mux_socket = self.context.socket(zmq.PAIR)
self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
- self._mux_socket.connect(content.queue)
+ connect_socket(self._mux_socket, content.queue)
if content.task:
self._task_socket = self.context.socket(zmq.PAIR)
self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
- self._task_socket.connect(content.task)
+ connect_socket(self._task_socket, content.task)
if content.notification:
self._notification_socket = self.context.socket(zmq.SUB)
- self._notification_socket.connect(content.notification)
+ connect_socket(self._notification_socket, content.notification)
self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
if content.query:
self._query_socket = self.context.socket(zmq.PAIR)
self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
- self._query_socket.connect(content.query)
+ connect_socket(self._query_socket, content.query)
if content.control:
self._control_socket = self.context.socket(zmq.PAIR)
self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
- self._control_socket.connect(content.control)
+ connect_socket(self._control_socket, content.control)
self._update_engines(dict(content.engines))
else:
@@ -852,4 +902,4 @@ def spin(self):
for stream in (self.queue_stream, self.notifier_stream,
self.task_stream, self.control_stream):
stream.flush()
-
+
Oops, something went wrong.

0 comments on commit 99528f8

Please sign in to comment.