Note: the line structure and indentation of this patch were reconstructed from a
whitespace-collapsed copy, and the `</>` path operator was restored in the three
places where it had been stripped.  Hunk line counts are unchanged.  If context
whitespace does not match the tree, apply with `git apply --ignore-whitespace`.

diff --git a/bdcs-api.cabal b/bdcs-api.cabal
index 6595d5f..bdac999 100644
--- a/bdcs-api.cabal
+++ b/bdcs-api.cabal
@@ -35,6 +35,7 @@ library
   -- other-modules:
   -- other-extensions:
   build-depends:       aeson >= 1.0.0.0 && < 1.4.0.0,
+                       async,
                        base >= 4.9 && < 5.0,
                        bdcs >= 0.4.0,
                        bifunctors,
diff --git a/src/BDCS/API/Compose.hs b/src/BDCS/API/Compose.hs
index d1908c5..9a3eea8 100644
--- a/src/BDCS/API/Compose.hs
+++ b/src/BDCS/API/Compose.hs
@@ -124,6 +124,7 @@ instance FromJSON UuidStatus where
 
 data ComposeMsgAsk = AskBuildsWaiting
                    | AskBuildsInProgress
+                   | AskCompose ComposeInfo
 
 data ComposeMsgResp = RespBuildsWaiting [T.Text]
                     | RespBuildsInProgress [T.Text]
