@@ -60,6 +60,12 @@ def _server_irq(event, data):
6060def _server_shutdown ():
6161 global _registered_characteristics
6262 _registered_characteristics = {}
63+ if hasattr (BaseCharacteristic , "_capture_task" ):
64+ BaseCharacteristic ._capture_task .cancel ()
65+ del BaseCharacteristic ._capture_queue
66+ del BaseCharacteristic ._capture_write_event
67+ del BaseCharacteristic ._capture_consumed_event
68+ del BaseCharacteristic ._capture_task
6369
6470
6571register_irq_handler (_server_irq , _server_shutdown )
@@ -97,6 +103,42 @@ def write(self, data, send_update=False):
97103 else :
98104 ble .gatts_write (self ._value_handle , data , send_update )
99105
106+ # When the a capture-enabled characteristic is created, create the
107+ # necessary events (if not already created).
108+ @staticmethod
109+ def _init_capture ():
110+ if hasattr (BaseCharacteristic , "_capture_queue" ):
111+ return
112+
113+ BaseCharacteristic ._capture_queue = deque ((), _WRITE_CAPTURE_QUEUE_LIMIT )
114+ BaseCharacteristic ._capture_write_event = asyncio .ThreadSafeFlag ()
115+ BaseCharacteristic ._capture_consumed_event = asyncio .ThreadSafeFlag ()
116+ BaseCharacteristic ._capture_task = asyncio .create_task (
117+ BaseCharacteristic ._run_capture_task ()
118+ )
119+
120+ # Monitor the shared queue for incoming characteristic writes and forward
121+ # them sequentially to the individual characteristic events.
122+ @staticmethod
123+ async def _run_capture_task ():
124+ write = BaseCharacteristic ._capture_write_event
125+ consumed = BaseCharacteristic ._capture_consumed_event
126+ q = BaseCharacteristic ._capture_queue
127+
128+ while True :
129+ if len (q ):
130+ conn , data , characteristic = q .popleft ()
131+ # Let the characteristic waiting in `written()` know that it
132+ # can proceed.
133+ characteristic ._write_data = (conn , data )
134+ characteristic ._write_event .set ()
135+ # Wait for the characteristic to complete `written()` before
136+ # continuing.
137+ await consumed .wait ()
138+
139+ if not len (q ):
140+ await write .wait ()
141+
100142 # Wait for a write on this characteristic. Returns the connection that did
101143 # the write, or a tuple of (connection, value) if capture is enabled for
102144 # this characteristics.
@@ -105,17 +147,27 @@ async def written(self, timeout_ms=None):
105147 # Not a writable characteristic.
106148 return
107149
108- # If the queue is empty, then we need to wait. However, if the queue
109- # has a single item, we also need to do a no-op wait in order to
110- # clear the event flag (because the queue will become empty and
111- # therefore the event should be cleared).
112- if len (self ._write_queue ) <= 1 :
113- with DeviceTimeout (None , timeout_ms ):
114- await self ._write_event .wait ()
150+ # If no write has been seen then we need to wait. If the event has
151+ # already been set this will clear the event and continue
152+ # immediately. In regular mode, this is set by the write IRQ
153+ # directly (in _remote_write). In capture mode, this is set when it's
154+ # our turn by _capture_task.
155+ with DeviceTimeout (None , timeout_ms ):
156+ await self ._write_event .wait ()
157+
158+ # Return the write data and clear the stored copy.
159+ # In default usage this will be just the connection handle.
160+ # In capture mode this will be a tuple of (connection_handle, received_data)
161+ data = self ._write_data
162+ self ._write_data = None
115163
116- # Either we started > 1 item, or the wait completed successfully, return
117- # the front of the queue.
118- return self ._write_queue .popleft ()
164+ if self .flags & _FLAG_WRITE_CAPTURE :
165+ # Notify the shared queue monitor that the event has been consumed
166+ # by the caller to `written()` and another characteristic can now
167+ # proceed.
168+ BaseCharacteristic ._capture_consumed_event .set ()
169+
170+ return data
119171
120172 def on_read (self , connection ):
121173 return 0
@@ -124,27 +176,20 @@ def _remote_write(conn_handle, value_handle):
124176 if characteristic := _registered_characteristics .get (value_handle , None ):
125177 # If we've gone from empty to one item, then wake something
126178 # blocking on `await char.written()`.
127- wake = len (characteristic ._write_queue ) == 0
128179
129180 conn = DeviceConnection ._connected .get (conn_handle , None )
130- q = characteristic ._write_queue
131181
132182 if characteristic .flags & _FLAG_WRITE_CAPTURE :
133- # For capture, we append both the connection and the written
134- # value to the queue. The deque will enforce the max queue len.
183+ # For capture, we append the connection and the written value
184+ # value to the shared queue along with the matching characteristic object.
185+ # The deque will enforce the max queue len.
135186 data = characteristic .read ()
136- q .append ((conn , data ))
187+ BaseCharacteristic ._capture_queue .append ((conn , data , characteristic ))
188+ BaseCharacteristic ._capture_write_event .set ()
137189 else :
138- # Use the queue as a single slot -- it has max length of 1,
139- # so if there's an existing item it will be replaced.
140- q .append (conn )
141-
142- if wake :
143- # Queue is now non-empty. If something is waiting, it will be
144- # worken. If something isn't waiting right now, then a future
145- # caller to `await char.written()` will see the queue is
146- # non-empty, and wait on the event if it's going to empty the
147- # queue.
190+ # Store the write connection handle to be later used to retrieve the data
191+ # then set event to handle in written() task.
192+ characteristic ._write_data = conn
148193 characteristic ._write_event .set ()
149194
150195 def _remote_read (conn_handle , value_handle ):
@@ -178,10 +223,15 @@ def __init__(
178223 if capture :
179224 # Capture means that we keep track of all writes, and capture
180225 # their values (and connection) in a queue. Otherwise we just
181- # track the most recent connection .
226+ # track the connection of the most recent write .
182227 flags |= _FLAG_WRITE_CAPTURE
228+ BaseCharacteristic ._init_capture ()
229+
230+ # Set when this characteristic has a value waiting in self._write_data.
183231 self ._write_event = asyncio .ThreadSafeFlag ()
184- self ._write_queue = deque ((), _WRITE_CAPTURE_QUEUE_LIMIT if capture else 1 )
232+ # The connection of the most recent write, or a tuple of
233+ # (connection, data) if capture is enabled.
234+ self ._write_data = None
185235 if notify :
186236 flags |= _FLAG_NOTIFY
187237 if indicate :
@@ -263,7 +313,7 @@ def __init__(self, characteristic, uuid, read=False, write=False, initial=None):
263313 flags |= _FLAG_DESC_READ
264314 if write :
265315 self ._write_event = asyncio .ThreadSafeFlag ()
266- self ._write_queue = deque ((), 1 )
316+ self ._write_data = None
267317 flags |= _FLAG_DESC_WRITE
268318
269319 self .uuid = uuid
0 commit comments