Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: delivery receipts #752

Merged
merged 28 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
40cfbf7
rfc: delivery receipts
epoberezkin May 11, 2023
99d6547
update doc
epoberezkin Jun 21, 2023
c88da8d
update rfc
epoberezkin Jun 21, 2023
8485b85
Merge branch 'master' into ep/rfc-delivery-receipts
epoberezkin Jun 30, 2023
39fcca9
implementation plan, types, schema
epoberezkin Jul 2, 2023
3d058d5
migration, update types
epoberezkin Jul 2, 2023
2768e95
update types
epoberezkin Jul 2, 2023
a701051
rename migration
epoberezkin Jul 2, 2023
869942d
export MsgReceiptStatus, JSON encoding
epoberezkin Jul 2, 2023
a88dfb2
Merge branch 'master' into ep/rfc-delivery-receipts
epoberezkin Jul 3, 2023
66f61cc
update rfc, schema
epoberezkin Jul 3, 2023
b94d633
correction
epoberezkin Jul 3, 2023
9e60dd4
Merge branch 'master' into ep/rfc-delivery-receipts
epoberezkin Jul 5, 2023
7df12ee
skeleton of the implementation
epoberezkin Jul 5, 2023
bca9aa5
more implementation (some tests fail)
epoberezkin Jul 6, 2023
c907cbd
more code, 1 test fails
epoberezkin Jul 6, 2023
d9d5239
fix encoding
epoberezkin Jul 7, 2023
cab4132
refactor
epoberezkin Jul 7, 2023
a8d8337
refactor
epoberezkin Jul 7, 2023
04b4ad2
test, fix
epoberezkin Jul 7, 2023
c1360fa
only send receipts in v3+, test
epoberezkin Jul 7, 2023
2871f42
Merge branch 'master' into ep/rfc-delivery-receipts
epoberezkin Jul 7, 2023
f276439
Merge branch 'master' into ep/rfc-delivery-receipts
epoberezkin Jul 9, 2023
2bf72ff
Merge branch 'master' into ep/rfc-delivery-receipts
epoberezkin Jul 11, 2023
5b31d3b
flip condition
epoberezkin Jul 13, 2023
1e20e7a
flip condition
epoberezkin Jul 13, 2023
f60f981
agent version 4 required to send receipts
epoberezkin Jul 13, 2023
8d33493
fix test
epoberezkin Jul 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ flag swift

library
exposed-modules:
Simplex.FileTransfer
Simplex.FileTransfer.Agent
Simplex.FileTransfer.Client
Simplex.FileTransfer.Client.Agent
Expand Down
14 changes: 0 additions & 14 deletions src/Simplex/FileTransfer.hs

This file was deleted.

122 changes: 82 additions & 40 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
)
where

import Control.Concurrent.STM (stateTVar)

Check warning on line 103 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04

The import of ‘Control.Concurrent.STM’ is redundant

Check warning on line 103 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04

The import of ‘Control.Concurrent.STM’ is redundant
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
Expand All @@ -117,7 +117,7 @@
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing)
import Data.Maybe (fromMaybe, isJust, isNothing, catMaybes)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
Expand Down Expand Up @@ -152,7 +152,6 @@
import Simplex.Messaging.Version
import UnliftIO.Async (async, race_)
import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import qualified UnliftIO.Exception as E
import UnliftIO.STM

-- import GHC.Conc (unsafeIOToSTM)
Expand Down Expand Up @@ -203,8 +202,8 @@
acceptContactAsync c corrId enableNtfs = withAgentEnv c .: acceptContactAsync' c corrId enableNtfs

-- | Acknowledge message (ACK command) asynchronously, no synchronous response
ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> m ()
ackMessageAsync c = withAgentEnv c .:. ackMessageAsync' c
ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessageAsync c = withAgentEnv c .:: ackMessageAsync' c

-- | Switch connection to the new receive queue
switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m ConnectionStats
Expand Down Expand Up @@ -264,8 +263,8 @@
sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId
sendMessage c = withAgentEnv c .:. sendMessage' c

ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage c = withAgentEnv c .: ackMessage' c
ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessage c = withAgentEnv c .:. ackMessage' c

