Skip to content

Commit

Permalink
update & refactor "checkSummaryAndSend" function in the insightsuploader
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes committed Mar 18, 2024
1 parent d00f74f commit 474ba39
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller

// start uploading status, so that we
// know any previous last reported time
go uploader.Run(ctx)
go uploader.Run(ctx, initialDelay)

reportGatherer := insightsreport.New(insightsClient, configAggregator, uploader, operatorClient.OperatorV1().InsightsOperators())
statusReporter.AddSources(reportGatherer)
Expand Down
126 changes: 57 additions & 69 deletions pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Controller struct {
apiConfigurator configobserver.InsightsDataGatherObserver
reporter StatusReporter
archiveUploaded chan struct{}
initialDelay time.Duration
uploadDelay time.Duration
backoff wait.Backoff
}

Expand All @@ -59,7 +59,7 @@ func New(summarizer Summarizer,
client: client,
reporter: statusReporter,
archiveUploaded: make(chan struct{}),
initialDelay: initialDelay,
uploadDelay: initialDelay,
}
ctrl.backoff = wait.Backoff{
Duration: ctrl.configurator.Config().DataReporting.Interval / 4, // 30 min as first wait by default
Expand All @@ -69,83 +69,70 @@ func New(summarizer Summarizer,
return ctrl
}

func (c *Controller) Run(ctx context.Context) {
func (c *Controller) Run(ctx context.Context, initialDelay time.Duration) {
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})

if c.client == nil {
klog.Infof("No reporting possible without a configured client")
return
}

// the controller periodically uploads results to the remote insights endpoint
cfg := c.configurator.Config()

interval := cfg.DataReporting.Interval
lastReported := c.reporter.LastReportedTime()
if !lastReported.IsZero() {
next := lastReported.Add(interval)
if now := time.Now(); next.After(now) {
c.initialDelay = wait.Jitter(next.Sub(now), 1.2)
}
}
klog.Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.initialDelay.Truncate(time.Second))
// set the initial upload delay as initial delay + 90 seconds
ud := 90 * time.Second
c.uploadDelay = time.Duration(initialDelay.Nanoseconds() + ud.Nanoseconds())

klog.Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.uploadDelay.Truncate(time.Second))
go wait.Until(func() { c.periodicTrigger(ctx.Done()) }, 5*time.Second, ctx.Done())
}

func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
klog.Infof("Checking archives to upload periodically every %s", c.initialDelay)
lastReported := c.reporter.LastReportedTime()
cfg := c.configurator.Config()
interval := cfg.DataReporting.Interval
endpoint := cfg.DataReporting.UploadEndpoint
var disabledInAPI bool
if c.apiConfigurator != nil {
disabledInAPI = c.apiConfigurator.GatherDisabled()
}
reportingEnabled := cfg.DataReporting.Enabled && !disabledInAPI

configCh, cancelFn := c.configurator.ConfigChanged()
defer cancelFn()

if c.initialDelay <= 0 {
c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
return
}
ticker := time.NewTicker(c.initialDelay)
ticker := time.NewTicker(c.uploadDelay)
for {
select {
case <-stopCh:
ticker.Stop()
case <-ticker.C:
c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
ticker.Reset(c.initialDelay)
c.checkSummaryAndSend(reportingEnabled)
ticker.Reset(c.uploadDelay)
return
case <-configCh:
newCfg := c.configurator.Config()
endpoint = newCfg.DataReporting.UploadEndpoint
reportingEnabled = newCfg.DataReporting.Enabled
var disabledInAPI bool
if c.apiConfigurator != nil {
disabledInAPI = c.apiConfigurator.GatherDisabled()
}
if !reportingEnabled || disabledInAPI {
klog.Infof("Reporting was disabled")
c.initialDelay = newCfg.DataReporting.Interval
return
}
newInterval := newCfg.DataReporting.Interval
if newInterval == interval {
continue
if newInterval != interval {
c.uploadDelay = wait.Jitter(interval/8, 0.1)
ticker.Reset(c.uploadDelay)
return
}
interval = newInterval
// there's no return in this case so set the initial delay again
c.initialDelay = wait.Jitter(interval/8, 0.1)
ticker.Reset(c.initialDelay)
}
}
}

func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported time.Time, endpoint string, reportingEnabled bool) {
func (c *Controller) checkSummaryAndSend(reportingEnabled bool) {
lastReported := c.reporter.LastReportedTime()
endpoint := c.configurator.Config().DataReporting.UploadEndpoint
interval := c.configurator.Config().DataReporting.Interval
c.uploadDelay = wait.Jitter(interval/8, 0.1)

// attempt to get a summary to send to the server
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand All @@ -159,51 +146,52 @@ func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported ti
klog.Infof("Nothing to report since %s", lastReported.Format(time.RFC3339))
return
}
defer source.Contents.Close()
if reportingEnabled && len(endpoint) > 0 {
// send the results
start := time.Now()
id := start.Format(time.RFC3339)
klog.Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
source.ID = id
source.Type = "application/vnd.redhat.openshift.periodic"
if err := c.client.Send(ctx, endpoint, *source); err != nil {
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
if errors.Is(err, insightsclient.ErrWaitingForVersion) {
c.initialDelay = wait.Jitter(time.Second*15, 1)
return
}
if authorizer.IsAuthorizationError(err) {
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
c.initialDelay = wait.Jitter(interval/2, 2)

return
}
klog.Infof("Checking archives to upload periodically every %s", c.uploadDelay)
defer source.Contents.Close()

c.initialDelay = wait.Jitter(interval/8, 1.2)
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
return
}
klog.Infof("Uploaded report successfully in %s", time.Since(start))
select {
case c.archiveUploaded <- struct{}{}:
default:
}
lastReported = start.UTC()
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
} else {
if !reportingEnabled || len(endpoint) == 0 {
klog.Info("Display report that would be sent")
// display what would have been sent (to ensure we always exercise source processing)
if err := reportToLogs(source.Contents); err != nil {
klog.Errorf("Unable to log upload: %v", err)
}
// we didn't actually report logs, so don't advance the report date
return
}

// send the results
start := time.Now()
id := start.Format(time.RFC3339)
klog.Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
source.ID = id
source.Type = "application/vnd.redhat.openshift.periodic"
if err := c.client.Send(ctx, endpoint, *source); err != nil {
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
if errors.Is(err, insightsclient.ErrWaitingForVersion) {
c.uploadDelay = wait.Jitter(time.Second*15, 1)
return
}
if authorizer.IsAuthorizationError(err) {
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
c.uploadDelay = wait.Jitter(interval/2, 2)

return
}

c.uploadDelay = wait.Jitter(interval/8, 1.2)
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
return
}
klog.Infof("Uploaded report successfully in %s", time.Since(start))
select {
case c.archiveUploaded <- struct{}{}:
default:
}
lastReported = start.UTC()
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
c.reporter.SetLastReportedTime(lastReported)
c.initialDelay = wait.Jitter(interval/8, 0.1)
}

// ArchiveUploaded returns a channel that indicates when an archive is uploaded
Expand Down

0 comments on commit 474ba39

Please sign in to comment.