Skip to content

Commit

Permalink
Peeking via MSC2753 (#1370)
Browse files Browse the repository at this point in the history
Initial implementation of MSC2753, as tested by matrix-org/sytest#944.
Doesn't yet handle unpeeks, peeked EDUs, or history viz changing during a peek - these will follow.
#1370 has full details.
  • Loading branch information
ara4n committed Sep 10, 2020
1 parent 35564dd commit 39507ba
Show file tree
Hide file tree
Showing 29 changed files with 1,209 additions and 59 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ matrixdotorg/sytest-dendrite:latest tests/50federation/40devicelists.pl
```
See [sytest.md](docs/sytest.md) for the full description of these flags.

You can try running sytest outside of docker for faster runs, but the dependencies can be temperamental
and we recommend using docker where possible.
```
cd sytest
export PERL5LIB=$HOME/lib/perl5
export PERL_MB_OPT=--install_base=$HOME
export PERL_MM_OPT=INSTALL_BASE=$HOME
./install-deps.pl
./run-tests.pl -I Dendrite::Monolith -d $PATH_TO_DENDRITE_BINARIES
```

Sometimes Sytest is testing the wrong thing or is flakey, so it will need to be patched.
Ask on `#dendrite-dev:matrix.org` if you think this is the case for you and we'll be happy to help.

Expand Down
2 changes: 1 addition & 1 deletion clientapi/routing/joinroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func JoinRoomByIDOrAlias(
}
}

// If content was provided in the request then incude that
// If content was provided in the request then include that
// in the request. It'll get used as a part of the membership
// event content.
_ = httputil.UnmarshalJSONRequest(req, &joinReq.Content)
Expand Down
79 changes: 79 additions & 0 deletions clientapi/routing/peekroom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
"net/http"

roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)

func PeekRoomByIDOrAlias(
req *http.Request,
device *api.Device,
rsAPI roomserverAPI.RoomserverInternalAPI,
accountDB accounts.Database,
roomIDOrAlias string,
) util.JSONResponse {
// if this is a remote roomIDOrAlias, we have to ask the roomserver (or federation sender?) to
// to call /peek and /state on the remote server.
// TODO: in future we could skip this if we know we're already participating in the room,
// but this is fiddly in case we stop participating in the room.

// then we create a local peek.
peekReq := roomserverAPI.PerformPeekRequest{
RoomIDOrAlias: roomIDOrAlias,
UserID: device.UserID,
DeviceID: device.ID,
}
peekRes := roomserverAPI.PerformPeekResponse{}

// Check to see if any ?server_name= query parameters were
// given in the request.
if serverNames, ok := req.URL.Query()["server_name"]; ok {
for _, serverName := range serverNames {
peekReq.ServerNames = append(
peekReq.ServerNames,
gomatrixserverlib.ServerName(serverName),
)
}
}

// Ask the roomserver to perform the peek.
rsAPI.PerformPeek(req.Context(), &peekReq, &peekRes)
if peekRes.Error != nil {
return peekRes.Error.JSONResponse()
}

// if this user is already joined to the room, we let them peek anyway
// (given they might be about to part the room, and it makes things less fiddly)

// Peeking stops if none of the devices who started peeking have been
// /syncing for a while, or if everyone who was peeking calls /leave
// (or /unpeek with a server_name param? or DELETE /peek?)
// on the peeked room.

return util.JSONResponse{
Code: http.StatusOK,
// TODO: Put the response struct somewhere internal.
JSON: struct {
RoomID string `json:"room_id"`
}{peekRes.RoomID},
}
}
11 changes: 11 additions & 0 deletions clientapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ func Setup(
)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/peek/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Peek, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return PeekRoomByIDOrAlias(
req, device, rsAPI, accountDB, vars["roomIDOrAlias"],
)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/joined_rooms",
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return GetJoinedRooms(req, device, rsAPI)
Expand Down
19 changes: 19 additions & 0 deletions docs/peeking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Peeking

Peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).

Implementationwise, this means:
* Users call `/peek` and `/unpeek` on the clientapi from a given device.
* The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room
* The roomserver writes an NewPeek event into the kafka log headed to the syncserver
* The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.

