diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e48375dfd..7e340abb3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -326,6 +326,18 @@ func (c *Controller) initSharedInformers() { func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.initController() + // start workers reading from the events queue to prevent the initial sync from blocking on it. + for i := range c.clusterEventQueues { + wg.Add(1) + c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines) + go c.processClusterEventsQueue(i, stopCh, wg) + } + + // populate clusters before starting nodeInformer that relies on it and run the initial sync + if err := c.acquireInitialListOfClusters(); err != nil { + panic("could not acquire initial list of clusters") + } + wg.Add(5) go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) @@ -333,11 +345,6 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { go c.apiserver.Run(stopCh, wg) go c.kubeNodesInformer(stopCh, wg) - for i := range c.clusterEventQueues { - wg.Add(1) - c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines) - go c.processClusterEventsQueue(i, stopCh, wg) - } c.logger.Info("started working in background") } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 58a7afcb2..4e5df42a7 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -32,7 +32,7 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { for { select { case <-ticker.C: - if _, err := c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"}); err != nil { + if err := c.clusterListAndSync(); err != nil { c.logger.Errorf("could not list clusters: %v", err) } case <-stopCh: @@ -41,15 +41,10 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { } } -// TODO: make a separate function to be called from InitSharedInformers -// clusterListFunc obtains a list of all PostgreSQL clusters and runs sync when necessary -// NB: as this function is called directly by the informer, it needs to avoid acquiring locks -// on individual cluster structures. Therefore, it acts on the manifests obtained from Kubernetes -// and not on the internal state of the clusters. -func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) { +// clusterListFunc obtains a list of all PostgreSQL clusters +func (c *Controller) listClusters(options metav1.ListOptions) (*spec.PostgresqlList, error) { var ( - list spec.PostgresqlList - event spec.EventType + list spec.PostgresqlList ) req := c.KubeClient.CRDREST. @@ -67,21 +62,42 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object c.logger.Warningf("could not unmarshal list of clusters: %v", err) } + return &list, err + +} + +// A separate function to be called from InitSharedInformers +func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) { + return c.listClusters(options) +} + +// clusterListAndSync lists all manifests and decides whether to run the sync or repair. +func (c *Controller) clusterListAndSync() error { + var ( + err error + event spec.EventType + ) + currentTime := time.Now().Unix() timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime) timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime) + if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) { event = spec.EventSync } else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) { event = spec.EventRepair } if event != "" { - c.queueEvents(&list, event) + var list *spec.PostgresqlList + if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil { + return err + } + c.queueEvents(list, event) } else { c.logger.Infof("not enough time passed since the last sync (%s seconds) or repair (%s seconds)", timeFromPreviousSync, timeFromPreviousRepair) } - return &list, err + return nil } // queueEvents queues a sync or repair event for every cluster with a valid manifest @@ -125,6 +141,30 @@ func (c *Controller) queueEvents(list *spec.PostgresqlList, event spec.EventType } } +func (c *Controller) acquireInitialListOfClusters() error { + var ( + list *spec.PostgresqlList + err error + clusterName spec.NamespacedName + ) + + if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil { + return err + } + c.logger.Debugf("acquiring initial list of clusters") + for _, pg := range list.Items { + if pg.Error != nil { + continue + } + clusterName = util.NameFromMeta(pg.ObjectMeta) + c.addCluster(c.logger, clusterName, &pg) + c.logger.Debugf("added new cluster: %q", clusterName) + } + // initiate initial sync of all clusters. + c.queueEvents(list, spec.EventSync) + return nil +} + type crdDecoder struct { dec *json.Decoder close func() error diff --git a/pkg/util/config/util.go b/pkg/util/config/util.go index dbe411045..aef333ce7 100644 --- a/pkg/util/config/util.go +++ b/pkg/util/config/util.go @@ -172,10 +172,9 @@ func processField(value string, field reflect.Value) error { type parserState int const ( - Plain parserState = iota - DoubleQuoted - SingleQuoted - Escape + plain parserState = iota + doubleQuoted + singleQuoted ) // Split the pair candidates by commas not located inside open quotes @@ -183,7 +182,7 @@ const ( // expect to find them inside the map values for our use cases func getMapPairsFromString(value string) (pairs []string, err error) { pairs = make([]string, 0) - state := Plain + state := plain var start, quote int for i, ch := range strings.Split(value, "") { @@ -191,29 +190,29 @@ func getMapPairsFromString(value string) (pairs []string, err error) { fmt.Printf("Parser warning: ecape character '\\' have no effect on quotes inside the configuration value %s\n", value) } if ch == `"` { - if state == Plain { - state = DoubleQuoted + if state == plain { + state = doubleQuoted quote = i - } else if state == DoubleQuoted { - state = Plain + } else if state == doubleQuoted { + state = plain quote = 0 } } if ch == "'" { - if state == Plain { - state = SingleQuoted + if state == plain { + state = singleQuoted quote = i - } else if state == SingleQuoted { - state = Plain + } else if state == singleQuoted { + state = plain quote = 0 } } - if ch == "," && state == Plain { + if ch == "," && state == plain { pairs = append(pairs, strings.Trim(value[start:i], " \t")) start = i + 1 } } - if state != Plain { + if state != plain { err = fmt.Errorf("unmatched quote starting at position %d", quote+1) pairs = nil } else {