diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5466b22..8cb2cde 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -27,4 +27,6 @@ jobs: run: go test -v -cover ./... - name: Test with race - run: go test -v -cover -race -count 1 -timeout 20s --tags deadlock_test -run Test_mirror_detect_race ./pkg/mirror/... + run: | + go test -v -cover -race -count 1 -timeout 20s --tags deadlock_test -run Test_mirror_detect_race_clone ./pkg/mirror/... + go test -v -cover -race -count 1 -timeout 60s --tags deadlock_test -run Test_mirror_detect_race_repo_pool ./pkg/mirror/... diff --git a/config.go b/config.go index a5f9c11..b89c7ae 100644 --- a/config.go +++ b/config.go @@ -83,6 +83,8 @@ func loadConfig(path string, lastModTime time.Time, onChange func(*mirror.RepoPo return modTime, onChange(newConfig) } +// ensureConfig will do the diff between current repoPool state and new config +// and based on that diff it will add/remove repositories and worktrees func ensureConfig(repoPool *mirror.RepoPool, newConfig *mirror.RepoPoolConfig) bool { success := true @@ -180,6 +182,8 @@ func parseConfigFile(path string) (*mirror.RepoPoolConfig, error) { return conf, nil } +// diffRepositories will do the diff between current state and new config and +// return new repositories config and list of remote url which are not found in config func diffRepositories(repoPool *mirror.RepoPool, newConfig *mirror.RepoPoolConfig) ( newRepos []mirror.RepositoryConfig, removedRepos []string, @@ -206,6 +210,8 @@ func diffRepositories(repoPool *mirror.RepoPool, newConfig *mirror.RepoPoolConfi return } +// diffWorktrees will do the diff between current repo's worktree state and new worktree config +// it will return new worktree configs and link names of the link not found in new config func diffWorktrees(repo *mirror.Repository, newRepoConf *mirror.RepositoryConfig) ( newWTCs []mirror.WorktreeConfig, removedWTs []string, diff --git a/pkg/mirror/repo_pool.go b/pkg/mirror/repo_pool.go index bb4f47a..331e53d 100644 --- a/pkg/mirror/repo_pool.go +++ b/pkg/mirror/repo_pool.go @@ -60,12 +60,15 @@ func NewRepoPool(ctx context.Context, conf RepoPoolConfig, log *slog.Logger, com // signal repository repoCancel() + rp.lock.RLock() + defer rp.lock.RUnlock() + for { time.Sleep(time.Second) // check if any repo mirror is still running var running bool for _, repo := range rp.repos { - if repo.running { + if repo.IsRunning() { running = true break } @@ -143,7 +146,7 @@ func (rp *RepoPool) StartLoop() { defer rp.lock.RUnlock() for _, repo := range rp.repos { - if !repo.running { + if !repo.IsRunning() { go repo.StartLoop(rp.ctx) continue } @@ -168,6 +171,7 @@ func (rp *RepoPool) Repository(remote string) (*Repository, error) { return nil, ErrNotExist } +// RepositoriesRemote returns remote URLs of all the repositories func (rp *RepoPool) RepositoriesRemote() []string { rp.lock.RLock() defer rp.lock.RUnlock() @@ -179,6 +183,7 @@ func (rp *RepoPool) RepositoriesRemote() []string { return urls } +// RepositoriesDirPath returns local paths of all the mirrored repositories func (rp *RepoPool) RepositoriesDirPath() []string { rp.lock.RLock() defer rp.lock.RUnlock() @@ -192,9 +197,6 @@ func (rp *RepoPool) RepositoriesDirPath() []string { // AddWorktreeLink is wrapper around repositories AddWorktreeLink method func (rp *RepoPool) AddWorktreeLink(remote string, wt WorktreeConfig) error { - rp.lock.RLock() - defer rp.lock.RUnlock() - repo, err := rp.Repository(remote) if err != nil { return err @@ -202,12 +204,19 @@ func (rp *RepoPool) AddWorktreeLink(remote string, wt WorktreeConfig) error { if err := rp.validateLinkPath(repo, wt.Link); err != nil { return err } + + rp.lock.Lock() + defer rp.lock.Unlock() + return repo.AddWorktreeLink(wt) } func (rp *RepoPool) validateLinkPath(repo *Repository, link string) error { newAbsLink := absLink(repo.root, link) + rp.lock.RLock() + defer rp.lock.RUnlock() + for _, r := range rp.repos { for _, wl := range r.WorktreeLinks() { if wl.linkAbs == newAbsLink { diff --git a/pkg/mirror/repository.go b/pkg/mirror/repository.go index bf5a368..dba97bb 100644 --- a/pkg/mirror/repository.go +++ b/pkg/mirror/repository.go @@ -133,6 +133,9 @@ func NewRepository(repoConf RepositoryConfig, envs []string, log *slog.Logger) ( // AddWorktreeLink adds workTree link to the mirror repository. func (r *Repository) AddWorktreeLink(wtc WorktreeConfig) error { + r.lock.Lock() + defer r.lock.Unlock() + if wtc.Link == "" { return fmt.Errorf("symlink path cannot be empty") } @@ -162,6 +165,7 @@ func (r *Repository) AddWorktreeLink(wtc WorktreeConfig) error { return nil } +// WorktreeLinks returns current clone of worktree maps func (r *Repository) WorktreeLinks() map[string]*WorkTreeLink { r.lock.RLock() defer r.lock.RUnlock() @@ -391,17 +395,32 @@ func (r *Repository) cloneByRef(ctx context.Context, dst, ref, pathspec string, return hash, nil } +// IsRunning returns if repositories mirror loop is running or not +func (r *Repository) IsRunning() bool { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.running +} + // StartLoop mirrors repository periodically based on repo's mirror interval func (r *Repository) StartLoop(ctx context.Context) { - if r.running { + if r.IsRunning() { r.log.Error("mirror loop has already been started") return } + + r.lock.Lock() r.running = true + r.lock.Unlock() + r.log.Info("started repository mirror loop", "interval", r.interval) defer func() { + r.lock.Lock() r.running = false + r.lock.Unlock() + close(r.stopped) }() @@ -480,7 +499,7 @@ func (r *Repository) Mirror(ctx context.Context) error { } // RemoveWorktreeLink removes workTree link from the mirror repository. -// it will remove published link as well even if it failed to remove worktree +// it will remove published link as well even (if it failed to remove worktree) func (r *Repository) RemoveWorktreeLink(link string) error { r.lock.Lock() defer r.lock.Unlock() diff --git a/pkg/mirror/worktree.go b/pkg/mirror/worktree.go index 87e9b00..7f87717 100644 --- a/pkg/mirror/worktree.go +++ b/pkg/mirror/worktree.go @@ -17,6 +17,9 @@ type WorkTreeLink struct { log *slog.Logger } +// Equals returns if given worktree and its config is equal +// they are considered equal only if link, ref and pathspecs are matching. +// order of pothspecs is ignored func (wt *WorkTreeLink) Equals(wtc WorktreeConfig) bool { sortedConfigPaths := slices.Clone(wtc.Pathspecs) slices.Sort(sortedConfigPaths) diff --git a/pkg/mirror/z_e2e_race_test.go b/pkg/mirror/z_e2e_race_test.go index 959a8c3..70c1103 100644 --- a/pkg/mirror/z_e2e_race_test.go +++ b/pkg/mirror/z_e2e_race_test.go @@ -8,9 +8,10 @@ import ( "path/filepath" "sync" "testing" + "time" ) -func Test_mirror_detect_race(t *testing.T) { +func Test_mirror_detect_race_clone(t *testing.T) { testTmpDir := mustTmpDir(t) defer os.RemoveAll(testTmpDir) @@ -106,3 +107,176 @@ func Test_mirror_detect_race(t *testing.T) { }) } + +func Test_mirror_detect_race_repo_pool(t *testing.T) { + testTmpDir := mustTmpDir(t) + defer os.RemoveAll(testTmpDir) + + tempClone := mustTmpDir(t) + defer os.RemoveAll(tempClone) + + upstream1 := filepath.Join(testTmpDir, testUpstreamRepo) + remote1 := "file://" + upstream1 + upstream2 := filepath.Join(testTmpDir, "upstream2") + remote2 := "file://" + upstream2 + root := filepath.Join(testTmpDir, testRoot) + + fileU1SHA1 := mustInitRepo(t, upstream1, "file", t.Name()+"-u1-main-1") + fileU2SHA1 := mustInitRepo(t, upstream2, "file", t.Name()+"-u2-main-1") + + rpc := RepoPoolConfig{ + Defaults: DefaultConfig{ + Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always", + }, + } + + rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + t.Run("add-remove-repo-test", func(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + // add/remove 2 repositories + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + t.Log("adding remote1", "count", i) + readStopped := make(chan bool) + ctx, cancel := context.WithCancel(t.Context()) + + newConfig := RepoPoolConfig{ + Defaults: DefaultConfig{ + Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always", + }, + Repositories: []RepositoryConfig{{ + Remote: remote1, + Worktrees: []WorktreeConfig{{Link: "link1"}}}, + }, + } + if err := newConfig.ValidateAndApplyDefaults(); err != nil { + t.Error("failed to validate new config", "err", err) + return + } + + if err := rp.AddRepository(newConfig.Repositories[0]); err != nil { + t.Error("unexpected err", "err", err) + return + } + + rp.StartLoop() + + go func() { + for { + time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + close(readStopped) + return + default: + if got, err := rp.Hash(txtCtx, remote1, "HEAD", ""); err != nil { + t.Error("unexpected err", "count", i, "err", err) + } else if got != fileU1SHA1 { + t.Errorf("remote1 hash mismatch got:%s want:%s", got, fileU1SHA1) + } + } + + } + }() + + if err := rp.AddWorktreeLink(remote1, WorktreeConfig{"link2", "", []string{}}); err != nil { + t.Error("unexpected err", "err", err) + return + } + + time.Sleep(2 * time.Second) + + if err := rp.RemoveWorktreeLink(remote1, "link1"); err != nil { + t.Error("unexpected err", "err", err) + return + } + + cancel() + <-readStopped + + if err := rp.RemoveRepository(remote1); err != nil { + t.Error("unexpected err", "err", err) + return + } + } + + }() + + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + t.Log("adding remote2", "count", i) + readStopped := make(chan bool) + ctx, cancel := context.WithCancel(t.Context()) + + newConfig := RepoPoolConfig{ + Defaults: DefaultConfig{ + Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always", + }, + Repositories: []RepositoryConfig{{Remote: remote2, + Worktrees: []WorktreeConfig{{Link: "link3"}}}, + }, + } + if err := newConfig.ValidateAndApplyDefaults(); err != nil { + t.Error("failed to validate new config", "err", err) + return + } + + if err := rp.AddRepository(newConfig.Repositories[0]); err != nil { + t.Error("unexpected err", "err", err) + return + } + + rp.StartLoop() + + // start loop to trigger read on repo pool + go func() { + for { + time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + close(readStopped) + return + default: + if got, err := rp.Hash(txtCtx, remote2, "HEAD", ""); err != nil { + t.Error("unexpected err", "count", i, "err", err) + } else if got != fileU2SHA1 { + t.Errorf("remote2 hash mismatch got:%s want:%s", got, fileU2SHA1) + } + } + } + }() + + if err := rp.AddWorktreeLink(remote2, WorktreeConfig{"link4", "", []string{}}); err != nil { + t.Error("unexpected err", "err", err) + return + } + + time.Sleep(2 * time.Second) + + if err := rp.RemoveWorktreeLink(remote2, "link3"); err != nil { + t.Error("unexpected err", "err", err) + return + } + + cancel() + + <-readStopped + + if err := rp.RemoveRepository(remote2); err != nil { + t.Error("unexpected err", "err", err) + return + } + } + }() + + wg.Wait() + }) +} diff --git a/pkg/mirror/z_e2e_test.go b/pkg/mirror/z_e2e_test.go index 729087b..8059ed3 100644 --- a/pkg/mirror/z_e2e_test.go +++ b/pkg/mirror/z_e2e_test.go @@ -1293,7 +1293,7 @@ func Test_mirror_loop(t *testing.T) { go repo.StartLoop(txtCtx) time.Sleep(testInterval) - if repo.running != true { + if repo.IsRunning() != true { t.Errorf("repo running state is still false after starting mirror loop") } @@ -1326,7 +1326,7 @@ func Test_mirror_loop(t *testing.T) { // STOP mirror loop repo.StopLoop() - if repo.running { + if repo.IsRunning() { t.Errorf("repo still running after calling StopLoop") } }