Skip to content

Commit

Permalink
ENH More robust order for locking in parallel
Browse files Browse the repository at this point in the history
Previously, a consistently failing sample could block testing of stale
locks. This is more robust.
  • Loading branch information
luispedro committed Sep 13, 2022
1 parent 0212597 commit 7c12bf4
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 38 deletions.
2 changes: 2 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Version 1.5.0
* Make ``print()`` accept ints and doubles as well as strings
* Write log to .failed files when using the parallel module
* Fix bug where failhooks were not triggered for uncaught exceptions
* Change order of testing for locks in the parallel module for a more
robust protocol

Version 1.4.2 2022-07-21 by luispedro
* Fix bug with parsing GFF files
Expand Down
92 changes: 56 additions & 36 deletions NGLess/StandardModules/Parallel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import Control.DeepSeq
import Data.Traversable
import Control.Monad.Trans.Class
import System.AtomicWrite.Writer.Text (atomicWriteFile)
import System.Random.Shuffle (shuffleM)


import Control.Monad.Trans.Resource
Expand Down Expand Up @@ -171,54 +172,73 @@ failedName = (++ ".failed") . T.unpack
getLock :: FilePath
-- ^ directory where to create locks
-> [T.Text]
-- ^ keys to attempt to ock
-- ^ keys to attempt to lock
-> NGLessIO (T.Text, ReleaseKey)
getLock basedir fs = do
existing <- liftIO $ getDirectoryContents basedir
let notfinished = flip filter fs $ \fname -> finishedName fname `notElem` existing
notlocked = flip filter notfinished $ \fname -> lockName fname `notElem` existing
notfailed = flip filter notlocked $ \fname -> failedName fname `notElem` existing
failed = flip filter notfinished $ \fname -> failedName fname `elem` existing
locked = flip filter notfinished $ \fname -> lockName fname `elem` existing
when (null notfinished) $ do
outputListLno' InfoOutput ["All jobs are finished"]
throwError $ NGError NoErrorExit "All jobs are finished"

outputListLno' TraceOutput ["Looking for a lock in '", basedir, "'"]
outputListLno' TraceOutput [
"Looking for a lock in ", basedir, ". ",
"Total number of elements is ", show (length fs),
" (not locked: ", show (length notlocked), "; not finished: ", show (length notfinished), ")."]
-- first try all the elements that are not locked and have not failed
-- if that fails, try the unlocked but failed
-- Finally, try the locked elements in the hope that some may be stale
"Total number of tasks to run is ", show (length fs),
" (total not finished (including locked & failed): ", show (length notfinished),
" ; locked: ", show (length locked),
" ; failed: ", show (length failed),
")."]
-- first try all the tasks that are not locked and have not failed
-- if that fails, try the locked tasks in the hope that some may be stale
-- Finally, try the unlocked but failed (in random order)
getLock' basedir notfailed >>= \case
Just v -> return v
Nothing -> getLock' basedir (filter (`notElem` notfailed) notlocked) >>= \case
Just v -> return v
Nothing -> do
outputListLno' TraceOutput ["All elements locked. checking for stale locks"]
getLock' basedir notfinished >>= \case
Just v -> return v
Nothing -> do
let msg = if null (notfailed ++ notfailed)
then "All jobs are finished"
else "Jobs appear to be running"
outputListLno' WarningOutput ["Could get a lock for any file: ", msg]
throwError $ NGError NoErrorExit msg
Nothing -> do
outputListLno' InfoOutput ["All tasks locked or failed. Checking for stale locks..."]
getLock' basedir locked >>= \case
Just v -> return v
Nothing -> do
when (null failed) $ do
outputListLno' InfoOutput ["All jobs appear to be finished or running"]
throwError $ NGError NoErrorExit "All jobs are finished or running"
-- randomizing the order maximizes the possibilities to get a lock
failed' <- liftIO $ shuffleM failed
outputListLno' InfoOutput ["All tasks locked or failed and there are no stale locks."]
outputListLno' InfoOutput ["Will retry some failed tasks, but it is possible that this will fail again."]
outputListLno' InfoOutput ["Failed logs are in directory '", basedir, "'"]
getLock' basedir failed' >>= \case
Just v -> return v
Nothing -> do
let msg
| null (notfailed ++ notfailed) = "All jobs are finished"
| null failed = "Jobs appear to be running"
| otherwise = "Jobs are either locked or failed. Check directory '" ++ basedir ++ "' for more information"
outputListLno' WarningOutput ["Could get a lock for any file: ", msg]
throwError $ NGError NoErrorExit msg

