Skip to content

Commit b6a9386

Browse files
authored
v1.6
relay tunnel
1 parent 19afb68 commit b6a9386

File tree

4 files changed

+224
-142
lines changed

4 files changed

+224
-142
lines changed

pproxy/__init__.py

Lines changed: 152 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import argparse, time, re, asyncio, functools, types, urllib.parse
1+
import argparse, time, re, asyncio, functools, urllib.parse
22
from pproxy import proto
33

44
__title__ = 'pproxy'
5-
__version__ = "1.5.2"
5+
__version__ = "1.6"
66
__description__ = "Proxy server that can tunnel among remote servers by regex rules."
77
__author__ = "Qian Wenjie"
88
__license__ = "MIT License"
@@ -25,59 +25,48 @@ def authed(self):
2525
def set_authed(self):
2626
self._auth[self.remote_ip] = time.time()
2727

28+
async def prepare_ciphers(cipher, reader, writer, bind=None, server_side=True):
29+
if cipher:
30+
cipher.pdecrypt = cipher.pdecrypt2 = cipher.pencrypt = cipher.pencrypt2 = DUMMY
31+
for plugin in cipher.plugins:
32+
if server_side:
33+
await plugin.init_server_data(reader, writer, cipher, bind)
34+
else:
35+
await plugin.init_client_data(reader, writer, cipher)
36+
plugin.add_cipher(cipher)
37+
return cipher(reader, writer, cipher.pdecrypt, cipher.pdecrypt2, cipher.pencrypt, cipher.pencrypt2)
38+
else:
39+
return None, None
40+
2841
async def proxy_handler(reader, writer, unix, lbind, protos, rserver, block, cipher, verbose=DUMMY, modstat=lambda r,h:lambda i:DUMMY, **kwargs):
2942
try:
30-
if not unix:
31-
remote_ip = writer.get_extra_info('peername')[0]
32-
server_ip = writer.get_extra_info('sockname')[0]
33-
else:
34-
remote_ip = server_ip = 'local'
35-
local_addr = None if server_ip in ('127.0.0.1', '::1', 'local') else (server_ip, 0)
36-
if cipher:
37-
reader.plugin_decrypt = reader.plugin_decrypt2 = writer.plugin_encrypt = writer.plugin_encrypt2 = DUMMY
38-
for plugin in cipher.plugins:
39-
await plugin.init_client_data(reader, writer, cipher)
40-
plugin.apply_cipher(reader, writer)
41-
reader_cipher = cipher(reader, writer)[0]
43+
if unix:
44+
remote_ip, server_ip, remote_text = 'local', None, 'unix_local'
4245
else:
43-
reader_cipher = None
46+
remote_ip, remote_port = writer.get_extra_info('peername')[0:2]
47+
server_ip = writer.get_extra_info('sockname')[0]
48+
remote_text = f'{remote_ip}:{remote_port}'
49+
local_addr = None if server_ip in ('127.0.0.1', '::1', None) else (server_ip, 0)
50+
reader_cipher, _ = await prepare_ciphers(cipher, reader, writer, server_side=False)
4451
lproto, host_name, port, initbuf = await proto.parse(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs)
45-
if host_name is None:
46-
writer.close()
47-
return
4852
if block and block(host_name):
4953
raise Exception('BLOCK ' + host_name)
5054
roption = next(filter(lambda o: o.alive and (not o.match or o.match(host_name)), rserver), None)
51-
viaproxy = bool(roption)
52-
if viaproxy:
53-
verbose(f'{lproto.name} {host_name}:{port} -> {roption.rproto.name} {roption.bind}')
54-
wait_connect = roption.connect() if roption.unix else roption.connect(local_addr=local_addr if roption.lbind == 'in' else (roption.lbind, 0) if roption.lbind else None)
55-
else:
56-
verbose(f'{lproto.name} {host_name}:{port}')
57-
wait_connect = asyncio.open_connection(host=host_name, port=port, local_addr=local_addr if lbind == 'in' else (lbind, 0) if lbind else None)
55+
verbose(f'{lproto.name} {remote_text}' + roption.logtext(host_name, port))
5856
try:
59-
reader_remote, writer_remote = await asyncio.wait_for(wait_connect, timeout=SOCKET_TIMEOUT)
57+
reader_remote, writer_remote = await asyncio.wait_for(roption.open_connection(host_name, port, local_addr, lbind), timeout=SOCKET_TIMEOUT)
6058
except asyncio.TimeoutError:
61-
raise Exception(f'Connection timeout {rserver}')
59+
raise Exception(f'Connection timeout {roption.bind}')
6260
try:
63-
if viaproxy:
64-
if roption.cipher:
65-
reader_remote.plugin_decrypt = reader_remote.plugin_decrypt2 = writer_remote.plugin_encrypt = writer_remote.plugin_encrypt2 = DUMMY
66-
for plugin in roption.cipher.plugins:
67-
await plugin.init_server_data(reader_remote, writer_remote, roption.cipher, roption.bind)
68-
plugin.apply_cipher(reader_remote, writer_remote)
69-
writer_cipher_r = roption.cipher(reader_remote, writer_remote)[1]
70-
else:
71-
writer_cipher_r = None
72-
await roption.rproto.connect(reader_remote=reader_remote, writer_remote=writer_remote, rauth=roption.auth, host_name=host_name, port=port, initbuf=initbuf, writer_cipher_r=writer_cipher_r, sock=writer_remote.get_extra_info('socket'))
73-
else:
74-
writer_remote.write(initbuf)
61+
await roption.prepare_connection(reader_remote, writer_remote, host_name, port)
62+
writer_remote.write(initbuf)
7563
except Exception:
7664
writer_remote.close()
7765
raise Exception('Unknown remote protocol')
7866
m = modstat(remote_ip, host_name)
79-
asyncio.ensure_future(lproto.rchannel(reader_remote, writer, m(2+viaproxy), m(4+viaproxy)))
80-
asyncio.ensure_future(lproto.channel(reader, writer_remote, m(viaproxy), DUMMY))
67+
lchannel = lproto.http_channel if initbuf else lproto.channel
68+
asyncio.ensure_future(lproto.channel(reader_remote, writer, m(2+roption.direct), m(4+roption.direct)))
69+
asyncio.ensure_future(lchannel(reader, writer_remote, m(roption.direct), DUMMY))
8170
except Exception as ex:
8271
if not isinstance(ex, asyncio.TimeoutError) and not str(ex).startswith('Connection closed'):
8372
verbose(f'{str(ex) or "Unsupported protocol"} from {remote_ip}')
@@ -88,15 +77,17 @@ async def check_server_alive(interval, rserver, verbose):
8877
while True:
8978
await asyncio.sleep(interval)
9079
for remote in rserver:
80+
if remote.direct:
81+
continue
9182
try:
92-
reader, writer = await asyncio.wait_for(remote.connect(), timeout=SOCKET_TIMEOUT)
83+
_, writer = await asyncio.wait_for(remote.open_connection(None, None, None, None), timeout=SOCKET_TIMEOUT)
9384
except Exception as ex:
9485
if remote.alive:
95-
verbose(f'{remote.bind} -> OFFLINE')
86+
verbose(f'{remote.rproto.name} {remote.bind} -> OFFLINE')
9687
remote.alive = False
9788
continue
9889
if not remote.alive:
99-
verbose(f'{remote.bind} -> ONLINE')
90+
verbose(f'{remote.rproto.name} {remote.bind} -> ONLINE')
10091
remote.alive = True
10192
try:
10293
writer.close()
@@ -107,64 +98,135 @@ def pattern_compile(filename):
10798
with open(filename) as f:
10899
return re.compile('(:?'+''.join('|'.join(i.strip() for i in f if i.strip() and not i.startswith('#')))+')$').match
109100

