Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,18 +326,25 @@ func (c *Controller) initSharedInformers() {
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
c.initController()

// start workers reading from the events queue to prevent the initial sync from blocking on it.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that basically means that workers must always start before acquireInitialListOfClusters executes since it queues the 1st Sync event, right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and if there are no consumers of that queue acquireInitialListOfClusters function will block forever...

for i := range c.clusterEventQueues {
wg.Add(1)
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
go c.processClusterEventsQueue(i, stopCh, wg)
}

// populate clusters before starting nodeInformer that relies on it and run the initial sync
if err := c.acquireInitialListOfClusters(); err != nil {
panic("could not acquire initial list of clusters")
}

wg.Add(5)
go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg)
go c.clusterResync(stopCh, wg)
go c.apiserver.Run(stopCh, wg)
go c.kubeNodesInformer(stopCh, wg)

for i := range c.clusterEventQueues {
wg.Add(1)
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
go c.processClusterEventsQueue(i, stopCh, wg)
}

c.logger.Info("started working in background")
}
Expand Down
62 changes: 51 additions & 11 deletions pkg/controller/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
for {
select {
case <-ticker.C:
if _, err := c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
if err := c.clusterListAndSync(); err != nil {
c.logger.Errorf("could not list clusters: %v", err)
}
case <-stopCh:
Expand All @@ -41,15 +41,10 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
}
}

// TODO: make a separate function to be called from InitSharedInformers
// clusterListFunc obtains a list of all PostgreSQL clusters and runs sync when necessary
// NB: as this function is called directly by the informer, it needs to avoid acquiring locks
// on individual cluster structures. Therefore, it acts on the manifests obtained from Kubernetes
// and not on the internal state of the clusters.
func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
// clusterListFunc obtains a list of all PostgreSQL clusters
func (c *Controller) listClusters(options metav1.ListOptions) (*spec.PostgresqlList, error) {
var (
list spec.PostgresqlList
event spec.EventType
list spec.PostgresqlList
)

req := c.KubeClient.CRDREST.
Expand All @@ -67,21 +62,42 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
c.logger.Warningf("could not unmarshal list of clusters: %v", err)
}

return &list, err

}

// A separate function to be called from InitSharedInformers
func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
return c.listClusters(options)
}

// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
func (c *Controller) clusterListAndSync() error {
var (
err error
event spec.EventType
)

currentTime := time.Now().Unix()
timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)

if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
event = spec.EventSync
} else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
event = spec.EventRepair
}
if event != "" {
c.queueEvents(&list, event)
var list *spec.PostgresqlList
if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
return err
}
c.queueEvents(list, event)
} else {
c.logger.Infof("not enough time passed since the last sync (%s seconds) or repair (%s seconds)",
timeFromPreviousSync, timeFromPreviousRepair)
}
return &list, err
return nil
}

// queueEvents queues a sync or repair event for every cluster with a valid manifest
Expand Down Expand Up @@ -125,6 +141,30 @@ func (c *Controller) queueEvents(list *spec.PostgresqlList, event spec.EventType
}
}

func (c *Controller) acquireInitialListOfClusters() error {
var (
list *spec.PostgresqlList
err error
clusterName spec.NamespacedName
)

if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
return err
}
c.logger.Debugf("acquiring initial list of clusters")
for _, pg := range list.Items {
if pg.Error != nil {
continue
}
clusterName = util.NameFromMeta(pg.ObjectMeta)
c.addCluster(c.logger, clusterName, &pg)
c.logger.Debugf("added new cluster: %q", clusterName)
}
// initiate initial sync of all clusters.
c.queueEvents(list, spec.EventSync)
return nil
}

type crdDecoder struct {
dec *json.Decoder
close func() error
Expand Down
29 changes: 14 additions & 15 deletions pkg/util/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,48 +172,47 @@ func processField(value string, field reflect.Value) error {
type parserState int

const (
Plain parserState = iota
DoubleQuoted
SingleQuoted
Escape
plain parserState = iota
doubleQuoted
singleQuoted
)

// Split the pair candidates by commas not located inside open quotes
// Escape characters are not supported for simplicity, as we don't
// expect to find them inside the map values for our use cases
func getMapPairsFromString(value string) (pairs []string, err error) {
pairs = make([]string, 0)
state := Plain
state := plain
var start, quote int

for i, ch := range strings.Split(value, "") {
if (ch == `"` || ch == `'`) && i > 0 && value[i-1] == '\\' {
fmt.Printf("Parser warning: ecape character '\\' have no effect on quotes inside the configuration value %s\n", value)
}
if ch == `"` {
if state == Plain {
state = DoubleQuoted
if state == plain {
state = doubleQuoted
quote = i
} else if state == DoubleQuoted {
state = Plain
} else if state == doubleQuoted {
state = plain
quote = 0
}
}
if ch == "'" {
if state == Plain {
state = SingleQuoted
if state == plain {
state = singleQuoted
quote = i
} else if state == SingleQuoted {
state = Plain
} else if state == singleQuoted {
state = plain
quote = 0
}
}
if ch == "," && state == Plain {
if ch == "," && state == plain {
pairs = append(pairs, strings.Trim(value[start:i], " \t"))
start = i + 1
}
}
if state != Plain {
if state != plain {
err = fmt.Errorf("unmatched quote starting at position %d", quote+1)
pairs = nil
} else {
Expand Down