Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: sync parsers acc to latest refactor changes #1720

Merged
merged 8 commits into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module go.keploy.io/server/v2

go 1.21.0

replace github.com/jackc/pgproto3/v2 => github.com/keploy/pgproto3/v2 v2.0.2
replace github.com/jackc/pgproto3/v2 => github.com/keploy/pgproto3/v2 v2.0.5

require (
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs=
github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA=
github.com/keploy/pgproto3/v2 v2.0.2 h1:exp+WlBBWucEmiYsjXezGrhzShdyHWkvQoIXzdj7Vj8=
github.com/keploy/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/keploy/pgproto3/v2 v2.0.5 h1:8spdNKZ+nOnHVxiimDsqulBRN6viPXPghkA7xppnzJ8=
github.com/keploy/pgproto3/v2 v2.0.5/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (g *Generic) MatchType(_ context.Context, _ []byte) bool {
}

func (g *Generic) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

reqBuf, err := util.ReadInitialBuf(ctx, logger, src)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (g *Grpc) MatchType(_ context.Context, reqBuf []byte) bool {
}

func (g *Grpc) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

reqBuf, err := util.ReadInitialBuf(ctx, logger, src)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *HTTP) MatchType(_ context.Context, buf []byte) bool {
}

func (h *HTTP) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := h.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := h.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

h.logger.Debug("Recording the outgoing http call in record mode")

Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *Mongo) MatchType(_ context.Context, buffer []byte) bool {
}

