Skip to content

Commit

Permalink
Stream logs from nats
Browse files Browse the repository at this point in the history
Try and stream log from nats and fallback to polling case of an error.

- [ ] Streaming from nats  currently blocks forever.
- [ ] Need to make a better unified format.
- [ ] Handle errors
  • Loading branch information
rugwirobaker committed Oct 6, 2021
1 parent e6a9156 commit a06cb0d
Show file tree
Hide file tree
Showing 12 changed files with 626 additions and 24 deletions.
4 changes: 2 additions & 2 deletions cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ func watchDeployment(ctx context.Context, cmdCtx *cmdctx.CmdContext) error {
}

cmdCtx.Status("deploy", cmdctx.STITLE, "Recent Logs")
logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
logPresenter.FPrint(cmdCtx.Out, cmdCtx.OutputJSON(), alloc.RecentLogs)
// logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
// logPresenter.FPrint(cmdCtx.Out, cmdCtx.OutputJSON(), alloc.RecentLogs)
}

}
Expand Down
40 changes: 32 additions & 8 deletions cmd/logs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cmd

import (
"github.com/superfly/flyctl/cmd/presenters"
"github.com/superfly/flyctl/cmdctx"
"github.com/superfly/flyctl/internal/client"
"github.com/superfly/flyctl/internal/monitor"
"github.com/superfly/flyctl/pkg/logs"
"github.com/superfly/flyctl/terminal"

"github.com/superfly/flyctl/docstrings"
)
Expand All @@ -27,12 +29,34 @@ func newLogsCommand(client *client.Client) *Command {
return cmd
}

func runLogs(ctx *cmdctx.CmdContext) error {
err := monitor.WatchLogs(ctx, ctx.Out, monitor.LogOptions{
AppName: ctx.AppName,
VMID: ctx.Config.GetString("instance"),
RegionCode: ctx.Config.GetString("region"),
})
func runLogs(cc *cmdctx.CmdContext) error {
ctx := createCancellableContext()

client := cc.Client.API()

opts := &logs.LogOptions{
AppName: cc.Config.GetString("app"),
RegionCode: cc.Config.GetString("region"),
VMID: cc.Config.GetString("instance"),
}

stream, err := logs.NewNatsStream(client, opts)

if err != nil {
terminal.Debugf("could not connect to wireguard tunnel, err: %v\n", err)
terminal.Debug("Falling back to log polling...")

stream, err = logs.NewPollingStream(client)
if err != nil {
return err
}
}

presenter := presenters.LogPresenter{}

for entry := range stream.Stream(ctx, opts) {
presenter.FPrint(cc.Out, false, entry)
}

return err
return nil
}
4 changes: 2 additions & 2 deletions cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func monitorDeployment(ctx context.Context, commandContext *cmdctx.CmdContext) e
}

commandContext.Status("monitor", cmdctx.STITLE, "Recent Logs")
logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
logPresenter.FPrint(commandContext.Out, commandContext.OutputJSON(), alloc.RecentLogs)
// logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
// logPresenter.FPrint(commandContext.Out, commandContext.OutputJSON(), alloc.RecentLogs)

}

Expand Down
10 changes: 4 additions & 6 deletions cmd/presenters/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"

"github.com/logrusorgru/aurora"
"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/pkg/logs"
)

type LogPresenter struct {
Expand All @@ -16,16 +16,14 @@ type LogPresenter struct {
HideAllocID bool
}

func (lp *LogPresenter) FPrint(w io.Writer, asJSON bool, entries []api.LogEntry) {
for _, entry := range entries {
lp.printEntry(w, asJSON, entry)
}
func (lp *LogPresenter) FPrint(w io.Writer, asJSON bool, entry logs.LogEntry) {
lp.printEntry(w, asJSON, entry)
}

var newLineReplacer = strings.NewReplacer("\r\n", aurora.Faint("↩︎").String(), "\n", aurora.Faint("↩︎").String())
var newline = []byte("\n")

func (lp *LogPresenter) printEntry(w io.Writer, asJSON bool, entry api.LogEntry) {
func (lp *LogPresenter) printEntry(w io.Writer, asJSON bool, entry logs.LogEntry) {
if asJSON {
outBuf, _ := json.MarshalIndent(entry, "", " ")
fmt.Fprintln(w, string(outBuf))
Expand Down
4 changes: 2 additions & 2 deletions cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func runAllocStatus(ctx *cmdctx.CmdContext) error {
p = ctx.Out
}

logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
logPresenter.FPrint(p, ctx.OutputJSON(), alloc.RecentLogs)
// logPresenter := presenters.LogPresenter{HideAllocID: true, HideRegion: true, RemoveNewlines: true}
// logPresenter.FPrint(p, ctx.OutputJSON(), alloc.RecentLogs)

if p != ctx.Out {
_ = pw.Flush()
Expand Down
5 changes: 2 additions & 3 deletions internal/monitor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/jpillora/backoff"
"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/cmd/presenters"
"github.com/superfly/flyctl/cmdctx"
"github.com/superfly/flyctl/terminal"
)
Expand Down Expand Up @@ -37,7 +36,7 @@ func WatchLogs(cc *cmdctx.CmdContext, w io.Writer, opts LogOptions) error {

nextToken := ""

logPresenter := presenters.LogPresenter{}
// logPresenter := presenters.LogPresenter{}

for {
entries, token, err := cc.Client.API().GetAppLogs(opts.AppName, nextToken, opts.RegionCode, opts.VMID)
Expand All @@ -63,7 +62,7 @@ func WatchLogs(cc *cmdctx.CmdContext, w io.Writer, opts LogOptions) error {
} else {
b.Reset()

logPresenter.FPrint(w, false, entries)
// logPresenter.FPrint(w, false, entries)

if token != "" {
nextToken = token
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func StartDaemon(ctx context.Context, api *api.Client, command string) (*Client,
return nil, err
}
agentPid := cmd.Process.Pid
terminal.Debugf("started agent process %d", agentPid)
terminal.Debug("started agent process ", agentPid)

// read stdout and stderr from the daemon process. If it
// includes "[pid] OK" we know it started successfully, and
Expand Down

0 comments on commit a06cb0d

Please sign in to comment.