From 4fd0145019e61cf99c1d0b1377bfed6696a0c759 Mon Sep 17 00:00:00 2001 From: Machinexa2 <60662297+machinexa2@users.noreply.github.com> Date: Tue, 17 Nov 2020 15:45:25 +0545 Subject: [PATCH] Aio improvement1 (#180) * Improved imports * Fix for https://github.com/sonic182/aiosonic/issues/176 * Fix for https://github.com/sonic182/aiosonic/issues/176 && formatting improvment, fstrings are really fast * Fstrings are really fast, %s are fast in python2 which is deprecated * Small fixes Co-authored-by: root --- aiosonic/__init__.py | 53 +++++++++++++++++++++--------------------- aiosonic/connection.py | 14 +++++------ aiosonic/connectors.py | 13 ++++++----- aiosonic/pools.py | 10 ++++---- 4 files changed, 46 insertions(+), 44 deletions(-) diff --git a/aiosonic/__init__.py b/aiosonic/__init__.py index 3b7cd50..48b6701 100644 --- a/aiosonic/__init__.py +++ b/aiosonic/__init__.py @@ -1,15 +1,16 @@ """Main module.""" -import asyncio -import random import re -import codecs +from asyncio import wait_for +from asyncio import get_event_loop +from random import randint +from codecs import lookup from functools import partial from json import dumps from json import loads from ssl import SSLContext -import gzip -import zlib +from gzip import decompress as gzip_decompress +from zlib import decompress as zlib_decompress from io import IOBase from os.path import basename @@ -84,7 +85,8 @@ class HttpHeaders(CaseInsensitiveDict): @staticmethod def _clear_line(line: bytes): """Clear readed line.""" - return line.rstrip().split(b': ') + line = line.rstrip() + return line.split(b': ') if b': ' in line else line.split(b':') #: Headers @@ -145,9 +147,9 @@ def status_code(self) -> int: def _set_body(self, data): """Set body.""" if self.compressed == b'gzip': - self.body += gzip.decompress(data) + self.body += gzip_decompress(data) elif self.compressed == b'deflate': - self.body += zlib.decompress(data) + self.body += zlib_decompress(data) else: self.body += data @@ -161,7 +163,7 @@ def _get_encoding(self) -> str: if encoding: try: - codecs.lookup(encoding) + lookup(encoding) except LookupError: encoding = '' @@ -226,8 +228,9 @@ def _get_header_data(url: ParseResult, if params: query = urlencode(params) - path += '%s' % query if '?' in path else '?%s' % query - get_base = '%s %s HTTP/1.1%s' % (method.upper(), path, _NEW_LINE) + path += f'{query}' if '?' in path else f'?{query}' + uppercase_method = method.upper() + get_base = f'{uppercase_method} {path} HTTP/1.1{_NEW_LINE}' port = url.port or (443 if url.scheme == 'https' else 80) hostname = url.hostname @@ -242,18 +245,17 @@ def _get_header_data(url: ParseResult, ':authority': hostname.split(':')[0], ':scheme': 'https', ':path': path, - 'user-agent': 'aioload/%s' % VERSION + 'user-agent': f'aioload/{VERSION}' }) else: headers_base.update({ 'HOST': hostname, 'Connection': 'keep-alive', - 'User-Agent': 'aioload/%s' % VERSION + 'User-Agent': f'aioload/{VERSION}' }) if multipart: - headers_base[ - 'Content-Type'] = 'multipart/form-data; boundary="%s"' % boundary + headers_base['Content-Type'] = f'multipart/form-data; boundary="{boundary}"' if headers: headers_base.update(headers) @@ -262,7 +264,7 @@ def _get_header_data(url: ParseResult, return headers_base for key, data in headers_base.items(): - get_base += '%s: %s%s' % (key, data, _NEW_LINE) + get_base += f'{key}: {data}{_NEW_LINE}' return (get_base + _NEW_LINE).encode() @@ -329,7 +331,7 @@ async def _send_multipart(data: Dict[str, str], to_send = b'' for key, val in data.items(): # write --boundary + field - to_send += ('--%s%s' % (boundary, _NEW_LINE)).encode() + to_send += (f'--{boundary}{_NEW_LINE}').encode() if isinstance(val, IOBase): # TODO: Utility to accept files with multipart metadata @@ -342,7 +344,7 @@ async def _send_multipart(data: Dict[str, str], to_send += to_write.encode() # read and write chunks - loop = asyncio.get_event_loop() + loop = get_event_loop() while True: data = await loop.run_in_executor(None, val.read, chunk_size) if not data: @@ -351,12 +353,11 @@ async def _send_multipart(data: Dict[str, str], val.close() else: - to_send += ('Content-Disposition: form-data; name="%s"%s%s' % - (key, _NEW_LINE, _NEW_LINE)).encode() + to_send += (f'Content-Disposition: form-data; name="{key}"{_NEW_LINE}{_NEW_LINE}').encode() to_send += val.encode() + _NEW_LINE.encode() # write --boundary-- for finish - to_send += ('--%s--' % boundary).encode() + to_send += (f'--{boundary}--').encode() _add_header(headers, 'Content-Length', str(len(to_send))) return to_send @@ -392,7 +393,7 @@ async def _do_request(urlparsed: ParseResult, # get response code and version try: - response._set_response_initial(await asyncio.wait_for( + response._set_response_initial(await wait_for( connection.reader.readline(), (timeouts or connector.timeouts).sock_read)) except TimeoutException: @@ -402,7 +403,7 @@ async def _do_request(urlparsed: ParseResult, # reading headers while True: res_data = await connection.reader.readline() - if b': ' not in res_data: + if b': ' not in res_data and b':' not in res_data: break response._set_header(*HttpHeaders._clear_line(res_data)) @@ -642,7 +643,7 @@ async def request(self, elif multipart: if not isinstance(data, dict): raise ValueError('data should be dict') - boundary = 'boundary-%d' % random.randint(10**8, 10**9) + boundary = 'boundary-%d' % randint(10**8, 10**9) body = await _send_multipart(data, boundary, headers) max_redirects = 30 @@ -655,7 +656,7 @@ async def request(self, multipart=multipart, boundary=boundary) try: - response = await asyncio.wait_for( + response = await wait_for( _do_request(urlparsed, headers_data, self.connector, body, verify, ssl, timeouts, http2), timeout=(timeouts @@ -693,7 +694,7 @@ async def wait_requests(self, timeout: int = 30): This is useful when doing safe shutdown of a process. """ try: - return await asyncio.wait_for( + return await wait_for( self.connector.wait_free_pool(), timeout) except TimeoutException: return False diff --git a/aiosonic/connection.py b/aiosonic/connection.py index 31b3883..e546835 100644 --- a/aiosonic/connection.py +++ b/aiosonic/connection.py @@ -1,10 +1,10 @@ """Connection stuffs.""" -import asyncio -from asyncio import StreamReader -from asyncio import StreamWriter import ssl from ssl import SSLContext +from asyncio import wait_for, open_connection +from asyncio import StreamReader +from asyncio import StreamWriter from typing import Dict from typing import Optional from urllib.parse import ParseResult @@ -12,7 +12,7 @@ import h2.connection import h2.events -from concurrent import futures +#from concurrent import futures (unused) from aiosonic.exceptions import ConnectTimeout from aiosonic.exceptions import HttpParsingError from aiosonic.exceptions import TimeoutException @@ -46,7 +46,7 @@ async def connect(self, http2: bool = False): """Connet with timeout.""" try: - await asyncio.wait_for(self._connect(urlparsed, verify, + await wait_for(self._connect(urlparsed, verify, ssl_context, http2), timeout=(timeouts or self.timeouts).sock_connect) @@ -59,7 +59,7 @@ async def _connect(self, urlparsed: ParseResult, verify: bool, if not urlparsed.hostname: raise HttpParsingError('missing hostname') - key = '%s-%s' % (urlparsed.hostname, urlparsed.port) + key = f'{urlparsed.hostname}-{urlparsed.port}' if self.writer: # python 3.6 doesn't have writer.is_closing @@ -85,7 +85,7 @@ def is_closing(): ssl_context.verify_mode = ssl.CERT_NONE port = urlparsed.port or (443 if urlparsed.scheme == 'https' else 80) - self.reader, self.writer = await asyncio.open_connection( + self.reader, self.writer = await open_connection( urlparsed.hostname, port, ssl=ssl_context) self.temp_key = key diff --git a/aiosonic/connectors.py b/aiosonic/connectors.py index 3b47667..14f8131 100644 --- a/aiosonic/connectors.py +++ b/aiosonic/connectors.py @@ -1,18 +1,19 @@ """Connector stuffs.""" -import asyncio +from asyncio import wait_for +from asyncio import sleep as asyncio_sleep from asyncio import StreamReader from asyncio import StreamWriter import ssl +from ssl import SSLContext from typing import Coroutine from typing import Optional -from ssl import SSLContext from urllib.parse import ParseResult -import h2.connection +#import h2.connection (unused) from hyperframe.frame import SettingsFrame -from concurrent import futures +#from concurrent import futures (unused) from aiosonic.exceptions import ConnectTimeout from aiosonic.exceptions import ConnectionPoolAcquireTimeout from aiosonic.exceptions import TimeoutException @@ -41,7 +42,7 @@ async def acquire(self, urlparsed: ParseResult): return await self.pool.acquire(urlparsed) try: - return await asyncio.wait_for(self.pool.acquire(urlparsed), + return await wait_for(self.pool.acquire(urlparsed), self.timeouts.pool_acquire) except TimeoutException: raise ConnectionPoolAcquireTimeout() @@ -57,4 +58,4 @@ async def wait_free_pool(self): while True: if self.pool.is_all_free(): return True - asyncio.sleep(0.02) + asyncio_sleep(0.02) diff --git a/aiosonic/pools.py b/aiosonic/pools.py index 0c19982..104ad19 100644 --- a/aiosonic/pools.py +++ b/aiosonic/pools.py @@ -1,14 +1,14 @@ """Pools module.""" -import asyncio from urllib.parse import ParseResult - +from asyncio import Semaphore +from asyncio import Queue class CyclicQueuePool: """Cyclic queue pool of connections.""" def __init__(self, connector, pool_size, connection_cls): self.pool_size = pool_size - self.pool = asyncio.Queue(pool_size) + self.pool = Queue(pool_size) for _ in range(pool_size): self.pool.put_nowait(connection_cls(connector)) @@ -31,7 +31,7 @@ class SmartPool: def __init__(self, connector, pool_size, connection_cls): self.pool_size = pool_size self.pool = set() - self.sem = asyncio.Semaphore(pool_size) + self.sem = Semaphore(pool_size) for _ in range(pool_size): self.pool.add(connection_cls(connector)) @@ -40,7 +40,7 @@ async def acquire(self, urlparsed: ParseResult = None): """Acquire connection.""" await self.sem.acquire() if urlparsed: - key = '%s-%s' % (urlparsed.hostname, urlparsed.port) + key = f'{urlparsed.hostname}-{urlparsed.port}' for item in self.pool: if item.key == key: self.pool.remove(item)