Skip to content

Commit 134d1d4

Browse files
authored
pproxy 1.3
1 parent eb3edea commit 134d1d4

File tree

3 files changed

+102
-114
lines changed

3 files changed

+102
-114
lines changed

pproxy/__init__.py

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from pproxy import proto
33

44
__title__ = 'pproxy'
5-
__version__ = "1.2.1"
5+
__version__ = "1.3"
66
__description__ = "Proxy server that can tunnel among remote servers by regex rules."
77
__author__ = "Qian Wenjie"
88
__license__ = "MIT License"
@@ -15,29 +15,6 @@
1515
asyncio.StreamReader.read_n = lambda self, n: asyncio.wait_for(self.readexactly(n), timeout=SOCKET_TIMEOUT)
1616
asyncio.StreamReader.read_until = lambda self, s: asyncio.wait_for(self.readuntil(s), timeout=SOCKET_TIMEOUT)
1717

18-
if not hasattr(asyncio.StreamReader, 'readuntil'): # Python 3.4 and below
19-
@asyncio.coroutine
20-
def readuntil(self, separator):
21-
seplen = len(separator)
22-
offset = 0
23-
while True:
24-
buflen = len(self._buffer)
25-
if buflen - offset >= seplen:
26-
isep = self._buffer.find(separator, offset)
27-
if isep != -1:
28-
break
29-
offset = buflen + 1 - seplen
30-
if self._eof:
31-
chunk = bytes(self._buffer)
32-
self._buffer.clear()
33-
raise asyncio.IncompleteReadError(chunk, None)
34-
yield from self._wait_for_data('readuntil')
35-
chunk = self._buffer[:isep + seplen]
36-
del self._buffer[:isep + seplen]
37-
self._maybe_resume_transport()
38-
return bytes(chunk)
39-
asyncio.StreamReader.readuntil = readuntil
40-
4118
AUTH_TIME = 86400 * 30
4219
class AuthTable(object):
4320
_auth = {}
@@ -48,46 +25,65 @@ def authed(self):
4825
def set_authed(self):
4926
self._auth[self.remote_ip] = time.time()
5027

51-
def proxy_handler(reader, writer, protos, rserver, block, cipher, verbose=DUMMY, modstat=lambda r,h:lambda i:DUMMY, **kwargs):
28+
async def proxy_handler(reader, writer, protos, rserver, block, cipher, verbose=DUMMY, modstat=lambda r,h:lambda i:DUMMY, **kwargs):
5229
try:
5330
remote_ip = (writer.get_extra_info('peername') or ['local'])[0]
5431
reader_cipher = cipher(reader, writer)[0] if cipher else None
55-
lproto, host_name, port, initbuf = yield from proto.parse(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs)
32+
lproto, host_name, port, initbuf = await proto.parse(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs)
5633
if host_name is None:
5734
writer.close()
5835
return
5936
if block and block(host_name):
6037
raise Exception('BLOCK ' + host_name)
61-
roption = next(filter(lambda o: not o.match or o.match(host_name), rserver), None)
38+
roption = next(filter(lambda o: o.alive and (not o.match or o.match(host_name)), rserver), None)
6239
viaproxy = bool(roption)
6340
if viaproxy:
64-
verbose('{l.name} {}:{} -> {r.rproto.name} {r.bind}'.format(host_name, port, l=lproto, r=roption))
41+
verbose(f'{lproto.name} {host_name}:{port} -> {roption.rproto.name} {roption.bind}')
6542
wait_connect = roption.connect()
6643
else:
67-
verbose('{l.name} {}:{}'.format(host_name, port, l=lproto))
44+
verbose(f'{lproto.name} {host_name}:{port}')
6845
wait_connect = asyncio.open_connection(host=host_name, port=port)
6946
try:
70-
reader_remote, writer_remote = yield from asyncio.wait_for(wait_connect, timeout=SOCKET_TIMEOUT)
47+
reader_remote, writer_remote = await asyncio.wait_for(wait_connect, timeout=SOCKET_TIMEOUT)
7148
except asyncio.TimeoutError:
72-
raise Exception('Connection timeout {}'.format(rserver))
49+
raise Exception(f'Connection timeout {rserver}')
7350
try:
7451
if viaproxy:
7552
writer_cipher_r = roption.cipher(reader_remote, writer_remote)[1] if roption.cipher else None
76-
yield from roption.rproto.connect(reader_remote=reader_remote, writer_remote=writer_remote, rauth=roption.auth, host_name=host_name, port=port, initbuf=initbuf, writer_cipher_r=writer_cipher_r, sock=writer_remote.get_extra_info('socket'))
53+
await roption.rproto.connect(reader_remote=reader_remote, writer_remote=writer_remote, rauth=roption.auth, host_name=host_name, port=port, initbuf=initbuf, writer_cipher_r=writer_cipher_r, sock=writer_remote.get_extra_info('socket'))
7754
else:
7855
writer_remote.write(initbuf)
7956
except Exception:
8057
writer_remote.close()
8158
raise Exception('Unknown remote protocol')
8259
m = modstat(remote_ip, host_name)
83-
asyncio.async(lproto.rchannel(reader_remote, writer, m(2+viaproxy), m(4+viaproxy)))
84-
asyncio.async(lproto.channel(reader, writer_remote, m(viaproxy), DUMMY))
60+
asyncio.ensure_future(lproto.rchannel(reader_remote, writer, m(2+viaproxy), m(4+viaproxy)))
61+
asyncio.ensure_future(lproto.channel(reader, writer_remote, m(viaproxy), DUMMY))
8562
except Exception as ex:
86-
if not isinstance(ex, asyncio.TimeoutError):
87-
verbose('{} from {}'.format(str(ex) or "Unsupported protocol", remote_ip))
63+
if not isinstance(ex, asyncio.TimeoutError) and not str(ex).startswith('Connection closed'):
64+
verbose(f'{str(ex) or "Unsupported protocol"} from {remote_ip}')
8865
try: writer.close()
8966
except Exception: pass
9067

