Skip to content

Commit

Permalink
optimize: use IntMap (#1005)
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Feb 21, 2024
1 parent 03c24c3 commit 5c1519d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
12 changes: 6 additions & 6 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
CPClients -> do
active <- unliftIO u (asks clients) >>= readTVarIO
hPutStrLn h $ "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
forM_ (M.toList active) $ \(cid, Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do
forM_ (IM.toList active) $ \(cid, Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do
connected' <- bshow <$> readTVarIO connected
rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt
sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt
Expand Down Expand Up @@ -331,13 +331,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h "Sockets: "
hPutStrLn h $ "accepted: " <> show accepted
hPutStrLn h $ "closed: " <> show closed
hPutStrLn h $ "active: " <> show (M.size active)
hPutStrLn h $ "leaked: " <> show (accepted - closed - M.size active)
hPutStrLn h $ "active: " <> show (IM.size active)
hPutStrLn h $ "leaked: " <> show (accepted - closed - IM.size active)
CPSocketThreads -> do
#if MIN_VERSION_base(4,18,0)
(_, _, active') <- unliftIO u $ asks sockets
active <- readTVarIO active'
forM_ (M.toList active) $ \(sid, tid') ->
forM_ (IM.toList active) $ \(sid, tid') ->
deRefWeak tid' >>= \case
Nothing -> hPutStrLn h $ intercalate "," [show sid, "", "gone", ""]
Just tid -> do
Expand Down Expand Up @@ -380,7 +380,7 @@ runClientTransport th@THandle {thVersion, sessionId} = do
nextClientId <- asks clientSeq
c <- atomically $ do
new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts
TM.insert clientId new active
modifyTVar' active $ IM.insert clientId new
pure new
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
Expand All @@ -402,7 +402,7 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT
srvSubs <- asks $ subscribers . server
atomically $ modifyTVar' srvSubs $ \cs ->
M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs
asks clients >>= atomically . TM.delete clientId
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
tIds <- atomically $ swapTVar endThreads IM.empty
liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds
where
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ data Env = Env
serverStats :: ServerStats,
sockets :: SocketState,
clientSeq :: TVar Int,
clients :: TMap Int Client
clients :: TVar (IntMap Client)
}

data Server = Server
Expand Down Expand Up @@ -184,7 +184,7 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
serverStats <- atomically . newServerStats =<< liftIO getCurrentTime
sockets <- atomically newSocketState
clientSeq <- newTVarIO 0
clients <- atomically TM.empty
clients <- newTVarIO mempty
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients}
where
restoreQueues :: QueueStore -> FilePath -> m (StoreLog 'WriteMode)
Expand Down
12 changes: 6 additions & 6 deletions src/Simplex/Messaging/Transport/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import Control.Monad.IO.Unlift
import qualified Crypto.Store.X509 as SX
import Data.Default (def)
import Data.List (find)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.Maybe (fromJust)
import qualified Data.X509 as X
import Data.X509.Validation (Fingerprint (..))
import qualified Data.X509.Validation as XV
import Network.Socket
import qualified Network.TLS as T
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow)
import System.Exit (exitFailure)
Expand Down Expand Up @@ -110,18 +110,18 @@ runTCPServerSocket (accepted, gracefullyClosed, clients) started getSocket serve
forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do
cId <- atomically $ stateTVar accepted $ \cId -> let cId' = cId + 1 in cId `seq` (cId', cId')
let closeConn _ = do
atomically $ TM.delete cId clients
atomically $ modifyTVar' clients $ IM.delete cId
gracefulClose conn 5000 `catchAll_` pure () -- catchAll_ is needed here in case the connection was closed earlier
atomically $ modifyTVar' gracefullyClosed (+1)
tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn
atomically $ TM.insert cId tId clients
atomically $ modifyTVar' clients $ IM.insert cId tId

type SocketState = (TVar Int, TVar Int, TMap Int (Weak ThreadId))
type SocketState = (TVar Int, TVar Int, TVar (IntMap (Weak ThreadId)))

newSocketState :: STM SocketState
newSocketState = (,,) <$> newTVar 0 <*> newTVar 0 <*> newTVar mempty

closeServer :: TMVar Bool -> TMap Int (Weak ThreadId) -> Socket -> IO ()
closeServer :: TMVar Bool -> TVar (IntMap (Weak ThreadId)) -> Socket -> IO ()
closeServer started clients sock = do
readTVarIO clients >>= mapM_ (deRefWeak >=> mapM_ killThread)
close sock
Expand Down

0 comments on commit 5c1519d

Please sign in to comment.