diff --git a/rfcs/2023-05-03-delivery-receipts.md b/rfcs/2023-05-03-delivery-receipts.md new file mode 100644 index 000000000..bc5658f96 --- /dev/null +++ b/rfcs/2023-05-03-delivery-receipts.md @@ -0,0 +1,170 @@ +# Delivery receipts + +## Problems + +User experience - users need to know that the messages are delivered to the recipient, as this confirms that the system is functioning. + +The downside of communicating message delivery as it confirms that the recipient was online, and, unless there is a delay in confirming, can be used to track the location via the variation in network latency. So delivery receipts should be delayed with a randomized interval and should be opt in or opt out. + +Another problem of message receipts is that they increase network traffic and server load. This could be avoided if delivery receipts are communicated as part of normal message delivery flow. + +Some other existing and planned features implicitely confirm message delivery and, possibly, should depend on message delivery being enabled: +- agent message to resume delivery when quota was exceeded (implemented, [rfc](./2022-12-27-queue-quota.md)) +- agent message to re-deliver skipped messages or to re-negotiate double ratchet. + +## Solution + +There are three layers where delivery receipts can be implemented: +- chat protocol. Pro: logic of when to deliver it is decoupled from the message flow, Con: extra traffic, can only work in duplex connections. +- agent client protocol. Pro: can be automated and combined with the protocol to re-deliver skipped messages. Con: extra traffic. +- SMP protocol. Pro: minimal extra traffic, Con: complicates server design as it would require pushing receipts when there is no next message. + +The last approach seems the most promising for avoiding additional traffic: +- modify client ACK command to include whether delivery receipt should be provided to sender, and, possibly, any e2e encrypted data that should be included in the receipt (e.g., that the receiving client already saw this message in case we use "feedback" variant of roumor-mongering protocol for groups). +- server would manage delaying of the receipts, by randomizing the time after which the receipt will be available to the sender, and by combining the receipts when possible. +- modify response to SEND command to include any available delivery receipts. +- add a separate delivery receipt that will be pushed to the sender in the connection where the message was received by the server. + +## SMP protocol changes + +```haskell +data Command (p :: Party) where + -- ... + ACK :: MsgId -> Maybe ByteString -> Command Recipient + -- the presense of ByteString in ACK indicates that the delivery needs to be confirmed. + -- the protocol does not define the format of this confirmation, it is application specific, and can be -- an empty string. + -- And open question is how to e2e encrypt information in this string - this probably can be handled on Agent client protocol level, and could be the same ratchet key that was used to encrypt and decrypt the message. The downside of this approach is that this key currently is not stored, and storing it requires additional logic to clear these keys if unused after some time. + -- TODO consider what could be a better approach. + SENT :: MsgId -> [(MsgId, UTCTime, ByteString)] -> Command Sender + -- or + -- SENT :: MsgId -> Command Sender + -- in case we just batch + -- this response will be sent to SEND command and will include a sender's message ID generated by the server (currently it does not exist), and posibly an empty list of delivery receipts with the same message IDs as in responses to SEND, timestamps when these receipts became available, and e2e encrypted ByteString passed in ACK command. + -- The ID in this response should be different from the ID used in MSG, to keep the promise of not having shared identifiers in sent/received traffic even inside TLS tunnel. + -- Keeping the quality of shared ciphertext also requires adding additional encryption layer between the server and the sender, this can be achieved in one of two ways: + -- 1) passing a separate DH key in each SEND command, and server including additional DH key in each SENT response, with computed DH secret per message later used to encrypt and decrypt the delivery receipt payloads. This is probably a bad idea as it would increase a cryptographic load on both the server and the client. + -- 2) agree a key per queue, in the same way it is done for the recipient. Possibly, it requires additional DH key in confirmation message that the recipient then uses to secure the queue, and passing this key in KEY (secure queue) command. The response to this secure command would the include server's DH key returned to the recipient that would be passed to the sender in HELLO message. Even though recipient could observe both public DH keys, they won't know the computed shared secret. Recipient that controls the server could perform MITM attack on this key exchange, but it doesn't give any benefit over what recipient can do when they have access to the server - the threat model remains the same. The downside of this approach is that it also requires additional changes in client protocol level (confirmation message format and HELLO message). + -- 3) also agree on a key per queue, but via separate commands between the sender and the server, once the sender was notified that the queue is secured. This approach is probably better, and the server would simply delay the delivery of delivery receipts until the shared secret is agreed. + SKEY :: C.PublicKeyX25519 -> Command Sender + SBKEY :: C.PublicKeyX25519 -> BrokerMsg + -- these are the command and response to agree secret to encrypt delivery receipt payloads for option 3 + SSUB :: Command Sender + -- subscribe to receive delivery receipts for a given queue - will be sent when the conversation is opened (unless there is an active subscription already), not all queues at once, and won't be re-subscribed on losing the server connection (TBC). + RCVD :: MsgId -> UTCTime -> ByteString -> Command Recipient + -- delivery receipt. UTCTime is the time when it became available, not the time when ACK was sent by the recipient, to avoid leaking location via network latency. +``` + +Possibly, there is no need to include delivery receipts into SENT response and instead just use batching of responses that is already supported. As server responses are not signed, there is no per-response overhead that is substantial, and a lot of receipts that are available can be packed into one block (depending on the size of payload that has to be fixed not to leak metadata). + +This all seems rather complicated for SMP protocol, and the approach of doing it on a higher level seems more attractive than initially. Possibly we should reconsider, and reduce traffic by reducing block sizes... Reducing block sizes unfortunately requires supporting variable block sizes, and would leak some metadata during the transition period. + +## Another approach + +Above represents substantial complexity, and at least doubles server code complexity for the feature that is definitely not doubling the value of server software. Moving to variable block size is simpler, but also has a lot of complexity, reduces metadata privacy (at least for the duration of migration period), reduces image preview quality, and requires postponing this feature for multiple releases, until all clients migrate. + +Given that the main traffic is generated by the groups, and direct messages do not create a lot of traffic, a much simpler and better solution is to simply send delivery receipts as the message, in direct conversations only, either as chat protocol message or as agent client protocol message (either on the message or on the envelope layer). + +### Comparison of these two approaches: + +**Chat protocol message** + +Pros: +- simpler, more contained change - SMP layer is not aware of this feature +- easier to extend protocol with additional application specific payload, e.g. references to group DAG +Cons: +- ? + +```json + // ... + "x.msg.delivered": { + "properties": { + "msgId": {"ref": "base64url"}, + "params": { + "properties": { + "msgId": {"ref": "base64url"}, + }, + "optionalProperties": { + "data": {} // possibly the initial protocol does not need it, with JSON can be added later + } + } + } + }, + // ... +``` + +**Agent client protocol envelope** + +Pros: +- possibility of using it in a wider range of the applications +- possibility to include received message hash to increase communication integrity - the sending client would be then notified, and it can be exposed in the UI, that the received message is not the same as sent. + +Cons: +- additional implementation complexity - requires additional events to communicate between chat and agent. + +```haskell +data AMessage = + -- ... + | A_RCVD AgentMsgId MsgHash ByteString -- references to the received message + -- ... +``` + +The weirdness of the above design is that it refers to the data present in the header of another message, the alternative would be to have a separate envelope for delivery receipt: + +```haskell +data AgentMessage = + -- ... + AgentMessageRcvd APrivHeader AgentMsgId MsgHash ByteString -- references to the received message + -- ... +``` + +But probably the first one is a bit better, TBC. + +In any case there should be an additional event to notify chat client: + +```haskell +data ACommand (p :: AParty) (e :: AEntity) where + -- ... + RCVD AgentMsgId MsgMeta ByteString -> ACommand Agent AEConn + -- ... +``` + +On the balance of things, implementing on the level of Agent Client protocol seems better, as the additional complexity is marginal, but it allows for wider range of applications, and also allows for additional delivery integrity validation. The format for payload still requires chat protocol message encoding once we want to add it, but initially it could be just an empty string. + +## Implementation plan + +Currently, we delete sent messages once delivered to the server. It would be helpful if we could keep the records in snd_messages table, and then use them to process delivery receipts, although it may be insufficient (we could add fields). Probably it is not possible to keep them as there is a foreign key constraint with `on delete cascade`. + +The new table will be used to track sent message hashes to correlate their IDs with delivery receipts (that will also be stored in messages/rcv_messages). Receipts need to be processed in chat in the same way as normal messages, so they would be sent to chat with MsgMeta, and the chat will need to ack them once processed. + +Agent ackMessage function will be also used to automatically schedule sending delivery receipts if they are enabled for connection, only for normal messages - no sending receipts to receipts. + +There can be receipt re-delivery to the chat in the same cases as when normal message can be re-delivered (in case of AGENT A_DUPLICATE when it was not ack'd by the user). + +## Other considerations + +### How clients decide whether to send delivery receipts + +Two options are possible - local settings per conversation or chat preferences framework, that allows to have mutual on/off. The latter seems preferable as without knowing whether the other party is sending receipts, it is not possible to uderstand what the absense of the receipt means - network malfunction or receipts disabled. + +Groups are a special case, as while some groups may enable sending the delivery receipts, the group members should be able to disable it locally. This should probably be done via a separate conversation setting in the same way as enabling notifications or favourites. In this case the receipt would only be sent to the group if it is enabled in the group and not disabled by the member. The toggle may be located in the same page as chat preferences, but it should be a separate setting. We might want though to communicate somehow whether a given member sends delivery receipts so that other members know whether to expect them or not. + +### How this functionality is released + +5.2: +- support for sending and receiving delivery receipts preference, both in direct messages and in groups (for forward compatibility). +- support for sending and receiving delivery receipts in direct chats only, but disable sending them + +5.3: +- show receipt preferences in the UI +- enable sending receipts in direct chats where they are enabled + +A separate question is how to enable this functionality for the existing contacts. Possible options are: + +1. Enable (as per default) for all contacts, show notification to the user when they open the app for update that delivery notifications are now sent by default to all contacts. Pro: no extra logic to implement. Con: may be perceived as a privacy violation, as to some contacts the delivery receipts will be sent before the user had chance to disable them. +2. Ask the user when the new version first runs whether they want to use delivery receipts and offer these options: + 1) keep enabled for all profiles (and for all contacts) + 2) enable for all profiles in ~12 or in ~24 hours giving users the chance to review all contacts / profiles and disable some of them. The problem here is also in possibility of the correlation in case they all start sending receipts at the same time. Possibly the option could be to set a random time in 12-15 or 24-30 hours range to avoid the possibility of such correlation. + 3) disable for all profiles – it will require sending profile updates to all contacts, so the delivery receipts should be kept disabled and profile updates should be sent after random intervals, one by one (not scheduled all at once, as the time may pass while the app is off). +3. Offer an option to enable globally later - we could keep it as a one-off option, available only to existing users, and visible on the top level of the Settings - once enabled, the option will disappear and it won't be possible to disable again. The downside here is that the new contacts would be receiving the profile with enabled notifications but they still won't be delivered... +4. Another option is to have all new contacts decided based on a global user default (that can be set in the settings and in the dialog on first start), but for the existing contacts keep in unset state that is not interpreted as either on or off, but interpreted as unknown until the user makes a choice... That might be an optimal solution for the users but it would probably require changing the preferences framework or some ad-hoc hacks. That still keeps the question open how to avoid correlation between profiles. +5. That might be the case for version agreement too - the availability of the option per contact will depend on the version. It doesn't answer the question what to do with global defaults though... diff --git a/simplexmq.cabal b/simplexmq.cabal index 89f57d40a..741a4d95e 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -34,7 +34,6 @@ flag swift library exposed-modules: - Simplex.FileTransfer Simplex.FileTransfer.Agent Simplex.FileTransfer.Client Simplex.FileTransfer.Client.Agent @@ -85,6 +84,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/FileTransfer.hs b/src/Simplex/FileTransfer.hs deleted file mode 100644 index 9f648d7ff..000000000 --- a/src/Simplex/FileTransfer.hs +++ /dev/null @@ -1,14 +0,0 @@ -module Simplex.FileTransfer where - --- TODO --- Protocol --- Store (in memory storage) --- StoreLog (append only log) --- FileDescription --- Server --- Client --- Server/Main (server CLI) --- Client/Main (client CLI) --- --- Transport for HTTP2 ? --- streaming Crypto diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index ab09a3a14..9b2499c44 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -117,7 +117,7 @@ import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (fromMaybe, isJust, isNothing) +import Data.Maybe (fromMaybe, isJust, isNothing, catMaybes) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock @@ -152,7 +152,6 @@ import Simplex.Messaging.Util import Simplex.Messaging.Version import UnliftIO.Async (async, race_) import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay) -import qualified UnliftIO.Exception as E import UnliftIO.STM -- import GHC.Conc (unsafeIOToSTM) @@ -203,8 +202,8 @@ acceptContactAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> Bool -> Con acceptContactAsync c corrId enableNtfs = withAgentEnv c .: acceptContactAsync' c corrId enableNtfs -- | Acknowledge message (ACK command) asynchronously, no synchronous response -ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> m () -ackMessageAsync c = withAgentEnv c .:. ackMessageAsync' c +ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m () +ackMessageAsync c = withAgentEnv c .:: ackMessageAsync' c -- | Switch connection to the new receive queue switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m ConnectionStats @@ -264,8 +263,8 @@ resubscribeConnections c = withAgentEnv c . resubscribeConnections' c sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId sendMessage c = withAgentEnv c .:. sendMessage' c -ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> m () -ackMessage c = withAgentEnv c .: ackMessage' c +ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m () +ackMessage c = withAgentEnv c .:. ackMessage' c -- | Switch connection to the new receive queue switchConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats @@ -432,7 +431,7 @@ processCommand c (connId, APC e cmd) = RJCT invId -> rejectContact' c connId invId $> (connId, OK) SUB -> subscribeConnection' c connId $> (connId, OK) SEND msgFlags msgBody -> (connId,) . MID <$> sendMessage' c connId msgFlags msgBody - ACK msgId -> ackMessage' c connId msgId $> (connId, OK) + ACK msgId rcptInfo_ -> ackMessage' c connId msgId rcptInfo_ $> (connId, OK) SWCH -> switchConnection' c connId $> (connId, OK) OFF -> suspendConnection' c connId $> (connId, OK) DEL -> deleteConnection' c connId $> (connId, OK) @@ -507,8 +506,8 @@ acceptContactAsync' c corrId enableNtfs invId ownConnInfo = do throwError err _ -> throwError $ CMD PROHIBITED -ackMessageAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> m () -ackMessageAsync' c corrId connId msgId = do +ackMessageAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m () +ackMessageAsync' c corrId connId msgId rcptInfo_ = do SomeConn cType _ <- withStore c (`getConn` connId) case cType of SCDuplex -> enqueueAck @@ -519,8 +518,11 @@ ackMessageAsync' c corrId connId msgId = do where enqueueAck :: m () enqueueAck = do - (RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId - enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId + let mId = InternalId msgId + RcvMsg {msgType} <- withStore c $ \db -> getRcvMsg db connId mId + when (isJust rcptInfo_ && msgType /= AM_A_MSG_) $ throwError $ CMD PROHIBITED + (RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId mId + enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId rcptInfo_ deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () deleteConnectionAsync' c connId = deleteConnectionsAsync' c [connId] @@ -891,7 +893,7 @@ runCommandProcessing c@AgentClient {subQ} server_ = do void $ joinConnSrv c userId connId True enableNtfs cReq connInfo srv notify OK LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK - ACK msgId -> withServer' . tryCommand $ ackMessage' c connId msgId >> notify OK + ACK msgId rcptInfo_ -> withServer' . tryCommand $ ackMessage' c connId msgId rcptInfo_ >> notify OK SWCH -> noServer . tryCommand . withConnLock c connId "switchConnection" $ withStore c (`getConn` connId) >>= \case @@ -1112,6 +1114,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl _ -> connError msgId NOT_ACCEPTED AM_REPLY_ -> notifyDel msgId err AM_A_MSG_ -> notifyDel msgId err + AM_A_RCVD_ -> notifyDel msgId err AM_QCONT_ -> notifyDel msgId err AM_QADD_ -> qError msgId "QADD: AUTH" AM_QKEY_ -> qError msgId "QKEY: AUTH" @@ -1166,6 +1169,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl qInfo <- createReplyQueue c cData sq srv void . enqueueMessage c cData sq SMP.noMsgFlags $ REPLY [qInfo] AM_A_MSG_ -> notify $ SENT mId + AM_A_RCVD_ -> pure () AM_QCONT_ -> pure () AM_QADD_ -> pure () AM_QKEY_ -> do @@ -1200,10 +1204,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl _ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue" _ -> internalErr msgId "QTEST sent not in duplex connection" AM_EREADY_ -> pure () - delMsg msgId + delMsgKeep (msgType == AM_A_MSG_) msgId where delMsg :: InternalId -> m () - delMsg msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId + delMsg = delMsgKeep False + delMsgKeep :: Bool -> InternalId -> m () + delMsgKeep keepForReceipt msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt notify :: forall e. AEntityI e => ACommand 'Agent e -> m () notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd) notifyDel :: AEntityI e => InternalId -> ACommand 'Agent e -> m () @@ -1220,22 +1226,38 @@ retrySndOp c loop = do atomically $ beginAgentOperation c AOSndNetwork loop -ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m () -ackMessage' c connId msgId = withConnLock c connId "ackMessage" $ do +ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m () +ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do SomeConn _ conn <- withStore c (`getConn` connId) case conn of - DuplexConnection {} -> ack - RcvConnection {} -> ack + DuplexConnection {} -> ack >> sendRcpt conn >> del + RcvConnection {} -> ack >> del SndConnection {} -> throwError $ CONN SIMPLEX ContactConnection {} -> throwError $ CMD PROHIBITED NewConnection _ -> throwError $ CMD PROHIBITED where ack :: m () ack = do - let mId = InternalId msgId - (rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId mId + -- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged + (rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId ackQueueMessage c rq srvMsgId - withStore' c $ \db -> deleteMsg db connId mId + del :: m () + del = withStore' c $ \db -> deleteMsg db connId $ InternalId msgId + sendRcpt :: Connection 'CDuplex -> m () + sendRcpt (DuplexConnection cData _ sqs) = do + msg@RcvMsg {msgType, msgReceipt} <- withStore c $ \db -> getRcvMsg db connId $ InternalId msgId + case rcptInfo_ of + Just rcptInfo -> do + unless (msgType == AM_A_MSG_) $ throwError (CMD PROHIBITED) + when (messageRcptsSupported cData) $ do + let RcvMsg {msgMeta = MsgMeta {sndMsgId}, internalHash} = msg + rcpt = A_RCVD [AMessageReceipt {agentMsgId = sndMsgId, msgHash = internalHash, rcptInfo}] + void $ enqueueMessages c cData sqs SMP.MsgFlags {notification = False} rcpt + Nothing -> case (msgType, msgReceipt) of + -- only remove sent message if receipt hash was Ok, both to debug and for future redundancy + (AM_A_RCVD_, Just MsgReceipt {agentMsgId = sndMsgId, msgRcptStatus = MROk}) -> + withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId + _ -> pure () switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats switchConnection' c connId = @@ -1725,30 +1747,30 @@ cleanupManager c@AgentClient {subQ} = do delay <- asks (initialCleanupDelay . config) liftIO $ threadDelay' delay int <- asks (cleanupInterval . config) + ttl <- asks $ storedMsgDataTTL . config forever $ do - void . runExceptT $ do - deleteConns `catchAgentError` (notify "" . ERR) - deleteRcvMsgHashes `catchAgentError` (notify "" . ERR) - deleteProcessedRatchetKeyHashes `catchAgentError` (notify "" . ERR) - deleteRcvFilesExpired `catchAgentError` (notify "" . RFERR) - deleteRcvFilesDeleted `catchAgentError` (notify "" . RFERR) - deleteRcvFilesTmpPaths `catchAgentError` (notify "" . RFERR) - deleteSndFilesExpired `catchAgentError` (notify "" . SFERR) - deleteSndFilesDeleted `catchAgentError` (notify "" . SFERR) - deleteSndFilesPrefixPaths `catchAgentError` (notify "" . SFERR) - deleteExpiredReplicasForDeletion `catchAgentError` (notify "" . SFERR) + run ERR deleteConns + run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl) + run ERR $ withStore' c (`deleteSndMsgsExpired` ttl) + run ERR $ withStore' c (`deleteRatchetKeyHashesExpired` ttl) + run RFERR deleteRcvFilesExpired + run RFERR deleteRcvFilesDeleted + run RFERR deleteRcvFilesTmpPaths + run SFERR deleteSndFilesExpired + run SFERR deleteSndFilesDeleted + run SFERR deleteSndFilesPrefixPaths + run SFERR deleteExpiredReplicasForDeletion liftIO $ threadDelay' int where + run :: forall e. AEntityI e => (AgentErrorType -> ACommand 'Agent e) -> ExceptT AgentErrorType m () -> m () + run err a = do + void . runExceptT $ a `catchAgentError` (notify "" . err) + step <- asks $ cleanupStepInterval . config + liftIO $ threadDelay step deleteConns = withLock (deleteLock c) "cleanupManager" $ do void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c withStore' c deleteUsersWithoutConns >>= mapM_ (notify "" . DEL_USER) - deleteRcvMsgHashes = do - rcvMsgHashesTTL <- asks $ rcvMsgHashesTTL . config - withStore' c (`deleteRcvMsgHashesExpired` rcvMsgHashesTTL) - deleteProcessedRatchetKeyHashes = do - rkHashesTTL <- asks $ processedRatchetKeyHashesTTL . config - withStore' c (`deleteProcessedRatchetKeyHashesExpired` rkHashesTTL) deleteRcvFilesExpired = do rcvFilesTTL <- asks $ rcvFilesTTL . config rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL) @@ -1855,6 +1877,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s A_MSG body -> do logServer "<--" c srv rId "MSG " notify $ MSG msgMeta msgFlags body + A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending addr QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg qs QKEY qs -> qDuplexAckDel conn'' "QKEY" $ qKeyMsg qs @@ -2078,6 +2101,28 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s void $ tryPutTMVar qLock () Nothing -> qError "QCONT: queue address not found" + messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m () + messagesRcvd rcpts msgMeta@MsgMeta {broker = (srvMsgId, _)} _ = do + logServer "<--" c srv rId "MSG " + rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing + case L.nonEmpty . catMaybes $ L.toList rs of + Just rs' -> notify $ RCVD msgMeta rs' -- client must ACK once processed + Nothing -> enqueueCmd $ ICAck rId srvMsgId + where + clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt) + clientReceipt AMessageReceipt {agentMsgId, msgHash} = do + let sndMsgId = InternalSndId agentMsgId + SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStore c $ \db -> getSndMsgViaRcpt db connId sndMsgId + if msgType /= AM_A_MSG_ + then notify (ERR $ AGENT A_PROHIBITED) $> Nothing -- unexpected message type for receipt + else case msgReceipt of + Just MsgReceipt {msgRcptStatus = MROk} -> pure Nothing -- already notified with MROk status + _ -> do + let msgRcptStatus = if msgHash == internalHash then MROk else MRBadMsgHash + rcpt = MsgReceipt {agentMsgId = msgId, msgRcptStatus} + withStore' c $ \db -> updateSndMsgRcpt db connId sndMsgId rcpt + pure $ Just rcpt + -- processed by queue sender qAddMsg :: NonEmpty (SMPQueueUri, Maybe SndQAddr) -> Connection 'CDuplex -> m () qAddMsg ((_, Nothing) :| _) _ = qError "adding queue without switching is not supported" @@ -2195,7 +2240,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s rkHash k1 k2 = C.sha256Hash $ C.pubKeyBytes k1 <> C.pubKeyBytes k2 ratchetExists :: m Bool ratchetExists = withStore' c $ \db -> do - exists <- checkProcessedRatchetKeyHashExists db connId rkHashRcv + exists <- checkRatchetKeyHashExists db connId rkHashRcv unless exists $ addProcessedRatchetKeyHash db connId rkHashRcv pure exists getSendRatchetKeys :: m (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448) diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 5e779cc22..3a818f250 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -87,8 +87,8 @@ data AgentConfig = AgentConfig helloTimeout :: NominalDiffTime, initialCleanupDelay :: Int64, cleanupInterval :: Int64, - rcvMsgHashesTTL :: NominalDiffTime, - processedRatchetKeyHashesTTL :: NominalDiffTime, + cleanupStepInterval :: Int, + storedMsgDataTTL :: NominalDiffTime, rcvFilesTTL :: NominalDiffTime, sndFilesTTL :: NominalDiffTime, xftpNotifyErrsOnRetry :: Bool, @@ -152,8 +152,8 @@ defaultAgentConfig = helloTimeout = 2 * nominalDay, initialCleanupDelay = 30 * 1000000, -- 30 seconds cleanupInterval = 30 * 60 * 1000000, -- 30 minutes - rcvMsgHashesTTL = 30 * nominalDay, - processedRatchetKeyHashesTTL = 30 * nominalDay, + cleanupStepInterval = 200000, -- 200ms + storedMsgDataTTL = 21 * nominalDay, rcvFilesTTL = 2 * nominalDay, sndFilesTTL = nominalDay, xftpNotifyErrsOnRetry = True, diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 5ab875757..e4c720aef 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -69,6 +69,10 @@ module Simplex.Messaging.Agent.Protocol AgentMessageType (..), APrivHeader (..), AMessage (..), + AMessageReceipt (..), + MsgReceipt (..), + MsgReceiptInfo, + MsgReceiptStatus (..), SndQAddr, SMPServer, pattern SMPServer, @@ -211,7 +215,7 @@ import Text.Read import UnliftIO.Exception (Exception) currentSMPAgentVersion :: Version -currentSMPAgentVersion = 3 +currentSMPAgentVersion = 4 supportedSMPAgentVRange :: VersionRange supportedSMPAgentVRange = mkVersionRange 1 currentSMPAgentVersion @@ -314,7 +318,7 @@ data ACommand (p :: AParty) (e :: AEntity) where JOIN :: Bool -> AConnectionRequestUri -> ConnInfo -> ACommand Client AEConn -- response OK CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand Agent AEConn -- ConnInfo is from sender, [SMPServer] will be empty only in v1 handshake LET :: ConfirmationId -> ConnInfo -> ACommand Client AEConn -- ConnInfo is from client - REQ :: InvitationId -> L.NonEmpty SMPServer -> ConnInfo -> ACommand Agent AEConn -- ConnInfo is from sender + REQ :: InvitationId -> NonEmpty SMPServer -> ConnInfo -> ACommand Agent AEConn -- ConnInfo is from sender ACPT :: InvitationId -> ConnInfo -> ACommand Client AEConn -- ConnInfo is from client RJCT :: InvitationId -> ACommand Client AEConn INFO :: ConnInfo -> ACommand Agent AEConn @@ -332,7 +336,8 @@ data ACommand (p :: AParty) (e :: AEntity) where SENT :: AgentMsgId -> ACommand Agent AEConn MERR :: AgentMsgId -> AgentErrorType -> ACommand Agent AEConn MSG :: MsgMeta -> MsgFlags -> MsgBody -> ACommand Agent AEConn - ACK :: AgentMsgId -> ACommand Client AEConn + ACK :: AgentMsgId -> Maybe MsgReceiptInfo -> ACommand Client AEConn + RCVD :: MsgMeta -> NonEmpty MsgReceipt -> ACommand Agent AEConn SWCH :: ACommand Client AEConn OFF :: ACommand Client AEConn DEL :: ACommand Client AEConn @@ -392,6 +397,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where MERR_ :: ACommandTag Agent AEConn MSG_ :: ACommandTag Agent AEConn ACK_ :: ACommandTag Client AEConn + RCVD_ :: ACommandTag Agent AEConn SWCH_ :: ACommandTag Client AEConn OFF_ :: ACommandTag Client AEConn DEL_ :: ACommandTag Client AEConn @@ -443,7 +449,8 @@ aCommandTag = \case SENT _ -> SENT_ MERR {} -> MERR_ MSG {} -> MSG_ - ACK _ -> ACK_ + ACK {} -> ACK_ + RCVD {} -> RCVD_ SWCH -> SWCH_ OFF -> OFF_ DEL -> DEL_ @@ -743,6 +750,25 @@ data MsgMeta = MsgMeta } deriving (Eq, Show) +instance StrEncoding MsgMeta where + strEncode MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sndMsgId} = + B.unwords + [ strEncode integrity, + "R=" <> bshow rmId <> "," <> showTs rTs, + "B=" <> encode bmId <> "," <> showTs bTs, + "S=" <> bshow sndMsgId + ] + where + showTs = B.pack . formatISO8601Millis + strP = do + integrity <- strP + recipient <- " R=" *> partyMeta A.decimal + broker <- " B=" *> partyMeta base64P + sndMsgId <- " S=" *> A.decimal + pure MsgMeta {integrity, recipient, broker, sndMsgId} + where + partyMeta idParser = (,) <$> idParser <* A.char ',' <*> tsISO8601P + data SMPConfirmation = SMPConfirmation { -- | sender's public key to use for authentication of sender's commands at the recepient's server senderKey :: SndPublicVerifyKey, @@ -815,7 +841,7 @@ data AgentMessage = AgentConnInfo ConnInfo | -- AgentConnInfoReply is only used in duplexHandshake mode (v2), allowing to include reply queue(s) in the initial confirmation. -- It makes REPLY message unnecessary. - AgentConnInfoReply (L.NonEmpty SMPQueueInfo) ConnInfo + AgentConnInfoReply (NonEmpty SMPQueueInfo) ConnInfo | AgentRatchetInfo ByteString | AgentMessage APrivHeader AMessage deriving (Show) @@ -841,6 +867,7 @@ data AgentMessageType | AM_HELLO_ | AM_REPLY_ | AM_A_MSG_ + | AM_A_RCVD_ | AM_QCONT_ | AM_QADD_ | AM_QKEY_ @@ -857,6 +884,7 @@ instance Encoding AgentMessageType where AM_HELLO_ -> "H" AM_REPLY_ -> "R" AM_A_MSG_ -> "M" + AM_A_RCVD_ -> "V" AM_QCONT_ -> "QC" AM_QADD_ -> "QA" AM_QKEY_ -> "QK" @@ -871,6 +899,7 @@ instance Encoding AgentMessageType where 'H' -> pure AM_HELLO_ 'R' -> pure AM_REPLY_ 'M' -> pure AM_A_MSG_ + 'V' -> pure AM_A_RCVD_ 'Q' -> A.anyChar >>= \case 'C' -> pure AM_QCONT_ @@ -896,6 +925,7 @@ agentMessageType = \case -- REPLY is only used in v1 REPLY _ -> AM_REPLY_ A_MSG _ -> AM_A_MSG_ + A_RCVD {} -> AM_A_RCVD_ QCONT _ -> AM_QCONT_ QADD _ -> AM_QADD_ QKEY _ -> AM_QKEY_ @@ -920,6 +950,7 @@ data AMsgType = HELLO_ | REPLY_ | A_MSG_ + | A_RCVD_ | QCONT_ | QADD_ | QKEY_ @@ -933,6 +964,7 @@ instance Encoding AMsgType where HELLO_ -> "H" REPLY_ -> "R" A_MSG_ -> "M" + A_RCVD_ -> "V" QCONT_ -> "QC" QADD_ -> "QA" QKEY_ -> "QK" @@ -944,6 +976,7 @@ instance Encoding AMsgType where 'H' -> pure HELLO_ 'R' -> pure REPLY_ 'M' -> pure A_MSG_ + 'V' -> pure A_RCVD_ 'Q' -> A.anyChar >>= \case 'C' -> pure QCONT_ @@ -962,23 +995,60 @@ data AMessage = -- | the first message in the queue to validate it is secured HELLO | -- | reply queues information - REPLY (L.NonEmpty SMPQueueInfo) + REPLY (NonEmpty SMPQueueInfo) | -- | agent envelope for the client message A_MSG MsgBody + | -- | agent envelope for delivery receipt + A_RCVD (NonEmpty AMessageReceipt) | -- | the message instructing the client to continue sending messages (after ERR QUOTA) QCONT SndQAddr | -- add queue to connection (sent by recipient), with optional address of the replaced queue - QADD (L.NonEmpty (SMPQueueUri, Maybe SndQAddr)) + QADD (NonEmpty (SMPQueueUri, Maybe SndQAddr)) | -- key to secure the added queues and agree e2e encryption key (sent by sender) - QKEY (L.NonEmpty (SMPQueueInfo, SndPublicVerifyKey)) + QKEY (NonEmpty (SMPQueueInfo, SndPublicVerifyKey)) | -- inform that the queues are ready to use (sent by recipient) - QUSE (L.NonEmpty (SndQAddr, Bool)) + QUSE (NonEmpty (SndQAddr, Bool)) | -- sent by the sender to test new queues and to complete switching - QTEST (L.NonEmpty SndQAddr) + QTEST (NonEmpty SndQAddr) | -- ratchet re-synchronization is complete, with last decrypted sender message id (recipient's `last_external_snd_msg_id`) - EREADY Int64 + EREADY AgentMsgId + deriving (Show) + +-- | this type is used to send as part of the protocol between different clients +-- TODO possibly, rename fields and types referring to external and internal IDs to make them different +data AMessageReceipt = AMessageReceipt + { agentMsgId :: AgentMsgId, -- this is an external snd message ID referenced by the message recipient + msgHash :: MsgHash, + rcptInfo :: MsgReceiptInfo + } deriving (Show) +-- | this type is used as part of agent protocol to communicate with the user application +data MsgReceipt = MsgReceipt + { agentMsgId :: AgentMsgId, -- this is an internal agent message ID of received message + msgRcptStatus :: MsgReceiptStatus + } + deriving (Eq, Show) + +data MsgReceiptStatus = MROk | MRBadMsgHash + deriving (Eq, Show) + +instance StrEncoding MsgReceiptStatus where + strEncode = \case + MROk -> "ok" + MRBadMsgHash -> "badMsgHash" + strP = + A.takeWhile1 (/= ' ') >>= \ case + "ok" -> pure MROk + "badMsgHash" -> pure MRBadMsgHash + _ -> fail "bad MsgReceiptStatus" + +instance ToJSON MsgReceiptStatus where + toJSON = strToJSON + toEncoding = strToJEncoding + +type MsgReceiptInfo = ByteString + type SndQAddr = (SMPServer, SMP.SenderId) instance Encoding AMessage where @@ -986,6 +1056,7 @@ instance Encoding AMessage where HELLO -> smpEncode HELLO_ REPLY smpQueues -> smpEncode (REPLY_, smpQueues) A_MSG body -> smpEncode (A_MSG_, Tail body) + A_RCVD mrs -> smpEncode (A_RCVD_, mrs) QCONT addr -> smpEncode (QCONT_, addr) QADD qs -> smpEncode (QADD_, qs) QKEY qs -> smpEncode (QKEY_, qs) @@ -998,6 +1069,7 @@ instance Encoding AMessage where HELLO_ -> pure HELLO REPLY_ -> REPLY <$> smpP A_MSG_ -> A_MSG . unTail <$> smpP + A_RCVD_ -> A_RCVD <$> smpP QCONT_ -> QCONT <$> smpP QADD_ -> QADD <$> smpP QKEY_ -> QKEY <$> smpP @@ -1005,6 +1077,21 @@ instance Encoding AMessage where QTEST_ -> QTEST <$> smpP EREADY_ -> EREADY <$> smpP +instance Encoding AMessageReceipt where + smpEncode AMessageReceipt {agentMsgId, msgHash, rcptInfo} = + smpEncode (agentMsgId, msgHash, Large rcptInfo) + smpP = do + (agentMsgId, msgHash, Large rcptInfo) <- smpP + pure AMessageReceipt {agentMsgId, msgHash, rcptInfo} + +instance StrEncoding MsgReceipt where + strEncode MsgReceipt {agentMsgId, msgRcptStatus} = + strEncode agentMsgId <> ":" <> strEncode msgRcptStatus + strP = do + agentMsgId <- strP <* A.char ':' + msgRcptStatus <- strP + pure MsgReceipt {agentMsgId, msgRcptStatus} + instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where strEncode = \case CRInvitationUri crData e2eParams -> crEncode "invitation" crData (Just e2eParams) @@ -1234,7 +1321,7 @@ deriving instance Show AConnectionRequestUri data ConnReqUriData = ConnReqUriData { crScheme :: ConnReqScheme, crAgentVRange :: VersionRange, - crSmpQueues :: L.NonEmpty SMPQueueUri, + crSmpQueues :: NonEmpty SMPQueueUri, crClientData :: Maybe CRClientData } deriving (Eq, Show) @@ -1296,7 +1383,7 @@ instance StrEncoding MsgIntegrity where strP = "OK" $> MsgOk <|> "ERR " *> (MsgError <$> strP) strEncode = \case MsgOk -> "OK" - MsgError e -> "ERR" <> strEncode e + MsgError e -> "ERR " <> strEncode e instance ToJSON MsgIntegrity where toJSON = J.genericToJSON $ sumTypeJSON fstToLower @@ -1316,7 +1403,7 @@ data MsgErrorType instance StrEncoding MsgErrorType where strP = "ID " *> (MsgBadId <$> A.decimal) - <|> "IDS " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal) + <|> "NO_ID " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal) <|> "HASH" $> MsgBadHash <|> "DUPLICATE" $> MsgDuplicate strEncode = \case @@ -1557,6 +1644,7 @@ instance StrEncoding ACmdTag where "MERR" -> ct MERR_ "MSG" -> ct MSG_ "ACK" -> t ACK_ + "RCVD" -> ct RCVD_ "SWCH" -> t SWCH_ "OFF" -> t OFF_ "DEL" -> t DEL_ @@ -1611,6 +1699,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where MERR_ -> "MERR" MSG_ -> "MSG" ACK_ -> "ACK" + RCVD_ -> "RCVD" SWCH_ -> "SWCH" OFF_ -> "OFF" DEL_ -> "DEL" @@ -1654,7 +1743,7 @@ commandP binaryP = RJCT_ -> s (RJCT <$> A.takeByteString) SUB_ -> pure SUB SEND_ -> s (SEND <$> smpP <* A.space <*> binaryP) - ACK_ -> s (ACK <$> A.decimal) + ACK_ -> s (ACK <$> A.decimal <*> optional (A.space *> binaryP)) SWCH_ -> pure SWCH OFF_ -> pure OFF DEL_ -> pure DEL @@ -1676,7 +1765,8 @@ commandP binaryP = MID_ -> s (MID <$> A.decimal) SENT_ -> s (SENT <$> A.decimal) MERR_ -> s (MERR <$> A.decimal <* A.space <*> strP) - MSG_ -> s (MSG <$> msgMetaP <* A.space <*> smpP <* A.space <*> binaryP) + MSG_ -> s (MSG <$> strP <* A.space <*> smpP <* A.space <*> binaryP) + RCVD_ -> s (RCVD <$> strP <* A.space <*> strP) DEL_RCVQ_ -> s (DEL_RCVQ <$> strP_ <*> strP_ <*> strP) DEL_CONN_ -> pure DEL_CONN DEL_USER_ -> s (DEL_USER <$> strP) @@ -1701,13 +1791,6 @@ commandP binaryP = in case ds of [] -> Left "no sender file description" sd : rds -> SFDONE <$> strDecode (encodeUtf8 sd) <*> mapM (strDecode . encodeUtf8) rds - msgMetaP = do - integrity <- strP - recipient <- " R=" *> partyMeta A.decimal - broker <- " B=" *> partyMeta base64P - sndMsgId <- " S=" *> A.decimal - pure MsgMeta {integrity, recipient, broker, sndMsgId} - partyMeta idParser = (,) <$> idParser <* A.char ',' <*> tsISO8601P parseCommand :: ByteString -> Either AgentErrorType ACmd parseCommand = parse (commandP A.takeByteString) $ CMD SYNTAX @@ -1736,8 +1819,9 @@ serializeCommand = \case MID mId -> s (MID_, Str $ bshow mId) SENT mId -> s (SENT_, Str $ bshow mId) MERR mId e -> s (MERR_, Str $ bshow mId, e) - MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, serializeMsgMeta msgMeta, smpEncode msgFlags, serializeBinary msgBody] - ACK mId -> s (ACK_, Str $ bshow mId) + MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, s msgMeta, smpEncode msgFlags, serializeBinary msgBody] + ACK mId rcptInfo_ -> s (ACK_, Str $ bshow mId) <> maybe "" (B.cons ' ' . serializeBinary) rcptInfo_ + RCVD msgMeta rcpts -> s (RCVD_, msgMeta, rcpts) SWCH -> s SWCH_ OFF -> s OFF_ DEL -> s DEL_ @@ -1759,19 +1843,9 @@ serializeCommand = \case where s :: StrEncoding a => a -> ByteString s = strEncode - showTs :: UTCTime -> ByteString - showTs = B.pack . formatISO8601Millis connections :: [ConnId] -> ByteString connections = B.intercalate "," . map strEncode sfDone sd rds = B.intercalate fdSeparator $ strEncode sd : map strEncode rds - serializeMsgMeta :: MsgMeta -> ByteString - serializeMsgMeta MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sndMsgId} = - B.unwords - [ strEncode integrity, - "R=" <> bshow rmId <> "," <> showTs rTs, - "B=" <> encode bmId <> "," <> showTs bTs, - "S=" <> bshow sndMsgId - ] serializeBinary :: ByteString -> ByteString serializeBinary body = bshow (B.length body) <> "\n" <> body diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 2a51e7f9f..e190c171d 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -315,6 +315,9 @@ ratchetSyncAllowed cData@ConnData {ratchetSyncState} = ratchetSyncSupported' :: ConnData -> Bool ratchetSyncSupported' ConnData {connAgentVersion} = connAgentVersion >= 3 +messageRcptsSupported :: ConnData -> Bool +messageRcptsSupported ConnData {connAgentVersion} = connAgentVersion >= 4 + -- this function should be mirrored in the clients ratchetSyncSendProhibited :: ConnData -> Bool ratchetSyncSendProhibited ConnData {ratchetSyncState} = @@ -506,7 +509,10 @@ data RcvMsgData = RcvMsgData data RcvMsg = RcvMsg { internalId :: InternalId, msgMeta :: MsgMeta, + msgType :: AgentMessageType, msgBody :: MsgBody, + internalHash :: MsgHash, + msgReceipt :: Maybe MsgReceipt, -- if this message is a delivery receipt userAck :: Bool } @@ -521,6 +527,14 @@ data SndMsgData = SndMsgData prevMsgHash :: MsgHash } +data SndMsg = SndMsg + { internalId :: InternalId, + internalSndId :: InternalSndId, + msgType :: AgentMessageType, + internalHash :: MsgHash, + msgReceipt :: Maybe MsgReceipt + } + data PendingMsgData = PendingMsgData { msgId :: InternalId, msgType :: AgentMessageType, diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 9cda785a3..0d9e4df9f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -56,8 +56,8 @@ module Simplex.Messaging.Agent.Store.SQLite getDeletedConnIds, setConnRatchetSync, addProcessedRatchetKeyHash, - checkProcessedRatchetKeyHashExists, - deleteProcessedRatchetKeyHashesExpired, + checkRatchetKeyHashExists, + deleteRatchetKeyHashesExpired, getRcvConn, getRcvQueueById, getSndQueueById, @@ -99,16 +99,21 @@ module Simplex.Messaging.Agent.Store.SQLite updateSndIds, createSndMsg, createSndMsgDelivery, + getSndMsgViaRcpt, + updateSndMsgRcpt, getPendingMsgData, updatePendingMsgRIState, getPendingMsgs, deletePendingMsgs, setMsgUserAck, + getRcvMsg, getLastMsg, checkRcvMsgHashExists, deleteMsg, + deleteDeliveredSndMsg, deleteSndMsgDelivery, deleteRcvMsgHashesExpired, + deleteSndMsgsExpired, -- Double ratchet persistence createRatchetX3dhKeys, getRatchetX3dhKeys, @@ -893,6 +898,31 @@ createSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO createSndMsgDelivery db connId SndQueue {dbQueueId} msgId = DB.execute db "INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?)" (connId, dbQueueId, msgId) +getSndMsgViaRcpt :: DB.Connection -> ConnId -> InternalSndId -> IO (Either StoreError SndMsg) +getSndMsgViaRcpt db connId sndMsgId = + firstRow toSndMsg SEMsgNotFound $ + DB.query + db + [sql| + SELECT s.internal_id, m.msg_type, s.internal_hash, s.rcpt_internal_id, s.rcpt_status + FROM snd_messages s + JOIN messages m ON s.internal_id = m.internal_id + WHERE s.conn_id = ? AND s.internal_snd_id = ? + |] + (connId, sndMsgId) + where + toSndMsg :: (InternalId, AgentMessageType, MsgHash, Maybe AgentMsgId, Maybe MsgReceiptStatus) -> SndMsg + toSndMsg (internalId, msgType, internalHash, rcptInternalId_, rcptStatus_) = + let msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_ + in SndMsg {internalId, internalSndId = sndMsgId, msgType, internalHash, msgReceipt} + +updateSndMsgRcpt :: DB.Connection -> ConnId -> InternalSndId -> MsgReceipt -> IO () +updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} = + DB.execute + db + "UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?" + (agentMsgId, msgRcptStatus, connId, sndMsgId) + getPendingMsgData :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData)) getPendingMsgData db connId msgId = do rq_ <- L.head <$$> getRcvQueuesByConnId_ db connId @@ -929,32 +959,51 @@ deletePendingMsgs db connId SndQueue {dbQueueId} = setMsgUserAck :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (RcvQueue, SMP.MsgId)) setMsgUserAck db connId agentMsgId = runExceptT $ do - liftIO $ DB.execute db "UPDATE rcv_messages SET user_ack = ? WHERE conn_id = ? AND internal_id = ?" (True, connId, agentMsgId) (dbRcvId, srvMsgId) <- ExceptT . firstRow id SEMsgNotFound $ DB.query db "SELECT rcv_queue_id, broker_id FROM rcv_messages WHERE conn_id = ? AND internal_id = ?" (connId, agentMsgId) rq <- ExceptT $ getRcvQueueById db connId dbRcvId + liftIO $ DB.execute db "UPDATE rcv_messages SET user_ack = ? WHERE conn_id = ? AND internal_id = ?" (True, connId, agentMsgId) pure (rq, srvMsgId) +getRcvMsg :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError RcvMsg) +getRcvMsg db connId agentMsgId = + firstRow toRcvMsg SEMsgNotFound $ + DB.query + db + [sql| + SELECT + r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash, + m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack + FROM rcv_messages r + JOIN messages m ON r.internal_id = m.internal_id + LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id + WHERE r.conn_id = ? AND r.internal_id = ? + |] + (connId, agentMsgId) + getLastMsg :: DB.Connection -> ConnId -> SMP.MsgId -> IO (Maybe RcvMsg) getLastMsg db connId msgId = - maybeFirstRow rcvMsg $ + maybeFirstRow toRcvMsg $ DB.query db [sql| SELECT - r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, - m.msg_body, r.user_ack + r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash, + m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack FROM rcv_messages r JOIN messages m ON r.internal_id = m.internal_id JOIN connections c ON r.conn_id = c.conn_id AND c.last_internal_msg_id = r.internal_id + LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id WHERE r.conn_id = ? AND r.broker_id = ? |] (connId, msgId) - where - rcvMsg (agentMsgId, internalTs, brokerId, brokerTs, sndMsgId, integrity, msgBody, userAck) = - let msgMeta = MsgMeta {recipient = (agentMsgId, internalTs), broker = (brokerId, brokerTs), sndMsgId, integrity} - in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgBody, userAck} + +toRcvMsg :: (Int64, InternalTs, BrokerId, BrokerTs, AgentMsgId, MsgIntegrity, MsgHash, AgentMessageType, MsgBody, Maybe AgentMsgId, Maybe MsgReceiptStatus, Bool) -> RcvMsg +toRcvMsg (agentMsgId, internalTs, brokerId, brokerTs, sndMsgId, integrity, internalHash, msgType, msgBody, rcptInternalId_, rcptStatus_, userAck) = + let msgMeta = MsgMeta {recipient = (agentMsgId, internalTs), broker = (brokerId, brokerTs), sndMsgId, integrity} + msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_ + in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck} checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool checkRcvMsgHashExists db connId hash = do @@ -971,20 +1020,55 @@ deleteMsg :: DB.Connection -> ConnId -> InternalId -> IO () deleteMsg db connId msgId = DB.execute db "DELETE FROM messages WHERE conn_id = ? AND internal_id = ?;" (connId, msgId) -deleteSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO () -deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId = do +deleteMsgContent :: DB.Connection -> ConnId -> InternalId -> IO () +deleteMsgContent db connId msgId = + DB.execute db "UPDATE messages SET msg_body = x'' WHERE conn_id = ? AND internal_id = ?;" (connId, msgId) + +deleteDeliveredSndMsg :: DB.Connection -> ConnId -> InternalId -> IO () +deleteDeliveredSndMsg db connId msgId = do + cnt <- countPendingSndDeliveries_ db connId msgId + when (cnt == 0) $ deleteMsg db connId msgId + +deleteSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> Bool -> IO () +deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId keepForReceipt = do DB.execute db "DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ? AND internal_id = ?" (connId, dbQueueId, msgId) - (Only (cnt :: Int) : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ?" (connId, msgId) - when (cnt == 0) $ deleteMsg db connId msgId + cnt <- countPendingSndDeliveries_ db connId msgId + when (cnt == 0) $ do + del <- + maybeFirstRow id (DB.query db "SELECT rcpt_internal_id, rcpt_status FROM snd_messages WHERE conn_id = ? AND internal_id = ?" (connId, msgId)) >>= \case + Just (Just (_ :: Int64), Just MROk) -> pure deleteMsg + _ -> pure $ if keepForReceipt then deleteMsgContent else deleteMsg + del db connId msgId + +countPendingSndDeliveries_ :: DB.Connection -> ConnId -> InternalId -> IO Int +countPendingSndDeliveries_ db connId msgId = do + (Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ?" (connId, msgId) + pure cnt deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO () deleteRcvMsgHashesExpired db ttl = do cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime DB.execute db "DELETE FROM encrypted_rcv_message_hashes WHERE created_at < ?" (Only cutoffTs) +deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> IO () +deleteSndMsgsExpired db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + DB.execute + db + [sql| + DELETE FROM messages + WHERE internal_id IN ( + SELECT s.internal_id + FROM snd_messages s + JOIN messages m USING (internal_id) + WHERE m.internal_ts < ? + ) + |] + (Only cutoffTs) + createRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> IO () createRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 = DB.execute db "INSERT INTO ratchets (conn_id, x3dh_priv_key_1, x3dh_priv_key_2) VALUES (?, ?, ?)" (connId, x3dhPrivKey1, x3dhPrivKey2) @@ -1504,6 +1588,10 @@ instance ToField AgentCommandTag where toField = toField . strEncode instance FromField AgentCommandTag where fromField = blobFieldParser strP +instance ToField MsgReceiptStatus where toField = toField . decodeLatin1 . strEncode + +instance FromField MsgReceiptStatus where fromField = fromTextField_ $ eitherToMaybe . strDecode . encodeUtf8 + listToEither :: e -> [a] -> Either e a listToEither _ (x : _) = Right x listToEither e _ = Left e @@ -1682,8 +1770,8 @@ addProcessedRatchetKeyHash :: DB.Connection -> ConnId -> ByteString -> IO () addProcessedRatchetKeyHash db connId hash = DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, hash) -checkProcessedRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool -checkProcessedRatchetKeyHashExists db connId hash = do +checkRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool +checkRatchetKeyHashExists db connId hash = do fromMaybe False <$> maybeFirstRow fromOnly @@ -1693,8 +1781,8 @@ checkProcessedRatchetKeyHashExists db connId hash = do (connId, hash) ) -deleteProcessedRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO () -deleteProcessedRatchetKeyHashesExpired db ttl = do +deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO () +deleteRatchetKeyHashesExpired db ttl = do cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 737edda95..810bebd3c 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -63,6 +63,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_r import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -91,7 +92,8 @@ schemaMigrations = ("m20230510_files_pending_replicas_indexes", m20230510_files_pending_replicas_indexes, Just down_m20230510_files_pending_replicas_indexes), ("m20230516_encrypted_rcv_message_hashes", m20230516_encrypted_rcv_message_hashes, Just down_m20230516_encrypted_rcv_message_hashes), ("m20230531_switch_status", m20230531_switch_status, Just down_m20230531_switch_status), - ("m20230615_ratchet_sync", m20230615_ratchet_sync, Just down_m20230615_ratchet_sync) + ("m20230615_ratchet_sync", m20230615_ratchet_sync, Just down_m20230615_ratchet_sync), + ("m20230701_delivery_receipts", m20230701_delivery_receipts, Just down_m20230701_delivery_receipts) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230701_delivery_receipts.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230701_delivery_receipts.hs new file mode 100644 index 000000000..1dab82674 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230701_delivery_receipts.hs @@ -0,0 +1,24 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20230701_delivery_receipts :: Query +m20230701_delivery_receipts = + [sql| +ALTER TABLE snd_messages ADD COLUMN rcpt_internal_id INTEGER; +ALTER TABLE snd_messages ADD COLUMN rcpt_status TEXT; + +CREATE INDEX idx_snd_messages_rcpt_internal_id ON snd_messages(conn_id, rcpt_internal_id); +|] + +down_m20230701_delivery_receipts :: Query +down_m20230701_delivery_receipts = + [sql| +DROP INDEX idx_snd_messages_rcpt_internal_id; + +ALTER TABLE snd_messages DROP COLUMN rcpt_internal_id; +ALTER TABLE snd_messages DROP COLUMN rcpt_status; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index ca07c8244..10e3148b0 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -119,6 +119,8 @@ CREATE TABLE snd_messages( previous_msg_hash BLOB NOT NULL DEFAULT x'', retry_int_slow INTEGER, retry_int_fast INTEGER, + rcpt_internal_id INTEGER, + rcpt_status TEXT, PRIMARY KEY(conn_id, internal_snd_id), FOREIGN KEY(conn_id, internal_id) REFERENCES messages ON DELETE CASCADE @@ -461,3 +463,7 @@ CREATE INDEX idx_processed_ratchet_key_hashes_hash ON processed_ratchet_key_hash conn_id, hash ); +CREATE INDEX idx_snd_messages_rcpt_internal_id ON snd_messages( + conn_id, + rcpt_internal_id +); diff --git a/tests/AgentTests/ConnectionRequestTests.hs b/tests/AgentTests/ConnectionRequestTests.hs index ea457d0bc..d603a0f40 100644 --- a/tests/AgentTests/ConnectionRequestTests.hs +++ b/tests/AgentTests/ConnectionRequestTests.hs @@ -114,7 +114,7 @@ connectionRequestTests = <> urlEncode True testDhKeyStrUri <> "&e2e=v%3D1%26x3dh%3DMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D%2CMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D" strEncode connectionRequestCurrentRange - `shouldBe` "https://simplex.chat/invitation#/?v=1-3&smp=smp%3A%2F%2F1234-w%3D%3D%40smp.simplex.im%3A5223%2F3456-w%3D%3D%23%2F%3Fv%3D1%26dh%3D" + `shouldBe` "https://simplex.chat/invitation#/?v=1-4&smp=smp%3A%2F%2F1234-w%3D%3D%40smp.simplex.im%3A5223%2F3456-w%3D%3D%23%2F%3Fv%3D1%26dh%3D" <> urlEncode True testDhKeyStrUri <> "%2Csmp%3A%2F%2F1234-w%3D%3D%40smp.simplex.im%3A5223%2F3456-w%3D%3D%23%2F%3Fv%3D1%26dh%3D" <> urlEncode True testDhKeyStrUri @@ -158,7 +158,7 @@ connectionRequestTests = <> testDhKeyStrUri <> "&e2e=extra_key%3Dnew%26v%3D1-2%26x3dh%3DMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D%2CMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D" <> "&some_new_param=abc" - <> "&v=1-3" + <> "&v=1-4" ) `shouldBe` Right connectionRequestCurrentRange strDecode diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 9aac75b63..f9e8b4c71 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -104,6 +104,9 @@ pGet c = do pattern Msg :: MsgBody -> ACommand 'Agent e pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody +pattern Rcvd :: AgentMsgId -> ACommand 'Agent e +pattern Rcvd agentMsgId <- RCVD MsgMeta {integrity = MsgOk} [MsgReceipt {agentMsgId, msgRcptStatus = MROk}] + smpCfgVPrev :: ProtocolClientConfig smpCfgVPrev = (smpCfg agentCfg) {serverVRange = serverVRangePrev} where @@ -293,6 +296,9 @@ functionalAPITests t = do describe "getRatchetAdHash" $ it "should return the same data for both peers" $ withSmpServer t testRatchetAdHash + describe "Delivery receipts" $ do + it "should send and receive delivery receipt" $ withSmpServer t testDeliveryReceipts + it "should send delivery receipt only in connection v3+" $ testDeliveryReceiptsVersion t testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do @@ -352,17 +358,17 @@ runAgentClientTest alice bob baseId = do 2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?" get alice ##> ("", bobId, SENT $ baseId + 2) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 1 + ackMessage bob aliceId (baseId + 1) Nothing get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 2 + ackMessage bob aliceId (baseId + 2) Nothing 3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too" get bob ##> ("", aliceId, SENT $ baseId + 3) 4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1" get bob ##> ("", aliceId, SENT $ baseId + 4) get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 3 + ackMessage alice bobId (baseId + 3) Nothing get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 4 + ackMessage alice bobId (baseId + 4) Nothing suspendConnection alice bobId 5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2" get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH)) @@ -389,17 +395,17 @@ runAgentClientContactTest alice bob baseId = do 2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?" get alice ##> ("", bobId, SENT $ baseId + 2) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 1 + ackMessage bob aliceId (baseId + 1) Nothing get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 2 + ackMessage bob aliceId (baseId + 2) Nothing 3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too" get bob ##> ("", aliceId, SENT $ baseId + 3) 4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1" get bob ##> ("", aliceId, SENT $ baseId + 4) get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 3 + ackMessage alice bobId (baseId + 3) Nothing get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 4 + ackMessage alice bobId (baseId + 4) Nothing suspendConnection alice bobId 5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2" get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH)) @@ -702,7 +708,7 @@ testDuplicateMessage t = do runRight_ $ do subscribeConnection bob1 aliceId get bob1 =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob1 aliceId 4 + ackMessage bob1 aliceId 4 Nothing 5 <- sendMessage alice bobId SMP.noMsgFlags "hello 2" get alice ##> ("", bobId, SENT 5) get bob1 =##> \case ("", c, Msg "hello 2") -> c == aliceId; _ -> False @@ -714,7 +720,7 @@ testDuplicateMessage t = do -- commenting two lines below and uncommenting further two lines would also runRight_, -- it is the scenario tested above, when the message was not acknowledged by the user threadDelay 200000 - Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 + Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing disconnectAgentClient alice disconnectAgentClient bob1 @@ -727,7 +733,7 @@ testDuplicateMessage t = do subscribeConnection bob2 aliceId subscribeConnection alice2 bobId -- get bob2 =##> \case ("", c, Msg "hello 2") -> c == aliceId; _ -> False - -- ackMessage bob2 aliceId 5 + -- ackMessage bob2 aliceId 5 Nothing -- message 2 is not delivered again, even though it was delivered to the agent 6 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 3" get alice2 ##> ("", bobId, SENT 6) @@ -743,7 +749,7 @@ testSkippedMessages t = do 4 <- sendMessage alice bobId SMP.noMsgFlags "hello" get alice ##> ("", bobId, SENT 4) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId 4 + ackMessage bob aliceId 4 Nothing disconnectAgentClient bob @@ -773,12 +779,12 @@ testSkippedMessages t = do 8 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 5" get alice2 ##> ("", bobId, SENT 8) get bob2 =##> \case ("", c, MSG MsgMeta {integrity = MsgError {errorInfo = MsgSkipped {fromMsgId = 4, toMsgId = 6}}} _ "hello 5") -> c == aliceId; _ -> False - ackMessage bob2 aliceId 5 + ackMessage bob2 aliceId 5 Nothing 9 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 6" get alice2 ##> ("", bobId, SENT 9) get bob2 =##> \case ("", c, Msg "hello 6") -> c == aliceId; _ -> False - ackMessage bob2 aliceId 6 + ackMessage bob2 aliceId 6 Nothing testRatchetSync :: HasCallStack => ATransport -> IO () testRatchetSync t = do @@ -807,24 +813,24 @@ setupDesynchronizedRatchet alice bob = do 4 <- sendMessage alice bobId SMP.noMsgFlags "hello" get alice ##> ("", bobId, SENT 4) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId 4 + ackMessage bob aliceId 4 Nothing 5 <- sendMessage bob aliceId SMP.noMsgFlags "hello 2" get bob ##> ("", aliceId, SENT 5) get alice =##> \case ("", c, Msg "hello 2") -> c == bobId; _ -> False - ackMessage alice bobId 5 + ackMessage alice bobId 5 Nothing liftIO $ copyFile testDB2 (testDB2 <> ".bak") 6 <- sendMessage alice bobId SMP.noMsgFlags "hello 3" get alice ##> ("", bobId, SENT 6) get bob =##> \case ("", c, Msg "hello 3") -> c == aliceId; _ -> False - ackMessage bob aliceId 6 + ackMessage bob aliceId 6 Nothing 7 <- sendMessage bob aliceId SMP.noMsgFlags "hello 4" get bob ##> ("", aliceId, SENT 7) get alice =##> \case ("", c, Msg "hello 4") -> c == bobId; _ -> False - ackMessage alice bobId 7 + ackMessage alice bobId 7 Nothing disconnectAgentClient bob @@ -1056,7 +1062,7 @@ testSuspendingAgent = do 4 <- sendMessage a bId SMP.noMsgFlags "hello" get a ##> ("", bId, SENT 4) get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False - ackMessage b aId 4 + ackMessage b aId 4 Nothing suspendAgent b 1000000 get' b ##> ("", "", SUSPENDED) 5 <- sendMessage a bId SMP.noMsgFlags "hello 2" @@ -1074,7 +1080,7 @@ testSuspendingAgentCompleteSending t = do 4 <- sendMessage a bId SMP.noMsgFlags "hello" get a ##> ("", bId, SENT 4) get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False - ackMessage b aId 4 + ackMessage b aId 4 Nothing pure (aId, bId) runRight_ $ do @@ -1093,9 +1099,9 @@ testSuspendingAgentCompleteSending t = do pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False - ackMessage a bId 5 + ackMessage a bId 5 Nothing get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False - ackMessage a bId 6 + ackMessage a bId 6 Nothing testSuspendingAgentTimeout :: ATransport -> IO () testSuspendingAgentTimeout t = do @@ -1106,7 +1112,7 @@ testSuspendingAgentTimeout t = do 4 <- sendMessage a bId SMP.noMsgFlags "hello" get a ##> ("", bId, SENT 4) get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False - ackMessage b aId 4 + ackMessage b aId 4 Nothing pure (aId, bId) runRight_ $ do @@ -1205,20 +1211,20 @@ testAsyncCommands = do 2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?" get alice ##> ("", bobId, SENT $ baseId + 2) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessageAsync bob "4" aliceId $ baseId + 1 + ackMessageAsync bob "4" aliceId (baseId + 1) Nothing ("4", _, OK) <- get bob get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False - ackMessageAsync bob "5" aliceId $ baseId + 2 + ackMessageAsync bob "5" aliceId (baseId + 2) Nothing ("5", _, OK) <- get bob 3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too" get bob ##> ("", aliceId, SENT $ baseId + 3) 4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1" get bob ##> ("", aliceId, SENT $ baseId + 4) get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False - ackMessageAsync alice "6" bobId $ baseId + 3 + ackMessageAsync alice "6" bobId (baseId + 3) Nothing ("6", _, OK) <- get alice get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False - ackMessageAsync alice "7" bobId $ baseId + 4 + ackMessageAsync alice "7" bobId (baseId + 4) Nothing ("7", _, OK) <- get alice deleteConnectionAsync alice bobId get alice =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bobId; _ -> False @@ -1263,17 +1269,17 @@ testAcceptContactAsync = do 2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?" get alice ##> ("", bobId, SENT $ baseId + 2) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 1 + ackMessage bob aliceId (baseId + 1) Nothing get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 2 + ackMessage bob aliceId (baseId + 2) Nothing 3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too" get bob ##> ("", aliceId, SENT $ baseId + 3) 4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1" get bob ##> ("", aliceId, SENT $ baseId + 4) get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 3 + ackMessage alice bobId (baseId + 3) Nothing get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 4 + ackMessage alice bobId (baseId + 4) Nothing suspendConnection alice bobId 5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2" get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH)) @@ -1740,7 +1746,7 @@ testSwitch2ConnectionsAbort1 servers = do withB :: (AgentClient -> IO a) -> IO a withB = withAgent agentCfg {initialClientId = 1} servers testDB2 -testCreateQueueAuth :: (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int +testCreateQueueAuth :: HasCallStack => (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int testCreateQueueAuth clnt1 clnt2 = do a <- getClient clnt1 b <- getClient clnt2 @@ -1772,7 +1778,7 @@ testSMPServerConnectionTest t newQueueBasicAuth srv = a <- getSMPAgentClient' agentCfg initAgentServers testDB -- initially passed server is not running runRight $ testProtocolServer a 1 srv -testRatchetAdHash :: IO () +testRatchetAdHash :: HasCallStack => IO () testRatchetAdHash = do a <- getSMPAgentClient' agentCfg initAgentServers testDB b <- getSMPAgentClient' agentCfg initAgentServers testDB2 @@ -1782,6 +1788,73 @@ testRatchetAdHash = do ad2 <- getConnectionRatchetAdHash b aId liftIO $ ad1 `shouldBe` ad2 +testDeliveryReceipts :: HasCallStack => IO () +testDeliveryReceipts = do + a <- getSMPAgentClient' agentCfg initAgentServers testDB + b <- getSMPAgentClient' agentCfg initAgentServers testDB2 + runRight_ $ do + (aId, bId) <- makeConnection a b + -- a sends, b receives and sends delivery receipt + 4 <- sendMessage a bId SMP.noMsgFlags "hello" + get a ##> ("", bId, SENT 4) + get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False + ackMessage b aId 4 $ Just "" + get a =##> \case ("", c, Rcvd 4) -> c == bId; _ -> False + ackMessage a bId 5 Nothing + -- b sends, a receives and sends delivery receipt + 6 <- sendMessage b aId SMP.noMsgFlags "hello too" + get b ##> ("", aId, SENT 6) + get a =##> \case ("", c, Msg "hello too") -> c == bId; _ -> False + ackMessage a bId 6 $ Just "" + get b =##> \case ("", c, Rcvd 6) -> c == aId; _ -> False + ackMessage b aId 7 (Just "") `catchError` \e -> liftIO $ e `shouldBe` Agent.CMD PROHIBITED + ackMessage b aId 7 Nothing + +testDeliveryReceiptsVersion :: HasCallStack => ATransport -> IO () +testDeliveryReceiptsVersion t = do + a <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB + b <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB2 + withSmpServerStoreMsgLogOn t testPort $ \_ -> do + (aId, bId) <- runRight $ do + (aId, bId) <- makeConnection a b + checkVersion a bId 3 + checkVersion b aId 3 + 4 <- sendMessage a bId SMP.noMsgFlags "hello" + get a ##> ("", bId, SENT 4) + get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False + ackMessage b aId 4 $ Just "" + liftIO $ noMessages a "no delivery receipt (unsupported version)" + 5 <- sendMessage b aId SMP.noMsgFlags "hello too" + get b ##> ("", aId, SENT 5) + get a =##> \case ("", c, Msg "hello too") -> c == bId; _ -> False + ackMessage a bId 5 $ Just "" + liftIO $ noMessages b "no delivery receipt (unsupported version)" + pure (aId, bId) + + disconnectAgentClient a + disconnectAgentClient b + a' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB + b' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB2 + + runRight_ $ do + subscribeConnection a' bId + subscribeConnection b' aId + exchangeGreetingsMsgId 6 a' bId b' aId + checkVersion a' bId 4 + checkVersion b' aId 4 + 8 <- sendMessage a' bId SMP.noMsgFlags "hello" + get a' ##> ("", bId, SENT 8) + get b' =##> \case ("", c, Msg "hello") -> c == aId; _ -> False + ackMessage b' aId 8 $ Just "" + get a' =##> \case ("", c, Rcvd 8) -> c == bId; _ -> False + ackMessage a' bId 9 Nothing + 10 <- sendMessage b' aId SMP.noMsgFlags "hello too" + get b' ##> ("", aId, SENT 10) + get a' =##> \case ("", c, Msg "hello too") -> c == bId; _ -> False + ackMessage a' bId 10 $ Just "" + get b' =##> \case ("", c, Rcvd 10) -> c == aId; _ -> False + ackMessage b' aId 11 Nothing + testTwoUsers :: HasCallStack => IO () testTwoUsers = do let nc = netCfg initAgentServers @@ -1898,13 +1971,13 @@ exchangeGreetingsMsgId msgId alice bobId bob aliceId = do liftIO $ msgId1 `shouldBe` msgId get alice ##> ("", bobId, SENT msgId) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId msgId + ackMessage bob aliceId msgId Nothing msgId2 <- sendMessage bob aliceId SMP.noMsgFlags "hello too" let msgId' = msgId + 1 liftIO $ msgId2 `shouldBe` msgId' get bob ##> ("", aliceId, SENT msgId') get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False - ackMessage alice bobId msgId' + ackMessage alice bobId msgId' Nothing exchangeGreetingsMsgIds :: HasCallStack => AgentClient -> ConnId -> Int64 -> AgentClient -> ConnId -> Int64 -> ExceptT AgentErrorType IO () exchangeGreetingsMsgIds alice bobId aliceMsgId bob aliceId bobMsgId = do @@ -1912,11 +1985,11 @@ exchangeGreetingsMsgIds alice bobId aliceMsgId bob aliceId bobMsgId = do liftIO $ msgId1 `shouldBe` aliceMsgId get alice ##> ("", bobId, SENT aliceMsgId) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - ackMessage bob aliceId bobMsgId + ackMessage bob aliceId bobMsgId Nothing msgId2 <- sendMessage bob aliceId SMP.noMsgFlags "hello too" let aliceMsgId' = aliceMsgId + 1 bobMsgId' = bobMsgId + 1 liftIO $ msgId2 `shouldBe` bobMsgId' get bob ##> ("", aliceId, SENT bobMsgId') get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False - ackMessage alice bobId aliceMsgId' + ackMessage alice bobId aliceMsgId' Nothing diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 36eaf8f7c..1891193d1 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -253,7 +253,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do runRight_ $ do get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 1 + ackMessage alice bobId (baseId + 1) Nothing -- delete notification subscription toggleConnectionNtfs alice bobId False liftIO $ threadDelay 250000 @@ -296,13 +296,13 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 1) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 1 + ackMessage alice bobId (baseId + 1) Nothing -- alice sends message 2 <- msgId <$> sendMessage alice bobId (SMP.MsgFlags True) "hey there" get alice ##> ("", bobId, SENT $ baseId + 2) void $ messageNotification apnsQ get bob =##> \case ("", c, Msg "hey there") -> c == aliceId; _ -> False - ackMessage bob aliceId $ baseId + 2 + ackMessage bob aliceId (baseId + 2) Nothing -- no unexpected notifications should follow noNotification apnsQ where @@ -343,7 +343,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 1) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 1 + ackMessage alice bobId (baseId + 1) Nothing -- set mode to NMPeriodic NTActive <- registerNtfToken alice tkn NMPeriodic -- send message, no notification @@ -352,7 +352,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 2) noNotification apnsQ get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 2 + ackMessage alice bobId (baseId + 2) Nothing -- set mode to NMInstant NTActive <- registerNtfToken alice tkn NMInstant -- send message, receive notification @@ -361,7 +361,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 3) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello there") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 3 + ackMessage alice bobId (baseId + 3) Nothing -- turn off notifications deleteNtfToken alice tkn -- send message, no notification @@ -370,7 +370,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 4) noNotification apnsQ get alice =##> \case ("", c, Msg "why hello there") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 4 + ackMessage alice bobId (baseId + 4) Nothing -- turn on notifications, set mode to NMInstant void $ registerTestToken alice "abcd" NMInstant apnsQ -- send message, receive notification @@ -379,7 +379,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 5) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hey") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 5 + ackMessage alice bobId (baseId + 5) Nothing -- no notifications should follow noNotification apnsQ where @@ -407,7 +407,7 @@ testChangeToken APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 1) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 1 + ackMessage alice bobId (baseId + 1) Nothing pure (aliceId, bobId) disconnectAgentClient alice @@ -422,7 +422,7 @@ testChangeToken APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 2) void $ messageNotification apnsQ get alice1 =##> \case ("", c, Msg "hello there") -> c == bobId; _ -> False - ackMessage alice1 bobId $ baseId + 2 + ackMessage alice1 bobId (baseId + 2) Nothing -- no notifications should follow noNotification apnsQ where @@ -441,7 +441,7 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT 4) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage alice bobId 4 + ackMessage alice bobId 4 Nothing liftIO $ killThread threadId pure (aliceId, bobId) @@ -467,7 +467,7 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT 4) void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage alice bobId 4 + ackMessage alice bobId 4 Nothing liftIO $ killThread threadId pure (aliceId, bobId) @@ -498,7 +498,7 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do get b ##> ("", aliceId, SENT msgId) void $ messageNotification apnsQ get a =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - ackMessage a bobId msgId + ackMessage a bobId msgId Nothing pure conns runRight_ @AgentErrorType $ do @@ -545,7 +545,7 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do get b ##> ("", aId, SENT msgId) void $ messageNotification apnsQ get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False - ackMessage a bId msgId + ackMessage a bId msgId Nothing testMessage "hello" _ <- switchConnectionAsync a "" bId switchComplete a bId b aId