Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ jobs:
run: go test -v -cover ./...

- name: Test with race
run: go test -v -cover -race -count 1 -timeout 20s --tags deadlock_test -run Test_mirror_detect_race ./pkg/mirror/...
run: |
go test -v -cover -race -count 1 -timeout 20s --tags deadlock_test -run Test_mirror_detect_race_clone ./pkg/mirror/...
go test -v -cover -race -count 1 -timeout 60s --tags deadlock_test -run Test_mirror_detect_race_repo_pool ./pkg/mirror/...
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func loadConfig(path string, lastModTime time.Time, onChange func(*mirror.RepoPo
return modTime, onChange(newConfig)
}

// ensureConfig will do the diff between current repoPool state and new config
// and based on that diff it will add/remove repositories and worktrees
func ensureConfig(repoPool *mirror.RepoPool, newConfig *mirror.RepoPoolConfig) bool {
success := true

Expand Down Expand Up @@ -180,6 +182,8 @@ func parseConfigFile(path string) (*mirror.RepoPoolConfig, error) {
return conf, nil
}

// diffRepositories will do the diff between current state and new config and
// return new repositories config and list of remote url which are not found in config
func diffRepositories(repoPool *mirror.RepoPool, newConfig *mirror.RepoPoolConfig) (
newRepos []mirror.RepositoryConfig,
removedRepos []string,
Expand All @@ -206,6 +210,8 @@ func diffRepositories(repoPool *mirror.RepoPool, newConfig *mirror.RepoPoolConfi
return
}

// diffWorktrees will do the diff between current repo's worktree state and new worktree config
// it will return new worktree configs and link names of the link not found in new config
func diffWorktrees(repo *mirror.Repository, newRepoConf *mirror.RepositoryConfig) (
newWTCs []mirror.WorktreeConfig,
removedWTs []string,
Expand Down
19 changes: 14 additions & 5 deletions pkg/mirror/repo_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ func NewRepoPool(ctx context.Context, conf RepoPoolConfig, log *slog.Logger, com
// signal repository
repoCancel()

rp.lock.RLock()
defer rp.lock.RUnlock()

for {
time.Sleep(time.Second)
// check if any repo mirror is still running
var running bool
for _, repo := range rp.repos {
if repo.running {
if repo.IsRunning() {
running = true
break
}
Expand Down Expand Up @@ -143,7 +146,7 @@ func (rp *RepoPool) StartLoop() {
defer rp.lock.RUnlock()

for _, repo := range rp.repos {
if !repo.running {
if !repo.IsRunning() {
go repo.StartLoop(rp.ctx)
continue
}
Expand All @@ -168,6 +171,7 @@ func (rp *RepoPool) Repository(remote string) (*Repository, error) {
return nil, ErrNotExist
}

// RepositoriesRemote returns remote URLs of all the repositories
func (rp *RepoPool) RepositoriesRemote() []string {
rp.lock.RLock()
defer rp.lock.RUnlock()
Expand All @@ -179,6 +183,7 @@ func (rp *RepoPool) RepositoriesRemote() []string {
return urls
}

// RepositoriesDirPath returns local paths of all the mirrored repositories
func (rp *RepoPool) RepositoriesDirPath() []string {
rp.lock.RLock()
defer rp.lock.RUnlock()
Expand All @@ -192,22 +197,26 @@ func (rp *RepoPool) RepositoriesDirPath() []string {

// AddWorktreeLink is wrapper around repositories AddWorktreeLink method
func (rp *RepoPool) AddWorktreeLink(remote string, wt WorktreeConfig) error {
rp.lock.RLock()
defer rp.lock.RUnlock()

repo, err := rp.Repository(remote)
if err != nil {
return err
}
if err := rp.validateLinkPath(repo, wt.Link); err != nil {
return err
}

rp.lock.Lock()
defer rp.lock.Unlock()

return repo.AddWorktreeLink(wt)
}

func (rp *RepoPool) validateLinkPath(repo *Repository, link string) error {
newAbsLink := absLink(repo.root, link)

rp.lock.RLock()
defer rp.lock.RUnlock()

for _, r := range rp.repos {
for _, wl := range r.WorktreeLinks() {
if wl.linkAbs == newAbsLink {
Expand Down
23 changes: 21 additions & 2 deletions pkg/mirror/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func NewRepository(repoConf RepositoryConfig, envs []string, log *slog.Logger) (

// AddWorktreeLink adds workTree link to the mirror repository.
func (r *Repository) AddWorktreeLink(wtc WorktreeConfig) error {
r.lock.Lock()
defer r.lock.Unlock()

if wtc.Link == "" {
return fmt.Errorf("symlink path cannot be empty")
}
Expand Down Expand Up @@ -162,6 +165,7 @@ func (r *Repository) AddWorktreeLink(wtc WorktreeConfig) error {
return nil
}

// WorktreeLinks returns current clone of worktree maps
func (r *Repository) WorktreeLinks() map[string]*WorkTreeLink {
r.lock.RLock()
defer r.lock.RUnlock()
Expand Down Expand Up @@ -391,17 +395,32 @@ func (r *Repository) cloneByRef(ctx context.Context, dst, ref, pathspec string,
return hash, nil
}

// IsRunning returns if repositories mirror loop is running or not
func (r *Repository) IsRunning() bool {
r.lock.RLock()
defer r.lock.RUnlock()

return r.running
}

// StartLoop mirrors repository periodically based on repo's mirror interval
func (r *Repository) StartLoop(ctx context.Context) {
if r.running {
if r.IsRunning() {
r.log.Error("mirror loop has already been started")
return
}

r.lock.Lock()
r.running = true
r.lock.Unlock()

r.log.Info("started repository mirror loop", "interval", r.interval)

defer func() {
r.lock.Lock()
r.running = false
r.lock.Unlock()

close(r.stopped)
}()

Expand Down Expand Up @@ -480,7 +499,7 @@ func (r *Repository) Mirror(ctx context.Context) error {
}

// RemoveWorktreeLink removes workTree link from the mirror repository.
// it will remove published link as well even if it failed to remove worktree
// it will remove published link as well even (if it failed to remove worktree)
func (r *Repository) RemoveWorktreeLink(link string) error {
r.lock.Lock()
defer r.lock.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/mirror/worktree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type WorkTreeLink struct {
log *slog.Logger
}

// Equals returns if given worktree and its config is equal
// they are considered equal only if link, ref and pathspecs are matching.
// order of pothspecs is ignored
func (wt *WorkTreeLink) Equals(wtc WorktreeConfig) bool {
sortedConfigPaths := slices.Clone(wtc.Pathspecs)
slices.Sort(sortedConfigPaths)
Expand Down
176 changes: 175 additions & 1 deletion pkg/mirror/z_e2e_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"path/filepath"
"sync"
"testing"
"time"
)

func Test_mirror_detect_race(t *testing.T) {
func Test_mirror_detect_race_clone(t *testing.T) {
testTmpDir := mustTmpDir(t)
defer os.RemoveAll(testTmpDir)

Expand Down Expand Up @@ -106,3 +107,176 @@ func Test_mirror_detect_race(t *testing.T) {
})

}

func Test_mirror_detect_race_repo_pool(t *testing.T) {
testTmpDir := mustTmpDir(t)
defer os.RemoveAll(testTmpDir)

tempClone := mustTmpDir(t)
defer os.RemoveAll(tempClone)

upstream1 := filepath.Join(testTmpDir, testUpstreamRepo)
remote1 := "file://" + upstream1
upstream2 := filepath.Join(testTmpDir, "upstream2")
remote2 := "file://" + upstream2
root := filepath.Join(testTmpDir, testRoot)

fileU1SHA1 := mustInitRepo(t, upstream1, "file", t.Name()+"-u1-main-1")
fileU2SHA1 := mustInitRepo(t, upstream2, "file", t.Name()+"-u2-main-1")

rpc := RepoPoolConfig{
Defaults: DefaultConfig{
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
},
}

rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

t.Run("add-remove-repo-test", func(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

// add/remove 2 repositories
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
t.Log("adding remote1", "count", i)
readStopped := make(chan bool)
ctx, cancel := context.WithCancel(t.Context())

newConfig := RepoPoolConfig{
Defaults: DefaultConfig{
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
},
Repositories: []RepositoryConfig{{
Remote: remote1,
Worktrees: []WorktreeConfig{{Link: "link1"}}},
},
}
if err := newConfig.ValidateAndApplyDefaults(); err != nil {
t.Error("failed to validate new config", "err", err)
return
}

if err := rp.AddRepository(newConfig.Repositories[0]); err != nil {
t.Error("unexpected err", "err", err)
return
}

rp.StartLoop()

go func() {
for {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
close(readStopped)
return
default:
if got, err := rp.Hash(txtCtx, remote1, "HEAD", ""); err != nil {
t.Error("unexpected err", "count", i, "err", err)
} else if got != fileU1SHA1 {
t.Errorf("remote1 hash mismatch got:%s want:%s", got, fileU1SHA1)
}
}

}
}()

if err := rp.AddWorktreeLink(remote1, WorktreeConfig{"link2", "", []string{}}); err != nil {
t.Error("unexpected err", "err", err)
return
}

time.Sleep(2 * time.Second)

if err := rp.RemoveWorktreeLink(remote1, "link1"); err != nil {
t.Error("unexpected err", "err", err)
return
}

cancel()
<-readStopped

if err := rp.RemoveRepository(remote1); err != nil {
t.Error("unexpected err", "err", err)
return
}
}

}()

go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
t.Log("adding remote2", "count", i)
readStopped := make(chan bool)
ctx, cancel := context.WithCancel(t.Context())

newConfig := RepoPoolConfig{
Defaults: DefaultConfig{
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
},
Repositories: []RepositoryConfig{{Remote: remote2,
Worktrees: []WorktreeConfig{{Link: "link3"}}},
},
}
if err := newConfig.ValidateAndApplyDefaults(); err != nil {
t.Error("failed to validate new config", "err", err)
return
}

if err := rp.AddRepository(newConfig.Repositories[0]); err != nil {
t.Error("unexpected err", "err", err)
return
}

rp.StartLoop()

// start loop to trigger read on repo pool
go func() {
for {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
close(readStopped)
return
default:
if got, err := rp.Hash(txtCtx, remote2, "HEAD", ""); err != nil {
t.Error("unexpected err", "count", i, "err", err)
} else if got != fileU2SHA1 {
t.Errorf("remote2 hash mismatch got:%s want:%s", got, fileU2SHA1)
}
}
}
}()

if err := rp.AddWorktreeLink(remote2, WorktreeConfig{"link4", "", []string{}}); err != nil {
t.Error("unexpected err", "err", err)
return
}

time.Sleep(2 * time.Second)

if err := rp.RemoveWorktreeLink(remote2, "link3"); err != nil {
t.Error("unexpected err", "err", err)
return
}

cancel()

<-readStopped

if err := rp.RemoveRepository(remote2); err != nil {
t.Error("unexpected err", "err", err)
return
}
}
}()

wg.Wait()
})
}
4 changes: 2 additions & 2 deletions pkg/mirror/z_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ func Test_mirror_loop(t *testing.T) {
go repo.StartLoop(txtCtx)

time.Sleep(testInterval)
if repo.running != true {
if repo.IsRunning() != true {
t.Errorf("repo running state is still false after starting mirror loop")
}

Expand Down Expand Up @@ -1326,7 +1326,7 @@ func Test_mirror_loop(t *testing.T) {

// STOP mirror loop
repo.StopLoop()
if repo.running {
if repo.IsRunning() {
t.Errorf("repo still running after calling StopLoop")
}
}
Expand Down
Loading