Skip to content

Commit

Permalink
use a single-striped connection pool for each database layer
Browse files Browse the repository at this point in the history
  It is a rather common practice to use a pool of database connection
  when dealing with databases. So far, we've been using a single shared
  connection per wallet worker with, in front of each connection a lock
  preventing concurrent access to the database. The lock is only
  necessary because of the way persistent handles query statements
  internally, in principle, SQLite handles concurrent database accesses
  just well.

  For basic wallets, this is a relatively useless change. But for larger
  wallets like those manipulated by exchanges, we've observed very slow
  response time due to concurrent access of the database lock. Indeed,
  some requests may grab the lock for 10 or 20 seconds, preventing any
  requests from going throug. However, most requests are read-only
  requests and could be executed in parallel, at the discretion of
  the SQLite engine. I hope that the introduction of a connection pool
  will improve the overall experience for large wallets by better
  serving concurrent requests on the database. Finger crossed.
  • Loading branch information
KtorZ authored and rvl committed Feb 27, 2021
1 parent 1b42a42 commit 20c9cf6
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 75 deletions.
1 change: 1 addition & 0 deletions lib/core/cardano-wallet-core.cabal
Expand Up @@ -86,6 +86,7 @@ library
, quiet
, random
, random-shuffle
, resource-pool
, retry
, safe
, scientific
Expand Down
163 changes: 101 additions & 62 deletions lib/core/src/Cardano/DB/Sqlite.hs
Expand Up @@ -73,12 +73,16 @@ import Data.List.Split
( chunksOf )
import Data.Maybe
( fromMaybe )
import Data.Pool
( Pool, createPool, destroyAllResources, withResource )
import Data.Proxy
( Proxy (..) )
import Data.Text
( Text )
import Data.Text.Class
( ToText (..) )
import Data.Time.Clock
( NominalDiffTime )
import Database.Persist.Sql
( DBName (..)
, EntityField
Expand Down Expand Up @@ -108,9 +112,7 @@ import System.Log.FastLogger
import UnliftIO.Compat
( handleIf, mkRetryHandler )
import UnliftIO.Exception
( Exception, bracket_, handleJust, tryJust )
import UnliftIO.MVar
( newMVar, withMVarMasked )
( Exception, bracket_, handleJust, mask_, tryJust )

import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Char8 as B8
Expand All @@ -125,14 +127,12 @@ import qualified Database.Sqlite as Sqlite

-- | Context for the SQLite 'DBLayer'.
data SqliteContext = SqliteContext
{ getSqlBackend :: SqlBackend
{ connectionPool :: Pool (SqlBackend, Sqlite.Connection)
-- ^ A handle to the Persistent SQL backend.
, runQuery :: forall a. SqlPersistT IO a -> IO a
-- ^ 'safely' run a query with logging and lock-protection
, dbFile :: Maybe FilePath
-- ^ The actual database file, if any. If none, runs in-memory
, trace :: Tracer IO DBLog
-- ^ A 'Tracer' for logging
}

-- | Error type for when migrations go wrong after opening a database.
Expand Down Expand Up @@ -167,44 +167,16 @@ queryLogFunc tr _loc _source level str = traceWith tr (MsgQuery msg sev)
handleConstraint :: MonadUnliftIO m => e -> m a -> m (Either e a)
handleConstraint e = handleJust select handler . fmap Right
where
select (SqliteException ErrorConstraint _ _) = Just ()
select _ = Nothing
handler = const . pure . Left $ e
select (SqliteException ErrorConstraint _ _) = Just ()
select _ = Nothing
handler = const . pure . Left $ e

-- | Finalize database statements and close the database connection.
--
-- If the database connection is still in use, it will retry for up to a minute,
-- to let other threads finish up.
-- | Free all allocated database connections. See also 'destroySqliteBackend'
--
-- This function is idempotent: if the database connection has already been
-- closed, calling this function will exit without doing anything.
--
destroyDBLayer :: SqliteContext -> IO ()
destroyDBLayer (SqliteContext {getSqlBackend, trace, dbFile}) = do
traceWith trace (MsgClosing dbFile)
recovering pol (mkRetryHandler isBusy) (const $ close' getSqlBackend)
& handleIf isAlreadyClosed
(traceWith trace . MsgIsAlreadyClosed . showT)
& handleIf statementAlreadyFinalized
(traceWith trace . MsgStatementAlreadyFinalized . showT)
where
isAlreadyClosed = \case
-- Thrown when an attempt is made to close a connection that is already
-- in the closed state:
Sqlite.SqliteException Sqlite.ErrorMisuse _ _ -> True
Sqlite.SqliteException {} -> False

statementAlreadyFinalized = \case
-- Thrown
Persist.StatementAlreadyFinalized{} -> True
Persist.Couldn'tGetSQLConnection{} -> False

showT :: Show a => a -> Text
showT = T.pack . show

isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy)
pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms)
ms = 1000 -- microseconds in a millisecond
destroyDBLayer :: Tracer IO DBLog -> SqliteContext -> IO ()
destroyDBLayer tr SqliteContext{connectionPool,dbFile} = do
traceWith tr (MsgDestroyConnectionPool dbFile)
destroyAllResources connectionPool

{-------------------------------------------------------------------------------
Internal / Database Setup
Expand All @@ -219,32 +191,75 @@ startSqliteBackend
-> Maybe FilePath
-> IO (Either MigrationError SqliteContext)
startSqliteBackend manualMigration autoMigration tr fp = do
(unsafeBackend, connection) <-
createSqliteBackend tr fp manualMigration (queryLogFunc tr)
lock <- newMVar unsafeBackend
pool <- createSqlitePool tr fp manualMigration (queryLogFunc tr)
let observe :: IO a -> IO a
observe = bracketTracer (contramap MsgRun tr)
-- runSqlConn is guarded with a lock because it's not threadsafe in general.
-- It is also masked, so that the SqlBackend state is not corrupted if a
-- thread gets cancelled while running a query.
-- See: https://github.com/yesodweb/persistent/issues/981
--
-- Note that `withResource` does already mask async exception but only for
-- dealing with the pool resource acquisition. The action is then ran
-- unmasked with the acquired resource. If an asynchronous exception occurs,
-- the resource is NOT placed back in the pool.
let runQuery :: SqlPersistT IO a -> IO a
runQuery cmd = withMVarMasked lock $ \backend ->
observe $ runSqlConn cmd backend
autoMigrationResult <-
runQuery cmd = withResource pool $ \(backend, _) ->
observe $ mask_ $ runSqlConn cmd backend

autoMigrationResult <- withResource pool $ \(backend, connection) -> do
withForeignKeysDisabled tr connection
$ runQuery (runMigrationQuiet autoMigration)
$ mask_ (runSqlConn (runMigrationQuiet autoMigration) backend)
& tryJust (matchMigrationError @PersistException)
& tryJust (matchMigrationError @SqliteException)
& fmap join
traceWith tr $ MsgMigrations $ fmap length autoMigrationResult
let ctx = SqliteContext unsafeBackend runQuery fp tr
let ctx = SqliteContext pool runQuery fp
case autoMigrationResult of
Left e -> do
destroyDBLayer ctx
destroyDBLayer tr ctx
pure $ Left e
Right _ -> pure $ Right ctx

-- | Finalize database statements and close the database connection.
--
-- If the database connection is still in use, it will retry for up to a minute,
-- to let other threads finish up.
--
-- This function is idempotent: if the database connection has already been
-- closed, calling this function will exit without doing anything.
destroySqliteBackend
:: Tracer IO DBLog
-> SqlBackend
-> Maybe FilePath
-> IO ()
destroySqliteBackend tr sqlBackend dbFile = do
traceWith tr (MsgCloseSingleConnection dbFile)
recovering pol (mkRetryHandler isBusy) (const $ close' sqlBackend)
& handleIf isAlreadyClosed
(traceWith tr . MsgIsAlreadyClosed . showT)
& handleIf statementAlreadyFinalized
(traceWith tr . MsgStatementAlreadyFinalized . showT)
where
isAlreadyClosed = \case
-- Thrown when an attempt is made to close a connection that is already
-- in the closed state:
Sqlite.SqliteException Sqlite.ErrorMisuse _ _ -> True
Sqlite.SqliteException {} -> False

statementAlreadyFinalized = \case
-- Thrown
Persist.StatementAlreadyFinalized{} -> True
Persist.Couldn'tGetSQLConnection{} -> False

showT :: Show a => a -> Text
showT = T.pack . show

isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy)
pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms)
ms = 1000 -- microseconds in a millisecond


-- | Run the given task in a context where foreign key constraints are
-- /temporarily disabled/, before re-enabling them.
--
Expand Down Expand Up @@ -345,19 +360,38 @@ instance MatchMigrationError SqliteException where
newtype ManualMigration = ManualMigration
{ executeManualMigration :: Sqlite.Connection -> IO () }

createSqliteBackend
createSqlitePool
:: Tracer IO DBLog
-> Maybe FilePath
-> ManualMigration
-> LogFunc
-> IO (SqlBackend, Sqlite.Connection)
createSqliteBackend trace fp migration logFunc = do
-> IO (Pool (SqlBackend, Sqlite.Connection))
createSqlitePool tr fp migration logFunc = do
let connStr = sqliteConnStr fp
traceWith trace $ MsgConnStr connStr
conn <- Sqlite.open connStr
executeManualMigration migration conn
backend <- wrapConnectionInfo (mkSqliteConnectionInfo connStr) conn logFunc
pure (backend, conn)
traceWith tr $ MsgConnStr connStr

let createConnection = do
let info = mkSqliteConnectionInfo connStr
conn <- Sqlite.open connStr
executeManualMigration migration conn
backend <- wrapConnectionInfo info conn logFunc
pure (backend, conn)

let destroyConnection = \(backend, _) -> do
destroySqliteBackend tr backend fp

createPool
createConnection
destroyConnection
numberOfStripes
timeToLive
maximumConnections
where
numberOfStripes = 1
timeToLive = 600 :: NominalDiffTime
-- When running in :memory:, we want a single connection that does not get
-- cleaned up.
maximumConnections = maybe 1 (const 10) fp

sqliteConnStr :: Maybe FilePath -> Text
sqliteConnStr = maybe ":memory:" T.pack
Expand All @@ -371,7 +405,8 @@ data DBLog
| MsgQuery Text Severity
| MsgRun BracketLog
| MsgConnStr Text
| MsgClosing (Maybe FilePath)
| MsgCloseSingleConnection (Maybe FilePath)
| MsgDestroyConnectionPool (Maybe FilePath)
| MsgWillOpenDB (Maybe FilePath)
| MsgDatabaseReset
| MsgIsAlreadyClosed Text
Expand Down Expand Up @@ -446,7 +481,8 @@ instance HasSeverityAnnotation DBLog where
MsgQuery _ sev -> sev
MsgRun _ -> Debug
MsgConnStr _ -> Debug
MsgClosing _ -> Debug
MsgCloseSingleConnection _ -> Debug
MsgDestroyConnectionPool _ -> Debug
MsgWillOpenDB _ -> Info
MsgDatabaseReset -> Notice
MsgIsAlreadyClosed _ -> Warning
Expand All @@ -473,7 +509,10 @@ instance ToText DBLog where
MsgRun b -> "Running database action - " <> toText b
MsgWillOpenDB fp -> "Will open db at " <> (maybe "in-memory" T.pack fp)
MsgConnStr connStr -> "Using connection string: " <> connStr
MsgClosing fp -> "Closing database ("+|fromMaybe "in-memory" fp|+")"
MsgCloseSingleConnection fp ->
"Closing single database connection ("+|fromMaybe "in-memory" fp|+")"
MsgDestroyConnectionPool fp ->
"Destroy database connection pool ("+|fromMaybe "in-memory" fp|+")"
MsgDatabaseReset ->
"Non backward compatible database found. Removing old database \
\and re-creating it from scratch. Ignore the previous error."
Expand Down
8 changes: 4 additions & 4 deletions lib/core/src/Cardano/Pool/DB/Sqlite.hs
Expand Up @@ -201,12 +201,12 @@ withDecoratedDBLayer
-> (DBLayer IO -> IO a)
-- ^ Action to run.
-> IO a
withDecoratedDBLayer dbDecorator trace fp ti action = do
traceWith trace (MsgGeneric $ MsgWillOpenDB fp)
withDecoratedDBLayer dbDecorator tr fp ti action = do
traceWith tr (MsgGeneric $ MsgWillOpenDB fp)
bracket before after (action . decorateDBLayer dbDecorator . snd)
where
before = newDBLayer trace fp ti
after = destroyDBLayer . fst
before = newDBLayer tr fp ti
after = destroyDBLayer (contramap MsgGeneric tr) . fst

-- | Sets up a connection to the SQLite database.
--
Expand Down
6 changes: 3 additions & 3 deletions lib/core/src/Cardano/Wallet/DB/Sqlite.hs
Expand Up @@ -255,11 +255,11 @@ withDBLayer
-> ((SqliteContext, DBLayer IO s k) -> IO a)
-- ^ Action to run.
-> IO a
withDBLayer trace defaultFieldValues mDatabaseDir ti =
withDBLayer tr defaultFieldValues mDatabaseDir ti =
bracket before after
where
before = newDBLayer trace defaultFieldValues mDatabaseDir ti
after = destroyDBLayer . fst
before = newDBLayer tr defaultFieldValues mDatabaseDir ti
after = destroyDBLayer tr . fst

-- | Instantiate a 'DBFactory' from a given directory
newDBFactory
Expand Down
6 changes: 3 additions & 3 deletions lib/core/test/bench/db/Main.hs
Expand Up @@ -49,7 +49,7 @@ import Cardano.BM.Data.Severity
import Cardano.BM.Data.Trace
( Trace )
import Cardano.BM.Data.Tracer
( Tracer, filterSeverity )
( Tracer, filterSeverity, nullTracer )
import Cardano.BM.Setup
( setupTrace_, shutdown )
import Cardano.DB.Sqlite
Expand Down Expand Up @@ -691,7 +691,7 @@ defaultFieldValues = DefaultFieldValues

cleanupDB :: (FilePath, SqliteContext, DBLayer IO s k) -> IO ()
cleanupDB (db, ctx, _) = do
handle (\SqliteException{} -> pure ()) $ destroyDBLayer ctx
handle (\SqliteException{} -> pure ()) $ destroyDBLayer nullTracer ctx
mapM_ remove [db, db <> "-shm", db <> "-wal"]
where
remove f = doesFileExist f >>= \case
Expand Down Expand Up @@ -778,7 +778,7 @@ benchDiskSize :: Tracer IO DBLog -> (DBLayerBench -> IO ()) -> IO ()
benchDiskSize tr action = bracket (setupDB tr) cleanupDB $ \(f, ctx, db) -> do
action db
mapM_ (printFileSize "") [f, f <> "-shm", f <> "-wal"]
destroyDBLayer ctx
destroyDBLayer nullTracer ctx
printFileSize " (closed)" f
putStrLn ""
where
Expand Down
1 change: 0 additions & 1 deletion lib/core/test/unit/Cardano/Wallet/DB/SqliteSpec.hs
Expand Up @@ -1006,7 +1006,6 @@ testOpeningCleaning filepath call expectedAfterOpen expectedAfterClean = do
withDBLayer' (Just filepath) $ \db2 -> do
call db2 `shouldReturn` expectedAfterClean


-- | Run a test action inside withDBLayer, then check assertions.
withTestDBFile
:: (DBLayer IO (SeqState 'Mainnet ShelleyKey) ShelleyKey -> IO ())
Expand Down
5 changes: 3 additions & 2 deletions lib/shelley/bench/Restore.hs
Expand Up @@ -689,8 +689,8 @@ withBenchDBLayer
-> IO a
withBenchDBLayer ti tr action =
withSystemTempFile "bench.db" $ \dbFile _ -> do
let before = newDBLayer (trMessageText tr) migrationDefaultValues (Just dbFile) ti
let after = destroyDBLayer . fst
let before = newDBLayer tr' migrationDefaultValues (Just dbFile) ti
let after = destroyDBLayer tr' . fst
bracket before after $ \(_ctx, db) -> action db
where
migrationDefaultValues = Sqlite.DefaultFieldValues
Expand All @@ -700,6 +700,7 @@ withBenchDBLayer ti tr action =
, Sqlite.defaultHardforkEpoch = Nothing
, Sqlite.defaultKeyDeposit = Coin 0
}
tr' = trMessageText tr

prepareNode
:: forall n. (NetworkDiscriminantVal n)
Expand Down

0 comments on commit 20c9cf6

Please sign in to comment.