-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.go
218 lines (182 loc) · 3.63 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package mongowire
import (
"github.com/tychoish/birch"
"github.com/tychoish/birch/x/mrpc/model"
)
type Message interface {
Header() MessageHeader
Serialize() []byte
HasResponse() bool
Scope() *OpScope
}
// OP_REPLY
type ReplyMessage struct {
header MessageHeader
Flags int32
CursorId int64
StartingFrom int32
NumberReturned int32
Docs []birch.Document
}
// OP_UPDATE
type updateMessage struct {
header MessageHeader
Reserved int32
Flags int32
Namespace string
Filter *birch.Document
Update *birch.Document
}
// OP_QUERY
type queryMessage struct {
header MessageHeader
Flags int32
Skip int32
NReturn int32
Namespace string
Query *birch.Document
Project *birch.Document
}
// OP_GET_MORE
type getMoreMessage struct {
header MessageHeader
Reserved int32
NReturn int32
CursorId int64
Namespace string
}
// OP_INSERT
type insertMessage struct {
header MessageHeader
Flags int32
Namespace string
Docs []birch.Document
}
// OP_DELETE
type deleteMessage struct {
header MessageHeader
Reserved int32
Flags int32
Namespace string
Filter *birch.Document
}
// OP_KILL_CURSORS
type killCursorsMessage struct {
header MessageHeader
Reserved int32
NumCursors int32
CursorIds []int64
}
// OP_COMMAND
type CommandMessage struct {
header MessageHeader
DB string
CmdName string
CommandArgs *birch.Document
Metadata *birch.Document
InputDocs []birch.Document
// internal bookekeeping
upconverted bool
}
// OP_COMMAND_REPLY
type CommandReplyMessage struct {
header MessageHeader
CommandReply *birch.Document
Metadata *birch.Document
OutputDocs []birch.Document
}
// OP_MSG
type OpMessage struct {
header MessageHeader
serialized []byte
Flags uint32
DB string
Collection string
Operation string
Items []OpMessageSection
Checksum int32
}
func GetModel(msg Message) (any, OpType) {
switch m := msg.(type) {
case *CommandMessage:
return &model.Command{
DB: m.DB,
Command: m.CmdName,
Arguments: m.CommandArgs,
Metadata: m.Metadata,
Inputs: m.InputDocs,
ConvertedFromQuery: m.upconverted,
}, OP_COMMAND
case *OpMessage:
op := &model.Message{
Database: m.DB,
Collection: m.Collection,
Operation: m.Operation,
}
switch m.Flags {
case 0:
op.Checksum = true
case 1:
op.MoreToCome = true
case 3:
op.Checksum = true
op.MoreToCome = true
}
for _, section := range m.Items {
op.Items = append(op.Items, model.SequenceItem{
Identifier: section.Name(),
Documents: section.Documents(),
})
}
return op, OP_MSG
case *deleteMessage:
return &model.Delete{
Namespace: m.Namespace,
Filter: m.Filter,
}, OP_DELETE
case *insertMessage:
return &model.Insert{
Namespace: m.Namespace,
Documents: m.Docs,
}, OP_INSERT
case *queryMessage:
return &model.Query{
Namespace: m.Namespace,
Skip: m.Skip,
NReturn: m.NReturn,
Query: m.Query,
Project: m.Project,
}, OP_QUERY
case *updateMessage:
update := &model.Update{
Namespace: m.Namespace,
Filter: m.Filter,
Update: m.Update,
}
switch m.Flags {
case 1:
update.Upsert = true
case 2:
update.Multi = true
case 3:
update.Upsert = true
update.Multi = true
}
return update, OP_UPDATE
case *ReplyMessage:
reply := &model.Reply{
StartingFrom: m.StartingFrom,
CursorID: m.CursorId,
Contents: m.Docs,
}
switch m.Flags {
case 1:
reply.QueryFailure = true
case 0:
reply.CursorNotFound = true
}
return reply, OP_REPLY
default:
return nil, OpType(0)
}
}