diff --git a/codecov.yml b/codecov.yml index af3e5c97dd..c2a03fbfbe 100644 --- a/codecov.yml +++ b/codecov.yml @@ -11,5 +11,6 @@ coverage: # The unittests of the torchrpc module are tested by different runners and cannot be included # in the test_unittest's coverage report. To keep CI happy, we don't count torchrpc related coverage. ignore: - - /mnt/cache/wangguoteng/DI-engine/ding/framework/message_queue/torch_rpc.py - - /mnt/cache/wangguoteng/DI-engine/ding/framework/message_queue/perfs/* + - ./ding/framework/message_queue/torch_rpc.py + - ./ding/framework/message_queue/tests/test_torch_rpc.py + - ./ding/framework/message_queue/perfs/* diff --git a/ding/framework/message_queue/perfs/perf_nng.py b/ding/framework/message_queue/perfs/perf_nng.py index 5cfdac2508..a13b6edada 100644 --- a/ding/framework/message_queue/perfs/perf_nng.py +++ b/ding/framework/message_queue/perfs/perf_nng.py @@ -127,6 +127,8 @@ def recv_loop(): continue elif topic == "f": finish_tag.append(1) + send_t("f") + mq.stop() return else: raise RuntimeError("Unkown topic") diff --git a/ding/utils/data/dataloader.py b/ding/utils/data/dataloader.py index cb81925fd3..91de62ef9a 100644 --- a/ding/utils/data/dataloader.py +++ b/ding/utils/data/dataloader.py @@ -116,12 +116,12 @@ def __init__( p, c = self.mp_context.Pipe() # Async process (Main worker): Process data if num_workers <= 1; Assign job to other workers if num_workers > 1. - self.async_process = self.mp_context.Process(target=self._async_loop, args=(p, c)) + self.async_process = self.mp_context.Process(target=self._async_loop, args=(p, c), name="async_process") self.async_process.daemon = True self.async_process.start() # Get data thread: Get data from ``data_source`` and send it to ``async_process``.` - self.get_data_thread = threading.Thread(target=self._get_data, args=(p, c)) + self.get_data_thread = threading.Thread(target=self._get_data, args=(p, c), name="get_data_thread") self.get_data_thread.daemon = True self.get_data_thread.start() @@ -350,6 +350,9 @@ def close(self) -> None: self.end_flag = True self.async_process.terminate() self.async_process.join() + if self.use_cuda: + self.cuda_thread.join() + self.get_data_thread.join() if self.num_workers > 1: for w in self.worker: w.terminate() diff --git a/ding/utils/data/tests/test_dataloader.py b/ding/utils/data/tests/test_dataloader.py index 9fc78113df..6c76a006a1 100644 --- a/ding/utils/data/tests/test_dataloader.py +++ b/ding/utils/data/tests/test_dataloader.py @@ -101,4 +101,4 @@ def entry(self, batch_size, num_workers, chunk_size, use_cuda): assert total_data_time <= 7 * 0.008 dataloader.__del__() time.sleep(0.5) - assert len(threading.enumerate()) <= 2, threading.enumerate() + assert len(threading.enumerate()) <= 3, threading.enumerate()