110-
def uri_compile(uri):
111-
url = urllib.parse.urlparse(uri)
112-
rawprotos = url.scheme.split('+')
113-
err_str, protos = proto.get_protos(rawprotos)
114-
if err_str:
115-
raise argparse.ArgumentTypeError(err_str)
116-
if 'ssl' in rawprotos or 'secure' in rawprotos:
117-
import ssl
118-
sslserver = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
119-
sslclient = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
120-
if 'ssl' in rawprotos:
121-
sslclient.check_hostname = False
122-
sslclient.verify_mode = ssl.CERT_NONE
123-
else:
124-
sslserver = None
125-
sslclient = None
126-
urlpath, _, plugins = url.path.partition(',')
127-
urlpath, _, lbind = urlpath.partition('@')
128-
plugins = plugins.split(',') if plugins else None
129-
cipher, _, loc = url.netloc.rpartition('@')
130-
if cipher:
131-
from pproxy.cipher import get_cipher
132-
err_str, cipher = get_cipher(cipher)
101+
class ProxyURI(object):
102+
def __init__(self, **kw):
103+
self.__dict__.update(kw)
104+
def logtext(self, host, port):
105+
if self.direct:
106+
return f' -> {host}:{port}'
107+
else:
108+
return f' -> {self.rproto.name} {self.bind}' + self.relay.logtext(host, port)
109+
def open_connection(self, host, port, local_addr, lbind):
110+
if self.direct:
111+
local_addr = local_addr if lbind == 'in' else (lbind, 0) if lbind else None
112+
return asyncio.open_connection(host=host, port=port, local_addr=local_addr)
113+
elif self.unix:
114+
return asyncio.open_unix_connection(path=self.bind, ssl=self.sslclient, server_hostname='' if self.sslclient else None)
115+
else:
116+
local_addr = local_addr if self.lbind == 'in' else (self.lbind, 0) if self.lbind else None
117+
return asyncio.open_connection(host=self.host_name, port=self.port, ssl=self.sslclient, local_addr=local_addr)
118+
async def prepare_connection(self, reader_remote, writer_remote, host, port):
119+
if not self.direct:
120+
_, writer_cipher_r = await prepare_ciphers(self.cipher, reader_remote, writer_remote, self.bind)
121+
whost, wport = (host, port) if self.relay.direct else (self.relay.host_name, self.relay.port)
122+
await self.rproto.connect(reader_remote=reader_remote, writer_remote=writer_remote, rauth=self.auth, host_name=whost, port=wport, writer_cipher_r=writer_cipher_r, sock=writer_remote.get_extra_info('socket'))
123+
await self.relay.prepare_connection(reader_remote, writer_remote, host, port)
124+
def start_server(self, handler):
125+
if self.unix:
126+
return asyncio.start_unix_server(handler, path=self.bind, ssl=self.sslserver)
127+
else:
128+
return asyncio.start_server(handler, host=self.host_name, port=self.port, ssl=self.sslserver)
129+
@classmethod
130+
def compile_relay(cls, uri):
131+
tail = cls.DIRECT
132+
for urip in reversed(uri.split('__')):
133+
tail = cls.compile(urip, tail)
134+
return tail
135+
@classmethod
136+
def compile(cls, uri, relay=None):
137+
url = urllib.parse.urlparse(uri)
138+
rawprotos = url.scheme.split('+')
139+
err_str, protos = proto.get_protos(rawprotos)
133140
if err_str:
134141
raise argparse.ArgumentTypeError(err_str)
135-
if plugins:
136-
from pproxy.plugin import get_plugin
137-
for name in plugins:
138-
if not name: continue
139-
err_str, plugin = get_plugin(name)
140-
if err_str:
141-
raise argparse.ArgumentTypeError(err_str)
142-
cipher.plugins.append(plugin)
143-
match = pattern_compile(url.query) if url.query else None
144-
if loc:
145-
host, _, port = loc.partition(':')
146-
port = int(port) if port else 8080
147-
connect = functools.partial(asyncio.open_connection, host=host, port=port, ssl=sslclient)
148-
server = functools.partial(asyncio.start_server, host=host, port=port, ssl=sslserver)
149-
else:
150-
connect = functools.partial(asyncio.open_unix_connection, path=urlpath, ssl=sslclient, server_hostname='' if sslclient else None)
151-
server = functools.partial(asyncio.start_unix_server, path=urlpath, ssl=sslserver)
152-
return types.SimpleNamespace(protos=protos, rproto=protos[0], cipher=cipher, auth=url.fragment.encode(), match=match, server=server, connect=connect, bind=loc or urlpath, unix=not loc, lbind=lbind, sslclient=sslclient, sslserver=sslserver, alive=True)
142+
if 'ssl' in rawprotos or 'secure' in rawprotos:
143+
import ssl
144+
sslserver = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
145+
sslclient = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
146+
if 'ssl' in rawprotos:
147+
sslclient.check_hostname = False
148+
sslclient.verify_mode = ssl.CERT_NONE
149+
else:
150+
sslserver = None
151+
sslclient = None
152+
urlpath, _, plugins = url.path.partition(',')
153+
urlpath, _, lbind = urlpath.partition('@')
154+
plugins = plugins.split(',') if plugins else None
155+
cipher, _, loc = url.netloc.rpartition('@')
156+
if cipher:
157+
from pproxy.cipher import get_cipher
158+
err_str, cipher = get_cipher(cipher)
159+
if err_str:
160+
raise argparse.ArgumentTypeError(err_str)
161+
if plugins:
162+
from pproxy.plugin import get_plugin
163+
for name in plugins:
164+
if not name: continue
165+
err_str, plugin = get_plugin(name)
166+
if err_str:
167+
raise argparse.ArgumentTypeError(err_str)
168+
cipher.plugins.append(plugin)
169+
match = pattern_compile(url.query) if url.query else None
170+
if loc:
171+
host_name, _, port = loc.partition(':')
172+
port = int(port) if port else 8080
173+
else:
174+
host_name = port = None
175+
return ProxyURI(protos=protos, rproto=protos[0], cipher=cipher, auth=url.fragment.encode(), match=match, bind=loc or urlpath, host_name=host_name, port=port, unix=not loc, lbind=lbind, sslclient=sslclient, sslserver=sslserver, alive=True, direct='direct' in rawprotos, relay=relay)
176+
ProxyURI.DIRECT = ProxyURI(direct=True, relay=None, alive=True, match=None)
177+
178+
async def test_url(url, rserver):
179+
url = urllib.parse.urlparse(url)
180+
assert url.scheme in ('http', ), f'Unknown scheme {url.scheme}'
181+
host_name, _, port = url.netloc.partition(':')
182+
port = int(port) if port else 80 if url.scheme == 'http' else 443
183+
initbuf = f'GET {url.path or "/"} HTTP/1.1\r\nHost: {host_name}\r\nUser-Agent: pproxy-{__version__}\r\nConnection: close\r\n\r\n'.encode()
184+
for roption in rserver:
185+
if roption.direct:
186+
continue
187+
print(f'============ {roption.bind} ============')
188+
try:
189+
reader, writer = await asyncio.wait_for(roption.open_connection(host_name, port, None, None), timeout=SOCKET_TIMEOUT)
190+
except asyncio.TimeoutError:
191+
raise Exception(f'Connection timeout {rserver}')
192+
try:
193+
await roption.prepare_connection(reader, writer, host_name, port)
194+
except Exception:
195+
writer.close()
196+
raise Exception('Unknown remote protocol')
197+
writer.write(initbuf)
198+
headers = await reader.read_until(b'\r\n\r\n')
199+
print(headers.decode()[:-4])
200+
print(f'--------------------------------')
201+
body = bytearray()
202+
while 1:
203+
s = await reader.read_()
204+
if not s:
205+
break
206+
body.extend(s)
207+
print(body.decode())
208+
print(f'============ success ============')
153209

