-
Notifications
You must be signed in to change notification settings - Fork 7k
/
shared_channel_notifier.go
144 lines (119 loc) · 4.47 KB
/
shared_channel_notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package app
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/services/sharedchannel"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
)
var sharedChannelEventsForSync model.StringArray = []string{
model.WEBSOCKET_EVENT_POSTED,
model.WEBSOCKET_EVENT_POST_EDITED,
model.WEBSOCKET_EVENT_POST_DELETED,
model.WEBSOCKET_EVENT_REACTION_ADDED,
model.WEBSOCKET_EVENT_REACTION_REMOVED,
}
var sharedChannelEventsForInvitation model.StringArray = []string{
model.WEBSOCKET_EVENT_DIRECT_ADDED,
}
// SharedChannelSyncHandler is called when a websocket event is received by a cluster node.
// Only on the leader node it will notify the sync service to perform necessary updates to the remote for the given
// shared channel.
func (s *Server) SharedChannelSyncHandler(event *model.WebSocketEvent) {
syncService := s.GetSharedChannelSyncService()
if isEligibleForEvents(syncService, event, sharedChannelEventsForSync) {
err := handleContentSync(s, syncService, event)
if err != nil {
mlog.Warn(
err.Error(),
mlog.String("event", event.EventType()),
mlog.String("action", "content_sync"),
)
}
} else if isEligibleForEvents(syncService, event, sharedChannelEventsForInvitation) {
err := handleInvitation(s, syncService, event)
if err != nil {
mlog.Warn(
err.Error(),
mlog.String("event", event.EventType()),
mlog.String("action", "invitation"),
)
}
}
}
func isEligibleForEvents(syncService SharedChannelServiceIFace, event *model.WebSocketEvent, events model.StringArray) bool {
return syncServiceEnabled(syncService) &&
eventHasChannel(event) &&
events.Contains(event.EventType())
}
func eventHasChannel(event *model.WebSocketEvent) bool {
return event.GetBroadcast() != nil &&
event.GetBroadcast().ChannelId != ""
}
func syncServiceEnabled(syncService SharedChannelServiceIFace) bool {
return syncService != nil &&
syncService.Active()
}
func handleContentSync(s *Server, syncService SharedChannelServiceIFace, event *model.WebSocketEvent) error {
channel, err := findChannel(s, event.GetBroadcast().ChannelId)
if err != nil {
return err
}
if channel != nil && channel.IsShared() {
syncService.NotifyChannelChanged(channel.Id)
}
return nil
}
func handleInvitation(s *Server, syncService SharedChannelServiceIFace, event *model.WebSocketEvent) error {
channel, err := findChannel(s, event.GetBroadcast().ChannelId)
if err != nil {
return err
}
if channel == nil || !channel.IsShared() {
return nil
}
creator, err := getUserFromEvent(s, event, "creator_id")
if err != nil {
return err
}
// This is a termination condition, since on the other end when we are processing
// the invite we are re-triggering a model.WEBSOCKET_EVENT_DIRECT_ADDED, which will call this handler.
// When the creator is remote, it means that this is a DM that was not originated from the current server
// and therefore we do not need to do anything.
if creator == nil || creator.IsRemote() {
return nil
}
participant, err := getUserFromEvent(s, event, "teammate_id")
if err != nil {
return err
}
if participant == nil {
return nil
}
rc, err := s.Store.RemoteCluster().Get(*participant.RemoteId)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("couldn't find remote cluster %s, for creating shared channel invitation for a DM", *participant.RemoteId))
}
return syncService.SendChannelInvite(channel, creator.Id, "", rc, sharedchannel.WithDirectParticipantID(creator.Id), sharedchannel.WithDirectParticipantID(participant.Id))
}
func getUserFromEvent(s *Server, event *model.WebSocketEvent, key string) (*model.User, error) {
userID, ok := event.GetData()[key].(string)
if !ok || userID == "" {
return nil, fmt.Errorf("received websocket message that is eligible for sending an invitation but message does not have `%s` present", key)
}
user, err := s.Store.User().Get(context.Background(), userID)
if err != nil {
return nil, errors.Wrap(err, "couldn't find user for creating shared channel invitation for a DM")
}
return user, nil
}
func findChannel(server *Server, channelId string) (*model.Channel, error) {
channel, err := server.Store.Channel().Get(channelId, true)
if err != nil {
return nil, errors.Wrap(err, "received websocket message that is eligible for shared channel sync but channel does not exist")
}
return channel, nil
}