Skip to content

Commit

Permalink
Advance http2
Browse files Browse the repository at this point in the history
  • Loading branch information
sonic182 committed Oct 12, 2019
1 parent 7e58edc commit 643654e
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 348 deletions.
83 changes: 13 additions & 70 deletions aiosonic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@
from typing import Dict
from typing import Iterator
from typing import Union
from typing import Tuple
from typing import Optional
from typing import Sequence

import chardet
import h2.events

from aiosonic_utils.structures import CaseInsensitiveDict

Expand All @@ -45,6 +42,12 @@
from aiosonic.utils import cache_decorator
from aiosonic.version import VERSION

# TYPES
from aiosonic.types import ParamsType
from aiosonic.types import DataType
from aiosonic.types import BodyType
from aiosonic.types import ParsedBodyType


try:
import cchardet as chardet
Expand All @@ -63,33 +66,6 @@
_COMPRESSED_OPTIONS = set([b'gzip', b'deflate'])


# TYPES
ParamsType = Union[
Dict[str, str],
Sequence[Tuple[str, str]],
]
#: Data to be sent in requests, allowed types
DataType = Union[
str,
bytes,
dict,
tuple,
AsyncIterator[bytes],
Iterator[bytes],
]
BodyType = Union[
str,
bytes,
AsyncIterator[bytes],
Iterator[bytes],
]
ParsedBodyType = Union[
bytes,
AsyncIterator[bytes],
Iterator[bytes],
]


