Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add typing notifications to /sync responses - fixes #635 #718

Merged
merged 25 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b896fdc
Add typing notifications to /sync responses - fixes #635
Cnly Jun 18, 2019
55a4e60
Fix linting
Cnly Jun 25, 2019
ba35a0a
Fix nil pointer and goroutine safety in typing cache
Cnly Jun 25, 2019
e742b7d
Add warning for OnNewEvent when no user to wake up
Cnly Jun 25, 2019
e687027
Fix syncapi/sync/notifier_test.go
Cnly Jun 26, 2019
18c59e6
Fix more linting issues and docs
Cnly Jun 26, 2019
badd360
Add a simple test for EDU-only updates in notifier_test.go
Cnly Jun 26, 2019
ad4f69a
Remove room ID from m.typing client events
Cnly Jun 26, 2019
d3ee72c
Typing event consumer now supplies real, not bogus position to notifier
Cnly Jun 26, 2019
55219aa
Better logging in typing event consumer
Cnly Jun 26, 2019
2a9dab2
Fix sync position with partial info used as complete one in syncserve…
Cnly Jun 26, 2019
e7cf449
Fix necessary field not set properly in TypingEvent; clean up OutputT…
Cnly Jun 26, 2019
74fd292
Fix latest sync pos used as since pos in requestpool.go
Cnly Jun 26, 2019
aeceacd
Make Notifier.CurrentPosition() respect stream lock
Cnly Jun 26, 2019
6fd1c21
Fix docs for SyncPosition.WithUpdates
Cnly Jun 26, 2019
274db2f
Fix zero value of currPos may be used in OnIncomingSyncRequest
Cnly Jun 26, 2019
d79a6a2
Userstreams should store and use complete pos, not pos update ("pos d…
Cnly Jun 26, 2019
19f1a6f
Remove users from typing list when typing status times out
Cnly Jun 27, 2019
530ed11
Refine docs
Cnly Jun 27, 2019
1be2089
Add newly passing tests to testfile
Cnly Jul 1, 2019
8ed82c0
Merge branch 'master' into syncapi-typing-notifications-635
Cnly Jul 1, 2019
4e84c2f
Docs and code cleanup
Cnly Jul 12, 2019
4a323f1
Merge 'upstream/master' into syncapi-typing-notifications-635
Cnly Jul 12, 2019
24b3e1a
Apply suggestions from code review
Cnly Jul 12, 2019
801c664
Apply suggestions from code review
Cnly Jul 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions syncapi/consumers/clientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import (
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)

// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
db *storage.SyncServerDatasource
notifier *sync.Notifier
}

Expand All @@ -38,7 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatabase,
store *storage.SyncServerDatasource,
) *OutputClientDataConsumer {

consumer := common.ContinualConsumer{
Expand Down Expand Up @@ -78,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")

syncStreamPos, err := s.db.UpsertAccountData(
pduPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
Expand All @@ -89,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}

s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos})

return nil
}
12 changes: 6 additions & 6 deletions syncapi/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
db *storage.SyncServerDatasource
notifier *sync.Notifier
query api.RoomserverQueryAPI
}
Expand All @@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatabase,
store *storage.SyncServerDatasource,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {

Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
}

syncStreamPos, err := s.db.WriteEvent(
pduPos, err := s.db.WriteEvent(
ctx,
&ev,
addsStateEvents,
Expand All @@ -144,15 +144,15 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure")
return nil
}
s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
s.notifier.OnNewEvent(&ev, "", nil, types.SyncPosition{PDUPosition: pduPos})

return nil
}

func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
Expand All @@ -161,7 +161,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos)
s.notifier.OnNewEvent(&msg.Event, "", nil, types.SyncPosition{PDUPosition: pduPos})
return nil
}

Expand Down
96 changes: 96 additions & 0 deletions syncapi/consumers/typingserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2019 Alex Chen
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
"encoding/json"

"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/api"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)

// OutputTypingEventConsumer consumes events that originated in the typing server.
type OutputTypingEventConsumer struct {
Cnly marked this conversation as resolved.
Show resolved Hide resolved
typingConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource
notifier *sync.Notifier
}

// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the typing server.
func NewOutputTypingEventConsumer(
Cnly marked this conversation as resolved.
Show resolved Hide resolved
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatasource,
) *OutputTypingEventConsumer {

consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}

s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
db: store,
notifier: n,
}

consumer.ProcessMessage = s.onMessage

return s
}

// Start consuming from typing api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition})
})

return s.typingConsumer.Start()
}

func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("typing server output log: message parse failure")
return nil
}

log.WithFields(log.Fields{
"room_id": output.Event.RoomID,
"user_id": output.Event.UserID,
"typing": output.Event.Typing,
}).Debug("received data from typing server")

var typingPos int64
typingEvent := output.Event
if typingEvent.Typing {
typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
} else {
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}

s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos})
return nil
}
2 changes: 1 addition & 1 deletion syncapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const pathPrefixR0 = "/_matrix/client/r0"
// Due to Setup being used to call many other functions, a gocyclo nolint is
// applied:
// nolint: gocyclo
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) {
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()

authData := auth.Data{
Expand Down
4 changes: 2 additions & 2 deletions syncapi/routing/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type stateEventInStateResp struct {
// TODO: Check if the user is in the room. If not, check if the room's history
// is publicly visible. Current behaviour is returning an empty array if the
// user cannot see the room's history.
func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse {
func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)

Expand Down Expand Up @@ -84,7 +84,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r
// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current
// state to see if there is an event with that type and state key, if there
// is then (by default) we return the content, otherwise a 404.
func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string, evType, stateKey string) util.JSONResponse {
func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string, evType, stateKey string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)

Expand Down
4 changes: 1 addition & 3 deletions syncapi/storage/account_data_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"database/sql"

"github.com/matrix-org/dendrite/common"

"github.com/matrix-org/dendrite/syncapi/types"
)

const accountDataSchema = `
Expand Down Expand Up @@ -94,7 +92,7 @@ func (s *accountDataStatements) insertAccountData(
func (s *accountDataStatements) selectAccountDataInRange(
ctx context.Context,
userID string,
oldPos, newPos types.StreamPosition,
oldPos, newPos int64,
) (data map[string][]string, err error) {
data = make(map[string][]string)

Expand Down
11 changes: 5 additions & 6 deletions syncapi/storage/output_room_events_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -109,11 +108,11 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
return
}

// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange(
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
) (map[string]map[string]bool, map[string]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)

Expand Down Expand Up @@ -171,7 +170,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(

eventIDToEvent[ev.EventID()] = streamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
streamPosition: streamPos,
}
}

Expand Down Expand Up @@ -223,7 +222,7 @@ func (s *outputRoomEventsStatements) insertEvent(
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
roomID string, fromPos, toPos int64, limit int,
) ([]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
Expand Down Expand Up @@ -286,7 +285,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {

result = append(result, streamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
streamPosition: streamPos,
transactionID: transactionID,
})
}
Expand Down
Loading