Skip to content
This repository

Add SSH tunneling to engines #685

Merged
merged 6 commits into from over 2 years ago

2 participants

Min RK Fernando Perez
Min RK
Owner

This copies the basic ssh tunneling from the Client to the Engine. The same semantics apply. In the controller, the ssh server is configured separately from the client ssh server, to prevent unnecessary tunneling from engines to the controller.

Fernando Perez
Owner

This is super useful, but I think it would be really good to have even a short paragraph illustrating this in the docs. Not all users are familiar with the details of ssh tunneling, so I'm sure a short illustrative example will do lots of good here.

Otherwise, the code looks solid (and the current test suite still passes) but I'm a little concerned that there's no test coverage at all, despite a fair amount of new functionality. I know that testing multiprocess things like this is super tricky, but even some light tests that at least do api validation will help us catch silly mistakes in the future. Obviously if you can think of some robust tests for it, that would be even better.

Beyond some docs and testing, it looks otherwise great for merging. Thanks for the excellent work, this will be very useful!

Min RK
Owner

We simply can't test ssh tunneling except by hand. We can't depend on ssh being installed, or passwordless keys, or permissions, etc. Testing ssh in any meaningful way simply requires the use of multiple machines.

I'll toss up an example in the docs, though.

added some commits August 07, 2011
Min RK split open_tunnel part of tunnel_connection into separate method
This allows connection forwarding without establishing the final connection

(needed if the final connection is delayed, e.g. heartbeats)
c6e1b5b
Min RK add ssh tunneling to Engine
'enginessh' alias added to ipcontroller to new IPControllerApp.engine_ssh_server

ssh/keyfile added to ipengine/EngineFactory
d58d98a
Min RK remove now-obsolete note that engine's don't support ssh 6ba9d0a
Min RK add delay configurable to EngineSetLaunchers
c/o @gzahl
6d0679c
Min RK add ssh tunnel notes to parallel process doc fb00667
Min RK
Owner

simple engine ssh example added to parallel docs

Fernando Perez fperez merged commit 52dffc0 into from August 16, 2011
Fernando Perez fperez closed this August 16, 2011
Fernando Perez fperez referenced this pull request from a commit January 10, 2012
Commit has since been removed from the repository and is no longer available.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 6 unique commits by 1 author.

Aug 16, 2011
Min RK split open_tunnel part of tunnel_connection into separate method
This allows connection forwarding without establishing the final connection

(needed if the final connection is delayed, e.g. heartbeats)
c6e1b5b
Min RK add ssh tunneling to Engine
'enginessh' alias added to ipcontroller to new IPControllerApp.engine_ssh_server

ssh/keyfile added to ipengine/EngineFactory
d58d98a
Min RK remove now-obsolete note that engine's don't support ssh 6ba9d0a
Min RK add delay configurable to EngineSetLaunchers
c/o @gzahl
6d0679c
Min RK add ssh tunnel notes to parallel process doc fb00667
Min RK specify sshkey is *private* 852ec36
This page is out of date. Refresh to see the latest.
19  IPython/external/ssh/tunnel.py
@@ -110,6 +110,22 @@ def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramik
110 110
     selected local port of the tunnel.
111 111
     
112 112
     """
  113
+    new_url, tunnel = open_tunnel(addr, server, keyfile=keyfile, password=password, paramiko=paramiko)
  114
+    socket.connect(new_url)
  115
+    return tunnel
  116
+
  117
+
  118
+def open_tunnel(addr, server, keyfile=None, password=None, paramiko=None):
  119
+    """Open a tunneled connection from a 0MQ url.
  120
+    
  121
+    For use inside tunnel_connection.
  122
+    
  123
+    Returns
  124
+    -------
  125
+    
  126
+    (url, tunnel): The 0MQ url that has been forwarded, and the tunnel object
  127
+    """
  128
+    
113 129
     lport = select_random_ports(1)[0]
114 130
     transport, addr = addr.split('://')
115 131
     ip,rport = addr.split(':')
@@ -121,8 +137,7 @@ def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramik
121 137
     else:
122 138
         tunnelf = openssh_tunnel
123 139
     tunnel = tunnelf(lport, rport, server, remoteip=ip, keyfile=keyfile, password=password)
124  
-    socket.connect('tcp://127.0.0.1:%i'%lport)
125  
-    return tunnel
  140
+    return 'tcp://127.0.0.1:%i'%lport, tunnel
126 141
 
127 142
 def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, password=None, timeout=15):
128 143
     """Create an ssh tunnel using command-line ssh that connects port lport
