Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: ensure that messages are sent to different members #4221

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package
type: git
location: https://github.com/simplex-chat/simplexmq.git
tag: 769e54db76fc33a04085410cdf78ae77ed9717b6
tag: 6309f92c6860fce39d6675eb92fc460bdc3db01d

source-repository-package
type: git
Expand Down
2 changes: 1 addition & 1 deletion scripts/nix/sha256map.nix
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"https://github.com/simplex-chat/simplexmq.git"."769e54db76fc33a04085410cdf78ae77ed9717b6" = "01bwy6hnkpp51jzdym8880462003npgzb2lns5946nww2smybrk6";
"https://github.com/simplex-chat/simplexmq.git"."6309f92c6860fce39d6675eb92fc460bdc3db01d" = "0aiyjcjraispkripv1viry6357nxvydv2sb6k5ali0hr8cnsskg3";
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
"https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d";
"https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl";
Expand Down
37 changes: 23 additions & 14 deletions src/Simplex/Chat.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
Expand All @@ -11,7 +12,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 15 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 15 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module Simplex.Chat where

Expand Down Expand Up @@ -46,6 +47,7 @@
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, listToMaybe, mapMaybe, maybeToList)
import qualified Data.Set as S
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
Expand Down Expand Up @@ -355,7 +357,7 @@
. map (\ServerCfg {server} -> server)
. filter (\ServerCfg {enabled} -> enabled)

cfgServers :: UserProtocol p => SProtocolType p -> (DefaultAgentServers -> NonEmpty (ProtoServerWithAuth p))

Check warning on line 360 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-macos-latest-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 360 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-windows-latest-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 360 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 360 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 360 in src/Simplex/Chat.hs

View workflow job for this annotation

GitHub Actions / build-macos-13-9.6.3

Redundant constraint: UserProtocol p
cfgServers p DefaultAgentServers {smp, xftp} = case p of
SPSMP -> smp
SPXFTP -> xftp
Expand Down Expand Up @@ -1681,7 +1683,7 @@
sndMsgs <- lift $ createSndMessages idsEvts
let msgReqs_ :: NonEmpty (Either ChatError MsgReq) = L.zipWith (fmap . ctMsgReq) ctConns sndMsgs
(errs, ctSndMsgs :: [(Contact, SndMessage)]) <-
lift $ partitionEithers . L.toList . zipWith3' combineResults ctConns sndMsgs <$> deliverMessagesB msgReqs_
partitionEithers . L.toList . zipWith3' combineResults ctConns sndMsgs <$> deliverMessagesB msgReqs_
timestamp <- liftIO getCurrentTime
lift . void $ withStoreBatch' $ \db -> map (createCI db user timestamp) ctSndMsgs
pure CRBroadcastSent {user, msgContent = mc, successes = length ctSndMsgs, failures = length errs, timestamp}
Expand Down Expand Up @@ -2378,7 +2380,7 @@
Just changedCts -> do
let idsEvts = L.map ctSndEvent changedCts
msgReqs_ <- lift $ L.zipWith ctMsgReq changedCts <$> createSndMessages idsEvts
(errs, cts) <- lift $ partitionEithers . L.toList . L.zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
(errs, cts) <- partitionEithers . L.toList . L.zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
unless (null errs) $ toView $ CRChatErrors (Just user) errs
let changedCts' = filter (\ChangedProfileContact {ct, ct'} -> directOrUsed ct' && mergedPreferences ct' /= mergedPreferences ct) cts
lift $ createContactsSndFeatureItems user' changedCts'
Expand Down Expand Up @@ -6471,21 +6473,21 @@

deliverMessage' :: Connection -> MsgFlags -> MsgBody -> MessageId -> CM (Int64, PQEncryption)
deliverMessage' conn msgFlags msgBody msgId =
lift (deliverMessages ((conn, msgFlags, msgBody, msgId) :| [])) >>= \case
deliverMessages ((conn, msgFlags, msgBody, msgId) :| []) >>= \case
r :| [] -> liftEither r
rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)

type MsgReq = (Connection, MsgFlags, MsgBody, MessageId)

deliverMessages :: NonEmpty MsgReq -> CM' (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessages :: NonEmpty MsgReq -> CM (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessages msgs = deliverMessagesB $ L.map Right msgs

