Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python better async support #40

Merged
merged 7 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 105 additions & 96 deletions python/lib/src/xplpc/client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging as log
from concurrent.futures import ThreadPoolExecutor
from threading import Thread

from xplpc.core.xplpc import XPLPC
from xplpc.data.callback_list import CallbackList
Expand All @@ -10,136 +11,91 @@


class Client:
executor = ThreadPoolExecutor(max_workers=4)

class SyncCall:
def __init__(self, request: Request, class_type=None):
self.request = request
self.class_type = class_type
self.key = UniqueID().generate()
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor(max_workers=1)
self.future = self.loop.create_future()
self.__call__()
self.response_data = None
self.make_call()

def __call__(self):
def make_call(self):
def callback(response):
try:
self.response = (
self.response_data = (
XPLPC().config.serializer.decode_function_return_value(
response, self.class_type
)
)
self.loop.call_soon_threadsafe(self.future.set_result, None)
except Exception as e:
log.error(f"[Client : callback] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)

try:
CallbackList().add(self.key, callback)

# run the blocking call in a separate thread
self.loop.run_in_executor(
self.executor,
PlatformProxy().native_call_proxy,
self.key,
self.request.data(),
)

except Exception as e:
log.error(f"[Client : call] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)

def __enter__(self):
self.loop.run_until_complete(self.future)
return self.response
log.error(f"[Client : call] Error: {e}")

def __exit__(self, exc_type, exc_val, exc_tb):
pass
CallbackList().add(self.key, callback)
PlatformProxy().native_call_proxy(self.key, self.request.data())

def run(self):
with self as response:
return response
return self.response_data

class SyncCallFromString:
def __init__(self, request_data: str):
self.request_data = request_data
self.key = UniqueID().generate()
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor(max_workers=1)
self.future = self.loop.create_future()
self.__call__()
self.response_data = None
self.make_call()

def __call__(self):
def make_call(self):
def callback(response):
try:
self.response = response
self.loop.call_soon_threadsafe(self.future.set_result, None)
self.response_data = response
except Exception as e:
log.error(f"[Client : callback] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)
log.error(f"[Client : call_from_string] Error: {e}")

try:
CallbackList().add(self.key, callback)

# run the blocking call in a separate thread
self.loop.run_in_executor(
self.executor,
PlatformProxy().native_call_proxy,
self.key,
self.request_data,
)

except Exception as e:
log.error(f"[Client : call] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)

def __enter__(self):
self.loop.run_until_complete(self.future)
return self.response

def __exit__(self, exc_type, exc_val, exc_tb):
pass
CallbackList().add(self.key, callback)
PlatformProxy().native_call_proxy(self.key, self.request_data)

def run(self):
with self as response:
return response
return self.response_data

class AsyncCall:
def __init__(self, request: Request, class_type=None):
def __init__(self, request: Request, class_type=None, loop=None):
self.request = request
self.class_type = class_type
self.loop = loop if loop else asyncio.get_event_loop()
self.key = UniqueID().generate()
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor(max_workers=1)
self.future = self.loop.create_future()
self.__call__()
self.make_call()

def __call__(self):
def make_call(self):
def callback(response):
try:
self.response = (
XPLPC().config.serializer.decode_function_return_value(
response, self.class_type
)
)
self.loop.call_soon_threadsafe(self.future.set_result, None)
if not self.future.done():
self.loop.call_soon_threadsafe(
self.future.set_result, self.response
)
except Exception as e:
log.error(f"[Client : callback] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)
log.error(f"[Client : async_call] Error: {e}")
if not self.future.done():
self.loop.call_soon_threadsafe(self.future.set_exception, e)

try:
CallbackList().add(self.key, callback)

# run the blocking call in a separate thread
self.loop.run_in_executor(
self.executor,
Client.executor,
PlatformProxy().native_call_proxy,
self.key,
self.request.data(),
)

except Exception as e:
log.error(f"[Client : call] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)
log.error(f"[Client : async_call] Error: {e}")
if not self.future.done():
self.loop.call_soon_threadsafe(self.future.set_exception, e)

async def __aenter__(self):
await self.future
Expand All @@ -149,37 +105,38 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
pass

class AsyncCallFromString:
def __init__(self, request_data: str):
def __init__(self, request_data: str, loop=None):
self.request_data = request_data
self.loop = loop if loop else asyncio.get_event_loop()
self.key = UniqueID().generate()
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor(max_workers=1)
self.future = self.loop.create_future()
self.__call__()
self.make_call()

def __call__(self):
def make_call(self):
def callback(response):
try:
self.response = response
self.loop.call_soon_threadsafe(self.future.set_result, None)
if not self.future.done():
self.loop.call_soon_threadsafe(
self.future.set_result, self.response
)
except Exception as e:
log.error(f"[Client : callback] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)
log.error(f"[Client : async_call_from_string] Error: {e}")
if not self.future.done():
self.loop.call_soon_threadsafe(self.future.set_exception, e)

try:
CallbackList().add(self.key, callback)

# run the blocking call in a separate thread
self.loop.run_in_executor(
self.executor,
Client.executor,
PlatformProxy().native_call_proxy,
self.key,
self.request_data,
)

except Exception as e:
log.error(f"[Client : call] Error: {e}")
self.loop.call_soon_threadsafe(self.future.set_exception, e)
log.error(f"[Client : async_call_from_string] Error: {e}")
if not self.future.done():
self.loop.call_soon_threadsafe(self.future.set_exception, e)

async def __aenter__(self):
await self.future
Expand All @@ -188,6 +145,50 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass

class ThreadCall:
def __init__(self, request: Request, class_type=None, callback=None):
self.request = request
self.class_type = class_type
self.key = UniqueID().generate()
self.callback = callback
Thread(target=self.make_call).start()

def make_call(self):
def internal_callback(response):
try:
result = XPLPC().config.serializer.decode_function_return_value(
response, self.class_type
)
if self.callback:
self.callback(result)
except Exception as e:
log.error(f"[Client : thread_call] Error: {e}")
if self.callback:
self.callback(None, e)

CallbackList().add(self.key, internal_callback)
PlatformProxy().native_call_proxy(self.key, self.request.data())

class ThreadCallFromString:
def __init__(self, request_data: str, callback=None):
self.request_data = request_data
self.key = UniqueID().generate()
self.callback = callback
Thread(target=self.make_call).start()

def make_call(self):
def internal_callback(response):
try:
if self.callback:
self.callback(response)
except Exception as e:
log.error(f"[Client : thread_call_from_string] Error: {e}")
if self.callback:
self.callback(None, e)

CallbackList().add(self.key, internal_callback)
PlatformProxy().native_call_proxy(self.key, self.request_data)

@staticmethod
def call(request: Request, class_type=None):
return Client.SyncCall(request, class_type).run()
Expand All @@ -197,13 +198,21 @@ def call_from_string(request_data: str):
return Client.SyncCallFromString(request_data).run()

@staticmethod
async def async_call(request: Request, class_type=None):
async_call_instance = Client.AsyncCall(request, class_type)
async def async_call(request: Request, class_type=None, loop=None):
async_call_instance = Client.AsyncCall(request, class_type, loop)
async with async_call_instance as response:
return response

@staticmethod
async def async_call_from_string(request_data: str):
async_call_instance = Client.AsyncCallFromString(request_data)
async def async_call_from_string(request_data: str, loop=None):
async_call_instance = Client.AsyncCallFromString(request_data, loop)
async with async_call_instance as response:
return response

@staticmethod
def thread_call(request: Request, class_type=None, callback=None):
Client.ThreadCall(request, class_type, callback)

@staticmethod
def thread_call_from_string(request_data: str, callback=None):
Client.ThreadCallFromString(request_data, callback)
20 changes: 14 additions & 6 deletions python/lib/src/xplpc/data/mapping_list.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from typing import Optional

from xplpc.map.mapping_item import MappingItem
Expand All @@ -6,11 +7,14 @@
class MappingList:
# singleton
_instance = None
_lock = threading.Lock()

def __new__(cls):
if cls._instance is None:
cls._instance = super(MappingList, cls).__new__(cls)
cls._instance._init()
with cls._lock:
if cls._instance is None:
cls._instance = super(MappingList, cls).__new__(cls)
cls._instance._init()
return cls._instance

def _init(self):
Expand All @@ -19,13 +23,17 @@ def _init(self):

# methods
def add(self, name: str, item: MappingItem):
self.list[name] = item
with self._lock:
self.list[name] = item

def find(self, name: str) -> Optional[MappingItem]:
return self.list.get(name)
with self._lock:
return self.list.get(name)

def has(self, name: str) -> bool:
return name in self.list
with self._lock:
return name in self.list

def clear(self):
self.list.clear()
with self._lock:
self.list.clear()
Loading
Loading