12  IPython/parallel/apps/ipcontrollerapp.py
@@ -116,6 +116,7 @@
116 116
 aliases = dict(
117 117
     secure = 'IPControllerApp.secure',
118 118
     ssh = 'IPControllerApp.ssh_server',
  119
+    enginessh = 'IPControllerApp.engine_ssh_server',
119 120
     location = 'IPControllerApp.location',
120 121
 
121 122
     ident = 'Session.session',
@@ -158,6 +159,11 @@ class IPControllerApp(BaseParallelApplication):
158 159
         processes. It should be of the form: [user@]server[:port]. The
159 160
         Controller's listening addresses must be accessible from the ssh server""",
160 161
     )
  162
+    engine_ssh_server = Unicode(u'', config=True,
  163
+        help="""ssh url for engines to use when connecting to the Controller
  164
+        processes. It should be of the form: [user@]server[:port]. The
  165
+        Controller's listening addresses must be accessible from the ssh server""",
  166
+    )
161 167
     location = Unicode(u'', config=True,
162 168
         help="""The external IP or domain name of the Controller, used for disambiguating
163 169
         engine and client connections.""",
@@ -218,6 +224,8 @@ def load_config_from_json(self):
218 224
         c.HubFactory.engine_ip = ip
219 225
         c.HubFactory.regport = int(ports)
220 226
         self.location = cfg['location']
  227
+        if not self.engine_ssh_server:
  228
+            self.engine_ssh_server = cfg['ssh']
221 229
         # load client config
222 230
         with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
223 231
             cfg = json.loads(f.read())
@@ -226,7 +234,8 @@ def load_config_from_json(self):
226 234
         c.HubFactory.client_transport = xport
227 235
         ip,ports = addr.split(':')
228 236
         c.HubFactory.client_ip = ip
229  
-        self.ssh_server = cfg['ssh']
  237
+        if not self.ssh_server:
  238
+            self.ssh_server = cfg['ssh']
230 239
         assert int(ports) == c.HubFactory.regport, "regport mismatch"
231 240
     
232 241
     def init_hub(self):
@@ -271,6 +280,7 @@ def init_hub(self):
271 280
             self.save_connection_dict('ipcontroller-client.json', cdict)
272 281
             edict = cdict
273 282
             edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
  283
+            edict['ssh'] = self.engine_ssh_server
274 284
             self.save_connection_dict('ipcontroller-engine.json', edict)
275 285
 
276 286
     #
49  IPython/parallel/apps/ipengineapp.py
@@ -118,6 +118,8 @@ def _on_use_changed(self, old, new):
118 118
     keyfile = 'Session.keyfile',
119 119
 
120 120
     url = 'EngineFactory.url',
  121
+    ssh = 'EngineFactory.sshserver',
  122
+    sshkey = 'EngineFactory.sshkey',
121 123
     ip = 'EngineFactory.ip',
122 124
     transport = 'EngineFactory.transport',
123 125
     port = 'EngineFactory.regport',
@@ -192,6 +194,40 @@ def find_url_file(self):
192 194
                 self.profile_dir.security_dir,
193 195
                 self.url_file_name
194 196
             )
  197
+    
  198
+    def load_connector_file(self):
  199
+        """load config from a JSON connector file,
  200
+        at a *lower* priority than command-line/config files.
  201
+        """
  202
+        
  203
+        self.log.info("Loading url_file %r"%self.url_file)
  204
+        config = self.config
  205
+        
  206
+        with open(self.url_file) as f:
  207
+            d = json.loads(f.read())
  208
+        
  209
+        try:
  210
+            config.Session.key
  211
+        except AttributeError:
  212
+            if d['exec_key']:
  213
+                config.Session.key = asbytes(d['exec_key'])
  214
+                
  215
+        try:
  216
+            config.EngineFactory.location
  217
+        except AttributeError:
  218
+            config.EngineFactory.location = d['location']
  219
+        
  220
+        d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
  221
+        try:
  222
+            config.EngineFactory.url
  223
+        except AttributeError:
  224
+            config.EngineFactory.url = d['url']
  225
+        
  226
+        try:
  227
+            config.EngineFactory.sshserver
  228
+        except AttributeError:
  229
+            config.EngineFactory.sshserver = d['ssh']
  230
+        
195 231
     def init_engine(self):
