|
2 | 2 | from . import proto |
3 | 3 | from .__doc__ import * |
4 | 4 |
|
5 | | -SOCKET_TIMEOUT = 300 |
| 5 | +SOCKET_TIMEOUT = 60 |
6 | 6 | UDP_LIMIT = 30 |
7 | 7 | DUMMY = lambda s: s |
8 | 8 |
|
9 | | -asyncio.StreamReader.read_w = lambda self, n: asyncio.wait_for(self.read(n), timeout=SOCKET_TIMEOUT) |
10 | | -asyncio.StreamReader.read_n = lambda self, n: asyncio.wait_for(self.readexactly(n), timeout=SOCKET_TIMEOUT) |
11 | | -asyncio.StreamReader.read_until = lambda self, s: asyncio.wait_for(self.readuntil(s), timeout=SOCKET_TIMEOUT) |
12 | | -asyncio.StreamReader.rollback = lambda self, s: self._buffer.__setitem__(slice(0, 0), s) |
| 9 | +class ProxyReader(asyncio.StreamReader): |
| 10 | + def __init__(self, o=None): |
| 11 | + if o: |
| 12 | + self.__dict__ = o.__dict__ |
| 13 | + def read_w(self, n): |
| 14 | + return asyncio.wait_for(self.read(n), timeout=SOCKET_TIMEOUT) |
| 15 | + def read_n(self, n): |
| 16 | + return asyncio.wait_for(self.readexactly(n), timeout=SOCKET_TIMEOUT) |
| 17 | + def read_until(self, s): |
| 18 | + return asyncio.wait_for(self.readuntil(s), timeout=SOCKET_TIMEOUT) |
| 19 | + def rollback(self, s): |
| 20 | + self._buffer.__setitem__(slice(0, 0), s) |
13 | 21 |
|
14 | 22 | class AuthTable(object): |
15 | 23 | _auth = {} |
@@ -56,7 +64,7 @@ def schedule(rserver, salgorithm, host_name, port): |
56 | 64 |
|
57 | 65 | async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, sslserver, debug=0, authtime=86400*30, block=None, salgorithm='fa', verbose=DUMMY, modstat=lambda u,r,h:lambda i:DUMMY, **kwargs): |
58 | 66 | try: |
59 | | - reader, writer = proto.sslwrap(reader, writer, sslserver, True, None, verbose) |
| 67 | + reader, writer = proto.sslwrap(ProxyReader(reader), writer, sslserver, True, None, verbose) |
60 | 68 | if unix: |
61 | 69 | remote_ip, server_ip, remote_text = 'local', None, 'unix_local' |
62 | 70 | else: |
@@ -219,10 +227,8 @@ async def open_connection(self, host, port, local_addr, lbind, timeout=SOCKET_TI |
219 | 227 | reader, writer = await asyncio.wait_for(wait, timeout=timeout) |
220 | 228 | except Exception as ex: |
221 | 229 | raise |
222 | | - return reader, writer |
223 | | - def prepare_connection(self, reader_remote, writer_remote, host, port): |
224 | | - return self.prepare_ciphers_and_headers(reader_remote, writer_remote, host, port) |
225 | | - async def prepare_ciphers_and_headers(self, reader_remote, writer_remote, host, port): |
| 230 | + return ProxyReader(reader), writer |
| 231 | + async def prepare_connection(self, reader_remote, writer_remote, host, port): |
226 | 232 | return reader_remote, writer_remote |
227 | 233 | async def tcp_connect(self, host, port, local_addr=None, lbind=None): |
228 | 234 | reader, writer = await self.open_connection(host, port, local_addr, lbind) |
@@ -288,12 +294,12 @@ def wait_open_connection(self, host, port, local_addr, family): |
288 | 294 | return asyncio.open_unix_connection(path=self.bind) |
289 | 295 | else: |
290 | 296 | return asyncio.open_connection(host=self.host_name, port=self.port, local_addr=local_addr, family=family) |
291 | | - async def prepare_ciphers_and_headers(self, reader_remote, writer_remote, host, port): |
| 297 | + async def prepare_connection(self, reader_remote, writer_remote, host, port): |
292 | 298 | reader_remote, writer_remote = proto.sslwrap(reader_remote, writer_remote, self.sslclient, False, self.host_name) |
293 | 299 | _, writer_cipher_r = await prepare_ciphers(self.cipher, reader_remote, writer_remote, self.bind) |
294 | 300 | whost, wport = self.jump.destination(host, port) |
295 | 301 | await self.rproto.connect(reader_remote=reader_remote, writer_remote=writer_remote, rauth=self.auth, host_name=whost, port=wport, writer_cipher_r=writer_cipher_r, myhost=self.host_name, sock=writer_remote.get_extra_info('socket')) |
296 | | - return await self.jump.prepare_ciphers_and_headers(reader_remote, writer_remote, host, port) |
| 302 | + return await self.jump.prepare_connection(reader_remote, writer_remote, host, port) |
297 | 303 | def start_server(self, args, stream_handler=stream_handler): |
298 | 304 | handler = functools.partial(stream_handler, **vars(self), **args) |
299 | 305 | if self.unix: |
@@ -470,7 +476,7 @@ def __init__(self, backward, backward_num, **kw): |
470 | 476 | async def wait_open_connection(self, *args): |
471 | 477 | while True: |
472 | 478 | reader, writer = await self.conn.get() |
473 | | - if not writer.is_closing(): |
| 479 | + if not writer.is_closing() and not reader.at_eof(): |
474 | 480 | return reader, writer |
475 | 481 | def close(self): |
476 | 482 | self.closed = True |
@@ -524,7 +530,7 @@ async def handler(reader, writer, **kw): |
524 | 530 | auth = b'\x01'+auth |
525 | 531 | if auth: |
526 | 532 | try: |
527 | | - assert auth == (await reader.read_n(len(auth))) |
| 533 | + assert auth == (await asyncio.wait_for(reader.readexactly(len(auth)), timeout=SOCKET_TIMEOUT)) |
528 | 534 | except Exception: |
529 | 535 | return |
530 | 536 | await self.conn.put((reader, writer)) |
|
0 commit comments