Skip to content

Commit

Permalink
Fixes, more data syncing functions
Browse files Browse the repository at this point in the history
  • Loading branch information
alwx committed Mar 25, 2024
1 parent ab73cf8 commit d22c5eb
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 33 deletions.
2 changes: 1 addition & 1 deletion protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,6 @@ func (s *MessageSender) SendPublic(

// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
// returns cloned messages with replaced payloads
// TODO(alwx): peer syncing
func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error {

datasyncMessage, err := s.datasync.Unwrap(
Expand All @@ -762,6 +761,7 @@ func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, respo
response.DatasyncSender = m.SigPubKey()
response.DatasyncAcks = append(response.DatasyncAcks, datasyncMessage.Acks...)
response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
// TODO(alwx): peer syncing
for _, o := range datasyncMessage.GroupOffers {
for _, mID := range o.MessageIds {
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{ChatID: o.GroupId, MessageID: mID})
Expand Down
79 changes: 66 additions & 13 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
v1protocol "github.com/status-im/status-go/protocol/v1"
)

var peerSyncingLoopInterval time.Duration = 60 * time.Second
var peerSyncingLoopInterval = 60 * time.Second
var maxAdvertiseMessages = 40

func (m *Messenger) markDeliveredMessages(acks [][]byte) {
Expand Down Expand Up @@ -118,12 +118,30 @@ func (m *Messenger) sendDatasyncOffers() error {
return nil
}

communities, err := m.communitiesManager.Joined()
err = m.sendDatasyncOffersForCommunities()
if err != nil {
return err
}

for _, community := range communities {
err = m.sendDatasyncOffersForChats()
if err != nil {
return err
}

// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}

func (m *Messenger) sendDatasyncOffersForCommunities() error {
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return err
}

for _, community := range joinedCommunities {
var chatIDs [][]byte
for id := range community.Chats() {
chatIDs = append(chatIDs, []byte(community.IDString()+id))
Expand All @@ -133,22 +151,22 @@ func (m *Messenger) sendDatasyncOffers() error {
continue
}

availableMessages, err := m.peersyncing.AvailableMessagesByGroupIDs(chatIDs, maxAdvertiseMessages)
availableMessages, err := m.peersyncing.AvailableMessagesByChatIDs(chatIDs, maxAdvertiseMessages)
if err != nil {
return err
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
groupID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[groupID] = append(availableMessagesMap[groupID], m.ID)
chatID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID)
}

datasyncMessage := &datasyncproto.Payload{}
if len(availableMessages) == 0 {
continue
}
for groupID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(groupID), MessageIds: m})
for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
Expand All @@ -164,12 +182,48 @@ func (m *Messenger) sendDatasyncOffers() error {
if err != nil {
return err
}
}
return nil
}

func (m *Messenger) sendDatasyncOffersForChats() error {
for _, chat := range m.Chats() {
availableMessages, err := m.peersyncing.AvailableMessagesByChatID([]byte(chat.ID), maxAdvertiseMessages)
if err != nil {
return err
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
chatID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID)
}

datasyncMessage := &datasyncproto.Payload{}
if len(availableMessages) == 0 {
continue
}
for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}

publicKey, err := chat.PublicKey()
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
if err != nil {
return err
}
}
// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}

Expand Down Expand Up @@ -231,7 +285,6 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
return nil
}

// TODO(alwx): peer syncing
// canSyncMessageWith checks the permission of a message
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
switch message.Type {
Expand Down
2 changes: 2 additions & 0 deletions protocol/messenger_peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func (s *MessengerPeersyncingSuite) thirdPartyTest(community *communities.Commun

s.joinCommunity(community, s.owner, s.bob)

// TODO(alwx): test needs to be written

// Bob should now send an offer
_, err = WaitOnMessengerResponse(
s.bob,
Expand Down
8 changes: 4 additions & 4 deletions protocol/peersyncing/peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ func (p *PeerSyncing) AvailableMessages() ([]SyncMessage, error) {
return p.persistence.All()
}

func (p *PeerSyncing) AvailableMessagesByGroupID(groupID []byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByGroupID(groupID, limit)
func (p *PeerSyncing) AvailableMessagesByChatID(groupID []byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByChatID(groupID, limit)
}

func (p *PeerSyncing) AvailableMessagesByGroupIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByGroupIDs(groupIDs, limit)
func (p *PeerSyncing) AvailableMessagesByChatIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByChatIDs(groupIDs, limit)
}

func (p *PeerSyncing) MessagesByIDs(messageIDs [][]byte) ([]SyncMessage, error) {
Expand Down
22 changes: 11 additions & 11 deletions protocol/peersyncing/peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ func (s *PeerSyncingSuite) SetupTest() {
s.p = New(Config{Database: db})
}

var testGroupID = []byte("group-id")
var testCommunityID = []byte("community-id")

func (s *PeerSyncingSuite) TestBasic() {

syncMessage := SyncMessage{
ID: []byte("test-id"),
ChatID: testGroupID,
ChatID: testCommunityID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 1,
Expand All @@ -48,19 +48,19 @@ func (s *PeerSyncingSuite) TestBasic() {
s.Require().NoError(err)
s.Require().Len(allMessages, 1)

byGroupID, err := s.p.AvailableMessagesByGroupID(syncMessage.ChatID, 10)
byGroupID, err := s.p.AvailableMessagesByChatID(syncMessage.ChatID, 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 1)

byGroupID, err = s.p.AvailableMessagesByGroupID([]byte("random-group-id"), 10)
byGroupID, err = s.p.AvailableMessagesByChatID([]byte("random-group-id"), 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 0)

newSyncMessage := SyncMessage{
ID: []byte("test-id-2"),
ChatID: testGroupID,
ChatID: testCommunityID,
Type: SyncMessageCommunityType,
Payload: []byte("test-2"),
Timestamp: 2,
Expand All @@ -77,31 +77,31 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {

syncMessage1 := SyncMessage{
ID: []byte("test-id-1"),
ChatID: testGroupID,
ChatID: testCommunityID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 1,
}

syncMessage2 := SyncMessage{
ID: []byte("test-id-2"),
ChatID: testGroupID,
ChatID: testCommunityID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 2,
}

syncMessage3 := SyncMessage{
ID: []byte("test-id-3"),
ChatID: testGroupID,
ChatID: testCommunityID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 3,
}

syncMessage4 := SyncMessage{
ID: []byte("test-id-4"),
ChatID: testGroupID,
ChatID: testCommunityID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 4,
Expand All @@ -112,7 +112,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
s.Require().NoError(s.p.Add(syncMessage3))
s.Require().NoError(s.p.Add(syncMessage4))

byGroupID, err := s.p.AvailableMessagesByGroupID(testGroupID, 10)
byGroupID, err := s.p.AvailableMessagesByChatID(testCommunityID, 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 4)
Expand All @@ -122,7 +122,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
s.Require().Equal(syncMessage3.ID, byGroupID[1].ID)
s.Require().Equal(syncMessage4.ID, byGroupID[0].ID)

byGroupID, err = s.p.AvailableMessagesByGroupID(testGroupID, 3)
byGroupID, err = s.p.AvailableMessagesByChatID(testCommunityID, 3)

s.Require().NoError(err)
s.Require().Len(byGroupID, 3)
Expand Down
8 changes: 4 additions & 4 deletions protocol/peersyncing/sync_message_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type SyncMessagePersistence interface {
Add(SyncMessage) error
All() ([]SyncMessage, error)
Complement([]SyncMessage) ([]SyncMessage, error)
ByGroupID([]byte, int) ([]SyncMessage, error)
ByGroupIDs([][]byte, int) ([]SyncMessage, error)
ByChatID([]byte, int) ([]SyncMessage, error)
ByChatIDs([][]byte, int) ([]SyncMessage, error)
ByMessageIDs([][]byte) ([]SyncMessage, error)
}

Expand Down Expand Up @@ -54,7 +54,7 @@ func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) {
return messages, nil
}

func (p *SyncMessageSQLitePersistence) ByGroupID(groupID []byte, limit int) ([]SyncMessage, error) {
func (p *SyncMessageSQLitePersistence) ByChatID(groupID []byte, limit int) ([]SyncMessage, error) {
var messages []SyncMessage
rows, err := p.db.Query(`SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages WHERE group_id = ? ORDER BY timestamp DESC LIMIT ?`, groupID, limit)
if err != nil {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn
return complement, nil
}

func (p *SyncMessageSQLitePersistence) ByGroupIDs(ids [][]byte, limit int) ([]SyncMessage, error) {
func (p *SyncMessageSQLitePersistence) ByChatIDs(ids [][]byte, limit int) ([]SyncMessage, error) {
if len(ids) == 0 {
return nil, nil
}
Expand Down

0 comments on commit d22c5eb

Please sign in to comment.