diff --git a/.gitignore b/.gitignore index 5c4204a..32d1007 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,4 @@ dist/ # Temporary test files api_test.py sample*.py - -socket_test.py \ No newline at end of file +socket_test.py diff --git a/README.md b/README.md index bb2b2fa..cce516d 100644 --- a/README.md +++ b/README.md @@ -9,23 +9,19 @@ * [Client module](#client-module) * [Client()](#Client) * [authenticate](#authenticate) - * [add_event_handler](#add_event_handler) * [close](#close) * [connect](#connect) * [connect_pool](#connect_pool) * [get_default_scope](#get_default_scope) * [get_event_loop](#get_event_loop) + * [get_rooms](#get_rooms) * [is_connected](#is_connected) * [query](#query) * [reconnect](#reconnect) * [run](#run) * [set_default_scope](#set_default_scope) - * [unwatch](#unwatch) * [wait_closed](#wait_closed) - * [watch](#watch) - * [Model](#model) - * [Collection](#collection) - * [Thing](#thing) + * [Room](#room) --------------------------------------- @@ -97,9 +93,9 @@ Initialize a ThingsDB client When set to `True`, the client will automatically reconnect when a connection is lost. If set to `False` and the connection gets lost, one may call the [reconnect()](#reconnect) method to - make a new connection. The auto-reconnect option can act on - node changes and does so automatically if the connected user - has the required `WATCH` privileges on the `@node` scope. + make a new connection. The auto-reconnect option will listen to + node changes and automatically start a reconnect loop if the + *shutting-down* status is received from the node. Defaults to `True`. - *ssl (SSLContext or bool, optional)*: Accepts an ssl.SSLContext for creating a secure connection @@ -134,21 +130,6 @@ Authenticate a ThingsDB connection. The timeout may be set to `None` in which case the client will wait forever on a response. Defaults to 5. -### add_event_handler - -```python -Client().add_event_handler(event_handler: Events) -> None -``` - -Add an event handler. - -Event handlers will called in the order they are added. - -#### Args - -- *event_handler (Events)*: - An instance of Events (see `thingsdb.client.abc.events`). - ### close ```python @@ -272,6 +253,18 @@ Can be used to get the event loop. The event loop used by the client. +### get_rooms + +```python +Client().get_rooms() -> tuple +``` + +Can be used to get the rooms which are joined by this client. + +#### Returns + +A `tuple` with unique `Room` instances. + ### is_connected ```python @@ -290,7 +283,6 @@ Client().query( code: str, scope: Optional[str] = None, timeout: Optional[int] = None, - convert_vars: bool = True, **kwargs: Any ) -> asyncio.Future ``` @@ -311,13 +303,6 @@ Use this method to run `code` in a scope. Raise a time-out exception if no response is received within X seconds. If no time-out is given, the client will wait forever. Defaults to `None`. -- *convert_vars (bool, optional)*: - Only applicable if `**kwargs` are given. If set to `True`, then - the provided `**kwargs` values will be converted so ThingsDB can - understand them. For example, a thing should be given just by - it's ID and with conversion the `#` will be extracted. When - this argument is `False`, the `**kwargs` stay untouched. - Defaults to `True`. - *\*\*kwargs (any, optional)*: Can be used to inject variable into the ThingsDB code. @@ -364,7 +349,6 @@ Client().run( *args: Optional[Any], scope: Optional[str] = None, timeout: Optional[int] = None, - convert_args: bool = True, **kwargs: Any ) -> asyncio.Future ``` @@ -390,13 +374,6 @@ Use this method to run a stored procedure in a scope. Raise a time-out exception if no response is received within X seconds. If no time-out is given, the client will wait forever. Defaults to `None`. -- *convert_args (bool, optional)*: - Only applicable if `*args` are given. If set to `True`, then - the provided `*args` values will be converted so ThingsDB can - understand them. For example, a thing should be given just by - it's ID and with conversion the `#` will be extracted. When - this argument is `False`, the `*args` stay untouched. - Defaults to `True`. - *\*\*kwargs (any, optional)*: Arguments which are injected as the procedure arguments. Instead of by name, the arguments may also be parsed using @@ -429,37 +406,6 @@ Can be used to change the default scope which is initially set to `@t`. Set the default scope. A scope may start with either the `/` character, or `@`. Examples: `"//stuff"`, `"@:stuff"`, `"/node"` -### unwatch - -```python -Client().unwatch( - *ids: int, - scope: Optional[str] = None -) -> asyncio.Future -``` - -Unsubscribe for changes on given things. - -Stop receiving events for the things given by one or more ids. It is -possible that the client receives an event shortly after calling the -unsubscribe method because the event was queued. - -#### Args -- *\*ids (int)*: - Thing IDs to unsubscribe. No error is returned in case one of - the given things are not found within the collection or if the - thing was not being watched. -- *scope (str, optional)*: - Unsubscribe for things in this scope. If not specified, the - default scope will be used. Only collection scopes may contain - things so only collection scopes can be used. - See https://docs.thingsdb.net/v0/overview/scopes/ for how to - format a scope. - -#### Returns - -Future which result will be set to `None` if successful. - ### wait_closed @@ -473,193 +419,6 @@ Can be used after calling the `close()` method to determine when the connection is actually closed. -### watch - -```python -Client().watch(self, *ids: int, scope: Optional[str] = None) -> asyncio.Future -``` - -Subscribe for changes on given things. - -This method accepts one or more thing ids to subscribe to. This -method will simply return None as soon as the subscribe request is -successful handled by ThingsDB. After the response, the client will -receive `INIT` events for all subscribed ids. After that, ThingsDB -will continue to provide the client with `UPDATE` events which contain -changes to the subscribed thing. A `DELETE` event might be received -if, and only if the thing is removed and garbage collected from the -collection. - -#### Args - -- *\*ids (int)*: - Thing IDs to subscribe to. No error is returned in case one of - the given things are not found within the collection, instead a - `WARN` event will be send to the client. -- *scope (str, optional)*: - Subscribe on things in this scope. If not specified, the - default scope will be used. Only collection scopes may contain - things so only collection scopes can be used. - See https://docs.thingsdb.net/v0/overview/scopes/ for how to - format a scope. - -#### Returns - -Future which result will be set to `None` if successful. - -## Model - -It is possible to create a model which will map to data in ThingsDB. -The model will be kept up-to-date be the client. It is possible to break -anywhere you want in the model. What is not provided, will not be watched. - -### Collection - -A collection is always required, even you do not plan to watch anything in the -root of the collection. In the latter case you can just create an empty -collection which can be used when initializing individual things. - -```python -import asyncio -from thingsdb.client import Client -from thingsdb.model import Collection - -class Foo(Collection): - name = 'str' -``` - -In the example above, the ThingsDB collection name must be equal to the Python Class name, `Foo` in this case. -It may be useful to use a different Python Class name than the ThingsDB collection -name. This can be achieved by initializing the collection with a name attribute, for example `foo = Foo(name='foo')`. -As an alternative, the magic attribute `__COLLECTION_NAME__` can be used , for example: - -```python -class Stuff(Collection): - # the ThingsDB collection name is `stuff`, all lower case characters - __COLLECTION_NAME__ = 'stuff' -``` - -If both a `name` argument and the magic attribute `__COLLECTION_NAME__` are used, the `name` argument wins. - - -### Thing - -```python -import asyncio -from thingsdb.client import Client -from thingsdb.model import Collection, Thing - -class Bar(Thing): - name = 'str' - other = 'Bar?', lambda: Bar - -class Foo(Collection): - bar = 'Bar', Bar - -async def example(): - client = Client() - foo = Foo() - await client.connect('localhost') - try: - await client.authenticate('admin', 'pass') - await foo.load(client) - - # ... now the collection will be watched - - finally: - client.close() - await client.wait_closed() -``` - -Suppose you have an ID and want to watch that single thing, then -you can initialize the thing and call `watch()` manually. For example, -consider we have an `#5` for a `Bar` type in collection `Foo`: - -```python -bar = Bar(foo, 5) -await bar.watch() -``` - -### Enum - -The Python ThingsDB model has it's own Enum implementation which should not -be confused with the default Python Enum class. - -```python -import asyncio -from thingsdb.client import Client -from thingsdb.model import Collection, Thing, Enum - - -class Color(Enum): - RED = "#f00" - BLUE = "#0f0" - GREEN = "#00f" - - -class Brick(Thing): - color = 'Color', Color - - def on_init(self, *args, **kwars): - super().on_init(*args, **kwars) - print(f''' - Init Brick: - id: {self.id()} - color name: {self.color.name} - color value: {self.color.value} - ''') - -class Lego(Collection): - bricks = '[Brick]', Brick - - -async def example(): - client = Client() - lego = Lego() - await client.connect('localhost') - try: - await client.authenticate('admin', 'pass') - try: - await lego.build( - client, - scripts=['.bricks = [];'], - delete_if_exists=False) - except KeyError: - pass - await lego.load(client) - - # ... now the collection will be watched for 100 seconds - await asyncio.sleep(100) +## Room - finally: - client.close() - await client.wait_closed() - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - loop.run_until_complete(example()) -``` - -When adding a new brick, for example using the following code: -``` -.bricks.push(Brick{ - color: Color{RED} -}); -``` - -...then the `on_init` function will be called, printing the following output: *(the id might be different since this is auto-generated)* - -```text - Init Brick: - id: 123 - color name: RED - color value: #f00 -``` - -If you do not care about the whole `Color` class, then you can just create an empty class like this: - -```python -class Color(Enum): - pass -``` diff --git a/sample_lego.py b/sample_lego.py deleted file mode 100644 index eed21f8..0000000 --- a/sample_lego.py +++ /dev/null @@ -1,82 +0,0 @@ -import asyncio -from thingsdb.client import Client -from thingsdb.model import Collection, Thing, ThingStrict, Enum, EnumMember -from thingsdb.util import event - - -class Color(Enum): - RED = "#f00" - GREEN = "#0f0" - BLUE = "#00f" - - -class Brick(Thing): - color = 'Color', Color - - def on_init(self, *args, **kwars): - super().on_init(*args, **kwars) - print(f''' - Init Brick: - id: {self.id()} - color name: {self.color.name} - color value: {self.color.value} - ''') - - def on_update(self, event, jobs): - super().on_update(event, jobs) - print('ON BOOK UPDATE (Color: {}'.format(self.color)) - - @event('new-color') - def on_new_color(self, color): - print(f'brick with id {self.id()} as a new color: {color}') - - -class Lego(Collection): - bricks = '[Brick]', Brick - - -async def example(): - client = Client() - lego = Lego() - await client.connect('localhost') - try: - await client.authenticate('admin', 'pass') - try: - await lego.build( - client, - scripts=['.bricks = [];'], - delete_if_exists=True) - except KeyError: - pass - await lego.load(client) - - # ... now the collection will be watched for 100 seconds - while True: - await asyncio.sleep(1) - - if lego and lego.bricks: - - print('Color:', Color.RED is lego.bricks[0].color) - print('Is Enum', isinstance(Color.GREEN, Enum)) - print('Is EnumMember', isinstance(Color.GREEN, EnumMember)) - print('Is Color', isinstance(Color.GREEN, Color)) - print('Is True', Color.GREEN == Color("#0f0")) - print('Is True', Color.GREEN == Color["GREEN"]) - print('Is True', getattr(Color, 'GREEN') == Color["GREEN"]) - print('Is True', Color.RED.value == lego.bricks[0].color.value) - print('Is True', Color.RED.value == lego.bricks[0].color._value) - - brick = lego.bricks[0] - await brick.emit('new-color', 'RED') - break - await lego.query('.bricks.push(Brick{});') - - await asyncio.sleep(60) - - finally: - client.close() - await client.wait_closed() - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - loop.run_until_complete(example()) diff --git a/setup.py b/setup.py index 7845d32..5106f51 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ long_description_content_type='text/markdown', url='https://github.com/thingsdb/python-thingsdb', author='Jeroen van der Heijden', - author_email='jeroen@transceptor.technology', + author_email='jeroen@cesbit.com', license='MIT', classifiers=[ # How mature is this project? Common values are diff --git a/test/test_fmt.py b/test/test_fmt.py deleted file mode 100644 index 7278418..0000000 --- a/test/test_fmt.py +++ /dev/null @@ -1,72 +0,0 @@ -import sys -import unittest -sys.path.insert(0, '../') -from thingsdb.util import fmt # nopep8 -from thingsdb.model import Thing, Collection # nopep8 - - -def fmt_fmt(formatted): - return f'x={formatted}' - - -def val_fmt(val, blobs=None): - if blobs is None: - blobs = [] - return fmt_fmt(fmt(val, blobs)) - - -class TestWrap(unittest.TestCase): - - def test_fmt_int(self): - self.assertEqual('x=10', val_fmt(10)) - self.assertEqual('x=-10', val_fmt(-10)) - - def test_fmt_float(self): - self.assertEqual('x=0.5', val_fmt(0.5)) - self.assertEqual('x=-0.5', val_fmt(-0.5)) - - def test_fmt_bool(self): - self.assertEqual('x=true', val_fmt(True)) - self.assertEqual('x=false', val_fmt(False)) - - def test_fmt_nil(self): - self.assertEqual('x=nil', val_fmt(None)) - - def test_fmt_string(self): - self.assertEqual("x='Iris'", val_fmt('Iris')) - self.assertEqual("x=''", val_fmt('')) - - def test_fmt_list(self): - self.assertEqual("x=[6,'Iris']", val_fmt([6, 'Iris'])) - self.assertEqual("x=[[6,'Iris']]", val_fmt([[6, 'Iris']])) - - def test_fmt_dict(self): - self.assertEqual(r"x={}", val_fmt({})) - self.assertEqual(r"x={name:'Iris'}", val_fmt({'name': 'Iris'})) - self.assertEqual(r"x={age:6}", val_fmt({'age': 6})) - - def test_fmt_nested(self): - self.assertEqual( - r"x=[{stuff:[{lang:'C'},{more:['Python','C',true]}]}]", - val_fmt([{ - 'stuff': [{ - 'lang': 'C', - }, { - 'more': ['Python', 'C', True] - }] - }]) - ) - - def test_blobs(self): - blobs = {} - self.assertEqual(r"x=[blob0,blob1]", val_fmt([b'a', b'b'], blobs)) - self.assertEqual(blobs, {'blob0': b'a', 'blob1': b'b'}) - - def test_fmt_thing(self): - t = Thing(Collection(), 42) - self.assertEqual("x=#42", val_fmt(t)) - self.assertEqual("x=#42", val_fmt({'#': 42, "test": "xyz"})) - - -if __name__ == '__main__': - unittest.main() diff --git a/thingsdb/client/abc/__init__.py b/thingsdb/client/abc/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/thingsdb/client/abc/events.py b/thingsdb/client/abc/events.py deleted file mode 100644 index 093315d..0000000 --- a/thingsdb/client/abc/events.py +++ /dev/null @@ -1,113 +0,0 @@ -from ..protocol import Proto -import abc -from typing import Any - - -class Events(abc.ABC): - - def __init__(self): - self._evmap = { - Proto.ON_NODE_STATUS: self.on_node_status, - Proto.ON_WARN: self.on_warning, - Proto.ON_WATCH_INI: self.on_watch_init, - Proto.ON_WATCH_UPD: self.on_watch_update, - Proto.ON_WATCH_DEL: self.on_watch_delete, - Proto.ON_WATCH_STOP: self.on_watch_stop, - } - - def __call__(self, tp: Proto, data: Any) -> None: - self._evmap.get(tp)(data) - - @abc.abstractmethod - def on_reconnect(self) -> None: - """On re-connect - Called after a re-concect is finished (including authentication) - """ - pass - - @abc.abstractmethod - def on_node_status(self, status: str) -> None: - """On node status - status: String containing a `new` node status. - Optional values: - - OFFLINE - - CONNECTING - - BUILDING - - SHUTTING_DOWN - - SYNCHRONIZING - - AWAY - - AWAY_SOON - - READY - """ - pass - - @abc.abstractmethod - def on_warning(self, warn: dict) -> None: - """On warning - warn: a dictionary with `warn_msg` and `warn_code`. for example: - - { - "warn_msg": "some warning message" - "warn_code": 1 - } - """ - pass - - @abc.abstractmethod - def on_watch_init(self, data: dict) -> None: - """On watch init. - Initial data from a single thing. for example: - - { - "#": 123, - "name": "ThingsDB!", - ... - } - """ - pass - - @abc.abstractmethod - def on_watch_update(self, data: dict) -> None: - """On watch update. - Updates for a thing with ID (#). One event may contain more than one - job. for example: - - { - "#": 123, - "jobs": [ - { - "set": { - "answer": 42 - } - } - ] - } - """ - pass - - @abc.abstractmethod - def on_watch_delete(self, data: dict) -> None: - """On watch delete. - The thing is removed from the collection (and garbage collected). - for example: - - { - "#": 123 - } - """ - pass - - @abc.abstractmethod - def on_watch_stop(self, data: dict) -> None: - """On watch stop. - The thing is not watched anymore due to either call to `unwatch()`, or - by a unwatch request (REQ_UNWATCH). This event is *not* triggered when - a connection to a node has been lost. - - for example: - - { - "#": 123 - } - """ - pass diff --git a/thingsdb/client/buildin.py b/thingsdb/client/buildin.py index 9f04f01..1fd2cc4 100644 --- a/thingsdb/client/buildin.py +++ b/thingsdb/client/buildin.py @@ -1,128 +1,390 @@ import datetime from typing import Union as U -from typing import Optional as O +from typing import Optional + class Buildin: + # + # Build-in functions from the @thingsdb scope + # + async def collection_info(self, collection: U[int, str]) -> dict: + """Returns information about a specific collection. + + This function requires QUERY privileges on the requested collection, + or CHANGE privileges on the @thingsdb scope. + + This function does not generate a change. + """ return await self.query( - f'collection_info(collection)', + 'collection_info(collection)', collection=collection, scope='@t') - async def collections_info(self): + async def collections_info(self) -> list: + """Returns collection information about all collections in ThingsDB. + """ return await self.query('collections_info()', scope='@t') - async def counters(self, scope='@n'): - return await self.query('counters()', scope=scope) - async def del_collection(self, collection: U[int, str]): + """Delete a collection. + + This function generates a change. + """ return await self.query( - f'del_collection(collection)', + 'del_collection(collection)', collection=collection, scope='@t') async def del_expired(self): + """Delete all expired tokens. + + Requires GRANT privileges on the @thingsdb scope. + + This function generates a change. + """ return await self.query('del_expired()', scope='@t') + async def del_module(self, name: str): + """Delete a module. A SIGTERM signal will be send to the process for + the module which might cancel running futures. + + This function generates a change. + """ + return await self.query('del_module(key)', name=name, scope='@t') + + async def del_node(self, node_id: int): + """Delete a node from ThingsDB. + + Before deleting a node, the node must be offline. As long is the node + is active, you are not allowed to delete the node. See shutdown for + shutting down a node by using a query. + + This function generates a change. + """ + return await self.query('del_node(id)', id=node_id, scope='@t') + async def del_token(self, key: str): - return await self.query(f'del_token(key)', key=key, scope='@t') + """Delete a token. + + This function requires GRANT privileges on the @thingsdb scope unless + the given token belongs to the logged on user. In the latter case, + only CHANGE privileges are required. + + This function generates a change. + """ + return await self.query('del_token(key)', key=key, scope='@t') async def del_user(self, name: str): - return await self.query(f'del_user(name)', name=name, scope='@t') + """Delete a user. + + It is not possible to delete your own user account and a bad_data_err() + will be raised in case you try to. Any tokens associated with the user + will also be deleted. + + This function requires GRANT privileges on the @thingsdb scope. + + This function generates a change. + """ + return await self.query('del_user(name)', name=name, scope='@t') + + async def deploy_module( + self, + name: str, + data: Optional[U[bytes, str]] = None): + """Deploy a module on all nodes. + + The module must be configured first, using the new_module() function. + This function is used to write the module data (or plain python code) + to the module. After deploying the code, the module will be restarted + on every node. + + Before deploying a module, it is strongly recommended to use a + development environment before deploying the module into production. + + When the `data` argument is None, no data will be overwritten but the + module will be restarted on all nodes. This might be useful if you want + to force a module restart on all nodes. + + This function generates a change. + """ + return await self.query( + 'deploy_module(name, data)', + name=name, + data=data, + scope='@t') async def grant(self, target: U[int, str], user: str, mask: int): + """Grant, collection or general, privileges to a user. + + Access to a user is provided by setting a bit mask to either the @node, + @thingsdb or a @collection scope. + + To use this function, at least CHANGE privileges on the @thingsdb scope + and GRANT privileges on the target scope are required. + + It is not possible to set privileges on a specific node scope. + Therefore scope @node will apply to all nodes in ThingsDB. + + The following pre-defined masks are available: + (from thingsdb.util import Access) + + Mask | Description + ----------------- | ------------ + Access.QUERY (1) | Gives read access. + Access.CHANGE (2) | Gives modify access. + Access.GRANT (4) | Gives modify and grant (and revoke) privileges. + Access.JOIN (8) | Gives join (and leave) privileges. + Access.RUN (16) | Gives run procedures access. + Access.FULL (31) | A mask for full privileges. + + It is not possible to have GRANT privileges without also having CHANGE + privileges. However, ThingsDB automatically applies the required + privileges so when setting for example GRANT privileges, ThingsDB makes + sure that the user also gets CHANGE privileges. + + This function generates a change. + """ return await self.query( - f'grant(target, user, mask)', + 'grant(target, user, mask)', target=target, user=user, mask=mask, scope='@t') async def has_collection(self, name: str): - return await self.query(f'has_collection(name)', name=name, scope='@t') + """Determines if a collection exists in ThingsDB. + + This function does not generate a change. + """ + return await self.query('has_collection(name)', name=name, scope='@t') + + async def has_module(self, name: str): + """Determines if a module exists in ThingsDB. + + The scope restriction of the module has no impact on the result of this + function. + + This function does not generate a change. + """ + return await self.query('has_module(name)', name=name, scope='@t') + + async def has_node(self, node_id: int): + """Determines if a node exists in ThingsDB. + + This function does not generate a change. + """ + return await self.query('has_node(id)', id=node_id, scope='@t') async def has_token(self, token: str): - return await self.query(f'has_token(token)', token=token, scope='@t') + """Determines if a token exists in ThingsDB. + + This function requires GRANT privileges on the @thingsdb scope. + + This function does not generate a change. + """ + return await self.query('has_token(token)', token=token, scope='@t') async def has_user(self, name: str): - return await self.query(f'has_user(name)', name=name, scope='@t') + """Determines if a user exists in ThingsDB. + + This function requires GRANT privileges on the @thingsdb scope. + + This function does not generate a change. + """ + return await self.query('has_user(name)', name=name, scope='@t') + + async def module_info(self, name: str) -> dict: + return await self.query('module_info(name)', name=name, scope='@t') + + async def modules_info(self) -> list: + return await self.query('modules_info()', scope='@t') async def new_collection(self, name: str): - return await self.query(f'new_collection(name)', name=name, scope='@t') + """Create a new collection. + + This function generates a change. + """ + return await self.query('new_collection(name)', name=name, scope='@t') + + # TODO: new module + # TODO: new node async def new_token( self, user: str, - expiration_time: O[datetime.datetime] = None, + expiration_time: Optional[datetime.datetime] = None, description: str = ''): if expiration_time is not None: expiration_time = int(datetime.datetime.timestamp(expiration_time)) return await self.query( - f'new_token(user, expiration_time, description)', + 'new_token(user, expiration_time, description)', user=user, expiration_time=expiration_time, description=description, scope='@t') async def new_user(self, name: str): - return await self.query(f'new_user(name)', name=name, scope='@t') + """Creates a new user to ThingsDB. The new user is created without a + password, token and access privileges. You probably want to set a + password or add a new token, and assign some privileges using grant(…). - async def node_info(self, scope='@n'): - return await self.query('node_info()', scope=scope) + This function requires GRANT privileges on the @thingsdb scope. - async def nodes_info(self, scope='@n') -> list: - return await self.query('nodes_info()', scope=scope) + This function generates a change. + """ + return await self.query('new_user(name)', name=name, scope='@t') async def rename_collection( - self, - collection: U[int, str], - new_name: str) -> None: + self, + collection: U[int, str], + new_name: str) -> None: return await self.query( - f'rename_collection(collection, new_name)', + 'rename_collection(collection, new_name)', collection=collection, new_name=new_name, scope='@t') - async def rename_user(self, user: str, new_name: str) -> None: + async def rename_module(self, name: str, new_name: str) -> None: return await self.query( - f'rename_user(user, new_name)', - user=user, + 'rename_module(name, new_name)', + name=name, new_name=new_name, scope='@t') - async def reset_counters(self, scope='@n') -> None: - return await self.query('reset_counters()', scope=scope) + async def rename_user(self, name: str, new_name: str) -> None: + return await self.query( + 'rename_user(name, new_name)', + name=name, + new_name=new_name, + scope='@t') + + # TODO: restore async def revoke(self, target: U[int, str], user: str, mask: int): return await self.query( - f'revoke(target, user, mask)', + 'revoke(target, user, mask)', target=target, user=user, mask=mask, scope='@t') - async def set_log_level(self, log_level: str, scope='@n') -> None: - assert log_level in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') - return await self.query( - f'set_log_level(log_level)', log_level=log_level, scope=scope) + # TODO: set_module_conf + # TODO: set_module_scope async def set_password(self, user: str, new_password: str = None) -> None: return await self.query( - f'set_password(user, new_password)', + 'set_password(user, new_password)', user=user, new_password=new_password, scope='@t') - async def shutdown(self, scope='@n') -> None: - return await self.query('shutdown()', scope=scope) + async def set_time_zone(self, collection: U[int, str], zone: str): + """By default each collection will be created with time zone UTC. - async def user_info(self, user: str = None) -> dict: + This function can be used to change the time zone for a collection. If + changed, the functions datetime(..) and timeval(..) will use the + collections time zone unless specified otherwise. See time_zones_info() + for a list of all available timezones. + + Use collection_info(..) to view the current time zone for a collection. + + This function generates a change. + """ + return await self.query( + 'set_time_zone(collection, zone)', + collection=collection, + zone=zone, + scope='@t') + + async def time_zones_info(self) -> list: + """Returns all available time zones in ThingsDB. + + This function does not generate a change. + """ + return await self.query('time_zones_info()', scope='@t') + + async def user_info(self, user: Optional[str] = None) -> dict: if user is None: return await self.query('user_info()', scope='@t') - return await self.query(f'user_info(user)', user=user, scope='@t') + return await self.query('user_info(user)', user=user, scope='@t') async def users_info(self) -> list: return await self.query('users_info()', scope='@t') + + # + # Build-in functions from the @node scope + # + + async def backup_info(self, backup_id: int, scope='@n'): + return await self.query('backup_info(id)', id=backup_id, scope=scope) + + async def backups_info(self, scope='@n') -> list: + return await self.query('backups_info()', scope=scope) + + async def counters(self, scope='@n'): + return await self.query('counters()', scope=scope) + + async def del_backup( + self, + backup_id: int, + delete_files: bool = False, + scope='@n'): + return await self.query( + 'del_backup(id, delete_files)', + id=backup_id, + delete_files=delete_files, + scope=scope) + + async def has_backup(self, backup_id: int, scope='@n'): + return await self.query('has_backup(id)', id=backup_id, scope=scope) + + # TODO: new_backup + + async def node_info(self, scope='@n') -> dict: + return await self.query('node_info()', scope=scope) + + async def nodes_info(self, scope='@n') -> list: + return await self.query('nodes_info()', scope=scope) + + async def reset_counters(self, scope='@n') -> None: + """Resets the counters for the ThingsDB node you are connected too. + + Other nodes are not affected. This will set the started_at counter + value to the current UNIX time-stamp in seconds and all other counters + to 0 (zero). + + This function does not generate a change. + """ + return await self.query('reset_counters()', scope=scope) + + async def restart_module(self, name: str) -> None: + """Restarts a given module on the select node scope. + + If you want to restart the module on all nodes, you can use the + deploy_module(name, None) function with None as second argument. + + This function does not generate a change. + """ + return await self.query('restart_module(name)', name=name, scope='@t') + + async def set_log_level(self, log_level: str, scope='@n') -> None: + assert log_level in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') + return await self.query( + 'set_log_level(log_level)', log_level=log_level, scope=scope) + + async def shutdown(self, scope='@n') -> None: + """Shutdown the node in the selected scope. + + This is a clean shutdown, allowing all other nodes (and clients) to + disconnect. Be CAREFUL using this function!!! + + At least CHANGE privileges on the @node scope are required to shutdown + a node. + + This function does not generate a change. + """ + return await self.query('shutdown()', scope=scope) diff --git a/thingsdb/client/client.py b/thingsdb/client/client.py index a3cf21a..c31e63e 100644 --- a/thingsdb/client/client.py +++ b/thingsdb/client/client.py @@ -2,27 +2,18 @@ import logging import random import ssl +from collections import defaultdict from ssl import SSLContext, PROTOCOL_TLS from typing import Optional, Union, Any -from deprecation import deprecated from concurrent.futures import CancelledError from .buildin import Buildin -from .protocol import Proto -from .protocol import Protocol -from .abc.events import Events -from ..util.convert import convert -from ..exceptions import ForbiddenError, NodeError, AuthError - - - -_WATCH_MISSING = \ - 'auto reconnect cannot act on node changes since `WATCH` privileges on ' \ - 'the `@node` scope are missing' +from .protocol import Proto, Protocol +from ..exceptions import NodeError, AuthError class Client(Buildin): - MAX_RECONNECT_WAIT_TIME = 120 + MAX_RECONNECT_WAIT_TIME = 60 MAX_RECONNECT_TIMEOUT = 10 def __init__( @@ -38,10 +29,9 @@ def __init__( When set to `True`, the client will automatically reconnect when a connection is lost. If set to `False` and the connection gets lost, one may call the `reconnect()` method to - make a new connection. The auto-reconnect option can act on - node changes and does so automatically if the connected user - has the required `WATCH` privileges on the `@node` scope. - Defaults to `True`. + make a new connection. The auto-reconnect option will listen to + node changes and automatically start a reconnect loop if the + *shutting-down* status is received from the node. ssl (SSLContext or bool, optional): Accepts an ssl.SSLContext for creating a secure connection using SSL/TLS. This argument may simply be set to `True` in @@ -63,34 +53,23 @@ def __init__( self._scope = '@t' # default to thingsdb scope self._pool_idx = 0 self._reconnecting = False + self._rooms = dict() + self._rooms_lock = asyncio.Lock() + if ssl is True: self._ssl = SSLContext(PROTOCOL_TLS) elif ssl is False: self._ssl = None else: self._ssl = ssl - self._event_handlers = [] - - def add_event_handler(self, event_handler: Events) -> None: - """Add an event handler. - - Event handlers will called in the order they are added. - - Args: - event_handler (Events): - An instance of Events (see thingsdb.client.abc.events). - """ - self._event_handlers.append(event_handler) - def remove_event_handler(self, event_handler: Events) -> None: - """Remove an event handler. + def get_rooms(self): + """Can be used to get the rooms which are joined. - Args: - event_handler (Events): - An instance of Events (see thingsdb.client.abc.events). + Returns: + a tuple with unique Room instances. """ - self._event_handlers.remove(event_handler) - + return tuple(self._rooms.values()) def get_event_loop(self) -> asyncio.AbstractEventLoop: """Can be used to get the event loop. @@ -297,18 +276,11 @@ async def authenticate( self._auth = self._auth_check(auth) await self._authenticate(timeout) - if self._reconnect: - try: - await self.watch(scope='@n') - except ForbiddenError: - logging.warning(_WATCH_MISSING) - def query( self, code: str, scope: Optional[str] = None, timeout: Optional[int] = None, - convert_vars: bool = True, **kwargs: Any ) -> asyncio.Future: """Query ThingsDB. @@ -326,13 +298,6 @@ def query( Raise a time-out exception if no response is received within X seconds. If no time-out is given, the client will wait forever. Defaults to `None`. - convert_vars (bool, optional): - Only applicable if `**kwargs` are given. If set to `True`, then - the provided **kwargs values will be converted so ThingsDB can - understand them. For example, a thing should be given just by - it's ID and with conversion the `#` will be extracted. When - this argument is `False`, the **kwargs stay untouched. - Defaults to `True`. **kwargs (any, optional): Can be used to inject variable into the ThingsDB code. @@ -360,13 +325,9 @@ def query( scope = self._scope code = code.strip() # strip white space characters - + data = [scope, code] if kwargs: - if convert_vars: - kwargs = {k: convert(v) for k, v in kwargs.items()} - data = [scope, code, kwargs] - else: - data = [scope, code] + data.append(kwargs) return self._write_pkg(Proto.REQ_QUERY, data, timeout=timeout) @@ -379,7 +340,7 @@ async def _ensure_write( ) -> asyncio.Future: while True: if not self.is_connected(): - logging.info('wait for a connection') + logging.info('Wait for a connection') await asyncio.sleep(1.0) continue @@ -387,7 +348,7 @@ async def _ensure_write( res = await self._protocol.write(tp, data, is_bin, timeout) except (CancelledError, NodeError, AuthError) as e: logging.error( - f'error sending package: ' + f'Failed to transmit package: ' f'{e}({e.__class__.__name__}) (will try again)') await asyncio.sleep(1.0) continue @@ -411,7 +372,6 @@ def run( *args: Optional[Any], scope: Optional[str] = None, timeout: Optional[int] = None, - convert_args: bool = True, **kwargs: Any, ) -> asyncio.Future: """Run a procedure. @@ -434,13 +394,6 @@ def run( Raise a time-out exception if no response is received within X seconds. If no time-out is given, the client will wait forever. Defaults to `None`. - convert_args (bool, optional): - Only applicable if `*args` are given. If set to `True`, then - the provided `*args` values will be converted so ThingsDB can - understand them. For example, a thing should be given just by - it's ID and with conversion the `#` will be extracted. When - this argument is `False`, the `*args` stay untouched. - Defaults to `True`. **kwargs (any): Arguments which are injected as the procedure arguments. Instead of by name, the arguments may also be parsed using @@ -460,60 +413,86 @@ def run( if scope is None: scope = self._scope - if kwargs: - args = { - k: convert(v) - for k, v in kwargs.items() - } if convert_args else kwargs - elif args and convert_args: - args = [convert(arg) for arg in args] - - return self._write_pkg( - Proto.REQ_RUN, - [scope, procedure, args], - timeout=timeout) - - def watch(self, *ids: int, scope: Optional[str] = None) -> asyncio.Future: - """Subscribe for changes on given things. - - This method accepts one or more thing ids to subscribe to. This - method will simply return None as soon as the subscribe request is - successful handled by ThingsDB. After the response, the client will - receive `INIT` events for all subscribed ids. After that, ThingsDB - will continue to provide the client with `UPDATE` events which contain - changes to the subscribed thing. A `DELETE` event might be received - if, and only if the thing is removed and garbage collected from the - collection. + data = [scope, procedure] + + if args: + data.append(args) + if kwargs: + raise ValueError( + 'it is not possible to use both keyword arguments ' + 'and positional arguments at the same time') + elif kwargs: + data.append(kwargs) + + return self._write_pkg(Proto.REQ_RUN, data, timeout=timeout) + + def emit( + self, + room_id: int, + event: str, + *args, + scope: Optional[str] = None, + ) -> asyncio.Future: + """Emit an event. + + This function is most likely called from a Room instance but may be + used directly. Args: - *ids (int): - Thing IDs to subscribe to. No error is returned in case one of - the given things are not found within the collection, instead a - `WARN` event will be send to the client. + room_id (int): + Room Id to emit the event to. + event (str): + Name of the event to emit. + *args: + Additional argument to send with the event. scope (str, optional): - Subscribe on things in this scope. If not specified, the + Find the room in this scope. If not specified, the default scope will be used. Only collection scopes may contain - things so only collection scopes can be used. + rooms so only collection scopes can be used. See https://docs.thingsdb.net/v0/overview/scopes/ for how to format a scope. Returns: asyncio.Future (None): - Future which result will be set to `None` if successful. + Future which should be awaited. The result of the future will + be set to `None` when successful. """ if scope is None: scope = self._scope + return self._write_pkg(Proto.REQ_EMIT, [scope, room_id, event, *args]) - return self._write_pkg(Proto.REQ_WATCH, [scope, *ids]) + def _join(self, *ids: int, scope: Optional[str] = None) -> asyncio.Future: + """Join one or more rooms. - def unwatch( - self, - *ids: int, - scope: Optional[str] = None - ) -> asyncio.Future: - """Unsubscribe for changes on given things. + Args: + *ids (int): + Room Ids to join. No error is returned in case one of + the given room Ids are not found within the collection. + Instead, the return value will contain `None` instead of the + Id in the returned list. + scope (str, optional): + Join room(s) in this scope. If not specified, the + default scope will be used. Only collection scopes may contain + rooms so only collection scopes can be used. + See https://docs.thingsdb.net/v0/overview/scopes/ for how to + format a scope. + + Returns: + asyncio.Future ([*ids]): + Returns a Future which result will be set to a `list` with all + the room Ids from the request. If, and only if a given room Id + was not found in the collection, then the room Id at this + position in the list will be `None`. + """ + if scope is None: + scope = self._scope + + return self._write_pkg(Proto.REQ_JOIN, [scope, *ids]) - Stop receiving events for the things given by one or more ids. It is + def _leave(self, *ids: int, scope: Optional[str] = None) -> asyncio.Future: + """Leave one or more rooms. + + Stop receiving events for the rooms given by one or more ids. It is possible that the client receives an event shortly after calling the unsubscribe method because the event was queued. @@ -530,13 +509,16 @@ def unwatch( format a scope. Returns: - asyncio.Future (None): - Future which result will be set to `None` if successful. + asyncio.Future ([*ids]]): + Returns a Future which result will be set to a `list` with all + the room Ids from the request. If, and only if a given room Id + was not found in the collection, then the room Id at this + position in the list will be `None`. """ if scope is None: scope = self._scope - return self._write_pkg(Proto.REQ_UNWATCH, [scope, *ids]) + return self._write_pkg(Proto.REQ_LEAVE, [scope, *ids]) @staticmethod def _auth_check(auth): @@ -571,20 +553,45 @@ async def _connect(self, timeout=5): self._pool_idx += 1 self._pool_idx %= len(self._pool) + async def _on_room(self, room_id, pkg): + async with self._rooms_lock: + try: + room = self._rooms[room_id] + except KeyError: + logging.warn( + f'Got an event (tp:{pkg.tp}) for room Id {room_id} but ' + f'the room is not known by the ThingsDB client') + else: + room._on_event(pkg) + def _on_event(self, pkg): - if pkg.tp == Proto.ON_NODE_STATUS and \ - self._reconnect and \ - pkg.data == 'SHUTTING_DOWN': - asyncio.ensure_future(self.reconnect(), loop=self._loop) + if pkg.tp == Proto.ON_NODE_STATUS: + status, node_id = pkg.data['status'], pkg.data['id'] + + if self._reconnect and status == 'SHUTTING_DOWN': + asyncio.ensure_future(self.reconnect(), loop=self._loop) + + logging.debug( + f'Node with Id {node_id} has changed its status to: {status}') + return - for event_handler in self._event_handlers: - event_handler(pkg.tp, pkg.data) + try: + room_id = pkg.data['id'] + except KeyError: + if pkg.tp == Proto.ON_WARN: + warn = pkg.data + logging.warn( + f'Warning from ThingsDB: ' + f'{warn["warn_msg"]} ({warn["warn_code"]})') + else: + logging.warn(f'Unexpected event: tp:{pkg.tp} data:{pkg.data}') + else: + asyncio.ensure_future(self._on_room(room_id, pkg), loop=self._loop) def _on_connection_lost(self, protocol, exc): if self._protocol is not protocol: return self._protocol = None - if self._reconnect: asyncio.ensure_future(self.reconnect(), loop=self._loop) @@ -598,11 +605,12 @@ async def _reconnect_loop(self): await self._connect(timeout=timeout) await self._ping(timeout=2) await self._authenticate(timeout=5) + await self._rejoin() except Exception as e: logging.error( - f'connecting to {host}:{port} failed: ' + f'Connecting to {host}:{port} failed: ' f'{e}({e.__class__.__name__}), ' - f'try next connect in {wait_time} seconds' + f'Try next connect in {wait_time} seconds' ) else: if protocol and protocol.transport: @@ -615,27 +623,23 @@ async def _reconnect_loop(self): wait_time = min(wait_time, self.MAX_RECONNECT_WAIT_TIME) timeout = min(timeout+1, self.MAX_RECONNECT_TIMEOUT) - if self._reconnect: - try: - await self.watch(scope='@n') - except ForbiddenError: - logging.warning(_WATCH_MISSING) - - for event_handler in self._event_handlers: - event_handler.on_reconnect() - def _ping(self, timeout): return self._write(Proto.REQ_PING, timeout=timeout) def _authenticate(self, timeout): return self._write(Proto.REQ_AUTH, data=self._auth, timeout=timeout) - @deprecated(details='Use `set_default_scope` instead') - def use(self, scope): - if not scope.startswith('@'): - scope = f'@{scope}' if scope.startswith(':') else f'@:{scope}' - self.set_default_scope(scope) + async def _rejoin(self): + if not self._rooms: + return # do nothig if no rooms are used - @deprecated(details='Use `get_default_scope` instead') - def get_scope(self): - return self._scope + # re-arrange the rooms per scope to combine joins in a less requests + scopes = defaultdict(list) + for room in self._rooms.values(): + if room.id: + scopes[room.scope].append(room.id) + + # join request per scope, each for one or more rooms + await asyncio.gather(*[ + self._join(*ids, scope=scope) + for scope, ids in scopes.items()]) diff --git a/thingsdb/client/protocol.py b/thingsdb/client/protocol.py index a75e58f..b521db3 100644 --- a/thingsdb/client/protocol.py +++ b/thingsdb/client/protocol.py @@ -32,11 +32,11 @@ class Proto(enum.IntEnum): # Events ON_NODE_STATUS = 0x00 - ON_WATCH_INI = 0x01 - ON_WATCH_UPD = 0x02 - ON_WATCH_DEL = 0x03 - ON_WATCH_STOP = 0x04 ON_WARN = 0x05 + ON_ROOM_JOIN = 0x06 + ON_ROOM_LEAVE = 0x07 + ON_ROOM_EMIT = 0x08 + ON_ROOM_DELETE = 0x09 # Responses RES_PING = 0x10 @@ -48,9 +48,10 @@ class Proto(enum.IntEnum): REQ_PING = 0x20 REQ_AUTH = 0x21 REQ_QUERY = 0x22 - REQ_WATCH = 0x23 - REQ_UNWATCH = 0x24 REQ_RUN = 0x25 + REQ_JOIN = 0x26 + REQ_LEAVE = 0x27 + REQ_EMIT = 0x28 class Err(enum.IntEnum): @@ -117,11 +118,11 @@ class Err(enum.IntEnum): _PROTO_EVENTS = ( Proto.ON_NODE_STATUS, - Proto.ON_WATCH_INI, - Proto.ON_WATCH_UPD, - Proto.ON_WATCH_DEL, - Proto.ON_WATCH_STOP, - Proto.ON_WARN + Proto.ON_WARN, + Proto.ON_ROOM_JOIN, + Proto.ON_ROOM_LEAVE, + Proto.ON_ROOM_EMIT, + Proto.ON_ROOM_DELETE, ) @@ -160,7 +161,7 @@ def connection_lost(self, exc: Exception) -> None: ''' if self._requests: logging.error( - f'canceling {len(self._requests)} requests ' + f'Canceling {len(self._requests)} requests ' 'due to a lost connection' ) while self._requests: @@ -190,10 +191,8 @@ def data_received(self, data: bytes) -> None: return None try: self.package.extract_data_from(self._buffered_data) - except KeyError as e: - logging.error(f'unsupported package received: {e}') - except Exception as e: - logging.exception(e) + except Exception: + logging.exception('') # empty the byte-array to recover from this error self._buffered_data.clear() else: @@ -203,10 +202,10 @@ def data_received(self, data: bytes) -> None: elif tp in _PROTO_EVENTS: try: self._on_event(self.package) - except Exception as e: - logging.exception(e) + except Exception: + logging.exception('') else: - logging.error(f'unsupported package type received: {tp}') + logging.error(f'Unsupported package type received: {tp}') self.package = None @@ -250,18 +249,18 @@ async def _timer(self, pid: int, timeout: Optional[int]) -> None: try: future, task = self._requests.pop(pid) except KeyError: - logging.error('timed out package id not found: {}'.format( + logging.error('Timed out package Id not found: {}'.format( self._data_package.pid)) return None future.set_exception(TimeoutError( - 'request timed out on package id {}'.format(pid))) + 'request timed out on package Id {}'.format(pid))) def _on_response(self, pkg: Package) -> None: try: future, task = self._requests.pop(pkg.pid) except KeyError: - logging.error('received package id not found: {}'.format(pkg.pid)) + logging.error('Received package id not found: {}'.format(pkg.pid)) return None # cancel the timeout task diff --git a/thingsdb/model/__init__.py b/thingsdb/model/__init__.py deleted file mode 100644 index 0abf7b1..0000000 --- a/thingsdb/model/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .enum import Enum -from .enummember import EnumMember -from .collection import Collection -from .thing import Thing, ThingStrict, ThingHash -from .emitter import Emitter diff --git a/thingsdb/model/collection.py b/thingsdb/model/collection.py deleted file mode 100644 index 0a517ba..0000000 --- a/thingsdb/model/collection.py +++ /dev/null @@ -1,210 +0,0 @@ -import asyncio -import weakref -import functools -import logging -from typing import Iterable, Optional, Union, TextIO, Type, Any -from ..client import Client -from .eventhandler import EventHandler -from .thing import Thing -from .enum import Enum -from .prop import Prop - - -class Collection(Thing): - - __STRICT__ = True - __AS_TYPE__ = False - - def __init__(self, name=None): - self._things = weakref.WeakValueDictionary() - self._name = \ - getattr(self, '__COLLECTION_NAME__', self.__class__.__name__) \ - if name is None else name - self._scope = f'//{self._name}' - self._pending = set() # Thing ID's - self._client = None # use load, build or rebuild - self._id = None - self._types = {} # mapping where keys are type_id - self._enums = {} # mapping where keys are enum_id - self._conv_any = Prop.get_conv('any', klass=Thing, collection=self) - self._conv_thing = Prop.get_conv('thing', klass=Thing, collection=self) - self._unpack(self) - - async def load(self, client: Client) -> None: - assert self._client is None, 'This collection is already loaded' - self._client = client - id = await self._client.query('.id()', scope=self._scope) - super().__init__(self, id) - client.add_event_handler(EventHandler(self)) - await self._client.watch(id, scope=self._scope) - - async def build( - self, - client: Client, - classes: Optional[Iterable[Type[Thing]]] = None, - scripts: Optional[Iterable[Union[TextIO, str]]] = None, - delete_if_exists: bool = False, - ) -> None: - """Build the collection in ThingsDB. - - This will create the collection in ThingsDB will default values for - all collection properties. - - Args: - client (Client): - ThingsDB Client instance with an active, authenticated - connection. - classes (iterable, optional): - Optional list of classes to create. This is only required when - a class has no relation with the collection, otherwise the - class will be created recursively while building the - collection. Defaults to `None`. - scripts (iterable, optional): - Optional list of script which will be started after building - the collection. They will be started in the same order as they - are given. The iterable may contain File Objects in Text mode, - or plain strings with ThingsDB code. Defaults to `None`. - delete_if_exists (bool): - When `True`, the collection will be removed if it exists. Be - careful since all data in the collection will be removed! If - this arguments is `False`, a `KeyError` will be raised when the - collection exists. Defaults to `False`. - """ - assert self._client is None, 'This collection is already loaded' - if (await client.has_collection(self._name)): - if delete_if_exists: - await client.del_collection(self._name) - else: - raise KeyError(f'Collection `{self._name}` already exists') - - # create the collection, we are sure it does not exists - await client.new_collection(self._name) - - # first create the types so circular dependencies may be handled - await self._new_type(client, self) - if classes: - for model in classes: - await model._new_type(client, self) - - # set the type definitions and - await self._set_type(client, self) - if classes: - for model in classes: - await model._set_type(client, self) - - if scripts is not None: - for script in scripts: - code = script if isinstance(script, str) else script.read() - await client.query( - code=code, - scope=self._scope, - convert_vars=False) - - async def query(self, code: str, **kwargs: Any) -> Any: - """Query using this collection as scope. - - This is the same as calling the `query(..)` method on the client with - the scope='...' argument set to the collection scope. All keyword - arguments will be parsed to the `Client().query(..)` method so look at - that method for more information. - """ - return await self._client.query(code, scope=self._scope, **kwargs) - - def on_reconnect(self): - """Called from the `EventHandler`.""" - self._pending.update(self._things.keys()) - - n = len(self._pending) - - logging.warning( - f'on_reconnect, try re-watching {n} ' - f'thing{"" if n == 1 else "s"}...') - - self._go_pending() - - def _add_pending(self, thing): - self._pending.add(thing.id()) - - def _go_pending(self): - if not self._pending: - return - future = asyncio.ensure_future( - self._client.watch(*self._pending, scope=self._scope), - loop=self._client._loop - ) - self._pending.clear() - return future - - def _register(self, thing: Thing) -> None: - self._things[thing._id] = thing - - def _set_procedure(self, data): - name = data['name'] - setattr(self, name, functools.partial( - self._client.run, - name, - scope=self._scope)) - - def _rename_procedure(self, data): - old = data['old'] - name = data['name'] - func = getattr(self, old, None) - if callable(func): - setattr(self, name, func) - delattr(self, old) - - def _update_type(self, data): - self._types[data['type_id']] = tuple(k[0] for k in data['fields']) - - def _upd_type_add(self, data): - if 'spec' in data: # ignore methods - self._types[data['type_id']] += data['name'], - - def _upd_type_del(self, data): - type_id, name = data['type_id'], data['name'] - t = self._types[type_id] - try: - idx = t.index(name) - except ValueError: - return # probably a method - - t = list(t) - try: - t[idx] = t.pop() # swap remove - except IndexError: - pass - self._types[type_id] = tuple(t) - - def _upd_type_ren(self, data): - type_id, name, to = data['type_id'], data['name'], data['to'] - t = self._types[type_id] - try: - idx = t.index(name) - except ValueError: - return # probably a method - t = list(t) - t[idx] = to - self._types[type_id] = tuple(t) - - def _update_enum(self, data): - Enum._update_enum(self._name, self._enums, data, self._conv_any) - - def _upd_enum_add(self, data): - Enum._upd_enum_add(self._enums, data, self._conv_any) - - def _upd_enum_def(self, data): - Enum._upd_enum_def(self._enums, data) - - def _upd_enum_del(self, data): - Enum._upd_enum_del(self._enums, data) - - def _upd_enum_mod(self, data): - Enum._upd_enum_mod(self._enums, data, self._conv_any) - - def _upd_enum_ren(self, data): - '''Rename a enum member.''' - Enum._upd_enum_ren(self._enums, data) - - def _get_enum_member(self, enum_id, idx): - members, _ = self._enums[enum_id] - return members[idx] diff --git a/thingsdb/model/emitter.py b/thingsdb/model/emitter.py deleted file mode 100644 index 2d23dc6..0000000 --- a/thingsdb/model/emitter.py +++ /dev/null @@ -1,117 +0,0 @@ -import asyncio -import logging -from typing import Union -from ..client import Client -from ..client.abc.events import Events - -class Emitter(Events): - - _ev_handlers = dict() - - def __init__( - self, - client: Client, - emitter: Union[int, str] = '', - scope: str = None): - """Initializes an emitter. - - Args: - client (thingsdb.client.Client): - ThingsDB Client instance. - emitter (str/int): - Code which should point to the `thing` to watch for events. - Defaults to an empty string which is the collection. - Examples are: - - '' - - '.emitter' - - '#123' - Or, just the ID of the thing - scope (str): - Collection scope. Defaults to the scope of the client. - """ - super().__init__() - self._event_id = 0 - self._client = client - - if isinstance(emitter, int): - self._thing_id = emitter - self._code = None - else: - self._thing_id = None - if emitter: - emitter = '' if emitter == '.' else f'{{{emitter}}}' - self._code = \ - f'{emitter}.watch(); {emitter}.id();' - - self._scope = scope or client.get_scope() - - client.add_event_handler(self) - asyncio.ensure_future(self._watch()) - - def __init_subclass__(cls): - cls._ev_handlers = {} - - for key, val in cls.__dict__.items(): - if not key.startswith('__') and \ - callable(val) and hasattr(val, '_ev'): - if asyncio.iscoroutinefunction(val): - val = functools.partial(asyncio.ensure_future, val) - cls._ev_handlers[val._ev] = val - - async def _watch(self): - if self._thing_id is None: - self._thing_id = \ - await self._client.query(self._code, scope=self._scope) - else: - await self._client.watch(self._thing_id, scope=self._scope) - - def on_reconnect(self): - asyncio.ensure_future(self._watch()) - - def on_node_status(self, _status): - pass - - def on_warning(self, warn): - logging.warning(f'{warn["warn_msg"]} ({warn["warn_code"]})') - - def on_watch_init(self, _data): - pass - - def on_event(self, ev, *args): - cls = self.__class__ - fun = cls._ev_handlers.get(ev) - if fun is None: - logging.debug(f'no event handler for `{ev}` on {cls.__name__}') - return - fun(self, *args) - - def on_watch_update(self, data): - thing_id = data['#'] - if thing_id != self._thing_id: - return - - event_id, jobs = data['event'], data['jobs'] - - if self._event_id > event_id: - logging.warning( - f'ignore event because the current event `{self._event_id}` ' - f'is greather than the received event `{event_id}`') - return - self._event_id = event_id - - for job_dict in jobs: - for name, job in job_dict.items(): - if name == 'event': - self.on_event(*job) - - def on_watch_delete(self, data): - thing_id = data['#'] - if thing_id == self._thing_id: - logging.debug(f'emitter with id {thing_id} is removed') - self._client.remove_event_handler(self) - - def on_watch_stop(self, data): - thing_id = data['#'] - if thing_id == self._thing_id: - logging.debug(f'emitter with id {thing_id} is stopped') - self._client.remove_event_handler(self) diff --git a/thingsdb/model/enum.py b/thingsdb/model/enum.py deleted file mode 100644 index efbe4c8..0000000 --- a/thingsdb/model/enum.py +++ /dev/null @@ -1,145 +0,0 @@ -from .enummember import EnumMember - -_enums_lookup = {} # enums lookup by name - - -class _GetAttr(type): - def __getitem__(cls, name): - for k, v in cls.__dict__.items(): - if k == name: - return v - raise KeyError(f'no member with name `{k}`') - - -class Enum(metaclass=_GetAttr): - - _visited = 0 # For build, 0=not visited, 1=new_type, 2=set_type, 3=build - - def __new__(cls, *args): - if len(args) != 1: - return super().__new__(cls) - - value = args[0] - for v in cls.__dict__.values(): - if isinstance(v, EnumMember) and v._value == value: - return v - raise ValueError(f'no member with value `{value}`') - - def __init_subclass__(cls, **kwargs): - if issubclass(cls, EnumMember): - return - - cls._name = getattr(cls, '__NAME__', cls.__name__) - cls._cname = getattr(cls, '__COLLECTION_NAME__', '') - cls._id = None - cls._memberclass = type(f'{cls._name}Member', (EnumMember, cls), {}) - - # upgrade attributes to member instances - for k, v in cls.__dict__.items(): - if k.startswith('_'): - continue - setattr(cls, k, cls._memberclass(cls._name, k, v)) - - # register for lookup by name - _enums_lookup[cls._cname + cls._name] = cls - - @staticmethod - def _update_enum(cname, enums, data, convert): - name = data['name'] - enum = _enums_lookup.get(name, _enums_lookup.get(cname + name)) - if enum and enum._cname and enum._cname != cname: - raise TypeError( - f'Enum type {name} is used in more than one collection; ' - 'Add __COLLECTION_NAME__ to your Enum definition to fix ' - 'this error') - cls = EnumMember if enum is None else enum._memberclass - - members = [cls(name, k, convert(v)) for k, v in data['members']] - - if enum is not None: - enum._cname = cname - enum._id = data['enum_id'] - for member in members: - setattr(enum, member.name, member) - - enums[data['enum_id']] = members, enum - - @staticmethod - def _upd_enum_add(enums, data, convert): - members, enum = enums[data['enum_id']] - name = members[0]._enum_name - cls = EnumMember if enum is None else enum._memberclass - - member = cls(name, data['name'], convert(data['value'])) - members.append(member) - - if enum is not None: - setattr(enum, member.name, member) - - @staticmethod - def _upd_enum_del(enums, data): - members, enum = enums[data['enum_id']] - - if enum is not None: - member = members[data['index']] - delattr(enum, member.name) - - try: - # swap remove the index - members[data['index']] = members.pop() - except IndexError: - pass - - @staticmethod - def _upd_enum_def(enums, data): - members, _ = enums[data['enum_id']] - # swap index - idx = data['index'] - tmp = members[idx] - members[idx] = members[0] - members[0] = tmp - - @staticmethod - def _upd_enum_mod(enums, data, convert): - members, _ = enums[data['enum_id']] - name = members[0]._enum_name - member = members[data['index']] - member._value = convert(data['value']) - - @staticmethod - def _upd_enum_ren(enums, data): - members, enum = enums[data['enum_id']] - member = members[data['index']] - - if enum is not None: - delattr(enum, member.name) - setattr(enum, data['name'], member) - - member._name = data['name'] - - @classmethod - async def _new_type(cls, client, collection): - if cls._visited > 0: - return - cls._visited += 1 - members = ( - m for m in cls.__dict__.values() - if isinstance(m, EnumMember)) - query = f''' - set_enum('{cls._name}', {{ - {', '.join(f'{m.name}: {m.value!r}' for m in members)} - }}); - ''' - await client.query(query, scope=collection._scope) - - @classmethod - async def _set_type(cls, client, collection): - pass - - @classmethod - def id(cls): - return cls._id - - @classmethod - def name(cls): - return cls._name \ No newline at end of file diff --git a/thingsdb/model/enummember.py b/thingsdb/model/enummember.py deleted file mode 100644 index 723e606..0000000 --- a/thingsdb/model/enummember.py +++ /dev/null @@ -1,25 +0,0 @@ -class EnumMember: - - @property - def name(self): - return self._name - - @property - def value(self): - return self._value - - def __new__(cls, enum_name, name, value): - instance = object.__new__(cls) - instance._name = name - instance._value = value - instance._enum_name = enum_name - return instance - - def __repr__(self): - return f'{self._enum_name}{{{self._name}}}' - - def __eq__(self, other): - return self is other or self._value == other - - def __ne__(self, other): - return not self.__eq__(other) diff --git a/thingsdb/model/eventhandler.py b/thingsdb/model/eventhandler.py deleted file mode 100644 index 702d13e..0000000 --- a/thingsdb/model/eventhandler.py +++ /dev/null @@ -1,65 +0,0 @@ -import logging -from ..client.abc.events import Events - - -class EventHandler(Events): - - def __init__(self, collection): - super().__init__() - self._collection = collection - - def on_reconnect(self): - self._collection.on_reconnect() - - def on_node_status(self, _status): - pass - - def on_warning(self, warn): - logging.warning(f'{warn["warn_msg"]} ({warn["warn_code"]})') - - def on_watch_init(self, data): - thing_dict = data['thing'] - thing_id = thing_dict['#'] - thing = self._collection._things.get(thing_id) - if thing is None: - logging.debug( - f'Cannot init #{thing_id} since the thing is not registerd ' - f'for watching by collection `{self._collection._name}`; ' - f'Maybe this thing is watched as an Emitter?') - return - - thing.on_init(data['event'], thing_dict) - if thing is self._collection: - for procedure in data['procedures']: - thing._set_procedure(procedure) - for enum_info in data['enums']: - thing._update_enum(enum_info) - for type_info in data['types']: - thing._update_type(type_info) - - def on_watch_update(self, data): - thing_id = data['#'] - thing = self._collection._things.get(thing_id) - if thing is None: - logging.debug( - f'Cannot update #{thing_id} since the thing is not registerd ' - f'for watching by collection `{self._collection._name}`; ' - f'Maybe this thing is watched as an Emitter?') - return - - thing.on_update(data['event'], data['jobs']) - - def on_watch_delete(self, data): - thing_id = data['#'] - thing = self._collection._things.get(thing_id) - if thing is not None: - # since weakref is used, the thing is probably already removed and - # the code will not reach this point, unless there are references - # left. - thing.on_delete() - - def on_watch_stop(self, data): - thing_id = data['#'] - thing = self._collection._things.get(thing_id) - if thing is not None: - thing.on_stop() diff --git a/thingsdb/model/prop.py b/thingsdb/model/prop.py deleted file mode 100644 index 85ae2f4..0000000 --- a/thingsdb/model/prop.py +++ /dev/null @@ -1,112 +0,0 @@ -import functools -from .proptypes import PropTypes -from .enum import Enum - - -class Prop: - - __slots__ = ( - 'vconv', - 'nconv', - 'spec', - 'nillable', - 'model', - ) - - @staticmethod - def get_conv(fname, is_nillable=False, **kwrags): - func = getattr(PropTypes, f'{fname}_', None) - if kwrags: - func = functools.partial(func, **kwrags) - if func: - return functools.partial(PropTypes.nillable, func=func) \ - if is_nillable else func - - def __init__(self, spec, cb=None): - self.vconv = (cb, ) - self.nconv = None - self.spec = spec - self.nillable = False - self.model = None - - def unpack(self, collection): - from .thing import Thing # nopep8 - if callable(self.vconv): - return - - cb_type, self.vconv = self.vconv[0], lambda: True - spec = self.spec - is_nillable = False - - if spec.endswith('?'): - self.nillable = is_nillable = True - spec = spec[:-1] - - assert spec, 'an empty specification is not allowed' - - if spec.startswith('['): - assert spec.endswith(']'), 'missing `]` in specification' - self.nconv = ('array', is_nillable) - spec = spec[1:-1] - is_nillable = spec.endswith('?') - if is_nillable: - spec = spec[:-1] - - elif spec.startswith('{'): - assert spec.endswith('}'), 'missing `}` in specification' - self.nconv = ('set', is_nillable) - spec = spec[1:-1] - is_nillable = False - - kwargs = { - 'klass': Thing, - 'collection': collection, - } - - if not spec: - nested = self.get_conv('any', **kwargs) - self.vconv = self.get_conv(*self.nconv, nested=nested) - self.nconv = nested - return # finished, this is an array or set - - if spec == 'Thing' and cb_type is None: - spec, kwargs['watch'] = 'thing', True - - self.vconv = self.get_conv( - spec, is_nillable, **kwargs - if spec == 'any' or spec == 'thing' else {}) - - if self.vconv is None: - assert callable(cb_type), \ - f'type `{spec}` is not a build-in, a calleble is required' - - self.model = cb_type \ - if isinstance(cb_type, type) and issubclass(cb_type, Thing) \ - else cb_type() - - if isinstance(self.model, Enum): - self.vconv = self.get_conv('enum', collection=collection) - return - - name = self.model._type_name - - assert spec == name, \ - f'type `{name}` does not match the specification `{spec}`' - - assert hasattr(self.model, '_props'), \ - f'missing `_props`, type `{name}` ' \ - f'must be a subclass of `Thing`' - - kwargs = { - 'klass': self.model, - 'collection': collection, - 'watch': True, - } - self.vconv = self.get_conv('thing', is_nillable, **kwargs) - - self.model._unpack(collection) - - if self.nconv is not None: - vconf = self.vconv - self.vconv = self.get_conv(*self.nconv, nested=vconf) - self.nconv = vconf diff --git a/thingsdb/model/proptypes.py b/thingsdb/model/proptypes.py deleted file mode 100644 index fdce6b3..0000000 --- a/thingsdb/model/proptypes.py +++ /dev/null @@ -1,180 +0,0 @@ -import re -import functools -import logging - - -class PropTypes: - - @staticmethod - def any_(v, klass, collection): - if isinstance(v, dict): - if '#' in v: - return PropTypes.thing_(v, klass, collection) - if '\'' in v: - return PropTypes.datetime_(v) - if '"' in v: - return PropTypes.timeval_(v) - if '%' in v: - return collection._get_enum_member(*v['%']) - if '$' in v: - return PropTypes.set_(v, nested=functools.partial( - PropTypes.thing_, - klass=klass, - collection=collection - )) - if '*' in v: - pattern = v['*'] - flags = 0 - if pattern.endswith('i'): - flags |= re.IGNORECASE - pattern = pattern[:-2] - else: - pattern = pattern[1:-1] - return re.compile(pattern, flags) - if '!': - msg = v['error_msg'] - # TODO : Return correct exception - return Exception(msg) - logging.warning(f'unhandled dict: {v}') - if isinstance(v, list): - return PropTypes.array_(v, nested=functools.partial( - PropTypes.any_, - klass=klass, - collection=collection - )) - return v - - @staticmethod - def thing_(v, klass, collection, watch=False): - if not isinstance(v, dict): - raise TypeError(f'expecting type `dict`, got `{type(v)}`') - - thing_id = v.pop('#') - thing = collection._things.get(thing_id) - - if thing is None: - thing = klass(collection, thing_id) - else: - watch = False - - type_id = v.pop('.', None) - if type_id is None: - thing.__dict__.update(v) - else: - fmap = collection._types.get(type_id) - if fmap: - thing.__dict__.update(zip(fmap, v[''])) - - if watch and not thing: - collection._add_pending(thing) - - return thing - - @staticmethod - def enum_(v, collection): - return collection._get_enum_member(*v['%']) - - @staticmethod - def str_(v): - if not isinstance(v, str): - raise TypeError(f'expecting type `str`, got `{type(v)}`') - return v - - @staticmethod - def utf8_(v): - if not isinstance(v, str): - raise TypeError(f'expecting type `str`, got `{type(v)}`') - return v - - @staticmethod - def datetime_(v): - if not isinstance(v, dict) or not '\'' in dict: - raise TypeError(f'expecting type `datetime`, got `{type(v)}`') - timestamp, _offset, _tz_idx = v['\''] - return datetime.fromtimestamp(timestamp) - - @staticmethod - def timeval_(v): - if not isinstance(v, dict) or not '"' in dict: - raise TypeError(f'expecting type `timeval`, got `{type(v)}`') - timestamp, _offset, _tz_idx = v['"'] - return datetime.fromtimestamp(timestamp) - - @staticmethod - def bytes_(v): - if not isinstance(v, bytes): - raise TypeError(f'expecting type `bytes`, got `{type(v)}`') - return v - - @staticmethod - def raw_(v, _types=(str, bytes)): - if not isinstance(v, _types): - raise TypeError(f'expecting type `bytes`, got `{type(v)}`') - return v - - @staticmethod - def bool_(v): - if not isinstance(v, bool): - raise TypeError(f'expecting type `bool`, got `{type(v)}`') - return v - - @staticmethod - def int_(v): - if not isinstance(v, int): - raise TypeError(f'expecting type `int`, got `{type(v)}`') - return v - - @staticmethod - def uint_(v): - if not isinstance(v, int): - raise TypeError(f'expecting type `int`, got `{type(v)}`') - if v < 0: - raise ValueError(f'expecting an integer value >= 0, got {v}') - return v - - @staticmethod - def pint_(v): - if not isinstance(v, int): - raise TypeError(f'expecting type `int`, got `{type(v)}`') - if v <= 0: - raise ValueError(f'expecting an integer value > 0, got {v}') - return v - - @staticmethod - def nint_(v): - if not isinstance(v, int): - raise TypeError(f'expecting type `int`, got `{type(v)}`') - if v >= 0: - raise ValueError(f'expecting an integer value < 0, got {v}') - return v - - @staticmethod - def float_(v): - if not isinstance(v, float): - raise TypeError(f'expecting type `float`, got `{type(v)}`') - return v - - @staticmethod - def number_(v): - if not isinstance(v, (float, int)): - raise TypeError( - f'expecting type `int` or `float`, got `{type(v)}`') - return v - - @staticmethod - def array_(v, nested): - if not isinstance(v, list): - raise TypeError(f'expecting a `list`, got `{type(v)}`') - return [nested(item) for item in v] - - @staticmethod - def set_(v, nested): - if not isinstance(v, dict): - raise TypeError(f'expecting a `dict`, got `{type(v)}`') - v = v['$'] - return {nested(item) for item in v} - - @staticmethod - - def nillable(v, func=None): - return v if v is None else func(v) diff --git a/thingsdb/model/thing.py b/thingsdb/model/thing.py deleted file mode 100644 index e7cad09..0000000 --- a/thingsdb/model/thing.py +++ /dev/null @@ -1,425 +0,0 @@ -import asyncio -import logging -from .prop import Prop - - -def checkevent(f): - def wrapper(self, event_id, *args): - if self._event_id > event_id: - logging.warning( - f'ignore event because the current event `{self._event_id}` ' - f'is greather than the received event `{event_id}`') - return - self._event_id = event_id - f(self, event_id, *args) - self._collection._go_pending() - return wrapper - - -class ThingHash: - def __init__(self, id): - self._id = id - - def __hash__(self): - return self._id - - def __eq__(self, other): - return self._id == other._id - - -class Thing(ThingHash): - # When __STRICT__ is set to `True`, only properties which are defined in - # the model class are assigned to a `Thing` instance. If `False`, all - # properties are set, not only the ones defined by the model class. - __STRICT__ = False - - # When __SET_ANYWAY__ is set to `True`, values which do mot match the - # specification will be assigned to a `Thing` instance anyway and only - # a warning will be logged. If `False`, the properties will not be set. - __SET_ANYWAY__ = False - - # When __AS_TYPE__ is set to `True`, this class will be created in - # thingsdb as a Type when using the `build(..)` method. If `False`, no type - # will be created. A Collection instance will have `False` as default. - __AS_TYPE__ = True - - _ev_handlers = dict() - _props = dict() - _type_name = None # Only set when __AS_TYPE__ is True - _visited = 0 # For build, 0=not visited, 1=new_type, 2=set_type, 3=build - - def __init__(self, collection, id: int): - super().__init__(id) - self._event_id = 0 - self._collection = collection - collection._register(self) - - def __init_subclass__(cls): - cls._ev_handlers = {} - cls._props = {} - items = { - k: v for k, v in cls.__dict__.items() if not k.startswith('__')} - - for key, val in items.items(): - if isinstance(val, str): - val = val, - if isinstance(val, tuple): - cls._props[key] = Prop(*val) - delattr(cls, key) - elif callable(val) and hasattr(val, '_ev'): - cls._ev_handlers[val._ev] = val - - if cls.__AS_TYPE__: - cls._type_name = getattr(cls, '__TYPE_NAME__', cls.__name__) - - def __bool__(self): - return bool(self._event_id) - - def __repr__(self): - return f'#{self._id}' - - def id(self): - return self._id - - def get_collection(self): - return self._collection - - def get_client(self): - return self._collection._client - - @classmethod - def _unpack(cls, collection): - if cls._props: - for p in cls._props.values(): - p.unpack(collection) - - # unpacking is no longer required - cls._unpack = lambda *_args: None - - def watch(self): - collection = self._collection - - # when calling watch directly, make sure the props are unpacked - self._unpack(collection) - - return collection._client.watch(self._id, scope=collection._scope) - - def unwatch(self): - collection = self._collection - return collection._client.unwatch(self._id, scope=collection._scope) - - def emit(self, event, *args): - data = {f'd{i}': v for i, v in enumerate(args)} - dstr = "".join((f", {k}" for k in data.keys())) - - return self._collection.query( - f'thing(id).emit(event{dstr});', - id=self._id, - event=event, - **data) - - @checkevent - def on_init(self, event, data): - self._job_set(data) - - @checkevent - def on_update(self, event, jobs): - for job_dict in jobs: - for name, job in job_dict.items(): - jobfun = self._UPDMAP.get(name) - if jobfun is None: - logging.warning(f'unhandled job `{name}` for `{self}`') - continue - jobfun(self, job) - - def on_delete(self): - self._collection._things.pop(self.id()) - - def on_event(self, ev, *args): - cls = self.__class__ - fun = cls._ev_handlers.get(ev) - if fun is None: - logging.debug(f'no event handler for {ev} on {cls.__name__}') - return - fun(self, *args) - - def on_stop(self): - logging.warning(f'stopped watching thing {self}') - - def _job_add(self, pair): - cls = self.__class__ - (k, v), = pair.items() - prop = cls._props.get(k) - - if not prop and cls.__STRICT__: - return - - try: - set_ = getattr(self, k) - except AttributeError: - if prop: - logging.warning( - f'missing property `{k}` on `{self}` ' - f'while the property is defined in the ' - f'model class as `{prop.spec}`') - return - - if not isinstance(set_, set): - logging.warning( - f'got a add job for property `{k}` on `{self}` ' - f'while the property is of type `{type(set_)}`') - return - - convert = prop.nconv if prop else self._collection._conv_thing - try: - set_.update((convert(item) for item in v)) - except Exception as e: - logging.warning( - f'got a value for property `{k}` on `{self}` which ' - f'does not match `{prop.spec if prop else "thing"}` ({e})') - - def _job_del(self, k): - prop = self.__class__._props.get(k) - if prop: - logging.warning( - f'property `{k}` on `{self}` will be removed while it ' - f'is defined in the model class as `{prop.spec}`') - try: - delattr(self, k) - except AttributeError: - pass - - def _job_event(self, data): - self.on_event(*data) - - def _job_remove(self, pair): - cls = self.__class__ - (k, v), = pair.items() - prop = cls._props.get(k) - - if not prop and cls.__STRICT__: - return - - try: - set_ = getattr(self, k) - except AttributeError: - if prop: - logging.warning( - f'missing property `{k}` on `{self}` ' - f'while the property is defined in the ' - f'model class as `{prop.spec}`') - return - - if not isinstance(set_, set): - logging.warning( - f'got a remove job for property `{k}` on `{self}` ' - f'while the property is of type `{type(set_)}`') - return - - set_.difference_update((ThingHash(id) for id in v)) - - def _job_set(self, pairs): - cls = self.__class__ - - for k, v in pairs.items(): - if k == '#': - continue - - prop = cls._props.get(k) - if prop: - convert = prop.vconv - elif cls.__STRICT__: - continue - else: - convert = self._collection._conv_any - - try: - v = convert(v) - except Exception as e: - logging.warning( - f'got a value for property `{k}` on `{self}` which does ' - f'not match `{prop.spec if prop else "any"}` ({repr(e)})') - if not cls.__SET_ANYWAY__: - continue - setattr(self, k, v) - - self._collection._go_pending() - - def _job_splice(self, pair): - cls = self.__class__ - (k, v), = pair.items() - prop = cls._props.get(k) - - if not prop and cls.__STRICT__: - return - - try: - arr = getattr(self, k) - except AttributeError: - if prop: - logging.warning( - f'missing property `{k}` on `{self}` ' - f'while the property is defined in the ' - f'model class as `{prop.spec}`') - return - - if not isinstance(arr, list): - logging.warning( - f'got a splice job for property `{k}` on `{self}` ' - f'while the property is of type `{type(arr)}`') - return - - index, count, *items = v - convert = prop.nconv if prop else self._collection._conv_any - try: - arr[index:index+count] = (convert(item) for item in items) - except (TypeError, ValueError) as e: - logging.warning( - f'got a value for property `{k}` on `{self}` ' - f'which does not match `{prop.spec if prop else "any"}` ({e})') - - def _job_del_procedure(self, data): - delattr(self._collection, data) - - def _job_del_enum(self, data): - #keep the enum so simply ignore this event - pass - - def _job_del_type(self, data): - # keep the type so simply ignore this event - pass - - def _job_mod_type_add(self, data): - self._collection._upd_type_add(data) - - def _job_mod_type_del(self, data): - self._collection._upd_type_del(data) - - def _job_mod_type_mod(self, data): - # ignore the specification so simply ignore this event - pass - - def _job_mod_type_rel(self, data): - # ignore the specification so simply ignore this event - pass - - def _job_mod_type_ren(self, data): - self._collection._upd_type_ren(data) - - def _job_mod_type_wpo(self, data): - # ignore wrap-only mode so simply ignore this event - pass - - def _job_mod_enum_add(self, data): - self._collection._upd_enum_add(data) - - def _job_mod_enum_del(self, data): - self._collection._upd_enum_del(data) - - def _job_mod_enum_def(self, data): - self._collection._upd_enum_def(data) - - def _job_mod_enum_mod(self, data): - self._collection._upd_enum_mod(data) - - def _job_mod_enum_ren(self, data): - self._collection._upd_enum_ren(data) - - def _job_new_procedure(self, data): - self._collection._set_procedure(data) - - def _job_new_type(self, data): - data['fields'] = [] - self._collection._update_type(data) - - def _job_rename_enum(self, data): - # rename a enum type - pass - - def _job_rename_procedure(self, data): - self._collection._rename_procedure(data) - - def _job_rename_type(self, data): - # do not rename a type in python - pass - - def _job_set_enum(self, data): - self._collection._update_enum(data) - - def _job_set_type(self, data): - self._collection._update_type(data) - - - _UPDMAP = { - # Thing jobs - 'add': _job_add, - 'del': _job_del, - 'event': _job_event, - 'remove': _job_remove, - 'set': _job_set, - 'splice': _job_splice, - - # Collection jobs - 'del_enum': _job_del_enum, - 'del_procedure': _job_del_procedure, - 'del_type': _job_del_type, - 'mod_type_add': _job_mod_type_add, - 'mod_type_del': _job_mod_type_del, - 'mod_enum_add': _job_mod_enum_add, - 'mod_type_mod': _job_mod_type_mod, - 'mod_type_rel': _job_mod_type_rel, - 'mod_type_ren': _job_mod_type_ren, - 'mod_type_wpo': _job_mod_type_wpo, - 'mod_enum_def': _job_mod_enum_def, - 'mod_enum_del': _job_mod_enum_del, - 'mod_enum_mod': _job_mod_enum_mod, - 'mod_enum_ren': _job_mod_enum_ren, - 'new_procedure': _job_new_procedure, - 'new_type': _job_new_type, - 'rename_enum': _job_rename_enum, - 'rename_procedure': _job_rename_procedure, - 'rename_type': _job_rename_type, - 'set_enum': _job_set_enum, - 'set_type': _job_set_type, - } - - @classmethod - async def _new_type(cls, client, collection): - if cls._visited > 0: - return - cls._visited += 1 - - for prop in cls._props.values(): - if prop.model: - await prop.model._new_type(client, collection) - - if not cls._type_name: - return - - await client.query(f''' - new_type('{cls._type_name}'); - ''', scope=collection._scope) - - @classmethod - async def _set_type(cls, client, collection): - if cls._visited > 1: - return - cls._visited += 1 - - for prop in cls._props.values(): - if prop.model: - await prop.model._set_type(client, collection) - - if not cls._type_name: - return - - await client.query(f''' - set_type('{cls._type_name}', {{ - {', '.join(f'{k}: "{p.spec}"' for k, p in cls._props.items())} - }}); - ''', scope=collection._scope) - - -class ThingStrict(Thing): - - __STRICT__ = True diff --git a/thingsdb/room/__init__.py b/thingsdb/room/__init__.py new file mode 100644 index 0000000..1407cc0 --- /dev/null +++ b/thingsdb/room/__init__.py @@ -0,0 +1,2 @@ +from .room import Room +from .event import event diff --git a/thingsdb/room/event.py b/thingsdb/room/event.py new file mode 100644 index 0000000..3bfb550 --- /dev/null +++ b/thingsdb/room/event.py @@ -0,0 +1,4 @@ +"""Decorator for handleing events.""" +from .roombase import RoomBase + +event = RoomBase.event diff --git a/thingsdb/room/room.py b/thingsdb/room/room.py new file mode 100644 index 0000000..460482c --- /dev/null +++ b/thingsdb/room/room.py @@ -0,0 +1,54 @@ +import abc +import asyncio +import logging +from typing import Union +from ..client import Client +from ..client.protocol import Proto +from .roombase import RoomBase + + +class Room(RoomBase): + """Class Room. + + Listening to emit events can be implemented as follow: + + @Room.event('new-msg') + def on_new_msg(self, *args): + pass # do something + + """ + + def on_init(self) -> None: + """On init + Called when a room is joined. This method will be called only once, + thus *not* after a re-connect like the `on_join(..)` method. This + method is guaranteed to be called *before* the `on_join(..)` method. + """ + pass + + async def on_join(self) -> None: + """On join + Called when a room is joined. Unlike the `on_init(..)` method, + the `on_join(..)` method will be called again after a re-connect. + + This is an async method and usually the best method to perform + some ThingsDB queries (if required). + + Unless the `wait` argument to the Room.join(..) function is explicitly + set to None, the first call to this method will finish before the + call the Room.join() is returned. + """ + pass + + def on_leave(self) -> None: + """On leave + Called after a leave room request. This event is *not* triggered + by ThingsDB when a client disconnects or when a node is shutting down. + """ + pass + + def on_delete(self) -> None: + """On delete + Called when the room is removed from ThingsDB. + """ + pass diff --git a/thingsdb/room/roombase.py b/thingsdb/room/roombase.py new file mode 100644 index 0000000..bb48446 --- /dev/null +++ b/thingsdb/room/roombase.py @@ -0,0 +1,183 @@ +import abc +import asyncio +import logging +import functools +from typing import Union, Optional +from ..client import Client +from ..client.protocol import Proto + + +class RoomBase(abc.ABC): + + def __init_subclass__(cls): + cls._event_handlers = {} + + for key, val in cls.__dict__.items(): + if not key.startswith('__') and \ + callable(val) and hasattr(val, '_event'): + cls._event_handlers[val._event] = val + + def __init__( + self, + room: Union[int, str], + scope: str = None): + """Initializes a room. + + Args: + room (int/str): + The room Id or ThingsDB code which returns the Id of the room. + Examples are: + - 123 + - '.my_room.id();' + scope (str): + Collection scope. If no scope is given, the scope will later + be set to the default client scope once the room is joined. + """ + self._client = None + self._id = room + self._scope = scope + self._wait_join = None + + @property + def id(self): + return self._id if isinstance(self._id, int) else None + + @property + def scope(self): + return self._scope + + @property + def client(self): + return self._client + + async def join(self, client: Client, wait: Optional[float] = 60): + """Join a room. + + Args: + client (thingsdb.client.Client): + ThingsDB client instance. + wait (float): + Max time (in seconds) to wait for the first `on_join` call. + If wait is set to None, the join method will not wait for + the first `on_join` call to happen. + """ + # Although ThingsDB guarantees to return the response on the join + # request before the "on_join" event is being transmitted, the asyncio + # library might still process the "on_join" data before the result is + # set on the future. Therefore we require a lock to ensure the room + # is created inside the dict *before* the on_join is called. + async with client._rooms_lock: + if self._scope is None: + self._scope = client.get_default_scope() + self._client = client + + if isinstance(self._id, str): + code = self._id + id = await client.query(code, scope=self._scope) + if not isinstance(id, int): + raise TypeError( + f'expecting ThingsDB code `{code}` to return with a ' + f'room Id (integer value), ' + f'but got type `{type(id).__name__}`') + res = await client._join(id, scope=self._scope) + if res[0] is None: + raise LookupError( + f'room with Id {id} not found; ' + f'the room Id has been returned using the ThingsDB ' + f'code `{code}` using scope `{self._scope}`') + self._id = id + else: + assert isinstance(self._id, int) + res = await client._join(self._id, scope=self._scope) + if res[0] is None: + raise LookupError(f'room with Id {self._id} not found') + + if self._id in client._rooms: + prev = client._rooms[self._id] + logging.warn( + f'Room Id {self._id} is previously registered by {prev} ' + f'and will be overwritten with {self}') + + client._rooms[self._id] = self + self.on_init() + if wait is not None: + self._wait_join = asyncio.Future() + + if wait is not None: + # wait for the first join to finish + await asyncio.wait_for(self._wait_join, wait) + + async def leave(self) -> asyncio.Future: + """Leave a room. + + Note: If the room is not found, a LookupError will be raised. + """ + if not isinstance(self._id, int): + raise TypeError( + 'room Id is not an integer; most likely `join()` has never ' + 'been called') + res = await self._client._leave(self._id, scope=self._scope) + if res[0] is None: + raise LookupError(f'room Id {self._id} is not found (anymore)') + + def emit(self, event: str, *args) -> asyncio.Future: + return self._client.emit(self._id, event, *args, scope=self._scope) + + def _on_event(self, pkg): + self.__class__._ROOM_EVENT_MAP[pkg.tp](self, pkg.data) + + @abc.abstractmethod + def on_init(self) -> None: + pass + + @abc.abstractmethod + async def on_join(self) -> None: + pass + + @abc.abstractmethod + def on_leave(self) -> None: + pass + + async def _on_first_join(self): + await self.on_join() + self._wait_join.set_result(None) + + def _on_join(self, _data): + loop = self.client.get_event_loop() + if self._wait_join: + asyncio.ensure_future(self._on_first_join(), loop=loop) + else: + asyncio.ensure_future(self.on_join(), loop=loop) + + def _on_stop(self, func): + try: + del self._client._rooms[self._id] + except KeyError: + pass + func() + + def _emit_handler(self, data): + cls = self.__class__ + event = data['event'] + try: + fun = cls._event_handlers[event] + except KeyError: + logging.debug( + f"No emit handler found for `{event}` on {cls.__name__}") + else: + fun(self, *data['args']) + + _ROOM_EVENT_MAP = { + Proto.ON_ROOM_EMIT: _emit_handler, + Proto.ON_ROOM_JOIN: _on_join, + Proto.ON_ROOM_LEAVE: lambda s, _: s._on_stop(s.on_leave), + Proto.ON_ROOM_DELETE: lambda s, _: s._on_stop(s.on_delete), + } + + @staticmethod + def event(event): + def wrapped(fun): + fun._event = event + return fun + + return wrapped diff --git a/thingsdb/util/__init__.py b/thingsdb/util/__init__.py index b3d6283..1cfa3e5 100644 --- a/thingsdb/util/__init__.py +++ b/thingsdb/util/__init__.py @@ -1,4 +1,2 @@ from .cnscope import cnscope -from .convert import convert -from .event import event -from .fmt import fmt +from .access import Access diff --git a/thingsdb/util/access.py b/thingsdb/util/access.py new file mode 100644 index 0000000..da0457d --- /dev/null +++ b/thingsdb/util/access.py @@ -0,0 +1,10 @@ +import enum + + +class Access(enum.IntEnum): + QUERY = 0x01 + CHANGE = 0x02 + GRANT = 0x04 + JOIN = 0x08 + RUN = 0x10 + FULL = 0x1f diff --git a/thingsdb/util/cnscope.py b/thingsdb/util/cnscope.py index 4c753a5..5cbdfeb 100644 --- a/thingsdb/util/cnscope.py +++ b/thingsdb/util/cnscope.py @@ -28,4 +28,4 @@ def cnscope(scope): assert cnscope('//stuff') == 'stuff' assert cnscope('/collection/stuff') == 'stuff' assert cnscope('@:stuff') == 'stuff' - print('OK') \ No newline at end of file + print('OK') diff --git a/thingsdb/util/convert.py b/thingsdb/util/convert.py deleted file mode 100644 index c3a9904..0000000 --- a/thingsdb/util/convert.py +++ /dev/null @@ -1,14 +0,0 @@ -def convert(arg): - if isinstance(arg, dict): - id = arg.get('#') - return { - k: convert(v) for k, v in arg.items() - } if id is None else {'#': id} - - if isinstance(arg, set): - return {convert(v) for v in arg} - - if isinstance(arg, (list, tuple)): - return [convert(v) for v in arg] - - return getattr(arg, '_value', arg) diff --git a/thingsdb/util/event.py b/thingsdb/util/event.py deleted file mode 100644 index d22d89f..0000000 --- a/thingsdb/util/event.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Decorator for handleing events.""" - - -def event(ev): - - def _event(fun): - fun._ev = ev - return fun - - return _event diff --git a/thingsdb/util/fmt.py b/thingsdb/util/fmt.py deleted file mode 100644 index f551319..0000000 --- a/thingsdb/util/fmt.py +++ /dev/null @@ -1,30 +0,0 @@ -def _wrap(value, blobs): - - if value is None: - return 'nil' - if isinstance(value, bool): - return 'true' if value else 'false' - if isinstance(value, bytes): - idx = len(blobs) - name = f'blob{idx}' - blobs[name] = value - return name - if isinstance(value, dict): - thing_id = value.get('#') - if thing_id is None: - thing = ','.join( - f'{k}:{_wrap(v, blobs)}' - for k, v in value.items() - ) - return f"{{{thing}}}" - return f'#{thing_id}' - if isinstance(value, (list, tuple)): - return f"[{','.join(_wrap(v, blobs) for v in value)}]" - - return repr(value) - - -def fmt(val, blobs=None): - if blobs is None: - blobs = {} - return _wrap(val, blobs) diff --git a/thingsdb/util/id.py b/thingsdb/util/id.py new file mode 100644 index 0000000..b0010c8 --- /dev/null +++ b/thingsdb/util/id.py @@ -0,0 +1,15 @@ +def id(val): + if isinstance(val, str) and val.startswith('room:'): + try: + id = int(val[5:]) + return id + except Exception: + return None + return val.get('#') + + +if __name__ == '__main__': + print(id('room:123')) + print(id('room:nil')) + print(id({'#': 123})) + print(id({})) diff --git a/thingsdb/version.py b/thingsdb/version.py index a4021b7..2e44eee 100644 --- a/thingsdb/version.py +++ b/thingsdb/version.py @@ -1 +1 @@ -__version__ = '0.6.24' +__version__ = '1.0.0rc2'