Skip to content

Commit

Permalink
ntf server: better batching and logging (#780)
Browse files Browse the repository at this point in the history
* ntf server: better batching and logging

* reduce batch delay for ntf server

* comments

* 5.1.3, ntf 1.4.2

* more logging

* more logging

* split large batches, more logging

* remove some logs
  • Loading branch information
epoberezkin committed Jun 26, 2023
1 parent 3a74558 commit 4a927d1
Show file tree
Hide file tree
Showing 19 changed files with 149 additions and 123 deletions.
2 changes: 1 addition & 1 deletion package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: simplexmq
version: 5.1.2
version: 5.1.3
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
Expand Down
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack

name: simplexmq
version: 5.1.2
version: 5.1.3
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
9 changes: 4 additions & 5 deletions src/Simplex/FileTransfer/Client/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Char (toLower)
import Data.Function (on)
import Data.Int (Int64)
import Data.List (foldl', groupBy, sortOn)
import Data.List (foldl', sortOn)
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
Expand All @@ -66,7 +65,7 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.Server.CLI (getCliCommand')
import Simplex.Messaging.Util (ifM, tshow, whenM)
import Simplex.Messaging.Util (groupAllOn, ifM, tshow, whenM)
import System.Exit (exitFailure)
import System.FilePath (splitFileName, (</>))
import System.IO.Temp (getCanonicalTemporaryDirectory)
Expand Down Expand Up @@ -316,7 +315,7 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
let xftpSrvs = fromMaybe defaultXFTPServers (nonEmpty xftpServers)
srvs <- liftIO $ replicateM (length chunks) $ getXFTPServer gen xftpSrvs
let thd3 (_, _, x) = x
chunks' = groupBy ((==) `on` thd3) $ sortOn thd3 $ zip3 [1 ..] chunks srvs
chunks' = groupAllOn thd3 $ zip3 [1 ..] chunks srvs
-- TODO shuffle/unshuffle chunks
-- the reason we don't do pooled downloads here within one server is that http2 library doesn't handle cleint concurrency, even though
-- upload doesn't allow other requests within the same client until complete (but download does allow).
Expand Down Expand Up @@ -428,7 +427,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
liftIO $ printNoNewLine "Downloading file..."
downloadedChunks <- newTVarIO []
let srv FileChunk {replicas} = server (head replicas :: FileChunkReplica)
srvChunks = groupBy ((==) `on` srv) $ sortOn srv chunks
srvChunks = groupAllOn srv chunks
chunkPaths <- map snd . sortOn fst . concat <$> pooledForConcurrentlyN 16 srvChunks (mapM $ downloadFileChunk a encPath size downloadedChunks)
encDigest <- liftIO $ LC.sha512Hash <$> readChunks chunkPaths
when (encDigest /= unFileDigest digest) $ throwError $ CLIError "File digest mismatch"
Expand Down
11 changes: 4 additions & 7 deletions src/Simplex/FileTransfer/Description.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Function (on)
import Data.Int (Int64)
import Data.List (foldl', groupBy, sortOn)
import Data.List (foldl', sortOn)
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
Expand All @@ -59,7 +58,7 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (XFTPServer)
import Simplex.Messaging.Util (bshow, (<$?>))
import Simplex.Messaging.Util (bshow, groupAllOn, (<$?>))

data FileDescription (p :: FileParty) = FileDescription
{ party :: SFileParty p,
Expand Down Expand Up @@ -258,17 +257,15 @@ instance (ToField a) => ToField (FileSize a) where toField (FileSize s) = toFiel

groupReplicasByServer :: FileSize Word32 -> [FileChunk] -> [[FileServerReplica]]
groupReplicasByServer defChunkSize =
groupBy ((==) `on` replicaServer)
. sortOn replicaServer
. unfoldChunksToReplicas defChunkSize
groupAllOn replicaServer . unfoldChunksToReplicas defChunkSize

encodeFileReplicas :: FileSize Word32 -> [FileChunk] -> [YAMLServerReplicas]
encodeFileReplicas defChunkSize =
map encodeServerReplicas . groupReplicasByServer defChunkSize
where
encodeServerReplicas fs =
YAMLServerReplicas
{ server = replicaServer $ head fs, -- groupBy guarantees that fs is not empty
{ server = replicaServer $ head fs, -- groupAllOn guarantees that fs is not empty
chunks = map (B.unpack . encodeServerReplica) fs
}

Expand Down
9 changes: 5 additions & 4 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,10 @@ import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as U
import Data.Char (toLower)
import Data.Function (on)
import Data.Functor (($>))
import Data.IORef
import Data.Int (Int64)
import Data.List (foldl', groupBy, intercalate, sortBy)
import Data.List (foldl', intercalate, sortBy)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
Expand Down Expand Up @@ -250,7 +249,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, dropPrefix, fromTextField_, s
import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, diffToMilliseconds, eitherToMaybe, ($>>=), (<$$>))
import Simplex.Messaging.Util (bshow, diffToMilliseconds, eitherToMaybe, groupOn, ($>>=), (<$$>))
import Simplex.Messaging.Version
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
import System.Exit (exitFailure)
Expand Down Expand Up @@ -1092,7 +1091,9 @@ insertedRowId db = fromOnly . head <$> DB.query_ db "SELECT last_insert_rowid()"

getPendingCommands :: DB.Connection -> ConnId -> IO [(Maybe SMPServer, [AsyncCmdId])]
getPendingCommands db connId = do
map (\ids -> (fst $ head ids, map snd ids)) . groupBy ((==) `on` fst) . map srvCmdId
-- `groupOn` is used instead of `groupAllOn` to avoid extra sorting by `server + cmdId`, as the query already sorts by them.
-- TODO review whether this can break if, e.g., the server has another key hash.
map (\ids -> (fst $ head ids, map snd ids)) . groupOn fst . map srvCmdId
<$> DB.query
db
[sql|
Expand Down
15 changes: 9 additions & 6 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,10 @@ data ProtocolClientConfig = ProtocolClientConfig
defaultTransport :: (ServiceName, ATransport),
-- | network configuration
networkConfig :: NetworkConfig,
-- | SMP client-server protocol version range
smpServerVRange :: VersionRange
-- | client-server protocol version range
serverVRange :: VersionRange,
-- | delay between sending batches of commands (microseconds)
batchDelay :: Maybe Int
}

-- | Default protocol client configuration.
Expand All @@ -230,7 +232,8 @@ defaultClientConfig =
{ qSize = 64,
defaultTransport = ("443", transport @TLS),
networkConfig = defaultNetworkConfig,
smpServerVRange = supportedSMPServerVRange
serverVRange = supportedSMPServerVRange,
batchDelay = Nothing
}

data Request err msg = Request
Expand Down Expand Up @@ -276,7 +279,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId)
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
getProtocolClient :: forall err msg. Protocol err msg => TransportSession msg -> ProtocolClientConfig -> Maybe (TBQueue (ServerTransmission msg)) -> (ProtocolClient err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient err msg))
getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, smpServerVRange} msgQ disconnected = do
getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, serverVRange, batchDelay} msgQ disconnected = do
case chooseTransportHost networkConfig (host srv) of
Right useHost ->
(atomically (mkProtocolClient useHost) >>= runClient useTransport useHost)
Expand Down Expand Up @@ -329,7 +332,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,

client :: forall c. Transport c => TProxy c -> PClient err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient err msg)) -> c -> IO ()
client _ c cVar h =
runExceptT (protocolClientHandshake @err @msg h (keyHash srv) smpServerVRange) >>= \case
runExceptT (protocolClientHandshake @err @msg h (keyHash srv) serverVRange) >>= \case
Left e -> atomically . putTMVar cVar . Left $ PCETransportError e
Right th@THandle {sessionId, thVersion} -> do
sessionTs <- getCurrentTime
Expand All @@ -341,7 +344,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,
`finally` disconnected c'

send :: Transport c => ProtocolClient err msg -> THandle c -> IO ()
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPut h
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPut h batchDelay

receive :: Transport c => ProtocolClient err msg -> THandle c -> IO ()
receive ProtocolClient {client_ = PClient {rcvQ}} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ
Expand Down
56 changes: 24 additions & 32 deletions src/Simplex/Messaging/Client/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import Control.Logger.Simple
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Except
import Data.Bifunctor (first)
import Data.Bifunctor (first, bimap)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.List (find, partition)
import Data.Either (partitionEithers)
import Data.List (partition)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (listToMaybe)
import Data.Set (Set)
import Data.Text.Encoding
import Data.Tuple (swap)
Expand All @@ -43,16 +45,15 @@ import UnliftIO (async)
import UnliftIO.Exception (Exception)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Data.Either (isLeft)

type SMPClientVar = TMVar (Either SMPClientError SMPClient)

data SMPClientAgentEvent
= CAConnected SMPServer
| CADisconnected SMPServer (Set SMPSub)
| CAReconnected SMPServer
| CAResubscribed SMPServer SMPSub
| CASubError SMPServer SMPSub SMPClientError
| CAResubscribed SMPServer (NonEmpty SMPSub)
| CASubError SMPServer (NonEmpty (SMPSub, SMPClientError))

data SMPSubParty = SPRecipient | SPNotifier
deriving (Eq, Ord, Show)
Expand Down Expand Up @@ -208,45 +209,36 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
reconnectClient :: ExceptT SMPClientError IO ()
reconnectClient = do
withSMP ca srv $ \smp -> do
liftIO . notify $ CAReconnected srv
liftIO $ notify $ CAReconnected srv
cs_ <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca)
forM_ cs_ $ \cs -> do
subs' <- filterM (fmap not . atomically . hasSub (srvSubs ca) srv . fst) $ M.assocs cs
let (nSubs, rSubs) = partition (isNotifier . fst . fst) subs'
nRs <- liftIO $ subscribe_ smp SPNotifier nSubs
rRs <- liftIO $ subscribe_ smp SPRecipient rSubs
case find isLeft $ nRs <> rRs of
Just (Left e) -> throwE e
_ -> pure ()
subscribe_ smp SPNotifier nSubs
subscribe_ smp SPRecipient rSubs
where
isNotifier = \case
SPNotifier -> True
SPRecipient -> False

subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateSignKey)] -> IO [Either SMPClientError ()]
subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateSignKey)] -> ExceptT SMPClientError IO ()
subscribe_ smp party subs =
case L.nonEmpty subs of
Just subs' -> do
let subs'' = L.map (first snd) subs'
rs <- L.zip subs'' <$> smpSubscribeQueues party ca smp srv subs''
rs' <- forM rs $ \(sub, r) -> do
let sub' = first (party,) sub
s = fst sub'
case snd r of
Right () -> do
atomically $ addSubscription ca srv sub'
notify $ CAResubscribed srv s
pure $ Right ()
Left e -> do
case e of
PCEResponseTimeout -> pure $ Left e
PCENetworkError -> pure $ Left e
_ -> do
notify $ CASubError srv s e
atomically $ removePendingSubscription ca srv s
pure $ Right ()
pure $ L.toList rs'
Nothing -> pure []
let subs'' :: (NonEmpty (QueueId, C.APrivateSignKey)) = L.map (first snd) subs'
rs <- liftIO $ smpSubscribeQueues party ca smp srv subs''
let rs' :: (NonEmpty ((SMPSub, C.APrivateSignKey), Either SMPClientError ())) =
L.zipWith (first . const) subs' rs
rs'' :: [Either (SMPSub, SMPClientError) (SMPSub, C.APrivateSignKey)] =
map (\(sub, r) -> bimap (fst sub,) (const sub) r) $ L.toList rs'
(errs, oks) = partitionEithers rs''
(tempErrs, finalErrs) = partition (temporaryClientError . snd) errs
mapM_ (atomically . addSubscription ca srv) oks
mapM_ (liftIO . notify . CAResubscribed srv) $ L.nonEmpty $ map fst oks
mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs
mapM_ (liftIO . notify . CASubError srv) $ L.nonEmpty finalErrs
mapM_ (throwE . snd) $ listToMaybe tempErrs
Nothing -> pure ()

notify :: SMPClientAgentEvent -> IO ()
notify evt = atomically $ writeTBQueue (agentQ ca) evt
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Notifications/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ data NtfSubStatus
NSAuth
| -- | SMP error other than AUTH
NSErr ByteString
deriving (Eq, Show)
deriving (Eq, Ord, Show)

ntfShouldSubscribe :: NtfSubStatus -> Bool
ntfShouldSubscribe = \case
Expand Down
Loading

0 comments on commit 4a927d1

Please sign in to comment.