Skip to content

Commit

Permalink
Merge dcce24e into 5347967
Browse files Browse the repository at this point in the history
  • Loading branch information
james-smith-za committed Nov 2, 2022
2 parents 5347967 + dcce24e commit 8820c74
Show file tree
Hide file tree
Showing 5 changed files with 511 additions and 13 deletions.
54 changes: 54 additions & 0 deletions doc/server/tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,60 @@ assignments that do not change the value will still be reported). This default
can be changed by passing `auto_strategy` and `auto_strategy_parameters` when
constructing the sensor.

Aggregate sensors
^^^^^^^^^^^^^^^^^
To provide a sensor which has its reading derived from that of a set of other
sensors, such as a total, average or general "device status" sensor, subclass
:class:`.AggregateSensor`, and implement
:meth:`~.AggregateSensor.update_aggregate` to compute the sensor reading from
changes in sensor values.

In order to avoid circular references, a
:meth:`~.AggregateSensor.filter_aggregate` method is provided which excludes the
aggregate sensor itself from operations that happen on the target sensor set.
However, the user may wish to include more complex logic than this, such as to
include only integer datatypes or to exclude other aggregate sensors. In this
case, the method can be overridden.

For example:

.. code:: python
class Total(AggregateSensor):
def __init__(self, target):
super().__init__(target=target, sensor_type=int, name="total")
def update_aggregate(self, updated_sensor, reading, old_reading):
if update_sensor is None:
# Instantiation, calculate total for sensors already in target.
...
return Reading(...)
if reading is None:
# The sensor is being removed from the set.
...
return Reading(...)
if old_reading is None:
# The sensor is being added to the set.
...
return Reading(...)
# Otherwise, it's just a change.
...
return Reading(...)
def filter_aggregate(self, sensor):
...
return True
In the :meth:`!__init__` method of the :class:`.DeviceServer` subclass being
created, you'd include a few lines like this:

.. code:: python
self.total_sensor = Total(self.sensors)
self.sensors.add(self.total_sensor)
Cancellation
------------
It is important that request handlers operate gracefully if cancelled (refer
Expand Down
49 changes: 48 additions & 1 deletion examples/example_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import enum
import logging
import signal
import time
from typing import Tuple

import aiokatcp
Expand All @@ -41,6 +42,33 @@ class Foo(enum.Enum):
GHI_K = 2


class Total(aiokatcp.AggregateSensor):
def __init__(self, target):
super().__init__(target=target, sensor_type=int, name="total")

def update_aggregate(self, updated_sensor, reading, old_reading):
if updated_sensor is None:
# Instantiation, calculate total for sensors already in target.
total = sum(
sensor.value for sensor in self.target.values() if self.filter_aggregate(sensor)
)
return aiokatcp.Reading(time.time(), aiokatcp.Sensor.Status.NOMINAL, total)
new_value = self.value
if old_reading is not None: # Will be None if this is a new sensor being added
new_value -= old_reading.value # Remove the previous value from the sum
if reading is not None: # Will be None if this is a sensor being removed
new_value += reading.value # Add the new value to the sum
return aiokatcp.Reading(
updated_sensor.timestamp,
aiokatcp.Sensor.Status.NOMINAL,
new_value,
)

def filter_aggregate(self, sensor):
"""Return true for int sensors which aren't self."""
return sensor.stype is int and sensor is not self


class Server(aiokatcp.DeviceServer):
VERSION = "testapi-1.0"
BUILD_STATE = "testapi-1.0.1"
Expand All @@ -59,6 +87,10 @@ def __init__(self, *args, **kwargs):
self.sensors.add(sensor)
self.add_service_task(asyncio.create_task(self._service_task()))

total_sensor = Total(self.sensors)
self.sensors.add(total_sensor)
self.add_service_task(asyncio.create_task(self._alter_sensors()))

async def request_echo(self, ctx, *args: str) -> Tuple:
"""Return the arguments to the caller"""
return tuple(args)
Expand All @@ -80,11 +112,26 @@ async def request_counter(self, ctx) -> None:
self.sensors["counter-queries"].value += 1

async def _service_task(self) -> None:
"""Example service task that just broadcasts to clients."""
"""Example service task that broadcasts to clients."""
while True:
await asyncio.sleep(10)
self.mass_inform("hello", "Hi I am a service task")

async def _alter_sensors(self) -> None:
"""Example service task that adds and removes a fixed sensor.
This demonstrate's the aggregate sensor's ability to add and remove
values from its total.
"""
await asyncio.sleep(10)
self.mass_inform("add", "I'm going to add a fixed sensor")
sensor = aiokatcp.Sensor(int, "fixed-value", default=7)
self.sensors.add(sensor)

await asyncio.sleep(10)
self.mass_inform("remove", "I'm going to remove the fixed sensor")
self.sensors.remove(sensor)


async def main():
server = Server("localhost", 4444)
Expand Down
2 changes: 1 addition & 1 deletion src/aiokatcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
get_type,
register_type,
)
from .sensor import Reading, Sensor, SensorSampler, SensorSet # noqa: F401
from .sensor import AggregateSensor, Reading, Sensor, SensorSampler, SensorSet # noqa: F401
from .server import DeviceServer, RequestContext # noqa: F401


Expand Down

0 comments on commit 8820c74

Please sign in to comment.