From 09d361b7789f50b0f8625a604e311dc2d197b336 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 3 Aug 2023 15:34:23 -0600 Subject: [PATCH 1/5] Add parallel flux test --- tests/test_flux.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index 837893e6..aca6a474 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -39,7 +39,7 @@ class TestFlux(unittest.TestCase): def setUp(self): self.executor = FluxExecutor() - def test_flux_executor(self): + def test_flux_executor_serial(self): with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -48,6 +48,15 @@ def test_flux_executor(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_flux_executor_parallel(self): + with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: + fs_1 = exe.submit(mpi_funct, 1) + fs_2 = exe.submit(mpi_funct, 2) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + def test_single_task(self): with SingleTaskExecutor(cores=2, executor=self.executor) as p: output = p.map(mpi_funct, [1, 2, 3]) From 29feef4ff9892e23b34a1f87e58580afeb86eb86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 3 Aug 2023 22:24:12 -0600 Subject: [PATCH 2/5] Add parallel test to meta --- tests/test_meta.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/tests/test_meta.py b/tests/test_meta.py index 869f9de3..e553ff39 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -15,6 +15,13 @@ def calc(i): return i +def mpi_funct(i): + from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank + + class TestFutureCreation(unittest.TestCase): def test_get_future_done(self): f = _get_future_done() @@ -57,7 +64,7 @@ def test_executor_broker(self): class TestMetaExecutor(unittest.TestCase): - def test_meta_executor(self): + def test_meta_executor_serial(self): with HPCExecutor(max_workers=2) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -65,3 +72,21 @@ def test_meta_executor(self): self.assertEqual(fs_2.result(), 2) self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + + def test_meta_executor_single(self): + with HPCExecutor(max_workers=1) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_meta_executor_parallel(self): + with HPCExecutor(max_workers=1, cores_per_worker=2) as exe: + fs_1 = exe.submit(mpi_funct, 1) + fs_2 = exe.submit(mpi_funct, 2) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) From 88430fbe9b8d41cc06d1409eb52b6302029ce1b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 3 Aug 2023 22:33:28 -0600 Subject: [PATCH 3/5] Disable parallel tests --- tests/test_flux.py | 16 ++++++++-------- tests/test_meta.py | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index aca6a474..b2718c64 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -48,14 +48,14 @@ def test_flux_executor_serial(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def test_flux_executor_parallel(self): - with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: - fs_1 = exe.submit(mpi_funct, 1) - fs_2 = exe.submit(mpi_funct, 2) - self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) + # def test_flux_executor_parallel(self): + # with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: + # fs_1 = exe.submit(mpi_funct, 1) + # fs_2 = exe.submit(mpi_funct, 2) + # self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + # self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) + # self.assertTrue(fs_1.done()) + # self.assertTrue(fs_2.done()) def test_single_task(self): with SingleTaskExecutor(cores=2, executor=self.executor) as p: diff --git a/tests/test_meta.py b/tests/test_meta.py index e553ff39..e536ce32 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -82,11 +82,11 @@ def test_meta_executor_single(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def test_meta_executor_parallel(self): - with HPCExecutor(max_workers=1, cores_per_worker=2) as exe: - fs_1 = exe.submit(mpi_funct, 1) - fs_2 = exe.submit(mpi_funct, 2) - self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) + # def test_meta_executor_parallel(self): + # with HPCExecutor(max_workers=1, cores_per_worker=2) as exe: + # fs_1 = exe.submit(mpi_funct, 1) + # fs_2 = exe.submit(mpi_funct, 2) + # self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + # self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) + # self.assertTrue(fs_1.done()) + # self.assertTrue(fs_2.done()) From 3c820d9a2b5076410354bc2e02bb620ebd5e22c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 3 Aug 2023 22:41:50 -0600 Subject: [PATCH 4/5] comment out mpi function definitions --- tests/test_flux.py | 10 +++++----- tests/test_meta.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index b2718c64..99c97c30 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -19,11 +19,11 @@ def calc(i): return i -def mpi_funct(i): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank +# def mpi_funct(i): +# from mpi4py import MPI +# size = MPI.COMM_WORLD.Get_size() +# rank = MPI.COMM_WORLD.Get_rank() +# return i, size, rank def get_global(memory=None): diff --git a/tests/test_meta.py b/tests/test_meta.py index e536ce32..44960d13 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -15,11 +15,11 @@ def calc(i): return i -def mpi_funct(i): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank +# def mpi_funct(i): +# from mpi4py import MPI +# size = MPI.COMM_WORLD.Get_size() +# rank = MPI.COMM_WORLD.Get_rank() +# return i, size, rank class TestFutureCreation(unittest.TestCase): From 6732812999f1f122c482bf1e2cfaa32f57c9c6fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 3 Aug 2023 22:47:52 -0600 Subject: [PATCH 5/5] add mpi parallel tests --- tests/test_flux.py | 23 ++++++++++------------- tests/test_meta.py | 30 +++++++++--------------------- 2 files changed, 19 insertions(+), 34 deletions(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index 99c97c30..efa3be47 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -19,11 +19,11 @@ def calc(i): return i -# def mpi_funct(i): -# from mpi4py import MPI -# size = MPI.COMM_WORLD.Get_size() -# rank = MPI.COMM_WORLD.Get_rank() -# return i, size, rank +def mpi_funct(i): + from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank def get_global(memory=None): @@ -48,14 +48,11 @@ def test_flux_executor_serial(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - # def test_flux_executor_parallel(self): - # with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: - # fs_1 = exe.submit(mpi_funct, 1) - # fs_2 = exe.submit(mpi_funct, 2) - # self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - # self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) - # self.assertTrue(fs_1.done()) - # self.assertTrue(fs_2.done()) + def test_flux_executor_parallel(self): + with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: + fs_1 = exe.submit(mpi_funct, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) def test_single_task(self): with SingleTaskExecutor(cores=2, executor=self.executor) as p: diff --git a/tests/test_meta.py b/tests/test_meta.py index 44960d13..81263e9d 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -15,11 +15,11 @@ def calc(i): return i -# def mpi_funct(i): -# from mpi4py import MPI -# size = MPI.COMM_WORLD.Get_size() -# rank = MPI.COMM_WORLD.Get_rank() -# return i, size, rank +def mpi_funct(i): + from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank class TestFutureCreation(unittest.TestCase): @@ -73,20 +73,8 @@ def test_meta_executor_serial(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def test_meta_executor_single(self): - with HPCExecutor(max_workers=1) as exe: - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - self.assertEqual(fs_2.result(), 2) + def test_meta_executor_parallel(self): + with HPCExecutor(max_workers=1, cores_per_worker=2) as exe: + fs_1 = exe.submit(mpi_funct, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) - - # def test_meta_executor_parallel(self): - # with HPCExecutor(max_workers=1, cores_per_worker=2) as exe: - # fs_1 = exe.submit(mpi_funct, 1) - # fs_2 = exe.submit(mpi_funct, 2) - # self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - # self.assertEqual(fs_2.result(), [(2, 2, 0), (2, 2, 1)]) - # self.assertTrue(fs_1.done()) - # self.assertTrue(fs_2.done())