Skip to content

Commit

Permalink
Add remote_instance context manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
matham committed Aug 8, 2020
1 parent ce48562 commit f2bd613
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 54 deletions.
40 changes: 17 additions & 23 deletions examples/executor_lag.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,18 @@ async def measure_executor(executor, name):
responses.append((te - ts) / 1e6)
response = sum(responses) / len(responses)

await executor.ensure_remote_instance(device, 'analog_random')

ts = time.perf_counter_ns()
for _ in range(100):
await device.read_state()

te = time.perf_counter_ns()
rate = 100 * 1e9 / (te - ts)
async with executor.remote_instance(device, 'analog_random'):
ts = time.perf_counter_ns()
for _ in range(100):
await device.read_state()

async with device.generate_data(100) as aiter:
async for _ in aiter:
pass
rate_cont = 100 * 1e9 / (time.perf_counter_ns() - te)
te = time.perf_counter_ns()
rate = 100 * 1e9 / (te - ts)

await executor.delete_remote_instance(device)
async with device.generate_data(100) as aiter:
async for _ in aiter:
pass
rate_cont = 100 * 1e9 / (time.perf_counter_ns() - te)

print(f'{name} - {cls.__name__}; '
f'Round-trip lag: {response:.2f}ms. '
Expand All @@ -89,18 +86,15 @@ async def measure_executor_async(executor, name):
else:
response = 0

await executor.ensure_remote_instance(device, 'analog_random')

ts = time.perf_counter_ns()
for _ in range(100):
await device.read_state_async()

te = time.perf_counter_ns()
rate = 100 * 1e9 / (te - ts)
async with executor.remote_instance(device, 'analog_random'):
ts = time.perf_counter_ns()
for _ in range(100):
await device.read_state_async()

rate_cont = 0
te = time.perf_counter_ns()
rate = 100 * 1e9 / (te - ts)

await executor.delete_remote_instance(device)
rate_cont = 0

print(f'{name}-async - {cls.__name__}; '
f'Round-trip lag: {response:.2f}ms. '
Expand Down
7 changes: 3 additions & 4 deletions examples/multiprocess_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ async def main():
async with MultiprocessSocketExecutor(
server='127.0.0.1', allow_import_from_main=True) as executor:
with ExecutorContext(executor):
await executor.ensure_remote_instance(demo, 'demo')
res = await demo.remote_func("cheese")
print(f'Executed result is "{res}" in process {getpid()}')
await executor.delete_remote_instance(demo)
async with executor.remote_instance(demo, 'demo'):
res = await demo.remote_func("cheese")
print(f'Executed result is "{res}" in process {getpid()}')


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion pymoa_remote/app/quart.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def register_remote_class(self, data: dict):
return await super().register_remote_class(data)

@convert_io
async def ensure_instance(self, data: dict) -> None:
async def ensure_instance(self, data: dict) -> bool:
return await super().ensure_instance(data)

@convert_io
Expand Down
3 changes: 3 additions & 0 deletions pymoa_remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ class LocalRegistry(InstanceRegistry):
"""

def add_instance(self, obj, hash_name):
if hash_name in self.hashed_instances:
raise ValueError(f'Object <{obj}, {hash_name}> already exists')

self.hashed_instances[hash_name] = obj
self.hashed_instances_ids[id(obj)] = hash_name
return obj
Expand Down
23 changes: 22 additions & 1 deletion pymoa_remote/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,29 @@ async def remote_import(self, module):
async def register_remote_class(self, cls):
raise NotImplementedError

@contextlib.asynccontextmanager
async def remote_instance(
self, obj, hash_name, *args, auto_register_class=True, **kwargs
) -> AsyncContextManager[Any]:
res = None
try:
res = await self.ensure_remote_instance(
obj, hash_name, *args, auto_register_class=auto_register_class,
**kwargs)

if not res:
raise ValueError(
f'Object <{obj}, {hash_name}> already exists. Consider '
f'using ensure_remote_instance instead')
yield obj
finally:
if not res:
return
await self.delete_remote_instance(obj)

async def ensure_remote_instance(
self, obj, hash_name, *args, auto_register_class=True, **kwargs):
self, obj, hash_name, *args, auto_register_class=True, **kwargs
) -> bool:
raise NotImplementedError

async def delete_remote_instance(self, obj):
Expand Down
10 changes: 8 additions & 2 deletions pymoa_remote/rest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,26 @@ async def register_remote_class(self, cls):
)

async def ensure_remote_instance(
self, obj, hash_name, *args, auto_register_class=True, **kwargs):
self, obj, hash_name, *args, auto_register_class=True, **kwargs
) -> bool:
if hash_name in self.registry.hashed_instances:
return False

cls = obj.__class__
if auto_register_class and not self.registry.is_class_registered(
class_to_register=cls):
self.registry.register_class(cls)

self.registry.add_instance(obj, hash_name)

await self._vanilla_write_read(
res = await self._vanilla_write_read(
'objects/create_open',
self._get_ensure_remote_instance_data(
obj, args, kwargs, hash_name, auto_register_class),
'post'
)
assert res['data']
return True

async def delete_remote_instance(self, obj):
await self._vanilla_write_read(
Expand Down
11 changes: 8 additions & 3 deletions pymoa_remote/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def remote_import(self, *args, **kwargs):
async def register_remote_class(self, *args, **kwargs):
raise NotImplementedError

async def ensure_instance(self, *args, **kwargs):
async def ensure_instance(self, *args, **kwargs) -> bool:
raise NotImplementedError

async def delete_instance(self, *args, **kwargs):
Expand Down Expand Up @@ -286,12 +286,13 @@ async def remote_import(self, data: dict):
async def register_remote_class(self, data: dict):
await self._register_remote_class(data)

async def ensure_instance(self, data: dict) -> None:
async def ensure_instance(self, data: dict) -> bool:
hash_name = data['hash_name']
if hash_name in self.registry.hashed_instances:
return
return False

await self._create_instance(data)
return True

async def delete_instance(self, data: dict) -> None:
hash_name = data['hash_name']
Expand Down Expand Up @@ -426,6 +427,10 @@ class RemoteRegistry(InstanceRegistry):
def create_instance(
self, cls_triple: Tuple[str, str, str], hash_name: str,
args: tuple, kwargs: dict, config: dict) -> Any:
if hash_name in self.hashed_instances:
raise ValueError(
f'Object <{hash_name}, {cls_triple}> already exists')

obj = self.referenceable_classes[cls_triple](*args, **kwargs)
apply_config(obj, config)

Expand Down
12 changes: 9 additions & 3 deletions pymoa_remote/socket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def raise_return_value(self, data: dict, packet: int = None):
raise ValueError(
f"Packet mismatch when reading: {packet} is not {packet_}")

async def _vanilla_write_read(self, cmd: str, data: dict):
async def _vanilla_write_read(self, cmd: str, data: dict) -> dict:
packet = self._packet
self._packet += 1
data = {
Expand All @@ -149,7 +149,11 @@ async def register_remote_class(self, cls):
await self._vanilla_write_read('register_remote_class', data)

async def ensure_remote_instance(
self, obj, hash_name, *args, auto_register_class=True, **kwargs):
self, obj, hash_name, *args, auto_register_class=True, **kwargs
) -> bool:
if hash_name in self.registry.hashed_instances:
return False

cls = obj.__class__
if auto_register_class and not self.registry.is_class_registered(
class_to_register=cls):
Expand All @@ -159,7 +163,9 @@ async def ensure_remote_instance(

data = self._get_ensure_remote_instance_data(
obj, args, kwargs, hash_name, auto_register_class)
await self._vanilla_write_read('ensure_remote_instance', data)
res = await self._vanilla_write_read('ensure_remote_instance', data)
assert res['data']
return True

async def delete_remote_instance(self, obj):
await self._vanilla_write_read(
Expand Down
40 changes: 25 additions & 15 deletions pymoa_remote/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,49 @@ async def thread_executor():
yield executor


@pytest.fixture
async def process_executor():
from pymoa_remote.socket.multiprocessing_client import \
MultiprocessSocketExecutor
from pymoa_remote.client import ExecutorContext
async with MultiprocessSocketExecutor(
server='127.0.0.1', allow_import_from_main=True) as executor:
with ExecutorContext(executor):
yield executor


@pytest.fixture
async def quart_rest_device(quart_rest_executor):
from pymoa_remote.tests.device import RandomDigitalChannel

device = RandomDigitalChannel()
await quart_rest_executor.ensure_remote_instance(
device, 'rand_device_rest')

yield device

await quart_rest_executor.delete_remote_instance(device)
async with quart_rest_executor.remote_instance(device, 'rand_device_rest'):
yield device


@pytest.fixture
async def quart_socket_device(quart_socket_executor):
from pymoa_remote.tests.device import RandomDigitalChannel

device = RandomDigitalChannel()
await quart_socket_executor.ensure_remote_instance(
device, 'rand_device_socket')

yield device

await quart_socket_executor.delete_remote_instance(device)
async with quart_socket_executor.remote_instance(
device, 'rand_device_socket'):
yield device


@pytest.fixture
async def thread_device(thread_executor):
from pymoa_remote.tests.device import RandomDigitalChannel

device = RandomDigitalChannel()
await thread_executor.ensure_remote_instance(device, 'rand_device_thread')
async with thread_executor.remote_instance(device, 'rand_device_thread'):
yield device

yield device

await thread_executor.delete_remote_instance(device)
@pytest.fixture
async def process_device(process_executor):
from pymoa_remote.tests.device import RandomDigitalChannel

device = RandomDigitalChannel()
async with thread_executor.remote_instance(device, 'rand_device_process'):
yield device
6 changes: 4 additions & 2 deletions pymoa_remote/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ async def register_remote_class(self, cls):
self.registry.register_class(getattr(mod, cls.__name__))

async def ensure_remote_instance(
self, obj, hash_name, *args, auto_register_class=True, **kwargs):
self, obj, hash_name, *args, auto_register_class=True, **kwargs
) -> bool:
if id(obj) in self._obj_executor:
return
return False

cls = obj.__class__
if auto_register_class and not self.registry.is_class_registered(
Expand All @@ -148,6 +149,7 @@ async def ensure_remote_instance(

self._obj_executor[id(obj)] = [None, None]
self.registry.add_instance(obj, hash_name)
return True

async def delete_remote_instance(self, obj):
if id(obj) not in self._obj_executor:
Expand Down

0 comments on commit f2bd613

Please sign in to comment.