196 232
         # This is the working dir by now.
197 233
         sys.path.insert(0, '')
@@ -219,14 +255,7 @@ def init_engine(self):
219 255
                 time.sleep(0.1)
220 256
             
221 257
         if os.path.exists(self.url_file):
222  
-            self.log.info("Loading url_file %r"%self.url_file)
223  
-            with open(self.url_file) as f:
224  
-                d = json.loads(f.read())
225  
-            if d['exec_key']:
226  
-                config.Session.key = asbytes(d['exec_key'])
227  
-            d['url'] = disambiguate_url(d['url'], d['location'])
228  
-            config.EngineFactory.url = d['url']
229  
-            config.EngineFactory.location = d['location']
  258
+            self.load_connector_file()
230 259
         elif not url_specified:
231 260
             self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
232 261
             self.exit(1)
@@ -253,7 +282,7 @@ def init_engine(self):
253 282
         except:
254 283
             self.log.error("Couldn't start the Engine", exc_info=True)
255 284
             self.exit(1)
256  
-        
  285
+    
257 286
     def forward_logging(self):
258 287
         if self.log_url:
259 288
             self.log.info("Forwarding logging to %s"%self.log_url)
@@ -265,7 +294,7 @@ def forward_logging(self):
265 294
             handler.setLevel(self.log_level)
266 295
             self.log.addHandler(handler)
267 296
             self._log_handler = handler
268  
-    #
  297
+    
269 298
     def init_mpi(self):
270 299
         global mpi
271 300
         self.mpi = MPI(config=self.config)
12  IPython/parallel/apps/launcher.py
@@ -56,7 +56,7 @@ def check_output(*args, **kwargs):
56 56
 from IPython.config.application import Application
57 57
 from IPython.config.configurable import LoggingConfigurable
58 58
 from IPython.utils.text import EvalFormatter
59  
-from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
  59
+from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 60
 from IPython.utils.path import get_ipython_module_path
61 61
 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62 62
 
@@ -364,6 +364,12 @@ class LocalEngineSetLauncher(BaseLauncher):
364 364
         ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
365 365
         help="command-line arguments to pass to ipengine"
366 366
     )
  367
+    delay = CFloat(0.1, config=True,
  368
+        help="""delay (in seconds) between starting each engine after the first.
  369
+        This can help force the engines to get their ids in order, or limit
  370
+        process flood when starting many engines."""
  371
+    )
  372
+    
367 373
     # launcher class
368 374
     launcher_class = LocalEngineLauncher
369 375
     
@@ -381,6 +387,8 @@ def start(self, n, profile_dir):
381 387
         self.profile_dir = unicode(profile_dir)
382 388
         dlist = []
383 389
         for i in range(n):
  390
+            if i > 0:
  391
+                time.sleep(self.delay)
384 392
             el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
385 393
             # Copy the engine args over to each engine launcher.
386 394
             el.engine_args = copy.deepcopy(self.engine_args)
@@ -603,6 +611,8 @@ def start(self, n, profile_dir):
603 611
             else:
604 612
                 user=None
605 613
             for i in range(n):
  614
+                if i > 0:
  615
+                    time.sleep(self.delay)
606 616
                 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
607 617
                 
608 618
                 # Copy the engine args over to each engine launcher.
2  IPython/parallel/client/client.py
@@ -171,7 +171,7 @@ class Client(HasTraits):
171 171
         A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
172 172
         If keyfile or password is specified, and this is not, it will default to
173 173
         the ip given in addr.
174  
-    sshkey : str; path to public ssh key file
  174
+    sshkey : str; path to ssh private key file
175 175
         This specifies a key to be used in ssh login, default None.
176 176
         Regular default ssh keys will be used without specifying this argument.
177 177
     password : str 
89  IPython/parallel/engine/engine.py
@@ -17,12 +17,16 @@
17 17
 
18 18
 import sys
19 19
 import time
  20
+from getpass import getpass
20 21
 
21 22
 import zmq
22 23
 from zmq.eventloop import ioloop, zmqstream
23 24
 
  25
+from IPython.external.ssh import tunnel
24 26
 # internal
25  
-from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
  27
+from IPython.utils.traitlets import (
  28
+    Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool
  29
+)
26 30
 # from IPython.utils.localinterfaces import LOCALHOST 
27 31
 
28 32
 from IPython.parallel.controller.heartmonitor import Heart
