Skip to content

Commit

Permalink
retry sending specific messages
Browse files Browse the repository at this point in the history
  • Loading branch information
qfrank committed Apr 8, 2024
1 parent 9986125 commit 3672759
Show file tree
Hide file tree
Showing 27 changed files with 1,594 additions and 763 deletions.
305 changes: 305 additions & 0 deletions api/messenger_raw_message_resend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
package api

import (
"context"
"errors"
"path/filepath"
"testing"

"github.com/status-im/status-go/eth-node/types"
m_common "github.com/status-im/status-go/multiaccounts/common"
"github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/services/utils"

"github.com/stretchr/testify/suite"
)

type MessengerRawMessageResendTest struct {
suite.Suite
aliceBackend *GethStatusBackend
bobBackend *GethStatusBackend
aliceMessenger *protocol.Messenger
bobMessenger *protocol.Messenger
}

func TestMessengerRawMessageResendTestSuite(t *testing.T) {
suite.Run(t, new(MessengerRawMessageResendTest))
}

// To be removed in https://github.com/status-im/status-go/issues/4437
func advertiseCommunityToUserOldWay(s *suite.Suite, community *communities.Community, alice *protocol.Messenger, bob *protocol.Messenger) {
chat := protocol.CreateOneToOneChat(bob.IdentityPublicKeyString(), bob.IdentityPublicKey(), bob.GetTransport())

inputMessage := common.NewMessage()
inputMessage.ChatId = chat.ID
inputMessage.Text = "some text"
inputMessage.CommunityID = community.IDString()

err := alice.SaveChat(chat)
s.Require().NoError(err)
_, err = alice.SendChatMessage(context.Background(), inputMessage)
s.Require().NoError(err)

// Ensure community is received
response, err := protocol.WaitOnMessengerResponse(
bob,
func(r *protocol.MessengerResponse) bool {
return len(r.Communities()) > 0
},
"bob did not receive community request to join",
)
s.Require().NoError(err)
communityInResponse := response.Communities()[0]
s.Require().Equal(community.ID(), communityInResponse.ID())
}

func (s *MessengerRawMessageResendTest) addMutualContact() {
bobPubkey := s.bobMessenger.IdentityPublicKeyCompressed()
bobZQ3ID, err := utils.SerializePublicKey(bobPubkey)
s.Require().NoError(err)
mr, err := s.aliceMessenger.AddContact(context.Background(), &requests.AddContact{
ID: bobZQ3ID,
DisplayName: "bob666",
})
s.Require().NoError(err)
s.Require().Len(mr.Messages(), 2)

var contactRequest *common.Message
waitOnMessengerResponse(&s.Suite, func(r *protocol.MessengerResponse) error {
for _, m := range r.Messages() {
if m.GetContentType() == protobuf.ChatMessage_CONTACT_REQUEST {
contactRequest = m
return nil
}
}
return errors.New("contact request not received")
}, s.bobMessenger)

mr, err = s.bobMessenger.AcceptContactRequest(context.Background(), &requests.AcceptContactRequest{
ID: types.FromHex(contactRequest.ID),
})
s.Require().NoError(err)
s.Require().Len(mr.Contacts, 1)

waitOnMessengerResponse(&s.Suite, func(r *protocol.MessengerResponse) error {
if len(r.Contacts) > 0 {
return nil
}
return errors.New("contact accepted not received")
}, s.aliceMessenger)
}

func (s *MessengerRawMessageResendTest) SetupTest() {
s.createAliceBobBackendAndLogin()
community := s.createTestCommunity(s.aliceMessenger, protobuf.CommunityPermissions_MANUAL_ACCEPT)
s.addMutualContact()
advertiseCommunityToUserOldWay(&s.Suite, community, s.aliceMessenger, s.bobMessenger)
requestBob := &requests.RequestToJoinCommunity{
CommunityID: community.ID(),
}
joinOnRequestCommunity(&s.Suite, community, s.aliceMessenger, s.bobMessenger, requestBob)
}

type MessageResponseValidator func(*protocol.MessengerResponse) error

func waitOnMessengerResponse(s *suite.Suite, fnWait MessageResponseValidator, user *protocol.Messenger) {
_, err := protocol.WaitOnMessengerResponse(
user,
func(r *protocol.MessengerResponse) bool {
err := fnWait(r)
if err != nil {
s.T().Logf("response error: %s", err.Error())
}
return err == nil
},
"MessengerResponse data not received",
)
s.Require().NoError(err)
}

