Skip to content

Commit

Permalink
Add unit test;
Browse files Browse the repository at this point in the history
credits to @Novakov at #87
  • Loading branch information
Alex committed Jul 15, 2020
1 parent e726e0b commit 35b9845
Showing 1 changed file with 83 additions and 2 deletions.
85 changes: 83 additions & 2 deletions tests/test_thread_safety.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from promise import Promise
from promise.dataloader import DataLoader
import threading
from queue import Queue
from sys import setswitchinterval
from threading import Thread, Barrier
from traceback import print_exc, format_exc

import pytest

from promise import Promise
from promise.dataloader import DataLoader


def test_promise_thread_safety():
Expand Down Expand Up @@ -113,3 +119,78 @@ def do():

assert assert_object['is_same_thread_1']
assert assert_object['is_same_thread_2']


_force_stop = False


@pytest.mark.parametrize('num_threads', [3])
@pytest.mark.parametrize('count', [10000])
def test_with_process_loop(num_threads, count):
"""
Start a Promise in one thread, but resolve it in another.
"""
items = Queue()
barrier = Barrier(num_threads)

asserts = []
timeouts = []

def event_loop():
stop_count = num_threads
while True:
item = items.get()
if item[0] == 'STOP':
stop_count -= 1
if stop_count == 0:
break
if item[0] == 'ABORT':
break
if item[0] == 'ITEM':
(_, resolve, i) = item
resolve(i)

def worker():
global _force_stop
barrier.wait()
# Force fast switching of threads, this is NOT used in real world case. However without this
# I was unable to reproduce the issue.
setswitchinterval(0.000001)
for i in range(0, count):
if _force_stop:
break

def do(resolve, reject):
items.put(('ITEM', resolve, i))

p = Promise(do)
try:
p.get(timeout=1)
except AssertionError as e:
print("ASSERT", e)
print_exc()
_force_stop = True
items.put(('ABORT',))
asserts.append(format_exc())
except Exception as e:
print("Timeout", e)
print_exc()
_force_stop = True
items.put(('ABORT',))
timeouts.append(format_exc())

items.put(('STOP',))

loop_thread = Thread(target=event_loop)
loop_thread.start()

worker_threads = [Thread(target=worker) for i in range(0, num_threads)]
for t in worker_threads:
t.start()

loop_thread.join()
for t in worker_threads:
t.join()

assert asserts == []
assert timeouts == []

0 comments on commit 35b9845

Please sign in to comment.