Skip to content

Commit

Permalink
Make SendMessage block until the message is sent
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Nov 14, 2023
1 parent afc2b4d commit 7919c1a
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:


complement:
name: JS tests
name: Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3 # Checkout crypto tests
Expand Down
10 changes: 6 additions & 4 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type Client interface {
// provide a bogus room ID.
IsRoomEncrypted(t *testing.T, roomID string) (bool, error)
// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room. Returns the event ID of the sent event.
SendMessage(t *testing.T, roomID, text string)
// room. Returns the event ID of the sent event, so MUST BLOCK until the event has been sent.
SendMessage(t *testing.T, roomID, text string) (eventID string)
// Wait until an event with the given body is seen. Not all impls expose event IDs
// hence needing to use body as a proxy.
WaitUntilEventInRoom(t *testing.T, roomID string, checker func(e Event) bool) Waiter
Expand Down Expand Up @@ -69,10 +69,12 @@ func (c *LoggedClient) IsRoomEncrypted(t *testing.T, roomID string) (bool, error
return c.Client.IsRoomEncrypted(t, roomID)
}

func (c *LoggedClient) SendMessage(t *testing.T, roomID, text string) {
func (c *LoggedClient) SendMessage(t *testing.T, roomID, text string) (eventID string) {
t.Helper()
c.Logf(t, "%s SendMessage %s => %s", c.logPrefix(), roomID, text)
c.Client.SendMessage(t, roomID, text)
eventID = c.Client.SendMessage(t, roomID, text)
c.Logf(t, "%s SendMessage %s => %s", c.logPrefix(), roomID, eventID)
return
}

func (c *LoggedClient) WaitUntilEventInRoom(t *testing.T, roomID string, checker func(e Event) bool) Waiter {
Expand Down
11 changes: 8 additions & 3 deletions internal/api/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func NewJSClient(t *testing.T, opts ClientCreationOpts) (Client, error) {
func (c *JSClient) Close(t *testing.T) {
c.cancel()
c.listeners = make(map[int32]func(roomID string, ev Event))
if t.Failed() {
// print logs for this test

}
}

func (c *JSClient) UserID() string {
Expand Down Expand Up @@ -186,7 +190,7 @@ func (c *JSClient) StartSyncing(t *testing.T) (stopSyncing func()) {
chrome.AwaitExecute(t, c.ctx, `window.__client.startClient({});`)
select {
case <-time.After(5 * time.Second):
t.Fatalf("took >5s to StartSyncing")
t.Fatalf("[%s](js) took >5s to StartSyncing", c.userID)
case <-ch:
}
cancel()
Expand All @@ -210,12 +214,13 @@ func (c *JSClient) IsRoomEncrypted(t *testing.T, roomID string) (bool, error) {

// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room.
func (c *JSClient) SendMessage(t *testing.T, roomID, text string) {
err := chrome.AwaitExecute(t, c.ctx, fmt.Sprintf(`window.__client.sendMessage("%s", {
func (c *JSClient) SendMessage(t *testing.T, roomID, text string) (eventID string) {
res, err := chrome.AwaitExecuteInto[map[string]interface{}](t, c.ctx, fmt.Sprintf(`window.__client.sendMessage("%s", {
"msgtype": "m.text",
"body": "%s"
});`, roomID, text))
must.NotError(t, "failed to sendMessage", err)
return (*res)["event_id"].(string)
}

func (c *JSClient) MustBackpaginate(t *testing.T, roomID string, count int) {
Expand Down
41 changes: 29 additions & 12 deletions internal/api/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/matrix-org/complement-crypto/rust/matrix_sdk_ffi"
"github.com/matrix-org/complement/must"
"golang.org/x/exp/slices"
)

func init() {
Expand Down Expand Up @@ -87,7 +88,7 @@ func (c *RustClient) StartSyncing(t *testing.T) (stopSyncing func()) {
for !isSyncing {
select {
case <-time.After(5 * time.Second):
t.Fatalf("timed out after 5s StartSyncing")
t.Fatalf("[%s](rust) timed out after 5s StartSyncing", c.userID)
case state := <-ch:
fmt.Println(state)
switch state.(type) {
Expand Down Expand Up @@ -135,13 +136,36 @@ func (c *RustClient) Type() ClientType {

// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room. Returns the event ID of the sent event.
func (c *RustClient) SendMessage(t *testing.T, roomID, text string) {
func (c *RustClient) SendMessage(t *testing.T, roomID, text string) (eventID string) {
t.Helper()
ch := make(chan bool)
// we need a timeline listener before we can send messages, AND that listener must be attached to the
// same *Room you call .Send on :S
r := c.ensureListening(t, roomID)
t.Logf("%s: SendMessage[%s]: '%s'", c.userID, roomID, text)
cancel := c.listenForUpdates(func(roomID string) {
info := c.rooms[roomID]
if info == nil {
return
}
for _, ev := range info.timeline {
if ev == nil {
continue
}
if ev.Text == text && ev.ID != "" {
eventID = ev.ID
close(ch)
}
}
})
defer cancel()
r.Send(matrix_sdk_ffi.MessageEventContentFromHtml(text, text))
select {
case <-time.After(5 * time.Second):
t.Fatalf("SendMessage: timed out after 5s")
case <-ch:
return
}
return
}

func (c *RustClient) MustBackpaginate(t *testing.T, roomID string, count int) {
Expand Down Expand Up @@ -206,14 +230,7 @@ func (c *RustClient) ensureListening(t *testing.T, roomID string) *matrix_sdk_ff
t.Logf("TimelineListener[%s] INSERT %d out of bounds of events timeline of size %d", roomID, i, len(timeline))
continue
}
if timeline[i] != nil {
// shift the item in this position right and insert this item
timeline = append(timeline, nil)
copy(timeline[i+1:], timeline[i:])
timeline[i] = timelineItemToEvent(insertData.Item)
} else {
timeline[i] = timelineItemToEvent(insertData.Item)
}
timeline = slices.Insert(timeline, i, timelineItemToEvent(insertData.Item))
fmt.Printf("[%s]_______ INSERT %+v\n", c.userID, timeline[i])
case matrix_sdk_ffi.TimelineChangeAppend:
appendItems := d.Append()
Expand All @@ -225,7 +242,7 @@ func (c *RustClient) ensureListening(t *testing.T, roomID string) *matrix_sdk_ff
timeline = append(timeline, ev)
fmt.Printf("[%s]_______ APPEND %+v\n", c.userID, ev)
}
case matrix_sdk_ffi.TimelineChangePushBack:
case matrix_sdk_ffi.TimelineChangePushBack: // append but 1 element
pbData := d.PushBack()
if pbData == nil {
continue
Expand Down
15 changes: 15 additions & 0 deletions internal/chrome/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ func AwaitExecute(t *testing.T, ctx context.Context, js string) error {
)
}

func AwaitExecuteInto[T any](t *testing.T, ctx context.Context, js string) (*T, error) {
t.Helper()
t.Log(js)
out := new(T)
err := chromedp.Run(ctx,
chromedp.Evaluate(js, &out, func(p *runtime.EvaluateParams) *runtime.EvaluateParams {
return p.WithAwaitPromise(true)
}),
)
if err != nil {
return nil, err
}
return out, nil
}

func MustAwaitExecute(t *testing.T, ctx context.Context, js string) {
t.Helper()
err := AwaitExecute(t, ctx, js)
Expand Down
10 changes: 2 additions & 8 deletions tests/membership_acls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ func TestAliceBobEncryptionWorks(t *testing.T) {
t.Logf("bob room encrypted = %v", isEncrypted)

waiter := bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(wantMsgBody))
alice.SendMessage(t, roomID, wantMsgBody)
evID := alice.SendMessage(t, roomID, wantMsgBody)

// Bob receives the message
t.Logf("bob (%s) waiting for event", bob.Type())
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
})
}
Expand Down Expand Up @@ -153,12 +153,6 @@ func TestCanDecryptMessagesAfterInviteButBeforeJoin(t *testing.T) {

// Alice sends the message whilst Bob is still invited.
alice.SendMessage(t, roomID, wantMsgBody)
// wait for SS proxy to get it. Only needed when testing Rust TODO FIXME
// Without this, the join will race with sending the msg and you could end up with the
// message being sent POST join, which breaks the point of this test.
// kegan: I think this happens because SendMessage on Rust does not block until a 200 OK
// because it allows for local echo. Can we fix the RustClient?
time.Sleep(time.Second)

// Bob joins the room (via Complement, but it shouldn't matter)
csapiBob.MustJoinRoom(t, roomID, []string{"hs1"})
Expand Down

0 comments on commit 7919c1a

Please sign in to comment.