Skip to content

Commit

Permalink
withTH
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed May 10, 2024
1 parent 7b7bd74 commit 7199f0f
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h "AUTH"

runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do
runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
Expand All @@ -422,12 +422,12 @@ runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = d
pure new
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
withTH <- withMVar <$> newMVar th -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ ([liftIO $ send th c, liftIO $ sendMsg th c, client c s, receive h c] <> disconnectThread_ c expCfg)
raceAny_ ([liftIO $ send withTH c, liftIO $ sendMsg withTH c, client c s, receive th c] <> disconnectThread_ c expCfg)
`finally` clientDisconnected c
where
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ _ _ = []
noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c)

Expand Down Expand Up @@ -460,10 +460,10 @@ cancelSub sub =
_ -> return ()

receive :: Transport c => THandleSMP c 'TServer -> Client -> M ()
receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
forever $ do
ts <- L.toList <$> liftIO (tGet h)
ts <- L.toList <$> liftIO (tGet th)
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
as <- partitionEithers <$> mapM cmdAction ts
write sndQ $ fst as
Expand All @@ -480,20 +480,20 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv
VRFailed -> Left (corrId, queueId, ERR AUTH)
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty

send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
send th c@Client {sndQ, msgQ, sessionId} = do
send :: Transport c => ((THandleSMP c 'TServer -> IO ()) -> IO ()) -> Client -> IO ()
send withTH c@Client {sndQ, msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ atomically (readTBQueue sndQ) >>= sendTransmissions
where
sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO ()
sendTransmissions ts
| L.length ts <= 2 = tSend th c ts
| L.length ts <= 2 = tSend withTH c ts
| otherwise = do
let (msgs_, ts') = mapAccumR splitMessages [] ts
-- If the request had batched subscriptions (L.length ts > 2)
-- this will reply OK to all SUBs in the first batched transmission,
-- to reduce client timeouts.
tSend th c ts'
tSend withTH c ts'
-- After that all messages will be sent in separate transmissions,
-- without any client response timeouts, and allowing them to interleave
-- with other requests responses.
Expand All @@ -505,14 +505,14 @@ send th c@Client {sndQ, msgQ, sessionId} = do
MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK))
_ -> (msgs, t)

sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
sendMsg th c@Client {msgQ, sessionId} = do
sendMsg :: Transport c => ((THandleSMP c 'TServer -> IO ()) -> IO ()) -> Client -> IO ()
sendMsg withTH c@Client {msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg"
forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend th c [t])
forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend withTH c [t])

tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO ()
tSend th Client {sndActiveAt} ts = do
withMVar th $ \h@THandle {params} ->
tSend :: Transport c => ((THandleSMP c 'TServer -> IO ()) -> IO ()) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO ()
tSend withTH Client {sndActiveAt} ts = do
withTH $ \h@THandle {params} ->
void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime

Expand Down

0 comments on commit 7199f0f

Please sign in to comment.