From 1ca032082589a600effe7f214cf903724b89744f Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 11:32:01 +0200 Subject: [PATCH 1/8] Cleanups --- cmd/changefromtfplan.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/changefromtfplan.go b/cmd/changefromtfplan.go index dd21f3d3..bd0c14b8 100644 --- a/cmd/changefromtfplan.go +++ b/cmd/changefromtfplan.go @@ -22,7 +22,7 @@ import ( // changeFromTfplanCmd represents the change-from-tfplan command var changeFromTfplanCmd = &cobra.Command{ Use: "change-from-tfplan [--title TITLE] [--description DESCRIPTION] [--ticket-link URL] [--tfplan FILE]", - Short: "Creates a new Change from a given terraform plan (in JSON format)", + Short: "Creates a new Change from a given terraform plan file", PreRun: func(cmd *cobra.Command, args []string) { // Bind these to viper err := viper.BindPFlags(cmd.PersistentFlags()) @@ -41,6 +41,7 @@ var changeFromTfplanCmd = &cobra.Command{ }, } +// test data var ( affecting_resource *sdp.Reference = &sdp.Reference{ Type: "elbv2-load-balancer", @@ -138,7 +139,7 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { msg := resultStream.Msg() - // log the first message and at most every 500ms during discovery + // log the first message and at most every 250ms during discovery // to avoid spanning the cli output time_since_last_log := time.Since(last_log) if first_log || msg.State != sdp.CalculateBlastRadiusResponse_STATE_DISCOVERING || time_since_last_log > 250*time.Millisecond { From 9dd39a73ef20b9a0c7f58e02c995288836b964b9 Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 11:37:44 +0200 Subject: [PATCH 2/8] Adjust message size to current client limits Messages regularly break the 32k barrier (see [BACKEND-Z](https://overmindtech.sentry.io/issues/4295740369)), and the api-server gateway client is already configured to accept messages of up to this size (see SetReadLimit in api-server), so raise the CLI's websocket read limit to match. 
--- cmd/request.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/request.go b/cmd/request.go index 0a9a50cc..85f2119b 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -101,6 +101,9 @@ func Request(signals chan os.Signal, ready chan bool) int { } defer c.Close(websocket.StatusGoingAway, "") + // the default, 32kB is too small for cert bundles and rds-db-cluster-parameter-groups + c.SetReadLimit(2 * 1024 * 1024) + // Log the request in JSON b, err := json.MarshalIndent(req, "", " ") if err != nil { From d441d1fa2e3d5e5253145569b650e0dcd0a5b6c2 Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 11:45:45 +0200 Subject: [PATCH 3/8] Wrap up ensureToken migration --- cmd/request.go | 9 +-------- cmd/root.go | 1 - 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/cmd/request.go b/cmd/request.go index 85f2119b..438e3a61 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "os" "os/signal" "syscall" @@ -83,13 +82,7 @@ func Request(signals chan os.Signal, ready chan bool) int { defer cancel() options := &websocket.DialOptions{ - HTTPClient: otelhttp.DefaultClient, - } - - if viper.GetString("token") != "" { - log.Info("Setting authorization token") - options.HTTPHeader = make(http.Header) - options.HTTPHeader.Set("Authorization", fmt.Sprintf("Bearer %v", viper.GetString("token"))) + HTTPClient: NewAuthenticatedClient(ctx, otelhttp.DefaultClient), } c, _, err := websocket.Dial(ctx, viper.GetString("url"), options) diff --git a/cmd/root.go b/cmd/root.go index 7cee773b..c6a5a300 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -145,7 +145,6 @@ func ensureToken(ctx context.Context, signals chan os.Signal) (context.Context, } // Set the token - viper.Set("token", token.AccessToken) return context.WithValue(ctx, sdp.UserTokenContextKey{}, token.AccessToken), nil } return ctx, fmt.Errorf("no token configured and target URL (%v) is insecure", gatewayURL) From 
d1bc353ef5acc01409639fe104241e2c6d152e9c Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 12:25:21 +0200 Subject: [PATCH 4/8] Prepare change-from-tfplan for terraform mapping --- cmd/changefromtfplan.go | 203 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 187 insertions(+), 16 deletions(-) diff --git a/cmd/changefromtfplan.go b/cmd/changefromtfplan.go index bd0c14b8..ff6ef613 100644 --- a/cmd/changefromtfplan.go +++ b/cmd/changefromtfplan.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "fmt" "os" "os/signal" @@ -15,8 +16,12 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/encoding/protojson" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wspb" ) // changeFromTfplanCmd represents the change-from-tfplan command @@ -43,18 +48,41 @@ var changeFromTfplanCmd = &cobra.Command{ // test data var ( - affecting_resource *sdp.Reference = &sdp.Reference{ - Type: "elbv2-load-balancer", - UniqueAttributeValue: "ingress", - Scope: "944651592624.eu-west-2", + affecting_uuid uuid.UUID = uuid.New() + affecting_resource *sdp.Query = &sdp.Query{ + Type: "elbv2-load-balancer", + Method: sdp.QueryMethod_GET, + Query: "ingress", + RecursionBehaviour: &sdp.Query_RecursionBehaviour{ + LinkDepth: 0, + }, + Scope: "944651592624.eu-west-2", + UUID: affecting_uuid[:], } - safe_resource *sdp.Reference = &sdp.Reference{ - Type: "ec2-security-group", - UniqueAttributeValue: "sg-09533c300cd1a41c1", - Scope: "944651592624.eu-west-2", + + safe_uuid uuid.UUID = uuid.New() + safe_resource *sdp.Query = &sdp.Query{ + Type: "ec2-security-group", + Method: sdp.QueryMethod_GET, + Query: "sg-09533c300cd1a41c1", + RecursionBehaviour: &sdp.Query_RecursionBehaviour{ + LinkDepth: 0, + }, + Scope: "944651592624.eu-west-2", + UUID: safe_uuid[:], } ) +func 
changingItemQueriesFromTfplan() []*sdp.Query { + var changing_items []*sdp.Query + if viper.GetBool("test-affecting") { + changing_items = []*sdp.Query{affecting_resource} + } else { + changing_items = []*sdp.Query{safe_resource} + } + return changing_items +} + func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { timeout, err := time.ParseDuration(viper.GetString("timeout")) if err != nil { @@ -105,17 +133,160 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), }).Info("created a new change") - var changing_items []*sdp.Reference - if viper.GetBool("test-affecting") { - changing_items = []*sdp.Reference{affecting_resource} - } else { - changing_items = []*sdp.Reference{safe_resource} + queries := changingItemQueriesFromTfplan() + + options := &websocket.DialOptions{ + HTTPClient: NewAuthenticatedClient(ctx, otelhttp.DefaultClient), + } + + c, _, err := websocket.Dial(ctx, viper.GetString("url"), options) + if err != nil { + log.WithContext(ctx).WithError(err).WithFields(log.Fields{ + "url": viper.GetString("url"), + }).Error("Failed to connect to overmind API") + return 1 + } + defer c.Close(websocket.StatusGoingAway, "") + + // the default, 32kB is too small for cert bundles and rds-db-cluster-parameter-groups + c.SetReadLimit(2 * 1024 * 1024) + + queriesSentChan := make(chan struct{}) + go func() { + for _, q := range queries { + req := sdp.GatewayRequest{ + RequestType: &sdp.GatewayRequest_Query{ + Query: q, + }, + } + err = wspb.Write(ctx, c, &req) + if err != nil { + log.WithContext(ctx).WithFields(log.Fields{ + "error": err, + }).Error("Failed to send request") + continue + } + } + queriesSentChan <- struct{}{} + }() + + responses := make(chan *sdp.GatewayResponse) + + // Start a goroutine that reads responses + go func() { + for { + res := new(sdp.GatewayResponse) + + err = wspb.Read(ctx, c, res) + + if err != nil { + var e websocket.CloseError + if 
errors.As(err, &e) { + log.WithContext(ctx).WithFields(log.Fields{ + "code": e.Code.String(), + "reason": e.Reason, + }).Info("Websocket closing") + return + } + log.WithContext(ctx).WithFields(log.Fields{ + "error": err, + }).Error("Failed to read response") + return + } + + responses <- res + } + }() + + activeQueries := make(map[uuid.UUID]bool) + queriesSent := false + + receivedItems := []*sdp.Reference{} + + // Read the responses +responses: + for { + select { + case <-queriesSentChan: + queriesSent = true + case <-signals: + log.WithContext(ctx).Info("Received interrupt, exiting") + return 1 + case <-ctx.Done(): + log.WithContext(ctx).Info("Context cancelled, exiting") + return 1 + case resp := <-responses: + switch resp.ResponseType.(type) { + case *sdp.GatewayResponse_QueryStatus: + status := resp.GetQueryStatus() + queryUuid := status.GetUUIDParsed() + if queryUuid == nil { + log.WithContext(ctx).Debugf("Received QueryStatus with nil UUID: %v", status.Status.String()) + continue responses + } + + log.WithContext(ctx).Debugf("Status for %v: %v", queryUuid, status.Status.String()) + + switch status.Status { + case sdp.QueryStatus_STARTED: + activeQueries[*queryUuid] = true + continue responses + case sdp.QueryStatus_FINISHED: + activeQueries[*queryUuid] = false + case sdp.QueryStatus_ERRORED: + activeQueries[*queryUuid] = false + case sdp.QueryStatus_CANCELLED: + activeQueries[*queryUuid] = false + default: + log.WithContext(ctx).Debugf("unexpected status %v: %v", queryUuid, status.Status.String()) + continue responses + } + + // fall through from all "final" query states, check if there's still queries in progress + // TODO: needs DefaultStartTimeout implemented to account for slow sources + allDone := true + active: + for q := range activeQueries { + if activeQueries[q] { + log.WithContext(ctx).Debugf("%v still active", q) + allDone = false + break active + } + } + + // only break from `responses` if all queries have already been sent + // TODO: see above, 
still needs DefaultStartTimeout implemented to account for slow sources + if allDone && queriesSent { + break responses + } + case *sdp.GatewayResponse_NewItem: + item := resp.GetNewItem() + log.WithContext(ctx).Infof("New item: %v", item.GloballyUniqueName()) + + receivedItems = append(receivedItems, item.Reference()) + + case *sdp.GatewayResponse_NewEdge: + log.WithContext(ctx).Debug("ignored edge") + + case *sdp.GatewayResponse_QueryError: + err := resp.GetQueryError() + + log.WithContext(ctx).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) + case *sdp.GatewayResponse_Error: + err := resp.GetError() + log.WithContext(ctx).Errorf("generic error: %v", err) + default: + j := protojson.Format(resp) + + log.WithContext(ctx).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) + } + } } resultStream, err := client.UpdateChangingItems(ctx, &connect.Request[sdp.UpdateChangingItemsRequest]{ Msg: &sdp.UpdateChangingItemsRequest{ ChangeUUID: createResponse.Msg.Change.Metadata.UUID, - ChangingItems: changing_items, + ChangingItems: receivedItems, }, }) if err != nil { @@ -192,8 +363,8 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { func init() { rootCmd.AddCommand(changeFromTfplanCmd) - changeFromTfplanCmd.PersistentFlags().String("changes-url", "https://api.prod.overmind.tech/", "The changes service API endpoint") - changeFromTfplanCmd.PersistentFlags().String("frontend", "https://app.overmind.tech/", "The frontend base URL") + changeFromTfplanCmd.PersistentFlags().String("changes-url", "https://api.prod.overmind.tech", "The changes service API endpoint") + changeFromTfplanCmd.PersistentFlags().String("frontend", "https://app.overmind.tech", "The frontend base URL") changeFromTfplanCmd.PersistentFlags().String("terraform", "terraform", "The binary to use for calling terraform. 
Will be looked up in the system PATH.") changeFromTfplanCmd.PersistentFlags().String("tfplan", "./tfplan", "Parse changing items from this terraform plan file.") From 79d1e55a5ed54d42a0602ef832d178cd9232ae1d Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 14:21:12 +0200 Subject: [PATCH 5/8] Clean up logging field handling --- cmd/changefromtfplan.go | 90 ++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 55 deletions(-) diff --git a/cmd/changefromtfplan.go b/cmd/changefromtfplan.go index ff6ef613..f8630dc7 100644 --- a/cmd/changefromtfplan.go +++ b/cmd/changefromtfplan.go @@ -98,11 +98,13 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { // Connect to the websocket log.WithContext(ctx).Debugf("Connecting to overmind API: %v", viper.GetString("url")) + lf := log.Fields{ + "url": viper.GetString("url"), + } + ctx, err = ensureToken(ctx, signals) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - }).Error("failed to authenticate") + log.WithContext(ctx).WithError(err).WithFields(lf).Error("failed to authenticate") return 1 } @@ -123,16 +125,14 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { }, }) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - }).Error("failed to create change") + log.WithContext(ctx).WithError(err).WithFields(lf).Error("failed to create change") return 1 } - log.WithContext(ctx).WithFields(log.Fields{ - "url": viper.GetString("url"), - "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), - }).Info("created a new change") + lf["change"] = createResponse.Msg.Change.Metadata.GetUUIDParsed() + log.WithContext(ctx).WithFields(lf).Info("created a new change") + + log.WithContext(ctx).WithFields(lf).Info("resolving items from terraform plan") queries := changingItemQueriesFromTfplan() options := &websocket.DialOptions{ @@ -141,9 +141,7 
@@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { c, _, err := websocket.Dial(ctx, viper.GetString("url"), options) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - }).Error("Failed to connect to overmind API") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to overmind API") return 1 } defer c.Close(websocket.StatusGoingAway, "") @@ -161,9 +159,7 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { } err = wspb.Write(ctx, c, &req) if err != nil { - log.WithContext(ctx).WithFields(log.Fields{ - "error": err, - }).Error("Failed to send request") + log.WithContext(ctx).WithFields(lf).WithError(err).WithField("req", &req).Error("Failed to send request") continue } } @@ -182,15 +178,13 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { if err != nil { var e websocket.CloseError if errors.As(err, &e) { - log.WithContext(ctx).WithFields(log.Fields{ + log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ "code": e.Code.String(), "reason": e.Reason, }).Info("Websocket closing") return } - log.WithContext(ctx).WithFields(log.Fields{ - "error": err, - }).Error("Failed to read response") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to read response") return } @@ -209,12 +203,15 @@ responses: select { case <-queriesSentChan: queriesSent = true + case <-signals: - log.WithContext(ctx).Info("Received interrupt, exiting") + log.WithContext(ctx).WithFields(lf).Info("Received interrupt, exiting") return 1 + case <-ctx.Done(): - log.WithContext(ctx).Info("Context cancelled, exiting") + log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") return 1 + case resp := <-responses: switch resp.ResponseType.(type) { case *sdp.GatewayResponse_QueryStatus: @@ -261,24 +258,24 @@ responses: } case *sdp.GatewayResponse_NewItem: item := resp.GetNewItem() - log.WithContext(ctx).Infof("New item: %v", 
item.GloballyUniqueName()) + log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") receivedItems = append(receivedItems, item.Reference()) case *sdp.GatewayResponse_NewEdge: - log.WithContext(ctx).Debug("ignored edge") + log.WithContext(ctx).WithFields(lf).Debug("ignored edge") case *sdp.GatewayResponse_QueryError: err := resp.GetQueryError() + log.WithContext(ctx).WithFields(lf).WithError(err).Errorf("Error from %v(%v)", err.ResponderName, err.SourceName) - log.WithContext(ctx).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) case *sdp.GatewayResponse_Error: err := resp.GetError() - log.WithContext(ctx).Errorf("generic error: %v", err) + log.WithContext(ctx).WithFields(lf).WithField(log.ErrorKey, err).Errorf("generic error") + default: j := protojson.Format(resp) - - log.WithContext(ctx).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) + log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) } } } @@ -290,10 +287,7 @@ responses: }, }) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), - }).Error("failed to update changing items") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("failed to update changing items") return 1 } @@ -301,10 +295,7 @@ responses: first_log := true for resultStream.Receive() { if resultStream.Err() != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), - }).Error("error streaming results") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("error streaming results") return 1 } @@ -314,20 +305,14 @@ responses: // to avoid spanning the cli output time_since_last_log := time.Since(last_log) if first_log || msg.State != sdp.CalculateBlastRadiusResponse_STATE_DISCOVERING || 
time_since_last_log > 250*time.Millisecond { - log.WithContext(ctx).WithFields(log.Fields{ - "url": viper.GetString("url"), - "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), - "msg": msg, - }).Info("status update") + log.WithContext(ctx).WithFields(lf).WithField("msg", msg).Info("status update") last_log = time.Now() first_log = false } } - log.WithContext(ctx).WithFields(log.Fields{ - "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), - "change-url": fmt.Sprintf("%v/changes/%v", viper.GetString("frontend"), createResponse.Msg.Change.Metadata.GetUUIDParsed()), - }).Info("change ready") + changeUrl := fmt.Sprintf("%v/changes/%v", viper.GetString("frontend"), createResponse.Msg.Change.Metadata.GetUUIDParsed()) + log.WithContext(ctx).WithFields(lf).WithField("change-url", changeUrl).Info("change ready") fetchResponse, err := client.GetChange(ctx, &connect.Request[sdp.GetChangeRequest]{ Msg: &sdp.GetChangeRequest{ @@ -335,23 +320,18 @@ responses: }, }) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - }).Error("failed to get updated change") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("failed to get updated change") return 1 } + for _, a := range fetchResponse.Msg.Change.Properties.AffectedAppsUUID { appUuid, err := uuid.FromBytes(a) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - "value": a, - }).Error("received invalid app uuid") + log.WithContext(ctx).WithFields(lf).WithError(err).WithField("app", a).Error("received invalid app uuid") continue } - log.WithContext(ctx).WithFields(log.Fields{ - "change": createResponse.Msg.Change.Metadata.GetUUIDParsed(), - "change-url": fmt.Sprintf("%v/changes/%v", viper.GetString("frontend"), createResponse.Msg.Change.Metadata.GetUUIDParsed()), + log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ + "change-url": changeUrl, "app": appUuid, "app-url": 
fmt.Sprintf("%v/apps/%v", viper.GetString("frontend"), appUuid), }).Info("affected app") From 07c7e6c0a8b172098a6fedd02cb1bf0913786729 Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 14:21:51 +0200 Subject: [PATCH 6/8] Ask for status updates --- cmd/changefromtfplan.go | 1 + cmd/request.go | 5 ++++- cmd/root.go | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/changefromtfplan.go b/cmd/changefromtfplan.go index f8630dc7..31c03db3 100644 --- a/cmd/changefromtfplan.go +++ b/cmd/changefromtfplan.go @@ -153,6 +153,7 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int { go func() { for _, q := range queries { req := sdp.GatewayRequest{ + MinStatusInterval: minStatusInterval, RequestType: &sdp.GatewayRequest_Query{ Query: q, }, diff --git a/cmd/request.go b/cmd/request.go index 438e3a61..af7d05f8 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -223,6 +223,7 @@ responses: log.WithContext(ctx).Info("Starting snapshot") msgId := uuid.New() snapReq := &sdp.GatewayRequest{ + MinStatusInterval: minStatusInterval, RequestType: &sdp.GatewayRequest_StoreSnapshot{ StoreSnapshot: &sdp.StoreSnapshot{ Name: viper.GetString("snapshot-name"), @@ -270,7 +271,9 @@ responses: } func createInitialRequest() (*sdp.GatewayRequest, error) { - req := new(sdp.GatewayRequest) + req := &sdp.GatewayRequest{ + MinStatusInterval: minStatusInterval, + } u := uuid.New() switch viper.GetString("request-type") { diff --git a/cmd/root.go b/cmd/root.go index c6a5a300..6e89dda2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "strings" + "time" "github.com/google/uuid" "github.com/overmindtech/ovm-cli/tracing" @@ -17,11 +18,14 @@ import ( "github.com/spf13/viper" "github.com/uptrace/opentelemetry-go-extra/otellogrus" "golang.org/x/oauth2" + "google.golang.org/protobuf/types/known/durationpb" ) var cfgFile string var logLevel string +var minStatusInterval = durationpb.New(250 * time.Millisecond) + // rootCmd 
represents the base command when called without any subcommands var rootCmd = &cobra.Command{ Use: "ovm-cli", From 6cb3c841d036dfce94484f80ed59d18fc6eec93f Mon Sep 17 00:00:00 2001 From: David Schmitt Date: Mon, 10 Jul 2023 14:39:29 +0200 Subject: [PATCH 7/8] Add Status processing to request and change-from-tfplan --- cmd/changefromtfplan.go | 67 +++++++++++++++++++---------- cmd/request.go | 95 ++++++++++++++++++++++++----------------- 2 files changed, 101 insertions(+), 61 deletions(-) diff --git a/cmd/changefromtfplan.go b/cmd/changefromtfplan.go index 31c03db3..c8a53095 100644 --- a/cmd/changefromtfplan.go +++ b/cmd/changefromtfplan.go @@ -215,20 +215,47 @@ responses: case resp := <-responses: switch resp.ResponseType.(type) { + + case *sdp.GatewayResponse_Status: + status := resp.GetStatus() + statusFields := log.Fields{ + "summary": status.Summary, + "responders": status.Summary.Responders, + "queriesSent": queriesSent, + "post_processing_complete": status.PostProcessingComplete, + } + + if status.Summary != nil && status.Summary.Responders > 0 && status.Summary.Working == 0 && status.PostProcessingComplete { + // fall through from all "final" query states, check if there's still queries in progress; + // only break from the loop if all queries have already been sent + // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources + allDone := allDone(ctx, activeQueries, lf) + statusFields["allDone"] = allDone + if allDone && queriesSent { + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done") + break responses + } else { + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries") + } + } else { + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders") + } + case *sdp.GatewayResponse_QueryStatus: status := resp.GetQueryStatus() + statusFields := log.Fields{ + "status": 
status.Status.String(), + } queryUuid := status.GetUUIDParsed() if queryUuid == nil { - log.WithContext(ctx).Debugf("Received QueryStatus with nil UUID: %v", status.Status.String()) + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") continue responses } - - log.WithContext(ctx).Debugf("Status for %v: %v", queryUuid, status.Status.String()) + statusFields["query"] = queryUuid switch status.Status { case sdp.QueryStatus_STARTED: activeQueries[*queryUuid] = true - continue responses case sdp.QueryStatus_FINISHED: activeQueries[*queryUuid] = false case sdp.QueryStatus_ERRORED: @@ -236,27 +263,11 @@ responses: case sdp.QueryStatus_CANCELLED: activeQueries[*queryUuid] = false default: - log.WithContext(ctx).Debugf("unexpected status %v: %v", queryUuid, status.Status.String()) - continue responses + statusFields["unexpected_status"] = true } - // fall through from all "final" query states, check if there's still queries in progress - // TODO: needs DefaultStartTimeout implemented to account for slow sources - allDone := true - active: - for q := range activeQueries { - if activeQueries[q] { - log.WithContext(ctx).Debugf("%v still active", q) - allDone = false - break active - } - } + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update") - // only break from `responses` if all queries have already been sent - // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources - if allDone && queriesSent { - break responses - } case *sdp.GatewayResponse_NewItem: item := resp.GetNewItem() log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") @@ -341,6 +352,18 @@ responses: return 0 } +func allDone(ctx context.Context, activeQueries map[uuid.UUID]bool, lf log.Fields) bool { + allDone := true + for q := range activeQueries { + if activeQueries[q] { + log.WithContext(ctx).WithFields(lf).WithField("query", 
q).Debugf("query still active") + allDone = false + break + } + } + return allDone +} + func init() { rootCmd.AddCommand(changeFromTfplanCmd) diff --git a/cmd/request.go b/cmd/request.go index af7d05f8..66814305 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -66,14 +66,16 @@ func Request(signals chan os.Signal, ready chan bool) int { return 1 } + lf := log.Fields{ + "url": viper.GetString("url"), + } + // Connect to the websocket log.WithContext(ctx).Debugf("Connecting to overmind API: %v", viper.GetString("url")) ctx, err = ensureToken(ctx, signals) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - }).Error("failed to authenticate") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("failed to authenticate") return 1 } @@ -87,9 +89,7 @@ func Request(signals chan os.Signal, ready chan bool) int { c, _, err := websocket.Dial(ctx, viper.GetString("url"), options) if err != nil { - log.WithContext(ctx).WithError(err).WithFields(log.Fields{ - "url": viper.GetString("url"), - }).Error("Failed to connect to overmind API") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to overmind API") return 1 } defer c.Close(websocket.StatusGoingAway, "") @@ -100,20 +100,20 @@ func Request(signals chan os.Signal, ready chan bool) int { // Log the request in JSON b, err := json.MarshalIndent(req, "", " ") if err != nil { - log.WithContext(ctx).WithError(err).Error("Failed to marshal request") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to marshal request") return 1 } - log.WithContext(ctx).Infof("Request:\n%v", string(b)) + log.WithContext(ctx).WithFields(lf).Infof("Request:\n%v", string(b)) err = wspb.Write(ctx, c, req) if err != nil { - log.WithContext(ctx).WithFields(log.Fields{ - "error": err, - }).Error("Failed to send request") + log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to send request") return 1 } + queriesSent := true + 
responses := make(chan *sdp.GatewayResponse) // Start a goroutine that reads responses @@ -149,27 +149,55 @@ responses: for { select { case <-signals: - log.WithContext(ctx).Info("Received interrupt, exiting") + log.WithContext(ctx).WithFields(lf).Info("Received interrupt, exiting") return 1 + case <-ctx.Done(): - log.WithContext(ctx).Info("Context cancelled, exiting") + log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting") return 1 + case resp := <-responses: switch resp.ResponseType.(type) { + case *sdp.GatewayResponse_Status: + status := resp.GetStatus() + statusFields := log.Fields{ + "summary": status.Summary, + "responders": status.Summary.Responders, + "queriesSent": queriesSent, + "post_processing_complete": status.PostProcessingComplete, + } + + if status.Summary != nil && status.Summary.Responders > 0 && status.Summary.Working == 0 && status.PostProcessingComplete { + // fall through from all "final" query states, check if there's still queries in progress; + // only break from the loop if all queries have already been sent + // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources + allDone := allDone(ctx, activeQueries, lf) + statusFields["allDone"] = allDone + if allDone && queriesSent { + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done") + break responses + } else { + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries") + } + } else { + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders") + } + case *sdp.GatewayResponse_QueryStatus: status := resp.GetQueryStatus() + statusFields := log.Fields{ + "status": status.Status.String(), + } queryUuid := status.GetUUIDParsed() if queryUuid == nil { - log.WithContext(ctx).Debugf("Received QueryStatus with nil UUID: %v", status.Status.String()) + 
log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID") continue responses } - - log.WithContext(ctx).Debugf("Status for %v: %v", queryUuid, status.Status.String()) + statusFields["query"] = queryUuid switch status.Status { case sdp.QueryStatus_STARTED: activeQueries[*queryUuid] = true - continue responses case sdp.QueryStatus_FINISHED: activeQueries[*queryUuid] = false case sdp.QueryStatus_ERRORED: @@ -177,44 +205,33 @@ responses: case sdp.QueryStatus_CANCELLED: activeQueries[*queryUuid] = false default: - log.WithContext(ctx).Debugf("unexpected status %v: %v", queryUuid, status.Status.String()) - continue responses + statusFields["unexpected_status"] = true } - // fall through from all "final" query states, check if there's still queries in progress - // TODO: needs DefaultStartTimeout implemented to account for slow sources - allDone := true - active: - for q := range activeQueries { - if activeQueries[q] { - log.WithContext(ctx).Debugf("%v still active", q) - allDone = false - break active - } - } + log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update") - if allDone { - break responses - } case *sdp.GatewayResponse_NewItem: item := resp.GetNewItem() + log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item") - log.WithContext(ctx).Infof("New item: %v", item.GloballyUniqueName()) case *sdp.GatewayResponse_NewEdge: edge := resp.GetNewEdge() + log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{ + "from": edge.From.GloballyUniqueName(), + "to": edge.To.GloballyUniqueName(), + }).Info("new edge") - log.WithContext(ctx).Infof("New edge: %v->%v", edge.From.GloballyUniqueName(), edge.To.GloballyUniqueName()) case *sdp.GatewayResponse_QueryError: err := resp.GetQueryError() + log.WithContext(ctx).WithFields(lf).Errorf("Error from %v(%v): %v", err.ResponderName, err.SourceName, err) - log.WithContext(ctx).Errorf("Error from %v(%v): %v", 
err.ResponderName, err.SourceName, err) case *sdp.GatewayResponse_Error: err := resp.GetError() - log.WithContext(ctx).Errorf("generic error: %v", err) + log.WithContext(ctx).WithFields(lf).Errorf("generic error: %v", err) + default: j := protojson.Format(resp) - - log.WithContext(ctx).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) + log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j) } } } From 6e023370b008d6d19665c19feb0c732710b0e203 Mon Sep 17 00:00:00 2001 From: David Schmitt <118179693+DavidS-om@users.noreply.github.com> Date: Mon, 10 Jul 2023 16:53:57 +0200 Subject: [PATCH 8/8] Use status.Done() Co-authored-by: Dylan --- cmd/changefromtfplan.go | 2 +- cmd/request.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/changefromtfplan.go b/cmd/changefromtfplan.go index c8a53095..fd796e54 100644 --- a/cmd/changefromtfplan.go +++ b/cmd/changefromtfplan.go @@ -225,7 +225,7 @@ responses: "post_processing_complete": status.PostProcessingComplete, } - if status.Summary != nil && status.Summary.Responders > 0 && status.Summary.Working == 0 && status.PostProcessingComplete { + if status.Done() { // fall through from all "final" query states, check if there's still queries in progress; // only break from the loop if all queries have already been sent // TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources diff --git a/cmd/request.go b/cmd/request.go index 66814305..3f8698c0 100644 --- a/cmd/request.go +++ b/cmd/request.go @@ -167,7 +167,7 @@ responses: "post_processing_complete": status.PostProcessingComplete, } - if status.Summary != nil && status.Summary.Responders > 0 && status.Summary.Working == 0 && status.PostProcessingComplete { + if status.Done() { // fall through from all "final" query states, check if there's still queries in progress; // only break from the loop if all queries have already been sent // TODO: see above, still needs DefaultStartTimeout 
implemented to account for slow sources