diff --git a/src/BDCS/API/Config.hs b/src/BDCS/API/Config.hs
index dfdc7b7..6ff04c7 100644
--- a/src/BDCS/API/Config.hs
+++ b/src/BDCS/API/Config.hs
@@ -22,13 +22,12 @@ module BDCS.API.Config(AskTuple,
   where
 
 import           BDCS.API.Utils(GitLock(..))
-import           BDCS.API.Compose(ComposeInfo, ComposeMsgAsk, ComposeMsgResp)
+import           BDCS.API.Compose(ComposeMsgAsk, ComposeMsgResp)
 import           Control.Concurrent.STM.TChan(TChan)
 import           Control.Concurrent.STM.TMVar(TMVar)
-import           Data.IORef(IORef)
 import           Database.Persist.Sql(ConnectionPool)
 
-type AskTuple = (ComposeMsgAsk, TMVar ComposeMsgResp)
+type AskTuple = (ComposeMsgAsk, Maybe (TMVar ComposeMsgResp))
 
 data ServerConfig = ServerConfig
   { cfgRepoLock   :: GitLock                             -- ^ Lock required for accessing recipe repo
@@ -36,7 +35,6 @@ data ServerConfig = ServerConfig
                                                          -- the compose server. The tuple is the message
                                                          -- that needs a response and a location for where
                                                          -- the response should be written to.
-  , cfgWorkQ      :: IORef [ComposeInfo]                 -- ^ Worklist of composes
   , cfgPool       :: ConnectionPool                      -- ^ SQL connection pool for accessing MDDB
   , cfgBdcs       :: FilePath                            -- ^ Location of the content store
   , cfgResultsDir :: FilePath                            -- ^ Base location for writing results
diff --git a/src/BDCS/API/Server.hs b/src/BDCS/API/Server.hs
index 1bf8e03..f63013b 100644
--- a/src/BDCS/API/Server.hs
+++ b/src/BDCS/API/Server.hs
@@ -34,15 +34,16 @@ module BDCS.API.Server(mkApp,
   where
 
 import           BDCS.API.Compose(ComposeInfo(..), ComposeMsgAsk(..), ComposeMsgResp(..), compose)
-import           BDCS.API.Config(AskTuple, ServerConfig(..))
+import           BDCS.API.Config(ServerConfig(..))
 import           BDCS.API.Recipes(openOrCreateRepo, commitRecipeDirectory)
 import           BDCS.API.Utils(GitLock(..))
 import           BDCS.API.V0(V0API, v0ApiServer)
 import           BDCS.API.Version(apiVersion)
 import           BDCS.DB(schemaVersion, getDbVersion)
-import           Control.Concurrent(ThreadId, forkFinally, forkIO)
+import           Control.Concurrent.Async(Async, async, concurrently_, waitCatch)
 import qualified Control.Concurrent.ReadWriteLock as RWL
-import           Control.Concurrent.STM.TChan(newTChan, tryReadTChan)
+import           Control.Concurrent.STM.TChan(newTChan, readTChan)
+import           Control.Concurrent.STM.TVar(TVar, modifyTVar, newTVar, readTVar, writeTVar)
 import           Control.Concurrent.STM.TMVar(putTMVar)
 import           Control.Monad(forever, void, when)
 import           Control.Monad.Except(runExceptT)
@@ -51,10 +52,12 @@ import Control.Monad.STM(atomically)
 import           Data.Aeson
 import           Data.Int(Int64)
 import           Data.IORef(IORef, atomicModifyIORef', newIORef, readIORef)
+import           Data.List(uncons)
 import qualified Data.Map as Map
 import           Data.String.Conversions(cs)
 import qualified Data.Text as T
 import           Database.Persist.Sqlite
+import           GHC.Conc(retry)
 import qualified GI.Ggit as Git
 import           Network.Wai
 import           Network.Wai.Handler.Warp
@@ -63,7 +66,7 @@ import Servant
 import           System.Directory(createDirectoryIfMissing)
 import           System.FilePath.Posix((</>))
 
-type InProgressMap = Map.Map T.Text (ThreadId, ComposeInfo)
+type InProgressMap = Map.Map T.Text (Async (), ComposeInfo)
 
 -- | The status of the server, the database, and the API.
 data ServerStatus = ServerStatus
@@ -149,12 +152,10 @@ mkApp bdcsPath gitRepoPath sqliteDbPath = do
     void $ commitRecipeDirectory repo "master" gitRepoPath
 
     lock <- RWL.new
-    q <- newIORef []
     chan <- atomically newTChan
 
     let cfg = ServerConfig { cfgRepoLock = GitLock lock repo,
                              cfgChan = chan,
-                             cfgWorkQ = q,
                              cfgPool = pool,
                              cfgBdcs = bdcsPath,
                              cfgResultsDir = "/var/lib/composer" }
@@ -165,7 +166,7 @@ mkApp bdcsPath gitRepoPath sqliteDbPath = do
     -- which means the client immediately gets a response with a build ID.
     -- The compose (which could take a while) proceeds independently. The
     -- client uses a different route to check and fetch the results.
-    void $ forkIO $ composeServer cfg
+    void $ async $ composeServer cfg
 
     return $ app cfg
 
@@ -181,55 +182,76 @@ composeServer ServerConfig{..} = do
     -- is currently running.
     inProgressRef <- newIORef Map.empty
 
-    -- We run in a loop forever doing two tasks:
+    -- A list of all composes currently waiting to be run. It would be easier if we
+    -- could use a TChan to represent it, but there's no good way to grab the entire
+    -- contents of one - it's a FIFO. Thus we need to use some sort of list.
+    worklist <- atomically $ newTVar []
+
+    -- From here, we run two separate threads forever.
     --
-    -- (1) Reading questions out of the shared one-way communications channel and
-    -- sending responses back to the API server.
+    -- One thread reads messages out of the channel and responds to them. This includes
+    -- things like "what is waiting in the queue?" and "what is currently composing?".
+    -- It also includes requests to start new composes.
     --
-    -- (2) Firing off composes for requests in the queue.
-    forever $ do
-        atomically (tryReadTChan cfgChan) >>= respondToMessage inProgressRef
-
-        -- For now, we only support running one compose at a time. If the mutable
-        -- variable is not empty, we are already running a compose. Don't try to start
-        -- another one. This leaves the work queue alone so we can grab the next item
-        -- later.
-        inProgress <- readIORef inProgressRef
-        when (Map.null inProgress) $
-            nextInQ cfgWorkQ >>= \case
-                -- Start another compose. When the thread finishes (either because the
-                -- compose is done or because it failed), clear out the mutable variable.
-                Just ci -> do threadId <- forkFinally (runFileLoggingT (ciResultsDir ci </> "compose.log") $ compose cfgBdcs cfgPool ci)
-                                                      (\_ -> removeCompose inProgressRef (ciId ci))
-
-                              addCompose inProgressRef ci threadId
-
-                _ -> return ()
+    -- The other thread runs composes. It does one at a time - reading the first item
+    -- out of a worklist, starting the compose, and waiting for it to finish. When one
+    -- compose is finished, it can look at the list to see about starting the next one.
+    concurrently_ (messagesThread inProgressRef worklist)
+                  (composesThread inProgressRef worklist)
  where
-    -- Pop the next element off the head of the queue and return it.
-    nextInQ :: IORef [a] -> IO (Maybe a)
-    nextInQ q = atomicModifyIORef' q (\case (hd:tl) -> (tl, Just hd)
-                                            _ -> ([], Nothing))
-
     -- Add a newly started compose to the in progress map.
-    addCompose :: IORef InProgressMap -> ComposeInfo -> ThreadId -> IO ()
-    addCompose ref ci@ComposeInfo{..} threadId =
-        void $ atomicModifyIORef' ref (\m -> (Map.insert ciId (threadId, ci) m, ()))
+    addCompose :: IORef InProgressMap -> ComposeInfo -> Async () -> IO ()
+    addCompose ref ci@ComposeInfo{..} thread =
+        void $ atomicModifyIORef' ref (\m -> (Map.insert ciId (thread, ci) m, ()))
 
     -- Remove a completed (or killed?) compose from the in progress map.
     removeCompose :: IORef InProgressMap -> T.Text -> IO ()
     removeCompose ref uuid =
         void $ atomicModifyIORef' ref (\m -> (Map.delete uuid m, ()))
 
-    respondToMessage :: IORef InProgressMap -> Maybe AskTuple -> IO ()
-    respondToMessage _ (Just (AskBuildsWaiting, r)) = do
-        q <- readIORef cfgWorkQ
-        atomically $ putTMVar r (RespBuildsWaiting $ map ciId q)
-
-    respondToMessage ref (Just (AskBuildsInProgress, r)) = do
-        -- Get just the ComposeInfo records for all the in-progress composes.
-        inProgress <- map snd . Map.elems <$> readIORef ref
-        -- And then extract the UUIDs of each, and that's the answer.
-        atomically $ putTMVar r (RespBuildsInProgress $ map ciId inProgress)
-
-    respondToMessage _ _ = return ()
+    composesThread :: IORef InProgressMap -> TVar [ComposeInfo] -> IO ()
+    composesThread inProgressRef worklist = forever $ do
+        -- For now, we only support running one compose at a time. If the mutable
+        -- variable is not empty, we are already running a compose. Don't try to
+        -- start another one. This leaves the work queue alone so we can grab the next
+        -- item later.
+        inProgress <- readIORef inProgressRef
+        when (Map.null inProgress) $ do
+            -- Start another compose and wait for it to be done. When the thread
+            -- finishes (either because the compose is done or because it failed),
+            -- clear out the mutable variable. Here, we don't actually care about
+            -- whether it failed or finished - that's all handled elsewhere.
+            ci <- atomically $ do
+                lst <- readTVar worklist
+                case uncons lst of
+                    -- This call to retry is very important. Without it, any polling
+                    -- on data structures like a TVar or an IORef keeps the CPU pegged
+                    -- at 100% the entire time.
+                    Nothing -> retry
+                    Just (x, xs) -> writeTVar worklist xs >> return x
+
+            thread <- async $ runFileLoggingT (ciResultsDir ci </> "compose.log")
+                                              (compose cfgBdcs cfgPool ci)
+
+            addCompose inProgressRef ci thread
+            void $ waitCatch thread
+            removeCompose inProgressRef (ciId ci)
+
+    messagesThread :: IORef InProgressMap -> TVar [ComposeInfo] -> IO ()
+    messagesThread inProgressRef worklist = forever $ atomically (readTChan cfgChan) >>= \case
+        (AskBuildsWaiting, Just r) -> do
+            lst <- atomically $ readTVar worklist
+            atomically $ putTMVar r (RespBuildsWaiting $ map ciId lst)
+
+        (AskBuildsInProgress, Just r) -> do
+            -- Get just the ComposeInfo records for all the in-progress composes.
+            inProgress <- map snd . Map.elems <$> readIORef inProgressRef
+            -- And then extract the UUIDs of each, and that's the answer.
+            atomically $ putTMVar r (RespBuildsInProgress $ map ciId inProgress)
+
+        (AskCompose ci, _) ->
+            -- Add the new compose to the end of the work queue. It will eventually
+            -- get around to being run by composesThread.
+            atomically $ modifyTVar worklist (++ [ci])
+
+        _ -> return ()
diff --git a/src/BDCS/API/V0.hs b/src/BDCS/API/V0.hs
index bda7c60..fafaeaa 100644
--- a/src/BDCS/API/V0.hs
+++ b/src/BDCS/API/V0.hs
@@ -93,7 +93,6 @@ import Data.Bifunctor(bimap)
 import qualified Data.ByteString.Lazy as LBS
 import           Data.Either(partitionEithers, rights)
 import           Data.Int(Int64)
-import           Data.IORef(atomicModifyIORef')
 import           Data.List(find, sortBy)
 import           Data.List.Extra(nubOrd)
 import           Data.Maybe(fromMaybe, mapMaybe)
@@ -1781,7 +1780,7 @@ compose cfg@ServerConfig{..} ComposeBody{..} test = case exportTypeFromText cbTy
                                  ciCustom=customActions,
                                  ciType=ty }
 
-            void $ atomicModifyIORef' cfgWorkQ (\ref -> (ref ++ [ci], ()))
+            liftIO $ atomically $ writeTChan cfgChan (AskCompose ci, Nothing)
             return $ ComposeResponse True (T.pack $ show buildId)
   where
     -- | Construct an error message for unsupported output selected
@@ -1901,7 +1900,7 @@ composeQueue ServerConfig{..} = do
     -- will be written. This prevents needing to write a communications
     -- protocol. Making it initially empty is very important.
     r <- liftIO $ atomically newEmptyTMVar
-    liftIO $ atomically $ writeTChan cfgChan (AskBuildsWaiting, r)
+    liftIO $ atomically $ writeTChan cfgChan (AskBuildsWaiting, Just r)
 
     -- Wait for the response to show up in the TMVar we created. This blocks,
     -- but the server doesn't do much in its main thread so it shouldn't block
@@ -1912,7 +1911,7 @@ composeQueue ServerConfig{..} = do
 
     -- And then we do the same thing for builds currently running.
     r' <- liftIO $ atomically newEmptyTMVar
-    liftIO $ atomically $ writeTChan cfgChan (AskBuildsInProgress, r')
+    liftIO $ atomically $ writeTChan cfgChan (AskBuildsInProgress, Just r')
     buildsRunning <- liftIO (atomically $ readTMVar r') >>= \case
         RespBuildsInProgress lst -> return lst
         _ -> return []