# Functions with cache
@cache_decorator(_LRU_CACHE_SIZE)
def _get_url_parsed(url: str) -> ParseResult:
Expand Down Expand Up @@ -270,8 +246,10 @@ def _get_header_data(url: ParseResult, connection: Connection, method: str,
headers_base.update(headers)

if http2conn:
http2conn.send_headers(1, headers_base.items(), end_stream=True)
return http2conn.data_to_send()
return headers_base
# stream_id = http2conn.get_next_available_stream_id()
# http2conn.send_headers(stream_id, headers_base.items(), end_stream=True)
# return http2conn.data_to_send()

for key, data in headers_base.items():
get_base += '%s: %s%s' % (key, data, _NEW_LINE)
Expand Down Expand Up @@ -379,10 +357,9 @@ async def _do_request(urlparsed: ParseResult, headers_data: Callable,
async with (await connector.acquire(urlparsed)) as connection:
await connection.connect(urlparsed, verify, ssl, timeouts)
to_send = headers_data(connection=connection)
h2conn = connection.h2conn

if h2conn:
return await _http2_handle(connection, to_send, body)
if connection.h2conn:
return await connection.http2_request(to_send, body)

if not connection.writer or not connection.reader:
raise ConnectionError('Not connection writer or reader')
Expand Down Expand Up @@ -589,38 +566,4 @@ async def request(url: str, method: str = 'GET', headers: HeadersType = None,
except ConnectTimeout:
raise
except futures._base.TimeoutError:
raise RequestTimeout()


async def _http2_handle(connection: Connection, headers: bytes,
body: Optional[ParsedBodyType]):
"""Handle."""
if not connection.writer or not connection.reader or not connection.h2conn:
raise ConnectionError('Not connection writer or reader or h2conn')

h2conn = connection.h2conn
# first send headers
connection.writer.write(headers)

# now read events...
while True:
data = await connection.reader.readline()
events = h2conn.receive_data(data)

print('events')
print(events)

for event in events:
if isinstance(event, h2.events.StreamEnded):
raise Exception('stream end')
elif isinstance(event, h2.events.SettingsAcknowledged):
# send data
raise Exception('ack')
data = h2conn.data_to_send()
print('data')
print(data)
connection.writer.write(data)

if not data:
raise Exception('asdf')

raise RequestTimeout()
19 changes: 12 additions & 7 deletions aiosonic/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
from urllib.parse import ParseResult

import h2.connection
from hyperframe.frame import SettingsFrame
import h2.events

from concurrent import futures
from aiosonic.exceptions import ConnectTimeout
from aiosonic.timeout import Timeouts
from aiosonic.connectors import TCPConnector
from aiosonic.http2 import Http2Handler

from aiosonic.types import ParamsType
from aiosonic.types import ParsedBodyType


class Connection:
Expand All @@ -30,6 +34,7 @@ def __init__(self, connector: TCPConnector):
self.temp_key: Optional[str] = None

self.h2conn: Optional[h2.connection.H2Connection] = None
self.h2handler: Optional[Http2Handler] = None

async def connect(self, urlparsed: ParseResult, verify: bool,
ssl_context: SSLContext, timeouts: Timeouts):
Expand Down Expand Up @@ -89,12 +94,7 @@ async def _connection_made(self):

if negotiated_protocol == 'h2':
self.h2conn = h2.connection.H2Connection()

self.h2conn.initiate_connection()

# This reproduces the error in #396, by changing the header table size.
# self.h2conn.update_settings({SettingsFrame.HEADER_TABLE_SIZE: 4096})
self.writer.write(self.h2conn.data_to_send())
self.h2handler = Http2Handler(self)

def keep_alive(self):
"""Check if keep alive."""
Expand All @@ -120,6 +120,8 @@ async def __aexit__(self, exc_type, exc, tb):

if not self.blocked:
await self.release()
if self.h2handler:
self.h2handler.cleanup()

async def release(self):
"""Release connection."""
Expand All @@ -136,3 +138,6 @@ def __del__(self):
self.writer, 'is_closing', self.writer._transport.is_closing)
if is_closing():
self.writer.close()

async def http2_request(self, headers: ParamsType, body: Optional[ParsedBodyType]):
return await self.h2handler.request(headers, body)
132 changes: 132 additions & 0 deletions aiosonic/http2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@


import asyncio
from typing import Optional

import h2.events

from aiosonic.types import ParamsType
from aiosonic.types import ParsedBodyType


class Http2Handler(object):

def __init__(self, connection):
"""Initialize."""
self.connection = connection
h2conn = connection.h2conn

loop = asyncio.get_event_loop()
h2conn.initiate_connection()
self.writer_q = asyncio.Queue()

self.requests = {}

# This reproduces the error in #396, by changing the header table size.
# h2conn.update_settings({SettingsFrame.HEADER_TABLE_SIZE: 4096})
self.writer.write(h2conn.data_to_send())
self.reader_task = loop.create_task(self.reader_t())
self.writer_task = loop.create_task(self.writer_t())

@property
def writer(self):
return self.connection.writer

@property
def reader(self):
return self.connection.reader

@property
def h2conn(self):
return self.connection.h2conn

def cleanup(self):
"""Cleanup."""
self.reader_task.cancel()
self.writer_task.cancel()

async def request(self, headers: ParamsType, body: Optional[ParsedBodyType]):
stream_id = self.h2conn.get_next_available_stream_id()
self.h2conn.send_headers(stream_id, headers.items(),
end_stream=True)
from aiosonic import HttpResponse
future = asyncio.Future()
self.requests[stream_id] = {
'body': b'',
'headers': None,
'future': future
}
await self.writer_q.put(True)
await future
res = self.requests[stream_id].copy()
del self.requests[stream_id]

response = HttpResponse()
for key, val in res['headers']:
if key == b':status':
response.response_initial = {
'version': b'2',
'code': val
}
else:
response._set_header(key, val)

if res['body']:
response._set_body(res['body'])

return response

async def reader_t(self):
"""Reader task."""
read_size = 16000
h2conn = self.h2conn

while True:
data = await asyncio.wait_for(self.reader.read(read_size), 3)
events = h2conn.receive_data(data)

if not events:
continue
else:
for event in events:
if isinstance(event, h2.events.StreamEnded):
self.requests[event.stream_id]['future'].set_result(
self.requests[event.stream_id]['body'])
elif isinstance(event, h2.events.DataReceived):
self.requests[event.stream_id]['body'] += event.data

if (event.stream_id in h2conn.streams
and not h2conn.streams[event.stream_id].closed):
h2conn.increment_flow_control_window(
event.flow_controlled_length, event.stream_id)
h2conn.increment_flow_control_window(
event.flow_controlled_length)
elif isinstance(event, h2.events.ResponseReceived):
self.requests[
event.stream_id]['headers'] = event.headers
elif isinstance(event, (
h2.events.WindowUpdated,
h2.events.PingReceived,
h2.events.RemoteSettingsChanged,
h2.events.SettingsAcknowledged
)):
pass
else:
raise Exception(
f'another event {event.__class__.__name__}')

await self.writer_q.put(True)

async def writer_t(self):
"""Writer task."""
h2conn = self.h2conn

while True:
await self.writer_q.get()
while True:
data_to_send = h2conn.data_to_send()

if data_to_send:
self.writer.write(data_to_send)
else:
break
34 changes: 34 additions & 0 deletions aiosonic/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

from typing import Union
from typing import Dict
from typing import Sequence
from typing import Tuple
from typing import AsyncIterator
from typing import Iterator


# TYPES
ParamsType = Union[
Dict[str, str],
Sequence[Tuple[str, str]],
]
#: Data to be sent in requests, allowed types
DataType = Union[
str,
bytes,
dict,
tuple,
AsyncIterator[bytes],
Iterator[bytes],
]
BodyType = Union[
str,
bytes,
AsyncIterator[bytes],
Iterator[bytes],
]
ParsedBodyType = Union[
bytes,
AsyncIterator[bytes],
Iterator[bytes],
]
Loading

0 comments on commit 643654e

Please sign in to comment.