From 43920b9eaf880276d284e620a0003f2e3a23a64f Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 30 Oct 2023 12:35:18 +0100 Subject: [PATCH 1/6] Example use of sdpws client --- cmd/request.go | 418 +++++++++++++++++++++++++------------------------ 1 file changed, 213 insertions(+), 205 deletions(-) diff --git a/cmd/request.go b/cmd/request.go index 9f6dcf6e..bbb51ca4 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -3,7 +3,6 @@ package cmd import ( "context" "encoding/json" - "errors" "fmt" "os" "os/signal" @@ -14,16 +13,14 @@ import ( "github.com/google/uuid" "github.com/overmindtech/ovm-cli/tracing" "github.com/overmindtech/sdp-go" + "github.com/overmindtech/sdp-go/sdpws" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/timestamppb" - "nhooyr.io/websocket" - "nhooyr.io/websocket/wspb" ) // requestCmd represents the start command @@ -108,21 +105,16 @@ func Request(ctx context.Context, ready chan bool) int { return 1 } - options := &websocket.DialOptions{ - HTTPClient: NewAuthenticatedClient(ctx, otelhttp.DefaultClient), - } - - // nolint: bodyclose // nhooyr.io/websocket reads the body internally - c, _, err := websocket.Dial(ctx, gatewayUrl, options) + c, err := sdpws.Dial(ctx, gatewayUrl, + NewAuthenticatedClient(ctx, otelhttp.DefaultClient), + &sdpws.LoggingGatewayMessageHandler{Level: log.InfoLevel}, + ) if err != nil { lf["gateway-url"] = gatewayUrl log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to overmind API") return 1 } - defer c.Close(websocket.StatusGoingAway, "") - - // the default, 32kB is too small for cert bundles and rds-db-cluster-parameter-groups - c.SetReadLimit(2 * 1024 * 1024) + defer c.Close(ctx) // Log the request in JSON b, err := json.MarshalIndent(req, "", " ") @@ -132,186 +124,193 @@ func Request(ctx context.Context, ready chan bool) int { } log.WithContext(ctx).WithFields(lf).Infof("Request:\n%v", string(b)) - - err = wspb.Write(ctx, c, req) + q, err := createQuery() if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to send request") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to create query") return 1 } - - queriesSent := true - - responses := make(chan *sdp.GatewayResponse) - - // Start a goroutine that reads responses - go func() { - for { - res := new(sdp.GatewayResponse) - - err = wspb.Read(ctx, c, res) - - if err != nil { - var e websocket.CloseError - if errors.As(err, &e) { - log.WithContext(ctx).WithFields(log.Fields{ - "code": e.Code.String(), - "reason": e.Reason, - }).Info("Websocket closing") - return - } - log.WithContext(ctx).WithFields(log.Fields{ - "error": err, - }).Error("Failed to read response") - return - } - - responses <- res - } - }() - - activeQueries := make(map[uuid.UUID]bool) - - var numItems, numEdges int - - // Read the responses -responses: - for { - select { - case <-ctx.Done(): - log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") - return 1 - - case resp := <-responses: - switch resp.ResponseType.(type) { - case *sdp.GatewayResponse_Status: - status := resp.GetStatus() - statusFields := log.Fields{ - "summary": status.Summary, - "responders": status.Summary.Responders, - "queriesSent": queriesSent, - "postProcessingComplete": status.PostProcessingComplete, - "itemsReceived": numItems, - "edgesReceived": numEdges, - } - - if status.Done() { - // fall through from all "final" query states, check if there's still queries in progress; - // only break from the loop if all queries have already been sent - // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources - allDone := allDone(ctx, activeQueries, lf) - statusFields["allDone"] = allDone - if allDone && queriesSent { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done") - break responses - } else { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries") - } - } else { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders") - } - - case *sdp.GatewayResponse_QueryStatus: - status := resp.GetQueryStatus() - statusFields := log.Fields{ - "status": status.Status.String(), - } - queryUuid := status.GetUUIDParsed() - if queryUuid == nil { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") - continue responses - } - statusFields["query"] = queryUuid - - switch status.Status { - case sdp.QueryStatus_UNSPECIFIED: - statusFields["unexpected_status"] = true - case sdp.QueryStatus_STARTED: - activeQueries[*queryUuid] = true - case sdp.QueryStatus_FINISHED: - activeQueries[*queryUuid] = false - case sdp.QueryStatus_ERRORED: - activeQueries[*queryUuid] = false - case sdp.QueryStatus_CANCELLED: - activeQueries[*queryUuid] = false - default: - statusFields["unexpected_status"] = true - } - - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update") - - case *sdp.GatewayResponse_NewItem: - item := resp.GetNewItem() - numItems += 1 - log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") - - case *sdp.GatewayResponse_NewEdge: - edge := resp.GetNewEdge() - numEdges += 1 - log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ - "from": edge.From.GloballyUniqueName(), - "to": edge.To.GloballyUniqueName(), - }).Info("new edge") - - case *sdp.GatewayResponse_QueryError: - err := resp.GetQueryError() - log.WithContext(ctx).WithFields(lf).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) - - case *sdp.GatewayResponse_Error: - err := resp.GetError() - log.WithContext(ctx).WithFields(lf).Errorf("generic error: %v", err) - - default: - j := protojson.Format(resp) - log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) - } - } - } - - if viper.GetBool("snapshot-after") { - log.WithContext(ctx).Info("Starting snapshot") - msgId := uuid.New() - snapReq := &sdp.GatewayRequest{ - MinStatusInterval: minStatusInterval, - RequestType: &sdp.GatewayRequest_StoreSnapshot{ - StoreSnapshot: &sdp.StoreSnapshot{ - Name: viper.GetString("snapshot-name"), - Description: viper.GetString("snapshot-description"), - MsgID: msgId[:], - }, - }, - } - err = wspb.Write(ctx, c, snapReq) - if err != nil { - log.WithContext(ctx).WithFields(log.Fields{ - "error": err, - }).Error("Failed to send snapshot request") - return 1 - } - - for { - select { - case <-ctx.Done(): - log.WithContext(ctx).Info("Context cancelled, exiting") - return 1 - case resp := <-responses: - switch resp.ResponseType.(type) { - case *sdp.GatewayResponse_SnapshotStoreResult: - result := resp.GetSnapshotStoreResult() - if result.Success { - log.WithContext(ctx).Infof("Snapshot stored successfully: %v", uuid.UUID(result.SnapshotID)) - return 0 - } - - log.WithContext(ctx).Errorf("Snapshot store failed: %v", result.ErrorMessage) - return 1 - default: - j := protojson.Format(resp) - - log.WithContext(ctx).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) - } - } - } + err = c.SendQuery(ctx, q) + if err != nil { + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to execute query") + return 1 } + log.WithContext(ctx).WithFields(lf).WithError(err).Info("received items") + + c.Wait(ctx, uuid.UUIDs{uuid.UUID(q.UUID)}) + + // queriesSent := true + + // responses := make(chan *sdp.GatewayResponse) + + // // Start a goroutine that reads responses + // go func() { + // for { + // res := new(sdp.GatewayResponse) + + // err = wspb.Read(ctx, c, res) + + // if err != nil { + // var e websocket.CloseError + // if errors.As(err, &e) { + // log.WithContext(ctx).WithFields(log.Fields{ + // "code": e.Code.String(), + // "reason": e.Reason, + // }).Info("Websocket closing") + // return + // } + // log.WithContext(ctx).WithFields(log.Fields{ + // "error": err, + // }).Error("Failed to read response") + // return + // } + + // responses <- res + // } + // }() + + // activeQueries := make(map[uuid.UUID]bool) + + // var numItems, numEdges int + + // // Read the responses + // responses: + // for { + // select { + // case <-ctx.Done(): + // log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") + // return 1 + + // case resp := <-responses: + // switch resp.ResponseType.(type) { + // case *sdp.GatewayResponse_Status: + // status := resp.GetStatus() + // statusFields := log.Fields{ + // "summary": status.Summary, + // "responders": status.Summary.Responders, + // "queriesSent": queriesSent, + // "postProcessingComplete": status.PostProcessingComplete, + // "itemsReceived": numItems, + // "edgesReceived": numEdges, + // } + + // if status.Done() { + // // fall through from all "final" query states, check if there's still queries in progress; + // // only break from the loop if all queries have already been sent + // // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources + // allDone := allDone(ctx, activeQueries, lf) + // statusFields["allDone"] = allDone + // if allDone && queriesSent { + // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done") + // break responses + // } else { + // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries") + // } + // } else { + // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders") + // } + + // case *sdp.GatewayResponse_QueryStatus: + // status := resp.GetQueryStatus() + // statusFields := log.Fields{ + // "status": status.Status.String(), + // } + // queryUuid := status.GetUUIDParsed() + // if queryUuid == nil { + // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") + // continue responses + // } + // statusFields["query"] = queryUuid + + // switch status.Status { + // case sdp.QueryStatus_UNSPECIFIED: + // statusFields["unexpected_status"] = true + // case sdp.QueryStatus_STARTED: + // activeQueries[*queryUuid] = true + // case sdp.QueryStatus_FINISHED: + // activeQueries[*queryUuid] = false + // case sdp.QueryStatus_ERRORED: + // activeQueries[*queryUuid] = false + // case sdp.QueryStatus_CANCELLED: + // activeQueries[*queryUuid] = false + // default: + // statusFields["unexpected_status"] = true + // } + + // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update") + + // case *sdp.GatewayResponse_NewItem: + // item := resp.GetNewItem() + // numItems += 1 + // log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") + + // case *sdp.GatewayResponse_NewEdge: + // edge := resp.GetNewEdge() + // numEdges += 1 + // log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ + // "from": edge.From.GloballyUniqueName(), + // "to": edge.To.GloballyUniqueName(), + // }).Info("new edge") + + // case *sdp.GatewayResponse_QueryError: + // err := resp.GetQueryError() + // log.WithContext(ctx).WithFields(lf).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) + + // case *sdp.GatewayResponse_Error: + // err := resp.GetError() + // log.WithContext(ctx).WithFields(lf).Errorf("generic error: %v", err) + + // default: + // j := protojson.Format(resp) + // log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) + // } + // } + // } + + // if viper.GetBool("snapshot-after") { + // log.WithContext(ctx).Info("Starting snapshot") + // msgId := uuid.New() + // snapReq := &sdp.GatewayRequest{ + // MinStatusInterval: minStatusInterval, + // RequestType: &sdp.GatewayRequest_StoreSnapshot{ + // StoreSnapshot: &sdp.StoreSnapshot{ + // Name: viper.GetString("snapshot-name"), + // Description: viper.GetString("snapshot-description"), + // MsgID: msgId[:], + // }, + // }, + // } + // err = wspb.Write(ctx, c, snapReq) + // if err != nil { + // log.WithContext(ctx).WithFields(log.Fields{ + // "error": err, + // }).Error("Failed to send snapshot request") + // return 1 + // } + + // for { + // select { + // case <-ctx.Done(): + // log.WithContext(ctx).Info("Context cancelled, exiting") + // return 1 + // case resp := <-responses: + // switch resp.ResponseType.(type) { + // case *sdp.GatewayResponse_SnapshotStoreResult: + // result := resp.GetSnapshotStoreResult() + // if result.Success { + // log.WithContext(ctx).Infof("Snapshot stored successfully: %v", uuid.UUID(result.SnapshotID)) + // return 0 + // } + + // log.WithContext(ctx).Errorf("Snapshot store failed: %v", result.ErrorMessage) + // return 1 + // default: + // j := protojson.Format(resp) + + // log.WithContext(ctx).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) + // } + // } + // } + // } return 0 } @@ -331,33 +330,42 @@ func methodFromString(method string) (sdp.QueryMethod, error) { return result, nil } +func createQuery() (*sdp.Query, error) { + u := uuid.New() + method, err := methodFromString(viper.GetString("query-method")) + if err != nil { + return nil, err + } + + return &sdp.Query{ + Method: method, + Type: viper.GetString("query-type"), + Query: viper.GetString("query"), + Scope: viper.GetString("query-scope"), + Deadline: timestamppb.New(time.Now().Add(10 * time.Hour)), + UUID: u[:], + RecursionBehaviour: &sdp.Query_RecursionBehaviour{ + LinkDepth: viper.GetUint32("link-depth"), + FollowOnlyBlastPropagation: viper.GetBool("blast-radius"), + }, + IgnoreCache: viper.GetBool("ignore-cache"), + }, nil +} + func createInitialRequest() (*sdp.GatewayRequest, error) { req := &sdp.GatewayRequest{ MinStatusInterval: minStatusInterval, } - u := uuid.New() switch viper.GetString("request-type") { case "query": - method, err := methodFromString(viper.GetString("query-method")) + q, err := createQuery() if err != nil { return nil, err } req.RequestType = &sdp.GatewayRequest_Query{ - Query: &sdp.Query{ - Method: method, - Type: viper.GetString("query-type"), - Query: viper.GetString("query"), - Scope: viper.GetString("query-scope"), - Deadline: timestamppb.New(time.Now().Add(10 * time.Hour)), - UUID: u[:], - RecursionBehaviour: &sdp.Query_RecursionBehaviour{ - LinkDepth: viper.GetUint32("link-depth"), - FollowOnlyBlastPropagation: viper.GetBool("blast-radius"), - }, - IgnoreCache: viper.GetBool("ignore-cache"), - }, + Query: q, } case "load-bookmark": bookmarkUUID, err := uuid.Parse(viper.GetString("bookmark-uuid")) From 61c8cbb305bb198ae0c54b3f94d567d32c784215 Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Thu, 2 Nov 2023 16:00:48 +0100 Subject: [PATCH 2/6] Port over to new SDPWS gateway client --- cmd/manualchange.go | 199 +++++------------------------------------- cmd/submitplan.go | 207 +++----------------------------------------- go.mod | 11 ++- go.sum | 67 +++----------- 4 files changed, 52 insertions(+), 432 deletions(-) diff --git a/cmd/manualchange.go b/cmd/manualchange.go index d629964e..f99b356e 100644 --- a/cmd/manualchange.go +++ b/cmd/manualchange.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "errors" "fmt" "os" "os/signal" @@ -13,15 +12,13 @@ import ( "github.com/google/uuid" "github.com/overmindtech/ovm-cli/tracing" "github.com/overmindtech/sdp-go" + "github.com/overmindtech/sdp-go/sdpws" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "google.golang.org/protobuf/encoding/protojson" - "nhooyr.io/websocket" - "nhooyr.io/websocket/wspb" ) // manualChangeCmd is the equivalent to submit-plan for manual changes @@ -126,8 +123,6 @@ func ManualChange(ctx context.Context, ready chan bool) int { log.WithContext(ctx).WithFields(lf).Info("re-using change") } - receivedItems := []*sdp.Reference{} - method, err := methodFromString(viper.GetString("query-method")) if err != nil { log.WithContext(ctx).WithError(err).WithFields(lf).Error("can't parse --query-method") @@ -145,181 +140,28 @@ func ManualChange(ctx context.Context, ready chan bool) int { log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to wake up sources") return 1 } - u := uuid.New() - queries := []*sdp.Query{ - { - UUID: u[:], - Method: method, - Scope: viper.GetString("query-scope"), - Type: viper.GetString("query-type"), - Query: viper.GetString("query"), - IgnoreCache: true, - }, - } - options := &websocket.DialOptions{ - HTTPClient: NewAuthenticatedClient(ctx, otelhttp.DefaultClient), - } - - log.WithContext(ctx).WithFields(lf).WithField("item_count", len(queries)).Info("identifying items") - // nolint: bodyclose // nhooyr.io/websocket reads the body internally - c, _, err := websocket.Dial(ctx, viper.GetString("gateway-url"), options) + ws, err := sdpws.Dial(ctx, gatewayUrl, otelhttp.DefaultClient, nil) if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to overmind API") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to gateway") return 1 } - defer c.Close(websocket.StatusGoingAway, "") - - // the default, 32kB is too small for cert bundles and rds-db-cluster-parameter-groups - c.SetReadLimit(2 * 1024 * 1024) - - queriesSentChan := make(chan struct{}) - go func() { - for _, q := range queries { - req := sdp.GatewayRequest{ - MinStatusInterval: minStatusInterval, - RequestType: &sdp.GatewayRequest_Query{ - Query: q, - }, - } - err = wspb.Write(ctx, c, &req) - - if err == nil { - log.WithContext(ctx).WithFields(log.Fields{ - "scope": q.Scope, - "type": q.Type, - "query": q.Query, - "method": q.Method.String(), - "uuid": q.ParseUuid().String(), - }).Trace("Started query") - } - if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).WithField("req", &req).Error("Failed to send request") - continue - } - } - queriesSentChan <- struct{}{} - }() - - responses := make(chan *sdp.GatewayResponse) - - // Start a goroutine that reads responses - go func() { - for { - res := new(sdp.GatewayResponse) - - err = wspb.Read(ctx, c, res) - - if err != nil { - var e websocket.CloseError - if errors.As(err, &e) { - log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ - "code": e.Code.String(), - "reason": e.Reason, - }).Info("Websocket closing") - return - } - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to read response") - return - } - responses <- res - } - }() - - activeQueries := make(map[uuid.UUID]bool) - queriesSent := false - - // Read the responses -responses: - for { - select { - case <-queriesSentChan: - queriesSent = true - - case <-ctx.Done(): - log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") - return 1 + u := uuid.New() + q := &sdp.Query{ + UUID: u[:], + Method: method, + Scope: viper.GetString("query-scope"), + Type: viper.GetString("query-type"), + Query: viper.GetString("query"), + IgnoreCache: true, + } - case resp := <-responses: - switch resp.ResponseType.(type) { - - case *sdp.GatewayResponse_Status: - status := resp.GetStatus() - statusFields := log.Fields{ - "summary": status.Summary, - "responders": status.Summary.Responders, - "queriesSent": queriesSent, - "post_processing_complete": status.PostProcessingComplete, - } - - if status.Done() { - // fall through from all "final" query states, check if there's still queries in progress; - // only break from the loop if all queries have already been sent - // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources - allDone := allDone(ctx, activeQueries, lf) - statusFields["allDone"] = allDone - if allDone && queriesSent { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done") - break responses - } else { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries") - } - } else { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders") - } - - case *sdp.GatewayResponse_QueryStatus: - status := resp.GetQueryStatus() - statusFields := log.Fields{ - "status": status.Status.String(), - } - queryUuid := status.GetUUIDParsed() - if queryUuid == nil { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") - continue responses - } - statusFields["query"] = queryUuid - - switch status.Status { - case sdp.QueryStatus_UNSPECIFIED: - statusFields["unexpected_status"] = true - case sdp.QueryStatus_STARTED: - activeQueries[*queryUuid] = true - case sdp.QueryStatus_FINISHED: - activeQueries[*queryUuid] = false - case sdp.QueryStatus_ERRORED: - activeQueries[*queryUuid] = false - case sdp.QueryStatus_CANCELLED: - activeQueries[*queryUuid] = false - default: - statusFields["unexpected_status"] = true - } - - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update") - - case *sdp.GatewayResponse_NewItem: - item := resp.GetNewItem() - log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") - - receivedItems = append(receivedItems, item.Reference()) - - case *sdp.GatewayResponse_NewEdge: - log.WithContext(ctx).WithFields(lf).Debug("ignored edge") - - case *sdp.GatewayResponse_QueryError: - err := resp.GetQueryError() - log.WithContext(ctx).WithFields(lf).WithError(err).Errorf("Error from %v(%v)", err.ResponderName, err.SourceName) - - case *sdp.GatewayResponse_Error: - err := resp.GetError() - log.WithContext(ctx).WithFields(lf).WithField(log.ErrorKey, err).Errorf("generic error") - - default: - j := protojson.Format(resp) - log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) - } - } + log.WithContext(ctx).WithFields(lf).WithField("item_count", 1).Info("identifying items") + receivedItems, err := ws.Query(ctx, q) + if err != nil { + log.WithContext(ctx).WithFields(lf).WithError(err).Error("failed to send query") + return 1 } if len(receivedItems) > 0 { @@ -327,10 +169,15 @@ responses: } else { log.WithContext(ctx).WithFields(lf).WithField("received_items", len(receivedItems)).Info("updating change record with no items") } + + references := make([]*sdp.Reference, len(receivedItems)) + for i, item := range receivedItems { + references[i] = item.Reference() + } resultStream, err := client.UpdateChangingItems(ctx, &connect.Request[sdp.UpdateChangingItemsRequest]{ Msg: &sdp.UpdateChangingItemsRequest{ ChangeUUID: changeUuid[:], - ChangingItems: receivedItems, + ChangingItems: references, }, }) if err != nil { diff --git a/cmd/submitplan.go b/cmd/submitplan.go index 5e6c944d..13e61dc2 100644 --- a/cmd/submitplan.go +++ b/cmd/submitplan.go @@ -18,16 +18,15 @@ import ( "github.com/overmindtech/ovm-cli/cmd/datamaps" "github.com/overmindtech/ovm-cli/tracing" "github.com/overmindtech/sdp-go" + "github.com/overmindtech/sdp-go/sdpws" log "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/iter" "github.com/spf13/cobra" "github.com/spf13/viper" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/timestamppb" - "nhooyr.io/websocket" - "nhooyr.io/websocket/wspb" ) // submitPlanCmd represents the submit-plan command @@ -451,9 +450,9 @@ func SubmitPlan(ctx context.Context, files []string, ready chan bool) int { log.WithContext(ctx).WithFields(lf).Info("Re-using change") } + queries := planMappings.Queries() receivedItems := make([]*sdp.Item, 0) - - if len(planMappings.Queries()) > 0 { + if len(queries) > 0 { mgmtClient := AuthenticatedManagementClient(ctx) log.WithContext(ctx).WithFields(lf).Info("Waking up sources") _, err = mgmtClient.KeepaliveSources(ctx, &connect.Request[sdp.KeepaliveSourcesRequest]{ @@ -466,177 +465,23 @@ func SubmitPlan(ctx context.Context, files []string, ready chan bool) int { return 1 } - options := &websocket.DialOptions{ - HTTPClient: NewAuthenticatedClient(ctx, otelhttp.DefaultClient), + ws, err := sdpws.Dial(ctx, gatewayUrl, otelhttp.DefaultClient, nil) + if err != nil { + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to gateway") + return 1 } - log.WithContext(ctx).Infof("Finding expected changes in Overmind") + results, err := iter.MapErr(queries, func(q **sdp.Query) ([]*sdp.Item, error) { + return ws.Query(ctx, *q) + }) - // nolint: bodyclose // nhooyr.io/websocket reads the body internally - c, _, err := websocket.Dial(ctx, viper.GetString("gateway-url"), options) if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to overmind API") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to query items") return 1 } - defer c.Close(websocket.StatusGoingAway, "") - - // the default, 32kB is too small for cert bundles and rds-db-cluster-parameter-groups - c.SetReadLimit(2 * 1024 * 1024) - - queriesSentChan := make(chan struct{}) - go func() { - for _, q := range planMappings.Queries() { - req := sdp.GatewayRequest{ - MinStatusInterval: minStatusInterval, - RequestType: &sdp.GatewayRequest_Query{ - Query: q, - }, - } - err = wspb.Write(ctx, c, &req) - - if err == nil { - log.WithContext(ctx).WithFields(log.Fields{ - "scope": q.Scope, - "type": q.Type, - "query": q.Query, - "method": q.Method.String(), - "uuid": q.ParseUuid().String(), - }).Trace("Started query") - } - if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).WithField("req", &req).Error("Failed to send request") - continue - } - } - queriesSentChan <- struct{}{} - }() - - responses := make(chan *sdp.GatewayResponse) - // Start a goroutine that reads responses - go func() { - for { - res := new(sdp.GatewayResponse) - - err = wspb.Read(ctx, c, res) - - if err != nil { - var e websocket.CloseError - if errors.As(err, &e) { - log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ - "code": e.Code.String(), - "reason": e.Reason, - }).Debug("Websocket closing") - return - } - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to read response") - return - } - - responses <- res - } - }() - - activeQueries := make(map[uuid.UUID]bool) - queryErrors := make(map[uuid.UUID][]*sdp.QueryError) - queriesSent := false - - // Read the responses - responses: - for { - select { - case <-queriesSentChan: - queriesSent = true - - case <-ctx.Done(): - log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") - return 1 - - case resp := <-responses: - switch resp.ResponseType.(type) { - - case *sdp.GatewayResponse_Status: - status := resp.GetStatus() - statusFields := log.Fields{ - "summary": status.Summary, - "responders": status.Summary.Responders, - "queriesSent": queriesSent, - "post_processing_complete": status.PostProcessingComplete, - } - - if status.Done() { - // fall through from all "final" query states, check if there's still queries in progress; - // only break from the loop if all queries have already been sent - // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources - allDone := allDone(ctx, activeQueries, lf) - statusFields["allDone"] = allDone - if allDone && queriesSent { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("All queries complete") - break responses - } else { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("All responders done, with unfinished queries") - } - } else { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debug("Still waiting for responders") - } - - case *sdp.GatewayResponse_QueryStatus: - status := resp.GetQueryStatus() - statusFields := log.Fields{ - "status": status.Status.String(), - } - queryUuid := status.GetUUIDParsed() - if queryUuid == nil { - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debug("Received QueryStatus with nil UUID") - continue responses - } - statusFields["query"] = queryUuid - - switch status.Status { - case sdp.QueryStatus_UNSPECIFIED: - statusFields["unexpected_status"] = true - case sdp.QueryStatus_STARTED: - activeQueries[*queryUuid] = true - case sdp.QueryStatus_FINISHED: - activeQueries[*queryUuid] = false - case sdp.QueryStatus_ERRORED: - activeQueries[*queryUuid] = false - case sdp.QueryStatus_CANCELLED: - activeQueries[*queryUuid] = false - default: - statusFields["unexpected_status"] = true - } - - log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debug("Query status update") - - case *sdp.GatewayResponse_NewItem: - item := resp.GetNewItem() - log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Debug("New item") - - receivedItems = append(receivedItems, item) - - case *sdp.GatewayResponse_NewEdge: - log.WithContext(ctx).WithFields(lf).Debug("Ignored edge") - - case *sdp.GatewayResponse_QueryError: - err := resp.GetQueryError() - uuid := err.GetUUIDParsed() - - if uuid != nil { - queryErrors[*uuid] = append(queryErrors[*uuid], err) - } - - log.WithContext(ctx).WithFields(lf).WithError(err).Debugf("Error from %v(%v)", err.ResponderName, err.SourceName) - - case *sdp.GatewayResponse_Error: - err := resp.GetError() - log.WithContext(ctx).WithFields(lf).WithField(log.ErrorKey, err).Debug("Generic error") - - default: - j := protojson.Format(resp) - log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) - } - } + for _, items := range results { + receivedItems = append(receivedItems, items...) } // Print a summary of the results so far. I would like for this to be @@ -668,18 +513,6 @@ func SubmitPlan(ctx context.Context, files []string, ready chan bool) int { "query": mapping.OvermindQuery.Query, "method": mapping.OvermindQuery.Method.String(), }).Error(" No responses received") - - relatedErrors, found := queryErrors[queryUUID] - - if found { - for _, err := range relatedErrors { - log.WithContext(ctx).WithFields(log.Fields{ - "type": err.ErrorType, - "source": err.SourceName, - "responder": err.ResponderName, - }).Errorf(" %v", err.ErrorString) - } - } } } } @@ -759,18 +592,6 @@ func SubmitPlan(ctx context.Context, files []string, ready chan bool) int { return 0 } -func allDone(ctx context.Context, activeQueries map[uuid.UUID]bool, lf log.Fields) bool { - allDone := true - for q := range activeQueries { - if activeQueries[q] { - log.WithContext(ctx).WithFields(lf).WithField("query", q).Debugf("Query still active") - allDone = false - break - } - } - return allDone -} - func init() { rootCmd.AddCommand(submitPlanCmd) diff --git a/go.mod b/go.mod index 6904e132..79fd9546 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,9 @@ require ( github.com/google/uuid v1.4.0 github.com/jedib0t/go-pretty/v6 v6.4.9 github.com/mattn/go-isatty v0.0.20 - github.com/overmindtech/sdp-go v0.53.0 + github.com/overmindtech/sdp-go v0.56.0 github.com/sirupsen/logrus v1.9.3 + github.com/sourcegraph/conc v0.3.0 github.com/spf13/cobra v1.7.0 github.com/spf13/viper v1.17.0 github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 @@ -25,7 +26,6 @@ require ( go.opentelemetry.io/otel/trace v1.19.0 golang.org/x/oauth2 v0.13.0 google.golang.org/protobuf v1.31.0 - nhooyr.io/websocket v1.8.7 ) require ( @@ -41,7 +41,7 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -53,7 +53,6 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.10.0 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -62,8 +61,7 @@ require ( github.com/xiam/to v0.0.0-20200126224905-d60d31e03561 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect - go.uber.org/atomic v1.11.0 // indirect - go.uber.org/multierr v1.9.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.17.0 // indirect @@ -76,4 +74,5 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + nhooyr.io/websocket v1.8.10 // indirect ) diff --git a/go.sum b/go.sum index b6366694..e56d6aea 100644 --- a/go.sum +++ b/go.sum @@ -73,11 +73,6 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/getsentry/sentry-go v0.25.0 h1:q6Eo+hS+yoJlTO3uu/azhQadsD8V+jQn2D8VvX1eOyI= github.com/getsentry/sentry-go v0.25.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= -github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= -github.com/gin-gonic/gin v1.8.1 h1:4+fr/el88TOO3ewCmQr8cx/CtZ/umlIRIs5M4NTNjf8= -github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -88,24 +83,6 @@ github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= -github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= -github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= -github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= -github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= -github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= -github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= -github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= -github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= -github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= -github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= -github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= @@ -150,7 +127,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -171,8 +147,6 @@ github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= -github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= -github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -189,15 +163,11 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -206,12 +176,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= -github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= @@ -219,20 +185,14 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/overmindtech/sdp-go v0.53.0 h1:uXFaMaOQ8cP8fL3KEdC3wc1J0q5M+oiSbcE61YeFRoM= -github.com/overmindtech/sdp-go v0.53.0/go.mod h1:14PtSb4/IhjHjWJk0K/+fRSwEksqNFDZxzXMv7vQrjA= +github.com/overmindtech/sdp-go v0.56.0 h1:1VFF6kulsNIVVctBClZbeZxOsHKRQDPzfZPVmwcWGSo= +github.com/overmindtech/sdp-go v0.56.0/go.mod h1:WZ/CsRATgtF0KZpgrlJdzLieBO3Gk9mk5XcdrSVOkbQ= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -273,7 +233,6 @@ github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -286,11 +245,6 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= -github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= -github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= -github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= github.com/uptrace/opentelemetry-go-extra/otellogrus v0.2.3 h1:m5eNyOhch/7tyK6aN6eRRpNoD1vM8PNh64dA05X22Js= github.com/uptrace/opentelemetry-go-extra/otellogrus v0.2.3/go.mod h1:APPUXm9BbpH7NFkfpbw04raZSitzl19/3NOCu0rbI4E= github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.3 h1:LyGS9cIZV0YVhE81zwfMhIE2l2flcj3wn5IoK4VkbWA= @@ -331,10 +285,10 @@ go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -452,7 +406,6 @@ golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -676,8 +629,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= -nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= From 4da529fbe3204be4cb30a65c8cc5dafb4ed394ad Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Thu, 2 Nov 2023 17:43:38 +0100 Subject: [PATCH 3/6] Rebuild `request` logging --- cmd/request.go | 103 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 35 deletions(-) diff --git a/cmd/request.go b/cmd/request.go index bbb51ca4..8518e4dd 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -57,6 +57,68 @@ var requestCmd = &cobra.Command{ }, } +// requestHandler is a simple implementation of GatewayMessageHandler that +// implements the required logging for the `request` command. +type requestHandler struct { + lf log.Fields + + queriesStarted int + numItems int + numEdges int + + sdpws.NoopGatewayMessageHandler +} + +// assert that requestHandler implements GatewayMessageHandler +var _ sdpws.GatewayMessageHandler = (*requestHandler)(nil) + +func (l *requestHandler) NewItem(ctx context.Context, item *sdp.Item) { + l.numItems += 1 + log.WithContext(ctx).WithFields(l.lf).WithField("item", item.GloballyUniqueName()).Infof("new item") +} + +func (l *requestHandler) NewEdge(ctx context.Context, edge *sdp.Edge) { + l.numEdges += 1 + log.WithContext(ctx).WithFields(l.lf).WithFields(log.Fields{ + "from": edge.From.GloballyUniqueName(), + "to": edge.To.GloballyUniqueName(), + }).Info("new edge") +} + +func (l *requestHandler) Error(ctx context.Context, errorMessage string) { + log.WithContext(ctx).WithFields(l.lf).Errorf("generic error: %v", errorMessage) +} + +func (l *requestHandler) QueryError(ctx context.Context, err *sdp.QueryError) { + log.WithContext(ctx).WithFields(l.lf).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) +} + +func (l *requestHandler) QueryStatus(ctx context.Context, status *sdp.QueryStatus) { + statusFields := log.Fields{ + "status": status.Status.String(), + } + queryUuid := status.GetUUIDParsed() + if queryUuid == nil { + log.WithContext(ctx).WithFields(l.lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") + return + } + statusFields["query"] = queryUuid + + if status.Status == sdp.QueryStatus_STARTED { + l.queriesStarted += 1 + } + + // nolint:exhaustive // we _want_ to log all other status fields as unexpected + switch status.Status { + case sdp.QueryStatus_STARTED, sdp.QueryStatus_FINISHED, sdp.QueryStatus_ERRORED, sdp.QueryStatus_CANCELLED: + // do nothing + default: + statusFields["unexpected_status"] = true + } + + log.WithContext(ctx).WithFields(l.lf).WithFields(statusFields).Debugf("query status update") +} + func Request(ctx context.Context, ready chan bool) int { timeout, err := time.ParseDuration(viper.GetString("timeout")) if err != nil { @@ -107,7 +169,7 @@ func Request(ctx context.Context, ready chan bool) int { c, err := sdpws.Dial(ctx, gatewayUrl, NewAuthenticatedClient(ctx, otelhttp.DefaultClient), - &sdpws.LoggingGatewayMessageHandler{Level: log.InfoLevel}, + &requestHandler{lf: lf}, ) if err != nil { lf["gateway-url"] = gatewayUrl @@ -119,7 +181,7 @@ func Request(ctx context.Context, ready chan bool) int { // Log the request in JSON b, err := json.MarshalIndent(req, "", " ") if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to marshal request") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to marshal request for logging") return 1 } @@ -136,39 +198,10 @@ func Request(ctx context.Context, ready chan bool) int { } log.WithContext(ctx).WithFields(lf).WithError(err).Info("received items") - c.Wait(ctx, uuid.UUIDs{uuid.UUID(q.UUID)}) - - // queriesSent := true - - // responses := make(chan *sdp.GatewayResponse) - - // // Start a goroutine that reads responses - // go func() { - // for { - // res := new(sdp.GatewayResponse) - - // err = wspb.Read(ctx, c, res) - - // if err != nil { - // var e websocket.CloseError - // if errors.As(err, &e) { - // log.WithContext(ctx).WithFields(log.Fields{ - // "code": e.Code.String(), - // "reason": e.Reason, - // }).Info("Websocket closing") - // return - // } - // log.WithContext(ctx).WithFields(log.Fields{ - // "error": err, - // }).Error("Failed to read response") - // return - // } - - // responses <- res - // } - // }() - - // activeQueries := make(map[uuid.UUID]bool) + err = c.Wait(ctx, uuid.UUIDs{uuid.UUID(q.UUID)}) + if err != nil { + log.WithContext(ctx).WithFields(lf).WithError(err).Error("queries failed") + } // var numItems, numEdges int From a51355cf0394346133f82a133e98a28a4f2c2338 Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Thu, 2 Nov 2023 17:47:54 +0100 Subject: [PATCH 4/6] Reimplement `request --snapshot-after` --- cmd/request.go | 152 ++++--------------------------------------------- 1 file changed, 11 insertions(+), 141 deletions(-) diff --git a/cmd/request.go b/cmd/request.go index 8518e4dd..d27978a3 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -203,147 +203,17 @@ func Request(ctx context.Context, ready chan bool) int { log.WithContext(ctx).WithFields(lf).WithError(err).Error("queries failed") } - // var numItems, numEdges int - - // // Read the responses - // responses: - // for { - // select { - // case <-ctx.Done(): - // log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") - // return 1 - - // case resp := <-responses: - // switch resp.ResponseType.(type) { - // case *sdp.GatewayResponse_Status: - // status := resp.GetStatus() - // statusFields := log.Fields{ - // "summary": status.Summary, - // "responders": status.Summary.Responders, - // "queriesSent": queriesSent, - // "postProcessingComplete": status.PostProcessingComplete, - // "itemsReceived": numItems, - // "edgesReceived": numEdges, - // } - - // if status.Done() { - // // fall through from all "final" query states, check if there's still queries in progress; - // // only break from the loop if all queries have already been sent - // // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources - // allDone := allDone(ctx, activeQueries, lf) - // statusFields["allDone"] = allDone - // if allDone && queriesSent { - // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done") - // break responses - // } else { - // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries") - // } - // } else { - // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders") - // } - - // case *sdp.GatewayResponse_QueryStatus: - // status := resp.GetQueryStatus() - // statusFields := log.Fields{ - // "status": status.Status.String(), - // } - // queryUuid := status.GetUUIDParsed() - // if queryUuid == nil { - // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") - // continue responses - // } - // statusFields["query"] = queryUuid - - // switch status.Status { - // case sdp.QueryStatus_UNSPECIFIED: - // statusFields["unexpected_status"] = true - // case sdp.QueryStatus_STARTED: - // activeQueries[*queryUuid] = true - // case sdp.QueryStatus_FINISHED: - // activeQueries[*queryUuid] = false - // case sdp.QueryStatus_ERRORED: - // activeQueries[*queryUuid] = false - // case sdp.QueryStatus_CANCELLED: - // activeQueries[*queryUuid] = false - // default: - // statusFields["unexpected_status"] = true - // } - - // log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update") - - // case *sdp.GatewayResponse_NewItem: - // item := resp.GetNewItem() - // numItems += 1 - // log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") - - // case *sdp.GatewayResponse_NewEdge: - // edge := resp.GetNewEdge() - // numEdges += 1 - // log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ - // "from": edge.From.GloballyUniqueName(), - // "to": edge.To.GloballyUniqueName(), - // }).Info("new edge") - - // case *sdp.GatewayResponse_QueryError: - // err := resp.GetQueryError() - // log.WithContext(ctx).WithFields(lf).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) - - // case *sdp.GatewayResponse_Error: - // err := resp.GetError() - // log.WithContext(ctx).WithFields(lf).Errorf("generic error: %v", err) - - // default: - // j := protojson.Format(resp) - // log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) - // } - // } - // } - - // if viper.GetBool("snapshot-after") { - // log.WithContext(ctx).Info("Starting snapshot") - // msgId := uuid.New() - // snapReq := &sdp.GatewayRequest{ - // MinStatusInterval: minStatusInterval, - // RequestType: &sdp.GatewayRequest_StoreSnapshot{ - // StoreSnapshot: &sdp.StoreSnapshot{ - // Name: viper.GetString("snapshot-name"), - // Description: viper.GetString("snapshot-description"), - // MsgID: msgId[:], - // }, - // }, - // } - // err = wspb.Write(ctx, c, snapReq) - // if err != nil { - // log.WithContext(ctx).WithFields(log.Fields{ - // "error": err, - // }).Error("Failed to send snapshot request") - // return 1 - // } - - // for { - // select { - // case <-ctx.Done(): - // log.WithContext(ctx).Info("Context cancelled, exiting") - // return 1 - // case resp := <-responses: - // switch resp.ResponseType.(type) { - // case *sdp.GatewayResponse_SnapshotStoreResult: - // result := resp.GetSnapshotStoreResult() - // if result.Success { - // log.WithContext(ctx).Infof("Snapshot stored successfully: %v", uuid.UUID(result.SnapshotID)) - // return 0 - // } - - // log.WithContext(ctx).Errorf("Snapshot store failed: %v", result.ErrorMessage) - // return 1 - // default: - // j := protojson.Format(resp) - - // log.WithContext(ctx).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) - // } - // } - // } - // } + if viper.GetBool("snapshot-after") { + log.WithContext(ctx).Info("Starting snapshot") + snId, err := c.StoreSnapshot(ctx, viper.GetString("snapshot-name"), viper.GetString("snapshot-description")) + if err != nil { + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to send snapshot request") + return 1 + } + + log.WithContext(ctx).WithFields(lf).Infof("Snapshot stored successfully: %v", snId) + return 0 + } return 0 } From 7835bc0bda32b8426fc3e54c97818f2f92de78ec Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Thu, 2 Nov 2023 17:52:01 +0100 Subject: [PATCH 5/6] Cleanup --- cmd/request.go | 70 ++++++-------------------------------------------- cmd/root.go | 3 --- 2 files changed, 8 insertions(+), 65 deletions(-) diff --git a/cmd/request.go b/cmd/request.go index d27978a3..57026c74 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -130,13 +130,6 @@ func Request(ctx context.Context, ready chan bool) int { )) defer span.End() - // Construct the request - req, err := createInitialRequest() - if err != nil { - log.WithContext(ctx).WithError(err).Error("Failed to create initial request") - return 1 - } - gatewayUrl := viper.GetString("gateway-url") if gatewayUrl == "" { gatewayUrl = fmt.Sprintf("%v/api/gateway", viper.GetString("url")) @@ -178,14 +171,6 @@ func Request(ctx context.Context, ready chan bool) int { } defer c.Close(ctx) - // Log the request in JSON - b, err := json.MarshalIndent(req, "", " ") - if err != nil { - log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to marshal request for logging") - return 1 - } - - log.WithContext(ctx).WithFields(lf).Infof("Request:\n%v", string(b)) q, err := createQuery() if err != nil { log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to create query") @@ -198,6 +183,14 @@ func Request(ctx context.Context, ready chan bool) int { } log.WithContext(ctx).WithFields(lf).WithError(err).Info("received items") + // Log the request in JSON + b, err := json.MarshalIndent(q, "", " ") + if err != nil { + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to marshal query for logging") + return 1 + } + log.WithContext(ctx).WithFields(lf).Infof("Query:\n%v", string(b)) + err = c.Wait(ctx, uuid.UUIDs{uuid.UUID(q.UUID)}) if err != nil { log.WithContext(ctx).WithFields(lf).WithError(err).Error("queries failed") @@ -255,53 +248,6 @@ func createQuery() (*sdp.Query, error) { }, nil } -func createInitialRequest() (*sdp.GatewayRequest, error) { - req := &sdp.GatewayRequest{ - MinStatusInterval: minStatusInterval, - } - - switch viper.GetString("request-type") { - case "query": - q, err := createQuery() - if err != nil { - return nil, err - } - - req.RequestType = &sdp.GatewayRequest_Query{ - Query: q, - } - case "load-bookmark": - bookmarkUUID, err := uuid.Parse(viper.GetString("bookmark-uuid")) - if err != nil { - return nil, err - } - msgID := uuid.New() - req.RequestType = &sdp.GatewayRequest_LoadBookmark{ - LoadBookmark: &sdp.LoadBookmark{ - UUID: bookmarkUUID[:], - MsgID: msgID[:], - IgnoreCache: viper.GetBool("ignore-cache"), - }, - } - case "load-snapshot": - snapshotUUID, err := uuid.Parse(viper.GetString("snapshot-uuid")) - if err != nil { - return nil, err - } - msgID := uuid.New() - req.RequestType = &sdp.GatewayRequest_LoadSnapshot{ - LoadSnapshot: &sdp.LoadSnapshot{ - UUID: snapshotUUID[:], - MsgID: msgID[:], - }, - } - default: - return nil, fmt.Errorf("request type %v not supported", viper.GetString("request-type")) - } - - return req, nil -} - func init() { rootCmd.AddCommand(requestCmd) diff --git a/cmd/root.go b/cmd/root.go index 2b1a1faa..a5bc7911 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -24,13 +24,10 @@ import ( "github.com/spf13/viper" "github.com/uptrace/opentelemetry-go-extra/otellogrus" "golang.org/x/oauth2" - "google.golang.org/protobuf/types/known/durationpb" ) var logLevel string -var minStatusInterval = durationpb.New(250 * time.Millisecond) - //go:generate sh -c "echo -n $(git describe --tags --long) > commit.txt" //go:embed commit.txt var cliVersion string From b2e6aeb03cdcd1efc200aed0882bdf224d36e9bf Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Thu, 2 Nov 2023 18:00:48 +0100 Subject: [PATCH 6/6] Add final summary message for `request` --- cmd/request.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/request.go b/cmd/request.go index 57026c74..a6087a11 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -160,9 +160,10 @@ func Request(ctx context.Context, ready chan bool) int { return 1 } + handler := &requestHandler{lf: lf} c, err := sdpws.Dial(ctx, gatewayUrl, NewAuthenticatedClient(ctx, otelhttp.DefaultClient), - &requestHandler{lf: lf}, + handler, ) if err != nil { lf["gateway-url"] = gatewayUrl @@ -196,6 +197,12 @@ func Request(ctx context.Context, ready chan bool) int { log.WithContext(ctx).WithFields(lf).WithError(err).Error("queries failed") } + log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ + "queriesStarted": handler.queriesStarted, + "itemsReceived": handler.numItems, + "edgesReceived": handler.numEdges, + }).Info("all queries done") + if viper.GetBool("snapshot-after") { log.WithContext(ctx).Info("Starting snapshot") snId, err := c.StoreSnapshot(ctx, viper.GetString("snapshot-name"), viper.GetString("snapshot-description")) @@ -207,6 +214,7 @@ func Request(ctx context.Context, ready chan bool) int { log.WithContext(ctx).WithFields(lf).Infof("Snapshot stored successfully: %v", snId) return 0 } + return 0 }