Skip to content

Commit

Permalink
retry sending specific messages
Browse files Browse the repository at this point in the history
  • Loading branch information
qfrank committed Apr 1, 2024
1 parent 30e143c commit 421c58b
Show file tree
Hide file tree
Showing 26 changed files with 1,094 additions and 626 deletions.
8 changes: 4 additions & 4 deletions multiaccounts/settings/sync_protobuf_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ func buildRawSyncSettingMessage(msg *protobuf.SyncSetting, chatID string) (*comm
}

return &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_SETTING,
ResendAutomatically: true,
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_SETTING,
ResendType: common.ResendTypeDataSync,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *MessageSender) SendPrivate(
// Currently we don't support sending through datasync and setting custom waku fields,
// as the datasync interface is not rich enough to propagate that information, so we
// would have to add some complexity to handle this.
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
if rawMessage.ResendType == ResendTypeDataSync && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
}

Expand All @@ -185,7 +185,7 @@ func (s *MessageSender) SendPrivate(
// using the community topic and their key
func (s *MessageSender) SendCommunityMessage(
ctx context.Context,
rawMessage RawMessage,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a community message",
Expand All @@ -194,7 +194,7 @@ func (s *MessageSender) SendCommunityMessage(
)
rawMessage.Sender = s.identity

return s.sendCommunity(ctx, &rawMessage)
return s.sendCommunity(ctx, rawMessage)
}

// SendPubsubTopicKey sends the protected topic key for a community to a list of recipients
Expand Down Expand Up @@ -424,7 +424,7 @@ func (s *MessageSender) sendPrivate(
// earlier than the scheduled
s.notifyOnScheduledMessage(recipient, rawMessage)

if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendAutomatically {
if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendType == ResendTypeDataSync {
// No need to call transport tracking.
// It is done in a data sync dispatch step.
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
Expand Down
46 changes: 39 additions & 7 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,46 @@ const (
KeyExMsgRekey CommKeyExMsgType = 2
)

// ResendType There are distinct mechanisms for retrying send messages: Datasync supports only direct messages (1-to-1 or private group chats)
// because it requires an acknowledgment (ACK). As implemented, sending a message to a community, where hundreds of
// people receive it, would lead all recipients to attempt sending an ACK, resulting in an excessive number of messages.
// Datasync utilizes ACKs, but community messages do not, to avoid this issue. However, we still aim to retry sending
// community messages if they fail to send or if we are offline.
type ResendType uint8

const (
// ResendTypeNone won't resend
ResendTypeNone ResendType = 0
// ResendTypeDataSync use DataSync which use MVDS as underlying dependency to resend messages
ResendTypeDataSync ResendType = 1
// ResendTypeRawMessage We have a function, watchExpiredMessages, that monitors the 'raw_messages' table
// and will attempts to resend messages if a previous message sending failed.
ResendTypeRawMessage ResendType = 2
)

type ResendMethod uint8

const (
// ResendMethodDynamic determined by logic of Messenger#resendExpiredMessages
ResendMethodDynamic ResendMethod = 0
// ResendMethodSendPrivate corresponding function MessageSender#SendPrivate
ResendMethodSendPrivate ResendMethod = 1
// ResendMethodSendCommunityMessage corresponding function MessageSender#SendCommunityMessage
ResendMethodSendCommunityMessage ResendMethod = 2
)

// RawMessage represent a sent or received message, kept for being able
// to re-send/propagate
type RawMessage struct {
ID string
LocalChatID string
LastSent uint64
SendCount int
Sent bool
ResendAutomatically bool
SkipEncryptionLayer bool // don't wrap message into ProtocolMessage
ID string
LocalChatID string
LastSent uint64
SendCount int
Sent bool
// don't wrap message into ProtocolMessage.
// when this is true, the message will not be resent via ResendTypeDataSync, but it's possible to
// resend it via ResendTypeRawMessage specified in ResendType
SkipEncryptionLayer bool
SendPushNotification bool
MessageType protobuf.ApplicationMetadataMessage_Type
Payload []byte
Expand All @@ -38,4 +68,6 @@ type RawMessage struct {
BeforeDispatch func(*RawMessage) error
HashRatchetGroupID []byte
PubsubTopic string
ResendType ResendType
ResendMethod ResendMethod
}
64 changes: 49 additions & 15 deletions protocol/common/raw_messages_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ func (db RawMessagesPersistence) SaveRawMessage(message *RawMessage) error {
message.Sent = oldMessage.Sent
}
}

var sender []byte
if message.Sender != nil {
sender = crypto.FromECDSA(message.Sender)
}
_, err = tx.Exec(`
INSERT INTO
raw_messages
Expand All @@ -81,28 +84,41 @@ func (db RawMessagesPersistence) SaveRawMessage(message *RawMessage) error {
send_count,
sent,
message_type,
resend_automatically,
recipients,
skip_encryption,
send_push_notification,
send_push_notification,
skip_group_message_wrap,
send_on_personal_topic,
payload
payload,
sender,
community_id,
resend_type,
pubsub_topic,
hash_ratchet_group_id,
community_key_ex_msg_type,
resend_method
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
message.ID,
message.LocalChatID,
message.LastSent,
message.SendCount,
message.Sent,
message.MessageType,
message.ResendAutomatically,
encodedRecipients.Bytes(),
message.SkipEncryptionLayer,
message.SendPushNotification,
message.SkipGroupMessageWrap,
message.SendOnPersonalTopic,
message.Payload)
message.Payload,
sender,
message.CommunityID,
message.ResendType,
message.PubsubTopic,
message.HashRatchetGroupID,
message.CommunityKeyExMsgType,
message.ResendMethod,
)
return err
}

Expand All @@ -126,8 +142,8 @@ func (db RawMessagesPersistence) RawMessageByID(id string) (*RawMessage, error)
func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMessage, error) {
var rawPubKeys [][]byte
var encodedRecipients []byte
var skipGroupMessageWrap sql.NullBool
var sendOnPersonalTopic sql.NullBool
var skipGroupMessageWrap, sendOnPersonalTopic sql.NullBool
var sender []byte
message := &RawMessage{}

err := tx.QueryRow(`
Expand All @@ -138,13 +154,19 @@ func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMess
send_count,
sent,
message_type,
resend_automatically,
recipients,
skip_encryption,
send_push_notification,
send_push_notification,
skip_group_message_wrap,
send_on_personal_topic,
payload
payload,
sender,
community_id,
resend_type,
pubsub_topic,
hash_ratchet_group_id,
community_key_ex_msg_type,
resend_method
FROM
raw_messages
WHERE
Expand All @@ -157,27 +179,33 @@ func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMess
&message.SendCount,
&message.Sent,
&message.MessageType,
&message.ResendAutomatically,
&encodedRecipients,
&message.SkipEncryptionLayer,
&message.SendPushNotification,
&skipGroupMessageWrap,
&sendOnPersonalTopic,
&message.Payload,
&sender,
&message.CommunityID,
&message.ResendType,
&message.PubsubTopic,
&message.HashRatchetGroupID,
&message.CommunityKeyExMsgType,
&message.ResendMethod,
)
if err != nil {
return nil, err
}

if rawPubKeys != nil {
if encodedRecipients != nil {
// Restore recipients
decoder := gob.NewDecoder(bytes.NewBuffer(encodedRecipients))
err = decoder.Decode(&rawPubKeys)
if err != nil {
return nil, err
}
for _, pkBytes := range rawPubKeys {
pubkey, err := crypto.UnmarshalPubkey(pkBytes)
pubkey, err := crypto.DecompressPubkey(pkBytes)
if err != nil {
return nil, err
}
Expand All @@ -193,6 +221,12 @@ func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMess
message.SendOnPersonalTopic = sendOnPersonalTopic.Bool
}

if sender != nil {
message.Sender, err = crypto.ToECDSA(sender)
if err != nil {
return nil, err
}
}
return message, nil
}

Expand Down
44 changes: 44 additions & 0 deletions protocol/common/raw_messages_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"crypto/ecdsa"
"testing"

"github.com/stretchr/testify/require"

"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/t/helpers"
)

func TestSaveRawMessage(t *testing.T) {
db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)
require.NoError(t, sqlite.Migrate(db))
p := NewRawMessagesPersistence(db)

pk, err := crypto.GenerateKey()
require.NoError(t, err)

err = p.SaveRawMessage(&RawMessage{
ID: "1",
ResendType: ResendTypeRawMessage,
LocalChatID: "",
CommunityID: []byte("c1"),
CommunityKeyExMsgType: KeyExMsgRekey,
Sender: pk,
ResendMethod: ResendMethodSendPrivate,
Recipients: []*ecdsa.PublicKey{pk.Public().(*ecdsa.PublicKey)},
})
require.NoError(t, err)
m, err := p.RawMessageByID("1")
require.NoError(t, err)
require.Equal(t, "1", m.ID)
require.Equal(t, ResendTypeRawMessage, m.ResendType)
require.Equal(t, KeyExMsgRekey, m.CommunityKeyExMsgType)
require.Equal(t, "c1", string(m.CommunityID))
require.Equal(t, pk, m.Sender)
require.Equal(t, ResendMethodSendPrivate, m.ResendMethod)
require.Equal(t, 1, len(m.Recipients))
}
2 changes: 1 addition & 1 deletion protocol/communities_key_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
}
_, err := ckd.sender.SendCommunityMessage(context.Background(), rawMessage)
_, err := ckd.sender.SendCommunityMessage(context.Background(), &rawMessage)

if err != nil {
return err
Expand Down

0 comments on commit 421c58b

Please sign in to comment.