Skip to content

Commit

Permalink
First check IO container status and optionally delay first gathering
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes committed Dec 1, 2020
1 parent 376ddfb commit b7da048
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 68 deletions.
41 changes: 38 additions & 3 deletions pkg/controller/operator.go
Expand Up @@ -5,19 +5,23 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"k8s.io/klog"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/rest"

configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/openshift/library-go/pkg/controller/controllercmd"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/openshift/insights-operator/pkg/authorizer/clusterauthorizer"
"github.com/openshift/insights-operator/pkg/config"
Expand Down Expand Up @@ -56,7 +60,7 @@ func (s *Support) LoadConfig(obj map[string]interface{}) error {

func (s *Support) Run(ctx context.Context, controller *controllercmd.ControllerContext) error {
klog.Infof("Starting insights-operator %s", version.Get().String())

initialDelay := 0 * time.Second
if err := s.LoadConfig(controller.ComponentConfig.Object); err != nil {
return err
}
Expand Down Expand Up @@ -123,7 +127,14 @@ func (s *Support) Run(ctx context.Context, controller *controllercmd.ControllerC
"clusterconfig": clusterConfigGatherer,
})
statusReporter.AddSources(periodic.Sources()...)
go periodic.Run(4, ctx.Done())

// check we can read IO container status and we are not in crash loop
err = wait.PollImmediate(20*time.Second, wait.Jitter(s.Controller.Interval/12, 0.1), isRunning(ctx, gatherKubeConfig))
if err != nil {
initialDelay = wait.Jitter(s.Controller.Interval/12, 1)
klog.Infof("Unable to check insights-operator pod status. Setting initial delay to %s", initialDelay)
}
go periodic.Run(4, ctx.Done(), initialDelay)

authorizer := clusterauthorizer.New(configObserver)
insightsClient := insightsclient.New(nil, 0, "default", authorizer, clusterConfigGatherer)
Expand Down Expand Up @@ -157,3 +168,27 @@ func (s *Support) Run(ctx context.Context, controller *controllercmd.ControllerC
<-ctx.Done()
return nil
}

func isRunning(ctx context.Context, config *rest.Config) wait.ConditionFunc {
return func() (bool, error) {
c, err := corev1client.NewForConfig(config)
if err != nil {
return false, err
}
pod, err := c.Pods(os.Getenv("POD_NAMESPACE")).Get(ctx, os.Getenv("POD_NAME"), metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Couldn't get Insights Operator Pod to detect its status. Error: %v", err)
}
return false, nil
}
for _, c := range pod.Status.ContainerStatuses {
// all containers has to be in running state to consider them healthy
if c.LastTerminationState.Terminated != nil || c.LastTerminationState.Waiting != nil {
klog.Info("The last pod state is unhealthy")
return false, nil
}
}
return true, nil
}
}
16 changes: 13 additions & 3 deletions pkg/controller/periodic/periodic.go
Expand Up @@ -79,17 +79,27 @@ func (c *Controller) sync(name string) error {
return gatherer.Gather(ctx, c.recorder)
}

func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
// Run starts gathering with initialDelay
func (c *Controller) Run(workers int, stopCh <-chan struct{}, initialDelay time.Duration) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

defer klog.Info("Shutting down")

// start watching for version changes
go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)

for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(func() {
if initialDelay > 0 {
select {
case <-stopCh:
return
case <-time.After(initialDelay):
c.runWorker()
}
}
c.runWorker()
}, time.Second, stopCh)
}

// seed the queue
Expand Down
49 changes: 4 additions & 45 deletions pkg/controller/status/status.go
Expand Up @@ -17,7 +17,6 @@ import (
configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/openshift/insights-operator/pkg/config"
"github.com/openshift/insights-operator/pkg/controllerstatus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -45,11 +44,10 @@ type Controller struct {
statusCh chan struct{}
configurator Configurator

lock sync.Mutex
sources []controllerstatus.Interface
reported Reported
start time.Time
safeInitialStart bool
lock sync.Mutex
sources []controllerstatus.Interface
reported Reported
start time.Time
}

func NewController(client configv1client.ConfigV1Interface, coreClient corev1client.CoreV1Interface, configurator Configurator, namespace string) *Controller {
Expand Down Expand Up @@ -96,18 +94,6 @@ func (c *Controller) SetLastReportedTime(at time.Time) {
c.triggerStatusUpdate()
}

func (c *Controller) SafeInitialStart() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.safeInitialStart
}

func (c *Controller) SetSafeInitialStart(safe bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.safeInitialStart = safe
}

