/
Worker.hs
75 lines (63 loc) · 2.68 KB
/
Worker.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-| This module contains all logic related to error handling when spawning threads
to execute Worker sub-routines
-}
module Control.Concurrent.Capataz.Internal.Worker where
import RIO
import RIO.Time (getCurrentTime)
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent.Capataz.Internal.Process as Process
import qualified Control.Concurrent.Capataz.Internal.Util as Util
import Control.Concurrent.Capataz.Internal.Types
-- | Runs a worker's action on its own thread, wrapped with failure handling.
--
-- The thread is created with 'asyncWithUnmask'. Inside it, the thread is
-- named via 'Util.setProcessThreadName', the 'workerAction' is executed under
-- the supplied @unmask@ function, and its outcome is captured with
-- 'unsafeTry'. A @Left@ outcome is handed to 'Process.handleProcessException'
-- and a @Right@ outcome to 'Process.handleProcessCompletion'; the event they
-- produce is sent to the parent supervisor wrapped in a 'MonitorEvent'.
--
-- The returned 'Worker' carries the worker id and name, the async handle, the
-- time captured just before the thread was spawned, and the original options.
workerMain
  :: (MonadUnliftIO m)
  => ParentSupervisorEnv m
  -> WorkerOptions m
  -> WorkerId
  -> RestartCount
  -> m (Worker m)
workerMain
  env@ParentSupervisorEnv { supervisorNotify }
  workerOptions@WorkerOptions { workerName, workerAction }
  workerId
  restartCount = do
  workerCreationTime <- getCurrentTime
  -- The lambda stays inline so that @unmask@ keeps the polymorphic type
  -- 'asyncWithUnmask' provides for it.
  workerAsync <- asyncWithUnmask $ \unmask -> do
    let spec = WorkerSpec workerOptions
        -- Builds the event for an action that ended with an exception.
        onFailure =
          Process.handleProcessException unmask env spec workerId restartCount
        -- Builds the event for an action that returned normally.
        onCompletion =
          Process.handleProcessCompletion unmask env spec workerId restartCount
    Util.setProcessThreadName workerId workerName
    outcome <- unsafeTry $ unmask (workerAction workerId)
    event   <- either onFailure (const onCompletion) outcome
    supervisorNotify (MonitorEvent event)
  pure Worker
    { workerId
    , workerName
    , workerAsync
    , workerCreationTime
    , workerOptions
    }
-- | Internal fork of a worker thread, performed on the Capataz thread. This is
-- not the public @forkWorker@ function, which instead sends a message to the
-- capataz loop.
--
-- When restart information is supplied, its worker id and restart count are
-- reused; otherwise a random id is generated with 'UUID.nextRandom' and the
-- restart count starts at @0@. The worker is started through 'workerMain',
-- after which 'Process.notifyProcessStarted' is called with the same restart
-- information, and the new 'Worker' is returned.
forkWorker
  :: (MonadUnliftIO m)
  => ParentSupervisorEnv m
  -> WorkerOptions m
  -> Maybe (WorkerId, RestartCount)
  -> m (Worker m)
forkWorker env workerOptions mRestartInfo = do
  (workerId, restartCount) <- maybe freshIdentity pure mRestartInfo
  worker <- workerMain env workerOptions workerId restartCount
  Process.notifyProcessStarted mRestartInfo env (WorkerProcess worker)
  pure worker
 where
  -- Identity for a worker that has never run: random id, zero restarts.
  freshIdentity = do
    newId <- liftIO UUID.nextRandom
    pure (newId, 0)