func requestToJoinCommunity(s *suite.Suite, controlNode *protocol.Messenger, user *protocol.Messenger, request *requests.RequestToJoinCommunity) types.HexBytes {
response, err := user.RequestToJoinCommunity(request)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.RequestsToJoinCommunity(), 1)

requestToJoin := response.RequestsToJoinCommunity()[0]
s.Require().Equal(requestToJoin.PublicKey, user.IdentityPublicKeyString())

_, err = protocol.WaitOnMessengerResponse(
controlNode,
func(r *protocol.MessengerResponse) bool {
if len(r.RequestsToJoinCommunity()) == 0 {
return false
}

for _, resultRequest := range r.RequestsToJoinCommunity() {
if resultRequest.PublicKey == user.IdentityPublicKeyString() {
return true
}
}
return false
},
"control node did not receive community request to join",
)
s.Require().NoError(err)

return requestToJoin.ID
}

func joinOnRequestCommunity(s *suite.Suite, community *communities.Community, controlNode *protocol.Messenger, user *protocol.Messenger, request *requests.RequestToJoinCommunity) {
// Request to join the community
requestToJoinID := requestToJoinCommunity(s, controlNode, user, request)

// accept join request
acceptRequestToJoin := &requests.AcceptRequestToJoinCommunity{ID: requestToJoinID}
response, err := controlNode.AcceptRequestToJoinCommunity(acceptRequestToJoin)
s.Require().NoError(err)
s.Require().NotNil(response)

updatedCommunity := response.Communities()[0]
s.Require().NotNil(updatedCommunity)
s.Require().True(updatedCommunity.HasMember(user.IdentityPublicKey()))

// receive request to join response
_, err = protocol.WaitOnMessengerResponse(
user,
func(r *protocol.MessengerResponse) bool {
return len(r.Communities()) > 0 && r.Communities()[0].HasMember(user.IdentityPublicKey())
},
"user did not receive request to join response",
)
s.Require().NoError(err)

userCommunity, err := user.GetCommunityByID(community.ID())
s.Require().NoError(err)
s.Require().True(userCommunity.HasMember(user.IdentityPublicKey()))

_, err = protocol.WaitOnMessengerResponse(
controlNode,
func(r *protocol.MessengerResponse) bool {
return len(r.Communities()) > 0 && r.Communities()[0].HasMember(user.IdentityPublicKey())
},
"control node did not receive request to join response",
)
s.Require().NoError(err)
}

func (s *MessengerRawMessageResendTest) TearDownTest() {
s.Require().NoError(s.aliceBackend.Logout())
s.Require().NoError(s.bobBackend.Logout())

}

func (s *MessengerRawMessageResendTest) createAliceBobBackendAndLogin() {
s.aliceBackend = NewGethStatusBackend()
s.bobBackend = NewGethStatusBackend()

aliceRootDir := filepath.Join(s.T().TempDir())
s.T().Logf("aliceRootDir: %s", aliceRootDir)
bobRootDir := filepath.Join(s.T().TempDir())
s.T().Logf("bobRootDir: %s", bobRootDir)

aliceCreateAccountRequest := s.setCreateAccountRequest("alice", aliceRootDir, filepath.Join(aliceRootDir, "alice.log"))
bobCreateAccountRequest := s.setCreateAccountRequest("bob66", bobRootDir, filepath.Join(bobRootDir, "bob.log"))

_, err := s.aliceBackend.CreateAccountAndLogin(aliceCreateAccountRequest)
s.Require().NoError(err)
_, err = s.bobBackend.CreateAccountAndLogin(bobCreateAccountRequest)
s.Require().NoError(err)

s.aliceMessenger = s.aliceBackend.Messenger()
s.Require().NotNil(s.aliceMessenger)
s.bobMessenger = s.bobBackend.Messenger()
s.Require().NotNil(s.bobMessenger)
_, err = s.aliceMessenger.Start()
s.Require().NoError(err)
_, err = s.bobMessenger.Start()
s.Require().NoError(err)
}

func (s *MessengerRawMessageResendTest) createTestCommunity(controlNode *protocol.Messenger, membershipType protobuf.CommunityPermissions_Access) *communities.Community {
description := &requests.CreateCommunity{
Membership: membershipType,
Name: "status",
Color: "#ffffff",
Description: "status community description",
PinMessageAllMembersEnabled: false,
}
response, err := controlNode.CreateCommunity(description, true)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
s.Require().Len(response.Chats(), 1)
return response.Communities()[0]
}

