Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Red-M committed Sep 29, 2023
2 parents ae982f6 + 300668b commit 42cc1c1
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 19 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/ci.yml
Expand Up @@ -11,11 +11,19 @@ env:

jobs:
test:
runs-on: ubuntu-22.04
name: Test Python ${{ matrix.python-version }} on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: ["ubuntu-22.04"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "pypy3.9"]
include:
# Python < 3.7 is not available on ubuntu-22.04
- os: "ubuntu-20.04"
python-version: "3.5"
- os: "ubuntu-20.04"
python-version: "3.6"

steps:
- uses: actions/checkout@v4
Expand All @@ -28,6 +36,7 @@ jobs:

- name: Install packages
run: |
sudo apt-get install --yes zsh
export PYTHONIOENCODING=UTF8
pip install -r requirements-testing.txt
Expand Down
8 changes: 6 additions & 2 deletions pexpect/pxssh.py
Expand Up @@ -143,6 +143,7 @@ def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
# used to set shell command-line prompt to UNIQUE_PROMPT.
self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"
self.PROMPT_SET_ZSH = "prompt restore;\nPS1='[PEXPECT]%(!.#.$) '"
self.SSH_OPTS = (" -o 'PubkeyAuthentication=no'")
# Disabling host key checking, makes you vulnerable to MITM attacks.
# + " -o 'StrictHostKeyChecking=no'"
Expand Down Expand Up @@ -529,8 +530,11 @@ def set_unique_prompt(self):
if i == 0: # csh-style
self.sendline(self.PROMPT_SET_CSH)
i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
if i == 0:
return False
if i == 0: # zsh-style
self.sendline(self.PROMPT_SET_ZSH)
i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
if i == 0:
return False
return True

# vi:ts=4:sw=4:expandtab:ft=python:
20 changes: 13 additions & 7 deletions pexpect/replwrap.py
Expand Up @@ -112,19 +112,25 @@ def python(command=sys.executable):
"""Start a Python shell and return a :class:`REPLWrapper` object."""
return REPLWrapper(command, u">>> ", u"import sys; sys.ps1={0!r}; sys.ps2={1!r}")

def bash(command="bash"):
"""Start a bash shell and return a :class:`REPLWrapper` object."""
bashrc = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
child = pexpect.spawn(command, ['--rcfile', bashrc], echo=False,
encoding='utf-8')
def _repl_sh(command, args, non_printable_insert):
child = pexpect.spawn(command, args, echo=False, encoding='utf-8')

# If the user runs 'env', the value of PS1 will be in the output. To avoid
# replwrap seeing that as the next prompt, we'll embed the marker characters
# for invisible characters in the prompt; these show up when inspecting the
# environment variable, but not when bash displays the prompt.
ps1 = PEXPECT_PROMPT[:5] + u'\\[\\]' + PEXPECT_PROMPT[5:]
ps2 = PEXPECT_CONTINUATION_PROMPT[:5] + u'\\[\\]' + PEXPECT_CONTINUATION_PROMPT[5:]
ps1 = PEXPECT_PROMPT[:5] + non_printable_insert + PEXPECT_PROMPT[5:]
ps2 = PEXPECT_CONTINUATION_PROMPT[:5] + non_printable_insert + PEXPECT_CONTINUATION_PROMPT[5:]
prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

return REPLWrapper(child, u'\\$', prompt_change,
extra_init_cmd="export PAGER=cat")

def bash(command="bash"):
"""Start a bash shell and return a :class:`REPLWrapper` object."""
bashrc = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
return _repl_sh(command, ['--rcfile', bashrc], non_printable_insert='\\[\\]')

def zsh(command="zsh", args=("--no-rcs", "-V", "+Z")):
"""Start a zsh shell and return a :class:`REPLWrapper` object."""
return _repl_sh(command, list(args), non_printable_insert='%(!..)')
27 changes: 18 additions & 9 deletions tests/fakessh/ssh
Expand Up @@ -9,27 +9,22 @@ if not PY3:
input = raw_input

ssh_usage = "usage: ssh [-2qV] [-c cipher_spec] [-l login_name]\r\n" \
+ " hostname"
+ " hostname [shell]"