-- | Switch connection to the new receive queue
switchConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats
Expand Down Expand Up @@ -432,7 +431,7 @@
RJCT invId -> rejectContact' c connId invId $> (connId, OK)
SUB -> subscribeConnection' c connId $> (connId, OK)
SEND msgFlags msgBody -> (connId,) . MID <$> sendMessage' c connId msgFlags msgBody
ACK msgId -> ackMessage' c connId msgId $> (connId, OK)
ACK msgId rcptInfo_ -> ackMessage' c connId msgId rcptInfo_ $> (connId, OK)
SWCH -> switchConnection' c connId $> (connId, OK)
OFF -> suspendConnection' c connId $> (connId, OK)
DEL -> deleteConnection' c connId $> (connId, OK)
Expand Down Expand Up @@ -507,8 +506,8 @@
throwError err
_ -> throwError $ CMD PROHIBITED

ackMessageAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> m ()
ackMessageAsync' c corrId connId msgId = do
ackMessageAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessageAsync' c corrId connId msgId rcptInfo_ = do
SomeConn cType _ <- withStore c (`getConn` connId)
case cType of
SCDuplex -> enqueueAck
Expand All @@ -519,8 +518,11 @@
where
enqueueAck :: m ()
enqueueAck = do
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId
let mId = InternalId msgId
RcvMsg {msgType} <- withStore c $ \db -> getRcvMsg db connId mId
unless (msgType == AM_A_MSG_ || isNothing rcptInfo_) $ throwError $ CMD PROHIBITED
epoberezkin marked this conversation as resolved.
Show resolved Hide resolved
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId mId
enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId rcptInfo_

deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnectionAsync' c connId = deleteConnectionsAsync' c [connId]
Expand Down Expand Up @@ -891,7 +893,7 @@
void $ joinConnSrv c userId connId True enableNtfs cReq connInfo srv
notify OK
LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK
ACK msgId -> withServer' . tryCommand $ ackMessage' c connId msgId >> notify OK
ACK msgId rcptInfo_ -> withServer' . tryCommand $ ackMessage' c connId msgId rcptInfo_ >> notify OK
SWCH ->
noServer . tryCommand . withConnLock c connId "switchConnection" $
withStore c (`getConn` connId) >>= \case
Expand Down Expand Up @@ -1202,10 +1204,12 @@
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr msgId "QTEST sent not in duplex connection"
AM_EREADY_ -> pure ()
delMsg msgId
delMsgKeep (msgType == AM_A_MSG_) msgId
where
delMsg :: InternalId -> m ()
delMsg msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId
delMsg = delMsgKeep False
delMsgKeep :: Bool -> InternalId -> m ()
delMsgKeep keepForReceipt msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt
notify :: forall e. AEntityI e => ACommand 'Agent e -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd)
notifyDel :: AEntityI e => InternalId -> ACommand 'Agent e -> m ()
Expand All @@ -1222,22 +1226,38 @@
atomically $ beginAgentOperation c AOSndNetwork
loop

ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage' c connId msgId = withConnLock c connId "ackMessage" $ do
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection {} -> ack
RcvConnection {} -> ack
DuplexConnection {} -> ack >> sendRcpt conn >> del
RcvConnection {} -> ack >> del
SndConnection {} -> throwError $ CONN SIMPLEX
ContactConnection {} -> throwError $ CMD PROHIBITED
NewConnection _ -> throwError $ CMD PROHIBITED
where
ack :: m ()
ack = do
let mId = InternalId msgId
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId mId
-- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
ackQueueMessage c rq srvMsgId
withStore' c $ \db -> deleteMsg db connId mId
del :: m ()
del = withStore' c $ \db -> deleteMsg db connId $ InternalId msgId
sendRcpt :: Connection 'CDuplex -> m ()
sendRcpt (DuplexConnection cData@ConnData {connAgentVersion} _ sqs) = do
msg@RcvMsg {msgType, msgReceipt} <- withStore c $ \db -> getRcvMsg db connId $ InternalId msgId
case rcptInfo_ of
Just rcptInfo -> do
unless (msgType == AM_A_MSG_) $ throwError (CMD PROHIBITED)
when (connAgentVersion >= 3) $ do
let RcvMsg {msgMeta = MsgMeta {sndMsgId}, internalHash} = msg
rcpt = A_RCVD [AMessageReceipt {agentMsgId = sndMsgId, msgHash = internalHash, rcptInfo}]
void $ enqueueMessages c cData sqs SMP.MsgFlags {notification = False} rcpt
Nothing -> case (msgType, msgReceipt) of
-- only remove sent message if receipt hash was Ok, both to debug and for future redundancy
(AM_A_RCVD_, Just MsgReceipt {agentMsgId = sndMsgId, msgRcptStatus = MROk}) ->
withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId
_ -> pure ()

switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
switchConnection' c connId =
Expand Down Expand Up @@ -1727,30 +1747,30 @@
delay <- asks (initialCleanupDelay . config)
liftIO $ threadDelay' delay
int <- asks (cleanupInterval . config)
ttl <- asks $ storedMsgDataTTL . config
forever $ do
void . runExceptT $ do
deleteConns `catchAgentError` (notify "" . ERR)
deleteRcvMsgHashes `catchAgentError` (notify "" . ERR)
deleteProcessedRatchetKeyHashes `catchAgentError` (notify "" . ERR)
deleteRcvFilesExpired `catchAgentError` (notify "" . RFERR)
deleteRcvFilesDeleted `catchAgentError` (notify "" . RFERR)
deleteRcvFilesTmpPaths `catchAgentError` (notify "" . RFERR)
deleteSndFilesExpired `catchAgentError` (notify "" . SFERR)
deleteSndFilesDeleted `catchAgentError` (notify "" . SFERR)
deleteSndFilesPrefixPaths `catchAgentError` (notify "" . SFERR)
deleteExpiredReplicasForDeletion `catchAgentError` (notify "" . SFERR)
run ERR deleteConns
run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl)
run ERR $ withStore' c (`deleteSndMsgsExpired` ttl)
run ERR $ withStore' c (`deleteRatchetKeyHashesExpired` ttl)
run RFERR deleteRcvFilesExpired
run RFERR deleteRcvFilesDeleted
run RFERR deleteRcvFilesTmpPaths
run SFERR deleteSndFilesExpired
run SFERR deleteSndFilesDeleted
run SFERR deleteSndFilesPrefixPaths
run SFERR deleteExpiredReplicasForDeletion
liftIO $ threadDelay' int
where
run :: forall e. AEntityI e => (AgentErrorType -> ACommand 'Agent e) -> ExceptT AgentErrorType m () -> m ()
run err a = do
void . runExceptT $ a `catchAgentError` (notify "" . err)
step <- asks $ cleanupStepInterval . config
liftIO $ threadDelay step
deleteConns =
withLock (deleteLock c) "cleanupManager" $ do
void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c
withStore' c deleteUsersWithoutConns >>= mapM_ (notify "" . DEL_USER)
deleteRcvMsgHashes = do
rcvMsgHashesTTL <- asks $ rcvMsgHashesTTL . config
withStore' c (`deleteRcvMsgHashesExpired` rcvMsgHashesTTL)
deleteProcessedRatchetKeyHashes = do
rkHashesTTL <- asks $ processedRatchetKeyHashesTTL . config
withStore' c (`deleteProcessedRatchetKeyHashesExpired` rkHashesTTL)
deleteRcvFilesExpired = do
rcvFilesTTL <- asks $ rcvFilesTTL . config
rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL)
Expand Down Expand Up @@ -1857,7 +1877,7 @@
A_MSG body -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
A_RCVD {} -> ackDel msgId
A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta
QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending addr
QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg qs
QKEY qs -> qDuplexAckDel conn'' "QKEY" $ qKeyMsg qs
Expand Down Expand Up @@ -2081,6 +2101,28 @@
void $ tryPutTMVar qLock ()
Nothing -> qError "QCONT: queue address not found"

messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m ()
messagesRcvd rcpts msgMeta@MsgMeta {broker = (srvMsgId, _)} _ = do
logServer "<--" c srv rId "MSG <RCPT>"
rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing
case L.nonEmpty . catMaybes $ L.toList rs of
Just rs' -> notify $ RCVD msgMeta rs' -- client must ACK once processed
Nothing -> enqueueCmd $ ICAck rId srvMsgId
where
clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt)
clientReceipt AMessageReceipt {agentMsgId, msgHash} = do
let sndMsgId = InternalSndId agentMsgId
SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStore c $ \db -> getSndMsgViaRcpt db connId sndMsgId
if msgType /= AM_A_MSG_
then notify (ERR $ AGENT A_PROHIBITED) $> Nothing -- unexpected message type for receipt
else case msgReceipt of
Just MsgReceipt {msgRcptStatus = MROk} -> pure Nothing -- already notified with MROk status
_ -> do
let msgRcptStatus = if msgHash == internalHash then MROk else MRBadMsgHash
rcpt = MsgReceipt {agentMsgId = msgId, msgRcptStatus}
withStore' c $ \db -> updateSndMsgRcpt db connId sndMsgId rcpt
pure $ Just rcpt

-- processed by queue sender
qAddMsg :: NonEmpty (SMPQueueUri, Maybe SndQAddr) -> Connection 'CDuplex -> m ()
qAddMsg ((_, Nothing) :| _) _ = qError "adding queue without switching is not supported"
Expand Down Expand Up @@ -2198,7 +2240,7 @@
rkHash k1 k2 = C.sha256Hash $ C.pubKeyBytes k1 <> C.pubKeyBytes k2
ratchetExists :: m Bool
ratchetExists = withStore' c $ \db -> do
exists <- checkProcessedRatchetKeyHashExists db connId rkHashRcv
exists <- checkRatchetKeyHashExists db connId rkHashRcv
unless exists $ addProcessedRatchetKeyHash db connId rkHashRcv
pure exists
getSendRatchetKeys :: m (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448)
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ data AgentConfig = AgentConfig
helloTimeout :: NominalDiffTime,
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
rcvMsgHashesTTL :: NominalDiffTime,
processedRatchetKeyHashesTTL :: NominalDiffTime,
cleanupStepInterval :: Int,
storedMsgDataTTL :: NominalDiffTime,
rcvFilesTTL :: NominalDiffTime,
sndFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
Expand Down Expand Up @@ -152,8 +152,8 @@ defaultAgentConfig =
helloTimeout = 2 * nominalDay,
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvMsgHashesTTL = 30 * nominalDay,
processedRatchetKeyHashesTTL = 30 * nominalDay,
cleanupStepInterval = 200000, -- 200ms second
epoberezkin marked this conversation as resolved.
Show resolved Hide resolved
storedMsgDataTTL = 21 * nominalDay,
rcvFilesTTL = 2 * nominalDay,
sndFilesTTL = nominalDay,
xftpNotifyErrsOnRetry = True,
Expand Down
29 changes: 17 additions & 12 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ module Simplex.Messaging.Agent.Protocol
AgentMessageType (..),
APrivHeader (..),
AMessage (..),
AMessageReceipt (..),
MsgReceipt (..),
MsgReceiptInfo,
MsgReceiptStatus (..),
Expand Down Expand Up @@ -335,8 +336,8 @@ data ACommand (p :: AParty) (e :: AEntity) where
SENT :: AgentMsgId -> ACommand Agent AEConn
MERR :: AgentMsgId -> AgentErrorType -> ACommand Agent AEConn
MSG :: MsgMeta -> MsgFlags -> MsgBody -> ACommand Agent AEConn
ACK :: AgentMsgId -> ACommand Client AEConn
RCVD :: MsgMeta -> MsgReceipt -> ACommand Agent AEConn
ACK :: AgentMsgId -> Maybe MsgReceiptInfo -> ACommand Client AEConn
RCVD :: MsgMeta -> NonEmpty MsgReceipt -> ACommand Agent AEConn
SWCH :: ACommand Client AEConn
OFF :: ACommand Client AEConn
DEL :: ACommand Client AEConn
Expand Down Expand Up @@ -448,7 +449,7 @@ aCommandTag = \case
SENT _ -> SENT_
MERR {} -> MERR_
MSG {} -> MSG_
ACK _ -> ACK_
ACK {} -> ACK_
RCVD {} -> RCVD_
SWCH -> SWCH_
OFF -> OFF_
Expand Down Expand Up @@ -1013,15 +1014,18 @@ data AMessage
EREADY AgentMsgId
deriving (Show)

