Skip to content
Browse files

added basic tunneling with ssh or paramiko

  • Loading branch information...
1 parent 1028b8d commit 9b5cebbae3d7f541b0583476e8e8c1f75906871f @minrk minrk committed Nov 10, 2010
Showing with 524 additions and 9 deletions.
  1. +183 −0 IPython/zmq/forward.py
  2. +1 −9 IPython/zmq/parallel/dependency.py
  3. +217 −0 IPython/zmq/parallel/kernelstarter.py
  4. +123 −0 IPython/zmq/tunnel.py
View
183 IPython/zmq/forward.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+
+#
+# This file is adapted from a paramiko demo, and thus LGPL 2.1.
+# Original Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+# Edits Copyright (C) 2010 The IPython Team
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Sample script showing how to do local port forwarding over paramiko.
+
+This script connects to the requested SSH server and sets up local port
+forwarding (the openssh -L option) from a local port through a tunneled
+connection to a destination reachable from the SSH server machine.
+"""
+
+from __future__ import print_function
+
+import getpass
+import os
+import socket
+import select
+import SocketServer
+import sys
+from optparse import OptionParser
+
+import paramiko
+
+SSH_PORT = 22
+DEFAULT_PORT = 4000
+
+g_verbose = False
+
+
+class ForwardServer (SocketServer.ThreadingTCPServer):
+ daemon_threads = True
+ allow_reuse_address = True
+
+
+class Handler (SocketServer.BaseRequestHandler):
+
+ def handle(self):
+ try:
+ chan = self.ssh_transport.open_channel('direct-tcpip',
+ (self.chain_host, self.chain_port),
+ self.request.getpeername())
+ except Exception, e:
+ verbose('Incoming request to %s:%d failed: %s' % (self.chain_host,
+ self.chain_port,
+ repr(e)))
+ return
+ if chan is None:
+ verbose('Incoming request to %s:%d was rejected by the SSH server.' %
+ (self.chain_host, self.chain_port))
+ return
+
+ verbose('Connected! Tunnel open %r -> %r -> %r' % (self.request.getpeername(),
+ chan.getpeername(), (self.chain_host, self.chain_port)))
+ while True:
+ r, w, x = select.select([self.request, chan], [], [])
+ if self.request in r:
+ data = self.request.recv(1024)
+ if len(data) == 0:
+ break
+ chan.send(data)
+ if chan in r:
+ data = chan.recv(1024)
+ if len(data) == 0:
+ break
+ self.request.send(data)
+ chan.close()
+ self.request.close()
+ verbose('Tunnel closed from %r' % (self.request.getpeername(),))
+
+
+def forward_tunnel(local_port, remote_host, remote_port, transport):
+ # this is a little convoluted, but lets me configure things for the Handler
+ # object. (SocketServer doesn't give Handlers any way to access the outer
+ # server normally.)
+ class SubHander (Handler):
+ chain_host = remote_host
+ chain_port = remote_port
+ ssh_transport = transport
+ ForwardServer(('', local_port), SubHander).serve_forever()
+
+
+def verbose(s):
+ if g_verbose:
+ print (s)
+
+
+HELP = """\
+Set up a forward tunnel across an SSH server, using paramiko. A local port
+(given with -p) is forwarded across an SSH session to an address:port from
+the SSH server. This is similar to the openssh -L option.
+"""
+
+
+def get_host_port(spec, default_port):
+ "parse 'hostname:22' into a host and port, with the port optional"
+ args = (spec.split(':', 1) + [default_port])[:2]
+ args[1] = int(args[1])
+ return args[0], args[1]
+
+
+def parse_options():
+ global g_verbose
+
+ parser = OptionParser(usage='usage: %prog [options] <ssh-server>[:<server-port>]',
+ version='%prog 1.0', description=HELP)
+ parser.add_option('-q', '--quiet', action='store_false', dest='verbose', default=True,
+ help='squelch all informational output')
+ parser.add_option('-p', '--local-port', action='store', type='int', dest='port',
+ default=DEFAULT_PORT,
+ help='local port to forward (default: %d)' % DEFAULT_PORT)
+ parser.add_option('-u', '--user', action='store', type='string', dest='user',
+ default=getpass.getuser(),
+ help='username for SSH authentication (default: %s)' % getpass.getuser())
+ parser.add_option('-K', '--key', action='store', type='string', dest='keyfile',
+ default=None,
+ help='private key file to use for SSH authentication')
+ parser.add_option('', '--no-key', action='store_false', dest='look_for_keys', default=True,
+ help='don\'t look for or use a private key file')
+ parser.add_option('-P', '--password', action='store_true', dest='readpass', default=False,
+ help='read password (for key or password auth) from stdin')
+ parser.add_option('-r', '--remote', action='store', type='string', dest='remote', default=None, metavar='host:port',
+ help='remote host and port to forward to')
+ options, args = parser.parse_args()
+
+ if len(args) != 1:
+ parser.error('Incorrect number of arguments.')
+ if options.remote is None:
+ parser.error('Remote address required (-r).')
+
+ g_verbose = options.verbose
+ server_host, server_port = get_host_port(args[0], SSH_PORT)
+ remote_host, remote_port = get_host_port(options.remote, SSH_PORT)
+ return options, (server_host, server_port), (remote_host, remote_port)
+
+
+def main():
+ options, server, remote = parse_options()
+
+ password = None
+ if options.readpass:
+ password = getpass.getpass('Enter SSH password: ')
+
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy())
+
+ verbose('Connecting to ssh host %s:%d ...' % (server[0], server[1]))
+ try:
+ client.connect(server[0], server[1], username=options.user, key_filename=options.keyfile,
+ look_for_keys=options.look_for_keys, password=password)
+ except Exception as e:
+ print ('*** Failed to connect to %s:%d: %r' % (server[0], server[1], e))
+ sys.exit(1)
+
+ verbose('Now forwarding port %d to %s:%d ...' % (options.port, remote[0], remote[1]))
+
+ try:
+ forward_tunnel(options.port, remote[0], remote[1], client.get_transport())
+ except KeyboardInterrupt:
+ print ('C-c: Port forwarding stopped.')
+ sys.exit(0)
+
+
+if __name__ == '__main__':
+ main()
View
10 IPython/zmq/parallel/dependency.py
@@ -11,15 +11,7 @@
class UnmetDependency(Exception):
pass
-class depend2(object):
- """dependency decorator"""
- def __init__(self, f, *args, **kwargs):
- self.dependency = (f,args,kwargs)
-
- def __call__(self, f, *args, **kwargs):
- f._dependency = self.dependency
- return decorator(_depend_wrapper, f)
-
+
class depend(object):
"""Dependency decorator, for use with tasks."""
def __init__(self, f, *args, **kwargs):
View
217 IPython/zmq/parallel/kernelstarter.py
@@ -0,0 +1,217 @@
+"""KernelStarter class that intercepts Control Queue messages, and handles process management."""
+
+from zmq.eventloop import ioloop
+from streamsession import StreamSession
+
+class KernelStarter(object):
+ """Object for resetting/killing the Kernel."""
+
+
+ def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs):
+ self.session = session
+ self.upstream = upstream
+ self.downstream = downstream
+ self.kernel_args = kernel_args
+ self.kernel_kwargs = kernel_kwargs
+ self.handlers = {}
+ for method in 'shutdown_request shutdown_reply'.split():
+ self.handlers[method] = getattr(self, method)
+
+ def start(self):
+ self.upstream.on_recv(self.dispatch_request)
+ self.downstream.on_recv(self.dispatch_reply)
+
+ #--------------------------------------------------------------------------
+ # Dispatch methods
+ #--------------------------------------------------------------------------
+
+ def dispatch_request(self, raw_msg):
+ idents, msg = self.session.feed_identities()
+ try:
+ msg = self.session.unpack_message(msg, content=False)
+ except:
+ print ("bad msg: %s"%msg)
+
+ msgtype = msg['msg_type']
+ handler = self.handlers.get(msgtype, None)
+ if handler is None:
+ self.downstream.send_multipart(raw_msg)
+ else:
+ handler(msg)
+
+ def dispatch_reply(self, raw_msg):
+ idents, msg = self.session.feed_identities()
+ try:
+ msg = self.session.unpack_message(msg, content=False)
+ except:
+ print ("bad msg: %s"%msg)
+
+ msgtype = msg['msg_type']
+ handler = self.handlers.get(msgtype, None)
+ if handler is None:
+ self.upstream.send_multipart(raw_msg)
+ else:
+ handler(msg)
+
+ #--------------------------------------------------------------------------
+ # Handlers
+ #--------------------------------------------------------------------------
+
+ def shutdown_request(self, msg):
+
+
+ #--------------------------------------------------------------------------
+ # Kernel process management methods, from KernelManager:
+ #--------------------------------------------------------------------------
+
+ def _check_local(addr):
+ if isinstance(addr, tuple):
+ addr = addr[0]
+ return addr in LOCAL_IPS
+
+ def start_kernel(self, **kw):
+ """Starts a kernel process and configures the manager to use it.
+
+ If random ports (port=0) are being used, this method must be called
+ before the channels are created.
+
+ Parameters:
+ -----------
+ ipython : bool, optional (default True)
+ Whether to use an IPython kernel instead of a plain Python kernel.
+ """
+ self.kernel = Process(target=make_kernel, args=self.kernel_args,
+ kwargs=self.kernel_kwargs)
+
+ def shutdown_kernel(self, restart=False):
+ """ Attempts to the stop the kernel process cleanly. If the kernel
+ cannot be stopped, it is killed, if possible.
+ """
+ # FIXME: Shutdown does not work on Windows due to ZMQ errors!
+ if sys.platform == 'win32':
+ self.kill_kernel()
+ return
+
+ # Don't send any additional kernel kill messages immediately, to give
+ # the kernel a chance to properly execute shutdown actions. Wait for at
+ # most 1s, checking every 0.1s.
+ self.xreq_channel.shutdown(restart=restart)
+ for i in range(10):
+ if self.is_alive:
+ time.sleep(0.1)
+ else:
+ break
+ else:
+ # OK, we've waited long enough.
+ if self.has_kernel:
+ self.kill_kernel()
+
+ def restart_kernel(self, now=False):
+ """Restarts a kernel with the same arguments that were used to launch
+ it. If the old kernel was launched with random ports, the same ports
+ will be used for the new kernel.
+
+ Parameters
+ ----------
+ now : bool, optional
+ If True, the kernel is forcefully restarted *immediately*, without
+ having a chance to do any cleanup action. Otherwise the kernel is
+ given 1s to clean up before a forceful restart is issued.
+
+ In all cases the kernel is restarted, the only difference is whether
+ it is given a chance to perform a clean shutdown or not.
+ """
+ if self._launch_args is None:
+ raise RuntimeError("Cannot restart the kernel. "
+ "No previous call to 'start_kernel'.")
+ else:
+ if self.has_kernel:
+ if now:
+ self.kill_kernel()
+ else:
+ self.shutdown_kernel(restart=True)
+ self.start_kernel(**self._launch_args)
+
+ # FIXME: Messages get dropped in Windows due to probable ZMQ bug
+ # unless there is some delay here.
+ if sys.platform == 'win32':
+ time.sleep(0.2)
+
+ @property
+ def has_kernel(self):
+ """Returns whether a kernel process has been specified for the kernel
+ manager.
+ """
+ return self.kernel is not None
+
+ def kill_kernel(self):
+ """ Kill the running kernel. """
+ if self.has_kernel:
+ # Pause the heart beat channel if it exists.
+ if self._hb_channel is not None:
+ self._hb_channel.pause()
+
+ # Attempt to kill the kernel.
+ try:
+ self.kernel.kill()
+ except OSError, e:
+ # In Windows, we will get an Access Denied error if the process
+ # has already terminated. Ignore it.
+ if not (sys.platform == 'win32' and e.winerror == 5):
+ raise
+ self.kernel = None
+ else:
+ raise RuntimeError("Cannot kill kernel. No kernel is running!")
+
+ def interrupt_kernel(self):
+ """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
+ well supported on all platforms.
+ """
+ if self.has_kernel:
+ if sys.platform == 'win32':
+ from parentpoller import ParentPollerWindows as Poller
+ Poller.send_interrupt(self.kernel.win32_interrupt_event)
+ else:
+ self.kernel.send_signal(signal.SIGINT)
+ else:
+ raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
+
+ def signal_kernel(self, signum):
+ """ Sends a signal to the kernel. Note that since only SIGTERM is
+ supported on Windows, this function is only useful on Unix systems.
+ """
+ if self.has_kernel:
+ self.kernel.send_signal(signum)
+ else:
+ raise RuntimeError("Cannot signal kernel. No kernel is running!")
+
+ @property
+ def is_alive(self):
+ """Is the kernel process still running?"""
+ # FIXME: not using a heartbeat means this method is broken for any
+ # remote kernel, it's only capable of handling local kernels.
+ if self.has_kernel:
+ if self.kernel.poll() is None:
+ return True
+ else:
+ return False
+ else:
+ # We didn't start the kernel with this KernelManager so we don't
+ # know if it is running. We should use a heartbeat for this case.
+ return True
+
+
+def make_starter(up_addr, down_addr, *args, **kwargs):
+ """entry point script for launching a kernelstarter in a subprocess"""
+ loop = ioloop.IOLoop.instance()
+ ctx = zmq.Context()
+ session = StreamSession()
+ upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
+ upstream.connect(up_addr)
+ downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
+ downstream.connect(down_addr)
+
+ starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
+ starter.start()
+ loop.start()
+
View
123 IPython/zmq/tunnel.py
@@ -0,0 +1,123 @@
+
+
+#-----------------------------------------
+# Imports
+#-----------------------------------------
+
+from __future__ import print_function
+
+import os,sys
+from multiprocessing import Process
+from getpass import getpass, getuser
+
+try:
+ import paramiko
+except ImportError:
+ paramiko = None
+else:
+ from forward import forward_tunnel
+
+from IPython.external import pexpect
+
+
+def launch_ssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, timeout=15):
+ """Create an ssh tunnel using command-line ssh that connects port lport
+ on this machine to localhost:rport on server. The tunnel
+ will automatically close when not in use, remaining open
+ for a minimum of timeout seconds for an initial connection.
+ """
+ ssh="ssh "
+ if keyfile:
+ ssh += "-i " + keyfile
+ cmd = ssh + " -f -L %i:127.0.0.1:%i %s sleep %i"%(lport, rport, server, timeout)
+ tunnel = pexpect.spawn(cmd)
+ failed = False
+ while True:
+ try:
+ tunnel.expect('[Pp]assword:', timeout=.1)
+ except pexpect.TIMEOUT:
+ continue
+ except pexpect.EOF:
+ if tunnel.exitstatus:
+ print (tunnel.exitstatus)
+ print (tunnel.before)
+ print (tunnel.after)
+ raise RuntimeError("tunnel '%s' failed to start"%(cmd))
+ else:
+ return tunnel.pid
+ else:
+ if failed:
+ print("Password rejected, try again")
+ tunnel.sendline(getpass())
+ failed = True
+
+def _split_server(server):
+ if '@' in server:
+ username,server = server.split('@', 1)
+ else:
+ username = getuser()
+ if ':' in server:
+ server, port = server.split(':')
+ port = int(port)
+ else:
+ port = 22
+ return username, server, port
+
+def launch_paramiko_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None):
+ """launch a tunner with paramiko in a subprocess"""
+ if paramiko is None:
+ raise ImportError("Paramiko not available")
+ server = _split_server(server)
+ if keyfile is None:
+ passwd = getpass("%s@%s's password: "%(server[0], server[1]))
+ else:
+ passwd = None
+ p = Process(target=_paramiko_tunnel,
+ args=(lport, rport, server, remoteip),
+ kwargs=dict(keyfile=keyfile, password=passwd))
+ p.daemon=False
+ p.start()
+ return p
+
+
+def _paramiko_tunnel(lport, rport, server, remoteip, keyfile=None, password=None):
+ """function for actually starting a paramiko tunnel, to be passed
+ to multiprocessing.Process(target=this).
+ """
+ username, server, port = server
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy())
+
+ try:
+ client.connect(server, port, username=username, key_filename=keyfile,
+ look_for_keys=True, password=password)
+ except Exception as e:
+ print ('*** Failed to connect to %s:%d: %r' % (server, port, e))
+ sys.exit(1)
+
+ print ('Now forwarding port %d to %s:%d ...' % (lport, server, rport))
+
+ try:
+ forward_tunnel(lport, remoteip, rport, client.get_transport())
+ except KeyboardInterrupt:
+ print ('C-c: Port forwarding stopped.')
+ sys.exit(0)
+
+
+__all__ = ['launch_ssh_tunnel', 'launch_paramiko_tunnel']
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

0 comments on commit 9b5cebb

Please sign in to comment.
Something went wrong with that request. Please try again.