Skip to content

Commit

Permalink
add(repair): respect parallel/intensity from resumed run
Browse files Browse the repository at this point in the history
Fixes #3580
  • Loading branch information
Michal-Leszczynski committed Sep 26, 2023
1 parent b9fb2b4 commit e188859
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/service/repair/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Run struct {

DC []string
Host string
Parallel int
Intensity int
PrevID uuid.UUID
StartTime time.Time
EndTime time.Time
Expand Down
53 changes: 40 additions & 13 deletions pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
ID: runID,
DC: target.DC,
Host: target.Host,
Parallel: target.Parallel,
Intensity: int(target.Intensity),
StartTime: timeutc.Now(),
}
if err := s.putRun(run); err != nil {
Expand All @@ -224,6 +226,9 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
if target.Continue {
if prev := pm.GetPrevRun(ctx, s.config.AgeMax); prev != nil {
prevID = prev.ID
// Respect parallel/intensity set in resumed run
run.Parallel = prev.Parallel
run.Intensity = prev.Intensity
}
}
if err := pm.Init(target.plan, prevID); err != nil {
Expand All @@ -232,9 +237,15 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
pm.UpdatePlan(target.plan)
s.putRunLogError(ctx, run)

ih, cleanup := s.newIntensityHandler(ctx, clusterID,
target.plan.MaxHostIntensity, target.Intensity, target.plan.MaxParallel, target.Parallel)
ih, cleanup := s.newIntensityHandler(ctx, clusterID, taskID, runID,
target.plan.MaxHostIntensity, target.plan.MaxParallel)
defer cleanup()
if err := s.SetParallel(ctx, clusterID, run.Parallel); err != nil {
return errors.Wrap(err, "set initial parallel")
}
if err := s.SetIntensity(ctx, clusterID, float64(run.Intensity)); err != nil {
return errors.Wrap(err, "set initial intensity")
}

gen, err := newGenerator(ctx, target, client, ih, s.logger)
if err != nil {
Expand Down Expand Up @@ -310,20 +321,22 @@ func (s *Service) killAllRepairs(ctx context.Context, client *scyllaclient.Clien
}
}

func (s *Service) newIntensityHandler(ctx context.Context, clusterID uuid.UUID,
maxHostIntensity map[string]int, intensity float64, maxParallel, parallel int,
func (s *Service) newIntensityHandler(ctx context.Context, clusterID, taskID, runID uuid.UUID,
maxHostIntensity map[string]int, maxParallel int,
) (ih *intensityHandler, cleanup func()) {
ih = &intensityHandler{
taskID: taskID,
runID: runID,
logger: s.logger.Named("control"),
maxHostIntensity: maxHostIntensity,
intensity: atomic.NewFloat64(intensity),
intensity: &atomic.Float64{},
maxParallel: maxParallel,
parallel: atomic.NewInt64(int64(parallel)),
parallel: &atomic.Int64{},
}

s.mu.Lock()
if _, ok := s.intensityHandlers[clusterID]; ok {
s.logger.Error(ctx, "Overriding intensity handler", "cluster_id", clusterID, "intensity", intensity, "parallel", parallel)
s.logger.Error(ctx, "Overriding intensity handler", "cluster_id", clusterID)
}
s.intensityHandlers[clusterID] = ih
s.mu.Unlock()
Expand Down Expand Up @@ -399,9 +412,9 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid
// SetIntensity changes intensity of an ongoing repair and saves the resulting value in the repair run record.
func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensity float64) error {
s.mu.Lock()
ih, ok := s.intensityHandlers[clusterID]
s.mu.Unlock()
defer s.mu.Unlock()

ih, ok := s.intensityHandlers[clusterID]
if !ok {
return errors.Wrap(service.ErrNotFound, "repair task")
}
Expand All @@ -410,15 +423,21 @@ func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensi
return errors.Wrap(err, "set intensity")
}

return nil
err := table.RepairRun.UpdateBuilder("intensity").Query(s.session).BindMap(qb.M{
"cluster_id": clusterID,
"task_id": ih.taskID,
"id": ih.runID,
"intensity": ih.Intensity(),
}).ExecRelease()
return errors.Wrap(err, "update db")
}

// SetParallel changes parallelism of an ongoing repair and saves the resulting value in the repair run record.
func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel int) error {
s.mu.Lock()
ih, ok := s.intensityHandlers[clusterID]
s.mu.Unlock()
defer s.mu.Unlock()

ih, ok := s.intensityHandlers[clusterID]
if !ok {
return errors.Wrap(service.ErrNotFound, "repair task")
}
Expand All @@ -427,10 +446,18 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel
return errors.Wrap(err, "set parallel")
}

return nil
err := table.RepairRun.UpdateBuilder("parallel").Query(s.session).BindMap(qb.M{
"cluster_id": clusterID,
"task_id": ih.taskID,
"id": ih.runID,
"parallel": ih.Parallel(),
}).ExecRelease()
return errors.Wrap(err, "update db")
}

type intensityHandler struct {
taskID uuid.UUID
runID uuid.UUID
logger log.Logger
maxHostIntensity map[string]int
intensity *atomic.Float64
Expand Down

0 comments on commit e188859

Please sign in to comment.