-
Notifications
You must be signed in to change notification settings - Fork 240
/
nats.go
131 lines (101 loc) · 3.01 KB
/
nats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package logs
import (
"context"
"encoding/json"
"fmt"
"net"
"github.com/nats-io/nats.go"
"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/flyctl"
"github.com/superfly/flyctl/pkg/agent"
)
// natsLogStream is the LogStream returned by NewNatsStream. It reads log
// entries from a NATS connection.
type natsLogStream struct {
// nc is the NATS connection that Stream subscribes on.
nc *nats.Conn
// err holds the result of the last fromNats run started by Stream; it is
// exposed through Err.
err error
}
// NewNatsStream returns a LogStream that reads logs over NATS for the app
// named by opts.AppName.
//
// It looks the app up through apiClient, establishes the agent, obtains a
// dialer for the app's organization, waits for that organization's tunnel,
// and finally opens the NATS connection through the dialer. Any failure along
// the way is returned wrapped with a message naming the step that failed.
func NewNatsStream(ctx context.Context, apiClient *api.Client, opts *LogOptions) (LogStream, error) {
	app, err := apiClient.GetApp(ctx, opts.AppName)
	if err != nil {
		return nil, fmt.Errorf("failed fetching target app: %w", err)
	}

	agentClient, err := agent.Establish(ctx, apiClient)
	if err != nil {
		return nil, fmt.Errorf("failed establishing agent: %w", err)
	}

	// Every remaining step is scoped to the organization that owns the app.
	orgSlug := app.Organization.Slug

	dialer, err := agentClient.Dialer(ctx, orgSlug)
	if err != nil {
		return nil, fmt.Errorf("failed establishing wireguard connection for %s organization: %w", orgSlug, err)
	}

	err = agentClient.WaitForTunnel(ctx, orgSlug)
	if err != nil {
		return nil, fmt.Errorf("failed connecting to WireGuard tunnel: %w", err)
	}

	conn, err := newNatsClient(ctx, dialer, orgSlug)
	if err != nil {
		return nil, fmt.Errorf("failed creating nats connection: %w", err)
	}

	stream := &natsLogStream{nc: conn}
	return stream, nil
}
// Stream implements LogStream. It starts a goroutine that runs fromNats on
// the stream's NATS connection and returns the channel that goroutine sends
// log entries on. The channel is closed once fromNats returns, after its
// result has been stored for Err.
func (s *natsLogStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry {
	entries := make(chan LogEntry)

	pump := func() {
		// The deferred close runs after the assignment below, so a receiver
		// that observes the closed channel can then read the stored error.
		defer close(entries)

		s.err = fromNats(ctx, entries, s.nc, opts)
	}
	go pump()

	return entries
}
// Err returns the error stored by the goroutine started in Stream, i.e. the
// result of fromNats. The goroutine assigns it just before closing the channel
// returned by Stream, so call Err after that channel has been closed.
func (s *natsLogStream) Err() error {
return s.err
}
// newNatsClient opens a NATS connection for orgSlug through the given tunnel
// dialer.
//
// The NATS address is derived from the peer IP reported by dialer.State():
// the first 6 bytes of the parsed peer address are kept, the remaining bytes
// are zeroed, and the last byte is set to 3. The connection is made to port
// 4223 on that address, authenticating with orgSlug as the user and the
// flyctl API token as the password. All dials performed by the NATS client go
// through dialer with ctx attached.
//
// It returns an error if the peer IP cannot be parsed or if connecting to
// NATS fails.
func newNatsClient(ctx context.Context, dialer agent.Dialer, orgSlug string) (*nats.Conn, error) {
	state := dialer.State()

	// net.ParseIP returns nil for malformed or empty input; slicing that nil
	// value would panic, so reject it explicitly.
	peerIP := net.ParseIP(state.Peer.Peerip)
	if peerIP == nil {
		return nil, fmt.Errorf("invalid wireguard peer IP %q", state.Peer.Peerip)
	}

	// A successfully parsed IP is always 16 bytes long, so the 6-byte prefix
	// copy below is in range.
	var natsIPBytes [16]byte
	copy(natsIPBytes[:], peerIP[:6])
	natsIPBytes[15] = 3

	natsIP := net.IP(natsIPBytes[:])
	url := fmt.Sprintf("nats://[%s]:4223", natsIP.String())

	conn, err := nats.Connect(
		url,
		nats.SetCustomDialer(&natsDialer{Dialer: dialer, ctx: ctx}),
		nats.UserInfo(orgSlug, flyctl.GetAPIToken()),
	)
	if err != nil {
		return nil, fmt.Errorf("failed connecting to nats: %w", err)
	}

	return conn, nil
}
// natsDialer adapts an agent.Dialer to the custom dialer passed to
// nats.SetCustomDialer in newNatsClient.
type natsDialer struct {
// Dialer is the embedded agent dialer that performs the actual dialing.
agent.Dialer
// ctx is kept on the struct because Dial takes no context parameter; it is
// the context handed to DialContext on every Dial call.
ctx context.Context
}
// Dial opens a connection to address on network by delegating to the embedded
// dialer's DialContext, using the context stored on the natsDialer.
func (d *natsDialer) Dial(network, address string) (net.Conn, error) {
	conn, err := d.Dialer.DialContext(d.ctx, network, address)
	return conn, err
}
// fromNats subscribes on nc to the subject given by opts.toNatsSubject() and
// forwards every received message to out as a LogEntry.
//
// It loops until one of the following happens, and returns the corresponding
// error:
//   - subscribing fails;
//   - sub.NextMsgWithContext returns an error (presumably including ctx being
//     cancelled or timing out — confirm against the nats.go documentation);
//   - a message payload is not valid JSON for natsLog;
//   - ctx is done while waiting for the receiver of out.
//
// The subscription is released when the function returns. fromNats never
// closes out; that is left to the caller.
func fromNats(ctx context.Context, out chan<- LogEntry, nc *nats.Conn, opts *LogOptions) error {
	sub, err := nc.SubscribeSync(opts.toNatsSubject())
	if err != nil {
		return err
	}
	// Best-effort cleanup: the function is already returning its own result,
	// so an Unsubscribe failure is deliberately not reported.
	defer func() { _ = sub.Unsubscribe() }()

	for {
		msg, err := sub.NextMsgWithContext(ctx)
		if err != nil {
			return err
		}

		// Decode into a fresh value for every message. json.Unmarshal leaves
		// struct fields that are absent from the payload untouched, so a
		// reused value would leak fields from the previous log line.
		var log natsLog
		if err := json.Unmarshal(msg.Data, &log); err != nil {
			return fmt.Errorf("failed parsing log: %w", err)
		}

		entry := LogEntry{
			Instance:  log.Fly.App.Instance,
			Level:     log.Log.Level,
			Message:   log.Message,
			Region:    log.Fly.Region,
			Timestamp: log.Timestamp,
			Meta: Meta{
				Instance: log.Fly.App.Instance,
				Region:   log.Fly.Region,
				Event:    struct{ Provider string }{log.Event.Provider},
			},
		}

		// out is written to without knowing whether anyone is still reading;
		// watch ctx as well so a departed consumer cannot block this goroutine
		// forever.
		select {
		case out <- entry:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}