Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ config/crd/bases/*.yaml binary
zz_generated_deepcopy.go binary

apps/releases/assets/operators/** binary
*.pb.go binary
13 changes: 2 additions & 11 deletions agent/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ import (
)

type Env struct {
// KafkaSASLUser string `env:"KAFKA_SASL_USER" required:"true"`
// KafkaSASLPassword string `env:"KAFKA_SASL_PASSWORD" required:"true"`
// KafkaSASLMechanism redpanda.SASLMechanism `env:"KAFKA_SASL_MECHANISM"`

// KafkaBrokers string `env:"KAFKA_BROKERS" required:"true"`
// KafkaConsumerGroupId string `env:"KAFKA_CONSUMER_GROUP_ID" required:"true"`
// KafkaIncomingTopic string `env:"KAFKA_INCOMING_TOPIC" required:"true"`
// KafkaErrorOnApplyTopic string `env:"KAFKA_ERROR_ON_APPLY_TOPIC" required:"true"`

GrpcAddr string `env:"GRPC_ADDR" required:"true"`

ClusterToken string `env:"CLUSTER_TOKEN" required:"false"`
Expand All @@ -24,8 +15,8 @@ type Env struct {
ClusterName string `env:"CLUSTER_NAME" required:"true"`
AccountName string `env:"ACCOUNT_NAME" required:"true"`

HarborSecretName string `env:"HARBOR_SECRET_NAME" required:"true"`
HarborSecretNamespace string `env:"HARBOR_SECRET_NAMESPACE" required:"true"`
ImagePullSecretName string `env:"IMAGE_PULL_SECRET_NAME" required:"true"`
ImagePullSecretNamespace string `env:"IMAGE_PULL_SECRET_NAMESPACE" required:"true"`
}

func GetEnvOrDie() *Env {
Expand Down
149 changes: 94 additions & 55 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"encoding/json"
"flag"
"fmt"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"log"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/rest"
Expand All @@ -34,7 +35,7 @@ type grpcHandler struct {
msgDispatchCli messages.MessageDispatchServiceClient
}

func (g *grpcHandler) handleErrorOnApply(ctx context.Context, err error, msg t.AgentMessage) error {
func (g *grpcHandler) handleErrorOnApply(err error, msg t.AgentMessage) error {
g.logger.Debugf("[ERROR]: %s", err.Error())

b, err := json.Marshal(t.AgentErrMessage{
Expand Down Expand Up @@ -70,28 +71,25 @@ func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
mLogger := g.logger.WithKV("gvk", obj.GetObjectKind().GroupVersionKind().String()).WithKV("clusterName", msg.ClusterName).WithKV("accountName", msg.AccountName).WithKV("action", msg.Action)

mLogger.Infof("[%d] received message", g.inMemCounter)
// defer func() {
// mLogger.Infof("processed message [%d]", g.inMemCounter)
// }()

if len(strings.TrimSpace(msg.AccountName)) == 0 {
return g.handleErrorOnApply(ctx, fmt.Errorf("field 'accountName' must be defined in message"), msg)
return g.handleErrorOnApply(fmt.Errorf("field 'accountName' must be defined in message"), msg)
}

switch msg.Action {
case "apply", "delete":
{
b, err := yaml.Marshal(msg.Object)
if err != nil {
return g.handleErrorOnApply(ctx, err, msg)
return g.handleErrorOnApply(err, msg)
}

if msg.Action == "apply" {
_, err := g.yamlClient.ApplyYAML(ctx, b)
if err != nil {
mLogger.Infof("[%d] [error-on-apply]: %s", g.inMemCounter, err.Error())
mLogger.Infof("[%d] failed to process message", g.inMemCounter)
return g.handleErrorOnApply(ctx, err, msg)
return g.handleErrorOnApply(err, msg)
}
mLogger.Infof("[%d] processed message", g.inMemCounter)
return nil
Expand All @@ -101,7 +99,7 @@ func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
err := g.yamlClient.DeleteYAML(ctx, b)
if err != nil {
mLogger.Infof("[%d] [error-on-delete]: %s", err.Error())
return g.handleErrorOnApply(ctx, err, msg)
return g.handleErrorOnApply(err, msg)
}
mLogger.Infof("[%d] processed message", g.inMemCounter)
return nil
Expand All @@ -113,69 +111,107 @@ func (g *grpcHandler) handleMessage(msg t.AgentMessage) error {
err := fmt.Errorf("invalid action (%s)", msg.Action)
mLogger.Infof("[%d] [error]: %s", err.Error())
mLogger.Infof("[%d] failed to process message", g.inMemCounter)
return g.handleErrorOnApply(ctx, err, msg)
return g.handleErrorOnApply(err, msg)
}
}
}

func (g *grpcHandler) ensureAccessToken() error {
if g.ev.AccessToken == "" {
g.logger.Infof("waiting on clusterToken exchange for accessToken")
func (g *grpcHandler) ensureAccessToken(ctx context.Context) error {
if g.ev.AccessToken != "" {
return nil
}

out, err := g.msgDispatchCli.GetAccessToken(context.TODO(), &messages.GetClusterTokenIn{
ClusterToken: g.ev.ClusterToken,
})
if err != nil {
return err
}
g.logger.Infof("waiting on clusterToken exchange for accessToken")

s, err := g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.AccessTokenSecretNamespace).Get(context.TODO(), g.ev.AccessTokenSecretName, metav1.GetOptions{})
if err != nil {
return err
}
out, err := g.msgDispatchCli.GetAccessToken(ctx, &messages.GetClusterTokenIn{
ClusterToken: g.ev.ClusterToken,
})
if err != nil {
return err
}

delete(s.Data, "CLUSTER_TOKEN")
s.Data["ACCESS_TOKEN"] = []byte(out.AccessToken)
_, err = g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.AccessTokenSecretNamespace).Update(context.TODO(), s, metav1.UpdateOptions{})
if err != nil {
return err
}
s, err := g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.AccessTokenSecretNamespace).Get(context.TODO(), g.ev.AccessTokenSecretName, metav1.GetOptions{})
if err != nil {
return err
}

g.ev.AccessToken = out.AccessToken

harborSecret, err := yaml.Marshal(corev1.Secret{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Secret",
},
ObjectMeta: metav1.ObjectMeta{
Name: g.ev.HarborSecretName,
Namespace: g.ev.HarborSecretNamespace,
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{
".dockerconfigjson": []byte(out.HarborDockerConfigJson),
},
})
if err != nil {
return err
}
delete(s.Data, "CLUSTER_TOKEN")
s.Data["ACCESS_TOKEN"] = []byte(out.AccessToken)
_, err = g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.AccessTokenSecretNamespace).Update(context.TODO(), s, metav1.UpdateOptions{})
if err != nil {
return err
}

g.ev.AccessToken = out.AccessToken
return nil

if _, err := g.yamlClient.ApplyYAML(context.TODO(), harborSecret); err != nil {
}

func (g *grpcHandler) ensureImagePullSecretCreds(ctx context.Context) error {
hs, err := g.yamlClient.K8sClient.CoreV1().Secrets(g.ev.ImagePullSecretNamespace).Get(ctx, g.ev.ImagePullSecretName, metav1.GetOptions{})
if err != nil {
if !apiErrors.IsNotFound(err) {
return err
}
g.logger.Infof("image pull secret not found, will now be asking for it from message office")
hs = nil
}

if hs != nil {
g.logger.Infof("image-pull-secret credentials secret found")
return nil
}

g.logger.Infof("waiting on image-pull-secret credentials from message office")

out, err := g.msgDispatchCli.GetDockerCredentials(ctx, &messages.GetDockerCredentialsIn{
AccessToken: g.ev.AccessToken,
ClusterName: g.ev.ClusterName,
AccountName: g.ev.AccountName,
})
if err != nil {
return err
}
return nil
}

func (g *grpcHandler) run(conn *grpc.ClientConn) error {
if err := g.ensureAccessToken(); err != nil {
imgPullSecret, err := yaml.Marshal(corev1.Secret{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Secret",
},
ObjectMeta: metav1.ObjectMeta{
Name: g.ev.ImagePullSecretName,
Namespace: g.ev.ImagePullSecretNamespace,
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{
".dockerconfigjson": []byte(out.DockerConfigJson),
},
})
if err != nil {
return err
}

if _, err := g.yamlClient.ApplyYAML(ctx, imgPullSecret); err != nil {
return err
}

g.logger.Infof("image-pull-secret credentials received from message office, and written to k8s secret (%s/%s)", g.ev.ImagePullSecretNamespace, g.ev.ImagePullSecretName)

return nil
}

func (g *grpcHandler) run() error {
ctx, cf := context.WithCancel(context.TODO())
defer cf()

if err := g.ensureAccessToken(ctx); err != nil {
return err
}

if err := g.ensureImagePullSecretCreds(ctx); err != nil {
return err
}

errorsCli, err := g.msgDispatchCli.ReceiveErrors(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -210,7 +246,10 @@ func (g *grpcHandler) run(conn *grpc.ClientConn) error {
return err
}

g.handleMessage(msg)
if err := g.handleMessage(msg); err != nil {
g.logger.Errorf(err, "[ERROR] while handling message")
return err
}
}
}

Expand Down Expand Up @@ -266,7 +305,7 @@ func main() {
logger.Infof("GRPC connection successful")

g.msgDispatchCli = messages.NewMessageDispatchServiceClient(cc)
if err := g.run(cc); err != nil {
if err := g.run(); err != nil {
logger.Errorf(err, "running grpc sendActions")
}

Expand Down
6 changes: 5 additions & 1 deletion apis/crds/v1/project_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func (p *Project) GetStatus() *rApi.Status {
}

func (p *Project) GetEnsuredLabels() map[string]string {
return map[string]string{constants.ProjectNameKey: p.Name}
return map[string]string{
constants.ProjectNameKey: p.Name,
constants.AccountNameKey: p.Spec.AccountName,
constants.ClusterNameKey: p.Spec.ClusterName,
}
}

func (p *Project) GetEnsuredAnnotations() map[string]string {
Expand Down
Loading