154210
def main():
155211
parser = argparse.ArgumentParser(description=__description__+'\nSupported protocols: http,socks,shadowsocks,shadowsocksr,redirect', epilog='Online help: <https://github.com/qwj/python-proxy>')
156-
parser.add_argument('-i', dest='listen', default=[], action='append', type=uri_compile, help='proxy server setting uri (default: http+socks://:8080/)')
157-
parser.add_argument('-r', dest='rserver', default=[], action='append', type=uri_compile, help='remote server setting uri (default: direct)')
212+
parser.add_argument('-i', dest='listen', default=[], action='append', type=ProxyURI.compile, help='proxy server setting uri (default: http+socks://:8080/)')
213+
parser.add_argument('-r', dest='rserver', default=[], action='append', type=ProxyURI.compile_relay, help='remote server setting uri (default: direct)')
158214
parser.add_argument('-b', dest='block', type=pattern_compile, help='block regex rules')
159215
parser.add_argument('-a', dest='alived', default=0, type=int, help='interval to check remote alive (default: no check)')
160216
parser.add_argument('-v', dest='v', action='store_true', help='print verbose output')
161217
parser.add_argument('--ssl', dest='sslfile', help='certfile[,keyfile] if server listen in ssl mode')
162218
parser.add_argument('--pac', dest='pac', help='http PAC path')
163-
parser.add_argument('--get', dest='gets', default=[], action='append', help='http custom path/file')
219+
parser.add_argument('--get', dest='gets', default=[], action='append', help='http custom {path,file}')
220+
parser.add_argument('--test', dest='testurl', help='test this url for all remote proxies and exit')
164221
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}')
165222
args = parser.parse_args()
223+
if args.testurl:
224+
asyncio.run(test_url(args.testurl, args.rserver))
225+
return
166226
if not args.listen:
167-
args.listen.append(uri_compile('http+socks://:8080/'))
227+
args.listen.append(ProxyURI.compile_relay('http+socks://:8080/'))
228+
if not args.rserver or args.rserver[-1].match:
229+
args.rserver.append(ProxyURI.DIRECT)
168230
args.httpget = {}
169231
if args.pac:
170232
pactext = 'function FindProxyForURL(u,h){' + (f'var b=/^(:?{args.block.__self__.pattern})$/i;if(b.test(h))return "";' if args.block else '')
@@ -195,7 +257,7 @@ def main():
195257
print('Serving on', option.bind, 'by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '')
196258
handler = functools.partial(proxy_handler, **vars(args), **vars(option))
197259
try:
198-
server = loop.run_until_complete(option.server(handler))
260+
server = loop.run_until_complete(option.start_server(handler))
199261
servers.append(server)
200262
except Exception as ex:
201263
print('Start server failed.\n\t==>', ex)

0 commit comments

Comments
 (0)