Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
DEL -> delQueueAndMsgs st
where
createQueue :: QueueStore -> RcvPublicVerifyKey -> RcvPublicDhKey -> m (Transmission BrokerMsg)
createQueue st recipientKey dhKey = do
createQueue st recipientKey dhKey = time "NEW" $ do
(rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair'
let rcvDhSecret = C.dh' dhKey privDhKey
qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey}
Expand Down Expand Up @@ -398,14 +398,14 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
liftM2 (,) (randomId n) (randomId n)

secureQueue_ :: QueueStore -> SndPublicVerifyKey -> m (Transmission BrokerMsg)
secureQueue_ st sKey = do
secureQueue_ st sKey = time "KEY" $ do
withLog $ \s -> logSecureQueue s queueId sKey
stats <- asks serverStats
atomically $ modifyTVar (qSecured stats) (+ 1)
atomically $ (corrId,queueId,) . either ERR (const OK) <$> secureQueue st queueId sKey

addQueueNotifier_ :: QueueStore -> NtfPublicVerifyKey -> RcvNtfPublicDhKey -> m (Transmission BrokerMsg)
addQueueNotifier_ st notifierKey dhKey = do
addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do
(rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair'
let rcvNtfDhSecret = C.dh' dhKey privDhKey
(corrId,queueId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret
Expand Down Expand Up @@ -433,10 +433,10 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
okResp <$> atomically (suspendQueue st queueId)

subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg)
subscribeQueue qr rId = timed "subscribe" sessionId rId $ do
subscribeQueue qr rId = do
atomically (TM.lookup rId subscriptions) >>= \case
Nothing ->
atomically newSub >>= deliver
newSub >>= deliver
Just sub ->
readTVarIO sub >>= \case
Sub {subThread = ProhibitSub} ->
Expand All @@ -445,20 +445,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
s ->
atomically (tryTakeTMVar $ delivered s) >> deliver sub
where
newSub :: STM (TVar Sub)
newSub = do
newSub :: m (TVar Sub)
newSub = time "SUB newSub" . atomically $ do
writeTBQueue subscribedQ (rId, clnt)
sub <- newTVar =<< newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
deliver :: TVar Sub -> m (Transmission BrokerMsg)
deliver sub = do
q <- getStoreMsgQueue rId
q <- getStoreMsgQueue "SUB" rId
msg_ <- atomically $ tryPeekMsg q
deliverMessage qr rId sub q msg_
deliverMessage "SUB" qr rId sub q msg_

getMessage :: QueueRec -> m (Transmission BrokerMsg)
getMessage qr =
getMessage qr = time "GET" $ do
atomically (TM.lookup queueId subscriptions) >>= \case
Nothing ->
atomically newSub >>= getMessage_
Expand All @@ -478,7 +478,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
pure s
getMessage_ :: Sub -> m (Transmission BrokerMsg)
getMessage_ s = do
q <- getStoreMsgQueue queueId
q <- getStoreMsgQueue "GET" queueId
atomically $
tryPeekMsg q >>= \case
Just msg ->
Expand All @@ -490,20 +490,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
withQueue action = maybe (pure $ err AUTH) action qr_

subscribeNotifications :: m (Transmission BrokerMsg)
subscribeNotifications = atomically $ do
subscribeNotifications = time "NSUB" . atomically $ do
unlessM (TM.member queueId ntfSubscriptions) $ do
writeTBQueue ntfSubscribedQ (queueId, clnt)
TM.insert queueId () ntfSubscriptions
pure ok

acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg)
acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do
acknowledgeMsg qr msgId = time "ACK" $ do
atomically (TM.lookup queueId subscriptions) >>= \case
Nothing -> pure $ err NO_MSG
Just sub ->
atomically (getDelivered sub) >>= \case
Just s -> do
q <- getStoreMsgQueue queueId
q <- getStoreMsgQueue "ACK" queueId
case s of
Sub {subThread = ProhibitSub} -> do
msgDeleted <- atomically $ tryDelMsg q msgId
Expand All @@ -512,7 +512,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
_ -> do
(msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId
when msgDeleted updateStats
deliverMessage qr queueId sub q msg_
deliverMessage "ACK" qr queueId sub q msg_
_ -> pure $ err NO_MSG
where
getDelivered :: TVar Sub -> STM (Maybe Sub)
Expand All @@ -537,16 +537,13 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
mapM mkMessage (C.maxLenBS msgBody) >>= \case
Left _ -> pure $ err LARGE_MSG
Right msg -> do
ms <- asks msgStore
ServerConfig {messageExpiration, msgQueueQuota} <- asks config
old <- liftIO $ mapM expireBeforeEpoch messageExpiration
ntfNonceDrg <- asks idsDrg
resp@(_, _, sent) <- timed "send" sessionId queueId $ do
q <- atomically $ getMsgQueue ms (recipientId qr) msgQueueQuota
atomically $ mapM_ (deleteExpiredMsgs q) old
resp@(_, _, sent) <- time "SEND" $ do
q <- getStoreMsgQueue "SEND" $ recipientId qr
expireMessages q
atomically $ ifM (isFull q) (pure $ err QUOTA) (writeMsg q msg $> ok)
when (sent == OK) $ do
when (notification msgFlags) . atomically $ trySendNotification msg ntfNonceDrg
when (sent == OK) . time "SEND ok" $ do
when (notification msgFlags) $
atomically . trySendNotification msg =<< asks idsDrg
stats <- asks serverStats
atomically $ modifyTVar (msgSent stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
Expand All @@ -558,6 +555,12 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
msgTs <- liftIO getSystemTime
pure $ Message msgId msgTs msgFlags body

expireMessages :: MsgQueue -> m ()
expireMessages q = do
msgExp <- asks $ messageExpiration . config
old <- liftIO $ mapM expireBeforeEpoch msgExp
atomically $ mapM_ (deleteExpiredMsgs q) old

trySendNotification :: Message -> TVar ChaChaDRG -> STM ()
trySendNotification msg ntfNonceDrg =
forM_ (notifier qr) $ \NtfCreds {notifierId, rcvNtfDhSecret} ->
Expand All @@ -576,8 +579,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128
pure . (cbNonce,) $ fromRight "" encNMsgMeta

deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg)
deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do
deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg)
deliverMessage name qr rId sub q msg_ = time (name <> " deliver") $ do
readTVarIO sub >>= \case
s@Sub {subThread = NoSub} ->
case msg_ of
Expand All @@ -597,13 +600,16 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
where
subscriber = do
msg <- atomically $ peekMsg q
timed "subscriber" sessionId rId . atomically $ do
time "subscriber" . atomically $ do
let encMsg = encryptMsg qr msg
writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)]
s <- readTVar sub
void $ setDelivered s msg
writeTVar sub s {subThread = NoSub}

time :: T.Text -> m a -> m a
time name = timed name sessionId queueId

encryptMsg :: QueueRec -> Message -> RcvMessage
encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody}
| thVersion == 1 || thVersion == 2 = encrypt msgBody
Expand All @@ -617,8 +623,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ,
setDelivered :: Sub -> Message -> STM Bool
setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId

getStoreMsgQueue :: RecipientId -> m MsgQueue
getStoreMsgQueue rId = do
getStoreMsgQueue :: T.Text -> RecipientId -> m MsgQueue
getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
atomically $ getMsgQueue ms rId quota
Expand Down