Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: count received duplicate messages #1148

Merged
merged 6 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

-- |
-- Module : Simplex.Messaging.Agent
Expand Down Expand Up @@ -113,6 +113,7 @@
debugAgentLocks,
getAgentStats,
resetAgentStats,
getMsgCounts,
getAgentSubscriptions,
logConnection,
)
Expand Down Expand Up @@ -554,6 +555,9 @@
resetAgentStats = atomically . TM.clear . agentStats
{-# INLINE resetAgentStats #-}

getMsgCounts :: AgentClient -> IO [(ConnId, (Int, Int))] -- (total, duplicates)
getMsgCounts c = readTVarIO (msgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs

withAgentEnv' :: AgentClient -> AM' a -> IO a
withAgentEnv' c = (`runReaderT` agentEnv c)
{-# INLINE withAgentEnv' #-}
Expand Down Expand Up @@ -2135,6 +2139,7 @@
_ -> pure ()
let encryptedMsgHash = C.sha256Hash encAgentMessage
g <- asks random
atomically updateTotalMsgCount
tryError (agentClientMsg g encryptedMsgHash) >>= \case
Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do
conn'' <- resetRatchetSync
Expand Down Expand Up @@ -2168,6 +2173,7 @@
| otherwise = pure conn'
Right _ -> prohibited >> ack
Left e@(AGENT A_DUPLICATE) -> do
atomically updateDupMsgCount
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
| userAck -> ackDel internalId
Expand Down Expand Up @@ -2198,6 +2204,20 @@
checkDuplicateHash e encryptedMsgHash =
unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $
throwError e
updateTotalMsgCount :: STM ()
updateTotalMsgCount =
TM.lookup connId (msgCounts c) >>= \case
Just v -> modifyTVar' v $ first (+ 1)
Nothing -> addMsgCount 0
updateDupMsgCount :: STM ()
updateDupMsgCount =
TM.lookup connId (msgCounts c) >>= \case
Just v -> modifyTVar' v $ second (+ 1)
Nothing -> addMsgCount 1
addMsgCount :: Int -> STM ()
addMsgCount duplicate = do
counts <- newTVar (1, duplicate)
TM.insert connId counts (msgCounts c)
agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448))
agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do
rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@
-- smpSubWorkers for SMP servers sessions
smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())),
agentStats :: TMap AgentStatsKey (TVar Int),
msgCounts :: TMap ConnId (TVar (Int, Int)), -- (total, duplicates)
clientId :: Int,
agentEnv :: Env
}
Expand Down Expand Up @@ -459,6 +460,7 @@
deleteLock <- createLock
smpSubWorkers <- TM.empty
agentStats <- TM.empty
msgCounts <- TM.empty
return
AgentClient
{ acThread,
Expand Down Expand Up @@ -494,6 +496,7 @@
deleteLock,
smpSubWorkers,
agentStats,
msgCounts,
clientId,
agentEnv
}
Expand Down Expand Up @@ -1685,7 +1688,7 @@
where
statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res}

userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p))

Check warning on line 1691 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 1691 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: UserProtocol p
userServers c = case protocolTypeI @p of
SPSMP -> smpServers c
SPXFTP -> xftpServers c
Expand Down
Loading