func (m *Mongo) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))
reqBuf, err := util.ReadInitialBuf(ctx, logger, src)
if err != nil {
utils.LogError(logger, err, "failed to read the initial mongo message")
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (m *MySQL) MatchType(_ context.Context, _ []byte) bool {
}

func (m *MySQL) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

err := encodeMySQL(ctx, logger, src, dst, mocks, opts)
if err != nil {
Expand Down
52 changes: 48 additions & 4 deletions pkg/core/proxy/integrations/postgres/v1/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"time"

"go.keploy.io/server/v2/pkg/core/proxy/integrations"
Expand All @@ -19,8 +20,10 @@ import (
func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn net.Conn, dstCfg *integrations.ConditionalDstCfg, mockDb integrations.MockMemDb, _ models.OutgoingOptions) error {
pgRequests := [][]byte{reqBuf}
errCh := make(chan error, 1)
defer close(errCh)

go func(errCh chan error, pgRequests [][]byte) {
// close should be called from the producer of the channel
defer close(errCh)
for {
// Since protocol packets have to be parsed for checking stream end,
// clientConnection have deadline for read to determine the end of stream.
Expand All @@ -34,12 +37,11 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
for {
buffer, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
if netErr, ok := err.(net.Error); !(ok && netErr.Timeout()) && err != nil {
if netErr, ok := err.(net.Error); !(ok && netErr.Timeout()) {
if err == io.EOF {
logger.Debug("EOF error received from client. Closing conn in postgres !!")
errCh <- err
}
//TODO: why debug log sarthak?
logger.Debug("failed to read the request message in proxy for postgres dependency")
errCh <- err
}
Expand All @@ -55,7 +57,6 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
logger.Debug("the postgres request buffer is empty")
continue
}

matched, pgResponses, err := matchingReadablePG(ctx, logger, pgRequests, mockDb)
if err != nil {
errCh <- fmt.Errorf("error while matching tcs mocks %v", err)
Expand Down Expand Up @@ -97,3 +98,46 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
return err
}
}

type QueryData struct {
PrepIdentifier string `json:"PrepIdentifier" yaml:"PrepIdentifier"`
Query string `json:"Query" yaml:"Query"`
}

type PrepMap map[string][]QueryData

type TestPrepMap map[string][]QueryData

func getRecordPrepStatement(allMocks []*models.Mock) PrepMap {
preparedstatement := make(PrepMap)
for _, v := range allMocks {
if v.Kind != "Postgres" {
continue
}
for _, req := range v.Spec.PostgresRequests {
var querydata []QueryData
psMap := make(map[string]string)
if len(req.PacketTypes) > 0 && req.PacketTypes[0] != "p" && req.Identfier != "StartupRequest" {
p := 0
for _, header := range req.PacketTypes {
if header == "P" {
if strings.Contains(req.Parses[p].Name, "S_") {
psMap[req.Parses[p].Query] = req.Parses[p].Name
querydata = append(querydata, QueryData{PrepIdentifier: req.Parses[p].Name,
Query: req.Parses[p].Query,
})

}
p++
}
}
}
// also append the query data for the prepared statement
if len(querydata) > 0 {
preparedstatement[v.ConnectionID] = append(preparedstatement[v.ConnectionID], querydata...)
}
}

}
return preparedstatement
}
32 changes: 18 additions & 14 deletions pkg/core/proxy/integrations/postgres/v1/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ import (
)

func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

logger.Debug("Inside the encodePostgresOutgoing function")
var pgRequests []models.Backend
Expand Down Expand Up @@ -79,28 +72,35 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie

clientBuffChan := make(chan []byte)
destBuffChan := make(chan []byte)
errChan := make(chan error)
defer close(clientBuffChan)
defer close(destBuffChan)
defer close(errChan)
errChan := make(chan error, 1)

//get the error group from the context
g := ctx.Value(models.ErrGroupKey).(*errgroup.Group)

// read requests from client
g.Go(func() error {
defer utils.Recover(logger)
defer close(clientBuffChan)
pUtil.ReadBuffConn(ctx, logger, clientConn, clientBuffChan, errChan)
return nil
})

// read responses from destination
g.Go(func() error {
defer utils.Recover(logger)
defer close(destBuffChan)
pUtil.ReadBuffConn(ctx, logger, destConn, destBuffChan, errChan)
return nil
})

go func() {
err := g.Wait()
if err != nil {
logger.Info("error group is returning an error", zap.Error(err))
}
close(errChan)
}()

prevChunkWasReq := false
logger.Debug("the iteration for the pg request starts", zap.Any("pgReqs", len(pgRequests)), zap.Any("pgResps", len(pgResponses)))

Expand All @@ -125,6 +125,7 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
ResTimestampMock: resTimestampMock,
Metadata: metadata,
},
ConnectionID: ctx.Value(models.ClientConnectionIDKey).(string),
}
return ctx.Err()
}
Expand Down Expand Up @@ -153,6 +154,7 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
ResTimestampMock: resTimestampMock,
Metadata: metadata,
},
ConnectionID: ctx.Value(models.ClientConnectionIDKey).(string),
}
pgRequests = []models.Backend{}
pgResponses = []models.Frontend{}
Expand Down Expand Up @@ -295,17 +297,19 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
}
if pg.FrontendWrapper.MsgType == 'C' {
pg.FrontendWrapper.CommandComplete = *msg.(*pgproto3.CommandComplete)
// empty the command tag
pg.FrontendWrapper.CommandComplete.CommandTag = []byte{}
pg.FrontendWrapper.CommandCompletes = append(pg.FrontendWrapper.CommandCompletes, pg.FrontendWrapper.CommandComplete)
}
if pg.FrontendWrapper.DataRow.RowValues != nil {
if pg.FrontendWrapper.MsgType == 'D' && pg.FrontendWrapper.DataRow.RowValues != nil {
// Create a new slice for each DataRow
valuesCopy := make([]string, len(pg.FrontendWrapper.DataRow.RowValues))
copy(valuesCopy, pg.FrontendWrapper.DataRow.RowValues)

row := pgproto3.DataRow{
RowValues: valuesCopy, // Use the copy of the values
Values: pg.FrontendWrapper.DataRow.Values,
}
// fmt.Println("row is ", row)
dataRows = append(dataRows, row)
}
}
Expand Down Expand Up @@ -362,7 +366,7 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
if err != nil {
logger.Debug("failed to decode the response message in proxy for postgres dependency", zap.Error(err))
}
if (len(afterEncoded) != len(buffer) && pgMock.PacketTypes[0] != "R") || len(pgMock.DataRows) > 0 {
if len(afterEncoded) != len(buffer) && pgMock.PacketTypes[0] != "R" {
logger.Debug("the length of the encoded buffer is not equal to the length of the original buffer", zap.Any("after_encoded", len(afterEncoded)), zap.Any("buffer", len(buffer)))
pgMock.Payload = bufStr
}
Expand Down
Loading
Loading