Skip to content

Commit

Permalink
Merge pull request agola-io#177 from sgotti/runservice_use_trylock_on…
Browse files Browse the repository at this point in the history
…_fetching

runservice: use etcd mutex TryLock on fetching
  • Loading branch information
sgotti committed Nov 13, 2019
2 parents 3eeffe2 + 9299971 commit 7cb2918
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 40 deletions.
13 changes: 9 additions & 4 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,22 @@ var (

EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping")

EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
EtcdLocksDir = path.Join(EtcdSchedulerBaseDir, "locks")

EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups")
EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater")

EtcdMaintenanceKey = "maintenance"
)

func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
func EtcdExecutorKey(taskID string) string { return path.Join(EtcdExecutorsDir, taskID) }
func EtcdTaskKey(taskID string) string { return path.Join(EtcdTasksDir, taskID) }
func EtcdTaskFetcherLockKey(taskID string) string {
return path.Join(EtcdLocksDir, "taskfetcher", taskID)
}

const (
EtcdChangeGroupMinRevisionRange = 100
Expand Down
98 changes: 62 additions & 36 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,9 +1011,10 @@ func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types.
if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted {
if err := s.fetchLog(ctx, rt, true, 0); err != nil {
log.Errorf("err: %+v", err)
}
if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil {
log.Errorf("err: %+v", err)
} else {
if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil {
log.Errorf("err: %+v", err)
}
}
}

Expand Down Expand Up @@ -1133,45 +1134,70 @@ func (s *Runservice) fetcher(ctx context.Context) error {
log.Debugf("r: %s", util.Dump(r))
for _, rt := range r.Tasks {
log.Debugf("rt: %s", util.Dump(rt))
if rt.Status.IsFinished() {
// write related logs runID
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
exists, err := s.OSTFileExists(runIDPath)
if err != nil {
log.Errorf("err: %+v", err)
} else if !exists {
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
log.Errorf("err: %+v", err)
}
}
if err := s.taskFetcher(ctx, r, rt); err != nil {
return err
}
}
}

// write related archives runID
runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID)
exists, err = s.OSTFileExists(runIDPath)
if err != nil {
log.Errorf("err: %+v", err)
} else if !exists {
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
log.Errorf("err: %+v", err)
}
}
return nil
}

s.fetchTaskLogs(ctx, r.ID, rt)
s.fetchTaskArchives(ctx, r.ID, rt)
func (s *Runservice) taskFetcher(ctx context.Context, r *types.Run, rt *types.RunTask) error {
if !rt.Status.IsFinished() {
return nil
}
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()

// if the fetching is finished we can remove the executor tasks. We cannot
// remove it before since it contains the reference to the executor where we
// should fetch the data
if rt.LogsFetchFinished() && rt.ArchivesFetchFinished() {
if err := store.DeleteExecutorTask(ctx, s.e, rt.ID); err != nil {
return err
}
}
}
m := etcd.NewMutex(session, common.EtcdTaskFetcherLockKey(rt.ID))

if err := m.TryLock(ctx); err != nil {
if errors.Is(err, etcd.ErrLocked) {
return nil
}
return err
}
defer func() { _ = m.Unlock(ctx) }()

// write related logs runID
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
exists, err := s.OSTFileExists(runIDPath)
if err != nil {
log.Errorf("err: %+v", err)
} else if !exists {
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
log.Errorf("err: %+v", err)
}
}

// write related archives runID
runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID)
exists, err = s.OSTFileExists(runIDPath)
if err != nil {
log.Errorf("err: %+v", err)
} else if !exists {
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
log.Errorf("err: %+v", err)
}
}
return nil

s.fetchTaskLogs(ctx, r.ID, rt)
s.fetchTaskArchives(ctx, r.ID, rt)

// if the fetching is finished we can remove the executor tasks. We cannot
// remove it before since it contains the reference to the executor where we
// should fetch the data
if rt.LogsFetchFinished() && rt.ArchivesFetchFinished() {
if err := store.DeleteExecutorTask(ctx, s.e, rt.ID); err != nil {
return err
}
}

return nil
}

func (s *Runservice) runsSchedulerLoop(ctx context.Context) {
Expand Down

0 comments on commit 7cb2918

Please sign in to comment.