Skip to content

Commit

Permalink
Release 0.17.0
Browse files Browse the repository at this point in the history
Add some new APIs, update others

Add Service, NetAddress, ServicePart, validate_port, validate_protocol

SessionBase: new API proxy() and remote_address().  Remove peer_address()
   and peer_address_str()

SOCKSProxy: auto_detect_address(), auto_detect_host() renamed auto_detect_at_address()
   and auto_detect_at_host().  auto_detect_at_address() takes a NetAddress.
  • Loading branch information
Neil Booth committed Apr 22, 2019
1 parent a302515 commit d869137
Show file tree
Hide file tree
Showing 9 changed files with 647 additions and 304 deletions.
2 changes: 1 addition & 1 deletion aiorpcx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .util import *


_version_str = '0.16.2'
_version_str = '0.17.0'
_version = tuple(int(part) for part in _version_str.split('.'))


Expand Down
45 changes: 15 additions & 30 deletions aiorpcx/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
Request, Batch, Notification, ProtocolError, RPCError,
JSONRPC, JSONRPCv2, JSONRPCConnection
)
from aiorpcx.util import NetAddress


class ReplyAndDisconnect(Exception):
Expand Down Expand Up @@ -159,8 +160,8 @@ def __init__(self, *, framer=None, loop=None):
self.transport = None
self.closed_event = self.event()
# Set when a connection is made
self._address = None
self._proxy_address = None
self._proxy = None
self._remote_address = None
# For logger.debug messsages
self.verbosity = 0
# Cleared when the send socket is full
Expand Down Expand Up @@ -245,15 +246,12 @@ def connection_made(self, transport):
Derived classes overriding this method must call this first.'''
self.transport = transport
# This would throw if called on a closed SSL transport. Fixed
# in asyncio in Python 3.6.1 and 3.5.4
peer_address = transport.get_extra_info('peername')
# If the Socks proxy was used then _address is already set to
# the remote address
if self._address:
self._proxy_address = peer_address
else:
self._address = peer_address
# If the Socks proxy was used then _proxy and _remote_address are already set
if self._proxy is None:
# This would throw if called on a closed SSL transport. Fixed in asyncio in
# Python 3.6.1 and 3.5.4
peername = transport.get_extra_info('peername')
self._remote_address = NetAddress(peername[0], peername[1])
self._task = spawn_sync(self._process_messages(), loop=self.loop)

def connection_lost(self, exc):
Expand All @@ -262,7 +260,6 @@ def connection_lost(self, exc):
Tear down things done in connection_made.'''
# Work around uvloop bug; see https://github.com/MagicStack/uvloop/issues/246
if self.transport:
self._address = None
self.transport = None
self.closed_event.set()
# Release waiting tasks
Expand Down Expand Up @@ -322,25 +319,13 @@ def default_framer(self):
'''Return a default framer.'''
raise NotImplementedError

def peer_address(self):
'''Returns the peer's address (Python networking address), or None if
no connection or an error.
def proxy(self):
'''Returns the proxy used, or None.'''
return self._proxy

This is the result of socket.getpeername() when the connection
was made.
'''
return self._address

def peer_address_str(self):
'''Returns the peer's IP address and port as a human-readable
string.'''
if not self._address:
return 'unknown'
ip_addr_str, port = self._address[:2]
if ':' in ip_addr_str:
return f'[{ip_addr_str}]:{port}'
else:
return f'{ip_addr_str}:{port}'
def remote_address(self):
'''Returns a NetAddress or None if not connected.'''
return self._remote_address

def is_closing(self):
'''Return True if the connection is closing.'''
Expand Down
134 changes: 62 additions & 72 deletions aiorpcx/socks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import struct
from functools import partial

from .util import classify_host
from .util import NetAddress


__all__ = ('SOCKSUserAuth', 'SOCKS4', 'SOCKS4a', 'SOCKS5', 'SOCKSProxy',
Expand All @@ -59,7 +59,7 @@ class NeedData(Exception):
pass


class SOCKSBase(object):
class SOCKSBase:
'''Stateful as written so good for a single connection only.'''

@classmethod
Expand Down Expand Up @@ -97,38 +97,36 @@ class SOCKS4(SOCKSBase):
'report different user-ids')
}

def __init__(self, dst_host, dst_port, auth):
def __init__(self, remote_address, auth):
super().__init__()
self._dst_host = self._check_host(dst_host)
self._dst_port = dst_port
self._remote_host = remote_address.host
self._remote_port = remote_address.port
self._auth = auth
self._check_remote_host()