func (s *MessengerRawMessageResendTest) setCreateAccountRequest(displayName, backupDisabledDataDir, logFilePath string) *requests.CreateAccount {
nameServer := "1.1.1.1"
verifyENSContractAddress := "0x00000000000C2E074eC69A0dFb2997BA6C7d2e1e"
verifyTransactionChainID := int64(1)
verifyURL := "https://eth-archival.rpc.grove.city/v1/3ef2018191814b7e1009b8d9"
logLevel := "DEBUG"
networkID := uint64(1)
password := "qwerty"
return &requests.CreateAccount{
UpstreamConfig: verifyURL,
WakuV2Nameserver: &nameServer,
VerifyENSContractAddress: &verifyENSContractAddress,
BackupDisabledDataDir: backupDisabledDataDir,
Password: password,
DisplayName: displayName,
LogEnabled: true,
VerifyTransactionChainID: &verifyTransactionChainID,
VerifyTransactionURL: &verifyURL,
VerifyENSURL: &verifyURL,
LogLevel: &logLevel,
LogFilePath: logFilePath,
NetworkID: &networkID,
CustomizationColor: string(m_common.CustomizationColorPrimary),
}
}

// TestMessageSent tests if ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN is in state `sent` without resending
func (s *MessengerRawMessageResendTest) TestMessageSent() {
ids, err := s.bobMessenger.RawMessagesIDsByType(protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN)
s.Require().NoError(err)
s.Require().Len(ids, 1)
rawMessage, err := s.bobMessenger.RawMessageByID(ids[0])
s.Require().NoError(err)
s.Require().NotNil(rawMessage)
s.Require().True(rawMessage.Sent)
}

// TestMessageResend tests if ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN is resent
func (s *MessengerRawMessageResendTest) TestMessageResend() {
ids, err := s.bobMessenger.RawMessagesIDsByType(protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN)
s.Require().NoError(err)
s.Require().Len(ids, 1)
rawMessage, err := s.bobMessenger.RawMessageByID(ids[0])
s.Require().NoError(err)
s.Require().NotNil(rawMessage)
s.Require().NoError(s.bobMessenger.UpdateRawMessageSent(rawMessage.ID, false, 0))
err = tt.RetryWithBackOff(func() error {
rawMessage, err := s.bobMessenger.RawMessageByID(ids[0])
s.Require().NoError(err)
s.Require().NotNil(rawMessage)
if !rawMessage.Sent {
return errors.New("message ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN was not resent yet")
}
return nil
})
s.Require().NoError(err)

waitOnMessengerResponse(&s.Suite, func(r *protocol.MessengerResponse) error {
if len(r.RequestsToJoinCommunity()) > 0 {
return nil
}
return errors.New("community request to join not received")
}, s.aliceMessenger)
}
8 changes: 4 additions & 4 deletions multiaccounts/settings/sync_protobuf_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ func buildRawSyncSettingMessage(msg *protobuf.SyncSetting, chatID string) (*comm
}

return &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_SETTING,
ResendAutomatically: true,
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_SETTING,
ResendType: common.ResendTypeDataSync,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *MessageSender) SendPrivate(
// Currently we don't support sending through datasync and setting custom waku fields,
// as the datasync interface is not rich enough to propagate that information, so we
// would have to add some complexity to handle this.
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
if rawMessage.ResendType == ResendTypeDataSync && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
}

Expand All @@ -185,7 +185,7 @@ func (s *MessageSender) SendPrivate(
// using the community topic and their key
func (s *MessageSender) SendCommunityMessage(
ctx context.Context,
rawMessage RawMessage,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a community message",
Expand All @@ -194,7 +194,7 @@ func (s *MessageSender) SendCommunityMessage(
)
rawMessage.Sender = s.identity

return s.sendCommunity(ctx, &rawMessage)
return s.sendCommunity(ctx, rawMessage)
}

// SendPubsubTopicKey sends the protected topic key for a community to a list of recipients
Expand Down Expand Up @@ -424,7 +424,7 @@ func (s *MessageSender) sendPrivate(
// earlier than the scheduled
s.notifyOnScheduledMessage(recipient, rawMessage)

if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendAutomatically {
if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendType == ResendTypeDataSync {
// No need to call transport tracking.
// It is done in a data sync dispatch step.
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
Expand Down

0 comments on commit 3672759

Please sign in to comment.