Skip to content

Commit

Permalink
refactor:parser for redis
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamsouravjha committed May 26, 2024
1 parent 005ed23 commit cf45392
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 102 deletions.
36 changes: 15 additions & 21 deletions pkg/core/proxy/integrations/redis/decode.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// Package redis provides functionality for decoding redis dependencies.
package redis

Check warning on line 1 in pkg/core/proxy/integrations/redis/decode.go

View workflow job for this annotation

GitHub Actions / lint

package-comments: should have a package comment (revive)

import (
Expand All @@ -19,29 +18,25 @@ func decodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
redisRequests := [][]byte{reqBuf}
logger.Debug("Into the redis parser in test mode")
errCh := make(chan error, 1)

go func(errCh chan error, redisRequests [][]byte) {
defer pUtil.Recover(logger, clientConn, nil)
defer close(errCh)
for {
// Since protocol packets have to be parsed for checking stream end,
// clientConnection have deadline for read to determine the end of stream.
err := clientConn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
if err != nil {
utils.LogError(logger, err, "failed to set the read deadline for the client conn")
return
}

// To read the stream of request packets from the client
// Read the stream of request packets from the client
for {
clientConn.SetReadDeadline(time.Now().Add(1 * time.Second))

if len(redisRequests) > 0 {
break
}
clientConn.SetReadDeadline(time.Now().Add(10 * time.Second))
buffer, err := pUtil.ReadBytes(ctx, logger, clientConn)
if netErr, ok := err.(net.Error); !(ok && netErr.Timeout()) && err != nil && err.Error() != "EOF" {
utils.LogError(logger, err, "failed to read the request message in proxy for redis dependency")
return
}
if netErr, ok := err.(net.Error); (ok && netErr.Timeout()) || (err != nil && err.Error() == "EOF") {
logger.Debug("the timeout for the client read in redis or EOF")
logger.Debug("timeout for client read in redis or EOF")
break
}
if len(buffer) > 0 {
Expand All @@ -51,12 +46,11 @@ func decodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}

if len(redisRequests) == 0 {
logger.Debug("the redis request buffer is empty")
logger.Debug("redis request buffer is empty")
continue
}

// bestMatchedIndx := 0
// fuzzy match gives the index for the best matched redis mock
// Fuzzy match to get the best matched redis mock
matched, redisResponses, err := fuzzyMatch(ctx, redisRequests, mockDb)
if err != nil {
utils.LogError(logger, err, "error while matching redis mocks")
Expand All @@ -69,9 +63,9 @@ func decodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
return
}

logger.Debug("the redisRequests before pass through are", zap.Any("length", len(redisRequests)))
for _, genReq := range redisRequests {
logger.Debug("the redisRequests are:", zap.Any("h", string(genReq)))
logger.Debug("redisRequests before pass through:", zap.Any("length", len(redisRequests)))
for _, redReq := range redisRequests {
logger.Debug("redisRequests:", zap.Any("h", string(redReq)))
}

reqBuffer, err := pUtil.PassThrough(ctx, logger, clientConn, dstCfg, redisRequests)
Expand All @@ -81,11 +75,11 @@ func decodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}

redisRequests = [][]byte{}
logger.Debug("the request buffer after pass through in redis", zap.Any("buffer", string(reqBuffer)))
logger.Debug("request buffer after pass through in redis:", zap.Any("buffer", string(reqBuffer)))
if len(reqBuffer) > 0 {
redisRequests = [][]byte{reqBuffer}
}
logger.Debug("the length of redisRequests after passThrough ", zap.Any("length", len(redisRequests)))
logger.Debug("length of redisRequests after passThrough:", zap.Any("length", len(redisRequests)))
continue
}
for _, redisResponse := range redisResponses {
Expand All @@ -109,7 +103,7 @@ func decodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC

// Clear the redisRequests buffer for the next dependency call
redisRequests = [][]byte{}
logger.Debug("the redisRequests after the iteration", zap.Any("length", len(redisRequests)))
logger.Debug("redisRequests after the iteration:", zap.Any("length", len(redisRequests)))
}
}(errCh, redisRequests)

Expand Down
156 changes: 80 additions & 76 deletions pkg/core/proxy/integrations/redis/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package redis

import (
"context"
"encoding/base64"
"errors"
"io"
"net"
"time"

"golang.org/x/sync/errgroup"

"go.keploy.io/server/v2/pkg/core/proxy/integrations/util"
pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
Expand All @@ -20,13 +18,10 @@ import (
func encodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {

var redisRequests []models.Payload
var redisResponses []models.Payload

bufStr := string(reqBuf)
dataType := models.String
if !util.IsASCII(string(reqBuf)) {
bufStr = util.EncodeBase64(reqBuf)
dataType = "binary"
}

if bufStr != "" {
redisRequests = append(redisRequests, models.Payload{
Expand All @@ -41,108 +36,117 @@ func encodeRedis(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}
_, err := destConn.Write(reqBuf)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
utils.LogError(logger, err, "failed to write request message to the destination server")
return err
}
var redisResponses []models.Payload

clientBuffChan := make(chan []byte)
destBuffChan := make(chan []byte)
errChan := make(chan error)
//TODO: where to close the error channel since it is used in both the go routines
//close(errChan)
errCh := make(chan error, 1)

//get the error group from the context
g, ok := ctx.Value(models.ErrGroupKey).(*errgroup.Group)
if !ok {
return errors.New("failed to get the error group from the context")
}

// read requests from client
g.Go(func() error {
defer pUtil.Recover(logger, clientConn, nil)
defer close(clientBuffChan)
pUtil.ReadBuffConn(ctx, logger, clientConn, clientBuffChan, errChan)
return nil
})
// read responses from destination
g.Go(func() error {
defer pUtil.Recover(logger, nil, destConn)
defer close(destBuffChan)
pUtil.ReadBuffConn(ctx, logger, destConn, destBuffChan, errChan)
return nil
})
reqTimestampMock := time.Now()

prevChunkWasReq := false
var reqTimestampMock = time.Now()
var resTimestampMock time.Time

logger.Debug("the iteration for the redis request starts", zap.Any("redisReqs", len(redisRequests)), zap.Any("redisResps", len(redisResponses)))
for {
select {
case <-ctx.Done():
if !prevChunkWasReq && len(redisRequests) > 0 && len(redisResponses) > 0 {
saveMock(redisRequests, redisResponses, reqTimestampMock, resTimestampMock, mocks)
return ctx.Err()
}
case buffer, ok := <-clientBuffChan:
if !ok {
return nil // Channel closed, end the loop
}
// Write the request message to the destination
if _, err := destConn.Write(buffer); err != nil {
if ctx.Err() != nil {
return ctx.Err()
// Read and process responses from the destination server
g.Go(func() error {
defer pUtil.Recover(logger, clientConn, destConn)
defer close(errCh)

for {
// Read the response from the destination server
resp, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
if err == io.EOF {
logger.Debug("Response complete, exiting the loop.")
// if there is any buffer left before EOF, we must send it to the client and save this as mock
if len(resp) != 0 {
resTimestampMock := time.Now()
_, err = clientConn.Write(resp)
if err != nil {
utils.LogError(logger, err, "failed to write response message to the client")
errCh <- err
return nil
}
processBuffer(resp, models.FromServer, &redisResponses)
saveMock(redisRequests, redisResponses, reqTimestampMock, resTimestampMock, mocks)
}
break
}
utils.LogError(logger, err, "failed to write request message to the destination server")
return err
utils.LogError(logger, err, "failed to read the response message from the destination server")
errCh <- err
return nil
}

processBuffer(buffer, models.FromClient, &redisRequests)
prevChunkWasReq = true
case buffer, ok := <-destBuffChan:
if !ok {
return nil // Channel closed, end the loop
}
if prevChunkWasReq {
// Store the request timestamp
reqTimestampMock = time.Now()
}
// Write the response message to the client
if _, err := clientConn.Write(buffer); err != nil {
_, err = clientConn.Write(resp)
if err != nil {
utils.LogError(logger, err, "failed to write response message to the client")
return err
errCh <- err
return nil
}

processBuffer(buffer, models.FromServer, &redisResponses)
resTimestampMock = time.Now()
resTimestampMock := time.Now()
processBuffer(resp, models.FromServer, &redisResponses)

if prevChunkWasReq && len(redisRequests) > 0 && len(redisResponses) > 0 {
// Save the mock with both request and response
if len(redisRequests) > 0 && len(redisResponses) > 0 {
saveMock(redisRequests, redisResponses, reqTimestampMock, resTimestampMock, mocks)
redisRequests = []models.Payload{}
redisResponses = []models.Payload{}
}

prevChunkWasReq = false
case err := <-errChan:
if err == io.EOF {
// Read the next request from the client
reqBuf, err = pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
if err != io.EOF {
utils.LogError(logger, err, "failed to read the request message from the client")
errCh <- err
return nil
}
errCh <- err
return nil
}

bufStr := string(reqBuf)
dataType := models.String

if bufStr != "" {
redisRequests = append(redisRequests, models.Payload{
Origin: models.FromClient,
Message: []models.OutputBinary{
{
Type: dataType,
Data: bufStr,
},
},
})
}
_, err = destConn.Write(reqBuf)
if err != nil {
utils.LogError(logger, err, "failed to write request message to the destination server")
errCh <- err
return nil
}
return err
reqTimestampMock = time.Now()
}
return nil
})

select {
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
}

func processBuffer(buffer []byte, origin models.OriginType, payloads *[]models.Payload) {
bufStr := string(buffer)
buffDataType := models.String
if !util.IsASCII(bufStr) {
bufStr = base64.StdEncoding.EncodeToString(buffer)
buffDataType = "binary"
}

if bufStr != "" {
*payloads = append(*payloads, models.Payload{
Expand Down
15 changes: 10 additions & 5 deletions pkg/core/proxy/integrations/redis/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package redis

import (
"context"
"encoding/base64"
"fmt"
"math"

Expand Down Expand Up @@ -33,6 +32,9 @@ func fuzzyMatch(ctx context.Context, reqBuff [][]byte, mockDb integrations.MockM
var unfilteredMocks []*models.Mock

for _, mock := range mocks {
if mock.Kind != "Redis" {
continue
}
if mock.TestModeInfo.IsFiltered {
filteredMocks = append(filteredMocks, mock)
} else {
Expand Down Expand Up @@ -84,6 +86,7 @@ func fuzzyMatch(ctx context.Context, reqBuff [][]byte, mockDb integrations.MockM
}
return true, responseMock, nil
}

return false, nil, nil
}
}
Expand All @@ -96,16 +99,18 @@ func findBinaryMatch(tcsMocks []*models.Mock, reqBuffs [][]byte, mxSim float64)
for idx, mock := range tcsMocks {
if len(mock.Spec.RedisRequests) == len(reqBuffs) {
for requestIndex, reqBuff := range reqBuffs {
_ = base64.StdEncoding.EncodeToString(reqBuff)
encoded, _ := util.DecodeBase64(mock.Spec.RedisRequests[requestIndex].Message[0].Data)

similarity := fuzzyCheck(encoded, reqBuff)
mockReq, err := util.DecodeBase64(mock.Spec.RedisRequests[requestIndex].Message[0].Data)
if err != nil {
mockReq = []byte(mock.Spec.RedisRequests[requestIndex].Message[0].Data)
}

similarity := fuzzyCheck(mockReq, reqBuff)
if mxSim < similarity {
mxSim = similarity
mxIdx = idx
}
}

}
}
return mxIdx
Expand Down

0 comments on commit cf45392

Please sign in to comment.