deliverMessagesB :: NonEmpty (Either ChatError MsgReq) -> CM' (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessagesB :: NonEmpty (Either ChatError MsgReq) -> CM (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessagesB msgReqs = do
msgReqs' <- liftIO compressBodies
sent <- L.zipWith prepareBatch msgReqs' <$> withAgent' (`sendMessagesB` L.map toAgent msgReqs')
void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
sent <- L.zipWith prepareBatch msgReqs' <$> withAgent (`sendMessagesB` L.map toAgent msgReqs')
lift . void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
lift . withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
where
compressBodies =
forME msgReqs $ \mr@(conn@Connection {pqSupport, connChatVersion = v}, msgFlags, msgBody, msgId) ->
Expand Down Expand Up @@ -6544,10 +6546,11 @@
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId)
recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members)
let msgFlags = MsgFlags {notification = hasNotification $ toCMEventTag chatMsgEvent}
(toSend, pending) = foldr addMember ([], []) recipientMembers
(toSend, pending, _, dups) = foldr addMember ([], [], S.empty, 0 :: Int) recipientMembers
-- TODO PQ either somehow ensure that group members connections cannot have pqSupport/pqEncryption or pass Off's here
msgReqs = map (\(_, conn) -> (conn, msgFlags, msgBody, msgId)) toSend
delivered <- maybe (pure []) (fmap L.toList . lift . deliverMessages) $ L.nonEmpty msgReqs
when (dups /= 0) $ logError $ "sendGroupMessage: " <> tshow dups <> " duplicate members"
delivered <- maybe (pure []) (fmap L.toList . deliverMessages) $ L.nonEmpty msgReqs
let errors = lefts delivered
unless (null errors) $ toView $ CRChatErrors (Just user) errors
stored <- lift . withStoreBatch' $ \db -> map (\m -> createPendingGroupMessage db (groupMemberId' m) msgId Nothing) pending
Expand All @@ -6560,10 +6563,16 @@
liftM2 (<>) (shuffle adminMs) (shuffle otherMs)
where
isAdmin GroupMember {memberRole} = memberRole >= GRAdmin
addMember m (toSend, pending) = case memberSendAction chatMsgEvent members m of
Just (MSASend conn) -> ((m, conn) : toSend, pending)
Just MSAPending -> (toSend, m : pending)
Nothing -> (toSend, pending)
addMember m acc@(toSend, pending, !mIds, !dups) = case memberSendAction chatMsgEvent members m of
Just a
| mId `S.member` mIds -> (toSend, pending, mIds, dups + 1)
| otherwise -> case a of
MSASend conn -> ((m, conn) : toSend, pending, mIds', dups)
MSAPending -> (toSend, m : pending, mIds', dups)
Nothing -> acc
where
mId = groupMemberId' m
mIds' = S.insert mId mIds
filterSent :: [Either ChatError a] -> [mem] -> (mem -> GroupMember) -> [GroupMember]
filterSent rs ms mem = [mem m | (Right _, m) <- zip rs ms]

Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Chat/Archive.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Database.SQLite3 as SQL
import Simplex.Chat.Controller
import Simplex.Chat.Util ()
import Simplex.Messaging.Agent.Client (agentClientStore)
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), closeSQLiteStore, keyString, sqlString, storeKey)
import Simplex.Messaging.Util
Expand Down
30 changes: 30 additions & 0 deletions src/Simplex/Chat/Util.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Simplex.Chat.Util (week, encryptFile, chunkSize, liftIOEither, shuffle) where

import Control.Exception (Exception)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift (MonadUnliftIO (..))
import Control.Monad.Reader
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as LB
import Data.List (sortBy)
import Data.Ord (comparing)
Expand All @@ -13,6 +21,7 @@ import Data.Word (Word16)
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import qualified Simplex.Messaging.Crypto.File as CF
import System.Random (randomRIO)
import qualified UnliftIO.Exception as E
import UnliftIO.IO (IOMode (..), withFile)

week :: NominalDiffTime
Expand Down Expand Up @@ -46,3 +55,24 @@ shuffle xs = map snd . sortBy (comparing fst) <$> mapM (\x -> (,x) <$> random) x
liftIOEither :: (MonadIO m, MonadError e m) => IO (Either e a) -> m a
liftIOEither a = liftIO a >>= liftEither
{-# INLINE liftIOEither #-}

newtype InternalException e = InternalException {unInternalException :: e}
deriving (Eq, Show)

instance Exception e => Exception (InternalException e)

instance Exception e => MonadUnliftIO (ExceptT e IO) where
{-# INLINE withRunInIO #-}
withRunInIO :: ((forall a. ExceptT e IO a -> IO a) -> IO b) -> ExceptT e IO b
withRunInIO inner =
ExceptT . fmap (first unInternalException) . E.try $
withRunInIO $ \run ->
inner $ run . (either (E.throwIO . InternalException) pure <=< runExceptT)

instance Exception e => MonadUnliftIO (ExceptT e (ReaderT r IO)) where
{-# INLINE withRunInIO #-}
withRunInIO :: ((forall a. ExceptT e (ReaderT r IO) a -> IO a) -> IO b) -> ExceptT e (ReaderT r IO) b
withRunInIO inner =
withExceptT unInternalException . ExceptT . E.try $
withRunInIO $ \run ->
inner $ run . (either (E.throwIO . InternalException) pure <=< runExceptT)
7 changes: 4 additions & 3 deletions tests/ChatTests/Direct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ testAddContact = versionTestMatrix2 runTestAddContact
bob #$> ("/_read chat @2", id, "ok")
alice #$> ("/read user", id, "ok")
alice #$> ("/_read user 1", id, "ok")
features = if pqExpected
then chatFeatures
else (0, e2eeInfoNoPQStr) : tail chatFeatures
features =
if pqExpected
then chatFeatures
else (0, e2eeInfoNoPQStr) : tail chatFeatures

testDuplicateContactsSeparate :: HasCallStack => FilePath -> IO ()
testDuplicateContactsSeparate =
Expand Down
Loading