Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
koehlma committed Dec 12, 2015
1 parent 3be4604 commit e0dcf91
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 70 deletions.
16 changes: 16 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ def reraise(exc_type, exc_value, exc_traceback):
exec(PY2_RERAISE)


if uv.is_win32:
TEST_PIPE1 = r'\\?\pipe\python-uv-test1'
TEST_PIPE2 = r'\\?\pipe\python-uv-test2'
else:
TEST_PIPE1 = '/tmp/python-uv-test1'
TEST_PIPE2 = '/tmp/python-uv-test2'

BAD_PIPE = '/path/to/unix/socket/that/really/should/not/be/there'

TEST_IPV4 = '127.0.0.1'
TEST_IPV6 = '::1'

TEST_PORT1 = 12345
TEST_PORT2 = 12346


class TestLoop(uv.Loop):
def __init__(self):
super(TestLoop, self).__init__()
Expand Down
106 changes: 106 additions & 0 deletions tests/test_ping_pong.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015, Maximilian Köhl <mail@koehlma.de>
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function, unicode_literals, division

import os

import common

import uv


class TestPingPong(common.TestCase):
def on_shutdown(self, request, status):
request.stream.close()

def on_read(self, stream, status, length, data):
if status is not uv.StatusCode.SUCCESS:
stream.shutdown(on_shutdown=self.on_shutdown)
return
self.buffer.append(data)
if b''.join(self.buffer) == b'PING':
del self.buffer[:]
self.pong_counter += 1
self.assert_is(stream, self.client)
stream.write(b'PONG')
elif b''.join(self.buffer) == b'PONG':
del self.buffer[:]
if self.ping_counter < 5:
stream.write(b'PING')
self.ping_counter += 1
else:
stream.shutdown(on_shutdown=self.on_shutdown)
self.server.close()

def on_connect(self, request, status):
self.assert_equal(status, uv.StatusCode.SUCCESS)
request.stream.read_start(on_read=self.on_read)

def on_connection(self, stream, status):
self.assert_equal(status, uv.StatusCode.SUCCESS)
connection = stream.accept()
connection.read_start(on_read=self.on_read)
connection.write(b'PING')

def set_up(self):
self.buffer = []

self.ping_counter = 1
self.pong_counter = 0

def run_ping_pong(self):
self.loop.run()

self.assert_equal(self.ping_counter, 5)
self.assert_equal(self.pong_counter, 5)

def test_pipe(self):
self.server = uv.Pipe()
self.server.bind(common.TEST_PIPE1)
self.server.listen(5, on_connection=self.on_connection)

self.client = uv.Pipe()
self.client.connect(common.TEST_PIPE1, on_connect=self.on_connect)

self.run_ping_pong()

def test_tcp_ipv4(self):
self.server = uv.TCP()
self.server.bind((common.TEST_IPV4, common.TEST_PORT1))
self.server.listen(5, on_connection=self.on_connection)

self.client = uv.TCP()
self.client.connect((common.TEST_IPV4, common.TEST_PORT1),
on_connect=self.on_connect)

self.run_ping_pong()

def test_tcp_ipv6(self):
address = (common.TEST_IPV6, common.TEST_PORT1)

self.server = uv.TCP()
self.server.bind(address)
self.server.listen(5, on_connection=self.on_connection)

self.client = uv.TCP()
self.client.connect(address, on_connect=self.on_connect)

self.run_ping_pong()



70 changes: 1 addition & 69 deletions tests/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,12 @@

from __future__ import print_function, unicode_literals, division

import os

from common import TestCase
from common import TestCase, BAD_PIPE

import uv

if uv.is_win32:
TEST_PIPE1 = '\\\\?\\pipe\\python-uv-test1'
TEST_PIPE2 = '\\\\?\\pipe\\python-uv-test2'
else:
TEST_PIPE1 = '/tmp/python-uv-test1'
TEST_PIPE2 = '/tmp/python-uv-test2'

BAD_PIPE = '/path/to/unix/socket/that/really/should/not/be/there'


class TestPipe(TestCase):
def tear_down(self):
try: os.remove(TEST_PIPE1)
except: pass
try: os.remove(TEST_PIPE2)
except: pass

def test_connect_bad(self):
def on_connect(request, status):
self.assert_not_equal(status, uv.StatusCode.SUCCESS)
Expand All @@ -50,55 +33,4 @@ def on_connect(request, status):

self.loop.run()

def test_ping_pong(self):
def on_shutdown(request, status):
request.stream.close()

def on_read(pipe, status, length, data):
if status is not uv.StatusCode.SUCCESS:
pipe.shutdown(on_shutdown=on_shutdown)
return
self.buffer.append(data)
if b''.join(self.buffer) == b'PING':
del self.buffer[:]
self.pong_counter += 1
self.assert_is(pipe, self.client)
pipe.write(b'PONG')
elif b''.join(self.buffer) == b'PONG':
del self.buffer[:]
if self.ping_counter < 5:
pipe.write(b'PING')
self.ping_counter += 1
else:
pipe.shutdown(on_shutdown=on_shutdown)
self.server.close()

def on_connect(request, status):
self.assert_equal(status, uv.StatusCode.SUCCESS)
request.stream.read_start(on_read=on_read)

def on_connection(pipe, status):
"""
:type pipe: uv.Pipe
"""
self.assert_equal(status, uv.StatusCode.SUCCESS)
connection = pipe.accept()
connection.read_start(on_read=on_read)
connection.write(b'PING')

self.buffer = []

self.ping_counter = 1
self.pong_counter = 0

self.server = uv.Pipe()
self.server.bind(TEST_PIPE1)
self.server.listen(5, on_connection=on_connection)

self.client = uv.Pipe()
self.client.connect(TEST_PIPE1, on_connect=on_connect)

self.loop.run()

self.assert_equal(self.ping_counter, 5)
self.assert_equal(self.pong_counter, 5)
2 changes: 1 addition & 1 deletion uv/handles/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def connect(self, address, on_connect=None):
:rtype: uv.ConnectRequest
"""
if self.closing: raise HandleClosedError()
request = ConnectRequest(stream, on_connect)
request = ConnectRequest(self, on_connect)
c_storage = c_create_sockaddr(*address)
c_sockaddr = ffi.cast('struct sockaddr*', c_storage)
self._family = c_sockaddr.sa_family
Expand Down

0 comments on commit e0dcf91

Please sign in to comment.