From 5c1519df7f1810e9a1ab049307ce83d27bb17c3e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 21 Feb 2024 11:52:24 +0000 Subject: [PATCH] optimize: use IntMap (#1005) --- src/Simplex/Messaging/Server.hs | 12 ++++++------ src/Simplex/Messaging/Server/Env/STM.hs | 4 ++-- src/Simplex/Messaging/Transport/Server.hs | 12 ++++++------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index f81f66d8d..5a9baeab8 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -290,7 +290,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do CPClients -> do active <- unliftIO u (asks clients) >>= readTVarIO hPutStrLn h $ "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions" - forM_ (M.toList active) $ \(cid, Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do + forM_ (IM.toList active) $ \(cid, Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do connected' <- bshow <$> readTVarIO connected rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt @@ -331,13 +331,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "Sockets: " hPutStrLn h $ "accepted: " <> show accepted hPutStrLn h $ "closed: " <> show closed - hPutStrLn h $ "active: " <> show (M.size active) - hPutStrLn h $ "leaked: " <> show (accepted - closed - M.size active) + hPutStrLn h $ "active: " <> show (IM.size active) + hPutStrLn h $ "leaked: " <> show (accepted - closed - IM.size active) CPSocketThreads -> do #if MIN_VERSION_base(4,18,0) (_, _, active') <- unliftIO u $ asks sockets active <- readTVarIO active' - forM_ (M.toList active) $ \(sid, tid') -> + forM_ (IM.toList active) $ \(sid, tid') -> deRefWeak tid' >>= \case Nothing -> hPutStrLn h $ intercalate "," [show sid, "", "gone", ""] Just tid -> do @@ -380,7 +380,7 @@ runClientTransport th@THandle {thVersion, sessionId} = do nextClientId <- asks clientSeq c <- atomically $ do new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts - TM.insert clientId new active + modifyTVar' active $ IM.insert clientId new pure new s <- asks server expCfg <- asks $ inactiveClientExpiration . config @@ -402,7 +402,7 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT srvSubs <- asks $ subscribers . server atomically $ modifyTVar' srvSubs $ \cs -> M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs - asks clients >>= atomically . TM.delete clientId + asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index aa86cbd29..82666a0fc 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -109,7 +109,7 @@ data Env = Env serverStats :: ServerStats, sockets :: SocketState, clientSeq :: TVar Int, - clients :: TMap Int Client + clients :: TVar (IntMap Client) } data Server = Server @@ -184,7 +184,7 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, serverStats <- atomically . newServerStats =<< liftIO getCurrentTime sockets <- atomically newSocketState clientSeq <- newTVarIO 0 - clients <- atomically TM.empty + clients <- newTVarIO mempty return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients} where restoreQueues :: QueueStore -> FilePath -> m (StoreLog 'WriteMode) diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 06f97a353..3ea989180 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -29,14 +29,14 @@ import Control.Monad.IO.Unlift import qualified Crypto.Store.X509 as SX import Data.Default (def) import Data.List (find) +import Data.IntMap.Strict (IntMap) +import qualified Data.IntMap.Strict as IM import Data.Maybe (fromJust) import qualified Data.X509 as X import Data.X509.Validation (Fingerprint (..)) import qualified Data.X509.Validation as XV import Network.Socket import qualified Network.TLS as T -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow) import System.Exit (exitFailure) @@ -110,18 +110,18 @@ runTCPServerSocket (accepted, gracefullyClosed, clients) started getSocket serve forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do cId <- atomically $ stateTVar accepted $ \cId -> let cId' = cId + 1 in cId `seq` (cId', cId') let closeConn _ = do - atomically $ TM.delete cId clients + atomically $ modifyTVar' clients $ IM.delete cId gracefulClose conn 5000 `catchAll_` pure () -- catchAll_ is needed here in case the connection was closed earlier atomically $ modifyTVar' gracefullyClosed (+1) tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn - atomically $ TM.insert cId tId clients + atomically $ modifyTVar' clients $ IM.insert cId tId -type SocketState = (TVar Int, TVar Int, TMap Int (Weak ThreadId)) +type SocketState = (TVar Int, TVar Int, TVar (IntMap (Weak ThreadId))) newSocketState :: STM SocketState newSocketState = (,,) <$> newTVar 0 <*> newTVar 0 <*> newTVar mempty -closeServer :: TMVar Bool -> TMap Int (Weak ThreadId) -> Socket -> IO () +closeServer :: TMVar Bool -> TVar (IntMap (Weak ThreadId)) -> Socket -> IO () closeServer started clients sock = do readTVarIO clients >>= mapM_ (deRefWeak >=> mapM_ killThread) close sock