Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 12 additions & 63 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ module Simplex.Messaging.Agent
where

import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple (logInfo, showText)
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader
Expand Down Expand Up @@ -463,20 +463,7 @@ ackMessageAsync' c corrId connId msgId = do
enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId

deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnectionAsync' c connId = withConnLock c connId "deleteConnectionAsync" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
case connRcvQueues conn of
[] -> delete
rqs -> do
withStore' c (`setConnDeleted` connId)
disableConn c connId
void . forkIO $ do
forM_ rqs $ deleteConnQueue c True
delete
where
delete = do
withStore' c (`deleteConn` connId)
atomically $ writeTBQueue (subQ c) ("", connId, DEL_CONN)
deleteConnectionAsync' c connId = deleteConnectionsAsync' c [connId]

deleteConnectionsAsync' :: AgentMonad m => AgentClient -> [ConnId] -> m ()
deleteConnectionsAsync' = deleteConnectionsAsync_ $ pure ()
Expand Down Expand Up @@ -627,30 +614,14 @@ rejectContact' c contactConnId invId =
withStore c $ \db -> deleteInvitation db contactConnId invId

-- | Subscribe to receive connection messages (SUB command) in Reader monad
subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
subscribeConnection' c connId = do
SomeConn _ conn <- withStore c (`getConn` connId)
resumeConnCmds c connId
case conn of
DuplexConnection cData (rq :| rqs) sqs -> do
mapM_ (resumeMsgDelivery c cData) sqs
subscribe cData rq
mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs
SndConnection cData sq -> do
resumeMsgDelivery c cData sq
case status (sq :: SndQueue) of
Confirmed -> pure ()
Active -> throwError $ CONN SIMPLEX
_ -> throwError $ INTERNAL "unexpected queue status"
RcvConnection cData rq -> subscribe cData rq
ContactConnection cData rq -> subscribe cData rq
NewConnection _ -> pure ()
where
subscribe :: ConnData -> RcvQueue -> m ()
subscribe ConnData {enableNtfs} rq = do
subscribeQueue c rq
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, if enableNtfs then NSCCreate else NSCDelete)
subscribeConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
subscribeConnection' c connId = toConnResult connId =<< subscribeConnections' c [connId]

toConnResult :: AgentMonad m => ConnId -> Map ConnId (Either AgentErrorType ()) -> m ()
toConnResult connId rs = case M.lookup connId rs of
Just (Right ()) -> when (M.size rs > 1) $ logError $ T.pack $ "too many results " <> show (M.size rs)
Just (Left e) -> throwError e
_ -> throwError $ INTERNAL $ "no result for connection " <> B.unpack connId

type QCmdResult = (QueueStatus, Either AgentErrorType ())

Expand Down Expand Up @@ -720,10 +691,7 @@ subscribeConnections' c connIds = do
writeTBQueue (subQ c) ("", "", ERR . INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected)

resubscribeConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
resubscribeConnection' c connId =
unlessM
(atomically $ hasActiveSubscription c connId)
(subscribeConnection' c connId)
resubscribeConnection' c connId = toConnResult connId =<< resubscribeConnections' c [connId]

resubscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
resubscribeConnections' _ [] = pure M.empty
Expand Down Expand Up @@ -1207,13 +1175,7 @@ suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do
-- unlike deleteConnectionAsync, this function does not mark connection as deleted in case of deletion failure
-- currently it is used only in tests
deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
let rqs = connRcvQueues conn
unless (null rqs) $ do
disableConn c connId
forM_ rqs $ deleteConnQueue c False
withStore' c (`deleteConn` connId)
deleteConnection' c connId = toConnResult connId =<< deleteConnections' c [connId]

connRcvQueues :: Connection d -> [RcvQueue]
connRcvQueues = \case
Expand All @@ -1223,19 +1185,6 @@ connRcvQueues = \case
SndConnection _ _ -> []
NewConnection _ -> []

deleteConnQueue :: AgentMonad m => AgentClient -> Bool -> RcvQueue -> m ()
deleteConnQueue c ntf rq = do
maxErrs <- asks $ deleteErrorCount . config
(deleteQueue c rq >> notify Nothing) `catchError` handleError maxErrs
withStore' c (`deleteConnRcvQueue` rq)
where
handleError maxErrs e
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs = do
withStore' c (`incRcvDeleteErrors` rq)
throwError e
| otherwise = notify $ Just e
notify e_ = when ntf $ atomically $ writeTBQueue (subQ c) ("", qConnId rq, DEL_RCVQ (qServer rq) (queueId rq) e_)

disableConn :: AgentMonad m => AgentClient -> ConnId -> m ()
disableConn c connId = do
atomically $ removeSubscription c connId
Expand Down
16 changes: 0 additions & 16 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ module Simplex.Messaging.Agent.Client
closeProtocolServerClients,
runSMPServerTest,
newRcvQueue,
subscribeQueue,
subscribeQueues,
getQueueMessage,
decryptSMPMessage,
Expand Down Expand Up @@ -683,21 +682,6 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange = do
}
pure (rq, SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey)

subscribeQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
subscribeQueue c rq@RcvQueue {connId, server, rcvPrivateKey, rcvId} = do
whenM (atomically . TM.member (server, rcvId) $ getMsgLocks c) . throwError $ CMD PROHIBITED
atomically $ do
modifyTVar' (subscrConns c) $ S.insert connId
RQ.addQueue rq $ pendingSubs c
r <- withSMPClient c rq "SUB" $ \smp ->
liftIO $ runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq
case r of
Left e -> do
tSess <- mkSMPTransportSession c rq
when (temporaryClientError e) $ reconnectServer c tSess
throwError (protocolClientError SMP (B.unpack $ strEncode server) e)
_ -> pure ()

processSubResult :: AgentClient -> RcvQueue -> Either ProtocolClientError () -> IO (Either ProtocolClientError ())
processSubResult c rq r = do
case r of
Expand Down
8 changes: 4 additions & 4 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,9 @@ testDeleteConnectionAsync t = do
pure ([bId1, bId2, bId3] :: [ConnId])
runRight_ $ do
deleteConnectionsAsync a connIds
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
Expand Down Expand Up @@ -792,7 +792,7 @@ testUsersNoServer t = do
get b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
runRight_ $ do
deleteUser a auId True
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c == bId'; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
get a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"
Expand Down