Skip to content

Commit

Permalink
Use a single APC and completion routine (nvaccess#14924)
Browse files Browse the repository at this point in the history
Related to nvaccess#14899, nvaccess#14312, nvaccess#14627
Fixes nvaccess#14895

Summary of the issue:
Despite several attempts to fix this, NVDA's IoThread can crash without a clear cause.

Description of user facing changes
Fewer crashes, most likely; testing indicates that this is the case.

Description of development approach
As proposed by @jcsteh , rather than creating a new function pointer for every APC or completion routine call, use a single internal APC and completion routine and use an internal cache to store the python functions, not the actual APC functions.
  • Loading branch information
LeonarddeR committed Jun 2, 2023
1 parent 3ce0bd2 commit 4b18a9c
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 92 deletions.
8 changes: 4 additions & 4 deletions source/hwIo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from logHandler import log
import config
import time
from .ioThread import IoThread, apcsWillBeStronglyReferenced
from .ioThread import IoThread, _apcsWillBeStronglyReferenced
import NVDAState


Expand Down Expand Up @@ -95,7 +95,7 @@ def _initialRead(self):
ioThread = self._ioThreadRef()
if not ioThread:
raise RuntimeError("I/O thread is no longer available")
if apcsWillBeStronglyReferenced:
if _apcsWillBeStronglyReferenced:
ioThread.queueAsApc(self._asyncReadBackwardsCompat)
else:
ioThread.queueAsApc(self._asyncRead)
Expand Down Expand Up @@ -175,10 +175,10 @@ def _asyncRead(self, param: Optional[int] = None):
self._readBuf,
self._readSize,
byref(self._readOl),
ioThread.getCompletionRoutine(self._ioDone)
ioThread.queueAsCompletionRoutine(self._ioDone, self._readOl)
)

if apcsWillBeStronglyReferenced:
if _apcsWillBeStronglyReferenced:
def _asyncReadBackwardsCompat(self, param: Optional[int] = None):
"""Backwards compatible wrapper around L{_asyncRead} that calls it without param.
"""
Expand Down
252 changes: 166 additions & 86 deletions source/hwIo/ioThread.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# A part of NonVisual Desktop Access (NVDA)
# This file is covered by the GNU General Public License.
# See the file COPYING for more details.
# Copyright (C) 2016-2022 NV Access Limited, Joseph Lee, Babbage B.V., Davy Kager, Bram Duvigneau,
# Copyright (C) 2016-2023 NV Access Limited, Joseph Lee, Babbage B.V., Davy Kager, Bram Duvigneau,
# Leonard de Ruijter

import ctypes
Expand All @@ -10,176 +10,256 @@
import winKernel
import typing
from logHandler import log
import serial.win32
import uuid
from serial.win32 import OVERLAPPED, LPOVERLAPPED
from contextlib import contextmanager
from extensionPoints.util import AnnotatableWeakref, BoundMethodWeakref
from inspect import ismethod
from buildVersion import version_year
import NVDAState
from watchdog import getFormattedStacksForAllThreads


LPOVERLAPPED_COMPLETION_ROUTINE = ctypes.WINFUNCTYPE(
None,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
serial.win32.LPOVERLAPPED
LPOVERLAPPED
)
apcsWillBeStronglyReferenced = version_year < 2024 and NVDAState._allowDeprecatedAPI()
ApcT = typing.Callable[[int], None]
ApcIdT = int
OverlappedStructAddressT = int
CompletionRoutineT = typing.Callable[[int, int, LPOVERLAPPED], None]
ApcStoreT = typing.Dict[
ApcIdT,
typing.Tuple[
typing.Union[ApcT, BoundMethodWeakref[ApcT], AnnotatableWeakref[ApcT]], ApcIdT
]
]
CompletionRoutineStoreTypeT = typing.Dict[
OverlappedStructAddressT,
typing.Tuple[
typing.Union[BoundMethodWeakref[CompletionRoutineT], AnnotatableWeakref[CompletionRoutineT]],
OVERLAPPED
]
]
_apcsWillBeStronglyReferenced = version_year < 2024 and NVDAState._allowDeprecatedAPI()
"""
Starting from NVDA 2024.1, we will weakly reference functions wrapped in an APC.
Starting from NVDA 2024.1, we will weakly reference functions that are executed as an APC.
This will ensure that objects from which APCs have been queued won't be left lingering
when the APC is never executed.
Wrapped methods are now strongly referenced due to an oversight in NVDA 2023.1.
"""


