Skip to content

Commit

Permalink
Merge pull request #136 from martinkunc/release-4.4-dont-delay-first
Browse files Browse the repository at this point in the history
[release-4.4] Bug 1842489: Don't delay first upload
  • Loading branch information
openshift-merge-robot committed Aug 25, 2020
2 parents d111a68 + dca3299 commit 5c18a02
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *Support) Run(ctx context.Context, controller *controllercmd.ControllerC

// the status controller initializes the cluster operator object and retrieves
// the last sync time, if any was set
statusReporter := status.NewController(configClient, configObserver, os.Getenv("POD_NAMESPACE"))
statusReporter := status.NewController(configClient, gatherKubeClient.CoreV1(), configObserver, os.Getenv("POD_NAMESPACE"))

// the recorder periodically flushes any recorded data to disk as tar.gz files
// in s.StoragePath, and also prunes files above a certain age
Expand Down
82 changes: 66 additions & 16 deletions pkg/controller/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (

"golang.org/x/time/rate"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"
configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/openshift/insights-operator/pkg/config"
"github.com/openshift/insights-operator/pkg/controllerstatus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog"
)

// How many upload failures in a row we tolerate before starting reporting
Expand All @@ -40,19 +41,22 @@ type Controller struct {
name string
namespace string
client configv1client.ConfigV1Interface
coreClient corev1client.CoreV1Interface
statusCh chan struct{}
configurator Configurator

lock sync.Mutex
sources []controllerstatus.Interface
reported Reported
start time.Time
lock sync.Mutex
sources []controllerstatus.Interface
reported Reported
start time.Time
safeInitialStart bool
}

func NewController(client configv1client.ConfigV1Interface, configurator Configurator, namespace string) *Controller {
func NewController(client configv1client.ConfigV1Interface, coreClient corev1client.CoreV1Interface, configurator Configurator, namespace string) *Controller {
c := &Controller{
name: "insights",
client: client,
coreClient: coreClient,
statusCh: make(chan struct{}, 1),
configurator: configurator,
namespace: namespace,
Expand Down Expand Up @@ -92,6 +96,18 @@ func (c *Controller) SetLastReportedTime(at time.Time) {
c.triggerStatusUpdate()
}

// SafeInitialStart reports whether the controller has been marked as safe for
// an immediate first upload. The flag is read while holding c.lock so it can
// be queried concurrently with SetSafeInitialStart.
func (c *Controller) SafeInitialStart() bool {
	c.lock.Lock()
	safe := c.safeInitialStart
	c.lock.Unlock()
	return safe
}

// SetSafeInitialStart records whether an immediate first upload is considered
// safe. The flag is written while holding c.lock so that concurrent readers
// of SafeInitialStart observe a consistent value.
func (c *Controller) SetSafeInitialStart(safe bool) {
	c.lock.Lock()
	c.safeInitialStart = safe
	c.lock.Unlock()
}

func (c *Controller) AddSources(sources ...controllerstatus.Interface) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -350,14 +366,48 @@ func (c *Controller) updateStatus(initial bool) error {
}
existing = nil
}
if initial && existing != nil {
var reported Reported
if len(existing.Status.Extension.Raw) > 0 {
if err := json.Unmarshal(existing.Status.Extension.Raw, &reported); err != nil {
klog.Errorf("The initial operator extension status is invalid: %v", err)
if initial {
ophealthy := false
if existing != nil {
var reported Reported
if len(existing.Status.Extension.Raw) > 0 {
if err := json.Unmarshal(existing.Status.Extension.Raw, &reported); err != nil {
klog.Errorf("The initial operator extension status is invalid: %v", err)
}
}
c.SetLastReportedTime(reported.LastReportTime.Time.UTC())
if con := findOperatorStatusCondition(existing.Status.Conditions, configv1.OperatorDegraded); con == nil ||
con != nil && con.Status == configv1.ConditionFalse {
klog.Info("The initial operator extension status is healthy")
ophealthy = true
}
}
c.SetLastReportedTime(reported.LastReportTime.Time.UTC())
if os.Getenv("POD_NAME") != "" && ophealthy {
var pod *v1.Pod
pod, err = c.coreClient.Pods(os.Getenv("POD_NAMESPACE")).Get(os.Getenv("POD_NAME"), metav1.GetOptions{})
if err == nil {
for _, c := range pod.Status.ContainerStatuses {
// every container must have an empty last termination state (neither terminated nor waiting) for the pod to be considered healthy
if c.LastTerminationState.Terminated != nil || c.LastTerminationState.Waiting != nil {
klog.Info("The last pod state is unhealthy")
ophealthy = false
break
}
}
} else {
if !errors.IsNotFound(err) {
klog.Errorf("Couldn't get Insights Operator Pod to detect its status. Error: %v", err)
ophealthy = false
}
}
}

if existing == nil || ophealthy {
klog.Info("It is safe to use fast upload")
c.SetSafeInitialStart(true)
} else {
klog.Info("Not safe for fast upload")
}
}

updated := c.merge(existing)
Expand Down
89 changes: 89 additions & 0 deletions pkg/controller/status/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package status

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"
configfake "github.com/openshift/client-go/config/clientset/versioned/fake"
"github.com/openshift/insights-operator/pkg/config"
"github.com/openshift/insights-operator/pkg/config/configobserver"
"github.com/openshift/insights-operator/pkg/utils"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
)

// TestSaveInitialStart checks which combinations of run kind and existing
// ClusterOperator state cause updateStatus to flag the controller as safe for
// an immediate (non-delayed) first upload:
//   - a non-initial run never sets the flag,
//   - an initial run with no existing "insights" ClusterOperator sets it,
//   - an initial run with a degraded ClusterOperator does not set it,
//   - an initial run with a non-degraded ClusterOperator sets it.
func TestSaveInitialStart(t *testing.T) {
	tests := []struct {
		name                     string
		clusterOperator          *configv1.ClusterOperator
		expErr                   error
		initialRun               bool
		expectedSafeInitialStart bool
	}{
		{
			name:                     "Non-initial run is has upload delayed",
			initialRun:               false,
			expectedSafeInitialStart: false,
		},
		{
			name:                     "Initial run with not existing Insights operator is not delayed",
			initialRun:               true,
			clusterOperator:          nil,
			expectedSafeInitialStart: true,
		},
		{
			name:       "Initial run with existing Insights operator which is degraded is delayed",
			initialRun: true,
			clusterOperator: &configv1.ClusterOperator{
				ObjectMeta: metav1.ObjectMeta{
					Name: "insights",
				},
				Status: configv1.ClusterOperatorStatus{Conditions: []configv1.ClusterOperatorStatusCondition{
					{Type: configv1.OperatorDegraded, Status: configv1.ConditionTrue},
				}},
			},
			expectedSafeInitialStart: false,
		},
		{
			name:       "Initial run with existing Insights operator which is not degraded not delayed",
			initialRun: true,
			clusterOperator: &configv1.ClusterOperator{
				ObjectMeta: metav1.ObjectMeta{
					Name: "insights",
				},
				Status: configv1.ClusterOperatorStatus{Conditions: []configv1.ClusterOperatorStatusCondition{
					{Type: configv1.OperatorDegraded, Status: configv1.ConditionFalse},
				}},
			},
			expectedSafeInitialStart: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Route klog output through the test log so it is attributed to this subtest.
			klog.SetOutput(utils.NewTestLog(t).Writer())

			// Seed the fake config clientset only when the case provides an operator.
			var operators []runtime.Object
			if tt.clusterOperator != nil {
				operators = append(operators, tt.clusterOperator)
			}
			kubeclientsetclient := kubeclientfake.NewSimpleClientset()
			client := configfake.NewSimpleClientset(operators...)

			// coreClient must be populated: updateStatus looks up the operator pod
			// through it when POD_NAME is set in the environment, and a nil client
			// would panic instead of producing a test result.
			ctrl := &Controller{
				name:         "insights",
				client:       client.ConfigV1(),
				coreClient:   kubeclientsetclient.CoreV1(),
				configurator: configobserver.New(config.Controller{Report: true}, kubeclientsetclient),
			}

			err := ctrl.updateStatus(tt.initialRun)
			isSafe := ctrl.SafeInitialStart()
			if err != tt.expErr {
				// %v renders nil errors as <nil>; %s would print %!s(<nil>).
				t.Fatalf("updateStatus returned unexpected error: %v Expected %v", err, tt.expErr)
			}
			if isSafe != tt.expectedSafeInitialStart {
				t.Fatalf("unexpected SafeInitialStart was: %t Expected %t", isSafe, tt.expectedSafeInitialStart)
			}
		})
	}
}
11 changes: 10 additions & 1 deletion pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Summarizer interface {
// StatusReporter is the set of status operations the uploader relies on:
// reading and recording the last report time, and reading and updating the
// flag that lets the first upload skip its initial delay.
type StatusReporter interface {
	// LastReportedTime returns the last report time known to the reporter.
	LastReportedTime() time.Time
	// SetLastReportedTime records a new last report time.
	SetLastReportedTime(time.Time)
	// SafeInitialStart reports whether the first upload may start right away;
	// Run sets its initial delay to zero when this returns true.
	SafeInitialStart() bool
	// SetSafeInitialStart updates the flag returned by SafeInitialStart.
	SetSafeInitialStart(s bool)
}

type Controller struct {
Expand Down Expand Up @@ -80,6 +82,9 @@ func (c *Controller) Run(ctx context.Context) {
initialDelay = wait.Jitter(now.Sub(next), 1.2)
}
}
if c.reporter.SafeInitialStart() {
initialDelay = 0
}
klog.V(2).Infof("Reporting status periodically to %s every %s, starting in %s", cfg.Endpoint, interval, initialDelay.Truncate(time.Second))

wait.Until(func() {
Expand Down Expand Up @@ -132,8 +137,12 @@ func (c *Controller) Run(ctx context.Context) {
klog.V(2).Infof("Unable to upload report after %s: %v", time.Now().Sub(start).Truncate(time.Second/100), err)
if err == insightsclient.ErrWaitingForVersion {
initialDelay = wait.Jitter(interval/8, 1) - interval/8
if c.reporter.SafeInitialStart() {
initialDelay = wait.Jitter(time.Second*15, 1)
}
return
}
c.reporter.SetSafeInitialStart(false)
if authorizer.IsAuthorizationError(err) {
c.Simple.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
Expand All @@ -146,7 +155,7 @@ func (c *Controller) Run(ctx context.Context) {
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
return
}

c.reporter.SetSafeInitialStart(false)
klog.V(4).Infof("Uploaded report successfully in %s", time.Now().Sub(start))
lastReported = start.UTC()
c.Simple.UpdateStatus(controllerstatus.Summary{Healthy: true})
Expand Down

0 comments on commit 5c18a02

Please sign in to comment.