Skip to content

Commit

Permalink
refactor: move checkIsFatal logic to usePool
Browse files Browse the repository at this point in the history
The fatal logic is now inside `usePool`. It centralizes the
logic which is better for Locality of Behavior.

Removes:

- The need to do checkIsFatal on other parts of the code
- SCFatalFail/ConnFatalFail states which are no longer needed.
  • Loading branch information
steve-chavez committed May 16, 2024
1 parent 9d763ae commit 33b6ba8
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 115 deletions.
2 changes: 1 addition & 1 deletion src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ run appState = do
let observer = AppState.getObserver appState
conf@AppConfig{..} <- AppState.getConfig appState

observer $ AppServerStartObs prettyVersion
observer $ AppStartObs prettyVersion

AppState.connectionWorker appState -- Loads the initial SchemaCache
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.connectionWorker appState) (AppState.reReadConfig False appState)
Expand Down
145 changes: 69 additions & 76 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,12 @@ data AppState = AppState
data SchemaCacheStatus
= SCLoaded
| SCPending
| SCFatalFail
deriving Eq

-- | Current database connection status
data ConnectionStatus
= ConnEstablished
| ConnPending
| ConnFatalFail Text
deriving Eq

type AppSockets = (NS.Socket, Maybe NS.Socket)
Expand Down Expand Up @@ -226,15 +224,57 @@ initPool AppConfig{..} observer =

-- | Run an action with a database connection.
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{stateObserver=observer,..} sess = do
usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} sess = do
observer PoolRequest

res <- SQL.use statePool sess

observer PoolRequestFullfilled

whenLeft res (\case
SQL.AcquisitionTimeoutUsageError -> observer $ PoolAcqTimeoutObs SQL.AcquisitionTimeoutUsageError
error
-- TODO We're using the 500 HTTP status for getting all internal db errors but there's no response here. We need a new intermediate type to not rely on the HTTP status.
| Error.status (Error.PgError False error) >= HTTP.status500 -> observer $ QueryErrorCodeHighObs error
| otherwise -> pure ())
SQL.AcquisitionTimeoutUsageError ->
observer $ PoolAcqTimeoutObs SQL.AcquisitionTimeoutUsageError
err@(SQL.ConnectionUsageError e) ->
let failureMessage = BS.unpack $ fromMaybe mempty e in
when (("FATAL: password authentication failed" `isInfixOf` failureMessage) || ("no password supplied" `isInfixOf` failureMessage)) $ do
observer $ ExitDBFatalError ServerAuthError err
killThread mainThreadId
err@(SQL.SessionUsageError (SQL.QueryError tpl _ (SQL.ResultError resultErr))) -> do
case resultErr of
SQL.UnexpectedResult{} -> do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
SQL.RowError{} -> do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
SQL.UnexpectedAmountOfRows{} -> do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
-- Check for a syntax error (42601 is the pg code) only for queries that don't have `WITH pgrst_source` as prefix.
-- This would mean the error is on our schema cache queries, so we treat it as fatal.
-- TODO have a better way to mark this as a schema cache query
SQL.ServerError "42601" _ _ _ _ ->
unless ("WITH pgrst_source" `BS.isPrefixOf` tpl) $ do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
-- Check for a "prepared statement <name> already exists" error (Code 42P05: duplicate_prepared_statement).
-- This would mean that a connection pooler in transaction mode is being used
-- while prepared statements are enabled in the PostgREST configuration,
-- both of which are incompatible with each other.
SQL.ServerError "42P05" _ _ _ _ -> do
observer $ ExitDBFatalError ServerError42P05 err
killThread mainThreadId
-- Check for a "transaction blocks not allowed in statement pooling mode" error (Code 08P01: protocol_violation).
-- This would mean that a connection pooler in statement mode is being used which is not supported in PostgREST.
SQL.ServerError "08P01" "transaction blocks not allowed in statement pooling mode" _ _ _ -> do
observer $ ExitDBFatalError ServerError08P01 err
killThread mainThreadId
SQL.ServerError{} ->
when (Error.status (Error.PgError False err) >= HTTP.status500) $
observer $ QueryErrorCodeHighObs err
SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ClientError _)) ->
pure ()
)

return res

Expand Down Expand Up @@ -340,15 +380,10 @@ loadSchemaCache appState@AppState{stateObserver=observer} = do
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
case result of
Left e -> do
case checkIsFatal e of
Just hint -> do
observer $ SchemaCacheFatalErrorObs e hint
return SCFatalFail
Nothing -> do
putSCacheStatus appState SCPending
putSchemaCache appState Nothing
observer $ SchemaCacheNormalErrorObs e
return SCPending
putSCacheStatus appState SCPending
putSchemaCache appState Nothing
observer $ SchemaCacheErrorObs e
return SCPending