def _generateApcParams() -> typing.Generator[ApcIdT, None, None]:
"""Generator of APC params for internal use.
Params generated using this generator are passed to our internal APC to lookup Python functions.
A parameter passed to an APC is of type ULONG_PTR, which has a size of 4 bytes.
Therefore, we use a counter which starts at 0, counts up to 0xffffffff,
wraps back to 0 and continues cycling.
"""
while True:
for param in range(0x100000000):
yield param


class IoThread(threading.Thread):
"""A thread used for background writes and raw I/O, e.g. for braille displays.
"""

exit: bool = False
_apcReferences: typing.Dict[uuid.UUID, winKernel.PAPCFUNC]
_apcParamCounter = _generateApcParams()
#: Store of Python functions to be called as APC.
#: This allows us to have a single APC function in the class rather than on
#: each instance, which prevents reference cycles.
_apcStore: ApcStoreT = {}
#: Store of Python functions to be called as Overlapped Completion Routine.
#: This allows us to have a single completion routine in the class rather than on
#: each instance, which prevents reference cycles.
#: Note that we use the address of the OVERLAPPED structure as key in this store,
#: even though the structure is also stored in the value.
#: The OVERLAPPED structure can't be used as key because ctypes does not have OOR (original object return),
#: it constructs a new, equivalent object each time you retrieve the contents of a LPOVERLAPPED.
_completionRoutineStore: CompletionRoutineStoreTypeT = {}

def __init__(self):
super().__init__(
name=f"{self.__class__.__module__}.{self.__class__.__qualname__}",
daemon=True
)
self._apcReferences = dict()

@winKernel.PAPCFUNC
def _internalApc(param: ApcIdT):
    """The single native APC, which dispatches to the Python function registered under an id.

    The entry for param is popped from IoThread._apcStore, so it is executed at most once.
    Exceptions raised by the Python function are logged and not propagated.
    @param param: The id under which the function and its actual parameter were stored.
    """
    if not isinstance(threading.current_thread(), IoThread):
        log.error("Internal APC called from unknown thread")
        return

    reference, actualParam = IoThread._apcStore.pop(param, (None, 0))
    if reference is None:
        log.error(f"Internal APC called with param {param}, but no such apcId in store")
        return

    # The store holds either the function itself or a weak reference to it.
    function = reference
    if isinstance(reference, (BoundMethodWeakref, AnnotatableWeakref)):
        function = reference()
        if not function:
            log.debugWarning(
                f"Not executing queued APC {param}:{reference.funcName} with param {actualParam} because reference died"
            )
            return

    try:
        function(actualParam)
    except Exception:
        log.error(f"Error in APC function {function!r} with apcId {param} queued to IoThread", exc_info=True)

