-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #30 from jeamland/even-more-testing
Get to 100% coverage on everything except compat and utf8validator
- Loading branch information
Showing
9 changed files
with
1,405 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,234 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import itertools | ||
|
||
import pytest | ||
|
||
from wsproto.connection import WSConnection, CLIENT, SERVER, ConnectionState | ||
from wsproto.events import ( | ||
ConnectionClosed, | ||
ConnectionEstablished, | ||
ConnectionRequested, | ||
TextReceived, | ||
BytesReceived, | ||
) | ||
from wsproto.frame_protocol import CloseReason, FrameProtocol | ||
|
||
|
||
class TestConnection(object): | ||
def create_connection(self): | ||
server = WSConnection(SERVER) | ||
client = WSConnection(CLIENT, host='localhost', resource='foo') | ||
|
||
server.receive_bytes(client.bytes_to_send()) | ||
event = next(server.events()) | ||
assert isinstance(event, ConnectionRequested) | ||
|
||
server.accept(event) | ||
client.receive_bytes(server.bytes_to_send()) | ||
assert isinstance(next(client.events()), ConnectionEstablished) | ||
|
||
return client, server | ||
|
||
def test_negotiation(self): | ||
self.create_connection() | ||
|
||
@pytest.mark.parametrize('as_client,final', [ | ||
(True, True), | ||
(True, False), | ||
(False, True), | ||
(False, False) | ||
]) | ||
def test_send_and_receive(self, as_client, final): | ||
client, server = self.create_connection() | ||
if as_client: | ||
me = client | ||
them = server | ||
else: | ||
me = server | ||
them = client | ||
|
||
data = b'x' * 23 | ||
|
||
me.send_data(data, final) | ||
them.receive_bytes(me.bytes_to_send()) | ||
|
||
event = next(them.events()) | ||
assert isinstance(event, BytesReceived) | ||
assert event.data == data | ||
assert event.message_finished is final | ||
|
||
@pytest.mark.parametrize('as_client,code,reason', [ | ||
(True, CloseReason.NORMAL_CLOSURE, u'bye'), | ||
(True, CloseReason.GOING_AWAY, u'👋👋'), | ||
(False, CloseReason.NORMAL_CLOSURE, u'bye'), | ||
(False, CloseReason.GOING_AWAY, u'👋👋'), | ||
]) | ||
def test_close(self, as_client, code, reason): | ||
client, server = self.create_connection() | ||
if as_client: | ||
me = client | ||
them = server | ||
else: | ||
me = server | ||
them = client | ||
|
||
me.close(code, reason) | ||
them.receive_bytes(me.bytes_to_send()) | ||
|
||
event = next(them.events()) | ||
assert isinstance(event, ConnectionClosed) | ||
assert event.code is code | ||
assert event.reason == reason | ||
|
||
def test_normal_closure(self): | ||
client, server = self.create_connection() | ||
|
||
for conn in (client, server): | ||
conn.close() | ||
conn.receive_bytes(None) | ||
with pytest.raises(StopIteration): | ||
print(repr(next(conn.events()))) | ||
assert conn.closed | ||
|
||
def test_abnormal_closure(self): | ||
client, server = self.create_connection() | ||
|
||
for conn in (client, server): | ||
conn.receive_bytes(None) | ||
event = next(conn.events()) | ||
assert isinstance(event, ConnectionClosed) | ||
assert event.code is CloseReason.ABNORMAL_CLOSURE | ||
assert conn.closed | ||
|
||
def test_bytes_send_all(self): | ||
connection = WSConnection(SERVER) | ||
connection._outgoing = b'fnord fnord' | ||
assert connection.bytes_to_send() == b'fnord fnord' | ||
assert connection.bytes_to_send() == b'' | ||
|
||
def test_bytes_send_some(self): | ||
connection = WSConnection(SERVER) | ||
connection._outgoing = b'fnord fnord' | ||
assert connection.bytes_to_send(5) == b'fnord' | ||
assert connection.bytes_to_send() == b' fnord' | ||
|
||
@pytest.mark.parametrize('as_client', [True, False]) | ||
def test_ping_pong(self, as_client): | ||
client, server = self.create_connection() | ||
if as_client: | ||
me = client | ||
them = server | ||
else: | ||
me = server | ||
them = client | ||
|
||
payload = b'x' * 23 | ||
|
||
me.ping(payload) | ||
wire_data = me.bytes_to_send() | ||
assert wire_data[0] == 0x89 | ||
masked = bool(wire_data[1] & 0x80) | ||
assert wire_data[1] & ~0x80 == len(payload) | ||
if masked: | ||
maskbytes = itertools.cycle(bytearray(wire_data[2:6])) | ||
data = bytearray(b ^ next(maskbytes) | ||
for b in bytearray(wire_data[6:])) | ||
else: | ||
data = wire_data[2:] | ||
assert data == payload | ||
|
||
them.receive_bytes(wire_data) | ||
with pytest.raises(StopIteration): | ||
print(repr(next(them.events()))) | ||
wire_data = them.bytes_to_send() | ||
assert wire_data[0] == 0x8a | ||
masked = bool(wire_data[1] & 0x80) | ||
assert wire_data[1] & ~0x80 == len(payload) | ||
if masked: | ||
maskbytes = itertools.cycle(bytearray(wire_data[2:6])) | ||
data = bytearray(b ^ next(maskbytes) | ||
for b in bytearray(wire_data[6:])) | ||
else: | ||
data = wire_data[2:] | ||
assert data == payload | ||
|
||
@pytest.mark.parametrize('text,payload,full_message,full_frame', [ | ||
(True, u'ƒñö®∂😎', True, True), | ||
(True, u'ƒñö®∂😎', False, True), | ||
(True, u'ƒñö®∂😎', False, False), | ||
(False, b'x' * 23, True, True), | ||
(False, b'x' * 23, False, True), | ||
(False, b'x' * 23, False, False), | ||
]) | ||
def test_data_events(self, text, payload, full_message, full_frame): | ||
if text: | ||
opcode = 0x01 | ||
encoded_payload = payload.encode('utf8') | ||
else: | ||
opcode = 0x02 | ||
encoded_payload = payload | ||
|
||
if full_message: | ||
opcode = bytearray([opcode | 0x80]) | ||
else: | ||
opcode = bytearray([opcode]) | ||
|
||
if full_frame: | ||
length = bytearray([len(encoded_payload)]) | ||
else: | ||
length = bytearray([len(encoded_payload) + 100]) | ||
|
||
frame = opcode + length + encoded_payload | ||
|
||
connection = WSConnection(CLIENT, host='localhost', resource='foo') | ||
connection._proto = FrameProtocol(True, []) | ||
connection._state = ConnectionState.OPEN | ||
connection.bytes_to_send() | ||
|
||
connection.receive_bytes(frame) | ||
event = next(connection.events()) | ||
if text: | ||
assert isinstance(event, TextReceived) | ||
else: | ||
assert isinstance(event, BytesReceived) | ||
assert event.data == payload | ||
assert event.frame_finished is full_frame | ||
assert event.message_finished is full_message | ||
|
||
assert not connection.bytes_to_send() | ||
|
||
def test_frame_protocol_somehow_loses_its_mind(self): | ||
class FailFrame(object): | ||
opcode = object() | ||
|
||
class DoomProtocol(object): | ||
def receive_bytes(self, data): | ||
return None | ||
|
||
def received_frames(self): | ||
return [FailFrame()] | ||
|
||
connection = WSConnection(CLIENT, host='localhost', resource='foo') | ||
connection._proto = DoomProtocol() | ||
connection._state = ConnectionState.OPEN | ||
connection.bytes_to_send() | ||
|
||
connection.receive_bytes(b'') | ||
with pytest.raises(StopIteration): | ||
next(connection.events()) | ||
assert not connection.bytes_to_send() | ||
|
||
def test_frame_protocol_gets_fed_garbage(self): | ||
client, server = self.create_connection() | ||
|
||
payload = b'x' * 23 | ||
frame = b'\x09' + bytearray([len(payload)]) + payload | ||
|
||
client.receive_bytes(frame) | ||
event = next(client.events()) | ||
assert isinstance(event, ConnectionClosed) | ||
assert event.code == CloseReason.PROTOCOL_ERROR | ||
|
||
output = client.bytes_to_send() | ||
assert output[:1] == b'\x88' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import pytest | ||
|
||
from h11 import Request | ||
|
||
from wsproto.events import ( | ||
ConnectionClosed, | ||
ConnectionEstablished, | ||
ConnectionRequested, | ||
) | ||
from wsproto.frame_protocol import CloseReason | ||
|
||
|
||
def test_connection_requested_repr_no_subprotocol(): | ||
method = b'GET' | ||
target = b'/foo' | ||
headers = { | ||
b'host': b'localhost', | ||
b'sec-websocket-version': b'13', | ||
} | ||
http_version = b'1.1' | ||
|
||
req = Request(method=method, target=target, headers=list(headers.items()), | ||
http_version=http_version) | ||
|
||
event = ConnectionRequested([], req) | ||
r = repr(event) | ||
|
||
assert 'ConnectionRequested' in r | ||
assert target.decode('ascii') in r | ||
|
||
|
||
def test_connection_requested_repr_with_subprotocol(): | ||
method = b'GET' | ||
target = b'/foo' | ||
headers = { | ||
b'host': b'localhost', | ||
b'sec-websocket-version': b'13', | ||
b'sec-websocket-protocol': b'fnord', | ||
} | ||
http_version = b'1.1' | ||
|
||
req = Request(method=method, target=target, headers=list(headers.items()), | ||
http_version=http_version) | ||
|
||
event = ConnectionRequested([], req) | ||
r = repr(event) | ||
|
||
assert 'ConnectionRequested' in r | ||
assert target.decode('ascii') in r | ||
assert headers[b'sec-websocket-protocol'].decode('ascii') in r | ||
|
||
|
||
@pytest.mark.parametrize('subprotocol,extensions', [ | ||
('sproto', None), | ||
(None, ['fake']), | ||
('sprout', ['pretend']), | ||
]) | ||
def test_connection_established_repr(subprotocol, extensions): | ||
event = ConnectionEstablished(subprotocol, extensions) | ||
r = repr(event) | ||
|
||
if subprotocol: | ||
assert subprotocol in r | ||
if extensions: | ||
for extension in extensions: | ||
assert extension in r | ||
|
||
|
||
@pytest.mark.parametrize('code,reason', [ | ||
(CloseReason.NORMAL_CLOSURE, None), | ||
(CloseReason.NORMAL_CLOSURE, 'because i felt like it'), | ||
(CloseReason.INVALID_FRAME_PAYLOAD_DATA, 'GOOD GOD WHAT DID YOU DO'), | ||
]) | ||
def test_connection_closed_repr(code, reason): | ||
event = ConnectionClosed(code, reason) | ||
r = repr(event) | ||
|
||
assert repr(code) in r | ||
if reason: | ||
assert reason in r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import wsproto.extensions as wpext | ||
import wsproto.frame_protocol as fp | ||
|
||
|
||
class TestExtension(object): | ||
def test_enabled(self): | ||
ext = wpext.Extension() | ||
assert not ext.enabled() | ||
|
||
def test_offer(self): | ||
ext = wpext.Extension() | ||
assert ext.offer(None) is None | ||
|
||
def test_accept(self): | ||
ext = wpext.Extension() | ||
assert ext.accept(None, None) is None | ||
|
||
def test_finalize(self): | ||
ext = wpext.Extension() | ||
assert ext.finalize(None, None) is None | ||
|
||
def test_frame_inbound_header(self): | ||
ext = wpext.Extension() | ||
result = ext.frame_inbound_header(None, None, None, None) | ||
assert result == fp.RsvBits(False, False, False) | ||
|
||
def test_frame_inbound_payload_data(self): | ||
ext = wpext.Extension() | ||
data = object() | ||
assert ext.frame_inbound_payload_data(None, data) == data | ||
|
||
def test_frame_inbound_complete(self): | ||
ext = wpext.Extension() | ||
assert ext.frame_inbound_complete(None, None) is None | ||
|
||
def test_frame_outbound(self): | ||
ext = wpext.Extension() | ||
rsv = fp.RsvBits(True, True, True) | ||
data = object() | ||
assert ext.frame_outbound(None, None, rsv, data, None) == (rsv, data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.