Skip to content

Commit

Permalink
OCPBUGS-20034: improve on-demand data gathering timing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes committed Oct 3, 2023
1 parent 8ca514d commit 33bf389
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 38 deletions.
80 changes: 42 additions & 38 deletions pkg/controller/periodic/periodic.go
Expand Up @@ -156,13 +156,10 @@ func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration) {
c.Gather()
}
}

go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)
if c.techPreview {
go wait.Until(func() { c.periodicTriggerTechPreview(stopCh) }, time.Second, stopCh)
} else {
go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)
go wait.Until(func() { c.onDemandGather(stopCh) }, time.Second, stopCh)
}

<-stopCh
}

Expand Down Expand Up @@ -254,48 +251,46 @@ func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
klog.Infof("Gathering cluster info every %s", interval)

case <-time.After(interval):
c.Gather()
if c.techPreview {
c.GatherJob()
} else {
c.Gather()
}
}
}
}

// periodicTriggerTechPreview is a techpreview alternative to the same function above,
// but this adds a listener for the dataGatherInformer, which is nil (not initialized) in
// non-techpreview clusters.
func (c *Controller) periodicTriggerTechPreview(stopCh <-chan struct{}) {
configCh, closeFn := c.secretConfigurator.ConfigChanged()
defer closeFn()

ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4)
defer cancel()

interval := c.secretConfigurator.Config().Interval
klog.Infof("Gathering cluster info every %s", interval)
// onDemandGather listens to newly created DataGather resources and checks
// the state of each resource. If the state is not an empty string, it means that
// the corresponding job is already running or has been started and new data gathering
// is not triggered. It runs until stopCh is closed.
func (c *Controller) onDemandGather(stopCh <-chan struct{}) {
for {
select {
// Shutdown requested: terminate the listening loop.
case <-stopCh:
return

// NOTE(review): configCh, interval and ctx are not declared anywhere in
// this function as shown; this case and the time.After case below look
// like residue of the removed periodicTriggerTechPreview hunk in the
// rendered diff — confirm against the applied commit.
case <-configCh:
newInterval := c.secretConfigurator.Config().Interval
if newInterval == interval {
continue
}
interval = newInterval
klog.Infof("Gathering cluster info every %s", interval)

case <-time.After(interval):
c.GatherJob()

// listen for on-demand DataGather creations
case dgName := <-c.dgInf.DataGatherCreated():
// NOTE(review): the next seven lines duplicate the status update and
// job start performed inside the goroutine below and use the
// undeclared ctx — likely removed-diff residue; verify.
err := c.updateNewDataGatherCRStatus(ctx, dgName)
if err != nil {
klog.Errorf("Failed to update status of the %s DataGather resource: %v", dgName, err)
return
}
klog.Infof("Starting on-demand data gathering for the %s DataGather resource", dgName)
go c.runJobAndCheckResults(ctx, dgName)
// Handle each created DataGather asynchronously so that a
// long-running gather job does not block this event loop.
go func() {
// Bound the whole on-demand gathering to 4x the configured
// gathering interval.
ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4)
defer cancel()

state, err := c.dataGatherState(ctx, dgName)
if err != nil {
klog.Errorf("Failed to read %s DataGather resource", dgName)
return
}
// A non-empty state means gathering for this resource has
// already been started — do not trigger it again.
if state != "" {
klog.Infof("DataGather %s resource state is %s. Not triggering any data gathering", dgName, state)
return
}
err = c.updateNewDataGatherCRStatus(ctx, dgName)
if err != nil {
klog.Errorf("Failed to update status of the %s DataGather resource: %v", dgName, err)
return
}
klog.Infof("Starting on-demand data gathering for the %s DataGather resource", dgName)
c.runJobAndCheckResults(ctx, dgName)
}()
}
}
}
Expand Down Expand Up @@ -595,6 +590,15 @@ func (c *Controller) updateNewDataGatherCRStatus(ctx context.Context, dgName str
return nil
}

// dataGatherState looks up the DataGather resource named dgName and returns
// its status.state. An empty state together with a non-nil error is returned
// when the resource cannot be retrieved.
func (c *Controller) dataGatherState(ctx context.Context, dgName string) (insightsv1alpha1.DataGatherState, error) {
	dataGather, getErr := c.dataGatherClient.DataGathers().Get(ctx, dgName, metav1.GetOptions{})
	if getErr != nil {
		return "", getErr
	}
	return dataGather.Status.State, nil
}

// createDataGatherAttributeValues reads the current "insightsdatagather.config.openshift.io" configuration
// and checks custom period gatherers and returns list of disabled gatherers based on this two values
// and also data policy set in the "insightsdatagather.config.openshift.io"
Expand Down
56 changes: 56 additions & 0 deletions pkg/controller/periodic/periodic_test.go
Expand Up @@ -8,8 +8,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

configv1alpha1 "github.com/openshift/api/config/v1alpha1"
"github.com/openshift/api/insights/v1alpha1"
Expand Down Expand Up @@ -1085,3 +1087,57 @@ func TestUpdateInsightsReportInDataGather(t *testing.T) {
})
}
}

// TestDataGatherState verifies that dataGatherState reads the state of an
// existing DataGather resource and propagates the NotFound error (with an
// empty state) for a missing one.
func TestDataGatherState(t *testing.T) {
	tests := []struct {
		name           string
		dataGatherName string
		dataGather     *v1alpha1.DataGather
		expectedState  v1alpha1.DataGatherState
		err            error
	}{
		{
			name:           "Existing DataGather state is read correctly",
			dataGatherName: "test-dg",
			dataGather: &v1alpha1.DataGather{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test-dg",
				},
				Status: v1alpha1.DataGatherStatus{
					State: v1alpha1.Pending,
				},
			},
			expectedState: v1alpha1.Pending,
			err:           nil,
		},
		{
			name:           "Non-existing DataGather state returns an error",
			dataGatherName: "non-existing",
			dataGather:     nil,
			// no state can be read when the Get call fails
			expectedState: "",
			err: errors.NewNotFound(schema.GroupResource{
				Group:    "insights.openshift.io",
				Resource: "datagathers",
			}, "non-existing"),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Seed the fake client with the resource only when the test
			// expects the Get call to succeed.
			var insightsCs *insightsFakeCli.Clientset
			if tt.dataGather != nil {
				insightsCs = insightsFakeCli.NewSimpleClientset(tt.dataGather)
			} else {
				insightsCs = insightsFakeCli.NewSimpleClientset()
			}
			mockController := NewWithTechPreview(nil, nil, nil, nil, nil, insightsCs.InsightsV1alpha1(), nil, nil)
			state, err := mockController.dataGatherState(context.Background(), tt.dataGatherName)
			if tt.err != nil {
				assert.Equal(t, tt.err, err)
			} else {
				assert.NoError(t, err)
			}
			// Assert the returned state in both branches; previously the
			// table's expectedState field was never consulted (dead data).
			assert.Equal(t, tt.expectedState, state)
		})
	}
}

0 comments on commit 33bf389

Please sign in to comment.