Permalink
Browse files

unix-process-conduit

  • Loading branch information...
1 parent 841e2ab commit 309ee985b0e128d8dae759b75df045f8e579c62b @snoyberg committed Oct 4, 2012
Showing with 88 additions and 59 deletions.
  1. +57 −42 Keter/Logger.hs
  2. +4 −0 Keter/Prelude.hs
  3. +25 −17 Keter/Process.hs
  4. +2 −0 keter.cabal
View
@@ -6,27 +6,56 @@ module Keter.Logger
, start
, attach
, detach
- , Handles (..)
+ , LogPipes (..)
+ , LogPipe
+ , mkLogPipe
, dummy
) where
import Keter.Prelude
-import System.IO (Handle, hClose)
import qualified Prelude as P
import qualified Keter.LogFile as LogFile
import Control.Concurrent (killThread)
import qualified Data.ByteString as S
-import Control.Exception (fromException, AsyncException (ThreadKilled))
+import Data.Conduit (Sink, await)
+import qualified Control.Concurrent.MVar as M
+import Control.Monad.Trans.Class (lift)
-data Handles = Handles
- { stdIn :: Maybe Handle
- , stdOut :: Maybe Handle
- , stdErr :: Maybe Handle
+data LogPipes = LogPipes
+ { stdOut :: LogPipe
+ , stdErr :: LogPipe
}
+data LogPipe = LogPipe
+ { readLogPipe :: KIO (Maybe S.ByteString)
+ , closeLogPipe :: KIO ()
+ }
+
+mkLogPipe :: KIO (LogPipe, Sink S.ByteString P.IO ())
+mkLogPipe = do
+ toSink <- newEmptyMVar
+ fromSink <- newEmptyMVar
+ let pipe = LogPipe
+ { readLogPipe = do
+ putMVar toSink True
+ takeMVar fromSink
+ , closeLogPipe = do
+ _ <- tryTakeMVar toSink
+ putMVar toSink False
+ }
+ sink = do
+ toCont <- lift $ M.takeMVar toSink
+ if toCont
+ then do
+ mbs <- await
+ lift $ M.putMVar fromSink mbs
+ maybe (return ()) (P.const sink) mbs
+ else return ()
+ return (pipe, sink)
+
newtype Logger = Logger (Command -> KIO ())
-data Command = Attach Handles | Detach
+data Command = Attach LogPipes | Detach
start :: LogFile.LogFile -- ^ stdout
-> LogFile.LogFile -- ^ stderr
@@ -50,52 +79,38 @@ start lfout lferr = do
Detach -> do
LogFile.close lfout
LogFile.close lferr
- Attach (Handles min mout merr) -> do
+ Attach (LogPipes out err) -> do
LogFile.addChunk lfout "\n\nAttaching new process\n\n"
LogFile.addChunk lferr "\n\nAttaching new process\n\n"
- hmClose min
- let go mhandle lf =
- case mhandle of
- Nothing -> return Nothing
- Just handle -> do
- etid <- forkKIO' $ listener handle lf
- case etid of
- Left e -> do
- $logEx e
- hmClose mhandle
- return Nothing
- Right tid -> return $ Just tid
- newout <- go mout lfout
- newerr <- go merr lferr
+ let go logpipe lf = do
+ etid <- forkKIO' $ listener logpipe lf
+ case etid of
+ Left e -> do
+ $logEx e
+ closeLogPipe logpipe
+ return Nothing
+ Right tid -> return $ Just tid
+ newout <- go out lfout
+ newerr <- go err lferr
loop chan newout newerr
-hmClose :: Maybe Handle -> KIO ()
-hmClose Nothing = return ()
-hmClose (Just h) = liftIO (hClose h) >>= either $logEx return
-
-listener :: Handle -> LogFile.LogFile -> KIO ()
+listener :: LogPipe -> LogFile.LogFile -> KIO ()
listener out lf =
loop
where
loop = do
- ebs <- liftIO $ S.hGetSome out 4096
- case ebs of
- Left e -> do
- case fromException e of
- Just ThreadKilled -> return ()
- _ -> $logEx e
- hmClose $ Just out
- Right bs
- | S.null bs -> hmClose (Just out)
- | otherwise -> do
- LogFile.addChunk lf bs
- listener out lf
+ mbs <- readLogPipe out
+ case mbs of
+ Nothing -> return ()
+ Just bs -> do
+ LogFile.addChunk lf bs
+ loop
-attach :: Logger -> Handles -> KIO ()
+attach :: Logger -> LogPipes -> KIO ()
attach (Logger f) h = f (Attach h)
detach :: Logger -> KIO ()
detach (Logger f) = f Detach
dummy :: Logger
-dummy = P.error "Logger.dummy"
+dummy = Logger $ P.const $ return ()
View
@@ -84,6 +84,7 @@ module Keter.Prelude
, modifyMVar_
, swapMVar
, takeMVar
+ , tryTakeMVar
, putMVar
-- * IORef
, I.IORef
@@ -247,6 +248,9 @@ swapMVar m = liftIO_ . M.swapMVar m
takeMVar :: M.MVar a -> KIO a
takeMVar = liftIO_ . M.takeMVar
+tryTakeMVar :: M.MVar a -> KIO (P.Maybe a)
+tryTakeMVar = liftIO_ . M.tryTakeMVar
+
putMVar :: M.MVar a -> a -> KIO ()
putMVar m = liftIO_ . M.putMVar m
View
@@ -7,11 +7,16 @@ module Keter.Process
) where
import Keter.Prelude
-import Keter.Logger
-import qualified System.Process as SP
+import Keter.Logger (Logger, attach, LogPipes (..), mkLogPipe)
import Data.Time (diffUTCTime)
+import Data.Conduit.Process.Unix (forkExecuteFile, waitForProcess, killProcess)
+import System.Posix.Types (ProcessID)
+import Prelude (error)
+import Filesystem.Path.CurrentOS (encode)
+import Data.Text.Encoding (encodeUtf8)
+import Data.Conduit (($$))
-data Status = NeedsRestart | NoRestart | Running SP.ProcessHandle
+data Status = NeedsRestart | NoRestart | Running ProcessID
-- | Run the given command, restarting if the process dies.
run :: FilePath -- ^ executable
@@ -33,27 +38,30 @@ run exec dir args env logger = do
log $ ProcessWaiting exec
threadDelay $ 5 * 1000 * 1000
_ -> return ()
- res <- liftIO $ SP.createProcess cp
+ (pout, sout) <- mkLogPipe
+ (perr, serr) <- mkLogPipe
+ res <- liftIO $ forkExecuteFile
+ (encode exec)
+ False
+ (map encodeUtf8 args)
+ (Just $ map (encodeUtf8 *** encodeUtf8) env)
+ (Just $ encode dir)
+ (Just $ return ())
+ (Just sout)
+ (Just serr)
case res of
Left e -> do
+ void $ liftIO $ return () $$ sout
+ void $ liftIO $ return () $$ serr
$logEx e
return (NeedsRestart, return ())
- Right (hin, hout, herr, ph) -> do
- attach logger $ Handles hin hout herr
+ Right pid -> do
+ attach logger $ LogPipes pout perr
log $ ProcessCreated exec
- return (Running ph, liftIO (SP.waitForProcess ph) >> loop (Just now))
+ return (Running pid, liftIO (waitForProcess pid) >> loop (Just now))
next
forkKIO $ loop Nothing
return $ Process mstatus
- where
- cp = (SP.proc (toString exec) $ map toString args)
- { SP.cwd = Just $ toString dir
- , SP.env = Just $ map (toString *** toString) env
- , SP.std_in = SP.CreatePipe
- , SP.std_out = SP.CreatePipe
- , SP.std_err = SP.CreatePipe
- , SP.close_fds = True
- }
-- | Abstract type containing information on a process which will be restarted.
newtype Process = Process (MVar Status)
@@ -63,5 +71,5 @@ terminate :: Process -> KIO ()
terminate (Process mstatus) = do
status <- swapMVar mstatus NoRestart
case status of
- Running ph -> void $ liftIO $ SP.terminateProcess ph
+ Running pid -> void $ liftIO $ killProcess pid
_ -> return ()
View
@@ -37,6 +37,8 @@ Library
, network-conduit >= 0.6 && < 0.7
, network-conduit-tls >= 0.5 && < 0.6
, http-reverse-proxy >= 0.1 && < 0.2
+ , unix-process-conduit >= 0.1 && < 0.2
+ , unix >= 2.5 && < 2.6
Exposed-Modules: Keter.Process
Keter.Postgres
Keter.TempFolder

0 comments on commit 309ee98

Please sign in to comment.