68+
async def check_server_alive(interval, rserver, verbose):
69+
while True:
70+
await asyncio.sleep(interval)
71+
for remote in rserver:
72+
try:
73+
reader, writer = await asyncio.wait_for(remote.connect(), timeout=SOCKET_TIMEOUT)
74+
except Exception as ex:
75+
if remote.alive:
76+
verbose(f'{remote.bind}: OFFLINE')
77+
remote.alive = False
78+
continue
79+
if not remote.alive:
80+
verbose(f'{remote.bind}: ONLINE')
81+
remote.alive = True
82+
try:
83+
writer.close()
84+
except Exception:
85+
pass
86+
9187
def pattern_compile(filename):
9288
with open(filename) as f:
9389
return re.compile('(:?'+''.join('|'.join(i.strip() for i in f if i.strip() and not i.startswith('#')))+')$').match
@@ -125,32 +121,33 @@ def uri_compile(uri):
125121
else:
126122
connect = functools.partial(asyncio.open_unix_connection, path=url.path, ssl=sslclient, server_hostname='' if sslclient else None)
127123
server = functools.partial(asyncio.start_unix_server, path=url.path, ssl=sslserver)
128-
return types.SimpleNamespace(protos=protos, rproto=protos[0], cipher=cipher, auth=url.fragment.encode(), match=match, server=server, connect=connect, bind=loc or url.path, sslclient=sslclient, sslserver=sslserver)
124+
return types.SimpleNamespace(protos=protos, rproto=protos[0], cipher=cipher, auth=url.fragment.encode(), match=match, server=server, connect=connect, bind=loc or url.path, sslclient=sslclient, sslserver=sslserver, alive=True)
129125

130126
def main():
131127
parser = argparse.ArgumentParser(description=__description__+'\nSupported protocols: http,socks,shadowsocks,redirect', epilog='Online help: <https://github.com/qwj/python-proxy>')
132128
parser.add_argument('-i', dest='listen', default=[], action='append', type=uri_compile, help='proxy server setting uri (default: http+socks://:8080/)')
133129
parser.add_argument('-r', dest='rserver', default=[], action='append', type=uri_compile, help='remote server setting uri (default: direct)')
134130
parser.add_argument('-b', dest='block', type=pattern_compile, help='block regex rules')
131+
parser.add_argument('-a', dest='alive', default=0, type=int, help='interval to check remote alive (default: no check)')
135132
parser.add_argument('-v', dest='v', action='store_true', help='print verbose output')
136133
parser.add_argument('--ssl', dest='sslfile', help='certfile[,keyfile] if server listen in ssl mode')
137134
parser.add_argument('--pac', dest='pac', help='http PAC path')
138135
parser.add_argument('--get', dest='gets', default=[], action='append', help='http custom path/file')
139-
parser.add_argument('--version', action='version', version='%(prog)s {}'.format(__version__))
136+
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}')
140137
args = parser.parse_args()
141138
if not args.listen:
142139
args.listen.append(uri_compile('http+socks://:8080/'))
143140
args.httpget = {}
144141
if args.pac:
145-
pactext = 'function FindProxyForURL(u,h){' + ('var b=/^(:?{})$/i;if(b.test(h))return "";'.format(args.block.__self__.pattern) if args.block else '')
142+
pactext = 'function FindProxyForURL(u,h){' + (f'var b=/^(:?{args.block.__self__.pattern})$/i;if(b.test(h))return "";' if args.block else '')
146143
for i, option in enumerate(args.rserver):
147-
pactext += ('var m{1}=/^(:?{0})$/i;if(m{1}.test(h))'.format(option.match.__self__.pattern, i) if option.match else '') + 'return "PROXY %(host)s";'
144+
pactext += (f'var m{i}=/^(:?{option.match.__self__.pattern})$/i;if(m{i}.test(h))' if option.match else '') + 'return "PROXY %(host)s";'
148145
args.httpget[args.pac] = pactext+'return "DIRECT";}'
149146
args.httpget[args.pac+'/all'] = 'function FindProxyForURL(u,h){return "PROXY %(host)s";}'
150147
args.httpget[args.pac+'/none'] = 'function FindProxyForURL(u,h){return "DIRECT";}'
151148
for gets in args.gets:
152149
path, filename = gets.split(',', 1)
153-
with open(filename, 'r') as f:
150+
with open(filename, 'rb') as f:
154151
args.httpget[path] = f.read()
155152
if args.sslfile:
156153
sslfile = args.sslfile.split(',')
@@ -175,6 +172,8 @@ def main():
175172
except Exception as ex:
176173
print('Start server failed.\n\t==>', ex)
177174
if servers:
175+
if args.alive > 0 and args.rserver:
176+
asyncio.ensure_future(check_server_alive(args.alive, args.rserver, args.verbose if args.v else DUMMY))
178177
try:
179178
loop.run_forever()
180179
except KeyboardInterrupt:

0 commit comments

Comments
 (0)