-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
84 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from typing import Optional | ||
import asyncio | ||
import re | ||
|
||
from .StreamConnection import StreamConnection | ||
from .StreamClient import StreamClient | ||
|
||
TCP_URL_REGEX = re.compile(r'^tcp://([^:/]+)(?:\:(\d+))?') | ||
|
||
class TcpClient(StreamClient): | ||
|
||
def __init__(self, url: str = 'tcp://127.0.0.1', encoders=None): | ||
StreamClient.__init__(self, encoders=encoders) | ||
match = TCP_URL_REGEX.match(url) | ||
if match: | ||
self._url = url | ||
self._host = match.group(1) | ||
self._port = int(match.group(2)) if match.group(2) else 2000 | ||
else: | ||
raise RuntimeError(f'Invalid URL for TCP: {url}') | ||
|
||
@property | ||
def url(self): | ||
return self._url | ||
|
||
async def open(self) -> None: | ||
reader, writer = await asyncio.open_connection(self._host, self._port) | ||
self.connection = StreamConnection(reader, writer) | ||
await StreamClient.open(self) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import asyncio | ||
|
||
from ..Processor import Processor | ||
from .StreamMultiServer import StreamMultiServer | ||
|
||
class TcpServer(StreamMultiServer): | ||
|
||
def __init__(self, processor: Processor, host: str = '127.0.0.1', port: int = 2000, encoders=None): | ||
StreamMultiServer.__init__(self, processor, encoders) | ||
self.host = host | ||
self.port = port | ||
|
||
@property | ||
def url(self): | ||
return f'tcp://{self.host}:{self.port}' | ||
|
||
async def open(self) -> None: | ||
await asyncio.start_server(self.on_client_connected, self.host, self.port) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import pytest | ||
|
||
from stencilaschema.comms.JsonEncoder import JsonEncoder | ||
from stencilaschema.comms.JsonGzipBase64Encoder import JsonGzipBase64Encoder | ||
from stencilaschema.comms.TcpClient import TcpClient | ||
from stencilaschema.comms.TcpServer import TcpServer | ||
|
||
from helpers.TestProcessor import TestProcessor | ||
|
||
@pytest.mark.asyncio | ||
async def test_client_server(): | ||
# Create test processor | ||
processor = TestProcessor() | ||
|
||
# Start the server and several clients | ||
server = TcpServer(processor, encoders=[JsonEncoder(), JsonGzipBase64Encoder()]) | ||
await server.start() | ||
client1 = TcpClient(server.url) | ||
await client1.start() | ||
client2 = TcpClient(server.url, encoders=[JsonGzipBase64Encoder(), JsonEncoder()]) | ||
await client2.start() | ||
client3 = TcpClient(server.url, encoders=[JsonGzipBase64Encoder()]) | ||
await client3.start() | ||
|
||
thing1 = {'type': 'Thing', 'name': 'thing1'} | ||
thing2 = {'type': 'Thing', 'name': 'thing2'} | ||
thing3 = {'type': 'Thing', 'name': 'thing3'} | ||
|
||
assert await client1.execute(thing1) == thing1 | ||
assert await client2.execute(thing2) == thing2 | ||
assert await client3.execute(thing3) == thing3 | ||
|
||
# Stop everything | ||
await client1.stop() | ||
await client2.stop() | ||
await client3.stop() | ||
await server.stop() |