/
mgo_message.go
104 lines (94 loc) · 2.79 KB
/
mgo_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package infra
import (
	"context"
	"errors"
	"time"

	"github.com/wxdao/chatroom/internal/pkg/domain"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)
// MgoMessageRepository is the MongoDB-backed implementation of
// domain.MessageRepository. Every operation runs against the single
// collection handed to NewMgoMessageRepository.
type MgoMessageRepository struct {
	// messageColl is the collection that message documents are read from
	// and inserted into.
	messageColl *mongo.Collection
}
// NewMgoMessageRepository returns a repository that stores and loads messages
// through messageColl. The collection is kept as-is; nothing is validated or
// contacted at construction time.
func NewMgoMessageRepository(messageColl *mongo.Collection) *MgoMessageRepository {
	repo := new(MgoMessageRepository)
	repo.messageColl = messageColl
	return repo
}
// MessageByID implements MessageRepository.MessageByID.
//
// It looks up the single document whose "id" field equals id, bounded by a
// 10-second timeout. When no document matches it returns (nil, nil), so
// callers must check the returned message for nil. Any other lookup or decode
// error is returned unchanged.
func (repo *MgoMessageRepository) MessageByID(id string) (*domain.Message, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var model mgoMessageModel
	err := repo.messageColl.FindOne(ctx, bson.M{"id": id}).Decode(&model)
	switch {
	case errors.Is(err, mongo.ErrNoDocuments):
		// errors.Is instead of == so that "not found" is still recognised
		// if the sentinel ever arrives wrapped in another error.
		return nil, nil
	case err != nil:
		return nil, err
	}
	return model.message(), nil
}
// MessageInRange implements MessageRepository.MessageInRange.
//
// It returns every message of chatroomID whose "id" lies between startID and
// endID, both bounds inclusive ($gte / $lte), using a 10-second timeout for
// the whole query and iteration. No sort is requested, so the order is
// whatever the server yields. When nothing matches, the returned slice is nil.
func (repo *MgoMessageRepository) MessageInRange(chatroomID string, startID string, endID string) ([]*domain.Message, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	idBounds := bson.M{"$lte": endID, "$gte": startID}
	cursor, err := repo.messageColl.Find(ctx, bson.M{
		"chatroomID": chatroomID,
		"id":         idBounds,
	})
	if err != nil {
		return nil, err
	}
	defer cursor.Close(ctx)

	var found []*domain.Message
	for cursor.Next(ctx) {
		var doc mgoMessageModel
		if decodeErr := cursor.Decode(&doc); decodeErr != nil {
			return nil, decodeErr
		}
		found = append(found, doc.message())
	}
	// Next returns false both on exhaustion and on failure; Err tells them apart.
	if iterErr := cursor.Err(); iterErr != nil {
		return nil, iterErr
	}
	return found, nil
}
// SaveMessage implements MessageRepository.SaveMessage.
//
// The message is converted to its storage model and inserted as a new
// document, with a 10-second timeout. The insert result is discarded; only
// the error, if any, is reported.
func (repo *MgoMessageRepository) SaveMessage(message *domain.Message) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	_, err := repo.messageColl.InsertOne(ctx, newMgoMessageModel(message))
	return err
}
// mgoMessageModel is the document shape used to persist a domain.Message.
// The bson tags are the stored field names and must stay in sync with the
// query filters in this file ("id", "chatroomID").
type mgoMessageModel struct {
	ID         string `bson:"id"`         // message identifier
	ChatroomID string `bson:"chatroomID"` // chatroom the message belongs to
	Username   string `bson:"username"`
	Content    string `bson:"content"`
	// CreateTime holds the creation instant as nanoseconds since the Unix
	// epoch. Note the stored key is capitalised, unlike the other fields.
	CreateTime int64 `bson:"CreateTime"`
}
// newMgoMessageModel converts a domain message into its storage model.
// The creation time is flattened to nanoseconds since the Unix epoch via
// UnixNano. message must be non-nil.
func newMgoMessageModel(message *domain.Message) *mgoMessageModel {
	return &mgoMessageModel{
		ID:         message.Xid,
		ChatroomID: message.XchatroomID,
		Username:   message.Xusername,
		Content:    message.Xcontent,
		CreateTime: message.XcreateTime.UnixNano(),
	}
}
// message converts the storage model back into a freshly allocated
// domain.Message. The creation time is rebuilt from the stored nanosecond
// count with time.Unix(0, ns).
func (m mgoMessageModel) message() *domain.Message {
	restored := new(domain.Message)
	restored.Xid, restored.XchatroomID = m.ID, m.ChatroomID
	restored.Xusername, restored.Xcontent = m.Username, m.Content
	restored.XcreateTime = time.Unix(0, m.CreateTime)
	return restored
}