Skip to content

Commit

Permalink
IocpProactor: prevent modification if closed (pythonGH-11494)
Browse files Browse the repository at this point in the history
* _wait_for_handle(), _register() and _unregister() methods of
  IocpProactor now raise an exception if closed
* Add "closed" to IocpProactor.__repr__()
* Simplify IocpProactor.close()
  • Loading branch information
vstinner committed Jan 10, 2019
1 parent 6aedfa6 commit 9b07681
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions Lib/asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,16 @@ def __init__(self, concurrency=0xffffffff):
self._unregistered = []
self._stopped_serving = weakref.WeakSet()

def _check_closed(self):
if self._iocp is None:
raise RuntimeError('IocpProactor is closed')

def __repr__(self):
return ('<%s overlapped#=%s result#=%s>'
% (self.__class__.__name__, len(self._cache),
len(self._results)))
info = ['overlapped#=%s' % len(self._cache),
'result#=%s' % len(self._results)]
if self._iocp is None:
info.append('closed')
return '<%s %s>' % (self.__class__.__name__, " ".join(info))

def set_loop(self, loop):
self._loop = loop
Expand Down Expand Up @@ -618,6 +624,8 @@ def _wait_cancel(self, event, done_callback):
return fut

def _wait_for_handle(self, handle, timeout, _is_cancel):
self._check_closed()

if timeout is None:
ms = _winapi.INFINITE
else:
Expand Down Expand Up @@ -660,6 +668,8 @@ def _register_with_iocp(self, obj):
# that succeed immediately.

def _register(self, ov, obj, callback):
self._check_closed()

# Return a future which will be set with the result of the
# operation when it completes. The future's value is actually
# the value returned by callback().
Expand Down Expand Up @@ -696,6 +706,7 @@ def _unregister(self, ov):
already be signalled (pending in the proactor event queue). It is also
safe if the event is never signalled (because it was cancelled).
"""
self._check_closed()
self._unregistered.append(ov)

def _get_accept_socket(self, family):
Expand Down Expand Up @@ -765,6 +776,10 @@ def _stop_serving(self, obj):
self._stopped_serving.add(obj)

def close(self):
if self._iocp is None:
# already closed
return

# Cancel remaining registered operations.
for address, (fut, ov, obj, callback) in list(self._cache.items()):
if fut.cancelled():
Expand All @@ -787,14 +802,15 @@ def close(self):
context['source_traceback'] = fut._source_traceback
self._loop.call_exception_handler(context)

# wait until all cancelled overlapped future complete
while self._cache:
if not self._poll(1):
logger.debug('taking long time to close proactor')

self._results = []
if self._iocp is not None:
_winapi.CloseHandle(self._iocp)
self._iocp = None

_winapi.CloseHandle(self._iocp)
self._iocp = None

def __del__(self):
self.close()
Expand Down

0 comments on commit 9b07681

Please sign in to comment.