Skip to content

Commit

Permalink
annotate all pods under our domain, not just those in the STS (#120)
Browse files Browse the repository at this point in the history
* PR120 rebase to master

* pr feedback, deprecate --statefulset-label

* refactor flags into parseFlags() due to lint limit

* make lint and github actions lint do not align

* collapse if else and reduce indentation

* remove comments, set updateTime once for all pods
  • Loading branch information
tekicode committed Nov 6, 2023
1 parent f168dd7 commit c57219e
Showing 1 changed file with 65 additions and 42 deletions.
107 changes: 65 additions & 42 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,31 @@ const (
other label = "other"
)

func main() {
config := struct {
KubeConfig string
Namespace string
StatefulSetLabel string
ClusterDomain string
ConfigMapName string
ConfigMapGeneratedName string
FileName string
Port int
Scheme string
InternalAddr string
AllowOnlyReadyReplicas bool
AllowDynamicScaling bool
AnnotatePodsOnChange bool
ScaleTimeout time.Duration
}{}
type CmdConfig struct {
KubeConfig string
Namespace string
StatefulSetLabel string
Label string
ClusterDomain string
ConfigMapName string
ConfigMapGeneratedName string
FileName string
Port int
Scheme string
InternalAddr string
AllowOnlyReadyReplicas bool
AllowDynamicScaling bool
AnnotatePodsOnChange bool
ScaleTimeout time.Duration
}

func parseFlags() CmdConfig {
var config CmdConfig

flag.StringVar(&config.KubeConfig, "kubeconfig", "", "Path to kubeconfig")
flag.StringVar(&config.Namespace, "namespace", "default", "The namespace to watch")
flag.StringVar(&config.StatefulSetLabel, "statefulset-label", "controller.receive.thanos.io=thanos-receive-controller", "The label StatefulSets must have to be watched by the controller")
flag.StringVar(&config.StatefulSetLabel, "statefulset-label", "", "[DEPRECATED] The label StatefulSets must have to be watched by the controller")
flag.StringVar(&config.Label, "label", "controller.receive.thanos.io=thanos-receive-controller", "The label workloads must have to be watched by the controller.")
flag.StringVar(&config.ClusterDomain, "cluster-domain", "cluster.local", "The DNS domain of the cluster")
flag.StringVar(&config.ConfigMapName, "configmap-name", "", "The name of the original ConfigMap containing the hashring tenant configuration")
flag.StringVar(&config.ConfigMapGeneratedName, "configmap-generated-name", "", "The name of the generated and populated ConfigMap")
Expand All @@ -88,15 +92,30 @@ func main() {
flag.StringVar(&config.InternalAddr, "internal-addr", ":8080", "The address on which internal server runs")
flag.BoolVar(&config.AllowOnlyReadyReplicas, "allow-only-ready-replicas", false, "Populate only Ready receiver replicas in the hashring configuration")
flag.BoolVar(&config.AllowDynamicScaling, "allow-dynamic-scaling", false, "Update the hashring configuration on scale down events.")
flag.BoolVar(&config.AnnotatePodsOnChange, "annotate-pods-on-change", false, "Annotates pods with latest configuration hash on a hashring change")
flag.BoolVar(&config.AnnotatePodsOnChange, "annotate-pods-on-change", false, "Annotates pods with current timestamp on a hashring change")
flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy")
flag.Parse()

return config
}

func main() {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.WithPrefix(logger, "ts", log.DefaultTimestampUTC)
logger = log.WithPrefix(logger, "caller", log.DefaultCaller)

labelKey, labelValue := splitLabel(config.StatefulSetLabel)
config := parseFlags()

var tmpControllerLabel string
if len(config.StatefulSetLabel) > 0 {
tmpControllerLabel = config.StatefulSetLabel

level.Warn(logger).Log("msg", "The --statefulset-label flag is deprecated. Please see the manual page for updates.")
} else {
tmpControllerLabel = config.Label
}

labelKey, labelValue := splitLabel(tmpControllerLabel)

konfig, err := clientcmd.BuildConfigFromFlags("", config.KubeConfig)
if err != nil {
Expand Down Expand Up @@ -267,7 +286,7 @@ const labelParts = 2
func splitLabel(in string) (key, value string) {
parts := strings.Split(in, "=")
if len(parts) != labelParts {
stdlog.Fatal("--statefulset-label must be of the form 'key=value'")
stdlog.Fatal("Labels consist of a key-value pair f.ex: 'key=value'")
}

return parts[0], parts[1]
Expand Down Expand Up @@ -578,7 +597,7 @@ func (c *controller) sync(ctx context.Context) {
// opposed to having to wait kubelet sync period + cache (see
// https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#mounted-configmaps-are-updated-automatically)
if err == nil && c.options.annotatePodsOnChange {
c.annotatePods(ctx, hashrings, statefulsets)
c.annotatePods(ctx)
}
}

Expand Down Expand Up @@ -721,30 +740,34 @@ func (c *controller) saveHashring(ctx context.Context, hashring []receive.Hashri
return nil
}

func (c *controller) annotatePods(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) {
for _, h := range hashrings {
if sts, exists := statefulsets[h.Hashring]; exists {
for i := 0; i < int(*sts.Spec.Replicas); i++ {
podName := fmt.Sprintf("%s-%d", sts.Name, i)
func (c *controller) annotatePods(ctx context.Context) {
annotationKey := fmt.Sprintf("%s/%s", c.options.labelKey, "lastControllerUpdate")
updateTime := fmt.Sprintf("%d", time.Now().Unix())

pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
level.Error(c.logger).Log("msg", "failed to get pod", "err", err)
}
// Select pods that have a controllerLabel matching ours.
podList, err := c.klient.CoreV1().Pods(c.options.namespace).List(ctx,
metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", c.options.labelKey, c.options.labelValue),
})
if err != nil {
level.Error(c.logger).Log("msg", "failed to list pods belonging to controller", "err", err)
return
}

annotations := pod.Annotations
if annotations == nil {
annotations = make(map[string]string)
}
for _, pod := range podList.Items {
podObj := pod.DeepCopy()

annotations["annotationTimestamp"] = fmt.Sprintf("%d", time.Now().Unix())
pod.SetAnnotations(annotations)
annotations := podObj.ObjectMeta.Annotations
if annotations == nil {
annotations = make(map[string]string)
}

_, err = c.klient.CoreV1().Pods(c.options.namespace).Update(ctx, pod, metav1.UpdateOptions{})
if err != nil {
level.Error(c.logger).Log("msg", "failed to update pod", "err", err)
}
}
annotations[annotationKey] = updateTime
podObj.SetAnnotations(annotations)

_, err := c.klient.CoreV1().Pods(pod.Namespace).Update(ctx, podObj, metav1.UpdateOptions{})
if err != nil {
level.Error(c.logger).Log("msg", "failed to update pod", "err", err)
}
}
}
Expand Down

0 comments on commit c57219e

Please sign in to comment.