@classmethod
def _check_host(cls, host):
result = classify_host(host)
if not isinstance(result, IPv4Address):
raise SOCKSProtocolError(f'SOCKS4 requires an IPv4 address: {host}')
return result
def _check_remote_host(self):
if not isinstance(self._remote_host, IPv4Address):
raise SOCKSProtocolError(f'SOCKS4 requires an IPv4 address: {self._remote_host}')

def _start(self):
self._state = self._first_response

if isinstance(self._dst_host, IPv4Address):
if isinstance(self._remote_host, IPv4Address):
# SOCKS4
dst_ip_packed = self._dst_host.packed
dst_ip_packed = self._remote_host.packed
host_bytes = b''
else:
# SOCKS4a
dst_ip_packed = b'\0\0\0\1'
host_bytes = self._dst_host.encode() + b'\0'
host_bytes = self._remote_host.encode() + b'\0'

if isinstance(self._auth, SOCKSUserAuth):
user_id = self._auth.username.encode()
else:
user_id = b''

# Send TCP/IP stream CONNECT request
return b''.join([b'\4\1', struct.pack('>H', self._dst_port),
return b''.join([b'\4\1', struct.pack('>H', self._remote_port),
dst_ip_packed, user_id, b'\0', host_bytes])

def _first_response(self):
Expand All @@ -149,12 +147,10 @@ def _first_response(self):

class SOCKS4a(SOCKS4):

@classmethod
def _check_host(cls, host):
result = classify_host(host)
if not isinstance(result, (str, IPv4Address)):
raise SOCKSProtocolError(f'SOCKS4a requires an IPv4 address or host name: {host}')
return result
def _check_remote_host(self):
if not isinstance(self._remote_host, (str, IPv4Address)):
raise SOCKSProtocolError(
f'SOCKS4a requires an IPv4 address or host name: {self._remote_host}')


class SOCKS5(SOCKSBase):
Expand All @@ -172,28 +168,26 @@ class SOCKS5(SOCKSBase):
8: 'address type not supported',
}

def __init__(self, dst_host, dst_port, auth):
def __init__(self, remote_address, auth):
super().__init__()
self._dst_bytes = self._destination_bytes(dst_host, dst_port)
self._auth_bytes, self._auth_methods = self._authentication(auth)
self._dst_bytes = SOCKS5._destination_bytes(remote_address.host, remote_address.port)
self._auth_bytes, self._auth_methods = SOCKS5._authentication(auth)

def _destination_bytes(self, host, port):
@staticmethod
def _destination_bytes(host, port):
if isinstance(host, IPv4Address):
addr_bytes = b'\1' + host.packed
elif isinstance(host, IPv6Address):
addr_bytes = b'\4' + host.packed
elif isinstance(host, str):
else:
assert isinstance(host, str)
host = host.encode()
if len(host) > 255:
raise SOCKSProtocolError(f'hostname too long: '
f'{len(host)} bytes')
assert len(host) <= 255
addr_bytes = b'\3' + bytes([len(host)]) + host
else:
raise SOCKSProtocolError(f'SOCKS5 requires an IPv4 address, IPv6 '
f'address, or host name: {host}')
return addr_bytes + struct.pack('>H', port)

def _authentication(self, auth):
@staticmethod
def _authentication(auth):
if isinstance(auth, SOCKSUserAuth):
user_bytes = auth.username.encode()
if not 0 < len(user_bytes) < 256:
Expand Down Expand Up @@ -265,15 +259,15 @@ def _connect_response_rest(self, addr_len):
return None


class SOCKSProxy(object):
class SOCKSProxy:

def __init__(self, address, protocol, auth):
'''A SOCKS proxy at an address following a SOCKS protocol. auth is an
authentication method to use when connecting, or None.
'''A SOCKS proxy at a NetAddress following a SOCKS protocol.
address is a (host, port) pair; for IPv6 it can instead be a
(host, port, flowinfo, scopeid) 4-tuple.
auth is an authentication method to use when connecting, or None.
'''
if not isinstance(address, NetAddress):
address = NetAddress.from_string(address)
self.address = address
self.protocol = protocol
self.auth = auth
Expand Down Expand Up @@ -303,18 +297,17 @@ async def _handshake(self, client, sock, loop):
raise SOCKSProtocolError("EOF received")
client.receive_data(data)

async def _connect_one(self, host, port):
'''Connect to the proxy and perform a handshake requesting a
connection to (host, port).
async def _connect_one(self, remote_address):
'''Connect to the proxy and perform a handshake requesting a connection.
Return the open socket on success, or the exception on failure.
'''
loop = asyncio.get_event_loop()

proxy_host, proxy_port = self.address[:2]
for info in await loop.getaddrinfo(proxy_host, proxy_port, type=socket.SOCK_STREAM):
for info in await loop.getaddrinfo(str(self.address.host), self.address.port,
type=socket.SOCK_STREAM):
# This object has state so is only good for one connection
client = self.protocol(host, port, self.auth)
client = self.protocol(remote_address, self.auth)
sock = socket.socket(family=info[0])
try:
# A non-blocking socket is required by loop socket methods
Expand All @@ -329,20 +322,19 @@ async def _connect_one(self, host, port):
# see https://github.com/kyuupichan/aiorpcX/issues/8
return exception

async def _connect(self, addresses):
'''Connect to the proxy and perform a handshake requesting a
connection to each address in addresses.
async def _connect(self, remote_addresses):
'''Connect to the proxy and perform a handshake requesting a connection to each address in
addresses.
Return an (open_socket, address) pair on success.
Return an (open_socket, remote_address) pair on success.
'''
assert addresses
assert remote_addresses

exceptions = []
for address in addresses:
host, port = address[:2]
sock = await self._connect_one(host, port)
for remote_address in remote_addresses:
sock = await self._connect_one(remote_address)
if isinstance(sock, socket.socket):
return sock, address
return sock, remote_address
exceptions.append(sock)

strings = set(f'{exc!r}' for exc in exceptions)
Expand All @@ -354,21 +346,21 @@ async def _detect_proxy(self):
otherwise False.
'''
if self.protocol is SOCKS4a:
host, port = 'www.apple.com', 80
remote_address = NetAddress('www.apple.com', 80)
else:
host, port = IPv4Address('8.8.8.8'), 53
remote_address = NetAddress('8.8.8.8', 53)

sock = await self._connect_one(host, port)
sock = await self._connect_one(remote_address)
if isinstance(sock, socket.socket):
sock.close()
return True

# SOCKSFailure indicates something failed, but that we are
# likely talking to a proxy
# SOCKSFailure indicates something failed, but that we are likely talking to a
# proxy
return isinstance(sock, SOCKSFailure)

@classmethod
async def auto_detect_address(cls, address, auth):
async def auto_detect_at_address(cls, address, auth):
'''Try to detect a SOCKS proxy at address using the authentication method (or None).
SOCKS5, SOCKS4a and SOCKS are tried in order. If a SOCKS proxy is detected a
SOCKSProxy object is returned.
Expand All @@ -385,7 +377,7 @@ async def auto_detect_address(cls, address, auth):
return None

@classmethod
async def auto_detect_host(cls, host, ports, auth):
async def auto_detect_at_host(cls, host, ports, auth):
'''Try to detect a SOCKS proxy on a host on one of the ports.
Calls auto_detect_address for the ports in order. Returning a SOCKSProxy does not
Expand All @@ -394,8 +386,7 @@ async def auto_detect_host(cls, host, ports, auth):
If no proxy is detected return None.
'''
for port in ports:
address = (host, port)
proxy = await cls.auto_detect_address(address, auth)
proxy = await cls.auto_detect_at_address(NetAddress(host, port), auth)
if proxy:
return proxy

Expand All @@ -418,20 +409,19 @@ async def create_connection(self, protocol_factory, host, port, *,
'''
loop = asyncio.get_event_loop()
if resolve:
infos = await loop.getaddrinfo(host, port, family=family,
type=socket.SOCK_STREAM,
proto=proto, flags=flags)
addresses = [info[4] for info in infos]
remote_addresses = [NetAddress(info[4][0], info[4][1]) for info in
await loop.getaddrinfo(host, port, family=family, proto=proto,
type=socket.SOCK_STREAM, flags=flags)]
else:
addresses = [(host, port)]
remote_addresses = [NetAddress(host, port)]

sock, address = await self._connect(addresses)
sock, remote_address = await self._connect(remote_addresses)

def set_address():
protocol = protocol_factory()
protocol._address = address
protocol._proxy = self
protocol._remote_address = remote_address
return protocol

return await loop.create_connection(
set_address, sock=sock, ssl=ssl,
server_hostname=host if ssl else None)
return await loop.create_connection(set_address, sock=sock, ssl=ssl,
server_hostname=host if ssl else None)

0 comments on commit d869137

Please sign in to comment.