Skip to content

Commit

Permalink
use bracket-style resource acquisition for the db connection pool
Browse files Browse the repository at this point in the history
  This avoids the need for an extra 'TVar Bool' to guard the connection
  pool from threads whishing to acquire new resources. Instead, we can
  wrap the pool acquisition in a bracket: `bracket createPool destroyAllResources`
  so that the pool is cleaned up when done and we are sure that no
  thread will attempt to acquire a new resource while
  destroyAllResources is called.

  This sole change wasn't as straightforward as I wanted because it
  moves the control of the `SqliteContext` up in the stack and therefore
  requires reviewing many more parts of both the pool and wallet db
  layers. I think it's for a greater good in the end and make them both
  slightly better / robust. In the end, it is still a bit "awkward" that
  we have constructors / functions in those modules that are solely used
  by the test code and not by the actual application (this is the case
  of 'withDBLayer' for instance...).

  To not over-complicate things, I ended up handling the in-memory and
  in-file SqliteContext setup a bit differently. Incidentally I realized
  later that we run most of our unit-tests on the 'in-memory' version;
  which means that we aren't testing the resource pool in the context of
  the unit tests. I am not sure whether this is a good thing or not: it
  makes the unit tests a bit more focus on testing the actual business
  logic, and we still have the system-level integration tests to put the
  resource pool under great stress.
  • Loading branch information
KtorZ authored and Anviking committed Jan 25, 2021
1 parent 2f37250 commit 0656feb
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 329 deletions.
2 changes: 0 additions & 2 deletions lib/core/cardano-wallet-core.cabal
Expand Up @@ -95,7 +95,6 @@ library
, servant-server
, split
, statistics
, stm
, streaming-commons
, strict-non-empty-containers
, string-interpolate
Expand Down Expand Up @@ -408,7 +407,6 @@ benchmark db
, fmt
, iohk-monitoring
, memory
, persistent-sqlite
, random
, temporary
, text
Expand Down
210 changes: 99 additions & 111 deletions lib/core/src/Cardano/DB/Sqlite.hs
Expand Up @@ -24,12 +24,19 @@

