Skip to content

Commit

Permalink
Fix CPU usage and other concurrency problems.
Browse files Browse the repository at this point in the history
This is a hard commit to break up into smaller chunks.  It basically
rewrites the concurrency portion of Server.hs, which ends up touching a
few other places.  Here's the main parts:

(1) Get rid of the separate cfgWorkQ channel that holds composes waiting
to be started.  Instead, I've introduced a new ComposeMsgAsk type that
the API server can use to request a compose be started, just like it
uses other message types to ask other things.

(2) This means the slot in AskTuple used for the compose server to write
an answer back now becomes a Maybe.  If a compose is requested, there is
no answer.  The client requests the status later on.

(3) Use the concurrency functions in the async module.  These are a
little nicer to use and there's all sorts of more advanced functions we
may be able to make use of later, if needed.

(4) composeServer does two tasks - listen and respond to messages, and
start composes.  I was previously doing this pretty stupidly.  In a
single loop, I would deal with any messages and then look for any
composes to start.  This is now done in two separate, eternally running
threads.

(5) There is actually still a worklist that does the same function as
cfgWorkQ used to do, but it is no longer exposed.  It's only used by
composeServer and its two helper threads.

(6) Stop using an IORef to manage the worklist.  Polling on this value
in a loop was keeping the CPU pegged at 100% at all times.  Instead, use
a list in a TVar and use some custom code to treat it like a queue.  It
shouldn't ever get large, so this should be fine.  Note the very
important comment above the retry call.
  • Loading branch information
clumens committed Apr 23, 2018
1 parent 8f9be9f commit 52315df
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 57 deletions.
1 change: 1 addition & 0 deletions bdcs-api.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ library
-- other-modules:
-- other-extensions:
build-depends: aeson >= 1.0.0.0 && < 1.4.0.0,
async,
base >= 4.9 && < 5.0,
bdcs >= 0.4.0,
bifunctors,
Expand Down
1 change: 1 addition & 0 deletions src/BDCS/API/Compose.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ instance FromJSON UuidStatus where

data ComposeMsgAsk = AskBuildsWaiting
| AskBuildsInProgress
| AskCompose ComposeInfo