@@ -50,6 +54,12 @@ class EngineFactory(RegistrationFactory):
50 54
     timeout=CFloat(2,config=True,
51 55
         help="""The time (in seconds) to wait for the Controller to respond
52 56
         to registration requests before giving up.""")
  57
+    sshserver=Unicode(config=True,
  58
+        help="""The SSH server to use for tunneling connections to the Controller.""")
  59
+    sshkey=Unicode(config=True,
  60
+        help="""The SSH private key file to use when tunneling connections to the Controller.""")
  61
+    paramiko=Bool(sys.platform == 'win32', config=True,
  62
+        help="""Whether to use paramiko instead of openssh for tunnels.""")
53 63
     
54 64
     # not configurable:
55 65
     user_ns=Dict()
@@ -61,28 +71,70 @@ class EngineFactory(RegistrationFactory):
61 71
     ident = Unicode()
62 72
     def _ident_changed(self, name, old, new):
63 73
         self.bident = asbytes(new)
  74
+    using_ssh=Bool(False)
64 75
     
65 76
     
66 77
     def __init__(self, **kwargs):
67 78
         super(EngineFactory, self).__init__(**kwargs)
68 79
         self.ident = self.session.session
69  
-        ctx = self.context
  80
+    
  81
+    def init_connector(self):
  82
+        """construct connection function, which handles tunnels."""
  83
+        self.using_ssh = bool(self.sshkey or self.sshserver)
70 84
         
71  
-        reg = ctx.socket(zmq.XREQ)
72  
-        reg.setsockopt(zmq.IDENTITY, self.bident)
73  
-        reg.connect(self.url)
74  
-        self.registrar = zmqstream.ZMQStream(reg, self.loop)
  85
+        if self.sshkey and not self.sshserver:
  86
+            # We are using ssh directly to the controller, tunneling localhost to localhost
  87
+            self.sshserver = self.url.split('://')[1].split(':')[0]
  88
+        
  89
+        if self.using_ssh:
  90
+            if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
  91
+                password=False
  92
+            else:
  93
+                password = getpass("SSH Password for %s: "%self.sshserver)
  94
+        else:
  95
+            password = False
  96
+        
  97
+        def connect(s, url):
  98
+            url = disambiguate_url(url, self.location)
  99
+            if self.using_ssh:
  100
+                self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
  101
+                return tunnel.tunnel_connection(s, url, self.sshserver,
  102
+                            keyfile=self.sshkey, paramiko=self.paramiko,
  103
+                            password=password,
  104
+                )
  105
+            else:
  106
+                return s.connect(url)
  107
+        
  108
+        def maybe_tunnel(url):
  109
+            """like connect, but don't complete the connection (for use by heartbeat)"""
  110
+            url = disambiguate_url(url, self.location)
  111
+            if self.using_ssh:
  112
+                self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
  113
+                url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
  114
+                            keyfile=self.sshkey, paramiko=self.paramiko,
  115
+                            password=password,
  116
+                )
  117
+            return url
  118
+        return connect, maybe_tunnel
75 119
         
76 120
     def register(self):
77 121
         """send the registration_request"""
78 122
         
79 123
         self.log.info("Registering with controller at %s"%self.url)
  124
+        ctx = self.context
  125
+        connect,maybe_tunnel = self.init_connector()
  126
+        reg = ctx.socket(zmq.XREQ)
  127
+        reg.setsockopt(zmq.IDENTITY, self.bident)
  128
+        connect(reg, self.url)
  129
+        self.registrar = zmqstream.ZMQStream(reg, self.loop)
  130
+        
  131
+        
80 132
         content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
81  
-        self.registrar.on_recv(self.complete_registration)
  133
+        self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
82 134
         # print (self.session.key)
83 135
         self.session.send(self.registrar, "registration_request",content=content)
84 136
     
85  
-    def complete_registration(self, msg):
  137
+    def complete_registration(self, msg, connect, maybe_tunnel):
86 138
         # print msg
87 139
         self._abort_dc.stop()
88 140
         ctx = self.context
@@ -94,6 +146,14 @@ def complete_registration(self, msg):
94 146
         if msg.content.status == 'ok':
95 147
             self.id = int(msg.content.id)
96 148
             
  149
+            # launch heartbeat
  150
+            hb_addrs = msg.content.heartbeat
  151
+            
  152
+            # possibly forward hb ports with tunnels
  153
+            hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
  154
+            heart = Heart(*map(str, hb_addrs), heart_id=identity)
  155
+            heart.start()
  156
