Skip to content

Commit

Permalink
Merge pull request #13 from reflex-frp/cn-#6-better-solution-for-race…
Browse files Browse the repository at this point in the history
…-condition-cleanup

Better solution for race condition cleanup
  • Loading branch information
3noch committed May 20, 2020
2 parents b38205e + 945eebd commit 5023873
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 46 deletions.
1 change: 1 addition & 0 deletions reflex-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ tested-with: GHC ==8.6.5
library
exposed-modules: Reflex.Process
build-depends: base >=4.12 && <4.14
, async >= 2 && < 3
, bytestring >=0.10 && < 0.11
, data-default >= 0.7.1 && < 0.8
, process >= 1.6.4 && < 1.7
Expand Down
87 changes: 41 additions & 46 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ module Reflex.Process
, SendPipe (..)
) where

import Control.Concurrent (forkIO, killThread, ThreadId)
import Control.Concurrent.Async (Async, async, waitBoth)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (mask_)
import Control.Exception (finally)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Default
import qualified GHC.IO.Handle as H
import Data.Default (Default (..))
import Data.Function (fix)
import GHC.IO.Handle (Handle)
import qualified GHC.IO.Handle as H
import System.Exit (ExitCode)
import Data.Function
import qualified System.Posix.Signals as P
import qualified System.Process as P
import System.Process hiding (createProcess)
import qualified System.Process as P

import Reflex

Expand All @@ -45,7 +45,7 @@ data SendPipe i

-- | The inputs to a process
data ProcessConfig t i = ProcessConfig
{ _processConfig_stdin :: Event t (SendPipe i)
{ _processConfig_stdin :: Event t i
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
Expand All @@ -67,6 +67,8 @@ data Process t o e = Process
, _process_stderr :: Event t e
-- ^ Fires whenever there's some new stderr output. See note on '_process_stdout'.
, _process_exit :: Event t ExitCode
-- ^ Fires when the process is over and no @stdout@ or @stderr@ data is left.
-- Once this fires, no other 'Event's for the process will fire again.
, _process_signal :: Event t P.Signal
-- ^ Fires when a signal has actually been sent to the process (via '_processConfig_signal').
}
Expand All @@ -80,7 +82,7 @@ data Process t o e = Process
-- to those pipes.
createRedirectedProcess
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (SendPipe i -> IO ()))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard output handler
Expand All @@ -98,7 +100,7 @@ createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (Proces
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput :: SendPipe i -> IO () <- liftIO $ mkWriteStdInput hIn
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
Expand All @@ -107,30 +109,27 @@ createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (Proces
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let
output :: Handle -> m (Event t o, ThreadId)
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ forkIO reader
return (e, t)
output :: Handle -> m (Event t o, Async ())
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ async reader
return (e, t)

errOutput :: Handle -> m (Event t e, Async ())
errOutput h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ async reader
return (e, t)

let
err_output :: Handle -> m (Event t e, ThreadId)
err_output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ forkIO reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- err_output hErr
(err, errThread) <- errOutput hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ forkIO $ do
void $ liftIO $ async $ flip finally (P.cleanupProcess po) $ do
waited <- waitForProcess ph
mask_ $ do
ecTrigger waited
P.cleanupProcess po
killThread outThread
killThread errThread
_ <- waitBoth outThread errThread
ecTrigger waited -- Output events should never fire after process completion
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
Expand All @@ -154,15 +153,15 @@ createProcessBufferingInput
=> IO (SendPipe ByteString) -- ^ Read a value from the input stream buffer
-> (SendPipe ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t ByteString -- ^ The process configuration in terms of Reflex
-> ProcessConfig t (SendPipe ByteString) -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
let
input :: Handle -> IO (SendPipe ByteString -> IO ())
input h = do
H.hSetBuffering h H.NoBuffering
void $ liftIO $ forkIO $ fix $ \loop -> do
newMessage :: SendPipe ByteString <- readBuffer
void $ liftIO $ async $ fix $ \loop -> do
newMessage <- readBuffer
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
Expand All @@ -173,21 +172,17 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
SendPipe_EOF -> H.hClose h
loop
return writeBuffer

output h trigger = do
H.hSetBuffering h H.LineBuffering
let go = do
open <- H.hIsOpen h
when open $ do
readable <- H.hIsReadable h
when readable $ do
out <- BS.hGetSome h 32768
if BS.null out
then return ()
else do
void $ trigger out
go
return go
pure $ fix $ \go -> do
open <- H.hIsOpen h
when open $ do
readable <- H.hIsReadable h
when readable $ do
out <- BS.hGetSome h 32768
if BS.null out
then H.hClose h
else void (trigger out) *> go
createRedirectedProcess input output output p procConfig

-- | Create a process feeding it input using an 'Event' and exposing its output
Expand All @@ -204,7 +199,7 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t ByteString
-> ProcessConfig t (SendPipe ByteString)
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
channel <- liftIO newChan
Expand Down

0 comments on commit 5023873

Please sign in to comment.