Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NO-JIRA: minor logging update when data gathering is disabled #919

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller

// start uploading status, so that we
// know any previous last reported time
go uploader.Run(ctx)
go uploader.Run(ctx, initialDelay)

reportGatherer := insightsreport.New(insightsClient, configAggregator, uploader, operatorClient.OperatorV1().InsightsOperators())
statusReporter.AddSources(reportGatherer)
Expand Down
126 changes: 57 additions & 69 deletions pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Controller struct {
apiConfigurator configobserver.InsightsDataGatherObserver
reporter StatusReporter
archiveUploaded chan struct{}
initialDelay time.Duration
uploadDelay time.Duration
backoff wait.Backoff
}

Expand All @@ -59,7 +59,7 @@ func New(summarizer Summarizer,
client: client,
reporter: statusReporter,
archiveUploaded: make(chan struct{}),
initialDelay: initialDelay,
uploadDelay: initialDelay,
}
ctrl.backoff = wait.Backoff{
Duration: ctrl.configurator.Config().DataReporting.Interval / 4, // 30 min as first wait by default
Expand All @@ -69,83 +69,70 @@ func New(summarizer Summarizer,
return ctrl
}

func (c *Controller) Run(ctx context.Context) {
func (c *Controller) Run(ctx context.Context, initialDelay time.Duration) {
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})

if c.client == nil {
klog.Infof("No reporting possible without a configured client")
return
}

// the controller periodically uploads results to the remote insights endpoint
cfg := c.configurator.Config()

interval := cfg.DataReporting.Interval
lastReported := c.reporter.LastReportedTime()
if !lastReported.IsZero() {
next := lastReported.Add(interval)
if now := time.Now(); next.After(now) {
c.initialDelay = wait.Jitter(next.Sub(now), 1.2)
}
}
klog.Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.initialDelay.Truncate(time.Second))
// set the initial upload delay as initial delay + 2 minutes
ud := 90 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't two minutes (per the comment above) 120 seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:D correct. Thanks. I am going to change the comment to 1m30s :)

c.uploadDelay = time.Duration(initialDelay.Nanoseconds() + ud.Nanoseconds())

klog.Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.uploadDelay.Truncate(time.Second))
go wait.Until(func() { c.periodicTrigger(ctx.Done()) }, 5*time.Second, ctx.Done())
}

func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
klog.Infof("Checking archives to upload periodically every %s", c.initialDelay)
lastReported := c.reporter.LastReportedTime()
cfg := c.configurator.Config()
interval := cfg.DataReporting.Interval
endpoint := cfg.DataReporting.UploadEndpoint
var disabledInAPI bool
if c.apiConfigurator != nil {
disabledInAPI = c.apiConfigurator.GatherDisabled()
}
reportingEnabled := cfg.DataReporting.Enabled && !disabledInAPI

configCh, cancelFn := c.configurator.ConfigChanged()
defer cancelFn()

if c.initialDelay <= 0 {
c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
return
}
ticker := time.NewTicker(c.initialDelay)
ticker := time.NewTicker(c.uploadDelay)
for {
select {
case <-stopCh:
ticker.Stop()
case <-ticker.C:
c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
ticker.Reset(c.initialDelay)
c.checkSummaryAndSend(reportingEnabled)
ticker.Reset(c.uploadDelay)
return
case <-configCh:
newCfg := c.configurator.Config()
endpoint = newCfg.DataReporting.UploadEndpoint
reportingEnabled = newCfg.DataReporting.Enabled
var disabledInAPI bool
if c.apiConfigurator != nil {
disabledInAPI = c.apiConfigurator.GatherDisabled()
}
if !reportingEnabled || disabledInAPI {
klog.Infof("Reporting was disabled")
c.initialDelay = newCfg.DataReporting.Interval
return
}
newInterval := newCfg.DataReporting.Interval
if newInterval == interval {
continue
if newInterval != interval {
c.uploadDelay = wait.Jitter(interval/8, 0.1)
ticker.Reset(c.uploadDelay)
return
}
interval = newInterval
// there's no return in this case so set the initial delay again
c.initialDelay = wait.Jitter(interval/8, 0.1)
ticker.Reset(c.initialDelay)
}
}
}

func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported time.Time, endpoint string, reportingEnabled bool) {
func (c *Controller) checkSummaryAndSend(reportingEnabled bool) {
lastReported := c.reporter.LastReportedTime()
endpoint := c.configurator.Config().DataReporting.UploadEndpoint
interval := c.configurator.Config().DataReporting.Interval
c.uploadDelay = wait.Jitter(interval/8, 0.1)

// attempt to get a summary to send to the server
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand All @@ -159,51 +146,52 @@ func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported ti
klog.Infof("Nothing to report since %s", lastReported.Format(time.RFC3339))
return
}
defer source.Contents.Close()
if reportingEnabled && len(endpoint) > 0 {
// send the results
start := time.Now()
id := start.Format(time.RFC3339)
klog.Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
source.ID = id
source.Type = "application/vnd.redhat.openshift.periodic"
if err := c.client.Send(ctx, endpoint, *source); err != nil {
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
if errors.Is(err, insightsclient.ErrWaitingForVersion) {
c.initialDelay = wait.Jitter(time.Second*15, 1)
return
}
if authorizer.IsAuthorizationError(err) {
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
c.initialDelay = wait.Jitter(interval/2, 2)

return
}
klog.Infof("Checking archives to upload periodically every %s", c.uploadDelay)
defer source.Contents.Close()

c.initialDelay = wait.Jitter(interval/8, 1.2)
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
return
}
klog.Infof("Uploaded report successfully in %s", time.Since(start))
select {
case c.archiveUploaded <- struct{}{}:
default:
}
lastReported = start.UTC()
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
} else {
if !reportingEnabled || len(endpoint) == 0 {
klog.Info("Display report that would be sent")
// display what would have been sent (to ensure we always exercise source processing)
if err := reportToLogs(source.Contents); err != nil {
klog.Errorf("Unable to log upload: %v", err)
}
// we didn't actually report logs, so don't advance the report date
return
}

// send the results
start := time.Now()
id := start.Format(time.RFC3339)
klog.Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
source.ID = id
source.Type = "application/vnd.redhat.openshift.periodic"
if err := c.client.Send(ctx, endpoint, *source); err != nil {
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
if errors.Is(err, insightsclient.ErrWaitingForVersion) {
c.uploadDelay = wait.Jitter(time.Second*15, 1)
return
}
if authorizer.IsAuthorizationError(err) {
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
c.uploadDelay = wait.Jitter(interval/2, 2)

return
}

c.uploadDelay = wait.Jitter(interval/8, 1.2)
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
return
}
klog.Infof("Uploaded report successfully in %s", time.Since(start))
select {
case c.archiveUploaded <- struct{}{}:
default:
}
lastReported = start.UTC()
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
c.reporter.SetLastReportedTime(lastReported)
c.initialDelay = wait.Jitter(interval/8, 0.1)
}

// ArchiveUploaded returns a channel that indicates when an archive is uploaded
Expand Down