Skip to content

Commit

Permalink
docs: adds comments to describe the mongo parser
Browse files Browse the repository at this point in the history
Signed-off-by: re-Tick <jain.ritik.1001@gmail.com>
  • Loading branch information
re-Tick committed Apr 12, 2024
1 parent aaa712e commit 821a130
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 32 deletions.
2 changes: 2 additions & 0 deletions pkg/core/proxy/integrations/mongo/command.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Package mongo provides functionality for working with MongoDB outgoing calls.
package mongo

// This file contains code from the coinbase mongobetween
// https://github.com/coinbase/mongobetween/blob/1034c5a0c3f10cb1dd84af2981bc55ea1d3b45c0/mongo/command.go#L10
import (
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)
Expand Down
25 changes: 24 additions & 1 deletion pkg/core/proxy/integrations/mongo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"go.uber.org/zap"
)

// decodeMongo decodes the mongo wire message from the client connection
// and sends the response back to the client.
func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn net.Conn, dstCfg *integrations.ConditionalDstCfg, mockDb integrations.MockMemDb, opts models.OutgoingOptions) error {
startedDecoding := time.Now()
requestBuffers := [][]byte{reqBuf}
Expand All @@ -29,17 +31,20 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
defer close(errCh)
var readRequestDelay time.Duration
for {
// get the config mocks from the mockDb for responding to heartbeat calls
configMocks, err := mockDb.GetUnFilteredMocks()
if err != nil {
utils.LogError(logger, err, "error while getting config mock")
}
logger.Debug(fmt.Sprintf("the config mocks are: %v", configMocks))

var (
mongoRequests []models.MongoRequest
mongoRequests []models.MongoRequest // stores the request packet
)
// check to read the request buffer from the client connection after the initial packeyt
if string(reqBuf) == "read form client conn" {
started := time.Now()
// reads the first chunk of the mongo request
reqBuf, err = util.ReadBytes(ctx, logger, clientConn)
if err != nil {
if err == io.EOF {
Expand All @@ -60,6 +65,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
return
}
logger.Debug(fmt.Sprintf("the loop starts with the time delay: %v", time.Since(startedDecoding)))
// convert the request buffer to the mongo wire message in the go struct
opReq, requestHeader, mongoRequest, err := Decode(reqBuf, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the client")
Expand All @@ -71,10 +77,13 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
Message: mongoRequest,
ReadDelay: int64(readRequestDelay),
})
// check for the more_to_come flag bit in the mongo request
// header to read the next chunks of the request
if val, ok := mongoRequest.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for {
started := time.Now()
logger.Debug("into the for loop for request stream")
// reads the next chunk of the mongo request
requestBuffer1, err := util.ReadBytes(ctx, logger, clientConn)
if err != nil {
if err == io.EOF {
Expand All @@ -93,6 +102,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
logger.Debug("the response from the server is complete")
break
}
// convert the request buffer to the mongo response wire message in the go struct
_, reqHeader, mongoReq, err := Decode(requestBuffer1, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the mongo client")
Expand All @@ -110,19 +120,24 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
})
}
}
// check for the heartbeat request from client and use the config mocks to respond
if isHeartBeat(logger, opReq, *mongoRequests[0].Header, mongoRequests[0].Message) {
logger.Debug("recieved a heartbeat request for mongo", zap.Any("config mocks", len(configMocks)))
maxMatchScore := 0.0
bestMatchIndex := -1
// loop over the recorded config mocks to match with the incoming request
for configIndex, configMock := range configMocks {
logger.Debug("the config mock is: ", zap.Any("config mock", configMock), zap.Any("actual request", mongoRequests))
// checking the number of chunks for recorded config mocks and the incoming request
if len(configMock.Spec.MongoRequests) == len(mongoRequests) {
for i, req := range configMock.Spec.MongoRequests {
// check the opcode of the incoming request and the recorded config mocks
if len(configMock.Spec.MongoRequests) != len(mongoRequests) || req.Header.Opcode != mongoRequests[i].Header.Opcode {
continue
}
switch req.Header.Opcode {
case wiremessage.OpQuery:
// check the query fields of the incoming request and the recorded config mocks
expectedQuery := req.Message.(*models.MongoOpQuery)
actualQuery := mongoRequests[i].Message.(*models.MongoOpQuery)
if actualQuery.FullCollectionName != expectedQuery.FullCollectionName ||
Expand All @@ -133,6 +148,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
continue
}

// calculate the matching score for query bson dcouments of the incoming request and the recorded config mocks
expected := map[string]interface{}{}
actual := map[string]interface{}{}
err = bson.UnmarshalExtJSON([]byte(expectedQuery.Query), true, &expected)
Expand All @@ -153,13 +169,15 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}

case wiremessage.OpMsg:
// check the OpMsg sections of the incoming request and the recorded config mocks
if req.Message.(*models.MongoOpMessage).FlagBits != mongoRequests[i].Message.(*models.MongoOpMessage).FlagBits {
continue
}
scoreSum := 0.0
if len(req.Message.(*models.MongoOpMessage).Sections) != len(mongoRequests[i].Message.(*models.MongoOpMessage).Sections) {
continue
}
// calculate the matching score for each section of the incoming request and the recorded config mocks
for sectionIndx, section := range req.Message.(*models.MongoOpMessage).Sections {
if len(req.Message.(*models.MongoOpMessage).Sections) == len(mongoRequests[i].Message.(*models.MongoOpMessage).Sections) {
score := compareOpMsgSection(logger, section, mongoRequests[i].Message.(*models.MongoOpMessage).Sections[sectionIndx])
Expand Down Expand Up @@ -190,6 +208,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
errCh <- err
return
}
// write the mongo response to the client connection from the recorded config mocks that most matches the incoming request
for _, mongoResponse := range configMocks[bestMatchIndex].Spec.MongoResponses {
switch mongoResponse.Header.Opcode {
case wiremessage.OpReply:
Expand Down Expand Up @@ -238,6 +257,9 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}
}
} else {
// handle for the non-heartbeat request from the client

// match the incoming request with the recorded tcsMocks and return a mocked response which matches most with incoming request
matched, matchedMock, err := match(ctx, logger, mongoRequests, mockDb)
if err != nil {
errCh <- err
Expand All @@ -258,6 +280,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
responseTo := mongoRequests[0].Header.RequestID
logger.Debug("the mock matched with the current request", zap.Any("mock", matchedMock), zap.Any("responseTo", responseTo))

// write the mongo response to the client connection from the recorded tcsMocks that most matches the incoming request
for _, resp := range matchedMock.Spec.MongoResponses {
respMessage := resp.Message.(*models.MongoOpMessage)
var expectedRequestSections []string
Expand Down
48 changes: 27 additions & 21 deletions pkg/core/proxy/integrations/mongo/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"go.uber.org/zap"
)

// encodeMongo records the outgoing mongo messages of the client connection,
// decodes the wiremessage binary and writes readable string
// to the yaml file.
func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {

errCh := make(chan error, 1)
Expand All @@ -32,11 +35,10 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
for {
var err error
var readRequestDelay time.Duration
// var logStr string = fmt.Sprintln("the conn id: ", clientConnId, " the destination conn id: ", destConnId)

// logStr += fmt.Sprintln("started reading from the client: ", started)

// reads the request packets from the client connection after the first request packet.
// Since, that is already read in the RecordOutgoing function.
if string(reqBuf) == "read form client conn" {
// lstr := ""
started := time.Now()
reqBuf, err = util.ReadBytes(ctx, logger, clientConn)
logger.Debug("reading from the mongo conn", zap.Any("", string(reqBuf)))
Expand All @@ -56,9 +58,10 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}

var (
mongoRequests []models.MongoRequest
mongoResponses []models.MongoResponse
mongoRequests []models.MongoRequest // stores the decoded binary packets for a request
mongoResponses []models.MongoResponse // stores the decoded binary packets for a response
)
// decode the binary packet and store the values in the corresponding struct
opReq, requestHeader, mongoRequest, err := Decode(reqBuf, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the client")
Expand All @@ -71,6 +74,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
Message: mongoRequest,
ReadDelay: int64(readRequestDelay),
})
// forwards the request packet to the destination server
_, err = destConn.Write(reqBuf)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -82,12 +86,11 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
logger.Debug(fmt.Sprintf("the request in the mongo parser after passing to dest: %v", len(reqBuf)))

// logStr += fmt.Sprintln("after writing the request to the destination: ", time.Since(started))
// check for the request packet streaming for the mongo wire message
if val, ok := mongoRequest.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for {
// read the streaming request packets
requestBuffer1, err := util.ReadBytes(ctx, logger, clientConn)

// logStr += tmpStr
if err != nil {
if err == io.EOF {
logger.Debug("recieved request buffer is empty in record mode for mongo request")
Expand All @@ -99,7 +102,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
return nil
}

// write the reply to mongo client
// forward the request packet to the destination server
_, err = destConn.Write(requestBuffer1)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -110,12 +113,12 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
return nil
}

// logStr += fmt.Sprintln("after writting response to the client: ", time.Since(started), "current time is: ", time.Now())

if len(requestBuffer1) == 0 {
logger.Debug("the response from the server is complete")
break
}
// decode the binary packet and return the values in the corresponding structs
// for header and message.
_, reqHeader, mongoReq, err := Decode(requestBuffer1, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the destination server")
Expand All @@ -134,10 +137,9 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
}

// read reply message from the mongo server
// tmpStr := ""
reqTimestampMock := time.Now()
started := time.Now()
// read reply message length from the destination mongo server
responsePckLengthBuffer, err := util.ReadRequiredBytes(ctx, logger, destConn, 4)
if err != nil {
if err == io.EOF {
Expand All @@ -152,16 +154,17 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by

logger.Debug("recieved these pck length packets", zap.Any("packets", responsePckLengthBuffer))

// convert packet length to LittleEndian integer.
pckLength := getPacketLength(responsePckLengthBuffer)
logger.Debug("received pck length ", zap.Any("packet length", pckLength))

// read the entire response packet
responsePckDataBuffer, err := util.ReadRequiredBytes(ctx, logger, destConn, int(pckLength)-4)

logger.Debug("recieved these packets", zap.Any("packets", responsePckDataBuffer))

responseBuffer := append(responsePckLengthBuffer, responsePckDataBuffer...)
logger.Debug("reading from the destination mongo server", zap.Any("", string(responseBuffer)))
// logStr += tmpStr
if err != nil {
if err == io.EOF {
logger.Debug("recieved response buffer is empty in record mode for mongo call")
Expand All @@ -174,7 +177,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
readResponseDelay := time.Since(started)

// write the reply to mongo client
// write the response packet to mongo client
_, err = clientConn.Write(responseBuffer)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -185,8 +188,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
return nil
}

// logStr += fmt.Sprintln("after writting response to the client: ", time.Since(started), "current time is: ", time.Now())

// decode the binary packet of response and return the values in the corresponding structs
_, responseHeader, mongoResponse, err := Decode(responseBuffer, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the destination server")
Expand All @@ -198,14 +200,16 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
Message: mongoResponse,
ReadDelay: int64(readResponseDelay),
})
// check for the response packet streaming for the mongo wire message
if val, ok := mongoResponse.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for i := 0; ; i++ {
// handle the streaming response packets for heartbeat calls
if i == 0 && isHeartBeat(logger, opReq, *mongoRequests[0].Header, mongoRequests[0].Message) {
m.recordMessage(ctx, logger, mongoRequests, mongoResponses, opReq, reqTimestampMock, mocks)
}
started = time.Now()
// read the response packets from the destination server
responseBuffer, err = util.ReadBytes(ctx, logger, destConn)
// logStr += tmpStr
if err != nil {
if err == io.EOF {
logger.Debug("recieved response buffer is empty in record mode for mongo call")
Expand All @@ -232,12 +236,11 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
logger.Debug(fmt.Sprintf("the response in the mongo parser after passing to client: %v", len(responseBuffer)))

// logStr += fmt.Sprintln("after writting response to the client: ", time.Since(started), "current time is: ", time.Now())

if len(responseBuffer) == 0 {
logger.Debug("the response from the server is complete")
break
}
// decode the binary packet for response and return the values in the corresponding structs
_, respHeader, mongoResp, err := Decode(responseBuffer, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the destination server")
Expand All @@ -256,7 +259,9 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
}

// write the response packet to the yaml file
m.recordMessage(ctx, logger, mongoRequests, mongoResponses, opReq, reqTimestampMock, mocks)
// assigns "read form client conn" to the reqBuf to read the next request packet from the client connection
reqBuf = []byte("read form client conn")
}
})
Expand All @@ -272,6 +277,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
}

// getPacketLength returns the length of the packet from the first 4 bytes of the packet.
func getPacketLength(src []byte) (length int32) {
length = int32(src[0]) | int32(src[1])<<8 | int32(src[2])<<16 | int32(src[3])<<24
return length
Expand Down
4 changes: 4 additions & 0 deletions pkg/core/proxy/integrations/mongo/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/zap"
)

// match mathces and returns the best matching mock for the incoming mongo requests.
func match(ctx context.Context, logger *zap.Logger, mongoRequests []models.MongoRequest, mockDb integrations.MockMemDb) (bool, *models.Mock, error) {
for {
select {
Expand All @@ -27,15 +28,18 @@ func match(ctx context.Context, logger *zap.Logger, mongoRequests []models.Mongo
}
maxMatchScore := 0.0
bestMatchIndex := -1
// iterate over the tcsMocks and compare the incoming mongo requests with the recorded mongo requests.
for tcsIndx, tcsMock := range tcsMocks {
if ctx.Err() != nil {
return false, nil, ctx.Err()
}
// check for the number of chunks in the incoming mongo requests and the recorded mongo requests.
if len(tcsMock.Spec.MongoRequests) == len(mongoRequests) {
for i, req := range tcsMock.Spec.MongoRequests {
if ctx.Err() != nil {
return false, nil, ctx.Err()
}
// check for the opcode of the incoming mongo requests and the recorded mongo requests.
if len(tcsMock.Spec.MongoRequests) != len(mongoRequests) || req.Header.Opcode != mongoRequests[i].Header.Opcode {
logger.Debug("the recieved request is not of same type with the tcmocks", zap.Any("at index", tcsIndx))
continue
Expand Down
Loading

0 comments on commit 821a130

Please sign in to comment.