Right sCache -> do
-- IMPORTANT: While the pending schema cache state starts from running the above querySchemaCache, only at this stage we block API requests due to the usage of an
Expand All @@ -373,24 +408,25 @@ loadSchemaCache appState@AppState{stateObserver=observer} = do
-- program.
-- 3. Obtains the sCache. If this fails, it goes back to 1.
internalConnectionWorker :: AppState -> IO ()
internalConnectionWorker appState@AppState{stateObserver=observer} = work
internalConnectionWorker appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} = work
where
work = do
AppConfig{..} <- getConfig appState
observer DBConnectAttemptObs
connStatus <- establishConnection appState
case connStatus of
ConnFatalFail reason ->
-- Fatal error when connecting
observer (ExitFatalObs reason) >> killThread (getMainThreadId appState)
ConnPending ->
unless configDbPoolAutomaticRecovery
$ observer ExitDBNoRecoveryObs >> killThread (getMainThreadId appState)
unless configDbPoolAutomaticRecovery $ do
observer ExitDBNoRecoveryObs
killThread mainThreadId
ConnEstablished -> do
actualPgVersion <- getPgVersion appState
when (actualPgVersion < minimumPgVersion) $ do
observer $ ExitUnsupportedPgVersion actualPgVersion minimumPgVersion
killThread mainThreadId
-- Procede with initialization
when configDbChannelEnabled $
signalListener appState
actualPgVersion <- getPgVersion appState
observer (DBConnectedObs $ pgvFullName actualPgVersion)
-- this could be fail because the connection drops, but the loadSchemaCache will pick the error and retry again
-- We cannot retry after it fails immediately, because db-pre-config could have user errors. We just log the error and continue.
Expand All @@ -403,9 +439,6 @@ internalConnectionWorker appState@AppState{stateObserver=observer} = work
SCPending ->
-- retry reloading the schema cache
work
SCFatalFail ->
-- die if our schema cache query has an error
killThread $ getMainThreadId appState

-- | Repeatedly flush the pool, and check if a connection from the
-- pool allows access to the PostgreSQL database.
Expand All @@ -432,21 +465,12 @@ establishConnection appState@AppState{stateObserver=observer} =
case pgVersion of
Left e -> do
observer $ ConnectionPgVersionErrorObs e
case checkIsFatal e of
Just reason ->
return $ ConnFatalFail reason
Nothing -> do
putConnStatus appState ConnPending
return ConnPending
Right version ->
if version < minimumPgVersion then
return . ConnFatalFail $
"Cannot run in this PostgreSQL version, PostgREST needs at least "
<> pgvName minimumPgVersion
else do
putConnStatus appState ConnEstablished
putPgVersion appState version
return ConnEstablished
putConnStatus appState ConnPending
return ConnPending
Right version -> do
putConnStatus appState ConnEstablished
putPgVersion appState version
return ConnEstablished

shouldRetry :: RetryStatus -> ConnectionStatus -> IO Bool
shouldRetry rs isConnSucc = do
Expand All @@ -468,13 +492,7 @@ reReadConfig startingUp appState@AppState{stateObserver=observer} = do
qDbSettings <- usePool appState (queryDbSettings (dumpQi <$> configDbPreConfig) configDbPreparedStatements)
case qDbSettings of
Left e -> do
observer ConfigReadErrorObs
case checkIsFatal e of
Just hint -> do
observer $ ConfigReadErrorFatalObs e hint
killThread (getMainThreadId appState)
Nothing -> do
observer $ ConfigReadErrorNotFatalObs e
observer $ ConfigReadErrorObs e
pure mempty
Right x -> pure x
else
Expand Down Expand Up @@ -510,7 +528,7 @@ runListener conf@AppConfig{configDbChannelEnabled} appState = do
-- NOTIFY <db-channel> - with an empty payload - is done, it refills the schema
-- cache. It uses the connectionWorker in case the LISTEN connection dies.
listener :: AppState -> AppConfig -> IO ()
listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
listener appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} conf@AppConfig{..} = do
let dbChannel = toS configDbChannel

