Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .tools/nvim/dap/go.lua
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,16 @@ dap.configurations.go = {
MAX_CONCURRENT_RECONCILES = "1",
},
},
{
type = "go",
name = "Debug wireguard-operator",
request = "launch",
program = vim.g.root_dir .. "/operators/wireguard",
args = { "--dev" },
console = "externalTerminal",
-- externalTerminal = true,
envFile = {
vim.g.root_dir .. "/operators/wireguard" .. "/.secrets/env",
},
},
}
2 changes: 2 additions & 0 deletions agent/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Env struct {
ImagePullSecretNamespace string `env:"IMAGE_PULL_SECRET_NAMESPACE" required:"true"`

VectorProxyGrpcServerAddr string `env:"VECTOR_PROXY_GRPC_SERVER_ADDR" required:"true"`
ResourceWatcherName string `env:"RESOURCE_WATCHER_NAME" required:"true"`
ResourceWatcherNamespace string `env:"RESOURCE_WATCHER_NAMESPACE" required:"true"`
}

func GetEnvOrDie() *Env {
Expand Down
85 changes: 59 additions & 26 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -49,12 +50,7 @@ func (g *grpcHandler) handleErrorOnApply(err error, msg t.AgentMessage) error {
return err
}

return g.errorsCli.Send(&messages.ErrorData{
AccessToken: g.ev.AccessToken,
ClusterName: g.ev.ClusterName,
AccountName: g.ev.AccountName,
Data: b,
})
return g.errorsCli.Send(&messages.ErrorData{Message: b})
}

func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
Expand Down Expand Up @@ -116,57 +112,88 @@ func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
}
}

func (g *grpcHandler) ensureAccessToken(ctx context.Context) error {
if g.ev.AccessToken != "" {
return nil
func (g *grpcHandler) ensureAccessToken() error {
ctx, cf := context.WithTimeout(context.TODO(), 50*time.Second)
defer cf()
if g.ev.AccessToken == "" {
g.logger.Infof("waiting on clusterToken exchange for accessToken")
}

validationOut, err := g.msgDispatchCli.ValidateAccessToken(ctx, &messages.ValidateAccessTokenIn{
AccountName: g.ev.AccountName,
ClusterName: g.ev.ClusterName,
AccessToken: g.ev.AccessToken,
})

if err != nil || validationOut == nil || !validationOut.Valid {
g.logger.Infof("accessToken is invalid, requesting new accessToken ...")
}

g.logger.Infof("waiting on clusterToken exchange for accessToken")
if validationOut.Valid {
g.logger.Infof("accessToken is valid, proceeding with it ...")
return nil
}

out, err := g.msgDispatchCli.GetAccessToken(ctx, &messages.GetClusterTokenIn{
AccountName: g.ev.AccountName,
ClusterName: g.ev.ClusterName,
ClusterToken: g.ev.ClusterToken,
})
if err != nil {
return err
}

g.logger.Infof("valid access token has been obtained, persisting it in k8s secret (%s/%s)...", g.ev.AccessTokenSecretNamespace, g.ev.AccessTokenSecretName)

s, err := g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.AccessTokenSecretNamespace).Get(context.TODO(), g.ev.AccessTokenSecretName, metav1.GetOptions{})
if err != nil {
return err
}

delete(s.Data, "CLUSTER_TOKEN")
s.Data["ACCESS_TOKEN"] = []byte(out.AccessToken)
_, err = g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.AccessTokenSecretNamespace).Update(context.TODO(), s, metav1.UpdateOptions{})
if err != nil {
return err
}

g.ev.AccessToken = out.AccessToken
return nil

if g.ev.ResourceWatcherNamespace != "" {
// need to restart resource watcher
d, err := g.yamlClient.K8sClient.AppsV1().Deployments(g.ev.ResourceWatcherNamespace).Get(ctx, g.ev.ResourceWatcherName, metav1.GetOptions{})
if err != nil {
return err
}
podLabelSelector := metav1.LabelSelector{}
for k, v := range d.Spec.Selector.MatchLabels {
metav1.AddLabelToSelector(&podLabelSelector, k, v)
}

if err := g.yamlClient.K8sClient.CoreV1().Pods(g.ev.ResourceWatcherNamespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(&podLabelSelector)}); err != nil {
g.logger.Errorf(err, "failed to delete pods for resource watcher")
}
g.logger.Infof("deleted all pods for resource watcher, they will be recreated")
}

return nil
}