data ComposeMsgResp = RespBuildsWaiting [T.Text]
| RespBuildsInProgress [T.Text]
Expand Down
6 changes: 2 additions & 4 deletions src/BDCS/API/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@ module BDCS.API.Config(AskTuple,
where

import BDCS.API.Utils(GitLock(..))
import BDCS.API.Compose(ComposeInfo, ComposeMsgAsk, ComposeMsgResp)
import BDCS.API.Compose(ComposeMsgAsk, ComposeMsgResp)
import Control.Concurrent.STM.TChan(TChan)
import Control.Concurrent.STM.TMVar(TMVar)
import Data.IORef(IORef)
import Database.Persist.Sql(ConnectionPool)

type AskTuple = (ComposeMsgAsk, TMVar ComposeMsgResp)
type AskTuple = (ComposeMsgAsk, Maybe (TMVar ComposeMsgResp))

data ServerConfig = ServerConfig
{ cfgRepoLock :: GitLock -- ^ Lock required for accessing recipe repo
, cfgChan :: TChan AskTuple -- ^ Channel for the API server to ask things of
-- the compose server. The tuple is the message
-- that needs a response and a location for where
-- the response should be written to.
, cfgWorkQ :: IORef [ComposeInfo] -- ^ Worklist of composes
, cfgPool :: ConnectionPool -- ^ SQL connection pool for accessing MDDB
, cfgBdcs :: FilePath -- ^ Location of the content store
, cfgResultsDir :: FilePath -- ^ Base location for writing results
Expand Down
120 changes: 71 additions & 49 deletions src/BDCS/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ module BDCS.API.Server(mkApp,
where

import BDCS.API.Compose(ComposeInfo(..), ComposeMsgAsk(..), ComposeMsgResp(..), compose)
import BDCS.API.Config(AskTuple, ServerConfig(..))
import BDCS.API.Config(ServerConfig(..))
import BDCS.API.Recipes(openOrCreateRepo, commitRecipeDirectory)
import BDCS.API.Utils(GitLock(..))
import BDCS.API.V0(V0API, v0ApiServer)
import BDCS.API.Version(apiVersion)
import BDCS.DB(schemaVersion, getDbVersion)
import Control.Concurrent(ThreadId, forkFinally, forkIO)
import Control.Concurrent.Async(Async, async, concurrently_, waitCatch)
import qualified Control.Concurrent.ReadWriteLock as RWL
import Control.Concurrent.STM.TChan(newTChan, tryReadTChan)
import Control.Concurrent.STM.TChan(newTChan, readTChan)
import Control.Concurrent.STM.TVar(TVar, modifyTVar, newTVar, readTVar, writeTVar)
import Control.Concurrent.STM.TMVar(putTMVar)
import Control.Monad(forever, void, when)
import Control.Monad.Except(runExceptT)
Expand All @@ -51,10 +52,12 @@ import Control.Monad.STM(atomically)
import Data.Aeson
import Data.Int(Int64)
import Data.IORef(IORef, atomicModifyIORef', newIORef, readIORef)
import Data.List(uncons)
import qualified Data.Map as Map
import Data.String.Conversions(cs)
import qualified Data.Text as T
import Database.Persist.Sqlite
import GHC.Conc(retry)
import qualified GI.Ggit as Git
import Network.Wai
import Network.Wai.Handler.Warp
Expand All @@ -63,7 +66,7 @@ import Servant
import System.Directory(createDirectoryIfMissing)
import System.FilePath.Posix((</>))

type InProgressMap = Map.Map T.Text (ThreadId, ComposeInfo)
type InProgressMap = Map.Map T.Text (Async (), ComposeInfo)

-- | The status of the server, the database, and the API.
data ServerStatus = ServerStatus
Expand Down Expand Up @@ -149,12 +152,10 @@ mkApp bdcsPath gitRepoPath sqliteDbPath = do
void $ commitRecipeDirectory repo "master" gitRepoPath
lock <- RWL.new

q <- newIORef []
chan <- atomically newTChan

let cfg = ServerConfig { cfgRepoLock = GitLock lock repo,
cfgChan = chan,
cfgWorkQ = q,
cfgPool = pool,
cfgBdcs = bdcsPath,
cfgResultsDir = "/var/lib/composer" }
Expand All @@ -165,7 +166,7 @@ mkApp bdcsPath gitRepoPath sqliteDbPath = do
-- which means the client immediately gets a response with a build ID.
-- The compose (which could take a while) proceeds independently. The
-- client uses a different route to check and fetch the results.
void $ forkIO $ composeServer cfg
void $ async $ composeServer cfg

return $ app cfg

Expand All @@ -181,55 +182,76 @@ composeServer ServerConfig{..} = do
-- is currently running.
inProgressRef <- newIORef Map.empty

-- We run in a loop forever doing two tasks:
-- A list of all composes currently waiting to be run. It would be easier if we
-- could use a TChan to represent it, but there's no good way to grab the entire
-- contents of one - it's a FIFO. Thus we need to use some sort of list.
worklist <- atomically $ newTVar []

-- From here, we run two separate threads forever.
--
-- (1) Reading questions out of the shared one-way communications channel and
-- sending responses back to the API server.
-- One thread reads messages out of the channel and responds to them. This includes
-- things like "what is waiting in the queue?" and "what is currently composing?".
-- It also includes requests to start new composes.
--
-- (2) Firing off composes for requests in the queue.
forever $ do
atomically (tryReadTChan cfgChan) >>= respondToMessage inProgressRef

-- For now, we only support running one compose at a time. If the mutable
-- variable is not empty, we are already running a compose. Don't try to start
-- another one. This leaves the work queue alone so we can grab the next item
-- later.
inProgress <- readIORef inProgressRef
when (Map.null inProgress) $
nextInQ cfgWorkQ >>= \case
-- Start another compose. When the thread finishes (either because the
-- compose is done or because it failed), clear out the mutable variable.
Just ci -> do threadId <- forkFinally (runFileLoggingT (ciResultsDir ci </> "compose.log") $ compose cfgBdcs cfgPool ci)
(\_ -> removeCompose inProgressRef (ciId ci))

addCompose inProgressRef ci threadId

_ -> return ()
-- The other thread runs composes. It does one at a time - reading the first item
-- out of a worklist, starting the compose, and waiting for it to finish. When one
-- compose is finished, it can look at the list to see about starting the next one.
concurrently_ (messagesThread inProgressRef worklist)
(composesThread inProgressRef worklist)
where
-- Pop the next element off the head of the queue and return it.
nextInQ :: IORef [a] -> IO (Maybe a)
nextInQ q = atomicModifyIORef' q (\case (hd:tl) -> (tl, Just hd)
_ -> ([], Nothing))

-- Add a newly started compose to the in progress map.
addCompose :: IORef InProgressMap -> ComposeInfo -> ThreadId -> IO ()
addCompose ref ci@ComposeInfo{..} threadId =
void $ atomicModifyIORef' ref (\m -> (Map.insert ciId (threadId, ci) m, ()))
addCompose :: IORef InProgressMap -> ComposeInfo -> Async () -> IO ()
addCompose ref ci@ComposeInfo{..} thread =
void $ atomicModifyIORef' ref (\m -> (Map.insert ciId (thread, ci) m, ()))

-- Remove a completed (or killed?) compose from the in progress map.
removeCompose :: IORef InProgressMap -> T.Text -> IO ()
removeCompose ref uuid =
void $ atomicModifyIORef' ref (\m -> (Map.delete uuid m, ()))

respondToMessage :: IORef InProgressMap -> Maybe AskTuple -> IO ()
respondToMessage _ (Just (AskBuildsWaiting, r)) = do
q <- readIORef cfgWorkQ
atomically $ putTMVar r (RespBuildsWaiting $ map ciId q)

respondToMessage ref (Just (AskBuildsInProgress, r)) = do
-- Get just the ComposeInfo records for all the in-progress composes.
inProgress <- map snd . Map.elems <$> readIORef ref
-- And then extract the UUIDs of each, and that's the answer.
atomically $ putTMVar r (RespBuildsInProgress $ map ciId inProgress)

respondToMessage _ _ = return ()
composesThread :: IORef InProgressMap -> TVar [ComposeInfo] -> IO ()
composesThread inProgressRef worklist = forever $ do
-- For now, we only support running one compose at a time. If the mutable
-- variable is not empty, we are already running a compose. Don't try to
-- start another one. This leaves the work queue alone so we can grab the next
-- item later.
inProgress <- readIORef inProgressRef
when (Map.null inProgress) $ do
-- Start another compose and wait for it to be done. When the thread
-- finishes (either because the compose is done or because it failed),
-- clear out the mutable variable. Here, we don't actually care about
-- whether it failed or finished - that's all handled elsewhere.
ci <- atomically $ do
lst <- readTVar worklist
case uncons lst of
-- This call to retry is very important. Without it, any polling
-- on data structures like a TVar or an IORef keeps the CPU pegged
-- at 100% the entire time.
Nothing -> retry
Just (x, xs) -> writeTVar worklist xs >> return x

thread <- async $ runFileLoggingT (ciResultsDir ci </> "compose.log")
(compose cfgBdcs cfgPool ci)

addCompose inProgressRef ci thread
void $ waitCatch thread
removeCompose inProgressRef (ciId ci)

messagesThread :: IORef InProgressMap -> TVar [ComposeInfo] -> IO ()
messagesThread inProgressRef worklist = forever $ atomically (readTChan cfgChan) >>= \case
(AskBuildsWaiting, Just r) -> do
lst <- atomically $ readTVar worklist
atomically $ putTMVar r (RespBuildsWaiting $ map ciId lst)

(AskBuildsInProgress, Just r) -> do
-- Get just the ComposeInfo records for all the in-progress composes.
inProgress <- map snd . Map.elems <$> readIORef inProgressRef
-- And then extract the UUIDs of each, and that's the answer.
atomically $ putTMVar r (RespBuildsInProgress $ map ciId inProgress)

(AskCompose ci, _) ->
-- Add the new compose to the end of the work queue. It will eventually
-- get around to being run by composesThread.
atomically $ modifyTVar worklist (++ [ci])

_ -> return ()
7 changes: 3 additions & 4 deletions src/BDCS/API/V0.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ import Data.Bifunctor(bimap)
import qualified Data.ByteString.Lazy as LBS
import Data.Either(partitionEithers, rights)
import Data.Int(Int64)
import Data.IORef(atomicModifyIORef')
import Data.List(find, sortBy)
import Data.List.Extra(nubOrd)
import Data.Maybe(fromMaybe, mapMaybe)
Expand Down Expand Up @@ -1781,7 +1780,7 @@ compose cfg@ServerConfig{..} ComposeBody{..} test = case exportTypeFromText cbTy
ciCustom=customActions,
ciType=ty }

void $ atomicModifyIORef' cfgWorkQ (\ref -> (ref ++ [ci], ()))
liftIO $ atomically $ writeTChan cfgChan (AskCompose ci, Nothing)
return $ ComposeResponse True (T.pack $ show buildId)
where
-- | Construct an error message for unsupported output selected
Expand Down Expand Up @@ -1901,7 +1900,7 @@ composeQueue ServerConfig{..} = do
-- will be written. This prevents needing to write a communications
-- protocol. Making it initially empty is very important.
r <- liftIO $ atomically newEmptyTMVar
liftIO $ atomically $ writeTChan cfgChan (AskBuildsWaiting, r)
liftIO $ atomically $ writeTChan cfgChan (AskBuildsWaiting, Just r)

-- Wait for the response to show up in the TMVar we created. This blocks,
-- but the server doesn't do much in its main thread so it shouldn't block
Expand All @@ -1912,7 +1911,7 @@ composeQueue ServerConfig{..} = do

-- And then we do the same thing for builds currently running.
r' <- liftIO $ atomically newEmptyTMVar
liftIO $ atomically $ writeTChan cfgChan (AskBuildsInProgress, r')
liftIO $ atomically $ writeTChan cfgChan (AskBuildsInProgress, Just r')
buildsRunning <- liftIO (atomically $ readTMVar r') >>= \case
RespBuildsInProgress lst -> return lst
_ -> return []
Expand Down

0 comments on commit 52315df

Please sign in to comment.