Skip to content

Commit

Permalink
Merge e975419 into 4627315
Browse files Browse the repository at this point in the history
  • Loading branch information
dotsbb committed Jul 16, 2020
2 parents 4627315 + e975419 commit 2077ad4
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 3 deletions.
6 changes: 3 additions & 3 deletions setup.py
Expand Up @@ -14,15 +14,15 @@
IS_PY3 = sys.hexversion >= 0x03000000

tests_require = [
"pytest>=2.7.3",
"pytest-cov",
"pytest>4.1.0,<=4.6.11",
"pytest-cov<2.10.0",
"coveralls",
"futures",
"pytest-benchmark",
"mock",
]
if IS_PY3:
tests_require += ["pytest-asyncio"]
tests_require += ["pytest-asyncio<0.11.0"]


setup(
Expand Down
90 changes: 90 additions & 0 deletions tests/test_thread_safety.py
@@ -1,5 +1,8 @@
import pytest

from promise import Promise
from promise.dataloader import DataLoader
from promise.utils import PY2
import threading


Expand Down Expand Up @@ -113,3 +116,90 @@ def do():

assert assert_object['is_same_thread_1']
assert assert_object['is_same_thread_2']


@pytest.mark.skipif(PY2, reason='python2 does not support setswitchinterval')
@pytest.mark.parametrize('num_threads', [1])
@pytest.mark.parametrize('count', [1000])
@pytest.mark.parametrize('resolution', ['resolve', 'reject'])
def test_with_process_loop(num_threads, count, resolution):
"""
Start a Promise in one thread, but resolve it in another.
"""
import queue
from threading import Thread, Barrier
from traceback import print_exc, format_exc
from sys import setswitchinterval

test_with_process_loop._force_stop = False
items = queue.Queue()
barrier = Barrier(num_threads)

asserts = []
timeouts = []

def event_loop():
stop_count = num_threads
while True:
item = items.get()
if item[0] == 'STOP':
stop_count -= 1
if stop_count == 0:
break
if item[0] == 'ABORT':
break
if item[0] == 'ITEM':
(_, resolve, reject, i) = item
if resolution == 'reject':
reject(ArithmeticError(i))
elif resolution == 'resolve':
resolve(i)
else:
raise ValueError('unsupported resolution: {}'.format(resolution))

def worker():
barrier.wait()
# Force fast switching of threads, this is NOT used in real world case. However without this
# I was unable to reproduce the issue.
setswitchinterval(0.000001)
for i in range(0, count):
if test_with_process_loop._force_stop:
break

def do(resolve, reject):
items.put(('ITEM', resolve, reject, i))

p = Promise(do)
try:
p.get(timeout=1)
except ArithmeticError as e:
if resolution != 'reject':
pytest.fail('unexpected failure on success resolution: {}}'.format(e))
except AssertionError as e:
print("ASSERT", e)
print_exc()
test_with_process_loop._force_stop = True
items.put(('ABORT',))
asserts.append(format_exc())
except Exception as e:
print("Timeout", e)
print_exc()
test_with_process_loop._force_stop = True
items.put(('ABORT',))
timeouts.append(format_exc())

items.put(('STOP',))

loop_thread = Thread(target=event_loop)
loop_thread.start()

worker_threads = [Thread(target=worker) for i in range(0, num_threads)]
for t in worker_threads:
t.start()

loop_thread.join()
for t in worker_threads:
t.join()

assert asserts == []
assert timeouts == []

0 comments on commit 2077ad4

Please sign in to comment.