Skip to content

Commit

Permalink
Extend peersyncing to sync 1-to-1 messages
Browse files Browse the repository at this point in the history
  • Loading branch information
alwx committed Apr 17, 2024
1 parent d246699 commit bda7a6b
Show file tree
Hide file tree
Showing 11 changed files with 592 additions and 336 deletions.
588 changes: 360 additions & 228 deletions appdatabase/migrations/bindata.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP INDEX peersyncing_messages_timestamp;

ALTER TABLE peersyncing_messages RENAME COLUMN group_id TO chat_id;

CREATE INDEX peersyncing_messages_timestamp ON peersyncing_messages(chat_id, timestamp);
4 changes: 2 additions & 2 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, respo
response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
for _, o := range datasyncMessage.GroupOffers {
for _, mID := range o.MessageIds {
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{GroupID: o.GroupId, MessageID: mID})
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{ChatID: o.GroupId, MessageID: mID})
}
}

Expand Down Expand Up @@ -837,7 +837,7 @@ func (s *MessageSender) HandleMessages(wakuMessage *types.Message) (*HandleMessa
}

type DatasyncOffer struct {
GroupID []byte
ChatID []byte
MessageID []byte
}

Expand Down
3 changes: 1 addition & 2 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,6 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessageType = peersyncing.SyncMessageCommunityType
} else if chat.PrivateGroupChat() {
syncMessageType = peersyncing.SyncMessagePrivateGroup

}

wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
Expand All @@ -2532,7 +2531,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessage := peersyncing.SyncMessage{
Type: syncMessageType,
ID: types.Hex2Bytes(rawMessage.ID),
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Payload: wrappedMessage,
Timestamp: m.transport.GetCurrentTime() / 1000,
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2457,7 +2457,7 @@ func (m *Messenger) addPeersyncingMessage(chat *Chat, msg *v1protocol.StatusMess
syncMessage := peersyncing.SyncMessage{
Type: syncMessageType,
ID: msg.ApplicationLayer.ID,
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Payload: msg.EncryptionLayer.Payload,
Timestamp: uint64(msg.TransportLayer.Message.Timestamp),
}
Expand Down
94 changes: 72 additions & 22 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
v1protocol "github.com/status-im/status-go/protocol/v1"
)

var peerSyncingLoopInterval time.Duration = 60 * time.Second
var peerSyncingLoopInterval = 60 * time.Second
var maxAdvertiseMessages = 40

func (m *Messenger) markDeliveredMessages(acks [][]byte) {
Expand Down Expand Up @@ -118,37 +118,47 @@ func (m *Messenger) sendDatasyncOffers() error {
return nil
}

communities, err := m.communitiesManager.Joined()
err = m.sendDatasyncOffersForCommunities()
if err != nil {
return err
}

for _, community := range communities {
err = m.sendDatasyncOffersForChats()
if err != nil {
return err
}

// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}

func (m *Messenger) sendDatasyncOffersForCommunities() error {
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return err
}

for _, community := range joinedCommunities {
var chatIDs [][]byte
for id := range community.Chats() {
chatIDs = append(chatIDs, []byte(community.IDString()+id))
}

if len(chatIDs) == 0 {
continue
}

availableMessages, err := m.peersyncing.AvailableMessagesByGroupIDs(chatIDs, maxAdvertiseMessages)
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs(chatIDs, maxAdvertiseMessages)
if err != nil {
return err
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
groupID := types.Bytes2Hex(m.GroupID)
availableMessagesMap[groupID] = append(availableMessagesMap[groupID], m.ID)
}

datasyncMessage := &datasyncproto.Payload{}
if len(availableMessages) == 0 {
if len(availableMessagesMap) == 0 {
continue
}
for groupID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(groupID), MessageIds: m})
for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
Expand All @@ -164,12 +174,43 @@ func (m *Messenger) sendDatasyncOffers() error {
if err != nil {
return err
}
}
return nil
}

func (m *Messenger) sendDatasyncOffersForChats() error {
for _, chat := range m.Chats() {
chatIDBytes := []byte(chat.ID)
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
if err != nil {
return err
}
datasyncMessage := &datasyncproto.Payload{}
if len(availableMessagesMap) == 0 {
continue
}
for _, message := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}

publicKey, err := chat.PublicKey()
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
if err != nil {
return err
}
}
// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}

Expand All @@ -188,7 +229,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
var offeredMessages []peersyncing.SyncMessage

for _, o := range offers {
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{GroupID: o.GroupID, ID: o.MessageID})
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.ChatID, ID: o.MessageID})
}

messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
Expand Down Expand Up @@ -235,7 +276,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
switch message.Type {
case peersyncing.SyncMessageCommunityType:
chat, ok := m.allChats.Load(string(message.GroupID))
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
Expand All @@ -245,7 +286,12 @@ func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ec
}

return m.canSyncCommunityMessageWith(chat, community, peer)

case peersyncing.SyncMessageOneToOneType:
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
return m.canSyncOneToOneMessageWith(chat, peer)
default:
return false, nil
}
Expand All @@ -258,6 +304,10 @@ func (m *Messenger) canSyncCommunityMessageWith(chat *Chat, community *communiti
return community.IsMemberInChat(peer, chat.CommunityChatID()), nil
}