Questions (given this is [my](https://github.com/ara4n) first time hacking on Dendrite):
* The whole clientapi -> roomserver -> syncapi flow to initiate a peek seems very indirect. Is there a reason not to just let syncapi itself host the implementation of `/peek`?

In future, peeking over federation will be added as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).
* The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated `/peek`
* The `federationsender` tracks the existence of the remote peek in question
* The `federationsender` regularly renews the remote peek as long as there are still peeking devices syncing for it.
* TBD: how do we tell if there are no devices currently syncing for a given peeked room? The syncserver needs to tell the roomserver
somehow who then needs to warn the federationsender.
7 changes: 7 additions & 0 deletions federationapi/routing/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (t *testRoomserverAPI) PerformJoin(
) {
}

func (t *testRoomserverAPI) PerformPeek(
ctx context.Context,
req *api.PerformPeekRequest,
res *api.PerformPeekResponse,
) {
}

func (t *testRoomserverAPI) PerformPublish(
ctx context.Context,
req *api.PerformPublishRequest,
Expand Down
2 changes: 1 addition & 1 deletion federationsender/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (d *Database) StoreJSON(
var err error
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js)
return nil
return err
})
if err != nil {
return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/sqlutil/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [

b := strings.Builder{}
for i, val := range dest {
b.WriteString(fmt.Sprintf("%v", val))
b.WriteString(fmt.Sprintf("%q", val))
if i+1 <= len(dest)-1 {
b.WriteString(" | ")
}
Expand Down
2 changes: 1 addition & 1 deletion keyserver/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID str
func (d *Database) StoreOneTimeKeys(ctx context.Context, keys api.OneTimeKeys) (counts *api.OneTimeKeysCount, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
counts, err = d.OneTimeKeysTable.InsertOneTimeKeys(ctx, txn, keys)
return nil
return err
})
return
}
Expand Down
6 changes: 6 additions & 0 deletions roomserver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type RoomserverInternalAPI interface {
res *PerformLeaveResponse,
) error

PerformPeek(
ctx context.Context,
req *PerformPeekRequest,
res *PerformPeekResponse,
)

PerformPublish(
ctx context.Context,
req *PerformPublishRequest,
Expand Down
9 changes: 9 additions & 0 deletions roomserver/api/api_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ func (t *RoomserverInternalAPITrace) PerformInvite(
return t.Impl.PerformInvite(ctx, req, res)
}

func (t *RoomserverInternalAPITrace) PerformPeek(
ctx context.Context,
req *PerformPeekRequest,
res *PerformPeekResponse,
) {
t.Impl.PerformPeek(ctx, req, res)
util.GetLogger(ctx).Infof("PerformPeek req=%+v res=%+v", js(req), js(res))
}

func (t *RoomserverInternalAPITrace) PerformJoin(
ctx context.Context,
req *PerformJoinRequest,
Expand Down
15 changes: 14 additions & 1 deletion roomserver/api/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (
// - Redact the event and set the corresponding `unsigned` fields to indicate it as redacted.
// - Replace the event in the database.
OutputTypeRedactedEvent OutputType = "redacted_event"

// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek"
)

// An OutputEvent is an entry in the roomserver output kafka log.
Expand All @@ -59,8 +62,10 @@ type OutputEvent struct {
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
// The content of event with type OutputTypeRedactedEvent
// The content of event with type OutputTypeRedactedEvent
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
}

// An OutputNewRoomEvent is written when the roomserver receives a new event.
Expand Down Expand Up @@ -195,3 +200,11 @@ type OutputRedactedEvent struct {
// The value of `unsigned.redacted_because` - the redaction event itself
RedactedBecause gomatrixserverlib.HeaderedEvent
}

// An OutputNewPeek is written whenever a user starts peeking into a room
// using a given device.
type OutputNewPeek struct {
RoomID string
UserID string
DeviceID string
}
14 changes: 14 additions & 0 deletions roomserver/api/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ type PerformInviteResponse struct {
Error *PerformError
}

type PerformPeekRequest struct {
RoomIDOrAlias string `json:"room_id_or_alias"`
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}

type PerformPeekResponse struct {
// The room ID, populated on success.
RoomID string `json:"room_id"`
// If non-nil, the join request failed. Contains more information why it failed.
Error *PerformError
}

// PerformBackfillRequest is a request to PerformBackfill.
type PerformBackfillRequest struct {
// The room to backfill
Expand Down
8 changes: 8 additions & 0 deletions roomserver/internal/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type RoomserverInternalAPI struct {
*query.Queryer
*perform.Inviter
*perform.Joiner
*perform.Peeker
*perform.Leaver
*perform.Publisher
*perform.Backfiller
Expand Down Expand Up @@ -83,6 +84,13 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Peeker = &perform.Peeker{
ServerName: r.Cfg.Matrix.ServerName,
Cfg: r.Cfg,
DB: r.DB,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Leaver = &perform.Leaver{
Cfg: r.Cfg,
DB: r.DB,
Expand Down

0 comments on commit 39507ba

Please sign in to comment.