Skip to content

Commit

Permalink
Add typing notifications to /sync responses - fixes #635
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chen <minecnly@gmail.com>
  • Loading branch information
Cnly committed Jun 25, 2019
1 parent ce189a7 commit b896fdc
Show file tree
Hide file tree
Showing 17 changed files with 505 additions and 165 deletions.
9 changes: 5 additions & 4 deletions syncapi/consumers/clientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import (
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)

// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
db *storage.SyncServerDatasource
notifier *sync.Notifier
}

Expand All @@ -38,7 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatabase,
store *storage.SyncServerDatasource,
) *OutputClientDataConsumer {

consumer := common.ContinualConsumer{
Expand Down Expand Up @@ -78,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")

syncStreamPos, err := s.db.UpsertAccountData(
pduPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
Expand All @@ -89,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}

s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos})

return nil
}
12 changes: 6 additions & 6 deletions syncapi/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
db *storage.SyncServerDatasource
notifier *sync.Notifier
query api.RoomserverQueryAPI
}
Expand All @@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatabase,
store *storage.SyncServerDatasource,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {

Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
}

syncStreamPos, err := s.db.WriteEvent(
pduPos, err := s.db.WriteEvent(
ctx,
&ev,
addsStateEvents,
Expand All @@ -144,15 +144,15 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure")
return nil
}
s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
s.notifier.OnNewEvent(&ev, "", nil, types.SyncPosition{PDUPosition: pduPos})

return nil
}

func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
Expand All @@ -161,7 +161,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos)
s.notifier.OnNewEvent(&msg.Event, "", nil, types.SyncPosition{PDUPosition: pduPos})
return nil
}

Expand Down
84 changes: 84 additions & 0 deletions syncapi/consumers/typingserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2019 Alex Chen
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
"encoding/json"

"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/api"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)

type OutputTypingEventConsumer struct {
typingConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource
notifier *sync.Notifier
}

func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatasource,
) *OutputTypingEventConsumer {

consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
db: store,
notifier: n,
}
consumer.ProcessMessage = s.onMessage

return s
}

// Start consuming from typing api
func (s *OutputTypingEventConsumer) Start() error {
return s.typingConsumer.Start()
}

func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("typing server output log: message parse failure")
return nil
}

log.WithFields(log.Fields{
"room_id": output.Event.RoomID,
}).Info("received data from typing server")

typingEvent := output.Event
if typingEvent.Typing {
s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
} else {
s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}

s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: 1})
return nil
}
2 changes: 1 addition & 1 deletion syncapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
const pathPrefixR0 = "/_matrix/client/r0"

// Setup configures the given mux with sync-server listeners
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) {
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()

authData := auth.Data{
Expand Down
4 changes: 2 additions & 2 deletions syncapi/routing/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type stateEventInStateResp struct {
// TODO: Check if the user is in the room. If not, check if the room's history
// is publicly visible. Current behaviour is returning an empty array if the
// user cannot see the room's history.
func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse {
func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)

Expand Down Expand Up @@ -84,7 +84,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r
// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current
// state to see if there is an event with that type and state key, if there
// is then (by default) we return the content, otherwise a 404.
func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string, evType, stateKey string) util.JSONResponse {
func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string, evType, stateKey string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)

Expand Down
4 changes: 1 addition & 3 deletions syncapi/storage/account_data_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"database/sql"

"github.com/matrix-org/dendrite/common"

"github.com/matrix-org/dendrite/syncapi/types"
)

const accountDataSchema = `
Expand Down Expand Up @@ -94,7 +92,7 @@ func (s *accountDataStatements) insertAccountData(
func (s *accountDataStatements) selectAccountDataInRange(
ctx context.Context,
userID string,
oldPos, newPos types.StreamPosition,
oldPos, newPos int64,
) (data map[string][]string, err error) {
data = make(map[string][]string)

Expand Down
9 changes: 4 additions & 5 deletions syncapi/storage/output_room_events_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -113,7 +112,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange(
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
) (map[string]map[string]bool, map[string]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)

Expand Down Expand Up @@ -171,7 +170,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(

eventIDToEvent[ev.EventID()] = streamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
streamPosition: streamPos,
}
}

Expand Down Expand Up @@ -223,7 +222,7 @@ func (s *outputRoomEventsStatements) insertEvent(
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
roomID string, fromPos, toPos int64, limit int,
) ([]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
Expand Down Expand Up @@ -286,7 +285,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {

result = append(result, streamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
streamPosition: streamPos,
transactionID: transactionID,
})
}
Expand Down
Loading

0 comments on commit b896fdc

Please sign in to comment.