func (m *Messenger) canSyncOneToOneMessageWith(chat *Chat, peer *ecdsa.PublicKey) (bool, error) {
return chat.HasMember(common.PubkeyToHex(peer)), nil
}

func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error {
if len(messageIDs) == 0 {
return nil
Expand Down
98 changes: 97 additions & 1 deletion protocol/messenger_peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package protocol

import (
"context"
"encoding/hex"
"testing"
"time"

"github.com/stretchr/testify/suite"
"go.uber.org/zap"

gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
Expand Down Expand Up @@ -243,7 +245,7 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {

syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"),
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageCommunityType,
Payload: []byte("some-payload"),
Timestamp: 1,
Expand All @@ -261,3 +263,97 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {
s.Require().NoError(err)
s.Require().True(canSyncWithAlice)
}

func (s *MessengerPeersyncingSuite) TestSyncOneToOne() {
s.alice.featureFlags.Peersyncing = true
s.owner.featureFlags.Peersyncing = true

pkString := hex.EncodeToString(crypto.FromECDSAPub(&s.alice.identity.PublicKey))
chat := CreateOneToOneChat(pkString, &s.alice.identity.PublicKey, s.owner.transport)

chat.LastClockValue = uint64(100000000000000)
err := s.owner.SaveChat(chat)
s.NoError(err)
_, err = s.alice.Join(chat)
s.NoError(err)

chatID := chat.ID
inputMessage := common.NewMessage()
inputMessage.ChatId = chatID
inputMessage.ContentType = protobuf.ChatMessage_TEXT_PLAIN
inputMessage.Text = "some text"

ctx := context.Background()

// Send message, it should be received
response, err := s.alice.SendChatMessage(ctx, inputMessage)
s.Require().NoError(err)
s.Require().Len(response.Messages(), 1)
messageID := response.Messages()[0].ID

// Make sure the message makes it to the owner
response, err = WaitOnMessengerResponse(
s.owner,
func(r *MessengerResponse) bool {
return len(r.Messages()) == 1 && r.Messages()[0].ID == messageID
},
"message not received",
)
s.Require().NoError(err)
s.Require().NotNil(response)

msg, err := s.owner.peersyncing.AvailableMessages()
s.Require().NoError(err)
s.Require().Len(msg, 1)

// Alice should now send an offer
_, err = WaitOnMessengerResponse(
s.alice,
func(r *MessengerResponse) bool {
return s.alice.peersyncingOffers[messageID[2:]] != 0
},
"offer not sent",
)
s.Require().NoError(err)

// Owner should now reply to the offer
_, err = WaitOnMessengerResponse(
s.owner,
func(r *MessengerResponse) bool {
return s.owner.peersyncingRequests[s.alice.myHexIdentity()+messageID[2:]] != 0
},
"request not sent",
)
s.Require().NoError(err)
}

func (s *MessengerPeersyncingSuite) TestCanSyncOneToOneMessageWith() {
s.alice.featureFlags.Peersyncing = true
s.owner.featureFlags.Peersyncing = true

pkString := hex.EncodeToString(crypto.FromECDSAPub(&s.alice.identity.PublicKey))
chat := CreateOneToOneChat(pkString, &s.alice.identity.PublicKey, s.owner.transport)

chat.LastClockValue = uint64(100000000000000)
err := s.owner.SaveChat(chat)
s.NoError(err)
_, err = s.alice.Join(chat)
s.NoError(err)

syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"),
ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageOneToOneType,
Payload: []byte("some-payload"),
Timestamp: chat.LastClockValue,
}
s.Require().NoError(s.owner.peersyncing.Add(syncMessage))

canSyncWithBob, err := s.owner.canSyncOneToOneMessageWith(chat, &s.bob.identity.PublicKey)
s.Require().NoError(err)
s.Require().False(canSyncWithBob)

canSyncWithAlice, err := s.owner.canSyncOneToOneMessageWith(chat, &s.alice.identity.PublicKey)
s.Require().NoError(err)
s.Require().True(canSyncWithAlice)
}
19 changes: 13 additions & 6 deletions protocol/peersyncing/peersyncing.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package peersyncing

import "github.com/status-im/status-go/eth-node/types"

type PeerSyncing struct {
persistence SyncMessagePersistence
config Config
Expand All @@ -25,12 +27,17 @@ func (p *PeerSyncing) AvailableMessages() ([]SyncMessage, error) {
return p.persistence.All()
}

func (p *PeerSyncing) AvailableMessagesByGroupID(groupID []byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByGroupID(groupID, limit)
}

func (p *PeerSyncing) AvailableMessagesByGroupIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByGroupIDs(groupIDs, limit)
func (p *PeerSyncing) AvailableMessagesMapByChatIDs(groupIDs [][]byte, limit int) (map[string][][]byte, error) {
availableMessages, err := p.persistence.ByChatIDs(groupIDs, limit)
if err != nil {
return nil, err
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
chatID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID)
}
return availableMessagesMap, err
}

func (p *PeerSyncing) MessagesByIDs(messageIDs [][]byte) ([]SyncMessage, error) {
Expand Down

0 comments on commit bda7a6b

Please sign in to comment.