@LPOVERLAPPED_COMPLETION_ROUTINE
def _internalCompletionRoutine(
        error: int,
        numberOfBytes: int,
        overlapped: LPOVERLAPPED
):
    """The single native overlapped completion routine, which dispatches to a Python function.

    The Python function is looked up in IoThread._completionRoutineStore
    by the address of the OVERLAPPED structure, and the entry is popped from the store.
    Exceptions raised by the Python function are logged and not propagated.
    @param error: The error value passed to the completion routine.
    @param numberOfBytes: The number of bytes passed to the completion routine.
    @param overlapped: Pointer to the OVERLAPPED structure the routine was queued for.
    """
    threadinst = threading.current_thread()
    if not isinstance(threadinst, IoThread):
        log.error("Internal completion routine called from unknown thread")
        return

    ptr = ctypes.cast(overlapped, ctypes.c_void_p).value
    if ptr is None:
        # A NULL pointer has no address to look up, and formatting None as hex below would raise.
        log.error("Internal completion routine called with a NULL overlapped pointer")
        return

    (reference, cachedOverlapped) = IoThread._completionRoutineStore.pop(ptr, (None, None))
    if reference is None:
        log.error(f"Internal completion routine called with pointer 0x{ptr:x}, but no such address in store")
        return

    function = reference()
    if not function:
        log.debugWarning(
            f"Not executing queued completion routine 0x{ptr:x}:{reference.funcName} because reference died"
        )
        return

    try:
        function(error, numberOfBytes, overlapped)
    except Exception:
        log.error(f"Error in overlapped completion routine {function!r}", exc_info=True)

def start(self):
    """Start the thread and open a handle to it.

    The handle is stored on the instance as C{handle}
    and is opened with the THREAD_SET_CONTEXT access right.
    """
    super().start()
    openThread = ctypes.windll.kernel32.OpenThread
    self.handle = openThread(winKernel.THREAD_SET_CONTEXT, False, self.ident)

@contextmanager
def autoDeleteApcReference(self, apcUuid: uuid.UUID):
try:
if _apcsWillBeStronglyReferenced:
@contextmanager
def autoDeleteApcReference(self, apcUuid):
log.warning(
"IoThread.autoDeleteApcReference is deprecated. "
"It was never meant to be part of the public API. "
"This method will be removed in NVDA 2024.1. "
"Up until that version, it behaves as a no-op, i.e. a context manager yielding nothing."
)
yield
finally:
del self._apcReferences[apcUuid]

def _registerToCallAsApc(
        self,
        func: ApcT,
        param: int = 0,
        _alwaysReferenceWeakly: bool = True
) -> ApcIdT:
    """Internal method to store a python function to be called in an Asynchronous Procedure Call (APC).
    The function and param are saved in a store on the IoThread instance.
    When our internal APC executes the function, the entry will be popped from the store.
    This method does not queue the APC itself.
    Note that starting from NVDA 2024.1, the saved python function will be weakly referenced,
    therefore the caller should keep a reference to the python function.
    @param func: The function to be called in an APC.
    @param param: The parameter passed to the APC when called.
    @param _alwaysReferenceWeakly: When True, the function is always weakly referenced.
        When False, it is only weakly referenced if _apcsWillBeStronglyReferenced is False.
    @returns: The internal param to pass to the internal APC.
    @raises RuntimeError: If the thread is not running.
    """
    if not self.is_alive():
        raise RuntimeError("Thread is not running")

    # generate a number to identify the function in the store.
    internalParam = next(self._apcParamCounter)
    useWeak = _alwaysReferenceWeakly or not _apcsWillBeStronglyReferenced
    if useWeak:
        # Generate a weak reference to the function
        reference = BoundMethodWeakref(func) if ismethod(func) else AnnotatableWeakref(func)
        reference.funcName = repr(func)
    else:
        reference = func
    # Store the weak reference when one was requested.
    # Storing "func or reference" would always pick the (truthy) function itself,
    # keeping it strongly referenced.
    self._apcStore[internalParam] = (reference, param)
    return internalParam

