-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
Copy pathExtended.hs
281 lines (252 loc) · 12.1 KB
/
Extended.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
{-# HLINT ignore "Use withAsync" #-}
module Control.Concurrent.Extended
( module Control.Concurrent,
sleep,
ForkableMonadIO,
-- * Robust forking
forkImmortal,
forkManagedT,
forkManagedTWithGracefulShutdown,
-- * Concurrency in MonadError
forConcurrentlyEIO,
concurrentlyEIO,
-- * Deprecated
ImmortalThreadLog (..),
ThreadState (..),
ThreadShutdown (..),
Forever (..),
)
where
import Control.Concurrent hiding (threadDelay)
import Control.Concurrent qualified as Base
import Control.Concurrent.Async as A
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.STM qualified as STM
import Control.Exception
import Control.Immortal qualified as Immortal
import Control.Monad.Except
import Control.Monad.Loops (iterateM_)
import Control.Monad.Trans.Control qualified as MC
import Control.Monad.Trans.Managed (ManagedT (..), allocate)
import Data.Aeson
import Data.List.Split
import Data.Traversable
import Data.Void
-- For forkImmortal. We could also have it take a cumbersome continuation if we
-- want to break this dependency. Probably best to move Hasura.Logging into a
-- separate lib with this if we do the override thing.
import Hasura.Logging
import Hasura.Prelude
{-# HLINT ignore sleep #-}
-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int' microseconds.
--
-- NOTE: you cannot simply replace e.g. @threadDelay 1000@ with @sleep 1000@ since those literals
-- have different meanings!
sleep :: DiffTime -> IO ()
sleep = Base.threadDelay . round . Microseconds
-- | Note: Please consider using 'forkManagedT' instead to ensure reliable
-- resource cleanup.
forkImmortal ::
(ForkableMonadIO m) =>
-- | A label describing this thread's function (see 'labelThread').
String ->
Logger Hasura ->
-- | An IO action we expect never to return normally. This will have the type
-- signature ':: m a' (see e.g. the type of 'forever').
m Void ->
-- | A handle for the forked thread. See "Control.Immortal".
m Immortal.Thread
forkImmortal label logger m =
Immortal.createWithLabel label $ \this -> do
-- Log that the thread has started
liftIO $ unLogger logger (ImmortalThreadRestarted label)
-- In this case, we are handling unexpected exceptions.
-- i.e This does not catch the asynchronous exception which stops the thread.
Immortal.onUnexpectedFinish this logAndPause (void m)
where
logAndPause = \case
Right _void -> pure () -- absurd _void (i.e. unreachable)
Left e -> liftIO $ do
liftIO $ unLogger logger (ImmortalThreadUnexpectedException label e)
-- pause before restarting some arbitrary amount of time. The idea is not to flood
-- logs or cause other cascading failures.
sleep (seconds 1)
data ThreadState = ThreadForked | ThreadBlocking | ThreadShutdownInitiated
deriving (Show, Eq)
-- | @ThreadShutdown@ is a newtype wrapper over an action which is intended
-- to execute when a thread's shutdown is initiated before killing the thread
newtype ThreadShutdown m = ThreadShutdown {tsThreadShutdown :: m ()}
-- | This function pairs a call to 'forkImmortal' with a finalizer which stops
-- the immortal thread.
-- Note, the thread object can leave its scope if this function is incorrectly
-- used. Generally, the result should only be used later in the same ManagedT
-- scope.
forkManagedT ::
(ForkableMonadIO m) =>
String ->
Logger Hasura ->
m Void ->
ManagedT m Immortal.Thread
forkManagedT label logger m =
allocate
(forkImmortal label logger m)
( \thread -> do
unLogger logger (ImmortalThreadStopping label)
liftIO $ Immortal.stop thread
)
-- | The @Forever@ type defines an infinite looping monadic action (like @m void@), but allows the
-- caller to control the recursion or insert code before each iteration. The @a@ is the initial argument,
-- and subsequent iterations will be fed the argument returned by the previous one. See
-- @forkManagedTWithGracefulShutdown@ to see how it's used
data Forever m = forall a. Forever a (a -> m a)
-- | @forkManagedTWithGracefulShutdown@ is an extension of the @forkManagedT@
-- function this function also attempts to gracefully shutdown the thread. This function
-- accepts a `m (Forever m)` argument. The @Forever@ type contains a function and an argument
-- to the function. The function supplied will be run repeatedly until shutdown is initiated. The
-- response of the function will be the argument to the next iteration.
--
-- For reference, this function is used to run the async actions processor. Check
-- `asyncActionsProcessor`
forkManagedTWithGracefulShutdown ::
(ForkableMonadIO m) =>
String ->
Logger Hasura ->
ThreadShutdown m ->
m (Forever m) ->
ManagedT m Immortal.Thread
forkManagedTWithGracefulShutdown label logger (ThreadShutdown threadShutdownHandler) loopIteration = do
threadStateTVar <- liftIO $ STM.newTVarIO ThreadForked
allocate
( Immortal.createWithLabel label $ \this -> do
-- Log that the thread has started
liftIO $ unLogger logger (ImmortalThreadRestarted label)
-- In this case, we are handling unexpected exceptions.
-- i.e This does not catch the asynchronous exception which stops the thread.
Immortal.onUnexpectedFinish this logAndPause
$ ( do
let mLoop (Forever loopFunctionInitArg loopFunction) =
flip iterateM_ loopFunctionInitArg $ \args -> do
liftIO
$ STM.atomically
$ do
STM.readTVar threadStateTVar >>= \case
ThreadShutdownInitiated -> do
-- signal to the finalizer that we are now blocking
-- and blocking forever since this
-- var moves monotonically from forked -> shutdown -> blocking
STM.writeTVar threadStateTVar ThreadBlocking
ThreadBlocking -> STM.retry
ThreadForked -> pure ()
loopFunction args
t <- LA.async $ mLoop =<< loopIteration
LA.link t
void $ LA.wait t
)
)
( \thread -> do
liftIO
$ STM.atomically
$ STM.modifyTVar' threadStateTVar (const ThreadShutdownInitiated)
-- the threadShutdownHandler here will wait for any in-flight events
-- to finish processing
{-
There is a conundrum here about whether the @threadShutdownHandler@
should be before or after the @ThreadBlocking@ check call, this is because
there are problems with both the cases:
1. @threadShutdownHandler@ before the @ThreadBlocking@ check
------------------------------------------------------------
Let's say we're just about to start processing a new iteration of the
loop function and before the processing actually starts the shutdown is
initiated, there will be no in-flight events (because the batch hasn't started processing yet) so
@threadShutdownHandler@ will return immediately and the new batch will start processing
which were fetched earlier. This is a race condition and may kill the thread with some
of the events still processing.
2. @threadShutdownHandler@ after the @ThreadBlocking@ check
-----------------------------------------------------------
This will solve the above race condition but will cause a new problem. The
graphql-engine accepts a config called `--graceful-shutdown-timeout` which is a timeout
for any in-flight processing events that are running in the graphql-engine to complete
processing within this time.
Let's say we are going to start iterating over the next iteration of `processEventQueue`
and without loss of generality let's say this batch takes 100 seconds to finish processing
and the graceful shutdown timeout is 10 seconds and shutdown is initiated in the midst of processing
this batch, this will have no effect and the thread will be shutdown after the batch completes (after
100 seconds) which is wrong because it doesn't respect the graceful shutdown timeout
TODO: figure out a way which solves both the problems
At the time of writing this PR, we decided to go with 1 because the worst thing
that will happen is that some events might get processed more than once but this
is a better solution than what we had earlier where we were shutting down all the in-flight
processing events without the graceful shutdown timeout.
-}
threadShutdownHandler
liftIO
$ STM.atomically
$ do
STM.readTVar threadStateTVar >>= STM.check . (== ThreadBlocking)
unLogger logger (ImmortalThreadStopping label)
liftIO $ Immortal.stop thread
)
where
logAndPause = \case
Right () -> pure ()
Left e -> liftIO $ do
liftIO $ unLogger logger (ImmortalThreadUnexpectedException label e)
-- pause before restarting some arbitrary amount of time. The idea is not to flood
-- logs or cause other cascading failures.
sleep (seconds 1)
data ImmortalThreadLog
= -- | Synchronous Exception
ImmortalThreadUnexpectedException String SomeException
| -- | Asynchronous Exception about to be sent
ImmortalThreadStopping String
| ImmortalThreadRestarted String
instance ToEngineLog ImmortalThreadLog Hasura where
toEngineLog (ImmortalThreadStopping label) =
(LevelInfo, ELTInternal ILTUnstructured, toJSON msg)
where
msg = "Stopping immortal " <> label <> " thread"
toEngineLog (ImmortalThreadUnexpectedException label e) =
(LevelError, ELTInternal ILTUnstructured, toJSON msg)
where
msg =
"Unexpected exception in immortal thread "
<> label
<> " (it will be restarted):\n"
<> show e
toEngineLog (ImmortalThreadRestarted label) =
(LevelInfo, ELTInternal ILTUnstructured, toJSON msg)
where
msg = "Thread " <> label <> " (re)started"
-- TODO
-- - maybe use this everywhere, but also:
-- - consider unifying with: src-lib/Control/Monad/Stateless.hs ?
-- - nice TypeError: https://kodimensional.dev/type-errors
--
-- | Like 'MonadIO' but constrained to stacks in which forking a new thread is reasonable/safe.
-- In particular 'StateT' causes problems.
--
-- This is the constraint you can use for functions that call 'LA.async', or 'immortal'.
type ForkableMonadIO m = (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m))
-- TODO consider deprecating async.
-- export something with polymorphic return type, which makes "fork and forget" difficult
-- this could automatically link in one variant
-- another variant might return ThreadId that self destructs w/ finalizer (mkWeakThreadId)
-- and note: "Holding a normal ThreadId reference will prevent the delivery of BlockedIndefinitely exceptions because the reference could be used as the target of throwTo at any time, "
-- | A somewhat wonky function for parallelizing @for xs f@ where @f@ is
-- @(MonadIO m, MonadError e m)@. This is equivalent to @for xs f@ modulo the
-- IO effects (i.e. when the IO has no real side effects we care about).
--
-- This also takes a @chunkSize@ argument so you can manipulate the amount of
-- work given to each thread.
forConcurrentlyEIO :: (MonadIO m, MonadError e m) => Int -> [a] -> (a -> ExceptT e IO b) -> m [b]
forConcurrentlyEIO chunkSize xs f = do
let fIO a = runExceptT (f a) >>= evaluate
xs' <- liftIO $ fmap concat $ A.forConcurrently (chunksOf chunkSize xs) $ traverse fIO
for xs' (either throwError pure)
concurrentlyEIO :: (MonadIO m, MonadError e m) => ExceptT e IO a -> ExceptT e IO b -> m (a, b)
concurrentlyEIO left right = do
(leftE, rightE) <- liftIO $ A.concurrently (runExceptT left >>= evaluate) (runExceptT right >>= evaluate)
x <- leftE `onLeft` throwError
y <- rightE `onLeft` throwError
pure (x, y)