Skip to content

Commit

Permalink
retry sending specific messages
Browse files Browse the repository at this point in the history
  • Loading branch information
qfrank committed Mar 22, 2024
1 parent 8086b24 commit 6c35a8f
Show file tree
Hide file tree
Showing 21 changed files with 362 additions and 237 deletions.
8 changes: 4 additions & 4 deletions multiaccounts/settings/sync_protobuf_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ func buildRawSyncSettingMessage(msg *protobuf.SyncSetting, chatID string) (*comm
}

return &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_SETTING,
ResendAutomatically: true,
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_SETTING,
ResendType: common.ResendTypeDataSync,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *MessageSender) SendPrivate(
// Currently we don't support sending through datasync and setting custom waku fields,
// as the datasync interface is not rich enough to propagate that information, so we
// would have to add some complexity to handle this.
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
if rawMessage.ResendType == ResendTypeDataSync && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
}

Expand Down Expand Up @@ -424,7 +424,7 @@ func (s *MessageSender) sendPrivate(
// earlier than the scheduled
s.notifyOnScheduledMessage(recipient, rawMessage)

if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendAutomatically {
if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendType == ResendTypeDataSync {
// No need to call transport tracking.
// It is done in a data sync dispatch step.
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
Expand Down
35 changes: 28 additions & 7 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,37 @@ const (
KeyExMsgRekey CommKeyExMsgType = 2
)

// There are distinct mechanisms for retrying send messages: Datasync supports only direct messages (1-to-1 or private group chats)
// because it requires an acknowledgment (ACK). As implemented, sending a message to a community, where hundreds of
// people receive it, would lead all recipients to attempt sending an ACK, resulting in an excessive number of messages.
// Datasync utilizes ACKs, but community messages do not, to avoid this issue. However, we still aim to retry sending
// community messages if they fail to send or if we are offline.
type ResendType uint8

const (
// Wont resend
ResendTypeNone ResendType = 0
// use DataSync which use MVDS as underlying dependency to resend messages
ResendTypeDataSync ResendType = 1
// We have a function, watchExpiredMessages, that monitors the 'raw_messages' table
// and will attempts to resend messages if a previous message sending failed.
ResendTypeRawMessage ResendType = 2
)

// RawMessage represent a sent or received message, kept for being able
// to re-send/propagate
type RawMessage struct {
ID string
LocalChatID string
LastSent uint64
SendCount int
Sent bool
ResendAutomatically bool
SkipEncryptionLayer bool // don't wrap message into ProtocolMessage
ID string
LocalChatID string
LastSent uint64
SendCount int
Sent bool
// Deprecated: use ResendType instead since ResendType should make code more readable,
// we can keep this field for a while to avoid breaking changes and remove it later
ResendAutomatically bool
ResendType ResendType
// don't wrap message into ProtocolMessage
SkipEncryptionLayer bool
SendPushNotification bool
MessageType protobuf.ApplicationMetadataMessage_Type
Payload []byte
Expand Down
42 changes: 34 additions & 8 deletions protocol/common/raw_messages_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ func (db RawMessagesPersistence) RawMessageByID(id string) (*RawMessage, error)
func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMessage, error) {
var rawPubKeys [][]byte
var encodedRecipients []byte
var skipGroupMessageWrap sql.NullBool
var sendOnPersonalTopic sql.NullBool
var skipGroupMessageWrap, sendOnPersonalTopic sql.NullBool
var resendType, communityKeyExMsgType int
var sender, communityID, hashRatchetGroupID []byte
var pubsubTopic sql.NullString
message := &RawMessage{}

err := tx.QueryRow(`
Expand All @@ -138,13 +140,18 @@ func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMess
send_count,
sent,
message_type,
resend_automatically,
recipients,
skip_encryption,
send_push_notification,
send_push_notification,
skip_group_message_wrap,
send_on_personal_topic,
payload
payload,
sender,
community_id,
resend_type,
pubsub_topic,
hash_ratchet_group_id,
community_key_ex_msg_type
FROM
raw_messages
WHERE
Expand All @@ -157,20 +164,27 @@ func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMess
&message.SendCount,
&message.Sent,
&message.MessageType,
&message.ResendAutomatically,
&encodedRecipients,
&message.SkipEncryptionLayer,
&message.SendPushNotification,
&skipGroupMessageWrap,
&sendOnPersonalTopic,
&message.Payload,
&sender,
&communityID,
&resendType,
&pubsubTopic,
&hashRatchetGroupID,
&communityKeyExMsgType,
)
if err != nil {
return nil, err
}

if rawPubKeys != nil {
// Restore recipients
message.ResendType = ResendType(resendType)
message.CommunityKeyExMsgType = CommKeyExMsgType(communityKeyExMsgType)

if encodedRecipients != nil {
decoder := gob.NewDecoder(bytes.NewBuffer(encodedRecipients))
err = decoder.Decode(&rawPubKeys)
if err != nil {
Expand All @@ -193,6 +207,18 @@ func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMess
message.SendOnPersonalTopic = sendOnPersonalTopic.Bool
}

if pubsubTopic.Valid {
message.PubsubTopic = pubsubTopic.String
}

if sender != nil {
message.Sender, err = crypto.ToECDSA(sender)
if err != nil {
return nil, err
}
}
message.CommunityID = communityID
message.HashRatchetGroupID = hashRatchetGroupID
return message, nil
}

Expand Down

0 comments on commit 6c35a8f

Please sign in to comment.