Skip to content

Commit

Permalink
Extend peersyncing to sync 1-to-1 messages
Browse files Browse the repository at this point in the history
  • Loading branch information
alwx committed Mar 21, 2024
1 parent 0aed93f commit ab73cf8
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 24 deletions.
5 changes: 3 additions & 2 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ func (s *MessageSender) SendPublic(

// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
// returns cloned messages with replaced payloads
// TODO(alwx): peer syncing
func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error {

datasyncMessage, err := s.datasync.Unwrap(
Expand All @@ -763,7 +764,7 @@ func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, respo
response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
for _, o := range datasyncMessage.GroupOffers {
for _, mID := range o.MessageIds {
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{GroupID: o.GroupId, MessageID: mID})
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{ChatID: o.GroupId, MessageID: mID})
}
}

Expand Down Expand Up @@ -837,7 +838,7 @@ func (s *MessageSender) HandleMessages(wakuMessage *types.Message) (*HandleMessa
}

type DatasyncOffer struct {
GroupID []byte
ChatID []byte
MessageID []byte
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2529,7 +2529,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessage := peersyncing.SyncMessage{
Type: syncMessageType,
ID: types.Hex2Bytes(rawMessage.ID),
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Payload: wrappedMessage,
Timestamp: m.transport.GetCurrentTime() / 1000,
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2442,7 +2442,7 @@ func (m *Messenger) addPeersyncingMessage(chat *Chat, msg *v1protocol.StatusMess
syncMessage := peersyncing.SyncMessage{
Type: syncMessageType,
ID: msg.ApplicationLayer.ID,
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Payload: msg.EncryptionLayer.Payload,
Timestamp: uint64(msg.TransportLayer.Message.Timestamp),
}
Expand Down
15 changes: 11 additions & 4 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (m *Messenger) sendDatasyncOffers() error {
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
groupID := types.Bytes2Hex(m.GroupID)
groupID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[groupID] = append(availableMessagesMap[groupID], m.ID)
}

Expand Down Expand Up @@ -188,7 +188,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
var offeredMessages []peersyncing.SyncMessage

for _, o := range offers {
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{GroupID: o.GroupID, ID: o.MessageID})
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.ChatID, ID: o.MessageID})
}

messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
Expand Down Expand Up @@ -231,11 +231,12 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
return nil
}

// TODO(alwx): peer syncing
// canSyncMessageWith checks the permission of a message
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
switch message.Type {
case peersyncing.SyncMessageCommunityType:
chat, ok := m.allChats.Load(string(message.GroupID))
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
Expand All @@ -245,7 +246,13 @@ func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ec
}

return m.canSyncCommunityMessageWith(chat, community, peer)

case peersyncing.SyncMessageOneToOneType:
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
key := common.PubkeyToHex(peer)
return chat.HasMember(key), nil
default:
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {

syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"),
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageCommunityType,
Payload: []byte("some-payload"),
Timestamp: 1,
Expand Down
14 changes: 7 additions & 7 deletions protocol/peersyncing/peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *PeerSyncingSuite) TestBasic() {

syncMessage := SyncMessage{
ID: []byte("test-id"),
GroupID: testGroupID,
ChatID: testGroupID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 1,
Expand All @@ -48,7 +48,7 @@ func (s *PeerSyncingSuite) TestBasic() {
s.Require().NoError(err)
s.Require().Len(allMessages, 1)

byGroupID, err := s.p.AvailableMessagesByGroupID(syncMessage.GroupID, 10)
byGroupID, err := s.p.AvailableMessagesByGroupID(syncMessage.ChatID, 10)

s.Require().NoError(err)
s.Require().Len(byGroupID, 1)
Expand All @@ -60,7 +60,7 @@ func (s *PeerSyncingSuite) TestBasic() {

newSyncMessage := SyncMessage{
ID: []byte("test-id-2"),
GroupID: testGroupID,
ChatID: testGroupID,
Type: SyncMessageCommunityType,
Payload: []byte("test-2"),
Timestamp: 2,
Expand All @@ -77,31 +77,31 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {

syncMessage1 := SyncMessage{
ID: []byte("test-id-1"),
GroupID: testGroupID,
ChatID: testGroupID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 1,
}

syncMessage2 := SyncMessage{
ID: []byte("test-id-2"),
GroupID: testGroupID,
ChatID: testGroupID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 2,
}

syncMessage3 := SyncMessage{
ID: []byte("test-id-3"),
GroupID: testGroupID,
ChatID: testGroupID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 3,
}

syncMessage4 := SyncMessage{
ID: []byte("test-id-4"),
GroupID: testGroupID,
ChatID: testGroupID,
Type: SyncMessageCommunityType,
Payload: []byte("test"),
Timestamp: 4,
Expand Down
4 changes: 2 additions & 2 deletions protocol/peersyncing/sync_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ type SyncMessageType int
type SyncMessage struct {
ID []byte
Type SyncMessageType
GroupID []byte
ChatID []byte
Payload []byte
Timestamp uint64
}

var ErrSyncMessageNotValid = errors.New("sync message not valid")

func (s *SyncMessage) Valid() error {
valid := len(s.ID) != 0 && s.Type != SyncMessageNoType && len(s.GroupID) != 0 && len(s.Payload) != 0 && s.Timestamp != 0
valid := len(s.ID) != 0 && s.Type != SyncMessageNoType && len(s.ChatID) != 0 && len(s.Payload) != 0 && s.Timestamp != 0
if !valid {
return ErrSyncMessageNotValid
}
Expand Down
12 changes: 6 additions & 6 deletions protocol/peersyncing/sync_message_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (p *SyncMessageSQLitePersistence) Add(message SyncMessage) error {
if err := message.Valid(); err != nil {
return err
}
_, err := p.db.Exec(`INSERT INTO peersyncing_messages (id, type, group_id, payload, timestamp) VALUES (?, ?, ?, ?, ?)`, message.ID, message.Type, message.GroupID, message.Payload, message.Timestamp)
_, err := p.db.Exec(`INSERT INTO peersyncing_messages (id, type, group_id, payload, timestamp) VALUES (?, ?, ?, ?, ?)`, message.ID, message.Type, message.ChatID, message.Payload, message.Timestamp)
return err
}

Expand All @@ -44,7 +44,7 @@ func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) {
for rows.Next() {
var m SyncMessage

err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp)
err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}
Expand All @@ -66,7 +66,7 @@ func (p *SyncMessageSQLitePersistence) ByGroupID(groupID []byte, limit int) ([]S
for rows.Next() {
var m SyncMessage

err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp)
err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn
for rows.Next() {
var m SyncMessage

err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp)
err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (p *SyncMessageSQLitePersistence) ByGroupIDs(ids [][]byte, limit int) ([]Sy
for rows.Next() {
var m SyncMessage

err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp)
err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (p *SyncMessageSQLitePersistence) ByMessageIDs(ids [][]byte) ([]SyncMessage
for rows.Next() {
var m SyncMessage

err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp)
err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ab73cf8

Please sign in to comment.