Skip to content

Commit

Permalink
refactor: move debounce from AppState to Logger
Browse files Browse the repository at this point in the history
Will allow to capture accurate timeout metrics.
  • Loading branch information
steve-chavez committed Apr 12, 2024
1 parent 2de32fc commit fbc4d56
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 34 deletions.
47 changes: 17 additions & 30 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,35 +82,33 @@ data AuthResult = AuthResult

data AppState = AppState
-- | Database connection pool
{ statePool :: SQL.Pool
{ statePool :: SQL.Pool
-- | Database server version, will be updated by the connectionWorker
, statePgVersion :: IORef PgVersion
, statePgVersion :: IORef PgVersion
-- | No schema cache at the start. Will be filled in by the connectionWorker
, stateSchemaCache :: IORef (Maybe SchemaCache)
, stateSchemaCache :: IORef (Maybe SchemaCache)
-- | If schema cache is loaded
, stateSchemaCacheLoaded :: IORef Bool
, stateSchemaCacheLoaded :: IORef Bool
-- | starts the connection worker with a debounce
, debouncedConnectionWorker :: IO ()
, debouncedConnectionWorker :: IO ()
-- | Binary semaphore used to sync the listener(NOTIFY reload) with the connectionWorker.
, stateListener :: MVar ()
, stateListener :: MVar ()
-- | State of the LISTEN channel, used for the admin server checks
, stateIsListenerOn :: IORef Bool
, stateIsListenerOn :: IORef Bool
-- | Config that can change at runtime
, stateConf :: IORef AppConfig
, stateConf :: IORef AppConfig
-- | Time used for verifying JWT expiration
, stateGetTime :: IO UTCTime
, stateGetTime :: IO UTCTime
-- | Used for killing the main thread in case a subthread fails
, stateMainThreadId :: ThreadId
, stateMainThreadId :: ThreadId
-- | Keeps track of when the next retry for connecting to database is scheduled
, stateRetryNextIn :: IORef Int
-- | Emits a pool error observation with a debounce
, debounceAcquisitionTimeoutObs :: IO ()
, stateRetryNextIn :: IORef Int
-- | JWT Cache
, jwtCache :: C.Cache ByteString AuthResult
, jwtCache :: C.Cache ByteString AuthResult
-- | Network socket for REST API
, stateSocketREST :: NS.Socket
, stateSocketREST :: NS.Socket
-- | Network socket for the admin UI
, stateSocketAdmin :: Maybe NS.Socket
, stateSocketAdmin :: Maybe NS.Socket
}

type AppSockets = (NS.Socket, Maybe NS.Socket)
Expand All @@ -123,7 +121,7 @@ init conf = do
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock }

initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> IO AppState
initWithPool (sock, adminSock) pool conf@AppConfig{configObserver=observer} = do
initWithPool (sock, adminSock) pool conf = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
Expand All @@ -135,20 +133,10 @@ initWithPool (sock, adminSock) pool conf@AppConfig{configObserver=observer} = do
<*> mkAutoUpdate defaultUpdateSettings { updateAction = getCurrentTime }
<*> myThreadId
<*> newIORef 0
<*> pure (pure ())
<*> C.newCache Nothing
<*> pure sock
<*> pure adminSock


debPoolTimeout <-
let oneSecond = 1000000 in
mkDebounce defaultDebounceSettings
{ debounceAction = observer $ PoolAcqTimeoutObs SQL.AcquisitionTimeoutUsageError
, debounceFreq = 5*oneSecond
, debounceEdge = leadingEdge -- logs at the start and the end
}

debWorker <-
let decisecond = 100000 in
mkDebounce defaultDebounceSettings
Expand All @@ -157,7 +145,7 @@ initWithPool (sock, adminSock) pool conf@AppConfig{configObserver=observer} = do
, debounceEdge = leadingEdge -- runs the worker at the start and the end
}

return appState { debounceAcquisitionTimeoutObs = debPoolTimeout, debouncedConnectionWorker = debWorker }
return appState { debouncedConnectionWorker = debWorker }

destroy :: AppState -> IO ()
destroy = destroyPool
Expand Down Expand Up @@ -211,8 +199,7 @@ usePool AppState{..} AppConfig{configLogLevel, configObserver=observer} sess = d

when (configLogLevel > LogCrit) $ do
whenLeft res (\case
-- TODO debouncing will not be correct if we want to have a metric for the amount of timeouts
SQL.AcquisitionTimeoutUsageError -> debounceAcquisitionTimeoutObs -- this can happen rapidly for many requests, so we debounce.
SQL.AcquisitionTimeoutUsageError -> observer $ PoolAcqTimeoutObs SQL.AcquisitionTimeoutUsageError
error
-- TODO We're using the 500 HTTP status for getting all internal db errors but there's no response here. We need a new intermediate type to not rely on the HTTP status.
| Error.status (Error.PgError False error) >= HTTP.status500 -> observer $ QueryErrorCodeHighObs error
Expand Down
31 changes: 27 additions & 4 deletions src/PostgREST/Logger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module PostgREST.Logger

import Control.AutoUpdate (defaultUpdateSettings, mkAutoUpdate,
updateAction)
import Control.Debounce

import Data.Time (ZonedTime, defaultTimeLocale, formatTime,
getZonedTime)
Expand All @@ -27,14 +28,31 @@ import qualified PostgREST.Auth as Auth

import Protolude

newtype LoggerState = LoggerState
{ stateGetZTime :: IO ZonedTime -- ^ Time with time zone used for logs
data LoggerState = LoggerState
{ stateGetZTime :: IO ZonedTime -- ^ Time with time zone used for logs
, stateLogDebouncePoolTimeout :: MVar (IO ()) -- ^ Logs with a debounce
}

init :: IO LoggerState
init = do
zTime <- mkAutoUpdate defaultUpdateSettings { updateAction = getZonedTime }
pure $ LoggerState zTime
LoggerState zTime <$> newEmptyMVar

logWithDebounce :: LoggerState -> IO () -> IO ()
logWithDebounce loggerState action = do
debouncer <- tryReadMVar $ stateLogDebouncePoolTimeout loggerState
case debouncer of
Just d -> d
Nothing -> do
newDebouncer <-
let oneSecond = 1000000 in
mkDebounce defaultDebounceSettings
{ debounceAction = action
, debounceFreq = 5*oneSecond
, debounceEdge = leadingEdge -- logs at the start and the end
}
putMVar (stateLogDebouncePoolTimeout loggerState) newDebouncer
newDebouncer

middleware :: LogLevel -> Wai.Middleware
middleware logLevel = case logLevel of
Expand All @@ -51,7 +69,12 @@ middleware logLevel = case logLevel of
}

observationLogger :: LoggerState -> ObservationHandler
observationLogger loggerState obs = logWithZTime loggerState $ observationMessage obs
observationLogger loggerState obs = case obs of
o@(PoolAcqTimeoutObs _) -> do
logWithDebounce loggerState $
logWithZTime loggerState $ observationMessage o
o ->
logWithZTime loggerState $ observationMessage o

logWithZTime :: LoggerState -> Text -> IO ()
logWithZTime loggerState txt = do
Expand Down
5 changes: 5 additions & 0 deletions test/spec/fixtures/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3759,3 +3759,8 @@ create aggregate test.outfunc_agg (anyelement) (
, stype = "pg/outfunc"
, sfunc = outfunc_trans
);

-- used for manual testing
create or replace function test.sleep(seconds double precision default 5) returns void as $$
select pg_sleep(seconds);
$$ language sql;

0 comments on commit fbc4d56

Please sign in to comment.