Skip to content

Commit

Permalink
trace-forward: readFromSink fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jutaro committed May 30, 2023
1 parent 2c18d78 commit 3388368
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions trace-forward/src/Trace/Forward/Utils/TraceObject.hs
Expand Up @@ -11,7 +11,7 @@ module Trace.Forward.Utils.TraceObject
, getTraceObjectsFromReply
) where

import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TVar
import Control.Monad (unless, when, (<$!>))
Expand Down Expand Up @@ -122,30 +122,47 @@ readFromSink sink@ForwardSink{forwardQueue, wasUsed} =
replyList <-
case blocking of
TokBlocking -> do
objs <- atomically $ getNTraceObjects n forwardQueue >>= \case
[] -> retry -- No 'TraceObject's yet, just wait...
(x:xs) -> return $ x NE.:| xs
atomically . modifyTVar' wasUsed . const $ True
objs <- atomically $ do
queue <- readTVar forwardQueue
res <- getNTraceObjectsBlocking n queue >>= \case
[] -> error "impossible"
(x:xs) -> return $ x NE.:| xs
modifyTVar' wasUsed . const $ True
pure res
return $ BlockingReply objs
TokNonBlocking -> do
objs <- atomically $ getNTraceObjects n forwardQueue
unless (null objs) $
atomically . modifyTVar' wasUsed . const $ True
objs <- atomically $ do
queue <- readTVar forwardQueue
res <- getNTraceObjectsNonBlocking n queue
unless (null res) $
modifyTVar' wasUsed . const $ True
pure res
return $ NonBlockingReply objs
return (replyList, readFromSink sink)
, Forwarder.recvMsgDone = return ()
}

-- | Returns at most N 'TraceObject's from the queue.
getNTraceObjects
getNTraceObjectsNonBlocking
:: Word16
-> TVar (TBQueue lo)
-> TBQueue lo
-> STM [lo]
getNTraceObjects 0 _ = return []
getNTraceObjects n q =
readTVar q >>= tryReadTBQueue >>= \case
Just lo' -> (lo' :) <$> getNTraceObjects (n - 1) q
Nothing -> return []
getNTraceObjectsNonBlocking 0 _ = return []
getNTraceObjectsNonBlocking n q =
tryReadTBQueue q >>=
\case
Just lo -> (lo :) <$> getNTraceObjectsNonBlocking (n - 1) q
Nothing -> return []

-- | Returns at most N 'TraceObject's from the queue.
getNTraceObjectsBlocking
:: Word16
-> TBQueue lo
-> STM [lo]
getNTraceObjectsBlocking 0 _ = return []
getNTraceObjectsBlocking n q = do
lo <- readTBQueue q
(lo :) <$> getNTraceObjectsNonBlocking (n - 1) q

getTraceObjectsFromReply
:: BlockingReplyList blocking lo -- ^ The reply with list of 'TraceObject's.
Expand Down

0 comments on commit 3388368

Please sign in to comment.