Skip to content

Commit

Permalink
Use logs package everywhere instead of monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
rugwirobaker committed Oct 6, 2021
1 parent e8c7929 commit ebdc8ec
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 141 deletions.
158 changes: 19 additions & 139 deletions cmd/machine.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,30 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/user"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/AlecAivazis/survey/v2"
surveyterminal "github.com/AlecAivazis/survey/v2/terminal"
"github.com/dustin/go-humanize"
"github.com/google/shlex"
"github.com/logrusorgru/aurora"
"github.com/nats-io/nats.go"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/cmd/presenters"
"github.com/superfly/flyctl/cmdctx"
"github.com/superfly/flyctl/docstrings"
"github.com/superfly/flyctl/flyctl"
"github.com/superfly/flyctl/internal/build/imgsrc"
"github.com/superfly/flyctl/internal/client"
"github.com/superfly/flyctl/internal/cmdutil"
"github.com/superfly/flyctl/internal/monitor"
"github.com/superfly/flyctl/pkg/agent"
"github.com/superfly/flyctl/pkg/logs"
"github.com/superfly/flyctl/terminal"
"gopkg.in/yaml.v2"
)

func newMachineCommand(client *client.Client) *Command {
Expand Down Expand Up @@ -569,145 +561,33 @@ func runMachineRun(cmdCtx *cmdctx.CmdContext) error {

apiClient := cmdCtx.Client.API()

dialer, err := func() (agent.Dialer, error) {
ctx := createCancellableContext()
terminal.Debug("establishing agent connection")
agentclient, err := agent.Establish(ctx, apiClient)
if err != nil {
return nil, errors.Wrap(err, "error establishing agent")
}
opts := &logs.LogOptions{
AppName: app.Name,
VMID: machine.ID,
}

dialer, err := agentclient.Dialer(ctx, &app.Organization)
if err != nil {
return nil, errors.Wrapf(err, "error establishing wireguard connection for %s organization", app.Organization.Slug)
}
stream, err := logs.NewNatsStream(apiClient, opts)

tunnelCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
terminal.Debug("waiting for tunnel")
if err = agentclient.WaitForTunnel(tunnelCtx, &app.Organization); err != nil {
return nil, errors.Wrap(err, "unable to connect WireGuard tunnel")
}

return dialer, nil
}()
if err != nil {
terminal.Debugf("could not connect to wireguard tunnel, err: %v\n", err)
terminal.Debug("Falling back to log polling...")
err := monitor.WatchLogs(cmdCtx, cmdCtx.Out, monitor.LogOptions{
AppName: app.Name,
VMID: machine.ID,
})

return err
}

var flyConf flyConfig
usr, _ := user.Current()
flyConfFile, err := os.Open(filepath.Join(usr.HomeDir, ".fly", "config.yml"))
if err != nil {
return errors.Wrap(err, "could not read fly config yml")
}
if err := yaml.NewDecoder(flyConfFile).Decode(&flyConf); err != nil {
return errors.Wrap(err, "could not decode fly config yml")
}

state, ok := flyConf.WireGuardState[app.Organization.Slug]
if !ok {
return errors.New("could not find org in fly config")
}

peerIP := state.Peer.PeerIP

var natsIPBytes [16]byte
copy(natsIPBytes[0:], peerIP[0:6])
natsIPBytes[15] = 3

natsIP := net.IP(natsIPBytes[:])

terminal.Debug("connecting to nats")
natsConn, err := nats.Connect(fmt.Sprintf("nats://[%s]:4223", natsIP.String()), nats.SetCustomDialer(&natsDialer{dialer, ctx}), nats.UserInfo(app.Organization.Slug, flyConf.AccessToken))
if err != nil {
return errors.Wrap(err, "could not connect to nats")
}

subject := fmt.Sprintf("logs.%s.*.%s", app.Name, machine.ID)
terminal.Debugf("subscribing to nats subject: %s\n", subject)
sub, err := natsConn.Subscribe(subject, func(msg *nats.Msg) {
var log natsLog
if err := json.Unmarshal(msg.Data, &log); err != nil {
terminal.Error(errors.Wrap(err, "could not parse log"))
return
stream, err = logs.NewPollingStream(apiClient, opts)
if err != nil {
return err
}
w := os.Stdout
fmt.Fprintf(w, "%s ", aurora.Faint(log.Timestamp))
fmt.Fprintf(w, "%s[%s]", log.Event.Provider, log.Fly.App.Instance)
fmt.Fprint(w, " ")
fmt.Fprintf(w, "%s ", aurora.Green(log.Fly.Region))
fmt.Fprintf(w, "[%s] ", aurora.Colorize(log.Log.Level, levelColor(log.Log.Level)))
_, _ = w.Write([]byte(log.Message))
fmt.Fprintln(w, "")
})
if err != nil {
return errors.Wrap(err, "could not sub to logs via nats")
}
defer sub.Unsubscribe()

<-ctx.Done()
presenter := presenters.LogPresenter{}

return nil
}
entries := stream.Stream(ctx, opts)

func levelColor(level string) aurora.Color {
switch level {
case "debug":
return aurora.CyanFg
case "info":
return aurora.BlueFg
case "warn":
case "warning":
return aurora.YellowFg
for {
select {
case <-ctx.Done():
return stream.Err()
case entry := <-entries:
presenter.FPrint(cmdCtx.Out, cmdCtx.OutputJSON(), entry)
}
}
return aurora.RedFg
}

type natsLog struct {
Event struct {
Provider string `json:"provider"`
} `json:"event"`
Fly struct {
App struct {
Instance string `json:"instance"`
Name string `json:"name"`
} `json:"app"`
Region string `json:"region"`
} `json:"fly"`
Host string `json:"host"`
Log struct {
Level string `json:"level"`
} `json:"log"`
Message string `json:"message"`
Timestamp string `json:"timestamp"`
}

type natsDialer struct {
agent.Dialer
ctx context.Context
}

func (d *natsDialer) Dial(network, address string) (net.Conn, error) {
return d.Dialer.DialContext(d.ctx, network, address)
}

type flyConfig struct {
AccessToken string `yaml:"access_token"`
WireGuardState map[string]wgState `yaml:"wire_guard_state"`
}

type wgState struct {
Peer wgPeer `yaml:"peer"`
}

type wgPeer struct {
PeerIP net.IP `yaml:"peerip"`
}
17 changes: 15 additions & 2 deletions cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"

"github.com/superfly/flyctl/cmdctx"
"github.com/superfly/flyctl/pkg/logs"
"github.com/superfly/flyctl/terminal"

"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/cmd/presenters"
Expand Down Expand Up @@ -112,8 +114,19 @@ func monitorDeployment(ctx context.Context, commandContext *cmdctx.CmdContext) e
}

commandContext.Status("monitor", cmdctx.STITLE, "Recent Logs")
// logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
// logPresenter.FPrint(commandContext.Out, commandContext.OutputJSON(), alloc.RecentLogs)
logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
terminal.Debug("logs", "Fetching logs for %s", alloc.ID)
for _, e := range alloc.RecentLogs {
entry := logs.LogEntry{
Instance: e.Instance,
Level: e.Level,
Message: e.Message,
Region: e.Region,
Timestamp: e.Timestamp,
Meta: e.Meta,
}
logPresenter.FPrint(commandContext.Out, commandContext.OutputJSON(), entry)
}

}

Expand Down

0 comments on commit ebdc8ec

Please sign in to comment.