-
Notifications
You must be signed in to change notification settings - Fork 893
/
sent_message.go
196 lines (166 loc) · 5.9 KB
/
sent_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mtest
import (
"errors"
"fmt"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
)
// SentMessage represents a message sent by the driver to the server.
type SentMessage struct {
RequestID int32
RawMessage wiremessage.WireMessage
Command bsoncore.Document
// The $readPreference document. This is separated into its own field even though it's included in the larger
// command document in both OP_QUERY and OP_MSG because OP_QUERY separates the command into a $query sub-document
// if there is a read preference. To unify OP_QUERY and OP_MSG, we pull this out into a separate field and set
// the Command field to the $query sub-document.
ReadPreference bsoncore.Document
// The documents sent for an insert, update, or delete command. This is separated into its own field because it's
// sent as part of the command document in OP_QUERY and as a document sequence outside the command document in
// OP_MSG.
DocumentSequence *bsoncore.DocumentSequence
}
type sentMsgParseFn func([]byte) (*SentMessage, error)
func getSentMessageParser(opcode wiremessage.OpCode) (sentMsgParseFn, bool) {
switch opcode {
case wiremessage.OpQuery:
return parseOpQuery, true
case wiremessage.OpMsg:
return parseSentOpMsg, true
case wiremessage.OpCompressed:
return parseSentOpCompressed, true
default:
return nil, false
}
}
func parseSentMessage(wm []byte) (*SentMessage, error) {
// Re-assign the wire message to "remaining" so "wm" continues to point to the entire message after parsing.
_, requestID, _, opcode, remaining, ok := wiremessage.ReadHeader(wm)
if !ok {
return nil, errors.New("failed to read wiremessage header")
}
parseFn, ok := getSentMessageParser(opcode)
if !ok {
return nil, fmt.Errorf("unknown opcode: %v", opcode)
}
sent, err := parseFn(remaining)
if err != nil {
return nil, fmt.Errorf("error parsing wiremessage with opcode %s: %v", opcode, err)
}
sent.RequestID = requestID
sent.RawMessage = wm
return sent, nil
}
func parseOpQuery(wm []byte) (*SentMessage, error) {
var ok bool
if _, wm, ok = wiremessage.ReadQueryFlags(wm); !ok {
return nil, errors.New("failed to read query flags")
}
if _, wm, ok = wiremessage.ReadQueryFullCollectionName(wm); !ok {
return nil, errors.New("failed to read full collection name")
}
if _, wm, ok = wiremessage.ReadQueryNumberToSkip(wm); !ok {
return nil, errors.New("failed to read number to skip")
}
if _, wm, ok = wiremessage.ReadQueryNumberToReturn(wm); !ok {
return nil, errors.New("failed to read number to return")
}
query, wm, ok := wiremessage.ReadQueryQuery(wm)
if !ok {
return nil, errors.New("failed to read query")
}
// If there is no read preference document, the command document is query.
// Otherwise, query is in the format {$query: <command document>, $readPreference: <read preference document>}.
commandDoc := query
var rpDoc bsoncore.Document
dollarQueryVal, err := query.LookupErr("$query")
if err == nil {
commandDoc = dollarQueryVal.Document()
rpVal, err := query.LookupErr("$readPreference")
if err != nil {
return nil, fmt.Errorf("query %s contains $query but not $readPreference fields", query)
}
rpDoc = rpVal.Document()
}
// For OP_QUERY, inserts, updates, and deletes are sent as a BSON array of documents inside the main command
// document. Pull these sequences out into an ArrayStyle DocumentSequence.
var docSequence *bsoncore.DocumentSequence
cmdElems, _ := commandDoc.Elements()
for _, elem := range cmdElems {
switch elem.Key() {
case "documents", "updates", "deletes":
docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.ArrayStyle,
Data: elem.Value().Array(),
}
break
}
if docSequence != nil {
// There can only be one of these arrays in a well-formed command, so we exit the loop once one is found.
break
}
}
sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
}
return sm, nil
}
func parseSentOpMsg(wm []byte) (*SentMessage, error) {
var ok bool
var err error
if _, wm, ok = wiremessage.ReadMsgFlags(wm); !ok {
return nil, errors.New("failed to read flags")
}
if wm, err = assertMsgSectionType(wm, wiremessage.SingleDocument); err != nil {
return nil, fmt.Errorf("error verifying section type for command document: %v", err)
}
var commandDoc bsoncore.Document
commandDoc, wm, ok = wiremessage.ReadMsgSectionSingleDocument(wm)
if !ok {
return nil, errors.New("failed to read command document")
}
var rpDoc bsoncore.Document
if rpVal, err := commandDoc.LookupErr("$readPreference"); err == nil {
rpDoc = rpVal.Document()
}
var docSequence *bsoncore.DocumentSequence
if len(wm) != 0 {
// If there are bytes remaining in the wire message, they must correspond to a DocumentSequence section.
if wm, err = assertMsgSectionType(wm, wiremessage.DocumentSequence); err != nil {
return nil, fmt.Errorf("error verifying section type for document sequence: %v", err)
}
var data []byte
_, data, wm, ok = wiremessage.ReadMsgSectionRawDocumentSequence(wm)
if !ok {
return nil, errors.New("failed to read document sequence")
}
docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.SequenceStyle,
Data: data,
}
}
sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
}
return sm, nil
}
func parseSentOpCompressed(wm []byte) (*SentMessage, error) {
originalOpcode, wm, err := parseOpCompressed(wm)
if err != nil {
return nil, err
}
parser, ok := getSentMessageParser(originalOpcode)
if !ok {
return nil, fmt.Errorf("unknown original opcode %v", originalOpcode)
}
return parser(wm)
}