Skip to content
Permalink
Browse files

ssh: better OpenSSH 7.5+ permission denied handling

The user@host prefix in new-style OpenSSH messages unfortunately takes
the host part from ~/.ssh/config and friends. There is no way to know
which hostname will appear in this string without parsing the OpenSSH
config, nor which username will appear.

Instead just regex it.

Add SSH stub modes to print the new/old errors and add some simple
tests.

This extends the work done in b9112a9
  • Loading branch information
dw committed Oct 30, 2018
1 parent b527ff0 commit 16ca111ebdb08b901572d0bd446a7ab1a183944a
Showing with 43 additions and 11 deletions.
  1. +9 −6 mitogen/ssh.py
  2. +16 −2 tests/data/stubs/ssh.py
  3. +18 −3 tests/ssh_test.py
@@ -31,6 +31,7 @@
"""

import logging
import re
import time

try:
@@ -46,10 +47,16 @@

# sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = b('password')
PERMDENIED_PROMPT = b('permission denied')
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?')
HOSTKEY_FAIL = b('host key verification failed.')

# [user@host: ] permission denied
PERMDENIED_RE = re.compile(
('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5
'Permission denied').encode(),
re.I
)


DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))

@@ -289,11 +296,7 @@ def _connect_bootstrap(self, extra_fd):
self._host_key_prompt()
elif HOSTKEY_FAIL in buf.lower():
raise HostKeyError(self.hostkey_failed_msg)
elif buf.lower().startswith((
PERMDENIED_PROMPT,
b("%s@%s: " % (self.username, self.hostname))
+ PERMDENIED_PROMPT,
)):
elif PERMDENIED_RE.match(buf):
# issue #271: work around conflict with user shell reporting
# 'permission denied' e.g. during chdir($HOME) by only matching
# it at the start of the line.
@@ -15,6 +15,10 @@

HOST_KEY_STRICT_MSG = """Host key verification failed.\n"""

PERMDENIED_CLASSIC_MSG = 'Permission denied (publickey,password)\n'
PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n'



def tty(msg):
fp = open('/dev/tty', 'wb', 0)
@@ -37,13 +41,23 @@ def confirm(msg):
fp.close()


if os.getenv('FAKESSH_MODE') == 'ask':
mode = os.getenv('FAKESSH_MODE')

if mode == 'ask':
assert 'y\n' == confirm(HOST_KEY_ASK_MSG)

if os.getenv('FAKESSH_MODE') == 'strict':
elif mode == 'strict':
stderr(HOST_KEY_STRICT_MSG)
sys.exit(255)

elif mode == 'permdenied_classic':
stderr(PERMDENIED_CLASSIC_MSG)
sys.exit(255)

elif mode == 'permdenied_75':
stderr(PERMDENIED_75_MSG)
sys.exit(255)


#
# Set an env var if stderr was a TTY to make ssh_test tests easier to write.
@@ -124,9 +124,10 @@ def test_verbose_enabled(self):
self.assertEquals(name, context.name)


class RequirePtyTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream

class FakeSshMixin(testlib.RouterMixin):
"""
Mix-in that provides :meth:`fake_ssh` executing the stub 'ssh.py'.
"""
def fake_ssh(self, FAKESSH_MODE=None, **kwargs):
os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE)
try:
@@ -139,6 +140,20 @@ def fake_ssh(self, FAKESSH_MODE=None, **kwargs):
finally:
del os.environ['FAKESSH_MODE']


class PermissionDeniedTest(FakeSshMixin, testlib.TestCase):
def test_classic_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.fake_ssh(FAKESSH_MODE='permdenied_classic'))

def test_openssh_75_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.fake_ssh(FAKESSH_MODE='permdenied_75'))


class RequirePtyTest(FakeSshMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream

def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept
context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept')

0 comments on commit 16ca111

Please sign in to comment.