Skip to content

Commit

Permalink
add(repair): respect parallel/intensity from resumed run
Browse files Browse the repository at this point in the history
Fixes #3580
  • Loading branch information
Michal-Leszczynski committed Sep 25, 2023
1 parent b2f2516 commit 1e799aa
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/service/repair/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Run struct {

DC []string
Host string
Parallel int
Intensity int
PrevID uuid.UUID
StartTime time.Time
EndTime time.Time
Expand Down
35 changes: 27 additions & 8 deletions pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
if target.Continue {
if prev := pm.GetPrevRun(ctx, s.config.AgeMax); prev != nil {
prevID = prev.ID
// Respect parallel/intensity set in resumed run
target.Parallel = prev.Parallel
target.Intensity = float64(prev.Intensity)
}
}
if err := pm.Init(target.plan, prevID); err != nil {
Expand All @@ -232,7 +235,7 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
pm.UpdatePlan(target.plan)
s.putRunLogError(ctx, run)

ih, cleanup := s.newIntensityHandler(ctx, clusterID,
ih, cleanup := s.newIntensityHandler(ctx, clusterID, taskID, runID,
target.plan.MaxHostIntensity, target.Intensity, target.plan.MaxParallel, target.Parallel)
defer cleanup()

Expand Down Expand Up @@ -310,10 +313,12 @@ func (s *Service) killAllRepairs(ctx context.Context, client *scyllaclient.Clien
}
}

func (s *Service) newIntensityHandler(ctx context.Context, clusterID uuid.UUID,
func (s *Service) newIntensityHandler(ctx context.Context, clusterID, taskID, runID uuid.UUID,
maxHostIntensity map[string]int, intensity float64, maxParallel, parallel int,
) (ih *intensityHandler, cleanup func()) {
ih = &intensityHandler{
taskID: taskID,
runID: runID,
logger: s.logger.Named("control"),
maxHostIntensity: maxHostIntensity,
intensity: atomic.NewFloat64(intensity),
Expand Down Expand Up @@ -399,9 +404,9 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid
// SetIntensity changes intensity of an ongoing repair.
func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensity float64) error {
s.mu.Lock()
ih, ok := s.intensityHandlers[clusterID]
s.mu.Unlock()
defer s.mu.Unlock()

ih, ok := s.intensityHandlers[clusterID]
if !ok {
return errors.Wrap(service.ErrNotFound, "repair task")
}
Expand All @@ -410,15 +415,21 @@ func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensi
return errors.Wrap(err, "set intensity")
}

return nil
err := table.RepairRun.UpdateBuilder("intensity").Query(s.session).BindMap(qb.M{
"cluster_id": clusterID,
"task_id": ih.taskID,
"id": ih.runID,
"intensity": ih.Intensity(),
}).ExecRelease()
return errors.Wrap(err, "update db")
}

// SetParallel changes parallelism of an ongoing repair.
func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel int) error {
s.mu.Lock()
ih, ok := s.intensityHandlers[clusterID]
s.mu.Unlock()
defer s.mu.Unlock()

ih, ok := s.intensityHandlers[clusterID]
if !ok {
return errors.Wrap(service.ErrNotFound, "repair task")
}
Expand All @@ -427,10 +438,18 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel
return errors.Wrap(err, "set parallel")
}

return nil
err := table.RepairRun.UpdateBuilder("parallel").Query(s.session).BindMap(qb.M{
"cluster_id": clusterID,
"task_id": ih.taskID,
"id": ih.runID,
"parallel": ih.Parallel(),
}).ExecRelease()
return errors.Wrap(err, "update db")
}

type intensityHandler struct {
taskID uuid.UUID
runID uuid.UUID
logger log.Logger
maxHostIntensity map[string]int
intensity *atomic.Float64
Expand Down

0 comments on commit 1e799aa

Please sign in to comment.