Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/scmrepo/git/backend/dulwich/asyncssh_vendor.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ async def _run_command(
host,
port=port if port is not None else (),
username=username if username is not None else (),
password=password if password is not None else (),
password=password,
client_keys=[key_filename] if key_filename else (),
ignore_encrypted=not key_filename,
known_hosts=None,
Expand Down
82 changes: 75 additions & 7 deletions tests/test_dulwich.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,74 @@
from pytest_mock import MockerFixture
from pytest_test_utils.waiters import wait_until

from scmrepo.exceptions import AuthError
from scmrepo.git.backend.dulwich.asyncssh_vendor import AsyncSSHVendor

from .vendor.test_paramiko_vendor import (
CLIENT_KEY,
PASSWORD,
USER,
Server,
)

# pylint: disable=redefined-outer-name


USER = "testuser"
PASSWORD = "test"
CLIENT_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAxvREKSElPOm/0z/nPO+j5rk2tjdgGcGc7We1QZ6TRXYLu7nN
GeEFIL4p8N1i6dmB+Eydt7xqCU79MWD6Yy4prFe1+/K1wCDUxIbFMxqQcX5zjJzd
i8j8PbcaUlVhP/OkjtkSxrXaGDO1BzfdV4iEBtTV/2l3zmLKJlt3jnOHLczP24CB
DTQKp3rKshbRefzot9Y+wnaK692RsYgsyo9YEP0GyWKG9topCHk13r46J6vGLeuj
ryUKqmbLJkzbJbIcEqwTDo5iHaCVqaMr5Hrb8BdMucSseqZQJsXSd+9tdRcIblUQ
38kZjmFMm4SFbruJcpZCNM2wNSZPIRX+3eiwNwIDAQABAoIBAHSacOBSJsr+jIi5
KUOTh9IPtzswVUiDKwARCjB9Sf8p4lKR4N1L/n9kNJyQhApeikgGT2GCMftmqgoo
tlculQoHFgemBlOmak0MV8NNzF5YKEy/GzF0CDH7gJfEpoyetVFrdA+2QS5yD6U9
XqKQxiBi2VEqdScmyyeT8AwzNYTnPeH/DOEcnbdRjqiy/CD79F49CQ1lX1Fuqm0K
I7BivBH1xo/rVnUP4F+IzocDqoga+Pjdj0LTXIgJlHQDSbhsQqWujWQDDuKb+MAw
sNK4Zf8ErV3j1PyA7f/M5LLq6zgstkW4qikDHo4SpZX8kFOO8tjqb7kujj7XqeaB
CxqrOTECgYEA73uWkrohcmDJ4KqbuL3tbExSCOUiaIV+sT1eGPNi7GCmXD4eW5Z4
75v2IHymW83lORSu/DrQ6sKr1nkuRpqr2iBzRmQpl/H+wahIhBXlnJ25uUjDsuPO
1Pq2LcmyD+jTxVnmbSe/q7O09gZQw3I6H4+BMHmpbf8tC97lqimzpJ0CgYEA1K0W
ZL70Xtn9quyHvbtae/BW07NZnxvUg4UaVIAL9Zu34JyplJzyzbIjrmlDbv6aRogH
/KtuG9tfbf55K/jjqNORiuRtzt1hUN1ye4dyW7tHx2/7lXdlqtyK40rQl8P0kqf8
zaS6BqjnobgSdSpg32rWoL/pcBHPdJCJEgQ8zeMCgYEA0/PK8TOhNIzrP1dgGSKn
hkkJ9etuB5nW5mEM7gJDFDf6JPupfJ/xiwe6z0fjKK9S57EhqgUYMB55XYnE5iIw
ZQ6BV9SAZ4V7VsRs4dJLdNC3tn/rDGHJBgCaym2PlbsX6rvFT+h1IC8dwv0V79Ui
Ehq9WTzkMoE8yhvNokvkPZUCgYEAgBAFxv5xGdh79ftdtXLmhnDvZ6S8l6Fjcxqo
Ay/jg66Tp43OU226iv/0mmZKM8Dd1xC8dnon4GBVc19jSYYiWBulrRPlx0Xo/o+K
CzZBN1lrXH1i6dqufpc0jq8TMf/N+q1q/c1uMupsKCY1/xVYpc+ok71b7J7c49zQ
nOeuUW8CgYA9Infooy65FTgbzca0c9kbCUBmcAPQ2ItH3JcPKWPQTDuV62HcT00o
fZdIV47Nez1W5Clk191RMy8TXuqI54kocciUWpThc6j44hz49oUueb8U4bLcEHzA
WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w==
-----END RSA PRIVATE KEY-----"""


class Server(paramiko.ServerInterface):
"""http://docs.paramiko.org/en/2.4/api/server.html."""

def __init__(self, commands, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.commands = commands

def check_channel_exec_request(self, channel, command):
self.commands.append(command)
return True

def check_auth_password(self, username, password):
if username == USER and password == PASSWORD:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED

def check_auth_publickey(self, username, key):
pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
if username == USER and key == pubkey:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED

def check_channel_request(self, kind, chanid):
if kind == "session":
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

def get_allowed_auths(self, username):
return "password,publickey"


@pytest.fixture
def ssh_conn(request: pytest.FixtureRequest) -> dict[str, Any]:
server = Server([])
Expand Down Expand Up @@ -77,6 +133,18 @@ def test_run_command_password(server: Server, ssh_port: int):
assert b"test_run_command_password" in server.commands


def test_run_command_no_password(server: Server, ssh_port: int):
vendor = AsyncSSHVendor()
with pytest.raises(AuthError):
vendor.run_command(
"127.0.0.1",
"test_run_command_password",
username=USER,
port=ssh_port,
password=None,
)


def test_run_command_with_privkey(server: Server, ssh_port: int):
key = asyncssh.import_private_key(CLIENT_KEY)

Expand Down