def queueAsApc(
self,
func: typing.Callable[[int], None],
func: ApcT,
param: int = 0
):
"""safely queues an Asynchronous Procedure Call (APC) created from a python function.
The generated APC is saved in a cache on the IoThread instance
and automatically cleaned when the call is complete.
Note that starting from NVDA 2024.1, the wrapped python function will be weakly referenced,
"""safely queues a Python function call as an Asynchronous Procedure Call (APC).
The function and param are saved in a store on the IoThread instance.
When our internal APC executes the function, the entry will be popped from the store.
Note that starting from NVDA 2024.1, the queued python function will be weakly referenced,
therefore the caller should keep a reference to the python function.
@param func: The function to be wrapped in an APC.
@param func: The function to be called in an APC.
@param param: The parameter passed to the APC when called.
"""
apc = self._getApc(func, param, _alwaysReferenceWeakly=False)
ctypes.windll.kernel32.QueueUserAPC(apc, self.handle, param)
internalParam = self._registerToCallAsApc(func, param, _alwaysReferenceWeakly=False)
ctypes.windll.kernel32.QueueUserAPC(self._internalApc, self.handle, internalParam)

def setWaitableTimer(
self,
handle: typing.Union[int, ctypes.wintypes.HANDLE],
dueTime: int,
func: typing.Callable[[int], None],
func: ApcT,
param: int = 0
):
""""Safe wrapper around winKernel.setWaitableTimer to ensure that the queued APC
is available when called.
The generated APC is saved in a cache on the IoThread instance
and automatically cleaned when the call is complete.
The wrapped python function is weakly referenced, therefore the caller should
"""Safe wrapper around winKernel.setWaitableTimer that uses an internal APC.
A weak reference to the function and its param are saved in a store on the IoThread instance.
When our internal APC executes the function, the entry will be popped from the store.
Note that as the python function is weakly referenced, the caller should
keep a reference to the python function.
@param handle: A handle to the timer object.
@param dueTime: Relative time (in milliseconds).
@param func: The function to be executed when the timer elapses.
@param param: The parameter passed to the APC when called.
"""
apc = self._getApc(func, param)
winKernel.setWaitableTimer(handle, dueTime, completionRoutine=apc, arg=param)
internalParam = self._registerToCallAsApc(func, param)
winKernel.setWaitableTimer(
handle,
dueTime,
completionRoutine=self._internalApc,
arg=internalParam
)

def getCompletionRoutine(
def queueAsCompletionRoutine(
self,
func: typing.Callable[[int, int, serial.win32.LPOVERLAPPED], None],
func: CompletionRoutineT,
overlapped: OVERLAPPED,
):
"""Safely wraps a python function in an overlapped completion routine.
The generated routine is saved in a cache on the IoThread instance
and automatically cleaned when the call is complete.
"""safely queues a Python function call as an overlapped completion routine.
A weak reference to the Python function is saved in a store on the IoThread instance
When our internal completion routine executes the function, it will be popped from the store.
The wrapped python function is weakly referenced, therefore the caller should
keep a reference to the python function (not the completion routine itself).
@param func: The function to be wrapped in a completion routine.
@returns: The wrapped completion routine.
@param overlapped: The overlapped structure
@returns: The completion routine.
"""
if not self.is_alive():
raise RuntimeError("Thread is not running")

# generate an UUID that will be used to cleanup the func when it is finished
ocrUuid = uuid.uuid4()
addr = ctypes.addressof(overlapped)
if addr in self._completionRoutineStore:
raise RuntimeError(
f"Overlapped structure with address 0x{addr:x} has a completion routine queued already. "
"Only one completion routine for one overlapped structure can be queued at a time."
)

# Generate a weak reference to the function
reference = BoundMethodWeakref(func) if ismethod(func) else AnnotatableWeakref(func)
reference.funcName = repr(func)

@LPOVERLAPPED_COMPLETION_ROUTINE
def overlappedCompletionRoutine(error: int, numberOfBytes: int, overlapped: serial.win32.LPOVERLAPPED):
with self.autoDeleteApcReference(ocrUuid):
if self.exit:
return
function = reference()
if not function:
log.debugWarning(f"Not executing completion routine {reference.funcName} because reference died")
return
try:
function(error, numberOfBytes, overlapped)
except Exception:
log.error(f"Error in overlapped completion routine {func!r}", exc_info=True)

self._apcReferences[ocrUuid] = overlappedCompletionRoutine
return overlappedCompletionRoutine
self._completionRoutineStore[addr] = (reference, overlapped)
return self._internalCompletionRoutine

def stop(self, timeout: typing.Optional[float] = None):
if not self.is_alive():
Expand Down

0 comments on commit 4b18a9c

Please sign in to comment.