diff --git a/internal/deploy/callback/callback_addon.go b/internal/deploy/callback/callback_addon.go index ab69e18..f7faf6d 100644 --- a/internal/deploy/callback/callback_addon.go +++ b/internal/deploy/callback/callback_addon.go @@ -162,12 +162,13 @@ func NewCallbackServer(t ct.TestLike, hostnameRunningComplement string) (*Callba // SendError returns a callback.Fn which returns the provided statusCode // along with a JSON error $count times, after which it lets the response -// pass through. This is useful for testing retries. +// pass through. This is useful for testing retries. If count=0, always send +// an error response. func SendError(count uint32, statusCode int) Fn { var seen atomic.Uint32 return func(d Data) *Response { next := seen.Add(1) - if next > count { + if count > 0 && next > count { return nil } return &Response{ diff --git a/internal/deploy/mitm/client.go b/internal/deploy/mitm/client.go index 54ba2d8..dbf54c9 100644 --- a/internal/deploy/mitm/client.go +++ b/internal/deploy/mitm/client.go @@ -6,7 +6,6 @@ import ( "io" "net/http" "net/url" - "sync" "testing" "time" @@ -40,10 +39,8 @@ func NewClient(proxyURL *url.URL, hostnameRunningComplement string) *Client { func (m *Client) Configure(t *testing.T) *Configuration { return &Configuration{ - t: t, - pathCfgs: make(map[string]*MITMPathConfiguration), - mu: &sync.Mutex{}, - client: m, + t: t, + client: m, } } diff --git a/internal/deploy/mitm/configuration.go b/internal/deploy/mitm/configuration.go index df56649..2cb4eff 100644 --- a/internal/deploy/mitm/configuration.go +++ b/internal/deploy/mitm/configuration.go @@ -1,14 +1,10 @@ package mitm import ( - "encoding/json" "strings" - "sync" - "sync/atomic" "testing" "github.com/matrix-org/complement-crypto/internal/deploy/callback" - "github.com/matrix-org/complement/ct" "github.com/matrix-org/complement/must" ) @@ -17,10 +13,8 @@ import ( // Tests will typically build up this configuration by calling `Intercept` with the paths // they are interested in. type Configuration struct { - t *testing.T - pathCfgs map[string]*MITMPathConfiguration - mu *sync.Mutex - client *Client + t *testing.T + client *Client } // Filter represents a mitmproxy filter; see https://docs.mitmproxy.org/stable/concepts-filters/ @@ -133,142 +127,3 @@ func (c *Configuration) WithIntercept(opts InterceptOpts, inner func()) { defer c.client.UnlockOptions(c.t, lockID) inner() } - -func (c *Configuration) ForPath(partialPath string) *MITMPathConfiguration { - c.mu.Lock() - defer c.mu.Unlock() - p, ok := c.pathCfgs[partialPath] - if ok { - return p - } - p = &MITMPathConfiguration{ - t: c.t, - path: partialPath, - } - c.pathCfgs[partialPath] = p - return p -} - -// Execute a mitm proxy configuration for the duration of `inner`. -func (c *Configuration) Execute(inner func()) { - // The HTTP request to mitmproxy needs to look like: - // { - // $addon_name: { - // $addon_values... - // } - // } - // - // The API shape of the add-ons are located inside the python files in tests/mitmproxy_addons - if len(c.pathCfgs) > 1 { - c.t.Fatalf(">1 path config currently unsupported") // TODO - } - c.mu.Lock() - callbackAddon := map[string]any{} - for _, pathConfig := range c.pathCfgs { - if pathConfig.filter() != "" { - callbackAddon["filter"] = pathConfig.filter() - } - cbServer, err := callback.NewCallbackServer(c.t, c.client.hostnameRunningComplement) - must.NotError(c.t, "failed to start callback server", err) - defer cbServer.Close() - - if pathConfig.listener != nil { - responseCallbackURL := cbServer.SetOnResponseCallback(c.t, pathConfig.listener) - callbackAddon["callback_response_url"] = responseCallbackURL - } - if pathConfig.blockRequest != nil && *pathConfig.blockRequest { - // reimplement statuscode plugin logic in Go - // TODO: refactor this - var count atomic.Uint32 - requestCallbackURL := cbServer.SetOnRequestCallback(c.t, func(cd callback.Data) *callback.Response { - newCount := count.Add(1) - if pathConfig.blockCount > 0 && newCount > uint32(pathConfig.blockCount) { - return nil // don't block - } - // block this request by sending back a fake response - return &callback.Response{ - RespondStatusCode: pathConfig.blockStatusCode, - RespondBody: json.RawMessage(`{"error":"complement-crypto says no"}`), - } - }) - callbackAddon["callback_request_url"] = requestCallbackURL - } - } - c.mu.Unlock() - - lockID := c.client.LockOptions(c.t, map[string]any{ - "callback": callbackAddon, - }) - defer c.client.UnlockOptions(c.t, lockID) - inner() - -} - -type MITMPathConfiguration struct { - t *testing.T - path string - accessToken string - method string - listener func(cd callback.Data) *callback.Response - - blockCount int - blockStatusCode int - blockRequest *bool // nil means don't block -} - -func (p *MITMPathConfiguration) filter() string { - // the filter is a python regexp - // "Regexes are Python-style" - https://docs.mitmproxy.org/stable/concepts-filters/ - // re.escape() escapes very little: - // "Changed in version 3.7: Only characters that can have special meaning in a regular expression are escaped. - // As a result, '!', '"', '%', "'", ',', '/', ':', ';', '<', '=', '>', '@', and "`" are no longer escaped." - // https://docs.python.org/3/library/re.html#re.escape - // - // The majority of HTTP paths are just /foo/bar with % for path-encoding e.g @foo:bar=>%40foo%3Abar, - // so on balance we can probably just use the path directly. - var s strings.Builder - s.WriteString("~u .*" + p.path + ".*") - if p.method != "" { - s.WriteString(" ~m " + strings.ToUpper(p.method)) - } - if p.accessToken != "" { - s.WriteString(" ~hq " + p.accessToken) - } - return s.String() -} - -func (p *MITMPathConfiguration) Listen(cb func(cd callback.Data) *callback.Response) *MITMPathConfiguration { - p.listener = cb - return p -} - -func (p *MITMPathConfiguration) AccessToken(accessToken string) *MITMPathConfiguration { - p.accessToken = accessToken - return p -} - -func (p *MITMPathConfiguration) Method(method string) *MITMPathConfiguration { - p.method = method - return p -} - -func (p *MITMPathConfiguration) BlockRequest(count, returnStatusCode int) *MITMPathConfiguration { - if p.blockRequest != nil { - // we can't express blocking requests and responses separately, it doesn't make sense. - ct.Fatalf(p.t, "BlockRequest or BlockResponse cannot be called multiple times for the same path") - } - p.blockCount = count - p.blockRequest = &boolTrue - p.blockStatusCode = returnStatusCode - return p -} - -func (p *MITMPathConfiguration) BlockResponse(count, returnStatusCode int) *MITMPathConfiguration { - if p.blockRequest != nil { - ct.Fatalf(p.t, "BlockRequest or BlockResponse cannot be called multiple times for the same path") - } - p.blockCount = count - p.blockRequest = &boolFalse - p.blockStatusCode = returnStatusCode - return p -} diff --git a/tests/notification_test.go b/tests/notification_test.go index c29b3c8..c2c1bb4 100644 --- a/tests/notification_test.go +++ b/tests/notification_test.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/complement-crypto/internal/api" "github.com/matrix-org/complement-crypto/internal/cc" "github.com/matrix-org/complement-crypto/internal/deploy/callback" + "github.com/matrix-org/complement-crypto/internal/deploy/mitm" "github.com/matrix-org/complement/must" ) @@ -567,23 +568,26 @@ func TestMultiprocessDupeOTKUpload(t *testing.T) { // artificially slow down the HTTP responses, such that we will potentially have 2 in-flight /keys/upload requests // at once. If the NSE and main apps are talking to each other, they should be using the same key ID + key. // If not... well, that's a bug because then the client will forget one of these keys. - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/upload").Listen(func(cd callback.Data) *callback.Response { - if cd.AccessToken != aliceAccessToken { - return nil // let bob upload OTKs - } - aliceUploadedNewKeys = true - if cd.ResponseCode != 200 { - // we rely on the homeserver checking and rejecting when the same key ID is used with - // different keys. - t.Errorf("/keys/upload returned an error, duplicate key upload? %+v => %v", cd, string(cd.ResponseBody)) - } - // tarpit the response - t.Logf("tarpitting keys/upload response for 4 seconds") - time.Sleep(4 * time.Second) - return nil - }) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/upload", + }, + ResponseCallback: func(cd callback.Data) *callback.Response { + if cd.AccessToken != aliceAccessToken { + return nil // let bob upload OTKs + } + aliceUploadedNewKeys = true + if cd.ResponseCode != 200 { + // we rely on the homeserver checking and rejecting when the same key ID is used with + // different keys. + t.Errorf("/keys/upload returned an error, duplicate key upload? %+v => %v", cd, string(cd.ResponseBody)) + } + // tarpit the response + t.Logf("tarpitting keys/upload response for 4 seconds") + time.Sleep(4 * time.Second) + return nil + }, + }, func() { var eventID string // Bob appears and sends a message, causing Bob to claim one of Alice's OTKs. // The main app will see this in /sync and then try to upload another OTK, which we will tarpit. diff --git a/tests/one_time_keys_test.go b/tests/one_time_keys_test.go index 9245e6f..6e5605a 100644 --- a/tests/one_time_keys_test.go +++ b/tests/one_time_keys_test.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/complement-crypto/internal/api" "github.com/matrix-org/complement-crypto/internal/cc" "github.com/matrix-org/complement-crypto/internal/deploy/callback" + "github.com/matrix-org/complement-crypto/internal/deploy/mitm" "github.com/matrix-org/complement/b" "github.com/matrix-org/complement/client" "github.com/matrix-org/complement/ct" @@ -108,9 +109,14 @@ func TestFallbackKeyIsUsedIfOneTimeKeysRunOut(t *testing.T) { var roomID string var waiter api.Waiter // Block all /keys/upload requests for Alice - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/upload").AccessToken(alice.CurrentAccessToken(t)).BlockRequest(0, http.StatusGatewayTimeout) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/upload", + Method: "POST", + AccessToken: alice.CurrentAccessToken(t), + }, + RequestCallback: callback.SendError(0, http.StatusGatewayTimeout), + }, func() { // claim all OTKs mustClaimOTKs(t, otkGobbler, tc.Alice, int(otkCount)) @@ -156,9 +162,13 @@ func TestFailedOneTimeKeyUploadRetries(t *testing.T) { // make a room so we can kick clients roomID := tc.Alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) // block /keys/upload and make a client - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/upload").Method("POST").BlockRequest(2, http.StatusGatewayTimeout) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/upload", + Method: "POST", + }, + RequestCallback: callback.SendError(2, http.StatusGatewayTimeout), + }, func() { tc.WithAliceSyncing(t, func(alice api.Client) { tc.Bob.MustDo(t, "POST", []string{ "_matrix", "client", "v3", "keys", "claim", @@ -204,16 +214,21 @@ func TestFailedKeysClaimRetries(t *testing.T) { // make a room which will link the 2 users together when roomID := tc.CreateNewEncryptedRoom(t, tc.Alice, cc.EncRoomOptions.PresetPublicChat()) // block /keys/claim and join the room, causing the Olm session to be created - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/claim").Method("POST").BlockRequest(2, http.StatusGatewayTimeout).Listen(func(cd callback.Data) *callback.Response { - t.Logf("%+v", cd) - if cd.ResponseCode == 200 { - waiter.Finish() - stopPoking.Store(true) - } - return nil - }) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/claim", + Method: "POST", + }, + RequestCallback: callback.SendError(2, http.StatusGatewayTimeout), + ResponseCallback: func(cd callback.Data) *callback.Response { + t.Logf("%+v", cd) + if cd.ResponseCode == 200 { + waiter.Finish() + stopPoking.Store(true) + } + return nil + }, + }, func() { // join the room. This should cause an Olm session to be made but it will fail as we cannot // call /keys/claim. We should retry though. tc.Bob.MustJoinRoom(t, roomID, []string{clientType.HS}) diff --git a/tests/state_synchronisation_test.go b/tests/state_synchronisation_test.go index 95a0176..bf5a192 100644 --- a/tests/state_synchronisation_test.go +++ b/tests/state_synchronisation_test.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/complement-crypto/internal/api" "github.com/matrix-org/complement-crypto/internal/cc" "github.com/matrix-org/complement-crypto/internal/deploy/callback" + "github.com/matrix-org/complement-crypto/internal/deploy/mitm" "github.com/matrix-org/complement/ct" "github.com/matrix-org/complement/helpers" ) @@ -36,25 +37,28 @@ func testSigkillBeforeKeysUploadResponseRust(t *testing.T, clientType api.Client var terminateClient func() seenSecondKeysUploadWaiter := helpers.NewWaiter() tc := Instance().CreateTestContext(t, clientType, clientType) - - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/upload").Listen(func(cd callback.Data) *callback.Response { - if terminated.Load() { - // make sure the 2nd upload 200 OKs - if cd.ResponseCode != 200 { - t.Errorf("2nd /keys/upload did not 200 OK => got %v", cd.ResponseCode) + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/upload", + Method: "POST", + }, + ResponseCallback: func(cd callback.Data) *callback.Response { + if terminated.Load() { + // make sure the 2nd upload 200 OKs + if cd.ResponseCode != 200 { + t.Errorf("2nd /keys/upload did not 200 OK => got %v", cd.ResponseCode) + } + t.Logf("recv 2nd /keys/upload => HTTP %d", cd.ResponseCode) + seenSecondKeysUploadWaiter.Finish() + return nil } - t.Logf("recv 2nd /keys/upload => HTTP %d", cd.ResponseCode) - seenSecondKeysUploadWaiter.Finish() + // destroy the client + mu.Lock() + terminateClient() + mu.Unlock() return nil - } - // destroy the client - mu.Lock() - terminateClient() - mu.Unlock() - return nil - }) - mitmConfiguration.Execute(func() { + }, + }, func() { // login in a different process remoteClient := tc.MustCreateClient(t, &cc.ClientCreationRequest{ User: tc.Alice, @@ -98,30 +102,31 @@ func testSigkillBeforeKeysUploadResponseJS(t *testing.T, clientType api.ClientTy var terminateClient func() seenSecondKeysUploadWaiter := helpers.NewWaiter() tc := Instance().CreateTestContext(t, clientType, clientType) - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/upload").Listen(func(cd callback.Data) *callback.Response { - if cd.Method == "OPTIONS" { - return nil // ignore CORS - } - if terminated.Load() { - // make sure the 2nd upload 200 OKs - if cd.ResponseCode != 200 { - ct.Errorf(t, "2nd /keys/upload did not 200 OK => got %v", cd.ResponseCode) + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/upload", + Method: "POST", + }, + ResponseCallback: func(cd callback.Data) *callback.Response { + if terminated.Load() { + // make sure the 2nd upload 200 OKs + if cd.ResponseCode != 200 { + ct.Errorf(t, "2nd /keys/upload did not 200 OK => got %v", cd.ResponseCode) + } + seenSecondKeysUploadWaiter.Finish() + return nil } - seenSecondKeysUploadWaiter.Finish() + // destroy the client + mu.Lock() + if terminateClient != nil { + terminateClient() + } else { + ct.Errorf(t, "terminateClient is nil. Did WithMITMOptions lock?") + } + mu.Unlock() return nil - } - // destroy the client - mu.Lock() - if terminateClient != nil { - terminateClient() - } else { - ct.Errorf(t, "terminateClient is nil. Did WithMITMOptions lock?") - } - mu.Unlock() - return nil - }) - mitmConfiguration.Execute(func() { + }, + }, func() { clientWhichWillBeKilled := tc.MustCreateClient(t, &cc.ClientCreationRequest{ User: tc.Alice, Opts: api.ClientCreationOpts{ diff --git a/tests/to_device_test.go b/tests/to_device_test.go index f4f2ef4..102b6d3 100644 --- a/tests/to_device_test.go +++ b/tests/to_device_test.go @@ -3,6 +3,7 @@ package tests import ( "fmt" "net/http" + "sync/atomic" "testing" "time" @@ -31,9 +32,12 @@ func TestClientRetriesSendToDevice(t *testing.T) { var evID string var err error // now gateway timeout the /sendToDevice endpoint - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/sendToDevice").BlockResponse(0, http.StatusGatewayTimeout) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/sendToDevice", + }, + ResponseCallback: callback.SendError(0, http.StatusGatewayTimeout), + }, func() { evID, err = alice.TrySendMessage(t, roomID, wantMsgBody) if err != nil { // we allow clients to fail the send if they cannot call /sendToDevice @@ -43,7 +47,6 @@ func TestClientRetriesSendToDevice(t *testing.T) { t.Logf("TrySendMessage: => %s", evID) } }) - if err != nil { // retry now we have connectivity evID = alice.SendMessage(t, roomID, wantMsgBody) @@ -128,23 +131,26 @@ func TestUnprocessedToDeviceMessagesArentLostOnRestart(t *testing.T) { func testUnprocessedToDeviceMessagesArentLostOnRestartRust(t *testing.T, tc *cc.TestContext, bobOpts api.ClientCreationOpts, roomID, eventID string) { // sniff /sync traffic waitForRoomKey := helpers.NewWaiter() - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/sync").Listen(func(cd callback.Data) *callback.Response { - // When /sync shows a to-device message from Alice (indicating the room key), then SIGKILL Bob. - t.Logf("/sync => %v", string(cd.ResponseBody)) - body := gjson.ParseBytes(cd.ResponseBody) - toDeviceEvents := body.Get("extensions.to_device.events").Array() // Sliding Sync form - if len(toDeviceEvents) > 0 { - for _, ev := range toDeviceEvents { - if ev.Get("type").Str == "m.room.encrypted" { - t.Logf("detected potential room key") - waitForRoomKey.Finish() + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/sync", + }, + ResponseCallback: func(cd callback.Data) *callback.Response { + // When /sync shows a to-device message from Alice (indicating the room key), then SIGKILL Bob. + t.Logf("/sync => %v", string(cd.ResponseBody)) + body := gjson.ParseBytes(cd.ResponseBody) + toDeviceEvents := body.Get("extensions.to_device.events").Array() // Sliding Sync form + if len(toDeviceEvents) > 0 { + for _, ev := range toDeviceEvents { + if ev.Get("type").Str == "m.room.encrypted" { + t.Logf("detected potential room key") + waitForRoomKey.Finish() + } } } - } - return nil - }) - mitmConfiguration.Execute(func() { + return nil + }, + }, func() { // bob comes back online, and will be killed a short while later. // No need to login as we will reuse the session from before. // This is critical to ensure we get the room key update as it would have been sent @@ -278,36 +284,37 @@ func TestToDeviceMessagesAreBatched(t *testing.T) { waiter := helpers.NewWaiter() tc.WithAliceSyncing(t, func(alice api.Client) { // intercept /sendToDevice and check we are sending 100 messages per request - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/sendToDevice").Listen(func(cd callback.Data) *callback.Response { - if cd.Method != "PUT" { - return nil - } - // format is: - /* - { - "messages": { - "@alice:example.com": { - "TLLBEANAAG": { - "example_content_key": "value" - } - } - } + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/sendToDevice", + Method: "PUT", + }, + ResponseCallback: func(cd callback.Data) *callback.Response { + // format is: + /* + { + "messages": { + "@alice:example.com": { + "TLLBEANAAG": { + "example_content_key": "value" + } + } + } + } + */ + usersMap := gjson.GetBytes(cd.RequestBody, "messages") + if !usersMap.Exists() { + t.Logf("intercepted PUT /sendToDevice but no messages existed") + return nil } - */ - usersMap := gjson.GetBytes(cd.RequestBody, "messages") - if !usersMap.Exists() { - t.Logf("intercepted PUT /sendToDevice but no messages existed") + if len(usersMap.Map()) != 100 { + t.Errorf("PUT /sendToDevice did not batch messages, got %d want 100", len(usersMap.Map())) + t.Logf(usersMap.Raw) + } + waiter.Finish() return nil - } - if len(usersMap.Map()) != 100 { - t.Errorf("PUT /sendToDevice did not batch messages, got %d want 100", len(usersMap.Map())) - t.Logf(usersMap.Raw) - } - waiter.Finish() - return nil - }) - mitmConfiguration.Execute(func() { + }, + }, func() { alice.SendMessage(t, roomID, "this should cause to-device msgs to be sent") time.Sleep(time.Second) waiter.Waitf(t, 5*time.Second, "did not see /sendToDevice") @@ -348,13 +355,17 @@ func TestToDeviceMessagesArentLostWhenKeysQueryFails(t *testing.T) { var eventID string bobAccessToken := bob.CurrentAccessToken(t) t.Logf("Bob's token => %s", bobAccessToken) - mitmConfiguration := tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/keys/query").AccessToken(bobAccessToken).BlockRequest(3, http.StatusGatewayTimeout).Listen(func(cd callback.Data) *callback.Response { - t.Logf("%+v", cd) - waiter.Finish() - return nil - }) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/keys/query", + }, + RequestCallback: callback.SendError(3, http.StatusGatewayTimeout), + ResponseCallback: func(d callback.Data) *callback.Response { + t.Logf("%+v", d) + waiter.Finish() + return nil + }, + }, func() { // Alice logs in on a new device. csapiAlice2 := tc.MustRegisterNewDevice(t, tc.Alice, "OTHER_DEVICE") tc.WithClientSyncing(t, &cc.ClientCreationRequest{ @@ -419,7 +430,7 @@ func TestToDeviceMessagesAreProcessedInOrder(t *testing.T) { Body string }{} tc.WithAliceSyncing(t, func(alice api.Client) { - callback := func(cd callback.Data) *callback.Response { + callbackFn := func(cd callback.Data) *callback.Response { // try v2 sync then SS toDeviceEvents := gjson.ParseBytes(cd.ResponseBody).Get("to_device.events").Array() if len(toDeviceEvents) == 0 { @@ -430,12 +441,25 @@ func TestToDeviceMessagesAreProcessedInOrder(t *testing.T) { } return nil } + shouldBlockRequest := atomic.Bool{} + shouldBlockRequest.Store(true) + sendError := callback.SendError(0, http.StatusGatewayTimeout) // Block Alice's /sync - mitmConfiguration := tc.Deployment.MITM().Configure(t) // intercept /sync just so we can observe the number of to-device msgs coming down. // We also synchronise on this to know when the client has received the to-device msgs - mitmConfiguration.ForPath("/sync").AccessToken(alice.CurrentAccessToken(t)).BlockRequest(0, http.StatusGatewayTimeout).Listen(callback) - mitmConfiguration.Execute(func() { + tc.Deployment.MITM().Configure(t).WithIntercept(mitm.InterceptOpts{ + Filter: mitm.FilterParams{ + PathContains: "/sync", + AccessToken: alice.CurrentAccessToken(t), + }, + RequestCallback: func(d callback.Data) *callback.Response { + if shouldBlockRequest.Load() { + return sendError(d) + } + return nil + }, + ResponseCallback: callbackFn, + }, func() { // create 10 users and join the room creationReqs := make([]*cc.ClientCreationRequest, numClients) for i := range creationReqs { @@ -461,12 +485,9 @@ func TestToDeviceMessagesAreProcessedInOrder(t *testing.T) { } }) t.Logf("sent %d timeline events", len(timelineEvents)) - }) - // Alice's /sync is unblocked, wait until we see the last event. - // Re-add the callback server TODO: allow composing see https://github.com/matrix-org/complement-crypto/issues/68 - mitmConfiguration = tc.Deployment.MITM().Configure(t) - mitmConfiguration.ForPath("/sync").AccessToken(alice.CurrentAccessToken(t)).Listen(callback) - mitmConfiguration.Execute(func() { + // Alice's /sync is unblocked, wait until we see the last event. + shouldBlockRequest.Store(false) + lastTimelineEvent := timelineEvents[len(timelineEvents)-1] alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(lastTimelineEvent.ID)).Waitf( // wait a while here as we need to wait for both /sync to retry and a large response