-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathagent.pyx
198 lines (165 loc) · 6.62 KB
/
agent.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# This file is part of ssh2-python.
# Copyright (C) 2017-2020 Panos Kittenis
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation, version 2.1.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from .pkey cimport PublicKey, PyPublicKey
from .utils cimport to_bytes
from .exceptions import AgentConnectionError, AgentListIdentitiesError, \
AgentGetIdentityError, AgentAuthenticationError, AgentError
from . cimport c_ssh2
cdef int agent_auth(char * _username,
c_ssh2.LIBSSH2_AGENT * agent) except 1 nogil:
cdef c_ssh2.libssh2_agent_publickey *identity = NULL
cdef c_ssh2.libssh2_agent_publickey *prev = NULL
if c_ssh2.libssh2_agent_list_identities(agent) != 0:
clear_agent(agent)
with gil:
raise AgentListIdentitiesError(
"Failure requesting identities from agent")
while 1:
if auth_identity(_username, agent, &identity, prev) == 1:
with gil:
raise AgentAuthenticationError(
"No identities match for user %s",
_username)
if c_ssh2.libssh2_agent_userauth(
agent, _username, identity) == 0:
clear_agent(agent)
return 0
prev = identity
return 1
cdef int auth_identity(const char *username,
c_ssh2.LIBSSH2_AGENT *agent,
c_ssh2.libssh2_agent_publickey **identity,
c_ssh2.libssh2_agent_publickey *prev) except -1 nogil:
cdef int rc
rc = c_ssh2.libssh2_agent_get_identity(
agent, identity, prev)
if rc == 1:
clear_agent(agent)
elif rc < 0:
clear_agent(agent)
with gil:
raise AgentGetIdentityError(
"Failure getting identity for user %s from agent",
username)
return rc
cdef c_ssh2.LIBSSH2_AGENT * agent_init(
c_ssh2.LIBSSH2_SESSION *_session) except NULL nogil:
cdef c_ssh2.LIBSSH2_AGENT *agent = c_ssh2.libssh2_agent_init(
_session)
if agent is NULL:
with gil:
raise AgentError("Error initialising agent")
return agent
cdef c_ssh2.LIBSSH2_AGENT * init_connect_agent(
c_ssh2.LIBSSH2_SESSION *_session) except NULL nogil:
cdef c_ssh2.LIBSSH2_AGENT *agent
agent = c_ssh2.libssh2_agent_init(_session)
if c_ssh2.libssh2_agent_connect(agent) != 0:
c_ssh2.libssh2_agent_free(agent)
with gil:
raise AgentConnectionError("Unable to connect to agent")
return agent
cdef void clear_agent(c_ssh2.LIBSSH2_AGENT *agent) noexcept nogil:
c_ssh2.libssh2_agent_disconnect(agent)
c_ssh2.libssh2_agent_free(agent)
cdef object PyAgent(c_ssh2.LIBSSH2_AGENT *agent, Session session):
cdef Agent _agent = Agent.__new__(Agent, session)
_agent._agent = agent
return _agent
cdef class Agent:
def __cinit__(self, Session session):
cdef c_ssh2.LIBSSH2_AGENT *_agent = agent_init(session._session)
self._agent = _agent
self._session = session
def __dealloc__(self):
if self._session is not None and self._session._session is not NULL and self._agent is not NULL:
clear_agent(self._agent)
self._agent = NULL
def list_identities(self):
"""This method is a no-op - use
:py:func:`ssh2.agent.Agent.get_identities()` to list and retrieve
identities."""
pass
def get_identities(self):
"""List and get identities from agent
:rtype: list(:py:class:`ssh2.pkey.PublicKey`)"""
cdef int rc
cdef list identities = []
cdef c_ssh2.libssh2_agent_publickey *identity = NULL
cdef c_ssh2.libssh2_agent_publickey *prev = NULL
if c_ssh2.libssh2_agent_list_identities(self._agent) != 0:
raise AgentListIdentitiesError(
"Failure requesting identities from agent."
"Agent must be connected first")
while c_ssh2.libssh2_agent_get_identity(
self._agent, &identity, prev) == 0:
identities.append(PyPublicKey(identity))
prev = identity
return identities
def userauth(self, username not None,
PublicKey pkey):
"""Perform user authentication with specific public key
:param username: User name to authenticate as
:type username: str
:param pkey: Public key to authenticate with
:type pkey: :py:class:`ssh2.pkey.PublicKey`
:raises: :py:class:`ssh2.exceptions.AgentAuthenticationError` on errors
authenticating.
:rtype: int"""
cdef int rc
cdef bytes b_username = to_bytes(username)
cdef char *_username = b_username
with nogil:
rc = c_ssh2.libssh2_agent_userauth(
self._agent, _username, pkey._pkey)
if rc != 0 and rc != c_ssh2.LIBSSH2_ERROR_EAGAIN:
with gil:
raise AgentAuthenticationError(
"Error authenticating user %s with provided public key",
username)
return rc
def disconnect(self):
"""Disconnect from agent.
:rtype: int"""
cdef int rc
with nogil:
rc = c_ssh2.libssh2_agent_disconnect(self._agent)
return rc
def connect(self):
"""Connect to agent.
:raises: :py:class:`ssh2.exceptions.AgentConnectionError` on errors
connecting to agent.
:rtype: int"""
cdef int rc
rc = c_ssh2.libssh2_agent_connect(self._agent)
if rc != 0:
raise AgentConnectionError("Unable to connect to agent")
return rc
def get_identity_path(self):
cdef bytes _path
cdef const char* c_path = NULL
with nogil:
c_path = c_ssh2.libssh2_agent_get_identity_path(self._agent)
if c_path is NULL:
return
_path = c_path
return _path
def set_identity_path(self, path not None):
cdef bytes b_path = to_bytes(path)
cdef const char *c_path = b_path
with nogil:
c_ssh2.libssh2_agent_set_identity_path(
self._agent, c_path)