module Cardano.DB.Sqlite
( SqliteContext (..)
, newSqliteContext
, newInMemorySqliteContext

-- * ConnectionPool
, ConnectionPool
, newConnectionPool
, destroyConnectionPool

-- * Helpers
, chunkSize
, dbChunked
, dbChunked'
, destroyDBLayer
, handleConstraint
, newSqliteContext
, unsafeRunQuery

-- * Manual Migration
Expand All @@ -54,10 +61,8 @@ import Cardano.DB.Sqlite.Delete
( DeleteSqliteDatabaseLog )
import Cardano.Wallet.Logging
( BracketLog, bracketTracer )
import Control.Concurrent.STM.TVar
( TVar, newTVarIO, readTVarIO, writeTVar )
import Control.Monad
( join, mapM_, when )
( join, mapM_, void, when )
import Control.Monad.IO.Unlift
( MonadUnliftIO (..) )
import Control.Monad.Logger
Expand All @@ -74,14 +79,10 @@ import Data.Aeson
( ToJSON (..) )
import Data.Function
( (&) )
import Data.Functor
( ($>), (<&>) )
import Data.List
( isInfixOf )
import Data.List.Split
( chunksOf )
import Data.Maybe
( fromMaybe )
import Data.Pool
( Pool, createPool, destroyAllResources, withResource )
import Data.Proxy
Expand Down Expand Up @@ -121,9 +122,10 @@ import System.Log.FastLogger
import UnliftIO.Compat
( handleIf, mkRetryHandler )
import UnliftIO.Exception
( Exception, bracket_, handleJust, mask_, throwIO, tryJust )
( Exception, bracket_, handleJust, mask_, tryJust )
import UnliftIO.MVar
( newMVar, withMVarMasked )

import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Char8 as B8
import qualified Data.Text as T
Expand All @@ -137,25 +139,13 @@ import qualified Database.Sqlite as Sqlite

-- | Context for the SQLite 'DBLayer'.
data SqliteContext = SqliteContext
{ connectionPool :: Pool (SqlBackend, Sqlite.Connection)
-- ^ A handle to the Persistent SQL backend.
, isDatabaseActive :: TVar Bool
-- ^ A mutable reference to know whether the database is 'active'. This is
-- useful to prevent new requests from being accepted when we're trying to
-- shutdown the database. It is actually crucial with the connection pool
-- since, even though we can purge the pool of all existing resources, we
-- can't easily prevent the creation of new resources. This TVar must
-- therefore be used to guard any call to 'withResource'; if 'False', then
-- 'withResource' mustn't be called.
, runQuery :: forall a. SqlPersistT IO a -> IO a
{ runQuery :: forall a. SqlPersistT IO a -> IO a
-- ^ 'safely' run a query with logging and lock-protection
, dbFile :: Maybe FilePath
-- ^ The actual database file, if any. If none, runs in-memory
}

data DatabaseIsShuttingDownError = DatabaseIsShuttingDownError deriving Show

instance Exception DatabaseIsShuttingDownError
type ConnectionPool = Pool (SqlBackend, Sqlite.Connection)

-- | Error type for when migrations go wrong after opening a database.
newtype MigrationError = MigrationError
Expand Down Expand Up @@ -193,57 +183,78 @@ handleConstraint e = handleJust select handler . fmap Right
select _ = Nothing
handler = const . pure . Left $ e

-- | Free all allocated database connections. See also 'destroySqliteBackend'
--
destroyDBLayer :: Tracer IO DBLog -> SqliteContext -> IO ()
destroyDBLayer tr SqliteContext{connectionPool,isDatabaseActive,dbFile} = do
STM.atomically $ writeTVar isDatabaseActive False
traceWith tr (MsgDestroyConnectionPool dbFile)
destroyAllResources connectionPool

{-------------------------------------------------------------------------------
Internal / Database Setup
-------------------------------------------------------------------------------}

-- | Opens the SQLite database connection pool, sets up query logging and timing,
-- runs schema migrations if necessary.
newInMemorySqliteContext
:: Tracer IO DBLog
-> [ManualMigration]
-> Migration
-> IO SqliteContext
newInMemorySqliteContext tr manualMigrations autoMigration = do
conn <- Sqlite.open connStr
mapM_ (`executeManualMigration` conn) manualMigrations
unsafeBackend <- wrapConnectionInfo info conn (queryLogFunc tr)
void $ runSqlConn (runMigrationQuiet autoMigration) unsafeBackend

let observe :: forall a. IO a -> IO a
observe = bracketTracer (contramap MsgRun tr)

-- We still use a lock with the in-memory database to protect it from
-- concurrent accesses and ensure database integrity in case where multiple
-- threads would be reading/writing from/to it.
lock <- newMVar unsafeBackend
let runQuery :: forall a. SqlPersistT IO a -> IO a
runQuery cmd = withMVarMasked lock (observe . runSqlConn cmd)

return $ SqliteContext { runQuery, dbFile }
where
dbFile = Nothing
connStr = sqliteConnStr dbFile
info = mkSqliteConnectionInfo connStr

-- | Sets up query logging and timing, runs schema migrations if necessary and
-- provide a safe 'SqliteContext' for interacting with the database.
newSqliteContext
:: [ManualMigration]
:: Tracer IO DBLog
-> ConnectionPool
-> [ManualMigration]
-> Migration
-> Tracer IO DBLog
-> Maybe FilePath
-> FilePath
-> IO (Either MigrationError SqliteContext)
newSqliteContext manualMigrations autoMigration tr dbFile = do
isDatabaseActive <- newTVarIO True
createSqlitePool tr dbFile manualMigrations autoMigration <&> \case
newSqliteContext tr pool manualMigrations autoMigration fp = do
migrationResult <- withResource pool $ \(backend, conn) -> do
let executeAutoMigration = runSqlConn (runMigrationQuiet autoMigration) backend
migrationResult <- withForeignKeysDisabled tr conn $ do
mapM_ (`executeManualMigration` conn) manualMigrations
executeAutoMigration
& tryJust (matchMigrationError @PersistException)
& tryJust (matchMigrationError @SqliteException)
& fmap join
traceWith tr $ MsgMigrations $ fmap length migrationResult
return migrationResult
return $ case migrationResult of
Left e -> Left e
Right connectionPool ->
Right{} ->
let observe :: IO a -> IO a
observe = bracketTracer (contramap MsgRun tr)

-- runSqlConn is guarded with a lock because it's not threadsafe in
-- general.It is also masked, so that the SqlBackend state is not
-- corrupted if a thread gets cancelled while running a query.
-- See: https://github.com/yesodweb/persistent/issues/981
--
-- Note that `withResource` does already mask async exception but
-- only for dealing with the pool resource acquisition. The action
-- is then ran unmasked with the acquired resource. If an
-- asynchronous exception occurs (or actually any exception), the
-- resource is NOT placed back in the pool.
-- runSqlConn is guarded with a lock because it's not threadsafe in
-- general.It is also masked, so that the SqlBackend state is not
-- corrupted if a thread gets cancelled while running a query.
-- See: https://github.com/yesodweb/persistent/issues/981
--
-- Note that `withResource` does already mask async exception but
-- only for dealing with the pool resource acquisition. The action
-- is then ran unmasked with the acquired resource. If an
-- asynchronous exception occurs (or actually any exception), the
-- resource is NOT placed back in the pool.
runQuery :: SqlPersistT IO a -> IO a
runQuery cmd = do
readTVarIO isDatabaseActive >>= \case
False -> throwIO DatabaseIsShuttingDownError
True -> withResource connectionPool $
mask_ . observe . retryOnBusy tr . runSqlConn cmd . fst

in Right $ SqliteContext
{ connectionPool
, isDatabaseActive
, runQuery
, dbFile
}
runQuery cmd = withResource pool $
mask_ . observe . retryOnBusy tr . runSqlConn cmd . fst

in Right $ SqliteContext { runQuery, dbFile = Just fp }

-- | Finalize database statements and close the database connection.
--
Expand All @@ -255,7 +266,7 @@ newSqliteContext manualMigrations autoMigration tr dbFile = do
destroySqliteBackend
:: Tracer IO DBLog
-> SqlBackend
-> Maybe FilePath
-> FilePath
-> IO ()
destroySqliteBackend tr sqlBackend dbFile = do
traceWith tr (MsgCloseSingleConnection dbFile)
Expand Down Expand Up @@ -406,56 +417,36 @@ instance MatchMigrationError SqliteException where
newtype ManualMigration = ManualMigration
{ executeManualMigration :: Sqlite.Connection -> IO () }

createSqlitePool
newConnectionPool
:: Tracer IO DBLog
-> Maybe FilePath
-> [ManualMigration]
-> Migration
-> IO (Either MigrationError (Pool (SqlBackend, Sqlite.Connection)))
createSqlitePool tr fp migrations autoMigration = do
let connStr = sqliteConnStr fp
-> FilePath
-> IO ConnectionPool
newConnectionPool tr fp = do
let connStr = sqliteConnStr (Just fp)
let info = mkSqliteConnectionInfo connStr
traceWith tr $ MsgConnStr connStr

let createConnection = do
traceWith tr $ MsgWillOpenDB (Just fp)

let acquireConnection = do
conn <- Sqlite.open connStr
(,conn) <$> wrapConnectionInfo info conn (queryLogFunc tr)

let destroyConnection = \(backend, _) -> do
let releaseConnection = \(backend, _) -> do
destroySqliteBackend tr backend fp

pool <- createPool
createConnection
destroyConnection
createPool
acquireConnection
releaseConnection
numberOfStripes
timeToLive
maximumConnections

-- Run migrations BEFORE making the pool widely accessible to other threads.
-- This works fine for the :memory: case because there's a single connection
-- in the pool, so the next 'withResource' will get exactly this
-- connection.
migrationResult <- withResource pool $ \(backend, conn) -> mask_ $ do
let executeAutoMigration = runSqlConn (runMigrationQuiet autoMigration) backend
migrationResult <- withForeignKeysDisabled tr conn $ do
mapM_ (`executeManualMigration` conn) migrations
executeAutoMigration
& tryJust (matchMigrationError @PersistException)
& tryJust (matchMigrationError @SqliteException)
& fmap join
traceWith tr $ MsgMigrations $ fmap length migrationResult
return migrationResult

case migrationResult of
Left e -> destroyAllResources pool $> Left e
Right{} -> return (Right pool)
where
numberOfStripes = 1
-- When running in :memory:, we want a single connection that does not get
-- cleaned up. Indeed, the pool will regularly remove connections, destroying
-- our :memory: database regularly otherwise.
maximumConnections = maybe 1 (const 10) fp
timeToLive = maybe 31536000 {- one year -} (const 600) {- 10 minutes -} fp :: NominalDiffTime
maximumConnections = 10
timeToLive = 600 {- 10 minutes -} :: NominalDiffTime

destroyConnectionPool :: Pool a -> IO ()
destroyConnectionPool = destroyAllResources

sqliteConnStr :: Maybe FilePath -> Text
sqliteConnStr = maybe ":memory:" T.pack
Expand All @@ -468,9 +459,8 @@ data DBLog
= MsgMigrations (Either MigrationError Int)
| MsgQuery Text Severity
| MsgRun BracketLog
| MsgConnStr Text
| MsgCloseSingleConnection (Maybe FilePath)
| MsgDestroyConnectionPool (Maybe FilePath)
| MsgCloseSingleConnection FilePath
| MsgDestroyConnectionPool FilePath
| MsgWillOpenDB (Maybe FilePath)
| MsgDatabaseReset
| MsgIsAlreadyClosed Text
Expand Down Expand Up @@ -545,7 +535,6 @@ instance HasSeverityAnnotation DBLog where
MsgMigrations (Left _) -> Error
MsgQuery _ sev -> sev
MsgRun _ -> Debug
MsgConnStr _ -> Notice
MsgCloseSingleConnection _ -> Info
MsgDestroyConnectionPool _ -> Notice
MsgWillOpenDB _ -> Info
Expand Down Expand Up @@ -576,14 +565,13 @@ instance ToText DBLog where
MsgQuery stmt _ -> stmt
MsgRun b -> "Running database action - " <> toText b
MsgWillOpenDB fp -> "Will open db at " <> (maybe "in-memory" T.pack fp)
MsgConnStr connStr -> "Using connection string: " <> connStr
MsgCloseSingleConnection fp ->
"Closing single database connection ("+|fromMaybe "in-memory" fp|+")"
MsgDestroyConnectionPool fp ->
"Destroy database connection pool ("+|fromMaybe "in-memory" fp|+")"
MsgDatabaseReset ->
"Non backward compatible database found. Removing old database \
\and re-creating it from scratch. Ignore the previous error."
MsgCloseSingleConnection fp ->
"Closing single database connection ("+|fp|+")"
MsgDestroyConnectionPool fp ->
"Destroy database connection pool ("+|fp|+")"
MsgIsAlreadyClosed msg ->
"Attempted to close an already closed connection: " <> msg
MsgStatementAlreadyFinalized msg ->
Expand Down

0 comments on commit 0656feb

Please sign in to comment.