From 61fd5d06a5db173c51208655bb3984102fe06c0c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 3 Nov 2022 08:37:28 +0000 Subject: [PATCH 1/2] server: additional logs for slow operations --- src/Simplex/Messaging/Server.hs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 22992f6107..9f0e1371af 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -436,7 +436,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, subscribeQueue qr rId = timed "subscribe" sessionId rId $ do atomically (TM.lookup rId subscriptions) >>= \case Nothing -> - atomically newSub >>= deliver + newSub >>= deliver Just sub -> readTVarIO sub >>= \case Sub {subThread = ProhibitSub} -> @@ -445,20 +445,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, s -> atomically (tryTakeTMVar $ delivered s) >> deliver sub where - newSub :: STM (TVar Sub) - newSub = do + newSub :: m (TVar Sub) + newSub = timed "subscribe newSub" sessionId rId . atomically $ do writeTBQueue subscribedQ (rId, clnt) sub <- newTVar =<< newSubscription NoSub TM.insert rId sub subscriptions pure sub deliver :: TVar Sub -> m (Transmission BrokerMsg) deliver sub = do - q <- getStoreMsgQueue rId + q <- getStoreMsgQueue "subscribe" rId msg_ <- atomically $ tryPeekMsg q - deliverMessage qr rId sub q msg_ + deliverMessage "subscribe" qr rId sub q msg_ getMessage :: QueueRec -> m (Transmission BrokerMsg) - getMessage qr = + getMessage qr = timed "getMessage" sessionId queueId $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> atomically newSub >>= getMessage_ @@ -478,7 +478,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, pure s getMessage_ :: Sub -> m (Transmission BrokerMsg) getMessage_ s = do - q <- getStoreMsgQueue queueId + q <- getStoreMsgQueue "getMessage" queueId atomically $ tryPeekMsg q >>= \case Just msg -> @@ -503,7 +503,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, Just sub -> atomically (getDelivered sub) >>= \case Just s -> do - q <- getStoreMsgQueue queueId + q <- getStoreMsgQueue "ack" queueId case s of Sub {subThread = ProhibitSub} -> do msgDeleted <- atomically $ tryDelMsg q msgId @@ -512,7 +512,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, _ -> do (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId when msgDeleted updateStats - deliverMessage qr queueId sub q msg_ + deliverMessage "ack" qr queueId sub q msg_ _ -> pure $ err NO_MSG where getDelivered :: TVar Sub -> STM (Maybe Sub) @@ -576,8 +576,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128 pure . (cbNonce,) $ fromRight "" encNMsgMeta - deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) - deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do + deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) + deliverMessage name qr rId sub q msg_ = timed (name <> " deliver") sessionId rId $ do readTVarIO sub >>= \case s@Sub {subThread = NoSub} -> case msg_ of @@ -617,8 +617,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, setDelivered :: Sub -> Message -> STM Bool setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId - getStoreMsgQueue :: RecipientId -> m MsgQueue - getStoreMsgQueue rId = do + getStoreMsgQueue :: T.Text -> RecipientId -> m MsgQueue + getStoreMsgQueue name rId = timed (name <> " getMsgQueue") sessionId rId $ do ms <- asks msgStore quota <- asks $ msgQueueQuota . config atomically $ getMsgQueue ms rId quota From c9c054d4d88b7969c936338c592abc4650884530 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 3 Nov 2022 09:04:06 +0000 Subject: [PATCH 2/2] more time logs --- src/Simplex/Messaging/Server.hs | 56 ++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 9f0e1371af..8bdafaf043 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -354,7 +354,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, DEL -> delQueueAndMsgs st where createQueue :: QueueStore -> RcvPublicVerifyKey -> RcvPublicDhKey -> m (Transmission BrokerMsg) - createQueue st recipientKey dhKey = do + createQueue st recipientKey dhKey = time "NEW" $ do (rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair' let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey} @@ -398,14 +398,14 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, liftM2 (,) (randomId n) (randomId n) secureQueue_ :: QueueStore -> SndPublicVerifyKey -> m (Transmission BrokerMsg) - secureQueue_ st sKey = do + secureQueue_ st sKey = time "KEY" $ do withLog $ \s -> logSecureQueue s queueId sKey stats <- asks serverStats atomically $ modifyTVar (qSecured stats) (+ 1) atomically $ (corrId,queueId,) . either ERR (const OK) <$> secureQueue st queueId sKey addQueueNotifier_ :: QueueStore -> NtfPublicVerifyKey -> RcvNtfPublicDhKey -> m (Transmission BrokerMsg) - addQueueNotifier_ st notifierKey dhKey = do + addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do (rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair' let rcvNtfDhSecret = C.dh' dhKey privDhKey (corrId,queueId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret @@ -433,7 +433,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, okResp <$> atomically (suspendQueue st queueId) subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg) - subscribeQueue qr rId = timed "subscribe" sessionId rId $ do + subscribeQueue qr rId = do atomically (TM.lookup rId subscriptions) >>= \case Nothing -> newSub >>= deliver @@ -446,19 +446,19 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, atomically (tryTakeTMVar $ delivered s) >> deliver sub where newSub :: m (TVar Sub) - newSub = timed "subscribe newSub" sessionId rId . atomically $ do + newSub = time "SUB newSub" . atomically $ do writeTBQueue subscribedQ (rId, clnt) sub <- newTVar =<< newSubscription NoSub TM.insert rId sub subscriptions pure sub deliver :: TVar Sub -> m (Transmission BrokerMsg) deliver sub = do - q <- getStoreMsgQueue "subscribe" rId + q <- getStoreMsgQueue "SUB" rId msg_ <- atomically $ tryPeekMsg q - deliverMessage "subscribe" qr rId sub q msg_ + deliverMessage "SUB" qr rId sub q msg_ getMessage :: QueueRec -> m (Transmission BrokerMsg) - getMessage qr = timed "getMessage" sessionId queueId $ do + getMessage qr = time "GET" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> atomically newSub >>= getMessage_ @@ -478,7 +478,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, pure s getMessage_ :: Sub -> m (Transmission BrokerMsg) getMessage_ s = do - q <- getStoreMsgQueue "getMessage" queueId + q <- getStoreMsgQueue "GET" queueId atomically $ tryPeekMsg q >>= \case Just msg -> @@ -490,20 +490,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, withQueue action = maybe (pure $ err AUTH) action qr_ subscribeNotifications :: m (Transmission BrokerMsg) - subscribeNotifications = atomically $ do + subscribeNotifications = time "NSUB" . atomically $ do unlessM (TM.member queueId ntfSubscriptions) $ do writeTBQueue ntfSubscribedQ (queueId, clnt) TM.insert queueId () ntfSubscriptions pure ok acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg) - acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do + acknowledgeMsg qr msgId = time "ACK" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case Just s -> do - q <- getStoreMsgQueue "ack" queueId + q <- getStoreMsgQueue "ACK" queueId case s of Sub {subThread = ProhibitSub} -> do msgDeleted <- atomically $ tryDelMsg q msgId @@ -512,7 +512,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, _ -> do (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId when msgDeleted updateStats - deliverMessage "ack" qr queueId sub q msg_ + deliverMessage "ACK" qr queueId sub q msg_ _ -> pure $ err NO_MSG where getDelivered :: TVar Sub -> STM (Maybe Sub) @@ -537,16 +537,13 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, mapM mkMessage (C.maxLenBS msgBody) >>= \case Left _ -> pure $ err LARGE_MSG Right msg -> do - ms <- asks msgStore - ServerConfig {messageExpiration, msgQueueQuota} <- asks config - old <- liftIO $ mapM expireBeforeEpoch messageExpiration - ntfNonceDrg <- asks idsDrg - resp@(_, _, sent) <- timed "send" sessionId queueId $ do - q <- atomically $ getMsgQueue ms (recipientId qr) msgQueueQuota - atomically $ mapM_ (deleteExpiredMsgs q) old + resp@(_, _, sent) <- time "SEND" $ do + q <- getStoreMsgQueue "SEND" $ recipientId qr + expireMessages q atomically $ ifM (isFull q) (pure $ err QUOTA) (writeMsg q msg $> ok) - when (sent == OK) $ do - when (notification msgFlags) . atomically $ trySendNotification msg ntfNonceDrg + when (sent == OK) . time "SEND ok" $ do + when (notification msgFlags) $ + atomically . trySendNotification msg =<< asks idsDrg stats <- asks serverStats atomically $ modifyTVar (msgSent stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) @@ -558,6 +555,12 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, msgTs <- liftIO getSystemTime pure $ Message msgId msgTs msgFlags body + expireMessages :: MsgQueue -> m () + expireMessages q = do + msgExp <- asks $ messageExpiration . config + old <- liftIO $ mapM expireBeforeEpoch msgExp + atomically $ mapM_ (deleteExpiredMsgs q) old + trySendNotification :: Message -> TVar ChaChaDRG -> STM () trySendNotification msg ntfNonceDrg = forM_ (notifier qr) $ \NtfCreds {notifierId, rcvNtfDhSecret} -> @@ -577,7 +580,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, pure . (cbNonce,) $ fromRight "" encNMsgMeta deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) - deliverMessage name qr rId sub q msg_ = timed (name <> " deliver") sessionId rId $ do + deliverMessage name qr rId sub q msg_ = time (name <> " deliver") $ do readTVarIO sub >>= \case s@Sub {subThread = NoSub} -> case msg_ of @@ -597,13 +600,16 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, where subscriber = do msg <- atomically $ peekMsg q - timed "subscriber" sessionId rId . atomically $ do + time "subscriber" . atomically $ do let encMsg = encryptMsg qr msg writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)] s <- readTVar sub void $ setDelivered s msg writeTVar sub s {subThread = NoSub} + time :: T.Text -> m a -> m a + time name = timed name sessionId queueId + encryptMsg :: QueueRec -> Message -> RcvMessage encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody} | thVersion == 1 || thVersion == 2 = encrypt msgBody @@ -618,7 +624,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId getStoreMsgQueue :: T.Text -> RecipientId -> m MsgQueue - getStoreMsgQueue name rId = timed (name <> " getMsgQueue") sessionId rId $ do + getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do ms <- asks msgStore quota <- asks $ msgQueueQuota . config atomically $ getMsgQueue ms rId quota