Skip to content
Permalink
Browse files

implement 'forceResync' handler properly

Taking care of concurrent workers possibly acting on the database
at the same time as well as restarting the restoration worker at
the right point
  • Loading branch information
KtorZ committed Jan 14, 2020
1 parent 1a71072 commit 196b4dffe4a87dd48c1bbdae4ee3da1420d8a85c
@@ -62,6 +62,7 @@ library
, http-media
, http-types
, iohk-monitoring
, lifted-base
, memory
, monad-logger
, network
@@ -70,6 +70,7 @@ import Cardano.Wallet
, ErrWrongPassphrase (..)
, HasLogger
, genesisData
, logger
, networkLayer
)
import Cardano.Wallet.Api
@@ -130,7 +131,7 @@ import Cardano.Wallet.Api.Types
, getApiMnemonicT
)
import Cardano.Wallet.DB
( DBFactory )
( DBFactory (..) )
import Cardano.Wallet.Logging
( fromLogObject, transformTextTrace )
import Cardano.Wallet.Network
@@ -209,12 +210,14 @@ import Control.DeepSeq
( NFData )
import Control.Exception
( IOException, bracket, tryJust )
import Control.Exception.Lifted
( finally )
import Control.Monad
( forM, forM_, void )
import Control.Monad.IO.Class
( MonadIO, liftIO )
import Control.Monad.Trans.Except
( ExceptT, catchE, throwE, withExceptT )
( ExceptT (..), catchE, runExceptT, throwE, withExceptT )
import Control.Tracer
( Tracer, contramap )
import Data.Aeson
@@ -787,7 +790,7 @@ deleteWallet
-> Handler NoContent
deleteWallet ctx (ApiT wid) = do
liftHandler $ withWorkerCtx @_ @s @k ctx wid throwE $ \_ -> pure ()
liftIO $ (df ^. #removeDatabase) wid
liftIO $ removeDatabase df wid
liftIO $ Registry.remove re wid
return NoContent
where
@@ -890,9 +893,21 @@ forceResyncWallet
-> ApiT WalletId
-> Handler NoContent
forceResyncWallet ctx (ApiT wid) = do
liftHandler $ withWorkerCtx ctx wid throwE $ \wrk ->
W.rollbackBlocks wrk wid W.slotMinBound
liftHandler $ withWorkerCtx @_ @s @k ctx wid throwE $ \_ -> pure ()
flip finally (liftIO $ registerWorker ctx wid) $ do
liftIO $ Registry.remove re wid
liftHandler $ ExceptT safeRollback
pure NoContent
where
re = ctx ^. workerRegistry @s @k
tr = ctx ^. logger
df = ctx ^. dbFactory @s @k
-- NOTE Safe because it happens without any worker running.
safeRollback = do
let tr' = Registry.transformTrace wid tr
withDatabase df wid $ \db -> do
let wrk = hoistResource db (ctx & logger .~ tr')
runExceptT $ W.rollbackBlocks wrk wid W.slotMinBound

{-------------------------------------------------------------------------------
Coin Selections
@@ -1340,7 +1355,7 @@ initWorker ctx wid createWallet restoreWallet =
defaultWorkerAfter . transformTextTrace

, workerAcquire =
(df ^. #withDatabase) wid
withDatabase df wid
}
re = ctx ^. workerRegistry @s @k
df = ctx ^. dbFactory @s @k
@@ -1435,31 +1450,42 @@ newApiLayer tr g0 nw tl df wids = do
re <- Registry.empty
let tr' = contramap MsgFromWorker tr
let ctx = ApiLayer (fromLogObject tr') g0 nw tl df re
forM_ wids (registerWorker re ctx)
forM_ wids (registerWorker ctx)
return ctx

-- | Register a restoration worker to the registry.
registerWorker
:: forall ctx s t k.
( ctx ~ ApiLayer s t k
)
=> ApiLayer s t k
-> WalletId
-> IO ()
registerWorker ctx wid = do
newWorker @_ @_ @ctx ctx wid config >>= \case
Nothing ->
return ()
Just worker ->
Registry.insert re worker
where
registerWorker re ctx wid = do
let config = MkWorker
{ workerBefore =
\_ _ -> return ()

, workerMain = \ctx' _ -> do
-- FIXME:
-- Review error handling here
unsafeRunExceptT $
W.restoreWallet @(WorkerCtx ctx) @s @t ctx' wid

, workerAfter =
defaultWorkerAfter . transformTextTrace

, workerAcquire =
(df ^. #withDatabase) wid
}
newWorker @_ @_ @ctx ctx wid config >>= \case
Nothing ->
return ()
Just worker ->
Registry.insert re worker
re = ctx ^. workerRegistry
df = ctx ^. dbFactory
config = MkWorker
{ workerBefore =
\_ _ -> return ()

, workerMain = \ctx' _ -> do
-- FIXME:
-- Review error handling here
unsafeRunExceptT $
W.restoreWallet @(WorkerCtx ctx) @s @t ctx' wid

, workerAfter =
defaultWorkerAfter . transformTextTrace

, workerAcquire =
withDatabase df wid
}

-- | Run an action in a particular worker context. Fails if there's no worker
-- for a given id.
@@ -57,20 +57,18 @@ import Data.Quantity
( Quantity (..) )
import Data.Word
( Word32 )
import GHC.Generics
( Generic )

import qualified Data.List as L

-- | Instantiate database layers at will
data DBFactory m s k = DBFactory
{ withDatabase :: WalletId -> (DBLayer m s k -> IO ()) -> IO ()
{ withDatabase :: forall a. WalletId -> (DBLayer m s k -> IO a) -> IO a
-- ^ Creates a new or use an existing database, maintaining an open
-- connection so long as necessary

, removeDatabase :: WalletId -> IO ()
-- ^ Erase any trace of the database
} deriving (Generic)
}

-- | A Database interface for storing various things in a DB. In practice,
-- we'll need some extra contraints on the wallet state that allows us to
@@ -29,6 +29,7 @@ module Cardano.Wallet.Registry
-- * Logging
, WithWorkerKey (..)
, WorkerRegistryLog (..)
, transformTrace
) where

import Prelude hiding
@@ -224,8 +225,8 @@ newWorker ctx k (MkWorker before main after acquire) = do
, workerResource = resource
}
where
tr = ctx ^. logger
tr' = contramap (fmap (toText . WithWorkerKey k)) $ appendName "worker" tr
tr = ctx ^. logger
tr' = transformTrace k tr
cleanup mvar e = tryPutMVar mvar Nothing *> after tr' e

-- | A worker log event includes the key (i.e. wallet ID) as context.
@@ -240,6 +241,14 @@ instance ToText key => ToText (WithWorkerKey key) where
Logging
-------------------------------------------------------------------------------}

transformTrace
:: ToText key
=> key
-> Trace IO Text
-> Trace IO Text
transformTrace k tr =
contramap (fmap (toText . WithWorkerKey k)) $ appendName "worker" tr

data WorkerRegistryLog
= MsgFinished
| MsgThreadKilled

0 comments on commit 196b4df

Please sign in to comment.
You can’t perform that action at this time.