diff --git a/main.go b/main.go index 70f100d..79500e8 100644 --- a/main.go +++ b/main.go @@ -58,27 +58,31 @@ const ( other label = "other" ) -func main() { - config := struct { - KubeConfig string - Namespace string - StatefulSetLabel string - ClusterDomain string - ConfigMapName string - ConfigMapGeneratedName string - FileName string - Port int - Scheme string - InternalAddr string - AllowOnlyReadyReplicas bool - AllowDynamicScaling bool - AnnotatePodsOnChange bool - ScaleTimeout time.Duration - }{} +type CmdConfig struct { + KubeConfig string + Namespace string + StatefulSetLabel string + Label string + ClusterDomain string + ConfigMapName string + ConfigMapGeneratedName string + FileName string + Port int + Scheme string + InternalAddr string + AllowOnlyReadyReplicas bool + AllowDynamicScaling bool + AnnotatePodsOnChange bool + ScaleTimeout time.Duration +} + +func parseFlags() CmdConfig { + var config CmdConfig flag.StringVar(&config.KubeConfig, "kubeconfig", "", "Path to kubeconfig") flag.StringVar(&config.Namespace, "namespace", "default", "The namespace to watch") - flag.StringVar(&config.StatefulSetLabel, "statefulset-label", "controller.receive.thanos.io=thanos-receive-controller", "The label StatefulSets must have to be watched by the controller") + flag.StringVar(&config.StatefulSetLabel, "statefulset-label", "", "[DEPRECATED] The label StatefulSets must have to be watched by the controller") + flag.StringVar(&config.Label, "label", "controller.receive.thanos.io=thanos-receive-controller", "The label workloads must have to be watched by the controller.") flag.StringVar(&config.ClusterDomain, "cluster-domain", "cluster.local", "The DNS domain of the cluster") flag.StringVar(&config.ConfigMapName, "configmap-name", "", "The name of the original ConfigMap containing the hashring tenant configuration") flag.StringVar(&config.ConfigMapGeneratedName, "configmap-generated-name", "", "The name of the generated and populated ConfigMap") @@ -88,15 +92,30 @@ func main() { flag.StringVar(&config.InternalAddr, "internal-addr", ":8080", "The address on which internal server runs") flag.BoolVar(&config.AllowOnlyReadyReplicas, "allow-only-ready-replicas", false, "Populate only Ready receiver replicas in the hashring configuration") flag.BoolVar(&config.AllowDynamicScaling, "allow-dynamic-scaling", false, "Update the hashring configuration on scale down events.") - flag.BoolVar(&config.AnnotatePodsOnChange, "annotate-pods-on-change", false, "Annotates pods with latest configuration hash on a hashring change") + flag.BoolVar(&config.AnnotatePodsOnChange, "annotate-pods-on-change", false, "Annotates pods with current timestamp on a hashring change") flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy") flag.Parse() + return config +} + +func main() { logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) logger = log.WithPrefix(logger, "ts", log.DefaultTimestampUTC) logger = log.WithPrefix(logger, "caller", log.DefaultCaller) - labelKey, labelValue := splitLabel(config.StatefulSetLabel) + config := parseFlags() + + var tmpControllerLabel string + if len(config.StatefulSetLabel) > 0 { + tmpControllerLabel = config.StatefulSetLabel + + level.Warn(logger).Log("msg", "The --statefulset-label flag is deprecated. Please see the manual page for updates.") + } else { + tmpControllerLabel = config.Label + } + + labelKey, labelValue := splitLabel(tmpControllerLabel) konfig, err := clientcmd.BuildConfigFromFlags("", config.KubeConfig) if err != nil { @@ -267,7 +286,7 @@ const labelParts = 2 func splitLabel(in string) (key, value string) { parts := strings.Split(in, "=") if len(parts) != labelParts { - stdlog.Fatal("--statefulset-label must be of the form 'key=value'") + stdlog.Fatal("Labels consist of a key-value pair f.ex: 'key=value'") } return parts[0], parts[1] @@ -578,7 +597,7 @@ func (c *controller) sync(ctx context.Context) { // opposed to having to wait kubelet sync period + cache (see // https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#mounted-configmaps-are-updated-automatically) if err == nil && c.options.annotatePodsOnChange { - c.annotatePods(ctx, hashrings, statefulsets) + c.annotatePods(ctx) } } @@ -721,30 +740,34 @@ func (c *controller) saveHashring(ctx context.Context, hashring []receive.Hashri return nil } -func (c *controller) annotatePods(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) { - for _, h := range hashrings { - if sts, exists := statefulsets[h.Hashring]; exists { - for i := 0; i < int(*sts.Spec.Replicas); i++ { - podName := fmt.Sprintf("%s-%d", sts.Name, i) +func (c *controller) annotatePods(ctx context.Context) { + annotationKey := fmt.Sprintf("%s/%s", c.options.labelKey, "lastControllerUpdate") + updateTime := fmt.Sprintf("%d", time.Now().Unix()) - pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - level.Error(c.logger).Log("msg", "failed to get pod", "err", err) - } + // Select pods that have a controllerLabel matching ours. + podList, err := c.klient.CoreV1().Pods(c.options.namespace).List(ctx, + metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", c.options.labelKey, c.options.labelValue), + }) + if err != nil { + level.Error(c.logger).Log("msg", "failed to list pods belonging to controller", "err", err) + return + } - annotations := pod.Annotations - if annotations == nil { - annotations = make(map[string]string) - } + for _, pod := range podList.Items { + podObj := pod.DeepCopy() - annotations["annotationTimestamp"] = fmt.Sprintf("%d", time.Now().Unix()) - pod.SetAnnotations(annotations) + annotations := podObj.ObjectMeta.Annotations + if annotations == nil { + annotations = make(map[string]string) + } - _, err = c.klient.CoreV1().Pods(c.options.namespace).Update(ctx, pod, metav1.UpdateOptions{}) - if err != nil { - level.Error(c.logger).Log("msg", "failed to update pod", "err", err) - } - } + annotations[annotationKey] = updateTime + podObj.SetAnnotations(annotations) + + _, err := c.klient.CoreV1().Pods(pod.Namespace).Update(ctx, podObj, metav1.UpdateOptions{}) + if err != nil { + level.Error(c.logger).Log("msg", "failed to update pod", "err", err) } } }