func (c *Controller) AddSources(sources ...controllerstatus.Interface) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -374,7 +360,6 @@ func (c *Controller) updateStatus(ctx context.Context, initial bool) error {
existing = nil
}
if initial {
ophealthy := false
if existing != nil {
var reported Reported
if len(existing.Status.Extension.Raw) > 0 {
Expand All @@ -386,35 +371,9 @@ func (c *Controller) updateStatus(ctx context.Context, initial bool) error {
if con := findOperatorStatusCondition(existing.Status.Conditions, configv1.OperatorDegraded); con == nil ||
con != nil && con.Status == configv1.ConditionFalse {
klog.Info("The initial operator extension status is healthy")
ophealthy = true
}
}
if os.Getenv("POD_NAME") != "" && ophealthy {
var pod *v1.Pod
pod, err = c.coreClient.Pods(os.Getenv("POD_NAMESPACE")).Get(ctx, os.Getenv("POD_NAME"), metav1.GetOptions{})
if err == nil {
for _, c := range pod.Status.ContainerStatuses {
// all containers has to be in running state to consider them healthy
if c.LastTerminationState.Terminated != nil || c.LastTerminationState.Waiting != nil {
klog.Info("The last pod state is unhealthy")
ophealthy = false
break
}
}
} else {
if !errors.IsNotFound(err) {
klog.Errorf("Couldn't get Insights Operator Pod to detect its status. Error: %v", err)
ophealthy = false
}
}
}

if existing == nil || ophealthy {
klog.Info("It is safe to use fast upload")
c.SetSafeInitialStart(true)
} else {
klog.Info("Not safe for fast upload")
}
}

updated := c.merge(existing)
Expand Down
4 changes: 0 additions & 4 deletions pkg/controller/status/status_test.go
Expand Up @@ -78,13 +78,9 @@ func TestSaveInitialStart(t *testing.T) {
ctrl := &Controller{name: "insights", client: client.ConfigV1(), configurator: configobserver.New(config.Controller{Report: true}, kubeclientsetclient)}

err := ctrl.updateStatus(context.Background(), tt.initialRun)
isSafe := ctrl.SafeInitialStart()
if err != tt.expErr {
t.Fatalf("updateStatus returned unexpected error: %s Expected %s", err, tt.expErr)
}
if isSafe != tt.expectedSafeInitialStart {
t.Fatalf("unexpected SafeInitialStart was: %t Expected %t", isSafe, tt.expectedSafeInitialStart)
}
})
}
}
14 changes: 2 additions & 12 deletions pkg/insights/insightsuploader/insightsuploader.go
Expand Up @@ -34,8 +34,6 @@ type Summarizer interface {
type StatusReporter interface {
LastReportedTime() time.Time
SetLastReportedTime(time.Time)
SafeInitialStart() bool
SetSafeInitialStart(s bool)
}

type Controller struct {
Expand Down Expand Up @@ -76,17 +74,14 @@ func (c *Controller) Run(ctx context.Context) {
enabled := cfg.Report
endpoint := cfg.Endpoint
interval := cfg.Interval
initialDelay := wait.Jitter(interval/8, 2)
initialDelay := 0 * time.Second
lastReported := c.reporter.LastReportedTime()
if !lastReported.IsZero() {
next := lastReported.Add(interval)
if now := time.Now(); next.After(now) {
initialDelay = wait.Jitter(now.Sub(next), 1.2)
}
}
if c.reporter.SafeInitialStart() {
initialDelay = 0
}
klog.V(2).Infof("Reporting status periodically to %s every %s, starting in %s", cfg.Endpoint, interval, initialDelay.Truncate(time.Second))

wait.Until(func() {
Expand Down Expand Up @@ -138,13 +133,9 @@ func (c *Controller) Run(ctx context.Context) {
}); err != nil {
klog.V(2).Infof("Unable to upload report after %s: %v", time.Now().Sub(start).Truncate(time.Second/100), err)
if err == insightsclient.ErrWaitingForVersion {
initialDelay = wait.Jitter(interval/8, 1) - interval/8
if c.reporter.SafeInitialStart() {
initialDelay = wait.Jitter(time.Second*15, 1)
}
initialDelay = wait.Jitter(time.Second*15, 1)
return
}
c.reporter.SetSafeInitialStart(false)
if authorizer.IsAuthorizationError(err) {
c.Simple.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
Expand All @@ -157,7 +148,6 @@ func (c *Controller) Run(ctx context.Context) {
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
return
}
c.reporter.SetSafeInitialStart(false)
klog.V(4).Infof("Uploaded report successfully in %s", time.Now().Sub(start))
select {
case c.archiveUploaded <- struct{}{}:
Expand Down
1 change: 0 additions & 1 deletion test/integration/basic_test.go
Expand Up @@ -48,7 +48,6 @@ func TestIsIOHealthy(t *testing.T) {
// Check if an archive is uploaded and insights results retrieved in a reasonable amount of time
// This test can be performed on OCP 4.7 and newer
func TestArchiveUploadedAndResultReceived(t *testing.T) {
t.Skip("Skipping until CCXDEV-3397 gets resolved")
start := logLineTime(t, `Reporting status periodically to .* every`)
end := logLineTime(t, `Successfully reported id=`)
uploadingTime := duration(t, start, end)
Expand Down

0 comments on commit b7da048

Please sign in to comment.