-- The listener has to wait for a signal from the connectionWorker.
Expand All @@ -534,7 +552,7 @@ listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
where
handleFinally dbChannel False err = do
observer $ DBListenerFailRecoverObs False dbChannel err
killThread (getMainThreadId appState)
killThread mainThreadId
handleFinally dbChannel True err = do
-- if the thread dies, we try to recover
observer $ DBListenerFailRecoverObs True dbChannel err
Expand All @@ -554,28 +572,3 @@ listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
-- it's necessary to restart the pg connections because they cache the pg catalog(see #2620)
connectionWorker appState

checkIsFatal :: SQL.UsageError -> Maybe Text
checkIsFatal (SQL.ConnectionUsageError e)
| isAuthFailureMessage = Just $ toS failureMessage
| otherwise = Nothing
where isAuthFailureMessage =
("FATAL: password authentication failed" `isInfixOf` failureMessage) ||
("no password supplied" `isInfixOf` failureMessage)
failureMessage = BS.unpack $ fromMaybe mempty e
checkIsFatal(SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ResultError serverError)))
= case serverError of
-- Check for a syntax error (42601 is the pg code). This would mean the error is on our part somehow, so we treat it as fatal.
SQL.ServerError "42601" _ _ _ _
-> Just "This is probably a bug in PostgREST, please report it at https://github.com/PostgREST/postgrest/issues"
-- Check for a "prepared statement <name> already exists" error (Code 42P05: duplicate_prepared_statement).
-- This would mean that a connection pooler in transaction mode is being used
-- while prepared statements are enabled in the PostgREST configuration,
-- both of which are incompatible with each other.
SQL.ServerError "42P05" _ _ _ _
-> Just "If you are using connection poolers in transaction mode, try setting db-prepared-statements to false."
-- Check for a "transaction blocks not allowed in statement pooling mode" error (Code 08P01: protocol_violation).
-- This would mean that a connection pooler in statement mode is being used which is not supported in PostgREST.
SQL.ServerError "08P01" "transaction blocks not allowed in statement pooling mode" _ _ _
-> Just "Connection poolers in statement mode are not supported."
_ -> Nothing
checkIsFatal _ = Nothing
2 changes: 1 addition & 1 deletion src/PostgREST/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ observationMetrics (MetricsState poolTimeouts poolAvailable poolWaiting _ schema
SchemaCacheLoadedObs resTime -> do
withLabel schemaCacheLoads "SUCCESS" incCounter
setGauge schemaCacheQueryTime resTime
SchemaCacheNormalErrorObs _ -> do
SchemaCacheErrorObs _ -> do
withLabel schemaCacheLoads "FAIL" incCounter
_ ->
pure ()
Expand Down
64 changes: 34 additions & 30 deletions src/PostgREST/Observation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,36 @@ Description : Module for observability types
-}
module PostgREST.Observation
( Observation(..)
, ObsFatalError(..)
, observationMessage
, ObservationHandler
) where

import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Hasql.Connection as SQL
import qualified Hasql.Pool as SQL
import qualified Hasql.Pool.Observation as SQL
import qualified Network.Socket as NS
import Numeric (showFFloat)
import qualified PostgREST.Error as Error
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Hasql.Connection as SQL
import qualified Hasql.Pool as SQL
import qualified Hasql.Pool.Observation as SQL
import qualified Network.Socket as NS
import Numeric (showFFloat)
import PostgREST.Config.PgVersion
import qualified PostgREST.Error as Error

import Protolude
import Protolude.Partial (fromJust)

data Observation
= AdminStartObs (Maybe Int)
| AppServerStartObs ByteString
| AppStartObs ByteString
| AppServerPortObs NS.PortNumber
| AppServerUnixObs FilePath
| DBConnectAttemptObs
| ExitFatalObs Text
| ExitUnsupportedPgVersion PgVersion PgVersion
| ExitDBNoRecoveryObs
| ExitDBFatalError ObsFatalError SQL.UsageError
| DBConnectedObs Text
| SchemaCacheFatalErrorObs SQL.UsageError Text
| SchemaCacheNormalErrorObs SQL.UsageError
| SchemaCacheErrorObs SQL.UsageError
| SchemaCacheQueriedObs Double
| SchemaCacheSummaryObs Text
| SchemaCacheLoadedObs Double
Expand All @@ -43,9 +45,7 @@ data Observation
| DBListenerFailRecoverObs Bool Text (Either SomeException ())
| DBListenerGotSCacheMsg ByteString
| DBListenerGotConfigMsg ByteString
| ConfigReadErrorObs
| ConfigReadErrorFatalObs SQL.UsageError Text
| ConfigReadErrorNotFatalObs SQL.UsageError
| ConfigReadErrorObs SQL.UsageError
| ConfigInvalidObs Text
| ConfigSucceededObs
| QueryRoleSettingsErrorObs SQL.UsageError
Expand All @@ -55,29 +55,37 @@ data Observation
| PoolRequest
| PoolRequestFullfilled

