Skip to content

Commit

Permalink
feat: add mmap client and server
Browse files Browse the repository at this point in the history
  • Loading branch information
nokome committed Dec 14, 2018
1 parent 86927f8 commit 725442e
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 8 deletions.
43 changes: 43 additions & 0 deletions src/comms/MmapClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import List
import os
import re

from .stencilaFiles import create_tempdir
from .Client import Client
from .MmapMixin import MmapMixin

MMAP_URL_REGEX = re.compile(r'^mmap://(.+)')

class MmapClient(MmapMixin, Client):

BYTE_WRITE = b'1'
BYTE_READ = b'2'

def __init__(self, url: str, encoders=None):
MmapMixin.__init__(self)
Client.__init__(self, url=url, encoders=None)

match = MMAP_URL_REGEX.match(url)
if match:
self._path = match.group(1)
else:
raise RuntimeError(f'Invalid URL for memory mapped file: {url}')

@staticmethod
def connectable(url: str) -> bool:
return url[:7] == 'mmap://'

@staticmethod
async def discover() -> List['Client']:
clients = []
tempdir = create_tempdir()
for filename in os.listdir(tempdir):
if filename.startswith('mmap-'):
client = MmapClient('mmap://' + os.path.join(tempdir, filename))
try:
await client.start()
except Exception as exc:
raise exc # TODO: log these as warnings
else:
clients.append(client)
return clients
56 changes: 56 additions & 0 deletions src/comms/MmapMixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import mmap

# Protocol for message size bytes
SIZE_BYTES = 4
SIZE_ENDIANNESS = 'big'
SIZE_SIGNED = False

# First byte in file is a flag byte
BYTE_NONE = b'0' # Value when no message to be read

# Time to sleep between polling the flag byte
LISTEN_SLEEP = 1e-5

class MmapMixin:

def __init__(self):
self._path = None
self._mmap = None
self._task = None

async def open(self) -> None:
file = open(self._path, 'r+b')
self._mmap = mmap.mmap(file.fileno(), 0)
async def listen():
while True:
self._mmap.seek(0)
flag = self._mmap.read(1)
if flag == self.BYTE_READ:
size = int.from_bytes(self._mmap.read(SIZE_BYTES), byteorder=SIZE_ENDIANNESS, signed=SIZE_SIGNED)
if size:
message = self._mmap.read(size)
self._mmap.seek(0)
self._mmap.write(BYTE_NONE)
await self.read(message)
else:
await asyncio.sleep(LISTEN_SLEEP)
self._task = asyncio.ensure_future(listen())

async def write(self, message: bytes) -> None:
assert self._mmap
self._mmap.seek(0)
self._mmap.write(self.BYTE_WRITE)
self._mmap.write(len(message).to_bytes(length=4, byteorder=SIZE_ENDIANNESS, signed=SIZE_SIGNED))
self._mmap.write(message)

async def close(self) -> None:
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
assert self._task.cancelled()
self._task = None
if self._mmap:
self._mmap.close()
34 changes: 34 additions & 0 deletions src/comms/MmapServer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import asyncio
import mmap
import random
import string

from .stencilaFiles import create_tempfile
from .MmapMixin import MmapMixin
from .Server import Server

class MmapServer(MmapMixin, Server):

BYTE_WRITE = b'2'
BYTE_READ = b'1'

def __init__(self, processor=None, encoders=None):
MmapMixin.__init__(self)
Server.__init__(self, processor=processor, encoders=None)
self._id = None

@property
def url(self) -> str:
return f'mmap://{self._path}'

async def open(self) -> None:
self._id = 'mmap-py-' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
self._path = create_tempfile(self._id)
file = open(self._path, 'w+b')
file.truncate(1e6)
file.close()

await MmapMixin.open(self)

async def read(self, message: bytes) -> None:
await self.write(await self.receive(message, 'json'))
4 changes: 2 additions & 2 deletions src/comms/Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def __init__(self, processor: Processor, encoders: List[Encoder] = None):
self.encoders = encoders

@property
def url(self):
return None
def url(self) -> str:
return ''

async def start(self) -> None:
"""
Expand Down
9 changes: 4 additions & 5 deletions src/comms/stencilaFiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def create_tempdir() -> str:
def create_tempfile(name: str, content: str = None) -> str:
path = os.path.join(create_tempdir(), name)

if content:
if content is not None:
# Write content to a secure file only readable by current user
# Based on https://stackoverflow.com/a/15015748/4625911

Expand All @@ -63,10 +63,9 @@ def create_tempfile(name: str, content: str = None) -> str:
finally:
os.umask(umask_original)

if content:
# Open file fd and write to file
with os.fdopen(fd, 'w') as file:
file.write(content)
# Open file fd and write to file
with os.fdopen(fd, 'w') as file:
file.write(content)

return path

Expand Down
4 changes: 4 additions & 0 deletions tests/comms/MemoryClientServerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def __init__(self, processor):
# Testing of alternative encoders
encoders=[JsonEncoder(), JsonGzipEncoder()])

@property
def url(self) -> str:
return 'memory://'

async def open(self) -> None:
# Required to be implemented
pass
Expand Down
40 changes: 40 additions & 0 deletions tests/comms/MmapClientServerTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import os
import tempfile

import pytest

from stencilaschema.comms.MmapClient import MmapClient
from stencilaschema.comms.MmapServer import MmapServer

from helpers.processors import PersonProcessor

@pytest.mark.asyncio
async def test_server():
processor = PersonProcessor()
server = MmapServer(processor)
await server.start()

assert server._id
assert server._path

await server.stop()

@pytest.mark.asyncio
async def test_client_server():
processor = PersonProcessor()
server = MmapServer(processor)
await server.start()

client = MmapClient(server.url)
await client.start()

assert server.url[:7] == 'mmap://'
assert client.url == server.url

pre = {'type': 'Person', 'givenNames': ['Peter'], 'familyNames': ['Pan']}
post = {'type': 'Person', 'givenNames': ['Peter'], 'familyNames': ['Pan'], 'name': 'Peter Pan'}
assert await client.execute(pre) == post

await client.stop()
await server.stop()
2 changes: 1 addition & 1 deletion tests/comms/StdioClientServerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def test_client():

def test_server():
server = StdioServer(TestProcessor())
assert server.url == None
assert server.url == ''

0 comments on commit 725442e

Please sign in to comment.