cipher_valid_list = ['aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'arcfour256', 'arcfour128', \
'aes128-cbc','3des-cbc','blowfish-cbc','cast128-cbc','aes192-cbc', \
'aes256-cbc','arcfour']

try:
server = sys.argv[-1]
if server == 'noserver':
print('No route to host')
sys.exit(1)

elif len(sys.argv) < 2:
if len(sys.argv) < 2:
print(ssh_usage)
sys.exit(1)

cipher = ''
cipher_list = []
fullCmdArguments = sys.argv
argumentList = fullCmdArguments[1:]
unixOptions = "2qVc:l"
unixOptions = "2qVc:l:"
arguments, values = getopt.getopt(argumentList, unixOptions)
for currentArgument, currentValue in arguments:
if currentArgument in ("-2"):
Expand All @@ -45,6 +40,14 @@ try:
print("Unknown cipher type '" + str(cipher_item) + "'")
sys.exit(1)

server = values[0]
if server == 'noserver':
print('No route to host')
sys.exit(1)

shell = 'bash'
if len(values) > 1:
shell = values[1]

except Exception as e:
print(ssh_usage)
Expand All @@ -62,7 +65,13 @@ prompt = "$"
while True:
cmd = input(prompt)
if cmd.startswith('PS1='):
prompt = eval(cmd[4:]).replace(r'\$', '$')
if shell == 'bash':
prompt = eval(cmd[4:]).replace(r'\$', '$')
elif shell == 'zsh':
prompt = eval(cmd[4:]).replace('%(!.#.$)', '$')
elif cmd.startswith('set prompt='):
if shell.endswith('csh'):
prompt = eval(cmd[11:]).replace(r'\$', '$')
elif cmd == 'ping':
print('pong')
elif cmd.startswith('ls'):
Expand Down
24 changes: 24 additions & 0 deletions tests/test_pxssh.py
Expand Up @@ -276,5 +276,29 @@ def test_failed_custom_ssh_cmd(self):
else:
assert False, 'should have raised exception, pxssh.ExceptionPxssh'

def test_login_bash(self):
ssh = pxssh.pxssh()
result = ssh.login('server bash', 'me', password='s3cret')
ssh.sendline('ping')
ssh.expect('pong', timeout=10)
assert ssh.prompt(timeout=10)
ssh.logout()

def test_login_zsh(self):
ssh = pxssh.pxssh()
result = ssh.login('server zsh', 'me', password='s3cret')
ssh.sendline('ping')
ssh.expect('pong', timeout=10)
assert ssh.prompt(timeout=10)
ssh.logout()

def test_login_tcsh(self):
ssh = pxssh.pxssh()
result = ssh.login('server tcsh', 'me', password='s3cret')
ssh.sendline('ping')
ssh.expect('pong', timeout=10)
assert ssh.prompt(timeout=10)
ssh.logout()

if __name__ == '__main__':
unittest.main()
12 changes: 12 additions & 0 deletions tests/test_replwrap.py
Expand Up @@ -97,6 +97,18 @@ def test_existing_spawn(self):
print(res)
assert res.startswith('/'), res

def test_zsh(self):
zsh = replwrap.zsh()
res = zsh.run_command("env")
assert 'PAGER' in res, res

try:
zsh.run_command('')
except ValueError:
pass
else:
assert False, "Didn't raise ValueError for empty input"

def test_python(self):
if platform.python_implementation() == 'PyPy':
raise unittest.SkipTest(skip_pypy)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_unicode.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import os
import platform
import tempfile
import sys
Expand Down Expand Up @@ -124,11 +125,13 @@ def open(fname, mode, **kwargs):
# ensure the 'send' log is correct,
with open(filename_send, 'r', encoding='utf-8') as f:
self.assertEqual(f.read(), msg + '\n\x04')
os.unlink(filename_send)

# ensure the 'read' log is correct,
with open(filename_read, 'r', encoding='utf-8', newline='') as f:
output = f.read().replace(_CAT_EOF, '')
self.assertEqual(output, (msg + '\r\n')*2 )
os.unlink(filename_read)


def test_spawn_expect_ascii_unicode(self):
Expand Down

0 comments on commit 42cc1c1

Please sign in to comment.