Aio improvement1 (#180)
* Improved imports

* Fix for #176

* Fix for #176 and formatting improvement; f-strings are really fast

* F-strings are really fast; %s formatting is the fast option only in Python 2, which is deprecated (see the timing sketch below)

* Small fixes

Co-authored-by: root <machinexa>
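
As a rough illustration of the formatting claim above, here is a minimal timing sketch; the measured strings and iteration count are arbitrary and not taken from the commit:

    # Micro-benchmark sketch: f-string vs %-formatting (CPython 3).
    # Absolute numbers vary by machine and interpreter version.
    from timeit import timeit

    method, path = 'GET', '/'

    fstring_time = timeit(lambda: f'{method} {path} HTTP/1.1\r\n', number=1_000_000)
    percent_time = timeit(lambda: '%s %s HTTP/1.1\r\n' % (method, path), number=1_000_000)

    print(f'f-string: {fstring_time:.3f}s  %-format: {percent_time:.3f}s')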
Machinexa2 committed Nov 17, 2020
1 parent fdccb26 commit 4fd0145
Showing 4 changed files with 46 additions and 44 deletions.
53 changes: 27 additions & 26 deletions aiosonic/__init__.py
@@ -1,15 +1,16 @@
"""Main module."""

import asyncio
import random
import re
import codecs
from asyncio import wait_for
from asyncio import get_event_loop
from random import randint
from codecs import lookup
from functools import partial
from json import dumps
from json import loads
from ssl import SSLContext
import gzip
import zlib
from gzip import decompress as gzip_decompress
from zlib import decompress as zlib_decompress

from io import IOBase
from os.path import basename
@@ -84,7 +85,8 @@ class HttpHeaders(CaseInsensitiveDict):
@staticmethod
def _clear_line(line: bytes):
"""Clear readed line."""
return line.rstrip().split(b': ')
line = line.rstrip()
return line.split(b': ') if b': ' in line else line.split(b':')


#: Headers
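
A minimal sketch of how the updated _clear_line helper behaves after this change; the header values below are made up for illustration:

    from aiosonic import HttpHeaders

    # Usual "name: value" form.
    assert HttpHeaders._clear_line(b'Host: example.com\r\n') == [b'Host', b'example.com']

    # Header without a space after the colon (the #176 case) now splits as well.
    assert HttpHeaders._clear_line(b'Host:example.com\r\n') == [b'Host', b'example.com']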
@@ -145,9 +147,9 @@ def status_code(self) -> int:
def _set_body(self, data):
"""Set body."""
if self.compressed == b'gzip':
self.body += gzip.decompress(data)
self.body += gzip_decompress(data)
elif self.compressed == b'deflate':
self.body += zlib.decompress(data)
self.body += zlib_decompress(data)
else:
self.body += data
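
A quick sanity check of the aliased decompressors — they are the same functions as gzip.decompress and zlib.decompress, only imported directly; the payload below is made up:

    from gzip import compress as gzip_compress, decompress as gzip_decompress
    from zlib import compress as zlib_compress, decompress as zlib_decompress

    payload = b'{"ok": true}'  # arbitrary body
    # Behaviour is unchanged by the import style; only the names are shorter.
    assert gzip_decompress(gzip_compress(payload)) == payload
    assert zlib_decompress(zlib_compress(payload)) == payload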

@@ -161,7 +163,7 @@ def _get_encoding(self) -> str:

if encoding:
try:
codecs.lookup(encoding)
lookup(encoding)
except LookupError:
encoding = ''
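
For context, codecs.lookup (imported here as lookup) raises LookupError for unknown codec names, which is exactly what the fallback relies on. A small standalone sketch with a made-up encoding name:

    from codecs import lookup

    encoding = 'totally-unknown-charset'  # hypothetical invalid name
    try:
        lookup(encoding)   # returns a CodecInfo for valid names such as 'utf-8'
    except LookupError:
        encoding = ''      # fall back, as _get_encoding does
    print(repr(encoding))  # -> ''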

@@ -226,8 +228,9 @@ def _get_header_data(url: ParseResult,

if params:
query = urlencode(params)
path += '%s' % query if '?' in path else '?%s' % query
get_base = '%s %s HTTP/1.1%s' % (method.upper(), path, _NEW_LINE)
path += f'{query}' if '?' in path else f'?{query}'
uppercase_method = method.upper()
get_base = f'{uppercase_method} {path} HTTP/1.1{_NEW_LINE}'

port = url.port or (443 if url.scheme == 'https' else 80)
hostname = url.hostname
@@ -242,18 +245,17 @@ def _get_header_data(url: ParseResult,
':authority': hostname.split(':')[0],
':scheme': 'https',
':path': path,
'user-agent': 'aioload/%s' % VERSION
'user-agent': f'aioload/{VERSION}'
})
else:
headers_base.update({
'HOST': hostname,
'Connection': 'keep-alive',
'User-Agent': 'aioload/%s' % VERSION
'User-Agent': f'aioload/{VERSION}'
})

if multipart:
headers_base[
'Content-Type'] = 'multipart/form-data; boundary="%s"' % boundary
headers_base['Content-Type'] = f'multipart/form-data; boundary="{boundary}"'

if headers:
headers_base.update(headers)
@@ -262,7 +264,7 @@ def _get_header_data(url: ParseResult,
return headers_base

for key, data in headers_base.items():
get_base += '%s: %s%s' % (key, data, _NEW_LINE)
get_base += f'{key}: {data}{_NEW_LINE}'
return (get_base + _NEW_LINE).encode()
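
A rough sketch of what the rewritten request-line code produces, assuming _NEW_LINE is the CRLF separator; method, path and params are illustrative:

    from urllib.parse import urlencode

    _NEW_LINE = '\r\n'  # assumption: aiosonic uses CRLF as the line separator
    method, path = 'get', '/search'
    params = {'q': 'aiosonic'}

    query = urlencode(params)
    path += f'{query}' if '?' in path else f'?{query}'
    uppercase_method = method.upper()
    get_base = f'{uppercase_method} {path} HTTP/1.1{_NEW_LINE}'
    print(repr(get_base))  # -> 'GET /search?q=aiosonic HTTP/1.1\r\n'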


@@ -329,7 +331,7 @@ async def _send_multipart(data: Dict[str, str],
to_send = b''
for key, val in data.items():
# write --boundary + field
to_send += ('--%s%s' % (boundary, _NEW_LINE)).encode()
to_send += (f'--{boundary}{_NEW_LINE}').encode()

if isinstance(val, IOBase):
# TODO: Utility to accept files with multipart metadata
@@ -342,7 +344,7 @@ async def _send_multipart(data: Dict[str, str],
to_send += to_write.encode()

# read and write chunks
loop = asyncio.get_event_loop()
loop = get_event_loop()
while True:
data = await loop.run_in_executor(None, val.read, chunk_size)
if not data:
@@ -351,12 +353,11 @@ async def _send_multipart(data: Dict[str, str],
val.close()

else:
to_send += ('Content-Disposition: form-data; name="%s"%s%s' %
(key, _NEW_LINE, _NEW_LINE)).encode()
to_send += (f'Content-Disposition: form-data; name="{key}"{_NEW_LINE}{_NEW_LINE}').encode()
to_send += val.encode() + _NEW_LINE.encode()

# write --boundary-- for finish
to_send += ('--%s--' % boundary).encode()
to_send += (f'--{boundary}--').encode()
_add_header(headers, 'Content-Length', str(len(to_send)))
return to_send
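
A simplified, synchronous sketch of the boundary framing for plain string fields (file fields and the Content-Length header are omitted; boundary and data are made up):

    _NEW_LINE = '\r\n'               # assumption: CRLF separator
    boundary = 'boundary-123456789'  # illustrative boundary
    data = {'field': 'value'}        # plain string fields only

    to_send = b''
    for key, val in data.items():
        # write --boundary + field
        to_send += f'--{boundary}{_NEW_LINE}'.encode()
        to_send += (f'Content-Disposition: form-data; name="{key}"'
                    f'{_NEW_LINE}{_NEW_LINE}').encode()
        to_send += val.encode() + _NEW_LINE.encode()

    # write --boundary-- to finish
    to_send += f'--{boundary}--'.encode()
    print(to_send)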

@@ -392,7 +393,7 @@ async def _do_request(urlparsed: ParseResult,

# get response code and version
try:
response._set_response_initial(await asyncio.wait_for(
response._set_response_initial(await wait_for(
connection.reader.readline(),
(timeouts or connector.timeouts).sock_read))
except TimeoutException:
@@ -402,7 +403,7 @@ async def _do_request(urlparsed: ParseResult,
# reading headers
while True:
res_data = await connection.reader.readline()
if b': ' not in res_data:
if b': ' not in res_data and b':' not in res_data:
break
response._set_header(*HttpHeaders._clear_line(res_data))
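
The header loop stops at the first line containing no colon at all, i.e. the blank line that terminates the header block. A self-contained sketch of the termination condition with illustrative raw lines:

    # Illustrative raw lines as they would come off the socket after the status line.
    raw_lines = [b'Host: example.com\r\n', b'X-Token:abc\r\n', b'\r\n', b'body...']

    headers = []
    for res_data in raw_lines:
        # A line with neither ': ' nor ':' is the blank separator, so stop reading headers.
        if b': ' not in res_data and b':' not in res_data:
            break
        headers.append(res_data)

    print(headers)  # -> [b'Host: example.com\r\n', b'X-Token:abc\r\n']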

@@ -642,7 +643,7 @@ async def request(self,
elif multipart:
if not isinstance(data, dict):
raise ValueError('data should be dict')
boundary = 'boundary-%d' % random.randint(10**8, 10**9)
boundary = 'boundary-%d' % randint(10**8, 10**9)
body = await _send_multipart(data, boundary, headers)

max_redirects = 30
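
randint(10**8, 10**9) yields a nine- or ten-digit number, so the generated boundary looks roughly like this (the value is random; shown only for illustration):

    from random import randint

    boundary = 'boundary-%d' % randint(10**8, 10**9)
    print(boundary)  # e.g. 'boundary-534127890'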
@@ -655,7 +656,7 @@ async def request(self,
multipart=multipart,
boundary=boundary)
try:
response = await asyncio.wait_for(
response = await wait_for(
_do_request(urlparsed, headers_data, self.connector, body,
verify, ssl, timeouts, http2),
timeout=(timeouts
@@ -693,7 +694,7 @@ async def wait_requests(self, timeout: int = 30):
This is useful when doing safe shutdown of a process.
"""
try:
return await asyncio.wait_for(
return await wait_for(
self.connector.wait_free_pool(), timeout)
except TimeoutException:
return False
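
For reference, the plain asyncio pattern behind these wait_for calls, independent of aiosonic; the sleep duration and timeout are arbitrary:

    import asyncio
    from asyncio import wait_for

    async def main():
        try:
            # Times out: the inner sleep takes longer than the 0.1 s allowed.
            await wait_for(asyncio.sleep(1), timeout=0.1)
        except asyncio.TimeoutError:
            print('timed out')

    asyncio.run(main())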
14 changes: 7 additions & 7 deletions aiosonic/connection.py
@@ -1,18 +1,18 @@
"""Connection stuffs."""

import asyncio
from asyncio import StreamReader
from asyncio import StreamWriter
import ssl
from ssl import SSLContext
from asyncio import wait_for, open_connection
from asyncio import StreamReader
from asyncio import StreamWriter
from typing import Dict
from typing import Optional
from urllib.parse import ParseResult

import h2.connection
import h2.events

from concurrent import futures
#from concurrent import futures (unused)
from aiosonic.exceptions import ConnectTimeout
from aiosonic.exceptions import HttpParsingError
from aiosonic.exceptions import TimeoutException
@@ -46,7 +46,7 @@ async def connect(self,
http2: bool = False):
"""Connet with timeout."""
try:
await asyncio.wait_for(self._connect(urlparsed, verify,
await wait_for(self._connect(urlparsed, verify,
ssl_context, http2),
timeout=(timeouts
or self.timeouts).sock_connect)
@@ -59,7 +59,7 @@ async def _connect(self, urlparsed: ParseResult, verify: bool,
if not urlparsed.hostname:
raise HttpParsingError('missing hostname')

key = '%s-%s' % (urlparsed.hostname, urlparsed.port)
key = f'{urlparsed.hostname}-{urlparsed.port}'

if self.writer:
# python 3.6 doesn't have writer.is_closing
@@ -85,7 +85,7 @@ def is_closing():
ssl_context.verify_mode = ssl.CERT_NONE
port = urlparsed.port or (443
if urlparsed.scheme == 'https' else 80)
self.reader, self.writer = await asyncio.open_connection(
self.reader, self.writer = await open_connection(
urlparsed.hostname, port, ssl=ssl_context)

self.temp_key = key
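
A stripped-down sketch of the connect step shown above; host, port and TLS handling are illustrative, not aiosonic's exact logic:

    import ssl
    from asyncio import open_connection, wait_for

    async def connect(hostname: str, port: int, timeout: float = 5.0):
        """Open an (optionally TLS-wrapped) connection with a connect timeout."""
        ssl_context = ssl.create_default_context() if port == 443 else None
        reader, writer = await wait_for(
            open_connection(hostname, port, ssl=ssl_context), timeout)
        return reader, writer

    # Usage (requires network access):
    #     reader, writer = await connect('example.com', 443)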
13 changes: 7 additions & 6 deletions aiosonic/connectors.py
@@ -1,18 +1,19 @@
"""Connector stuffs."""

import asyncio
from asyncio import wait_for
from asyncio import sleep as asyncio_sleep
from asyncio import StreamReader
from asyncio import StreamWriter
import ssl
from ssl import SSLContext
from typing import Coroutine
from typing import Optional
from ssl import SSLContext
from urllib.parse import ParseResult

import h2.connection
#import h2.connection (unused)
from hyperframe.frame import SettingsFrame

from concurrent import futures
#from concurrent import futures (unused)
from aiosonic.exceptions import ConnectTimeout
from aiosonic.exceptions import ConnectionPoolAcquireTimeout
from aiosonic.exceptions import TimeoutException
@@ -41,7 +42,7 @@ async def acquire(self, urlparsed: ParseResult):
return await self.pool.acquire(urlparsed)

try:
return await asyncio.wait_for(self.pool.acquire(urlparsed),
return await wait_for(self.pool.acquire(urlparsed),
self.timeouts.pool_acquire)
except TimeoutException:
raise ConnectionPoolAcquireTimeout()
@@ -57,4 +58,4 @@ async def wait_free_pool(self):
while True:
if self.pool.is_all_free():
return True
asyncio.sleep(0.02)
await asyncio_sleep(0.02)
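
A standalone sketch of the polling pattern used by wait_free_pool; the pool argument is any object exposing is_all_free(), as SmartPool does:

    import asyncio
    from asyncio import sleep as asyncio_sleep, wait_for

    async def wait_free(pool, timeout: float = 30):
        """Poll until the pool reports all connections free, or time out."""
        async def poll():
            while True:
                if pool.is_all_free():
                    return True
                await asyncio_sleep(0.02)  # yield control between checks

        try:
            return await wait_for(poll(), timeout)
        except asyncio.TimeoutError:
            return False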
10 changes: 5 additions & 5 deletions aiosonic/pools.py
@@ -1,14 +1,14 @@
"""Pools module."""

import asyncio
from urllib.parse import ParseResult

from asyncio import Semaphore
from asyncio import Queue

class CyclicQueuePool:
"""Cyclic queue pool of connections."""
def __init__(self, connector, pool_size, connection_cls):
self.pool_size = pool_size
self.pool = asyncio.Queue(pool_size)
self.pool = Queue(pool_size)

for _ in range(pool_size):
self.pool.put_nowait(connection_cls(connector))
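
A toy illustration of the Queue-backed cycle, using a stand-in connection class:

    from asyncio import Queue, run

    class FakeConnection:  # stand-in for the real connection class
        pass

    async def main():
        pool_size = 2
        pool = Queue(pool_size)
        for _ in range(pool_size):
            pool.put_nowait(FakeConnection())

        conn = await pool.get()  # acquire
        await pool.put(conn)     # release back into the cycle
        print(pool.qsize())      # -> 2

    run(main())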
@@ -31,7 +31,7 @@ class SmartPool:
def __init__(self, connector, pool_size, connection_cls):
self.pool_size = pool_size
self.pool = set()
self.sem = asyncio.Semaphore(pool_size)
self.sem = Semaphore(pool_size)

for _ in range(pool_size):
self.pool.add(connection_cls(connector))
@@ -40,7 +40,7 @@ async def acquire(self, urlparsed: ParseResult = None):
"""Acquire connection."""
await self.sem.acquire()
if urlparsed:
key = '%s-%s' % (urlparsed.hostname, urlparsed.port)
key = f'{urlparsed.hostname}-{urlparsed.port}'
for item in self.pool:
if item.key == key:
self.pool.remove(item)
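
For illustration, the pool key is simply hostname and port joined by a hyphen (the URL below is made up):

    from urllib.parse import urlparse

    urlparsed = urlparse('https://example.com:8443/path')  # made-up URL
    key = f'{urlparsed.hostname}-{urlparsed.port}'
    print(key)  # -> 'example.com-8443'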
