Skip to content

Commit

Permalink
count total too
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed May 13, 2024
1 parent b1ba750 commit 668e27c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
28 changes: 19 additions & 9 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ module Simplex.Messaging.Agent
debugAgentLocks,
getAgentStats,
resetAgentStats,
getDuplicateMsgCounts,
getMsgCounts,
getAgentSubscriptions,
logConnection,
)
Expand Down Expand Up @@ -555,8 +555,8 @@ resetAgentStats :: AgentClient -> IO ()
resetAgentStats = atomically . TM.clear . agentStats
{-# INLINE resetAgentStats #-}

getDuplicateMsgCounts :: AgentClient -> IO [(ConnId, Int)]
getDuplicateMsgCounts c = readTVarIO (duplicateMsgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs
getMsgCounts :: AgentClient -> IO [(ConnId, MsgCounts)]
getMsgCounts c = readTVarIO (msgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs

withAgentEnv' :: AgentClient -> AM' a -> IO a
withAgentEnv' c = (`runReaderT` agentEnv c)
Expand Down Expand Up @@ -2139,6 +2139,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
_ -> pure ()
let encryptedMsgHash = C.sha256Hash encAgentMessage
g <- asks random
atomically updateTotalMsgCount
tryError (agentClientMsg g encryptedMsgHash) >>= \case
Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do
conn'' <- resetRatchetSync
Expand Down Expand Up @@ -2172,7 +2173,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
| otherwise = pure conn'
Right _ -> prohibited >> ack
Left e@(AGENT A_DUPLICATE) -> do
atomically updateDuplicateMsgCounts
atomically updateDupMsgCount
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
| userAck -> ackDel internalId
Expand Down Expand Up @@ -2203,11 +2204,20 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
checkDuplicateHash e encryptedMsgHash =
unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $
throwError e
updateDuplicateMsgCounts :: STM ()
updateDuplicateMsgCounts =
TM.lookup connId (duplicateMsgCounts c) >>= \case
Just count -> modifyTVar' count (+ 1)
Nothing -> newTVar 0 >>= \count -> TM.insert connId count (duplicateMsgCounts c)
updateTotalMsgCount :: STM ()
updateTotalMsgCount =
TM.lookup connId (msgCounts c) >>= \case
Just v -> modifyTVar' v $ \counts -> counts {totalMsgCount = totalMsgCount counts + 1}
Nothing -> addMsgCount
updateDupMsgCount :: STM ()
updateDupMsgCount =
TM.lookup connId (msgCounts c) >>= \case
Just v -> modifyTVar' v $ \counts -> counts {dupMsgCount = dupMsgCount counts + 1}
Nothing -> addMsgCount
addMsgCount :: STM ()
addMsgCount = do
counts <- newTVar $ MsgCounts 1 1
TM.insert connId counts (msgCounts c)
agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448))
agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do
rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY
Expand Down
11 changes: 8 additions & 3 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ module Simplex.Messaging.Agent.Client
getAgentWorkersDetails,
AgentWorkersSummary (..),
getAgentWorkersSummary,
MsgCounts (..),
SMPTransportSession,
NtfTransportSession,
XFTPTransportSession,
Expand Down Expand Up @@ -298,11 +299,13 @@ data AgentClient = AgentClient
-- smpSubWorkers for SMP servers sessions
smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())),
agentStats :: TMap AgentStatsKey (TVar Int),
duplicateMsgCounts :: TMap ConnId (TVar Int),
msgCounts :: TMap ConnId (TVar MsgCounts),
clientId :: Int,
agentEnv :: Env
}

data MsgCounts = MsgCounts {totalMsgCount :: Int, dupMsgCount :: Int}

getAgentWorker :: (Ord k, Show k) => String -> Bool -> AgentClient -> k -> TMap k Worker -> (Worker -> AM ()) -> AM' Worker
getAgentWorker = getAgentWorker' id pure
{-# INLINE getAgentWorker #-}
Expand Down Expand Up @@ -460,7 +463,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
deleteLock <- createLock
smpSubWorkers <- TM.empty
agentStats <- TM.empty
duplicateMsgCounts <- TM.empty
msgCounts <- TM.empty
return
AgentClient
{ acThread,
Expand Down Expand Up @@ -496,7 +499,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
deleteLock,
smpSubWorkers,
agentStats,
duplicateMsgCounts,
msgCounts,
clientId,
agentEnv
}
Expand Down Expand Up @@ -1875,6 +1878,8 @@ getAgentWorkersSummary AgentClient {smpClients, ntfClients, xftpClients, smpDeli
(pure WorkersSummary {numActive, numIdle = numIdle + 1, totalRestarts = totalRestarts + restartCount})
(pure WorkersSummary {numActive = numActive + 1, numIdle, totalRestarts = totalRestarts + restartCount})

$(J.deriveJSON defaultJSON ''MsgCounts)

$(J.deriveJSON defaultJSON ''AgentLocks)

$(J.deriveJSON (enumJSON $ dropPrefix "TS") ''ProtocolTestStep)
Expand Down

0 comments on commit 668e27c

Please sign in to comment.