Skip to content

Commit

Permalink
Add TestClientRetriesSendToDevice; add TrySendMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Dec 1, 2023
1 parent 07716f6 commit e8b9858
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 13 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ jobs:
env:
COMPLEMENT_BASE_IMAGE: homeserver
COMPLEMENT_ENABLE_DIRTY_RUNS: 1
COMPLEMENT_CRYPTO_WRITE_CONTAINER_LOGS: 1
COMPLEMENT_SHARE_ENV_PREFIX: PASS_
PASS_SYNAPSE_COMPLEMENT_DATABASE: sqlite
DOCKER_BUILDKIT: 1
Expand All @@ -118,4 +119,4 @@ jobs:
name: Logs - ${{ job.status }}
path: |
./tests/rust_sdk_logs*
./tests/js_sdk.log
./tests/*.log
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Network connectivity:
- [x] If a server cannot send device list updates over federation, it retries. https://github.com/matrix-org/complement/pull/695
- [ ] If a client cannot query device keys for a user, it retries.
- [ ] If a server cannot query device keys on another server, it retries.
- [ ] If a client cannot send a to-device msg, it retries.
- [x] If a client cannot send a to-device msg, it retries.
- [x] If a server cannot send a to-device msg to another server, it retries. https://github.com/matrix-org/complement/pull/694
- [ ] Repeat all of the above, but restart the client|server after the initial connection failure. This checks that retries aren't just stored in memory but persisted to disk.

Expand Down
10 changes: 10 additions & 0 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Client interface {
// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room. Returns the event ID of the sent event, so MUST BLOCK until the event has been sent.
SendMessage(t *testing.T, roomID, text string) (eventID string)
// TrySendMessage tries to send the message, but can fail.
TrySendMessage(t *testing.T, roomID, text string) (eventID string, err error)
// Wait until an event with the given body is seen. Not all impls expose event IDs
// hence needing to use body as a proxy.
WaitUntilEventInRoom(t *testing.T, roomID string, checker func(e Event) bool) Waiter
Expand Down Expand Up @@ -80,6 +82,14 @@ func (c *LoggedClient) IsRoomEncrypted(t *testing.T, roomID string) (bool, error
return c.Client.IsRoomEncrypted(t, roomID)
}

func (c *LoggedClient) TrySendMessage(t *testing.T, roomID, text string) (eventID string, err error) {
t.Helper()
c.Logf(t, "%s TrySendMessage %s => %s", c.logPrefix(), roomID, text)
eventID, err = c.Client.TrySendMessage(t, roomID, text)
c.Logf(t, "%s TrySendMessage %s => %s", c.logPrefix(), roomID, eventID)
return
}

func (c *LoggedClient) SendMessage(t *testing.T, roomID, text string) (eventID string) {
t.Helper()
c.Logf(t, "%s SendMessage %s => %s", c.logPrefix(), roomID, text)
Expand Down
15 changes: 11 additions & 4 deletions internal/api/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func (c *JSClient) MustGetEvent(t *testing.T, roomID, eventID string) Event {
// Tests should call stopSyncing() at the end of the test.
func (c *JSClient) StartSyncing(t *testing.T) (stopSyncing func()) {
t.Helper()
t.Logf("%s is starting to sync", c.userID)
chrome.MustExecute(t, c.ctx, fmt.Sprintf(`
var fn;
fn = function(state) {
Expand Down Expand Up @@ -260,7 +259,6 @@ func (c *JSClient) StartSyncing(t *testing.T) (stopSyncing func()) {
// There's no callbacks for that yet, so sleep and pray.
// See https://github.com/matrix-org/matrix-js-sdk/blob/v29.1.0/src/rust-crypto/rust-crypto.ts#L1483
time.Sleep(500 * time.Millisecond)
t.Logf("%s is now syncing", c.userID)
return func() {
chrome.AwaitExecute(t, c.ctx, `window.__client.stopClient();`)
}
Expand All @@ -282,13 +280,22 @@ func (c *JSClient) IsRoomEncrypted(t *testing.T, roomID string) (bool, error) {
// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room.
func (c *JSClient) SendMessage(t *testing.T, roomID, text string) (eventID string) {
t.Helper()
eventID, err := c.TrySendMessage(t, roomID, text)
must.NotError(t, "failed to sendMessage", err)
return eventID
}

func (c *JSClient) TrySendMessage(t *testing.T, roomID, text string) (eventID string, err error) {
t.Helper()
res, err := chrome.AwaitExecuteInto[map[string]interface{}](t, c.ctx, fmt.Sprintf(`window.__client.sendMessage("%s", {
"msgtype": "m.text",
"body": "%s"
});`, roomID, text))
must.NotError(t, "failed to sendMessage", err)
return (*res)["event_id"].(string)
if err != nil {
return "", err
}
return (*res)["event_id"].(string), nil
}

func (c *JSClient) MustBackpaginate(t *testing.T, roomID string, count int) {
Expand Down
23 changes: 20 additions & 3 deletions internal/api/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ func NewRustClient(t *testing.T, opts ClientCreationOpts, ssURL string) (Client,

func (c *RustClient) Close(t *testing.T) {
t.Helper()
c.roomsMu.Lock()
for _, rri := range c.rooms {
if rri.stream != nil {
// ensure we don't see AddTimelineListener callbacks as they can cause panics
// if we t.Logf after t has passed/failed.
rri.stream.Cancel()
}
}
c.FFIClient.Destroy()
}

Expand Down Expand Up @@ -182,6 +190,15 @@ func (c *RustClient) Type() ClientTypeLang {
// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room. Returns the event ID of the sent event.
func (c *RustClient) SendMessage(t *testing.T, roomID, text string) (eventID string) {
t.Helper()
eventID, err := c.TrySendMessage(t, roomID, text)
if err != nil {
fatalf(t, err.Error())
}
return eventID
}

func (c *RustClient) TrySendMessage(t *testing.T, roomID, text string) (eventID string, err error) {
t.Helper()
ch := make(chan bool)
// we need a timeline listener before we can send messages, AND that listener must be attached to the
Expand All @@ -205,12 +222,12 @@ func (c *RustClient) SendMessage(t *testing.T, roomID, text string) (eventID str
defer cancel()
r.Send(matrix_sdk_ffi.MessageEventContentFromHtml(text, text))
select {
case <-time.After(15 * time.Second):
fatalf(t, "SendMessage(rust) %s: timed out after 35s", c.userID)
case <-time.After(11 * time.Second):
err = fmt.Errorf("SendMessage(rust) %s: timed out after 11s", c.userID)
return
case <-ch:
return
}
return
}

func (c *RustClient) MustBackpaginate(t *testing.T, roomID string, count int) {
Expand Down
8 changes: 4 additions & 4 deletions internal/chrome/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func MustExecuteInto[T any](t *testing.T, ctx context.Context, js string) T {

func ExecuteInto[T any](t *testing.T, ctx context.Context, js string) (*T, error) {
t.Helper()
t.Log(js)
//t.Log(js)
out := new(T)
err := chromedp.Run(ctx,
chromedp.Evaluate(js, &out),
Expand All @@ -34,7 +34,7 @@ func ExecuteInto[T any](t *testing.T, ctx context.Context, js string) (*T, error

func AwaitExecute(t *testing.T, ctx context.Context, js string) error {
var r *runtime.RemoteObject // stop large responses causing errors "Object reference chain is too long (-32000)"
t.Log(js)
//t.Log(js)
return chromedp.Run(ctx,
chromedp.Evaluate(js, &r, func(p *runtime.EvaluateParams) *runtime.EvaluateParams {
return p.WithAwaitPromise(true)
Expand All @@ -44,7 +44,7 @@ func AwaitExecute(t *testing.T, ctx context.Context, js string) error {

func AwaitExecuteInto[T any](t *testing.T, ctx context.Context, js string) (*T, error) {
t.Helper()
t.Log(js)
//t.Log(js)
out := new(T)
err := chromedp.Run(ctx,
chromedp.Evaluate(js, &out, func(p *runtime.EvaluateParams) *runtime.EvaluateParams {
Expand All @@ -66,7 +66,7 @@ func MustAwaitExecute(t *testing.T, ctx context.Context, js string) {
func MustExecute(t *testing.T, ctx context.Context, js string) {
t.Helper()
var r *runtime.RemoteObject // stop large responses causing errors "Object reference chain is too long (-32000)"
t.Log(js)
//t.Log(js)
err := chromedp.Run(ctx,
chromedp.Evaluate(js, &r),
)
Expand Down
59 changes: 59 additions & 0 deletions tests/client_connectivity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tests

import (
"net/http"
"testing"
"time"

"github.com/matrix-org/complement-crypto/internal/api"
)

// Test that if a client is unable to call /sendToDevice, it retries.
func TestClientRetriesSendToDevice(t *testing.T) {
ClientTypeMatrix(t, func(t *testing.T, clientTypeA, clientTypeB api.ClientType) {
tc := CreateTestContext(t, clientTypeA, clientTypeB)
roomID := tc.CreateNewEncryptedRoom(t, tc.Alice, "public_chat", nil)
tc.Bob.MustJoinRoom(t, roomID, []string{clientTypeA.HS})
alice := tc.MustLoginClient(t, tc.Alice, clientTypeA)
defer alice.Close(t)
bob := tc.MustLoginClient(t, tc.Bob, clientTypeB)
defer bob.Close(t)
aliceStopSyncing := alice.StartSyncing(t)
defer aliceStopSyncing()
bobStopSyncing := bob.StartSyncing(t)
defer bobStopSyncing()
// lets device keys be exchanged
time.Sleep(time.Second)

wantMsgBody := "Hello world!"
waiter := bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(wantMsgBody))

var evID string
var err error
// now gateway timeout the /sendToDevice endpoint
tc.Deployment.WithMITMOptions(t, map[string]interface{}{
"statuscode": map[string]interface{}{
"return_status": http.StatusGatewayTimeout,
"filter": "~u .*\\/sendToDevice.*",
},
}, func() {
evID, err = alice.TrySendMessage(t, roomID, wantMsgBody)
if err != nil {
// we allow clients to fail the send if they cannot call /sendToDevice
t.Logf("TrySendMessage: %s", err)
}
if evID != "" {
t.Logf("TrySendMessage: => %s", evID)
}
})

if err != nil {
// retry now we have connectivity
evID = alice.SendMessage(t, roomID, wantMsgBody)
}

// Bob receives the message
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
})
}

0 comments on commit e8b9858

Please sign in to comment.