-
Notifications
You must be signed in to change notification settings - Fork 507
/
cloud_messages.go
147 lines (126 loc) · 4.52 KB
/
cloud_messages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package cli
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"os"
"strings"
"time"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
empty "google.golang.org/protobuf/types/known/emptypb"
"github.com/datawire/dlib/dtime"
"github.com/telepresenceio/telepresence/rpc/v2/systema"
"github.com/telepresenceio/telepresence/v2/pkg/client"
"github.com/telepresenceio/telepresence/v2/pkg/client/cache"
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/cliutil"
)
// messagesCacheFilename is the file name handed to the user-cache load/save
// helpers when persisting a cloudMessageCache.
const messagesCacheFilename = "cloud-messages.json"
// cloudMessageCache is the locally persisted state for messages fetched from
// Ambassador Cloud: when to fetch next, the message text per command, and
// which commands have already had their message shown.
type cloudMessageCache struct {
// NextCheck is the time after which raiseCloudMessage fetches fresh
// messages; the zero value causes a fetch on the next invocation.
NextCheck time.Time `json:"next_check"`
// Intercept is the message to show for the "intercept" command.
Intercept string `json:"intercept"`
// MessagesDelivered is the set of command names whose message has already
// been handed out since the last successful refresh.
// NOTE(review): the "messagess_delivered" key is misspelled, but it is the
// name used in already-written cache files; presumably renaming it would
// make those files load with an empty set — confirm before changing.
MessagesDelivered map[string]struct{} `json:"messagess_delivered"`
}
// newCloudMessageCache returns a cloudMessageCache initialized from the user's
// cache file if it exists. When the file does not exist, a fresh cache is
// created (zero NextCheck, empty delivered-set) and written out. Any other
// load error is returned to the caller.
//
// The returned cache always has a non-nil MessagesDelivered map, so callers
// can record deliveries without checking it first.
func newCloudMessageCache(ctx context.Context) (*cloudMessageCache, error) {
	cmc := &cloudMessageCache{}
	if err := cache.LoadFromUserCache(ctx, cmc, messagesCacheFilename); err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
		// No cache file yet: the zero NextCheck makes the first check
		// happen immediately.
		cmc.NextCheck = time.Time{}
		cmc.MessagesDelivered = make(map[string]struct{})
		// Best effort: a failed save only means the cache is rebuilt on
		// the next invocation.
		_ = cache.SaveToUserCache(ctx, cmc, messagesCacheFilename)
	}
	// A cache file that lacks the delivered-set (or stores it as null)
	// loads with a nil map; writing to a nil map panics, so normalize it.
	if cmc.MessagesDelivered == nil {
		cmc.MessagesDelivered = make(map[string]struct{})
	}
	return cmc, nil
}
// getCloudMessages fetches the unauthenticated command messages from
// Ambassador Cloud at systemaURL, using gRPC over TLS. It does not touch the
// cloudMessageCache; the caller decides what to do with the response.
//
// The host:port of systemaURL is dialed through the "dns" resolver scheme and
// the TLS server name is set to the URL's hostname. The connection is closed
// before the function returns.
//
// If systemaURL cannot be parsed or the dial fails, a non-nil empty response
// is returned together with the error.
func getCloudMessages(ctx context.Context, systemaURL string) (*systema.CommandMessageResponse, error) {
	u, err := url.Parse(systemaURL)
	if err != nil {
		return &systema.CommandMessageResponse{}, err
	}

	target := (&url.URL{Scheme: "dns", Path: "/" + u.Host}).String()
	creds := credentials.NewTLS(&tls.Config{ServerName: u.Hostname()})
	conn, err := grpc.DialContext(ctx, target, grpc.WithTransportCredentials(creds))
	if err != nil {
		return &systema.CommandMessageResponse{}, err
	}
	// The connection is only needed for this single call; release it so
	// that it does not leak.
	defer conn.Close()

	systemaClient := systema.NewSystemACliClient(conn)
	return systemaClient.GetUnauthenticatedCommandMessages(ctx, &empty.Empty{})
}
// updateCacheMessages stores the messages from a successful fetch in the
// cache, forgets which messages were already delivered, and schedules the
// next fetch according to the configured refresh interval.
func (cmc *cloudMessageCache) updateCacheMessages(ctx context.Context, resp *systema.CommandMessageResponse) {
	// Take over the freshly fetched message text.
	cmc.Intercept = resp.GetIntercept()

	// The text may differ from what was shown before, so every command
	// becomes eligible for delivery again.
	cmc.MessagesDelivered = map[string]struct{}{}

	// The fetch succeeded, so wait a full refresh interval before the
	// next one.
	interval := client.GetConfig(ctx).Cloud.RefreshMessages
	cmc.NextCheck = dtime.Now().Add(interval)
}
// getMessageFromCache returns the cached message for the command cmdUsed and
// records that it has been delivered. It returns the empty string when the
// message was already delivered or when there is no message for the command.
//
// The delivered-set is allocated on demand, so the method is safe to call on
// a cache whose MessagesDelivered map is nil.
func (cmc *cloudMessageCache) getMessageFromCache(ctx context.Context, cmdUsed string) string {
	// A message is shown at most once per refresh so as to not annoy the
	// user. Reading from a nil map is fine; only the write below needs
	// the map to exist.
	if _, delivered := cmc.MessagesDelivered[cmdUsed]; delivered {
		return ""
	}

	// Check if we have a message for the given command.
	var msg string
	switch cmdUsed {
	case "intercept":
		msg = cmc.Intercept
	default:
		// We don't currently have any messages for this command,
		// so msg stays empty.
	}

	// Writing to a nil map panics, so make sure the set exists first.
	if cmc.MessagesDelivered == nil {
		cmc.MessagesDelivered = make(map[string]struct{})
	}
	cmc.MessagesDelivered[cmdUsed] = struct{}{}
	return msg
}
// raiseCloudMessage is meant to be used as a command's `PostRunE`. It prints
// the cloud message for the command that just ran, if there is one that has
// not been shown yet, refreshing the cached messages from Ambassador Cloud
// when they are due.
//
// Nothing is printed when the user is logged in with retrievable user info or
// when the configuration says to skip login. The only error returned is a
// failure to load the message cache; fetch and save failures are tolerated.
func raiseCloudMessage(cmd *cobra.Command, _ []string) error {
	ctx := cmd.Context()

	// Messages are currently only meant for users that aren't logged in.
	if cliutil.HasLoggedIn(ctx) {
		_, userInfoErr := cliutil.GetCloudUserInfo(ctx, false, true)
		if userInfoErr == nil {
			return nil
		}
	}

	// Users that have declared an air-gapped cluster must not trigger
	// any attempt to fetch messages.
	cloudCfg := client.GetConfig(ctx).Cloud
	if cloudCfg.SkipLogin {
		return nil
	}

	cmc, err := newCloudMessageCache(ctx)
	if err != nil {
		return err
	}

	// Refresh from Ambassador Cloud once the cached messages are due.
	if dtime.Now().After(cmc.NextCheck) {
		systemaURL := fmt.Sprintf("https://%s:%s", cloudCfg.SystemaHost, cloudCfg.SystemaPort)
		if resp, fetchErr := getCloudMessages(ctx, systemaURL); fetchErr == nil {
			cmc.updateCacheMessages(ctx, resp)
		} else {
			// The fetch failed; try again in an hour.
			cmc.NextCheck = dtime.Now().Add(time.Hour)
		}
	}

	// The command name is the first word of cmd.Use.
	cmdUsed := strings.SplitN(cmd.Use, " ", 2)[0]
	if msg := cmc.getMessageFromCache(ctx, cmdUsed); msg != "" {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n", msg)
	}

	// Persist the updated schedule and delivered-set on a best-effort basis.
	_ = cache.SaveToUserCache(ctx, cmc, messagesCacheFilename)
	return nil
}