Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
alwx committed Apr 15, 2024
1 parent 0ba86f7 commit 5aaf245
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 53 deletions.
13 changes: 7 additions & 6 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,23 @@ func (m *Messenger) sendDatasyncOffersForCommunities() error {

func (m *Messenger) sendDatasyncOffersForChats() error {
for _, chat := range m.Chats() {
availableMessages, err := m.peersyncing.AvailableMessagesByChatID([]byte(chat.ID), maxAdvertiseMessages)
chatIDBytes := []byte(chat.ID)
chatIDHex := types.Bytes2Hex(chatIDBytes)
availableMessages, err := m.peersyncing.AvailableMessagesByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
if err != nil {
return err
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
chatID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID)
for _, message := range availableMessages {
availableMessagesMap[chatIDHex] = append(availableMessagesMap[chatIDHex], message.ID)
}

datasyncMessage := &datasyncproto.Payload{}
if len(availableMessages) == 0 {
continue
}
for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
for _, message := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions protocol/peersyncing/peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ func (p *PeerSyncing) AvailableMessages() ([]SyncMessage, error) {
return p.persistence.All()
}

func (p *PeerSyncing) AvailableMessagesByChatID(groupID []byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByChatID(groupID, limit)
}

func (p *PeerSyncing) AvailableMessagesByChatIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) {
return p.persistence.ByChatIDs(groupIDs, limit)
}
Expand Down
8 changes: 4 additions & 4 deletions protocol/peersyncing/peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func (s *PeerSyncingSuite) TestBasic() {
s.Require().NoError(err)
s.Require().Len(allMessages, 1)

byGroupID, err := s.p.AvailableMessagesByChatID(syncMessage.ChatID, 10)
byGroupID, err := s.p.AvailableMessagesByChatIDs([][]byte{syncMessage.ChatID}, 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 1)

byGroupID, err = s.p.AvailableMessagesByChatID([]byte("random-group-id"), 10)
byGroupID, err = s.p.AvailableMessagesByChatIDs([][]byte{[]byte("random-group-id")}, 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 0)
Expand Down Expand Up @@ -112,7 +112,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
s.Require().NoError(s.p.Add(syncMessage3))
s.Require().NoError(s.p.Add(syncMessage4))

byGroupID, err := s.p.AvailableMessagesByChatID(testCommunityID, 10)
byGroupID, err := s.p.AvailableMessagesByChatIDs([][]byte{testCommunityID}, 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 4)
Expand All @@ -122,7 +122,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
s.Require().Equal(syncMessage3.ID, byGroupID[1].ID)
s.Require().Equal(syncMessage4.ID, byGroupID[0].ID)

byGroupID, err = s.p.AvailableMessagesByChatID(testCommunityID, 3)
byGroupID, err = s.p.AvailableMessagesByChatIDs([][]byte{testCommunityID}, 3)

s.Require().NoError(err)
s.Require().Len(byGroupID, 3)
Expand Down
54 changes: 15 additions & 39 deletions protocol/peersyncing/sync_message_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ type SyncMessagePersistence interface {
Add(SyncMessage) error
All() ([]SyncMessage, error)
Complement([]SyncMessage) ([]SyncMessage, error)
ByChatID([]byte, int) ([]SyncMessage, error)
ByChatIDs([][]byte, int) ([]SyncMessage, error)
ByMessageIDs([][]byte) ([]SyncMessage, error)
}
Expand Down Expand Up @@ -54,9 +53,22 @@ func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) {
return messages, nil
}

func (p *SyncMessageSQLitePersistence) ByChatID(chatID []byte, limit int) ([]SyncMessage, error) {
func (p *SyncMessageSQLitePersistence) ByChatIDs(ids [][]byte, limit int) ([]SyncMessage, error) {
if len(ids) == 0 {
return nil, nil
}

queryArgs := make([]interface{}, 0, len(ids))
for _, id := range ids {
queryArgs = append(queryArgs, id)
}
queryArgs = append(queryArgs, limit)

inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages WHERE chat_id IN (" + inVector + ") ORDER BY timestamp DESC LIMIT ?" // nolint: gosec

var messages []SyncMessage
rows, err := p.db.Query(`SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages WHERE chat_id = ? ORDER BY timestamp DESC LIMIT ?`, chatID, limit)
rows, err := p.db.Query(query, queryArgs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,42 +132,6 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn
return complement, nil
}

func (p *SyncMessageSQLitePersistence) ByChatIDs(ids [][]byte, limit int) ([]SyncMessage, error) {
if len(ids) == 0 {
return nil, nil
}

queryArgs := make([]interface{}, 0, len(ids))
for _, id := range ids {
queryArgs = append(queryArgs, id)
}
queryArgs = append(queryArgs, limit)

inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages WHERE chat_id IN (" + inVector + ") ORDER BY timestamp DESC LIMIT ?" // nolint: gosec

var messages []SyncMessage
rows, err := p.db.Query(query, queryArgs...)
if err != nil {
return nil, err
}

defer rows.Close()

for rows.Next() {
var m SyncMessage

err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}

messages = append(messages, m)
}
return messages, nil

}

func (p *SyncMessageSQLitePersistence) ByMessageIDs(ids [][]byte) ([]SyncMessage, error) {
if len(ids) == 0 {
return nil, nil
Expand Down

0 comments on commit 5aaf245

Please sign in to comment.