Skip to content

Commit

Permalink
Handle node disconnecions
Browse files Browse the repository at this point in the history
1. Ensure follow catches asyncronous exceptions from connectClient,
such that it can restart with a new connection and cursor.

2. Keep Local State Queries and to-be-submitted Txs queued until their
requests finish, not just when they start. If a query is interrupted by
the node being disconnected, it will block until a connection is
re-established, and then retry.
  • Loading branch information
Anviking authored and HeinrichApfelmus committed Oct 18, 2021
1 parent 54326ed commit a24a981
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 17 deletions.
31 changes: 19 additions & 12 deletions lib/core/src/Cardano/Wallet/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ import UnliftIO.Async
import UnliftIO.Concurrent
( threadDelay )
import UnliftIO.Exception
( SomeException, bracket, handle )
( SomeException, bracket, handleSyncOrAsync )

import qualified Cardano.Api.Shelley as Node
import qualified Data.List.NonEmpty as NE
Expand Down Expand Up @@ -364,26 +364,33 @@ follow'
-- ^ Getter on the abstract 'block' type
-> IO FollowExit
follow' nl tr cps yield header =
bracket (initCursor nl cps) (destroyCursor nl) (sleep 0 False)
bracket
(initCursor nl cps)
(destroyCursor nl)
(handleExceptions . sleep 0 False)
where
innerTr = contramap MsgFollowLog tr
delay0 :: Int
delay0 = 500*1000 -- 500ms

handleExceptions :: IO FollowExit -> IO FollowExit
handleExceptions =
-- Node disconnections are seen as async exceptions from here. By
-- catching them, `follow` will try to establish a new connection
-- depending on the `FollowExceptionRecovery`.
handleSyncOrAsync (traceException *> const (pure FollowFailure))
where
traceException :: SomeException -> IO ()
traceException e = do
traceWith tr $ MsgUnhandledException $ T.pack $ show e

-- | Wait a short delay before querying for blocks again. We also take this
-- opportunity to refresh the chain tip as it has probably increased in
-- order to refine our syncing status.
sleep :: Int -> Bool -> Cursor -> IO FollowExit
sleep delay hasRolledForward cursor = handle exitOnAnyException $ do
when (delay > 0) (threadDelay delay)
step hasRolledForward cursor
where
-- Any unhandled synchronous exception should be logged and cause the
-- chain follower to exit.
exitOnAnyException :: SomeException -> IO FollowExit
exitOnAnyException e = do
traceWith tr $ MsgUnhandledException $ T.pack $ show e
pure FollowFailure
sleep delay hasRolledForward cursor = do
when (delay > 0) (threadDelay delay)
step hasRolledForward cursor

step :: Bool -> Cursor -> IO FollowExit
step hasRolledForward cursor = nextBlocks nl cursor >>= \case
Expand Down
29 changes: 24 additions & 5 deletions lib/core/src/Ouroboros/Network/Client/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import Control.Monad.Class.MonadSTM
, atomically
, isEmptyTQueue
, newEmptyTMVarIO
, peekTQueue
, putTMVar
, readTQueue
, takeTMVar
Expand Down Expand Up @@ -505,7 +506,7 @@ localStateQuery queue =
:: LocalStateQueryCmd block m
-> m (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
clientStAcquired (SomeLSQ cmd respond) = pure $ go cmd $ \res -> do
LSQ.SendMsgRelease (respond res >> clientStIdle)
LSQ.SendMsgRelease (respond res >> finalizeCmd >> clientStIdle)
-- We /could/ read all LocalStateQueryCmds from the TQueue, and run
-- them against the same tip, if re-acquiring takes a long time. As
-- of Jan 2021, it seems like queries themselves take significantly
Expand All @@ -527,8 +528,21 @@ localStateQuery queue =
go (LSQBind ma f) cont = go ma $ \a -> do
go (f a) $ \b -> cont b

-- | Note that we for LSQ and TxSubmission use peekTQueue when starting the
-- request, and only remove the command from the queue after we have
-- responded.
--
-- If the connection to the node drops, this makes cancelled commands
-- automatically retry on reconnection.
--
-- IMPORTANT: callers must also `finalizeCmd`, because of the above.
awaitNextCmd :: m (LocalStateQueryCmd block m)
awaitNextCmd = atomically $ readTQueue queue
awaitNextCmd = atomically $ peekTQueue queue

finalizeCmd :: m ()
finalizeCmd = atomically $ tryReadTQueue queue >>= \case
Just _ -> return ()
Nothing -> error "finalizeCmd: queue is not empty"

-- | Monad for composing local state queries for the node /tip/.
--
Expand Down Expand Up @@ -591,10 +605,15 @@ localTxSubmission
localTxSubmission queue = LocalTxSubmissionClient clientStIdle
where
clientStIdle
:: m (LocalTxClientStIdle tx err m ())
clientStIdle = atomically (readTQueue queue) <&> \case
:: m (LocalTxClientStIdle tx err m Void)
clientStIdle = atomically (peekTQueue queue) <&> \case
CmdSubmitTx tx respond ->
SendMsgSubmitTx tx (\e -> respond e >> clientStIdle)
SendMsgSubmitTx tx $ \res -> do
respond res
-- Same note about peekTQueue from `localStateQuery` applies
-- here.
_processedCmd <- atomically (readTQueue queue)
clientStIdle

--------------------------------------------------------------------------------
--
Expand Down

0 comments on commit a24a981

Please sign in to comment.