Skip to content

Commit

Permalink
Refactor logs tail command with channel and visitors (#661)
Browse files Browse the repository at this point in the history
* Refactor logs tail command

* cleanup

* renaming

* Renaming
  • Loading branch information
vcheung-stripe committed May 10, 2021
1 parent 106cfa8 commit 75e8179
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 101 deletions.
134 changes: 122 additions & 12 deletions pkg/cmd/logs/tail.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
package logs

import (
"fmt"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"time"

"github.com/briandowns/spinner"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"context"

"github.com/stripe/stripe-cli/pkg/ansi"
"github.com/stripe/stripe-cli/pkg/config"
"github.com/stripe/stripe-cli/pkg/logtailing"
logTailing "github.com/stripe/stripe-cli/pkg/logtailing"
"github.com/stripe/stripe-cli/pkg/validators"
"github.com/stripe/stripe-cli/pkg/version"
)

const requestLogsWebSocketFeature = "request_logs"
const outputFormatJSON = "JSON"

// TailCmd wraps the configuration for the tail command
type TailCmd struct {
Expand Down Expand Up @@ -118,6 +127,21 @@ Acceptable values:
return tailCmd
}

func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context {
// Create a context that will be canceled when Ctrl+C is pressed
ctx, cancel := context.WithCancel(ctx)

interruptCh := make(chan os.Signal, 1)
signal.Notify(interruptCh, os.Interrupt, syscall.SIGTERM)

go func() {
<-interruptCh
onCancel()
cancel()
}()
return ctx
}

func (tailCmd *TailCmd) runTailCmd(cmd *cobra.Command, args []string) error {
err := tailCmd.validateArgs()
if err != nil {
Expand All @@ -141,20 +165,35 @@ func (tailCmd *TailCmd) runTailCmd(cmd *cobra.Command, args []string) error {

version.CheckLatestVersion()

logger := log.StandardLogger()

logtailingVisitor := createVisitor(logger, tailCmd.format)

logtailingOutCh := make(chan logTailing.IElement)

tailer := logTailing.New(&logTailing.Config{
APIBaseURL: tailCmd.apiBaseURL,
DeviceName: deviceName,
Filters: tailCmd.LogFilters,
Key: key,
Log: log.StandardLogger(),
NoWSS: tailCmd.noWSS,
OutputFormat: strings.ToUpper(tailCmd.format),
WebSocketFeature: requestLogsWebSocketFeature,
APIBaseURL: tailCmd.apiBaseURL,
DeviceName: deviceName,
Filters: tailCmd.LogFilters,
Key: key,
Log: logger,
NoWSS: tailCmd.noWSS,
OutCh: logtailingOutCh,
})

err = tailer.Run(context.Background())
if err != nil {
return err
ctx := withSIGTERMCancel(context.Background(), func() {
log.WithFields(log.Fields{
"prefix": "logtailing.Tailer.Run",
}).Debug("Ctrl+C received, cleaning up...")
})

go tailer.Run(ctx)

for el := range logtailingOutCh {
err := el.Accept(logtailingVisitor)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -204,3 +243,74 @@ func (tailCmd *TailCmd) convertArgs() error {

return nil
}

func createVisitor(logger *log.Logger, format string) *logtailing.Visitor {
var s *spinner.Spinner

return &logtailing.Visitor{
VisitError: func(ee logTailing.ErrorElement) error {
ansi.StopSpinner(s, "", logger.Out)
return ee.Error
},
VisitWarning: func(we logTailing.WarningElement) error {
color := ansi.Color(os.Stdout)
fmt.Printf("%s %s\n", color.Yellow("Warning"), we.Warning)
return nil
},
VisitStatus: func(se logTailing.StateElement) error {
switch se.State {
case logTailing.Loading:
s = ansi.StartNewSpinner("Getting ready...", logger.Out)
case logtailing.Reconnecting:
ansi.StartSpinner(s, "Session expired, reconnecting...", logger.Out)
case logtailing.Ready:
ansi.StopSpinner(s, "Ready! You're now waiting to receive API request logs (^C to quit)", logger.Out)
case logtailing.Done:
ansi.StopSpinner(s, "", logger.Out)
}
return nil
},
VisitLog: func(le logTailing.LogElement) error {
if strings.ToUpper(format) == outputFormatJSON {
fmt.Println(ansi.ColorizeJSON(le.MarshalledLog, false, os.Stdout))
return nil
}

coloredStatus := ansi.ColorizeStatus(le.Log.Status)

url := urlForRequestID(&le.Log)
requestLink := ansi.Linkify(le.Log.RequestID, url, os.Stdout)

if le.Log.URL == "" {
le.Log.URL = "[View path in dashboard]"
}

exampleLayout := "2006-01-02 15:04:05"
localTime := time.Unix(int64(le.Log.CreatedAt), 0).Format(exampleLayout)

color := ansi.Color(os.Stdout)
outputStr := fmt.Sprintf("%s [%d] %s %s [%s]", color.Faint(localTime), coloredStatus, le.Log.Method, le.Log.URL, requestLink)
fmt.Println(outputStr)

errorValues := reflect.ValueOf(&le.Log.Error).Elem()
errType := errorValues.Type()

for i := 0; i < errorValues.NumField(); i++ {
fieldValue := errorValues.Field(i).Interface()
if fieldValue != "" {
fmt.Printf("%s: %s\n", errType.Field(i).Name, fieldValue)
}
}
return nil
},
}
}

func urlForRequestID(payload *logtailing.EventPayload) string {
maybeTest := ""
if !payload.Livemode {
maybeTest = "/test"
}

return fmt.Sprintf("https://dashboard.stripe.com%s/logs/%s", maybeTest, payload.RequestID)
}
115 changes: 34 additions & 81 deletions pkg/logtailing/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ import (
"fmt"
"io/ioutil"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"time"

log "github.com/sirupsen/logrus"

"github.com/stripe/stripe-cli/pkg/ansi"
"github.com/stripe/stripe-cli/pkg/stripeauth"
"github.com/stripe/stripe-cli/pkg/websocket"
)

const outputFormatJSON = "JSON"
const requestLogsWebSocketFeature = "request_logs"

// LogFilters contains all of the potential user-provided filters for log tailing
type LogFilters struct {
Expand Down Expand Up @@ -52,11 +47,8 @@ type Config struct {
// Force use of unencrypted ws:// protocol instead of wss://
NoWSS bool

// Output format for request logs
OutputFormat string

// WebSocketFeature is the feature specified for the websocket connection
WebSocketFeature string
// OutCh is the channel to send logs and statuses to for processing in other packages
OutCh chan IElement
}

// Tailer is the main interface for running the log tailing session
Expand Down Expand Up @@ -106,47 +98,33 @@ func New(cfg *Config) *Tailer {
}
}

func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context {
// Create a context that will be canceled when Ctrl+C is pressed
ctx, cancel := context.WithCancel(ctx)

interruptCh := make(chan os.Signal, 1)
signal.Notify(interruptCh, os.Interrupt, syscall.SIGTERM)

go func() {
<-interruptCh
onCancel()
cancel()
}()
return ctx
}

const maxConnectAttempts = 3

// Run sets the websocket connection
func (t *Tailer) Run(ctx context.Context) error {
s := ansi.StartNewSpinner("Getting ready...", t.cfg.Log.Out)

ctx = withSIGTERMCancel(ctx, func() {
log.WithFields(log.Fields{
"prefix": "logtailing.Tailer.Run",
}).Debug("Ctrl+C received, cleaning up...")
})
defer close(t.cfg.OutCh)

var warned = false
var nAttempts int = 0

t.cfg.OutCh <- StateElement{
State: Loading,
}

for nAttempts < maxConnectAttempts {
session, err := t.createSession(ctx)

if err != nil {
ansi.StopSpinner(s, "", t.cfg.Log.Out)
t.cfg.Log.Fatalf("Error while authenticating with Stripe: %v", err)
t.cfg.OutCh <- ErrorElement{
Error: fmt.Errorf("Error while authenticating with Stripe: %v", err),
}
return err
}

if session.DisplayConnectFilterWarning && !warned {
color := ansi.Color(os.Stdout)
fmt.Printf("%s you specified the 'account' filter for Connect accounts but are not a Connect user, so the filter will not be applied.\n", color.Yellow("Warning"))
t.cfg.OutCh <- WarningElement{
Warning: "you specified the 'account' filter for Connect accounts but are not a Connect user, so the filter will not be applied.",
}
// Only display this warning once
warned = true
}
Expand All @@ -166,21 +144,31 @@ func (t *Tailer) Run(ctx context.Context) error {
go func() {
<-t.webSocketClient.Connected()
nAttempts = 0
ansi.StopSpinner(s, "Ready! You're now waiting to receive API request logs (^C to quit)", t.cfg.Log.Out)
t.cfg.OutCh <- StateElement{
State: Ready,
}
}()

go t.webSocketClient.Run(ctx)
nAttempts++

select {
case <-ctx.Done():
ansi.StopSpinner(s, "", t.cfg.Log.Out)
t.cfg.OutCh <- &StateElement{
State: Done,
}
return nil
case <-t.webSocketClient.NotifyExpired:
if nAttempts < maxConnectAttempts {
ansi.StartSpinner(s, "Session expired, reconnecting...", t.cfg.Log.Out)
t.cfg.OutCh <- &StateElement{
State: Reconnecting,
}
} else {
t.cfg.Log.Fatalf("Session expired. Terminating after %d failed attempts to reauthorize", nAttempts)
err := fmt.Errorf("Session expired. Terminating after %d failed attempts to reauthorize", nAttempts)
t.cfg.OutCh <- ErrorElement{
Error: err,
}
return err
}
}
}
Expand All @@ -205,14 +193,14 @@ func (t *Tailer) createSession(ctx context.Context) (*stripeauth.StripeCLISessio

filters, err := jsonifyFilters(t.cfg.Filters)
if err != nil {
t.cfg.Log.Fatalf("Error while converting log filters to JSON encoding: %v", err)
return nil, fmt.Errorf("Error while converting log filters to JSON encoding: %v", err)
}

go func() {
// Try to authorize at least 5 times before failing. Sometimes we have random
// transient errors that we just need to retry for.
for i := 0; i <= 5; i++ {
session, err = t.stripeAuthClient.Authorize(ctx, t.cfg.DeviceName, t.cfg.WebSocketFeature, &filters)
session, err = t.stripeAuthClient.Authorize(ctx, t.cfg.DeviceName, requestLogsWebSocketFeature, &filters)

if err == nil {
exitCh <- struct{}{}
Expand Down Expand Up @@ -258,35 +246,9 @@ func (t *Tailer) processRequestLogEvent(msg websocket.IncomingMessage) {
return
}

if strings.ToUpper(t.cfg.OutputFormat) == outputFormatJSON {
fmt.Println(ansi.ColorizeJSON(requestLogEvent.EventPayload, false, os.Stdout))
return
}

coloredStatus := ansi.ColorizeStatus(payload.Status)

url := urlForRequestID(&payload)
requestLink := ansi.Linkify(payload.RequestID, url, os.Stdout)

if payload.URL == "" {
payload.URL = "[View path in dashboard]"
}

exampleLayout := "2006-01-02 15:04:05"
localTime := time.Unix(int64(payload.CreatedAt), 0).Format(exampleLayout)

color := ansi.Color(os.Stdout)
outputStr := fmt.Sprintf("%s [%d] %s %s [%s]", color.Faint(localTime), coloredStatus, payload.Method, payload.URL, requestLink)
fmt.Println(outputStr)

errorValues := reflect.ValueOf(&payload.Error).Elem()
errType := errorValues.Type()

for i := 0; i < errorValues.NumField(); i++ {
fieldValue := errorValues.Field(i).Interface()
if fieldValue != "" {
fmt.Printf("%s: %s\n", errType.Field(i).Name, fieldValue)
}
t.cfg.OutCh <- LogElement{
Log: payload,
MarshalledLog: requestLogEvent.EventPayload,
}
}

Expand All @@ -300,12 +262,3 @@ func jsonifyFilters(logFilters *LogFilters) (string, error) {

return jsonStr, nil
}

func urlForRequestID(payload *EventPayload) string {
maybeTest := ""
if !payload.Livemode {
maybeTest = "/test"
}

return fmt.Sprintf("https://dashboard.stripe.com%s/logs/%s", maybeTest, payload.RequestID)
}
8 changes: 0 additions & 8 deletions pkg/logtailing/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,3 @@ func TestJsonifyFiltersEmpty(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "{}", filtersStr)
}

func TestURLForRequestID(t *testing.T) {
evt := &EventPayload{RequestID: "req_123", Livemode: false}
require.Equal(t, "https://dashboard.stripe.com/test/logs/req_123", urlForRequestID(evt))

evt = &EventPayload{RequestID: "req_123", Livemode: true}
require.Equal(t, "https://dashboard.stripe.com/logs/req_123", urlForRequestID(evt))
}

0 comments on commit 75e8179

Please sign in to comment.