forked from heroiclabs/nakama
/
core_message.go
139 lines (121 loc) · 5.47 KB
/
core_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package server
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"time"
"github.com/gofrs/uuid"
"github.com/jackc/pgtype"
"github.com/talktonpc/nakama-common/api"
"github.com/talktonpc/nakama-common/rtapi"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// Sentinel errors returned by the channel message send/update helpers.
// The message texts are kept verbatim because callers may surface them as-is.
var (
	// errInvalidMessageId is returned when a message ID is not a valid UUID.
	errInvalidMessageId = errors.New("Invalid message identifier")
	// errInvalidMessageContent is returned when content fails JSON validation.
	errInvalidMessageContent = errors.New("Message content must be a valid JSON object")
	// errMessageNotFound is returned when an update matches no stored message.
	errMessageNotFound = errors.New("Could not find message to update in channel history")
	// errMessagePersist is returned when a database write for a message fails.
	errMessagePersist = errors.New("Error persisting channel message")
)
func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
if maybeJSON := []byte(content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
return nil, errInvalidMessageContent
}
ts := time.Now().Unix()
message := &api.ChannelMessage{
ChannelId: channelId,
MessageId: uuid.Must(uuid.NewV4()).String(),
Code: &wrapperspb.Int32Value{Value: ChannelMessageTypeChat},
SenderId: senderId,
Username: senderUsername,
Content: content,
CreateTime: ×tamppb.Timestamp{Seconds: ts},
UpdateTime: ×tamppb.Timestamp{Seconds: ts},
Persistent: &wrapperspb.BoolValue{Value: persist},
}
ack := &rtapi.ChannelMessageAck{
ChannelId: message.ChannelId,
MessageId: message.MessageId,
Code: message.Code,
Username: message.Username,
CreateTime: message.CreateTime,
UpdateTime: message.UpdateTime,
Persistent: message.Persistent,
}
switch channelStream.Mode {
case StreamModeChannel:
message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
case StreamModeGroup:
message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
case StreamModeDM:
message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
}
if persist {
query := `INSERT INTO message (id, code, sender_id, username, stream_mode, stream_subject, stream_descriptor, stream_label, content, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, $10, $10)`
_, err := db.ExecContext(ctx, query, message.MessageId, message.Code.Value, message.SenderId, message.Username, channelStream.Mode, channelStream.Subject, channelStream.Subcontext, channelStream.Label, message.Content, time.Unix(message.CreateTime.Seconds, 0).UTC())
if err != nil {
logger.Error("Error persisting channel message", zap.Error(err))
return nil, errMessagePersist
}
}
router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)
return ack, nil
}
func ChannelMessageUpdate(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, messageId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
if _, err := uuid.FromString(messageId); err != nil {
return nil, errInvalidMessageId
}
if maybeJSON := []byte(content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
return nil, errInvalidMessageContent
}
ts := time.Now().Unix()
message := &api.ChannelMessage{
ChannelId: channelId,
MessageId: messageId,
Code: &wrapperspb.Int32Value{Value: ChannelMessageTypeChatUpdate},
SenderId: senderId,
Username: senderUsername,
Content: content,
CreateTime: ×tamppb.Timestamp{Seconds: ts},
UpdateTime: ×tamppb.Timestamp{Seconds: ts},
Persistent: &wrapperspb.BoolValue{Value: persist},
}
ack := &rtapi.ChannelMessageAck{
ChannelId: message.ChannelId,
MessageId: message.MessageId,
Code: message.Code,
Username: message.Username,
CreateTime: message.CreateTime,
UpdateTime: message.UpdateTime,
Persistent: message.Persistent,
}
switch channelStream.Mode {
case StreamModeChannel:
message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
case StreamModeGroup:
message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
case StreamModeDM:
message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
}
if persist {
// First find and update the referenced message.
var dbCreateTime pgtype.Timestamptz
query := "UPDATE message SET update_time = $5, username = $4, content = $3 WHERE id = $1 AND sender_id = $2 RETURNING create_time"
err := db.QueryRowContext(ctx, query, messageId, message.SenderId, message.Content, message.Username, time.Unix(message.UpdateTime.Seconds, 0).UTC()).Scan(&dbCreateTime)
if err != nil {
if err == sql.ErrNoRows {
return nil, errMessageNotFound
}
logger.Error("Error persisting channel message update", zap.Error(err))
return nil, errMessagePersist
}
// Replace the message create time with the real one from DB.
message.CreateTime = ×tamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
}
router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)
return ack, nil
}