func (g *grpcHandler) run() error {
ctx, cf := context.WithCancel(context.TODO())
defer cf()

if err := g.ensureAccessToken(ctx); err != nil {
return err
}
md := metadata.MD{}
md.Set("authorization", g.ev.AccessToken)
outgoingCtx := metadata.NewOutgoingContext(ctx, md)

errorsCli, err := g.msgDispatchCli.ReceiveErrors(ctx)
errorsCli, err := g.msgDispatchCli.ReceiveErrors(outgoingCtx)
if err != nil {
return err
}

g.errorsCli = errorsCli

msgActionsCli, err := g.msgDispatchCli.SendActions(ctx, &messages.StreamActionsRequest{
AccessToken: g.ev.AccessToken,
ClusterName: g.ev.ClusterName,
AccountName: g.ev.AccountName,
})
msgActionsCli, err := g.msgDispatchCli.SendActions(outgoingCtx, &messages.Empty{})
if err != nil {
return err
}
Expand All @@ -184,7 +211,7 @@ func (g *grpcHandler) run() error {
return err
}

if err := json.Unmarshal(a.Data, &msg); err != nil {
if err := json.Unmarshal(a.Message, &msg); err != nil {
g.logger.Errorf(err, "[ERROR] while json unmarshal")
return err
}
Expand Down Expand Up @@ -218,7 +245,7 @@ func main() {

yamlClient := func() *kubectl.YAMLClient {
if isDev {
return kubectl.NewYAMLClientOrDie(&rest.Config{Host: "localhost:8080"})
return kubectl.NewYAMLClientOrDie(&rest.Config{Host: "localhost:8081"})
}
config, err := rest.InClusterConfig()
if err != nil {
Expand Down Expand Up @@ -255,9 +282,9 @@ func main() {

for {
cc, err := func() (*grpc.ClientConn, error) {
// if isDev {
// return libGrpc.Connect(ev.GrpcAddr)
// }
if isDev {
return libGrpc.Connect(ev.GrpcAddr)
}
return libGrpc.ConnectSecure(ev.GrpcAddr)
}()
if err != nil {
Expand All @@ -267,6 +294,12 @@ func main() {
logger.Infof("GRPC connection successful")

g.msgDispatchCli = messages.NewMessageDispatchServiceClient(cc)

if err := g.ensureAccessToken(); err != nil {
logger.Errorf(err, "ensuring access token")
}

vps.accessToken = g.ev.AccessToken
vps.realVectorClient = proto_rpc.NewVectorClient(cc)

if err := g.run(); err != nil {
Expand Down
24 changes: 6 additions & 18 deletions agent/vector-grpc-proxy-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ import (
"google.golang.org/grpc/metadata"
)

const (
AccessTokenHeader string = "kloudlite-access-token"
AccountNameHeader string = "kloudlite-account-name"
ClusterNameHeader string = "kloudlite-cluster-name"
)

type vectorGrpcProxyServer struct {
proto_rpc.UnimplementedVectorServer
realVectorClient proto_rpc.VectorClient
Expand All @@ -30,17 +24,14 @@ type vectorGrpcProxyServer struct {

func (v *vectorGrpcProxyServer) PushEvents(ctx context.Context, msg *proto_rpc.PushEventsRequest) (*proto_rpc.PushEventsResponse, error) {
if v.realVectorClient == nil {
return nil, fmt.Errorf("real vector client is not established yet")
return nil, fmt.Errorf("vector client is not yet connected to message-office")
}

md := metadata.Pairs(AccessTokenHeader, v.accessToken)
md.Append(AccountNameHeader, v.accountName)
md.Append(ClusterNameHeader, v.clusterName)

outgoingCtx := metadata.NewOutgoingContext(ctx, md)
outgoingCtx := metadata.NewOutgoingContext(ctx, metadata.Pairs("authorization", v.accessToken))

v.pushEventsCounter++
v.logger.Infof("[%v] received push-events message", v.pushEventsCounter)
defer v.logger.Infof("[%v] dispatched push-events message", v.pushEventsCounter)

per, err := v.realVectorClient.PushEvents(outgoingCtx, msg)
if err != nil {
Expand All @@ -53,17 +44,14 @@ func (v *vectorGrpcProxyServer) PushEvents(ctx context.Context, msg *proto_rpc.P

func (v *vectorGrpcProxyServer) HealthCheck(ctx context.Context, msg *proto_rpc.HealthCheckRequest) (*proto_rpc.HealthCheckResponse, error) {
if v.realVectorClient == nil {
return nil, fmt.Errorf("real vector client is not established yet")
return nil, fmt.Errorf("vector client is not yet connected to message-office")
}

md := metadata.Pairs(AccessTokenHeader, v.accessToken)
md.Append(AccountNameHeader, v.accountName)
md.Append(ClusterNameHeader, v.clusterName)

outgoingCtx := metadata.NewOutgoingContext(ctx, md)
outgoingCtx := metadata.NewOutgoingContext(ctx, metadata.Pairs("authorization", v.accessToken))

v.healthCheckCounter++
v.logger.Infof("[%v] received health-check message", v.healthCheckCounter)
defer v.logger.Infof("[%v] dispatched health-check message", v.healthCheckCounter)
hcr, err := v.realVectorClient.HealthCheck(outgoingCtx, msg)
if err != nil {
v.logger.Error(err)
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/sykesm/zap-logfmt v0.0.4 h1:U2WzRvmIWG1wDLCFY3sz8UeEmsdHQjHFNlIdmroVFaI=
github.com/sykesm/zap-logfmt v0.0.4/go.mod h1:AuBd9xQjAe3URrWT1BBDk2v2onAZHkZkWRMiYZXiZWA=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/twmb/franz-go v1.6.0 h1:yri7YsVBe/k1LKcoZSLILgUI3U14e82qtD9i4VOcs9c=
Expand Down Expand Up @@ -722,14 +724,18 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.starlark.net v0.0.0-20220928063852-5fccb4daaf6d h1:aF+anaRVZu22kdETjLavnIn/cvD+arhmik6vMU3joW4=
go.starlark.net v0.0.0-20220928063852-5fccb4daaf6d/go.mod h1:kIVgS18CjmEC3PqMd5kaJSGEifyV/CeB9x506ZJ1Vbk=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
Expand Down Expand Up @@ -982,6 +988,8 @@ golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
Loading