data ObsFatalError = ServerAuthError | ServerPgrstBug | ServerError42P05 | ServerError08P01

type ObservationHandler = Observation -> IO ()

observationMessage :: Observation -> Text
observationMessage = \case
AdminStartObs port ->
"Admin server listening on port " <> show (fromIntegral (fromJust port) :: Integer)
AppServerStartObs ver ->
AppStartObs ver ->
"Starting PostgREST " <> T.decodeUtf8 ver <> "..."
AppServerPortObs port ->
"Listening on port " <> show port
AppServerUnixObs sock ->
"Listening on unix socket " <> show sock
DBConnectAttemptObs ->
"Attempting to connect to the database..."
ExitFatalObs reason ->
"Fatal error encountered. " <> reason
ExitDBNoRecoveryObs ->
"Automatic recovery disabled, exiting."
DBConnectedObs ver ->
"Successfully connected to " <> ver
SchemaCacheFatalErrorObs usageErr hint ->
"A fatal error ocurred when loading the schema cache. " <> hint <> ". " <> jsonMessage usageErr
SchemaCacheNormalErrorObs usageErr ->
ExitUnsupportedPgVersion pgVer minPgVer ->
"Cannot run in this PostgreSQL version (" <> pgvName pgVer <> "), PostgREST needs at least " <> pgvName minPgVer
ExitDBNoRecoveryObs ->
"Automatic recovery disabled, exiting."
ExitDBFatalError ServerAuthError usageErr ->
jsonMessage usageErr
ExitDBFatalError ServerPgrstBug usageErr ->
"This is probably a bug in PostgREST, please report it at https://github.com/PostgREST/postgrest/issues. " <> jsonMessage usageErr
ExitDBFatalError ServerError42P05 usageErr ->
"If you are using connection poolers in transaction mode, try setting db-prepared-statements to false. " <> jsonMessage usageErr
ExitDBFatalError ServerError08P01 usageErr ->
"Connection poolers in statement mode are not supported." <> jsonMessage usageErr
SchemaCacheErrorObs usageErr ->
"An error ocurred when loading the schema cache. " <> jsonMessage usageErr
SchemaCacheQueriedObs resultTime ->
"Schema cache queried in " <> showMillis resultTime <> " milliseconds"
Expand All @@ -99,12 +107,8 @@ observationMessage = \case
"Received a schema cache reload message on the " <> show channel <> " channel"
DBListenerGotConfigMsg channel ->
"Received a config reload message on the " <> show channel <> " channel"
ConfigReadErrorObs ->
"An error ocurred when trying to query database settings for the config parameters"
ConfigReadErrorFatalObs usageErr hint ->
hint <> ". " <> jsonMessage usageErr
ConfigReadErrorNotFatalObs usageErr ->
jsonMessage usageErr
ConfigReadErrorObs usageErr ->
"An error ocurred when trying to query database settings for the config parameters." <> jsonMessage usageErr
QueryRoleSettingsErrorObs usageErr ->
"An error ocurred when trying to query the role settings. " <> jsonMessage usageErr
QueryErrorCodeHighObs usageErr ->
Expand Down
7 changes: 0 additions & 7 deletions src/PostgREST/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import PostgREST.Config (AppConfig (..),
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.MediaType (MediaType (..))
import PostgREST.Observation (Observation (..))
import PostgREST.Plan (ActionPlan (..),
CallReadPlan (..),
CrudPlan (..),
Expand Down Expand Up @@ -78,16 +77,10 @@ data QueryResult
runQuery :: AppState.AppState -> AppConfig -> AuthResult -> ApiRequest -> ActionPlan -> SchemaCache -> PgVersion -> Bool -> ExceptT Error IO QueryResult
runQuery _ _ _ _ (NoDb x) _ _ _ = pure $ NoDbResult x
runQuery appState config AuthResult{..} apiReq (Db plan) sCache pgVer authenticated = do
let observer = AppState.getObserver appState

lift $ observer PoolRequest

dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
AppState.usePool appState (transaction isoLvl txMode $ runExceptT dbHandler)

lift $ observer PoolRequestFullfilled

resp <-
liftEither . mapLeft Error.PgErr $
mapLeft (Error.PgError authenticated) dbResp
Expand Down

0 comments on commit 33b6ba8

Please sign in to comment.