From 35b9845a9cea6e2af1907062e4f3d1e8394a9bf7 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 16 Jul 2020 00:54:52 +0200 Subject: [PATCH] Add unit test; credits to @Novakov at https://github.com/syrusakbary/promise/issues/87 --- tests/test_thread_safety.py | 85 ++++++++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 2 deletions(-) diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py index ed55a84..259b345 100644 --- a/tests/test_thread_safety.py +++ b/tests/test_thread_safety.py @@ -1,7 +1,13 @@ -from promise import Promise -from promise.dataloader import DataLoader import threading +from queue import Queue +from sys import setswitchinterval +from threading import Thread, Barrier +from traceback import print_exc, format_exc + +import pytest +from promise import Promise +from promise.dataloader import DataLoader def test_promise_thread_safety(): @@ -113,3 +119,78 @@ def do(): assert assert_object['is_same_thread_1'] assert assert_object['is_same_thread_2'] + + +_force_stop = False + + +@pytest.mark.parametrize('num_threads', [3]) +@pytest.mark.parametrize('count', [10000]) +def test_with_process_loop(num_threads, count): + """ + Start a Promise in one thread, but resolve it in another. + """ + items = Queue() + barrier = Barrier(num_threads) + + asserts = [] + timeouts = [] + + def event_loop(): + stop_count = num_threads + while True: + item = items.get() + if item[0] == 'STOP': + stop_count -= 1 + if stop_count == 0: + break + if item[0] == 'ABORT': + break + if item[0] == 'ITEM': + (_, resolve, i) = item + resolve(i) + + def worker(): + global _force_stop + barrier.wait() + # Force fast switching of threads, this is NOT used in real world case. However without this + # I was unable to reproduce the issue. + setswitchinterval(0.000001) + for i in range(0, count): + if _force_stop: + break + + def do(resolve, reject): + items.put(('ITEM', resolve, i)) + + p = Promise(do) + try: + p.get(timeout=1) + except AssertionError as e: + print("ASSERT", e) + print_exc() + _force_stop = True + items.put(('ABORT',)) + asserts.append(format_exc()) + except Exception as e: + print("Timeout", e) + print_exc() + _force_stop = True + items.put(('ABORT',)) + timeouts.append(format_exc()) + + items.put(('STOP',)) + + loop_thread = Thread(target=event_loop) + loop_thread.start() + + worker_threads = [Thread(target=worker) for i in range(0, num_threads)] + for t in worker_threads: + t.start() + + loop_thread.join() + for t in worker_threads: + t.join() + + assert asserts == [] + assert timeouts == []