diff --git a/.gitignore b/.gitignore index b50ed65c46..e03ca405d8 100644 --- a/.gitignore +++ b/.gitignore @@ -93,3 +93,6 @@ ENV/ # Rope project settings .ropeproject + +# PyCharm +.idea diff --git a/pynetdicom/dul.py b/pynetdicom/dul.py index cf789ec94e..45848739cf 100644 --- a/pynetdicom/dul.py +++ b/pynetdicom/dul.py @@ -386,14 +386,22 @@ def run_reactor(self) -> None: # Main DUL loop self._idle_timer.start() self.socket = cast("AssociationSocket", self.socket) + sleep = False while True: # Let the assoc reactor off the leash if not self.assoc._dul_ready.is_set(): self.assoc._dul_ready.set() + # When single-stepping the reactor, sleep between events so that + # test code has time to run. + sleep = True - # This effectively controls how quickly the DUL does anything - time.sleep(self._run_loop_delay) + if sleep: + # If there were no events to process on the previous loop, + # sleep before checking again, otherwise check immediately + # Setting `_run_loop_delay` higher will use less CPU when idle, but + # will also increase the latency to respond to new requests + time.sleep(self._run_loop_delay) if self._kill_thread: break @@ -434,9 +442,11 @@ def run_reactor(self) -> None: event = self.event_queue.get(block=False) # If the queue is empty, return to the start of the loop except queue.Empty: + sleep = True continue self.state_machine.do_action(event) + sleep = False def send_pdu(self, primitive: _PDUPrimitiveType) -> None: """Place a primitive in the provider queue to be sent to the peer. @@ -478,7 +488,7 @@ def stop_dul(self) -> bool: # Fix for Issue 39 # Give the DUL thread time to exit while self.is_alive(): - time.sleep(0.001) + time.sleep(self._run_loop_delay) return True diff --git a/pynetdicom/events.py b/pynetdicom/events.py index c1cd5f47f8..0bf7eb3f34 100644 --- a/pynetdicom/events.py +++ b/pynetdicom/events.py @@ -108,6 +108,10 @@ class NotificationEvent(NamedTuple): is_intervention: bool = False is_notification: bool = True + def __str__(self) -> str: + """String representation of the class.""" + return self.name + # pylint: disable=line-too-long EVT_ABORTED = NotificationEvent("EVT_ABORTED", "Association aborted") @@ -172,6 +176,10 @@ class InterventionEvent(NamedTuple): is_intervention: bool = True is_notification: bool = False + def __str__(self) -> str: + """String representation of the class.""" + return self.name + EVT_ASYNC_OPS = InterventionEvent( "EVT_ASYNC_OPS", "Asynchronous operations negotiation requested" diff --git a/pynetdicom/tests/benchmark_script.py b/pynetdicom/tests/benchmark_script.py index d8a2a6e88f..b366436c0a 100755 --- a/pynetdicom/tests/benchmark_script.py +++ b/pynetdicom/tests/benchmark_script.py @@ -15,6 +15,7 @@ from datetime import datetime import os +import re import subprocess import tempfile import time @@ -30,8 +31,6 @@ TEST_DS_DIR = os.path.join(os.path.dirname(__file__), "dicom_files") -# DATASET = dcmread(os.path.join(TEST_DS_DIR, 'RTImageStorage.dcm')) # 2.1 MB -DATASET = dcmread(os.path.join(TEST_DS_DIR, "CTImageStorage.dcm")) # 39 kB def init_yappi(): @@ -104,24 +103,28 @@ def start_storescp(): return subprocess.Popen(args) -def start_storescu(ds_per_assoc): +def start_storescu(test_ds, ds_per_assoc): """Run DCMTK's storescu in a background process. Parameters ---------- + test_ds : pydicom.dataset.Dataset + The test dataset to use ds_per_assoc : int The number of datasets to send using `storescu`. """ - fpath = os.path.join(TEST_DS_DIR, "CTImageStorage.dcm") + fpath = test_ds.filename args = [which("storescu"), "localhost", "11112"] + [fpath] * ds_per_assoc return subprocess.Popen(args) -def receive_store(nr_assoc, ds_per_assoc, write_ds=0, use_yappi=False): +def receive_store(test_ds, nr_assoc, ds_per_assoc, write_ds=0, use_yappi=False): """Run a Storage SCP and transfer datasets with sequential storescu's. Parameters ---------- + test_ds : pydicom.dataset.Dataset + The test dataset to use nr_assoc : int The total number of (sequential) associations that will be made. ds_per_assoc : int @@ -158,7 +161,7 @@ def handle(event): ae.network_timeout = 5 if write_ds == 3: ae.maximum_pdu_size = 0 - ae.add_supported_context(DATASET.SOPClassUID, ImplicitVRLittleEndian) + ae.add_supported_context(test_ds.SOPClassUID, ImplicitVRLittleEndian) server = ae.start_server( ("", 11112), block=False, evt_handlers=[(evt.EVT_C_STORE, handle)] @@ -169,7 +172,7 @@ def handle(event): is_successful = True for ii in range(nr_assoc): - p = start_storescu(ds_per_assoc) + p = start_storescu(test_ds, ds_per_assoc) # Block until transfer is complete p.wait() if p.returncode != 0: @@ -177,11 +180,12 @@ def handle(event): break if is_successful: + write_msg = ["", " (write)", " (write fast)", " (write fastest)"][write_ds] print( - "C-STORE SCP transferred {} total datasets over {} " - "association(s) in {:.2f} s".format( - nr_assoc * ds_per_assoc, nr_assoc, time.time() - start_time - ) + f"C-STORE SCP transferred {nr_assoc * ds_per_assoc} total " + f"{os.path.basename(test_ds.filename)} datasets over " + f"{nr_assoc} association{'' if nr_assoc == 1 else 's'}{write_msg} " + f"in {time.time() - start_time:.2f} s" ) else: print("C-STORE SCP benchmark failed") @@ -189,11 +193,15 @@ def handle(event): server.shutdown() -def receive_store_internal(nr_assoc, ds_per_assoc, write_ds=0, use_yappi=False): +def receive_store_internal( + test_ds, nr_assoc, ds_per_assoc, write_ds=0, use_yappi=False +): """Run a Storage SCP and transfer datasets with pynetdicom alone. Parameters ---------- + test_ds : pydicom.dataset.Dataset + The test dataset to use nr_assoc : int The total number of (sequential) associations that will be made. ds_per_assoc : int @@ -230,8 +238,8 @@ def handle(event): ae.network_timeout = 5 if write_ds == 3: ae.maximum_pdu_size = 0 - ae.add_supported_context(DATASET.SOPClassUID, ImplicitVRLittleEndian) - ae.add_requested_context(DATASET.SOPClassUID, ImplicitVRLittleEndian) + ae.add_supported_context(test_ds.SOPClassUID, ImplicitVRLittleEndian) + ae.add_requested_context(test_ds.SOPClassUID, ImplicitVRLittleEndian) server = ae.start_server( ("", 11112), block=False, evt_handlers=[(evt.EVT_C_STORE, handle)] @@ -245,16 +253,17 @@ def handle(event): assoc = ae.associate("127.0.0.1", 11112) if assoc.is_established: for jj in range(ds_per_assoc): - assoc.send_c_store(DATASET) + assoc.send_c_store(test_ds) assoc.release() if is_successful: + write_msg = ["", " (write)", " (write fast)", " (write fastest)"][write_ds] print( - "C-STORE SCU/SCP transferred {} total datasets over {} " - "association(s) in {:.2f} s".format( - nr_assoc * ds_per_assoc, nr_assoc, time.time() - start_time - ) + f"C-STORE SCU/SCP transferred {nr_assoc * ds_per_assoc} total " + f"{os.path.basename(test_ds.filename)} datasets over " + f"{nr_assoc} association{'' if nr_assoc == 1 else 's'}{write_msg} " + f"in {time.time() - start_time:.2f} s" ) else: print("C-STORE SCU/SCP benchmark failed") @@ -262,11 +271,13 @@ def handle(event): server.shutdown() -def receive_store_dcmtk(nr_assoc, ds_per_assoc, use_yappi=False): +def receive_store_dcmtk(test_ds, nr_assoc, ds_per_assoc, use_yappi=False): """Run a Storage SCP and transfer datasets with sequential storescu's. Parameters ---------- + test_ds : pydicom.dataset.Dataset + The test dataset to use nr_assoc : int The total number of (sequential) associations that will be made. ds_per_assoc : int @@ -285,7 +296,7 @@ def receive_store_dcmtk(nr_assoc, ds_per_assoc, use_yappi=False): is_successful = True for ii in range(nr_assoc): - p = start_storescu(ds_per_assoc) + p = start_storescu(test_ds, ds_per_assoc) # Block until transfer is complete p.wait() if p.returncode != 0: @@ -294,22 +305,25 @@ def receive_store_dcmtk(nr_assoc, ds_per_assoc, use_yappi=False): if is_successful: print( - "C-STORE SCP transferred {} total datasets over {} " - "association(s) in {:.2f} s".format( - nr_assoc * ds_per_assoc, nr_assoc, time.time() - start_time - ) + f"C-STORE DCMTK SCU/SCP transferred {nr_assoc * ds_per_assoc} total " + f"{os.path.basename(test_ds.filename)} datasets over " + f"{nr_assoc} association{'' if nr_assoc == 1 else 's'} " + f"in {time.time() - start_time:.2f} s" ) else: - print("C-STORE SCP benchmark failed") + print("C-STORE DCMTK SCU/SCP benchmark failed") server.terminate() + time.sleep(0.5) -def receive_store_simultaneous(nr_assoc, ds_per_assoc, use_yappi=False): +def receive_store_simultaneous(test_ds, nr_assoc, ds_per_assoc, use_yappi=False): """Run a Storage SCP and transfer datasets with simultaneous storescu's. Parameters ---------- + test_ds : pydicom.dataset.Dataset + The test dataset to use nr_assoc : int The number of simultaneous associations that will be made. ds_per_assoc : int @@ -328,7 +342,7 @@ def handle(event): ae.dimse_timeout = 5 ae.network_timeout = 5 ae.maximum_associations = 15 - ae.add_supported_context(DATASET.SOPClassUID, ImplicitVRLittleEndian) + ae.add_supported_context(test_ds.SOPClassUID, ImplicitVRLittleEndian) server = ae.start_server( ("", 11112), block=False, evt_handlers=[(evt.EVT_C_STORE, handle)] @@ -340,7 +354,7 @@ def handle(event): processes = [] for ii in range(nr_assoc): - processes.append(start_storescu(ds_per_assoc)) + processes.append(start_storescu(test_ds, ds_per_assoc)) while None in [pp.poll() for pp in processes]: pass @@ -351,10 +365,10 @@ def handle(event): if is_successful: print( - "C-STORE SCP transferred {} total datasets over {} " - "association(s) in {:.2f} s".format( - nr_assoc * ds_per_assoc, nr_assoc, time.time() - start_time - ) + f"C-STORE SCP transferred {nr_assoc * ds_per_assoc} total " + f"{os.path.basename(test_ds.filename)} datasets over " + f"{nr_assoc} association{'' if nr_assoc == 1 else 's'} " + f"in {time.time() - start_time:.2f} s" ) else: print("C-STORE SCP benchmark failed") @@ -362,11 +376,13 @@ def handle(event): server.shutdown() -def send_store(nr_assoc, ds_per_assoc, use_yappi=False): +def send_store(test_ds, nr_assoc, ds_per_assoc, use_yappi=False): """Send a number of sequential C-STORE requests. Parameters ---------- + test_ds : pydicom.dataset.Dataset + The test dataset to use nr_assoc : int The total number of (sequential) associations that will be made. ds_per_assoc : int @@ -385,7 +401,7 @@ def send_store(nr_assoc, ds_per_assoc, use_yappi=False): ae.acse_timeout = 5 ae.dimse_timeout = 5 ae.network_timeout = 5 - ae.add_requested_context(DATASET.SOPClassUID, ImplicitVRLittleEndian) + ae.add_requested_context(test_ds.SOPClassUID, ImplicitVRLittleEndian) # Start timer start_time = time.time() @@ -402,7 +418,7 @@ def send_store(nr_assoc, ds_per_assoc, use_yappi=False): if assoc.is_established: for jj in range(ds_per_assoc): try: - status = assoc.send_c_store(DATASET) + status = assoc.send_c_store(test_ds) if status and status.Status != 0x0000: is_successful = False break @@ -419,57 +435,101 @@ def send_store(nr_assoc, ds_per_assoc, use_yappi=False): if is_successful: print( - "C-STORE SCU transferred {} total datasets over {} " - "association(s) in {:.2f} s".format( - nr_assoc * ds_per_assoc, nr_assoc, time.time() - start_time - ) + f"C-STORE SCU transferred {nr_assoc * ds_per_assoc} total " + f"{os.path.basename(test_ds.filename)} datasets over " + f"{nr_assoc} association{'' if nr_assoc == 1 else 's'} " + f"in {time.time() - start_time:.2f} s" ) else: print("C-STORE SCU benchmark failed") server.terminate() + time.sleep(0.5) if __name__ == "__main__": - print("Use yappi (y/n:)") + print("Use yappi? (y/n:)") use_yappi = input() if use_yappi in ["y", "Y"]: use_yappi = True else: use_yappi = False - print("Which benchmark do you wish to run?") - print(" 1. Storage SCU, 1000 datasets over 1 association") - print(" 2. Storage SCU, 1 dataset per association over 1000 associations") - print(" 3. Storage SCP, 1000 datasets over 1 association") - print(" 4. Storage SCP, 1000 datasets over 1 association (write)") - print(" 5. Storage SCP, 1000 datasets over 1 association (write fast)") - print(" 6. Storage SCP, 1000 datasets over 1 association (write fastest)") - print(" 7. Storage SCP, 1 dataset per association over 1000 associations") + print("Use large dataset? (y/n:)") + use_large_dcm = input() + if use_large_dcm in ["y", "Y"]: + use_large_dcm = True + else: + use_large_dcm = False + + if use_large_dcm: + ds_name = "RTImageStorage.dcm" # 2.1 MB + default_nr_ds = 100 + else: + ds_name = "CTImageStorage.dcm" # 39 kB + default_nr_ds = 1000 + + print(f"number of datasets? (default = {default_nr_ds})") + try: + nr_ds = int(input()) + except ValueError: + nr_ds = default_nr_ds + + test_ds = dcmread(os.path.join(TEST_DS_DIR, ds_name)) + + print(f"Which benchmarks do you wish to run? (list, range, or all)") + print(f" 1. Storage SCU, {nr_ds} {ds_name} datasets over 1 association") print( - " 8. Storage SCP, 1000 datasets per association over 10 simultaneous associations" + f" 2. Storage SCU, 1 {ds_name} dataset per association over {nr_ds} associations" ) - print(" 9. Storage SCU/SCP, 1000 datasets over 1 association") - print(" 10. Storage DCMTK SCU/SCP, 1000 datasets over 1 association") - bench_index = input() - - if bench_index == "1": - send_store(1, 1000, use_yappi) - elif bench_index == "2": - send_store(1000, 1, use_yappi) - elif bench_index == "3": - receive_store(1, 1000, 0, use_yappi) - elif bench_index == "4": - receive_store(1, 1000, 1, use_yappi) - elif bench_index == "5": - receive_store(1, 1000, 2, use_yappi) - elif bench_index == "6": - receive_store(1, 1000, 3, use_yappi) - elif bench_index == "7": - receive_store(1000, 1, 0, use_yappi) - elif bench_index == "8": - receive_store_simultaneous(10, 1000, use_yappi) - elif bench_index == "9": - receive_store_internal(1, 1000, 0, use_yappi) - elif bench_index == "10": - receive_store_dcmtk(1, 1000, use_yappi) + print(f" 3. Storage SCP, {nr_ds} {ds_name} datasets over 1 association") + print(f" 4. Storage SCP, {nr_ds} {ds_name} datasets over 1 association (write)") + print( + f" 5. Storage SCP, {nr_ds} {ds_name} datasets over 1 association (write fast)" + ) + print( + f" 6. Storage SCP, {nr_ds} {ds_name} datasets over 1 association (write fastest)" + ) + print( + f" 7. Storage SCP, 1 {ds_name} dataset per association over {nr_ds} associations" + ) + print( + f" 8. Storage SCP, {nr_ds} {ds_name} datasets per association over 10 simultaneous associations" + ) + print(f" 9. Storage SCU/SCP, {nr_ds} {ds_name} datasets over 1 association") + print(f" 10. Storage DCMTK SCU/SCP, {nr_ds} {ds_name} datasets over 1 association") + + bench_input = input() + if re.fullmatch(r"\s*(a|all)\s*", bench_input): + # All: "a" or "all" + bench_list = [str(i) for i in range(1, 11)] + elif re.fullmatch(r"\s*(\d+)\s*-\s*(\d+)\s*", bench_input): + # Range: "x - y" + match = re.fullmatch(r"\s*(\d+)\s*-\s*(\d+)\s*", bench_input) + bench_list = [ + str(i) for i in range(int(match.group(1)), int(match.group(2)) + 1) + ] + else: + # List: "a, b, c" + bench_list = re.findall(r"\d+", bench_input) + + if "1" in bench_list: + send_store(test_ds, 1, nr_ds, use_yappi) + if "2" in bench_list: + send_store(test_ds, nr_ds, 1, use_yappi) + if "3" in bench_list: + receive_store(test_ds, 1, nr_ds, 0, use_yappi) + if "4" in bench_list: + receive_store(test_ds, 1, nr_ds, 1, use_yappi) + if "5" in bench_list: + receive_store(test_ds, 1, nr_ds, 2, use_yappi) + if "6" in bench_list: + receive_store(test_ds, 1, nr_ds, 3, use_yappi) + if "7" in bench_list: + receive_store(test_ds, nr_ds, 1, 0, use_yappi) + if "8" in bench_list: + receive_store_simultaneous(test_ds, 10, nr_ds, use_yappi) + if "9" in bench_list: + receive_store_internal(test_ds, 1, nr_ds, 0, use_yappi) + if "10" in bench_list: + receive_store_dcmtk(test_ds, 1, nr_ds, use_yappi) diff --git a/pynetdicom/tests/test_dul.py b/pynetdicom/tests/test_dul.py index a655ccff45..dabb908207 100644 --- a/pynetdicom/tests/test_dul.py +++ b/pynetdicom/tests/test_dul.py @@ -19,6 +19,7 @@ A_ABORT_RQ, ) from pynetdicom.pdu_primitives import A_ASSOCIATE, A_RELEASE, A_ABORT, P_DATA +from pynetdicom.sop_class import Verification from .encoded_pdu_items import a_associate_ac, a_release_rq from .parrot import start_server, ThreadedParrot, ParrotRequest from .utils import sleep @@ -70,6 +71,7 @@ class TestDUL: def setup(self): self.scp = None + self.ae = None def teardown(self): if self.scp: @@ -78,6 +80,9 @@ def teardown(self): self.scp.commands = [] self.scp.shutdown() + if self.ae: + self.ae.shutdown() + for thread in threading.enumerate(): if isinstance(thread, ThreadedParrot): thread.shutdown() @@ -290,3 +295,30 @@ def patch_read_pdu(): scp.step() scp.shutdown() + + def test_stop_dul_sta1(self): + """Test that stop_dul() returns True when in Sta1""" + dul = DULServiceProvider(DummyAssociation()) + assert dul.state_machine.current_state == "Sta1" + assert dul.stop_dul() + + def test_stop_dul(self): + self.ae = ae = AE() + ae.network_timeout = 5 + ae.dimse_timeout = 5 + ae.acse_timeout = 5 + ae.add_supported_context(Verification) + + scp = ae.start_server(("", 11112), block=False) + + ae.add_requested_context(Verification) + assoc = ae.associate("localhost", 11112) + + dul = assoc.dul + + dul.state_machine.current_state = "Sta1" + dul.stop_dul() + + assoc.release() + + scp.shutdown() diff --git a/pynetdicom/tests/test_events.py b/pynetdicom/tests/test_events.py index 1502303b8c..3b7503b6b4 100644 --- a/pynetdicom/tests/test_events.py +++ b/pynetdicom/tests/test_events.py @@ -66,6 +66,7 @@ def test_intervention_namedtuple(): assert event.description == "some description" assert event.is_intervention is True assert event.is_notification is False + assert str(event) == event.name def test_notification_namedtuple(): @@ -75,6 +76,7 @@ def test_notification_namedtuple(): assert event.description == "some description" assert event.is_intervention is False assert event.is_notification is True + assert str(event) == event.name def test_intervention_global(): diff --git a/pynetdicom/tests/test_service_qr.py b/pynetdicom/tests/test_service_qr.py index 6004028aa0..5beccad7ed 100644 --- a/pynetdicom/tests/test_service_qr.py +++ b/pynetdicom/tests/test_service_qr.py @@ -868,7 +868,7 @@ def handle(event): ds.PatientID = "123456" cancel_results.append(event.is_cancelled) yield 0xFF00, ds - time.sleep(0.2) + time.sleep(0.5) cancel_results.append(event.is_cancelled) yield 0xFE00, None yield 0xFF00, self.query @@ -888,6 +888,7 @@ def handle(event): results = assoc.send_c_find( identifier, PatientRootQueryRetrieveInformationModelFind, msg_id=11142 ) + time.sleep(0.2) assoc.send_c_cancel(1, 3) assoc.send_c_cancel(11142, 1) @@ -904,7 +905,7 @@ def handle(event): assoc.release() assert assoc.is_released - assert True in cancel_results + assert cancel_results == [False, True] scp.shutdown() @@ -3017,7 +3018,7 @@ def handle(event): yield 2 cancel_results.append(event.is_cancelled) yield 0xFF00, ds - time.sleep(0.2) + time.sleep(0.5) cancel_results.append(event.is_cancelled) yield 0xFE00, None @@ -3046,6 +3047,7 @@ def handle_store(event): results = assoc.send_c_get( identifier, PatientRootQueryRetrieveInformationModelGet, msg_id=11142 ) + time.sleep(0.2) assoc.send_c_cancel(1, 3) assoc.send_c_cancel(11142, 1) @@ -3066,7 +3068,7 @@ def handle_store(event): assoc.release() assert assoc.is_released - assert True in cancel_results + assert cancel_results == [False, True] scp.shutdown() @@ -5171,7 +5173,7 @@ def handle(event): yield 2 cancel_results.append(event.is_cancelled) yield 0xFF00, ds - time.sleep(0.2) + time.sleep(0.5) cancel_results.append(event.is_cancelled) yield 0xFE00, None @@ -5193,6 +5195,7 @@ def handle_store(event): results = assoc.send_c_move( identifier, "A", PatientRootQueryRetrieveInformationModelMove, msg_id=11142 ) + time.sleep(0.2) assoc.send_c_cancel(1, 3) assoc.send_c_cancel(11142, 1) @@ -5213,7 +5216,7 @@ def handle_store(event): assoc.release() assert assoc.is_released - assert True in cancel_results + assert cancel_results == [False, True] scp.shutdown()