Skip to content

Commit

Permalink
Fix MPI hanging bug. (#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgjarrett committed Jun 10, 2022
1 parent 8c46f8c commit 9b4f0ca
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 17 deletions.
12 changes: 8 additions & 4 deletions armi/bookkeeping/mainInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import re
import itertools

from armi import context
from armi import interfaces
from armi import runLog
from armi import utils
Expand Down Expand Up @@ -174,16 +173,21 @@ def cleanARMIFiles(self):
cycle = int(snapText[0:3])
node = int(snapText[3:])
newFolder = "snapShot{0}_{1}".format(cycle, node)
utils.pathTools.cleanPath(newFolder, context.MPI_RANK)
utils.pathTools.cleanPath(newFolder)
context.waitAll()

# delete database if it's SQLlite
# no need to delete because the database won't have copied it back if using fastpath.

# clean temp directories.
if os.path.exists("shuffleBranches"):
utils.pathTools.cleanPath("shuffleBranches", context.MPI_RANK)
utils.pathTools.cleanPath("shuffleBranches")
context.waitAll()
# Potentially, wait for all the processes to catch up.

if os.path.exists("failedRuns"):
utils.pathTools.cleanPath("failedRuns", context.MPI_RANK)
utils.pathTools.cleanPath("failedRuns")
context.waitAll()

# pylint: disable=no-self-use
def cleanLastCycleFiles(self):
Expand Down
14 changes: 11 additions & 3 deletions armi/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def setMode(cls, mode):
if MPI_COMM is not None:
MPI_COMM.barrier() # make sure app data exists before workers proceed.

MPI_DISTRIBUTABLE = MPI_RANK == 0 and MPI_SIZE > 1
MPI_DISTRIBUTABLE = MPI_SIZE > 1

_FAST_PATH = os.path.join(os.getcwd())
"""
Expand Down Expand Up @@ -238,7 +238,7 @@ def cleanTempDirs(olderThanDays=None):
file=sys.stdout,
)
try:
cleanPath(_FAST_PATH, MPI_RANK)
cleanPath(_FAST_PATH, mpiRank=MPI_RANK)
except Exception as error: # pylint: disable=broad-except
for outputStream in (sys.stderr, sys.stdout):
if printMsg:
Expand Down Expand Up @@ -279,11 +279,19 @@ def cleanAllArmiTempDirs(olderThanDays: int) -> None:
runIsOldAndLikleyComplete = (now - dateOfFolder) > gracePeriod
if runIsOldAndLikleyComplete or fromThisRun:
# Delete old files
cleanPath(dirPath, MPI_RANK)
cleanPath(dirPath, mpiRank=MPI_RANK)
except: # pylint: disable=bare-except
pass


def waitAll() -> None:
    """
    Checkpoint for parallel runs: hold here until the other processes arrive.

    Does nothing unless more than one MPI process exists (``MPI_SIZE > 1``) and
    distribution is currently enabled (``MPI_DISTRIBUTABLE``); in that case the
    barrier on ``MPI_COMM`` is invoked.
    """
    # Guard clause: serial runs, or runs with distribution switched off, skip
    # the barrier entirely.
    if not (MPI_SIZE > 1 and MPI_DISTRIBUTABLE):
        return
    MPI_COMM.barrier()


def disconnectAllHdfDBs() -> None:
"""
Forcibly disconnect all instances of HdfDB objects
Expand Down
7 changes: 5 additions & 2 deletions armi/mpiActions.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,13 @@ def runActionsInSerial(o, r, cs, actions):
runLog.extra("Running {} MPI actions in serial".format(len(actions)))
numActions = len(actions)
for aa, action in enumerate(actions):
canDistribute = context.MPI_DISTRIBUTABLE
action.serial = True
context.MPI_DISTRIBUTABLE = False
runLog.extra("Running action {} of {}: {}".format(aa + 1, numActions, action))
results.append(action.invoke(o, r, cs))
action.serial = False # return to original state
context.MPI_DISTRIBUTABLE = canDistribute
return results


Expand Down Expand Up @@ -400,7 +403,7 @@ def invokeHook(self):
Notes
=====
Two things about this method make it non-recursiv
Two things about this method make it non-recursive
"""
canDistribute = context.MPI_DISTRIBUTABLE
mpiComm = context.MPI_COMM
Expand All @@ -424,11 +427,11 @@ def invokeHook(self):
try:
action = mpiComm.scatter(self._actions, root=0)
# create a new communicator that only has these specific dudes running
context.MPI_DISTRIBUTABLE = False
hasAction = action is not None
context.MPI_COMM = mpiComm.Split(int(hasAction))
context.MPI_RANK = context.MPI_COMM.Get_rank()
context.MPI_SIZE = context.MPI_COMM.Get_size()
context.MPI_DISTRIBUTABLE = context.MPI_SIZE > 1
context.MPI_NODENAMES = context.MPI_COMM.allgather(context.MPI_NODENAME)
if hasAction:
actionResult = action.invoke(self.o, self.r, self.cs)
Expand Down
2 changes: 1 addition & 1 deletion armi/operators/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ def snapshotRequest(self, cycle, node):
newFolder = "snapShot{0}_{1}".format(cycle, node)
if os.path.exists(newFolder):
runLog.important("Deleting existing snapshot data in {0}".format(newFolder))
pathTools.cleanPath(newFolder, context.MPI_RANK) # careful with cleanPath!
pathTools.cleanPath(newFolder) # careful with cleanPath!
# give it a second.
time.sleep(1)

Expand Down
1 change: 1 addition & 0 deletions armi/tests/armiRun.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ settings:
refType: inner igniter fuel
reloadDBName: reloadingDB.h5
shuffleLogic: refSmallReactorShuffleLogic.py
smallRun: true
summarizeAssemDesign: false
targetK: 1.002
transientForSensitivity: ''
Expand Down
4 changes: 4 additions & 0 deletions armi/tests/test_mpiFeatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,15 @@ def test_cleanPathMpi(self):
self.assertTrue(os.path.exists(filePath0))
with self.assertRaises(Exception):
pathTools.cleanPath(filePath0, mpiRank=context.MPI_RANK)
context.waitAll()

# TEST 1: Delete a single file
filePath1 = "test1_cleanPathNoMpi_mongoose"
open(filePath1, "w").write("something")

self.assertTrue(os.path.exists(filePath1))
pathTools.cleanPath(filePath1, mpiRank=context.MPI_RANK)
context.waitAll()
self.assertFalse(os.path.exists(filePath1))

# TEST 2: Delete an empty directory
Expand All @@ -274,6 +276,7 @@ def test_cleanPathMpi(self):

self.assertTrue(os.path.exists(dir2))
pathTools.cleanPath(dir2, mpiRank=context.MPI_RANK)
context.waitAll()
self.assertFalse(os.path.exists(dir2))

# TEST 3: Delete a directory with two files inside
Expand All @@ -290,6 +293,7 @@ def test_cleanPathMpi(self):
self.assertTrue(os.path.exists(os.path.join(dir3, "file1.txt")))
self.assertTrue(os.path.exists(os.path.join(dir3, "file2.txt")))
pathTools.cleanPath(dir3, mpiRank=context.MPI_RANK)
context.waitAll()
self.assertFalse(os.path.exists(dir3))


Expand Down
1 change: 1 addition & 0 deletions armi/utils/directoryChangers.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ def __exit__(self, exc_type, exc_value, traceback):
"That is, if you create a directory with a name starting with a period, the "
"TempDirChanger will not be able to clean it (for instance, a '.git' dir)."
)
context.waitAll()


class ForcedCreationDirectoryChanger(DirectoryChanger):
Expand Down
3 changes: 2 additions & 1 deletion armi/utils/outputCache.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def deleteCache(cachedFolder):
"""
if "Output_Cache" not in cachedFolder:
raise RuntimeError("Cache location must contain safeword: `Output_Cache`.")
cleanPath(cachedFolder, context.MPI_RANK)
cleanPath(cachedFolder)
context.waitAll()


def cacheCall(
Expand Down
6 changes: 0 additions & 6 deletions armi/utils/pathTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ def cleanPath(path, mpiRank=0):
"""
valid = False
if not os.path.exists(path):
if context.MPI_SIZE > 1:
context.MPI_COMM.barrier()
return True

for validPath in [
Expand Down Expand Up @@ -282,10 +280,6 @@ def cleanPath(path, mpiRank=0):
break
sleep(waitTime)

# Potentially, wait for all the processes to catch up.
if context.MPI_SIZE > 1:
context.MPI_COMM.barrier()

if os.path.exists(path):
return False
else:
Expand Down
4 changes: 4 additions & 0 deletions armi/utils/tests/test_pathTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ def test_cleanPathNoMpi(self):
self.assertTrue(os.path.exists(filePath0))
with self.assertRaises(Exception):
pathTools.cleanPath(filePath0, mpiRank=0)
context.waitAll()

# TEST 1: Delete a single file
filePath1 = "test1_cleanPathNoMpi_mongoose"
open(filePath1, "w").write("something")

self.assertTrue(os.path.exists(filePath1))
pathTools.cleanPath(filePath1, mpiRank=0)
context.waitAll()
self.assertFalse(os.path.exists(filePath1))

# TEST 2: Delete an empty directory
Expand All @@ -98,6 +100,7 @@ def test_cleanPathNoMpi(self):

self.assertTrue(os.path.exists(dir2))
pathTools.cleanPath(dir2, mpiRank=0)
context.waitAll()
self.assertFalse(os.path.exists(dir2))

# TEST 3: Delete a directory with two files inside
Expand All @@ -114,6 +117,7 @@ def test_cleanPathNoMpi(self):
self.assertTrue(os.path.exists(os.path.join(dir3, "file1.txt")))
self.assertTrue(os.path.exists(os.path.join(dir3, "file2.txt")))
pathTools.cleanPath(dir3, mpiRank=0)
context.waitAll()
self.assertFalse(os.path.exists(dir3))


Expand Down

0 comments on commit 9b4f0ca

Please sign in to comment.