Skip to content

Commit

Permalink
log cluster and controller events in the ringlog via logrus hook
Browse files Browse the repository at this point in the history
  • Loading branch information
mkabilov committed Aug 15, 2017
1 parent 82f58b5 commit 51fdfb9
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 31 deletions.
28 changes: 20 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
)

// Controller represents operator controller
Expand All @@ -26,29 +27,38 @@ type Controller struct {
KubeClient k8sutil.KubernetesClient
RestClient rest.Interface // kubernetes API group REST client

clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster
stopChs map[spec.NamespacedName]chan struct{}
stopCh chan struct{}

clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
teamClusters map[string][]spec.NamespacedName

postgresqlInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
podCh chan spec.PodEvent

clusterEventQueues []*cache.FIFO // [workerID]Queue
lastClusterSyncTime int64

workerLogs map[uint32]ringlog.RingLogger
}

// NewController creates a new controller
func NewController(controllerConfig *spec.ControllerConfig) *Controller {
logger := logrus.New()

c := &Controller{
config: *controllerConfig,
opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"),
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
podCh: make(chan spec.PodEvent),
config: *controllerConfig,
opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"),
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
teamClusters: make(map[string][]spec.NamespacedName),
stopCh: make(chan struct{}),
podCh: make(chan spec.PodEvent),
}
logger.Hooks.Add(c)

return c
}
Expand Down Expand Up @@ -149,6 +159,7 @@ func (c *Controller) initController() {
})

c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
for i := range c.clusterEventQueues {
c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
e, ok := obj.(spec.ClusterEvent)
Expand All @@ -172,6 +183,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {

for i := range c.clusterEventQueues {
wg.Add(1)
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
go c.processClusterEventsQueue(i, stopCh, wg)
}

Expand Down
62 changes: 39 additions & 23 deletions pkg/controller/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
)

func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -124,6 +127,21 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa
}), nil
}

func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *spec.Postgresql) *cluster.Cluster {
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
cl.Run(c.stopCh)
teamName := strings.ToLower(cl.Spec.TeamID)

defer c.clustersMu.Unlock()
c.clustersMu.Lock()

c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
c.clusters[clusterName] = cl
c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)

return cl
}

func (c *Controller) processEvent(obj interface{}) error {
var clusterName spec.NamespacedName

Expand Down Expand Up @@ -153,14 +171,7 @@ func (c *Controller) processEvent(obj interface{}) error {

lg.Infof("creation of the cluster started")

stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
cl.Run(stopCh)

c.clustersMu.Lock()
c.clusters[clusterName] = cl
c.stopChs[clusterName] = stopCh
c.clustersMu.Unlock()
cl = c.addCluster(lg, clusterName, event.NewSpec)

if err := cl.Create(); err != nil {
cl.Error = fmt.Errorf("could not create cluster: %v", err)
Expand All @@ -186,7 +197,9 @@ func (c *Controller) processEvent(obj interface{}) error {
cl.Error = nil
lg.Infoln("cluster has been updated")
case spec.EventDelete:
lg.Infof("deletion of the %q cluster started", clusterName)
teamName := strings.ToLower(cl.Spec.TeamID)

lg.Infoln("Deletion of the cluster started")
if !clusterFound {
lg.Errorf("unknown cluster: %q", clusterName)
return nil
Expand All @@ -196,31 +209,34 @@ func (c *Controller) processEvent(obj interface{}) error {
lg.Errorf("could not delete cluster: %v", err)
return nil
}
close(c.stopChs[clusterName])

c.clustersMu.Lock()
delete(c.clusters, clusterName)
delete(c.stopChs, clusterName)
c.clustersMu.Unlock()
func() {
defer c.clustersMu.Unlock()
c.clustersMu.Lock()

delete(c.clusters, clusterName)
delete(c.clusterLogs, clusterName)
for i, val := range c.teamClusters[teamName] { // on relativel
if val == clusterName {
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
break
}
}
}()

lg.Infof("cluster has been deleted")
case spec.EventSync:
lg.Infof("syncing of the cluster started")

// no race condition because a cluster is always processed by single worker
if !clusterFound {
stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
cl.Run(stopCh)

c.clustersMu.Lock()
c.clusters[clusterName] = cl
c.stopChs[clusterName] = stopCh
c.clustersMu.Unlock()
cl = c.addCluster(lg, clusterName, event.NewSpec)
}

if err := cl.Sync(); err != nil {
cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err)
cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/util/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
Workers uint32 `name:"workers" default:"4"`
RingLogLines int `name:"ring_log_lines" default:"100"`
}

// MustMarshal marshals the config or panics
Expand Down

0 comments on commit 51fdfb9

Please sign in to comment.