Skip to content

Commit

Permalink
Use state from agent.Dialer
Browse files Browse the repository at this point in the history
  • Loading branch information
rugwirobaker committed Oct 6, 2021
1 parent 5481760 commit c0812bf
Showing 1 changed file with 6 additions and 36 deletions.
42 changes: 6 additions & 36 deletions pkg/logs/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"os/user"
"path/filepath"

"net"
"time"

"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/flyctl"
"github.com/superfly/flyctl/pkg/agent"
"github.com/superfly/flyctl/terminal"
"gopkg.in/yaml.v2"
)

type natsLogStream struct {
Expand Down Expand Up @@ -45,7 +42,7 @@ func NewNatsStream(apiClient *api.Client, opts *LogOptions) (LogStream, error) {

tunnelCtx, cancel := context.WithTimeout(ctx, 4*time.Minute)
defer cancel()
// wait for the tunnel to be ready

if err = agentclient.WaitForTunnel(tunnelCtx, &app.Organization); err != nil {
return nil, errors.Wrap(err, "unable to connect WireGuard tunnel")
}
Expand Down Expand Up @@ -74,7 +71,6 @@ func (s *natsLogStream) Stream(ctx context.Context, opts *LogOptions) <-chan Log
} else {
subject = fmt.Sprintf("%s.%s", subject, "*")
}
// subject = fmt.Sprintf("%s.%s", subject, ">")

terminal.Debug("subscribing to nats subject: ", subject)

Expand Down Expand Up @@ -119,32 +115,19 @@ func (s *natsLogStream) Err() error {

func newNatsClient(ctx context.Context, dialer agent.Dialer, app *api.App) (*nats.Conn, error) {

var flyConf flyConfig
usr, _ := user.Current()
flyConfFile, err := os.Open(filepath.Join(usr.HomeDir, ".fly", "config.yml"))
if err != nil {
return nil, errors.Wrap(err, "could not read fly config yml")
}
if err := yaml.NewDecoder(flyConfFile).Decode(&flyConf); err != nil {
return nil, errors.Wrap(err, "could not decode fly config yml")
}

state, ok := flyConf.WireGuardState[app.Organization.Slug]
if !ok {
return nil, errors.New("could not find org in fly config")
}
state := dialer.State()

peerIP := state.Peer.PeerIP
peerIP := net.ParseIP(state.Peer.Peerip)

var natsIPBytes [16]byte
copy(natsIPBytes[0:], peerIP[0:6])
natsIPBytes[15] = 3

natsIP := net.IP(natsIPBytes[:])

terminal.Debug("connecting to nats")
terminal.Debug("connecting to nats server: ", natsIP.String())

conn, err := nats.Connect(fmt.Sprintf("nats://[%s]:4223", natsIP.String()), nats.SetCustomDialer(&natsDialer{dialer, ctx}), nats.UserInfo(app.Organization.Slug, flyConf.AccessToken))
conn, err := nats.Connect(fmt.Sprintf("nats://[%s]:4223", natsIP.String()), nats.SetCustomDialer(&natsDialer{dialer, ctx}), nats.UserInfo(app.Organization.Slug, flyctl.GetAPIToken()))
if err != nil {
return nil, errors.Wrap(err, "could not connect to nats")
}
Expand All @@ -159,16 +142,3 @@ type natsDialer struct {
func (d *natsDialer) Dial(network, address string) (net.Conn, error) {
return d.Dialer.DialContext(d.ctx, network, address)
}

type flyConfig struct {
AccessToken string `yaml:"access_token"`
WireGuardState map[string]wgState `yaml:"wire_guard_state"`
}

type wgState struct {
Peer wgPeer `yaml:"peer"`
}

type wgPeer struct {
PeerIP net.IP `yaml:"peerip"`
}

0 comments on commit c0812bf

Please sign in to comment.