Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 231 additions & 55 deletions cmd/changefromtfplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -15,14 +16,18 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/encoding/protojson"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wspb"
)

// changeFromTfplanCmd represents the change-from-tfplan command
var changeFromTfplanCmd = &cobra.Command{
Use: "change-from-tfplan [--title TITLE] [--description DESCRIPTION] [--ticket-link URL] [--tfplan FILE]",
Short: "Creates a new Change from a given terraform plan (in JSON format)",
Short: "Creates a new Change from a given terraform plan file",
PreRun: func(cmd *cobra.Command, args []string) {
// Bind these to viper
err := viper.BindPFlags(cmd.PersistentFlags())
Expand All @@ -41,19 +46,43 @@ var changeFromTfplanCmd = &cobra.Command{
},
}

// test data
var (
affecting_resource *sdp.Reference = &sdp.Reference{
Type: "elbv2-load-balancer",
UniqueAttributeValue: "ingress",
Scope: "944651592624.eu-west-2",
affecting_uuid uuid.UUID = uuid.New()
affecting_resource *sdp.Query = &sdp.Query{
Type: "elbv2-load-balancer",
Method: sdp.QueryMethod_GET,
Query: "ingress",
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
LinkDepth: 0,
},
Scope: "944651592624.eu-west-2",
UUID: affecting_uuid[:],
}
safe_resource *sdp.Reference = &sdp.Reference{
Type: "ec2-security-group",
UniqueAttributeValue: "sg-09533c300cd1a41c1",
Scope: "944651592624.eu-west-2",

safe_uuid uuid.UUID = uuid.New()
safe_resource *sdp.Query = &sdp.Query{
Type: "ec2-security-group",
Method: sdp.QueryMethod_GET,
Query: "sg-09533c300cd1a41c1",
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
LinkDepth: 0,
},
Scope: "944651592624.eu-west-2",
UUID: safe_uuid[:],
}
)

// changingItemQueriesFromTfplan returns the queries used to resolve the items
// affected by the change.
//
// It currently returns hard-coded test data rather than reading a plan: when
// the "test-affecting" setting is true the single affecting_resource query is
// returned, otherwise the single safe_resource query.
func changingItemQueriesFromTfplan() []*sdp.Query {
	if viper.GetBool("test-affecting") {
		return []*sdp.Query{affecting_resource}
	}
	return []*sdp.Query{safe_resource}
}

func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int {
timeout, err := time.ParseDuration(viper.GetString("timeout"))
if err != nil {
Expand All @@ -69,11 +98,13 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int {
// Connect to the websocket
log.WithContext(ctx).Debugf("Connecting to overmind API: %v", viper.GetString("url"))

lf := log.Fields{
"url": viper.GetString("url"),
}

ctx, err = ensureToken(ctx, signals)
if err != nil {
log.WithContext(ctx).WithError(err).WithFields(log.Fields{
"url": viper.GetString("url"),
}).Error("failed to authenticate")
log.WithContext(ctx).WithError(err).WithFields(lf).Error("failed to authenticate")
return 1
}

Expand All @@ -94,92 +125,225 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int {
},
})
if err != nil {
log.WithContext(ctx).WithError(err).WithFields(log.Fields{
"url": viper.GetString("url"),
}).Error("failed to create change")
log.WithContext(ctx).WithError(err).WithFields(lf).Error("failed to create change")
return 1
}
log.WithContext(ctx).WithFields(log.Fields{
"url": viper.GetString("url"),
"change": createResponse.Msg.Change.Metadata.GetUUIDParsed(),
}).Info("created a new change")

var changing_items []*sdp.Reference
if viper.GetBool("test-affecting") {
changing_items = []*sdp.Reference{affecting_resource}
} else {
changing_items = []*sdp.Reference{safe_resource}
lf["change"] = createResponse.Msg.Change.Metadata.GetUUIDParsed()
log.WithContext(ctx).WithFields(lf).Info("created a new change")

log.WithContext(ctx).WithFields(lf).Info("resolving items from terraform plan")
queries := changingItemQueriesFromTfplan()

options := &websocket.DialOptions{
HTTPClient: NewAuthenticatedClient(ctx, otelhttp.DefaultClient),
}

c, _, err := websocket.Dial(ctx, viper.GetString("url"), options)
if err != nil {
log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to connect to overmind API")
return 1
}
defer c.Close(websocket.StatusGoingAway, "")

// the default, 32kB is too small for cert bundles and rds-db-cluster-parameter-groups
c.SetReadLimit(2 * 1024 * 1024)

queriesSentChan := make(chan struct{})
go func() {
for _, q := range queries {
req := sdp.GatewayRequest{
MinStatusInterval: minStatusInterval,
RequestType: &sdp.GatewayRequest_Query{
Query: q,
},
}
err = wspb.Write(ctx, c, &req)
if err != nil {
log.WithContext(ctx).WithFields(lf).WithError(err).WithField("req", &req).Error("Failed to send request")
continue
}
}
queriesSentChan <- struct{}{}
}()

responses := make(chan *sdp.GatewayResponse)

// Start a goroutine that reads responses
go func() {
for {
res := new(sdp.GatewayResponse)

err = wspb.Read(ctx, c, res)

if err != nil {
var e websocket.CloseError
if errors.As(err, &e) {
log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{
"code": e.Code.String(),
"reason": e.Reason,
}).Info("Websocket closing")
return
}
log.WithContext(ctx).WithFields(lf).WithError(err).Error("Failed to read response")
return
}

responses <- res
}
}()

activeQueries := make(map[uuid.UUID]bool)
queriesSent := false

receivedItems := []*sdp.Reference{}

// Read the responses
responses:
for {
select {
case <-queriesSentChan:
queriesSent = true

case <-signals:
log.WithContext(ctx).WithFields(lf).Info("Received interrupt, exiting")
return 1

case <-ctx.Done():
log.WithContext(ctx).WithFields(lf).Info("Context cancelled, exiting")
return 1

case resp := <-responses:
switch resp.ResponseType.(type) {

case *sdp.GatewayResponse_Status:
status := resp.GetStatus()
statusFields := log.Fields{
"summary": status.Summary,
"responders": status.Summary.Responders,
"queriesSent": queriesSent,
"post_processing_complete": status.PostProcessingComplete,
}

if status.Done() {
// fall through from all "final" query states, check if there's still queries in progress;
// only break from the loop if all queries have already been sent
// TODO: see above, still needs DefaultStartTimeout implemented to account for slow sources
allDone := allDone(ctx, activeQueries, lf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary but no harm in having it I guess. If you were to get rid of this it would make sense to also get rid of the whole *sdp.GatewayResponse_QueryStatus branch too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another notch for having to write a proper client library as the responder status does not provide detailed information on what is and is not done. I'll leave it as is for now as a note-to-self that this part of SDP also needs to be considered as part of the client work.

statusFields["allDone"] = allDone
if allDone && queriesSent {
log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders and queries done")
break responses
} else {
log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("all responders done, with unfinished queries")
}
} else {
log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Info("still waiting for responders")
}

case *sdp.GatewayResponse_QueryStatus:
status := resp.GetQueryStatus()
statusFields := log.Fields{
"status": status.Status.String(),
}
queryUuid := status.GetUUIDParsed()
if queryUuid == nil {
log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("Received QueryStatus with nil UUID")
continue responses
}
statusFields["query"] = queryUuid

switch status.Status {
case sdp.QueryStatus_STARTED:
activeQueries[*queryUuid] = true
case sdp.QueryStatus_FINISHED:
activeQueries[*queryUuid] = false
case sdp.QueryStatus_ERRORED:
activeQueries[*queryUuid] = false
case sdp.QueryStatus_CANCELLED:
activeQueries[*queryUuid] = false
default:
statusFields["unexpected_status"] = true
}

log.WithContext(ctx).WithFields(lf).WithFields(statusFields).Debugf("query status update")

case *sdp.GatewayResponse_NewItem:
item := resp.GetNewItem()
log.WithContext(ctx).WithFields(lf).WithField("item", item.GloballyUniqueName()).Infof("new item")

receivedItems = append(receivedItems, item.Reference())

case *sdp.GatewayResponse_NewEdge:
log.WithContext(ctx).WithFields(lf).Debug("ignored edge")

case *sdp.GatewayResponse_QueryError:
err := resp.GetQueryError()
log.WithContext(ctx).WithFields(lf).WithError(err).Errorf("Error from %v(%v)", err.ResponderName, err.SourceName)

case *sdp.GatewayResponse_Error:
err := resp.GetError()
log.WithContext(ctx).WithFields(lf).WithField(log.ErrorKey, err).Errorf("generic error")

default:
j := protojson.Format(resp)
log.WithContext(ctx).WithFields(lf).Infof("Unknown %T Response:\n%v", resp.ResponseType, j)
}
}
}

resultStream, err := client.UpdateChangingItems(ctx, &connect.Request[sdp.UpdateChangingItemsRequest]{
Msg: &sdp.UpdateChangingItemsRequest{
ChangeUUID: createResponse.Msg.Change.Metadata.UUID,
ChangingItems: changing_items,
ChangingItems: receivedItems,
},
})
if err != nil {
log.WithContext(ctx).WithError(err).WithFields(log.Fields{
"url": viper.GetString("url"),
"change": createResponse.Msg.Change.Metadata.GetUUIDParsed(),
}).Error("failed to update changing items")
log.WithContext(ctx).WithFields(lf).WithError(err).Error("failed to update changing items")
return 1
}

last_log := time.Now()
first_log := true
for resultStream.Receive() {
if resultStream.Err() != nil {
log.WithContext(ctx).WithError(err).WithFields(log.Fields{
"url": viper.GetString("url"),
"change": createResponse.Msg.Change.Metadata.GetUUIDParsed(),
}).Error("error streaming results")
log.WithContext(ctx).WithFields(lf).WithError(err).Error("error streaming results")
return 1
}

msg := resultStream.Msg()

// log the first message and at most every 500ms during discovery
// log the first message and at most every 250ms during discovery
// to avoid spamming the cli output
time_since_last_log := time.Since(last_log)
if first_log || msg.State != sdp.CalculateBlastRadiusResponse_STATE_DISCOVERING || time_since_last_log > 250*time.Millisecond {
log.WithContext(ctx).WithFields(log.Fields{
"url": viper.GetString("url"),
"change": createResponse.Msg.Change.Metadata.GetUUIDParsed(),
"msg": msg,
}).Info("status update")
log.WithContext(ctx).WithFields(lf).WithField("msg", msg).Info("status update")
last_log = time.Now()
first_log = false
}
}

log.WithContext(ctx).WithFields(log.Fields{
"change": createResponse.Msg.Change.Metadata.GetUUIDParsed(),
"change-url": fmt.Sprintf("%v/changes/%v", viper.GetString("frontend"), createResponse.Msg.Change.Metadata.GetUUIDParsed()),
}).Info("change ready")
changeUrl := fmt.Sprintf("%v/changes/%v", viper.GetString("frontend"), createResponse.Msg.Change.Metadata.GetUUIDParsed())
log.WithContext(ctx).WithFields(lf).WithField("change-url", changeUrl).Info("change ready")

fetchResponse, err := client.GetChange(ctx, &connect.Request[sdp.GetChangeRequest]{
Msg: &sdp.GetChangeRequest{
UUID: createResponse.Msg.Change.Metadata.UUID,
},
})
if err != nil {
log.WithContext(ctx).WithError(err).WithFields(log.Fields{
"url": viper.GetString("url"),
}).Error("failed to get updated change")
log.WithContext(ctx).WithFields(lf).WithError(err).Error("failed to get updated change")
return 1
}

for _, a := range fetchResponse.Msg.Change.Properties.AffectedAppsUUID {
appUuid, err := uuid.FromBytes(a)
if err != nil {
log.WithContext(ctx).WithError(err).WithFields(log.Fields{
"url": viper.GetString("url"),
"value": a,
}).Error("received invalid app uuid")
log.WithContext(ctx).WithFields(lf).WithError(err).WithField("app", a).Error("received invalid app uuid")
continue
}
log.WithContext(ctx).WithFields(log.Fields{
"change": createResponse.Msg.Change.Metadata.GetUUIDParsed(),
"change-url": fmt.Sprintf("%v/changes/%v", viper.GetString("frontend"), createResponse.Msg.Change.Metadata.GetUUIDParsed()),
log.WithContext(ctx).WithFields(lf).WithFields(log.Fields{
"change-url": changeUrl,
"app": appUuid,
"app-url": fmt.Sprintf("%v/apps/%v", viper.GetString("frontend"), appUuid),
}).Info("affected app")
Expand All @@ -188,11 +352,23 @@ func ChangeFromTfplan(signals chan os.Signal, ready chan bool) int {
return 0
}

// allDone reports whether no query in activeQueries is still marked active.
//
// activeQueries maps a query UUID to true while that query is in progress and
// to false once it has reached a final state. On the first active entry found,
// a debug message carrying lf plus the query's UUID is logged and false is
// returned. Map iteration order is unspecified, so when several queries are
// active, which one gets logged may differ between calls. An empty or nil map
// yields true.
func allDone(ctx context.Context, activeQueries map[uuid.UUID]bool, lf log.Fields) bool {
	for id, active := range activeQueries {
		if !active {
			continue
		}
		log.WithContext(ctx).WithFields(lf).WithField("query", id).Debugf("query still active")
		return false
	}
	return true
}

func init() {
rootCmd.AddCommand(changeFromTfplanCmd)

changeFromTfplanCmd.PersistentFlags().String("changes-url", "https://api.prod.overmind.tech/", "The changes service API endpoint")
changeFromTfplanCmd.PersistentFlags().String("frontend", "https://app.overmind.tech/", "The frontend base URL")
changeFromTfplanCmd.PersistentFlags().String("changes-url", "https://api.prod.overmind.tech", "The changes service API endpoint")
changeFromTfplanCmd.PersistentFlags().String("frontend", "https://app.overmind.tech", "The frontend base URL")

changeFromTfplanCmd.PersistentFlags().String("terraform", "terraform", "The binary to use for calling terraform. Will be looked up in the system PATH.")
changeFromTfplanCmd.PersistentFlags().String("tfplan", "./tfplan", "Parse changing items from this terraform plan file.")
Expand Down
Loading