getLock' _ [] = return Nothing
getLock' basedir (f:fs) = do
let lockname = basedir </> lockName f
finished <- liftIO $ doesFileExist (basedir </> finishedName f)
if finished
then getLock' basedir fs
else LockFile.acquireLock LockFile.LockParameters
{ lockFname = lockname
, maxAge = fromInteger (60*60)
-- one hour. Given that lock files are touched
-- every ten minutes if things are good (see
-- thread below), this is an indication that
-- the process has crashed
, whenExistsStrategy = LockFile.IfLockedNothing
, mtimeUpdate = True } >>= \case
Nothing -> getLock' basedir fs
Just rk -> do
return $ Just (f,rk)
-- Non-empty case: attempt to lock the first key `f`; whenever `f` turns out
-- not to be usable, recurse on the remaining keys `fs`.
-- Yields `Just (key, releaseKey)` for the first key that is locked and not
-- yet finished.
getLock' basedir (f:fs) =
LockFile.acquireLock LockFile.LockParameters
{ lockFname = basedir </> lockName f
, maxAge = fromInteger (60*60)
-- one hour. Given that lock files are touched
-- every ten minutes if things are good (see
-- thread below), this is an indication that
-- the process has crashed
, whenExistsStrategy = LockFile.IfLockedNothing
, mtimeUpdate = True } >>= \case
-- no lock was obtained for `f`: move on to the next candidate
Nothing -> getLock' basedir fs
Just rk -> do
-- The finished marker is only looked up once the lock is held.
-- NOTE(review): presumably this covers the case where another
-- process completed `f` after the caller listed the directory --
-- confirm.
isFinished <- liftIO $ doesFileExist (basedir </> finishedName f)
if isFinished
then do
-- `f` needs no work: give the lock back and keep looking
release rk
getLock' basedir fs
else return $ Just (f, rk)

executeCollect :: NGLessObject -> [(T.Text, NGLessObject)] -> NGLessIO NGLessObject
executeCollect (NGOCounts istream) kwargs = do
Expand Down
2 changes: 1 addition & 1 deletion build-scripts/ngless-static-embed-dependencies.nix
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ let
in pkgs.haskell-nix.stackProject {
name = "NGLess";
src = (import ./sources-with-static-dependencies.nix) ;
stack-sha256 = "13ms75px2iyfydc91nyr521cw8fadhwic1xn1vn9kqnksf2zaglz";
stack-sha256 = "0m0kkykh6zicmzcgs1fyxnjaxpsf8ylvvdq4cawjmpd3csnxqvgg";
materialized = ./sources-with-static-dependencies.materialized;
inherit checkMaterialization;
}
Expand Down
4 changes: 4 additions & 0 deletions build-scripts/release.materialized/NGLess.nix
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down Expand Up @@ -158,6 +159,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down Expand Up @@ -240,6 +242,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down Expand Up @@ -326,6 +329,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down Expand Up @@ -158,6 +159,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down Expand Up @@ -240,6 +242,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down Expand Up @@ -326,6 +329,7 @@
(hsPkgs."parsec" or (errorHandler.buildDepError "parsec"))
(hsPkgs."primitive" or (errorHandler.buildDepError "primitive"))
(hsPkgs."process" or (errorHandler.buildDepError "process"))
(hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle"))
(hsPkgs."regex" or (errorHandler.buildDepError "regex"))
(hsPkgs."resourcet" or (errorHandler.buildDepError "resourcet"))
(hsPkgs."safe" or (errorHandler.buildDepError "safe"))
Expand Down
11 changes: 11 additions & 0 deletions build-scripts/update-materialized.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env bash
#
# For each nix entry point listed below:
#   1. build the `updateMaterialized` attribute and run the resulting
#      `./result` script (presumably refreshes the materialized nix files
#      next to the entry point -- see the haskell.nix documentation);
#   2. build the `calculateMaterializedSha` attribute, run `./result` and
#      store its output as a `stack-sha256 = "...";` line in
#      `<nixfile>.sha256`.
#
# The nix file paths are relative, so the script has to be started from the
# directory that contains `release.nix` and `build-scripts/`.

# -e: stop at the first failing command
# -u: treat expansion of an unset variable as an error
# -o pipefail: a pipeline fails if any of its stages fails
set -euo pipefail

for nixfile in release.nix build-scripts/ngless-static-embed-dependencies.nix; do
    nix build -f "${nixfile}" NGLess.project.stack-nix.passthru.updateMaterialized
    ./result
    nix build -f "${nixfile}" NGLess.project.stack-nix.passthru.calculateMaterializedSha
    # Capture the hash in its own assignment: the exit status of a command
    # substitution used as an argument to `echo` is discarded, so a failing
    # `./result` would otherwise leave an empty hash in the .sha256 file
    # without stopping the script.
    sha256="$(./result)"
    echo "stack-sha256 = \"${sha256}\";" > "${nixfile}.sha256"
    echo "Updated ${nixfile}.sha256"
done

1 change: 1 addition & 0 deletions docs/sources/whatsnew.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ which will print ``my-sample``.
- When using the ``parallel`` module and a job fails, writes the log to the corresponding ``.failed`` file.
- External modules can now use the ``sequenceset`` type to represent a FASTA file.
- The ``load_fastq_directory`` function now supports ``.xz`` compressed files.
- The ``parallel`` module now checks for stale locks **before** re-trying failed tasks. The former model could lead to a situation where a particular sample failed deterministically and then blocked progress even when some locks were stale.

Bugfixes
~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies:
- stm
- stm-chans
- stm-conduit >=2.7
- random-shuffle
- resourcet >=1.1
- tar >=0.5
- template-haskell
Expand Down
2 changes: 1 addition & 1 deletion release.nix
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ in pkgs.haskell-nix.stackProject {
filter = path: type: let baseName = baseNameOf (toString path); in !(pkgs.lib.elem baseName ignoredPaths);
};
materialized = ./build-scripts/release.materialized;
stack-sha256 = "0lpv5h8zzvmdamw0sxxnqaj919b3wl4b3zmc3d1z38ckpgpxi16b";
stack-sha256 = "0b6mwmf9jcqj294fdyrqjiq1wcdnzjl7z00bzwzskklh5bknp8s3";
inherit checkMaterialization;
}

0 comments on commit 7c12bf4

Please sign in to comment.