-- | this type is used to send as part of the protocol between different clients
-- TODO possibly, rename fields and types referring to external and internal IDs to make them different
data AMessageReceipt = AMessageReceipt
{ agentMsgId :: AgentMsgId,
{ agentMsgId :: AgentMsgId, -- this is an external snd message ID referenced by the message recipient
msgHash :: MsgHash,
rcptInfo :: MsgReceiptInfo
}
deriving (Show)

-- | this type is used as part of agent protocol to communicate with the user application
data MsgReceipt = MsgReceipt
{ agentMsgId :: AgentMsgId,
{ agentMsgId :: AgentMsgId, -- this is an internal agent message ID of received message
msgRcptStatus :: MsgReceiptStatus
}
deriving (Eq, Show)
Expand Down Expand Up @@ -1082,9 +1086,10 @@ instance Encoding AMessageReceipt where

instance StrEncoding MsgReceipt where
strEncode MsgReceipt {agentMsgId, msgRcptStatus} =
B.unwords [strEncode agentMsgId, strEncode msgRcptStatus]
strEncode agentMsgId <> ":" <> strEncode msgRcptStatus
strP = do
(agentMsgId, msgRcptStatus) <- strP
agentMsgId <- strP <* A.char ':'
msgRcptStatus <- strP
pure MsgReceipt {agentMsgId, msgRcptStatus}

instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
Expand Down Expand Up @@ -1378,7 +1383,7 @@ instance StrEncoding MsgIntegrity where
strP = "OK" $> MsgOk <|> "ERR " *> (MsgError <$> strP)
strEncode = \case
MsgOk -> "OK"
MsgError e -> "ERR" <> strEncode e
MsgError e -> "ERR " <> strEncode e

instance ToJSON MsgIntegrity where
toJSON = J.genericToJSON $ sumTypeJSON fstToLower
Expand All @@ -1398,7 +1403,7 @@ data MsgErrorType
instance StrEncoding MsgErrorType where
strP =
"ID " *> (MsgBadId <$> A.decimal)
<|> "IDS " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal)
<|> "NO_ID " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal)
<|> "HASH" $> MsgBadHash
<|> "DUPLICATE" $> MsgDuplicate
strEncode = \case
Expand Down Expand Up @@ -1738,7 +1743,7 @@ commandP binaryP =
RJCT_ -> s (RJCT <$> A.takeByteString)
SUB_ -> pure SUB
SEND_ -> s (SEND <$> smpP <* A.space <*> binaryP)
ACK_ -> s (ACK <$> A.decimal)
ACK_ -> s (ACK <$> A.decimal <*> optional (A.space *> binaryP))
SWCH_ -> pure SWCH
OFF_ -> pure OFF
DEL_ -> pure DEL
Expand Down Expand Up @@ -1815,8 +1820,8 @@ serializeCommand = \case
SENT mId -> s (SENT_, Str $ bshow mId)
MERR mId e -> s (MERR_, Str $ bshow mId, e)
MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, s msgMeta, smpEncode msgFlags, serializeBinary msgBody]
ACK mId -> s (ACK_, Str $ bshow mId)
RCVD msgMeta msgRcpt -> s (RCVD_, msgMeta, msgRcpt)
ACK mId rcptInfo_ -> s (ACK_, Str $ bshow mId) <> maybe "" (B.cons ' ' . serializeBinary) rcptInfo_
RCVD msgMeta rcpts -> s (RCVD_, msgMeta, rcpts)
SWCH -> s SWCH_
OFF -> s OFF_
DEL -> s DEL_
Expand Down
Loading
Loading