diff --git a/pkg/graph/messages.go b/pkg/graph/messages.go index 154f4d2b..7bdda25b 100644 --- a/pkg/graph/messages.go +++ b/pkg/graph/messages.go @@ -12,7 +12,7 @@ import ( type MessagesController struct { log *zap.Logger - sync.Mutex + m *sync.Mutex db driver.Database col driver.Collection @@ -30,6 +30,7 @@ func NewMessagesController(logger *zap.Logger, db driver.Database) *MessagesCont log: log, db: db, col: col, + m: &sync.Mutex{}, } } @@ -98,8 +99,8 @@ func (c *MessagesController) Read(ctx context.Context, msg *cc.Message, reader s log := c.log.Named("Read") log.Debug("Req received") - c.Lock() - defer c.Unlock() + c.m.Lock() + defer c.m.Unlock() cur, err := c.db.Query(ctx, readMessageQuery, map[string]interface{}{ "message": driver.NewDocumentID(MESSAGES_COLLECTION, msg.GetUuid()), "reader": reader, diff --git a/pkg/stream/stream.go b/pkg/stream/stream.go index 713f850c..b460a68e 100644 --- a/pkg/stream/stream.go +++ b/pkg/stream/stream.go @@ -117,8 +117,21 @@ start_stream: newMessage, err := s.msgCtrl.Read(ctx, message, requestor) if err != nil { log.Error("Failed to update readers", zap.Error(err)) + return err } event.Item = &cc.Event_Msg{Msg: newMessage} + + readEvent := &cc.Event{ + Type: cc.EventType_CHAT_READ, + Item: &cc.Event_Msg{ + Msg: newMessage, + }, + } + err = serverStream.Send(readEvent) + if err != nil { + log.Error("Failed to send read event", zap.Error(err)) + return err + } } err = serverStream.Send(event)