+            
97 157
             # create Shell Streams (MUX, Task, etc.):
98 158
             queue_addr = msg.content.mux
99 159
             shell_addrs = [ str(queue_addr) ]
@@ -114,24 +174,20 @@ def complete_registration(self, msg):
114 174
             stream.setsockopt(zmq.IDENTITY, identity)
115 175
             shell_streams = [stream]
116 176
             for addr in shell_addrs:
117  
-                stream.connect(disambiguate_url(addr, self.location))
  177
+                connect(stream, addr)
118 178
             # end single stream-socket
119 179
             
120 180
             # control stream:
121 181
             control_addr = str(msg.content.control)
122 182
             control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
123 183
             control_stream.setsockopt(zmq.IDENTITY, identity)
124  
-            control_stream.connect(disambiguate_url(control_addr, self.location))
  184
+            connect(control_stream, control_addr)
125 185
             
126 186
             # create iopub stream:
127 187
             iopub_addr = msg.content.iopub
128 188
             iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
129 189
             iopub_stream.setsockopt(zmq.IDENTITY, identity)
130  
-            iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
131  
-            
132  
-            # launch heartbeat
133  
-            hb_addrs = msg.content.heartbeat
134  
-            # print (hb_addrs)
  190
+            connect(iopub_stream, iopub_addr)
135 191
             
136 192
             # # Redirect input streams and set a display hook.
137 193
             if self.out_stream_factory:
@@ -147,9 +203,6 @@ def complete_registration(self, msg):
147 203
                     control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, 
148 204
                     loop=loop, user_ns = self.user_ns, log=self.log)
149 205
             self.kernel.start()
150  
-            hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
151  
-            heart = Heart(*map(str, hb_addrs), heart_id=identity)
152  
-            heart.start()
153 206
             
154 207
             
155 208
         else:
43  docs/source/parallel/parallel_process.txt
@@ -484,6 +484,49 @@ The ``file`` flag works like this::
484 484
     (:file:`~/.ipython/profile_<name>/security` is the same on all of them), then things
485 485
     will just work!
486 486
 
  487
+SSH Tunnels
  488
+***********
  489
+
  490
+If your engines are not on the same LAN as the controller, or you are on a highly
  491
+restricted network where your nodes cannot see each others ports, then you can
  492
+use SSH tunnels to connect engines to the controller.
  493
+
  494
+.. note::
  495
+
  496
+    This does not work in all cases.  Manual tunnels may be an option, but are
  497
+    highly inconvenient. Support for manual tunnels will be improved.
  498
+
  499
+You can instruct all engines to use ssh, by specifying the ssh server in 
  500
+:file:`ipcontroller-engine.json`:
  501
+
  502
+.. I know this is really JSON, but the example is a subset of Python:
  503
+.. sourcecode:: python
  504
+
  505
+    {
  506
+      "url":"tcp://192.168.1.123:56951",
  507
+      "exec_key":"26f4c040-587d-4a4e-b58b-030b96399584",
  508
+      "ssh":"user@example.com",
  509
+      "location":"192.168.1.123"
  510
+    }
  511
+
  512
+This will be specified if you give the ``--enginessh=use@example.com`` argument when
  513
+starting :command:`ipcontroller`.
  514
+
  515
+Or you can specify an ssh server on the command-line when starting an engine::
  516
+
  517
+    $> ipengine --profile=foo --ssh=my.login.node
  518
+
  519
+For example, if your system is totally restricted, then all connections will actually be
  520
+loopback, and ssh tunnels will be used to connect engines to the controller::
  521
+
  522
+    [node1] $> ipcontroller --enginessh=node1
  523
+    [node2] $> ipengine
  524
+    [node3] $> ipcluster engines --n=4
  525
+
  526
+Or if you want to start many engines on each node, the command `ipcluster engines --n=4`
  527
+without any configuration is equivalent to running ipengine 4 times.
  528
+
  529
+
487 530
 Make JSON files persistent
488 531
 --------------------------
489 532
 
3  docs/source/parallel/parallel_security.txt
@@ -105,9 +105,6 @@ use OpenSSH or Paramiko, or the tunneling utilities are insufficient, then they
105 105
 construct the tunnels themselves, and simply connect clients and engines as if the
106 106
 controller were on loopback on the connecting machine.
107 107
 
108  
-.. note::
109  
-
110  
-    There is not currently tunneling available for engines.
111 108
 
112 109
 Authentication
113 110
 --------------
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.