Skip to content

Commit

Permalink
Merge 6ea1a36 into 82561af
Browse files Browse the repository at this point in the history
  • Loading branch information
bmerry committed Nov 2, 2022
2 parents 82561af + 6ea1a36 commit e772231
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 9 deletions.
76 changes: 68 additions & 8 deletions src/aiokatcp/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import abc
import asyncio
import enum
import functools
import inspect
import time
import warnings
import weakref
from abc import ABCMeta, abstractmethod
from typing import (
Any,
Expand Down Expand Up @@ -746,6 +748,50 @@ def copy(self) -> Dict[str, Sensor]:
copy.__doc__ = dict.copy.__doc__


class _weak_callback:
"""Method decorator that makes it hold only a weak reference to self.
Calling the method will be a no-op if the object has been deleted.
The return value is cached, so accessing it multiple times will yield the
same wrapper object each time. However, this is *not* thread-safe.
"""

def __init__(self, func: Callable) -> None:
self._func = func
self._name: Optional[str] = None
self.__doc__ = func.__doc__

def __set_name__(self, owner: type, name: str) -> None:
self._name = name

def __get__(self, instance: object, owner: type = None):
# __get__ is magic in Python: it makes this class a "descriptor".
# Refer to the language guide for an explanation of what that means.
# In short, when one calls `obj.method` where `method` was
# decorated with this descriptor, it's resolved with
# `__get__(obj, type(obj))`.
if instance is None:
return self
if self._name is None:
raise TypeError("name was not set for weak callback")
cache = instance.__dict__
weak_instance = weakref.ref(instance)
func = self._func

@functools.wraps(self._func)
def wrapper(*args, **kwargs):
strong_instance = weak_instance()
if strong_instance is not None:
return func(strong_instance, *args, **kwargs)

# Note: this overrides the descriptor, so that future accesses
# will use the value directly.
assert self._name not in cache
cache[self._name] = wrapper
return wrapper


class AggregateSensor(Sensor[_T], metaclass=ABCMeta):
"""A Sensor with its reading determined by several other Sensors.
Expand Down Expand Up @@ -788,18 +834,25 @@ def __init__(
self.target = target
for sensor in self.target.values():
if self.filter_aggregate(sensor):
sensor.attach(self._update_aggregate)
sensor.attach(self._update_aggregate_callback)
# We don't use weakref.finalize to detach, because that
# finalizer would live until `self` is destroyed and thus
# keep that `sensor` alive, even if `sensor` is removed from
# the sensor set and could otherwise be destroyed. Instead,
# __del__ cleans up the attachments.

self.target.add_add_callback(self._sensor_added)
weakref.finalize(self, self.target.remove_add_callback, self._sensor_added)
self.target.add_remove_callback(self._sensor_removed)
weakref.finalize(self, self.target.remove_remove_callback, self._sensor_removed)
self._update_aggregate(None, None, None)

def __del__(self):
self.target.remove_add_callback(self._sensor_added)
self.target.remove_remove_callback(self._sensor_removed)
for sensor in self.target.values():
if sensor is not self:
sensor.detach(self._update_aggregate)
# Protect against an exception early in __init__
for sensor in getattr(self, "target", []).values():
# Could use filter_aggregate, but it might not work during
# destruction, and its a no-op if there is no attachment.
sensor.detach(self._update_aggregate_callback)

@abstractmethod
def update_aggregate(
Expand Down Expand Up @@ -858,14 +911,21 @@ def _update_aggregate(
if updated_reading is not None:
self.set_value(updated_reading.value, updated_reading.status, updated_reading.timestamp)

# We create a separate name for the _weak_callback version rather than
# decorating _update_aggregate. This lets us call the original directly
# without incurring the decorator overheads.
_update_aggregate_callback = _weak_callback(_update_aggregate)

@_weak_callback
def _sensor_added(self, sensor: Sensor) -> None:
"""Add the update callback to a new sensor in the set."""
if self.filter_aggregate(sensor):
sensor.attach(self._update_aggregate)
sensor.attach(self._update_aggregate_callback)
self._update_aggregate(sensor, sensor.reading, None)

@_weak_callback
def _sensor_removed(self, sensor: Sensor) -> None:
"""Remove the update callback from a sensor no longer in the set."""
if self.filter_aggregate(sensor):
sensor.detach(self._update_aggregate)
sensor.detach(self._update_aggregate_callback)
self._update_aggregate(sensor, None, sensor.reading)
53 changes: 52 additions & 1 deletion tests/test_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,20 @@

import gc
import unittest
import weakref
from typing import Optional
from unittest.mock import create_autospec

import pytest

from aiokatcp.sensor import AggregateSensor, Reading, Sensor, SensorSampler, SensorSet
from aiokatcp.sensor import (
AggregateSensor,
Reading,
Sensor,
SensorSampler,
SensorSet,
_weak_callback,
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -384,3 +392,46 @@ def test_aggregate_sensor_excluded(self, agg_sensor, mocker, ss, sensors):
agg_sensor.attach.assert_not_called()
ss.remove(agg_sensor)
agg_sensor.detach.assert_not_called()

def test_aggregate_garbage_collection(self, ss, sensors):
"""Check that the aggregate can be garbage collected."""
# Don't use the agg_sensor fixture, because pytest will hold its own
# references to it.
my_agg = MyAgg(target=ss, sensor_type=int, name="garbage")
ss.add(sensors[1])
weak = weakref.ref(my_agg)
del my_agg
# Some Python implementations need multiple rounds to garbage-collect
# everything.
for _ in range(5):
gc.collect()
assert weak() is None # i.e. my_agg was garbage-collected
sensors[0].value = 12 # Check that it doesn't fail

def test_sensor_garbage_collection(self):
"""Check that sensors can be garbage-collected once removed from the aggregate."""
# Don't use the fixtures, because they have mocks that might
# record things and keep them alive.
# The noqa is to suppress
# "local variable '_my_agg' is assigned to but never used"
# (we need to give it a name just to keep it alive)
ss = SensorSet()
my_agg = MyAgg(target=ss, sensor_type=int, name="agg") # noqa: F841
sensor = Sensor(int, "rubbish")
ss.add(sensor)
ss.remove(sensor)
weak = weakref.ref(sensor)
del sensor
# Some Python implementations need multiple rounds to garbage-collect
# everything.
for _ in range(5):
gc.collect()
assert weak() is None

def test_weak_callback_failures(self, agg_sensor, monkeypatch):
"""Ensure code coverage of :class:`._weak_callback`."""
assert isinstance(MyAgg._sensor_added, _weak_callback)
wc = _weak_callback(lambda x: x)
monkeypatch.setattr(MyAgg, "bad_weak_callback", wc, raising=False)
with pytest.raises(TypeError):
agg_sensor.bad_weak_callback

0 comments on commit e772231

Please sign in to comment.