From 8f294833260955a555d3ad577970aba3786b892a Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Tue, 18 Mar 2025 11:16:12 +0000 Subject: [PATCH 1/2] wait for all repo shutdown before exit --- config_test.go | 2 +- main.go | 17 +++++++--- pkg/mirror/example_noworktree_test.go | 4 +-- pkg/mirror/example_worktree_test.go | 4 +-- pkg/mirror/repo_pool.go | 45 ++++++++++++++++++++++++--- pkg/mirror/repo_pool_test.go | 2 +- pkg/mirror/repository.go | 1 + pkg/mirror/z_e2e_test.go | 10 +++--- 8 files changed, 65 insertions(+), 20 deletions(-) diff --git a/config_test.go b/config_test.go index 972901a..1e19544 100644 --- a/config_test.go +++ b/config_test.go @@ -74,7 +74,7 @@ func Test_diffRepositories(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { applyGitDefaults(tt.initialConfig) - repoPool, err := mirror.NewRepoPool(*tt.initialConfig, nil, nil) + repoPool, err := mirror.NewRepoPool(t.Context(), *tt.initialConfig, nil, nil) if err != nil { t.Fatalf("could not create git mirror pool err:%v", err) } diff --git a/main.go b/main.go index a948d8c..3d389e6 100644 --- a/main.go +++ b/main.go @@ -72,7 +72,7 @@ func main() { gitENV := []string{fmt.Sprintf("PATH=%s", os.Getenv("PATH"))} // create empty repo pool which will be populated by watchConfig - repoPool, err := mirror.NewRepoPool(mirror.RepoPoolConfig{}, logger.With("logger", "git-mirror"), gitENV) + repoPool, err := mirror.NewRepoPool(ctx, mirror.RepoPoolConfig{}, logger.With("logger", "git-mirror"), gitENV) if err != nil { logger.Error("could not create git mirror pool", "err", err) os.Exit(1) @@ -81,23 +81,30 @@ func main() { onConfigChange := func(config *mirror.RepoPoolConfig) { ensureConfig(repoPool, config) // start mirror Loop on newly added repos - repoPool.StartLoop(ctx) + repoPool.StartLoop() } // Start watching the config file go WatchConfig(c.String("config"), 10*time.Second, onConfigChange) //listenForShutdown - stop := make(chan os.Signal, 1) + stop := make(chan os.Signal, 2) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) <-stop - logger.Info("Shutting down...") + logger.Info("shutting down...") cancel() - // TODO: wait for all repo sync to terminate + select { + case <-repoPool.Stopped: + logger.Info("all repositories mirror loop is stopped") + os.Exit(0) + case <-stop: + logger.Info("second signal received, terminating") + os.Exit(1) + } return nil }, } diff --git a/pkg/mirror/example_noworktree_test.go b/pkg/mirror/example_noworktree_test.go index 26dc6df..49146eb 100644 --- a/pkg/mirror/example_noworktree_test.go +++ b/pkg/mirror/example_noworktree_test.go @@ -41,7 +41,7 @@ repositories: } conf.Defaults.Root = tmpRoot - repos, err := mirror.NewRepoPool(conf, slog.Default(), nil) + repos, err := mirror.NewRepoPool(ctx, conf, slog.Default(), nil) if err != nil { panic(err) } @@ -53,7 +53,7 @@ repositories: } // start mirror Loop - repos.StartLoop(ctx) + repos.StartLoop() hash, err := repos.Hash(ctx, "https://github.com/utilitywarehouse/git-mirror.git", "main", "") if err != nil { diff --git a/pkg/mirror/example_worktree_test.go b/pkg/mirror/example_worktree_test.go index 99a82a5..a43d468 100644 --- a/pkg/mirror/example_worktree_test.go +++ b/pkg/mirror/example_worktree_test.go @@ -44,7 +44,7 @@ repositories: conf.Defaults.Root = tmpRoot - repos, err := mirror.NewRepoPool(conf, slog.Default(), nil) + repos, err := mirror.NewRepoPool(ctx, conf, slog.Default(), nil) if err != nil { panic(err) } @@ -56,7 +56,7 @@ repositories: } // start mirror Loop - repos.StartLoop(ctx) + repos.StartLoop() hash, err := repos.Hash(ctx, "https://github.com/utilitywarehouse/git-mirror.git", "main", "") if err != nil { diff --git a/pkg/mirror/repo_pool.go b/pkg/mirror/repo_pool.go index ae5890b..0d3412d 100644 --- a/pkg/mirror/repo_pool.go +++ b/pkg/mirror/repo_pool.go @@ -21,15 +21,17 @@ var ( // it provides simple wrapper around Repository methods. // A RepoPool is safe for concurrent use by multiple goroutines. type RepoPool struct { + ctx context.Context lock lock.RWMutex log *slog.Logger repos []*Repository commonENVs []string + Stopped chan bool } // NewRepoPool will create mirror repositories based on given config. // Remote repo will not be mirrored until either Mirror() or StartLoop() is called -func NewRepoPool(conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*RepoPool, error) { +func NewRepoPool(ctx context.Context, conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*RepoPool, error) { if err := conf.ValidateAndApplyDefaults(); err != nil { return nil, err } @@ -37,8 +39,43 @@ func NewRepoPool(conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*R if log == nil { log = slog.Default() } + repoCtx, repoCancel := context.WithCancel(ctx) - rp := &RepoPool{log: log, commonENVs: commonENVs} + rp := &RepoPool{ + ctx: repoCtx, + log: log, + commonENVs: commonENVs, + Stopped: make(chan bool), + } + + // start shutdown watcher + go func() { + defer func() { + close(rp.Stopped) + }() + + // wait for shutdown signal + <-ctx.Done() + + // signal repository + repoCancel() + + for { + time.Sleep(time.Second) + // check if any repo mirror is still running + var running bool + for _, repo := range rp.repos { + if repo.running { + running = true + break + } + } + + if !running { + return + } + } + }() for _, repoConf := range conf.Repositories { if err := rp.AddRepository(repoConf); err != nil { @@ -101,13 +138,13 @@ func (rp *RepoPool) Mirror(ctx context.Context, remote string) error { // StartLoop will start mirror loop on all repositories // if its not already started -func (rp *RepoPool) StartLoop(ctx context.Context) { +func (rp *RepoPool) StartLoop() { rp.lock.RLock() defer rp.lock.RUnlock() for _, repo := range rp.repos { if !repo.running { - go repo.StartLoop(ctx) + go repo.StartLoop(rp.ctx) continue } } diff --git a/pkg/mirror/repo_pool_test.go b/pkg/mirror/repo_pool_test.go index e82fab8..81fbcec 100644 --- a/pkg/mirror/repo_pool_test.go +++ b/pkg/mirror/repo_pool_test.go @@ -25,7 +25,7 @@ func TestRepoPool_validateLinkPath(t *testing.T) { }, } - rp, err := NewRepoPool(rpc, nil, testENVs) + rp, err := NewRepoPool(t.Context(), rpc, nil, testENVs) if err != nil { t.Fatalf("unexpected err:%s", err) } diff --git a/pkg/mirror/repository.go b/pkg/mirror/repository.go index db036d4..0ef92c8 100644 --- a/pkg/mirror/repository.go +++ b/pkg/mirror/repository.go @@ -418,6 +418,7 @@ func (r *Repository) StartLoop(ctx context.Context) { select { case <-t.C: case <-ctx.Done(): + r.log.Info("context cancelled stopping mirror loop") return case <-r.stop: return diff --git a/pkg/mirror/z_e2e_test.go b/pkg/mirror/z_e2e_test.go index 19e1752..7d76d6a 100644 --- a/pkg/mirror/z_e2e_test.go +++ b/pkg/mirror/z_e2e_test.go @@ -1334,7 +1334,7 @@ func Test_RepoPool_Success(t *testing.T) { }, } - rp, err := NewRepoPool(rpc, testLog, testENVs) + rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -1387,11 +1387,11 @@ func Test_RepoPool_Success(t *testing.T) { t.Log("TEST-2: forward both upstream and test mirrors") // start mirror loop - rp.StartLoop(t.Context()) + rp.StartLoop() time.Sleep(time.Second) // start mirror loop again this should be no op - rp.StartLoop(t.Context()) + rp.StartLoop() fileU1SHA2 := mustCommit(t, upstream1, "file", t.Name()+"-u1-main-2") fileU2SHA2 := mustCommit(t, upstream2, "file", t.Name()+"-u2-main-2") @@ -1530,12 +1530,12 @@ func Test_RepoPool_Error(t *testing.T) { }, } - rp, err := NewRepoPool(rpc, testLog, testENVs) + rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs) if err != nil { t.Fatalf("unexpected error: %v", err) } // start mirror loop - rp.StartLoop(t.Context()) + rp.StartLoop() time.Sleep(time.Second) From ed49ecce77cd1001b29d5823b30c21f5346d2fb9 Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Tue, 18 Mar 2025 12:17:16 +0000 Subject: [PATCH 2/2] stop config watcher on shutdown --- config.go | 26 ++++++++++++-------------- main.go | 2 +- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/config.go b/config.go index c284523..64888f7 100644 --- a/config.go +++ b/config.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "os" "path" @@ -22,30 +23,20 @@ const ( var defaultRoot = path.Join(os.TempDir(), "git-mirror", "src") // WatchConfig polls the config file every interval and reloads if modified -func WatchConfig(path string, interval time.Duration, onChange func(*mirror.RepoPoolConfig)) { +func WatchConfig(ctx context.Context, path string, interval time.Duration, onChange func(*mirror.RepoPoolConfig)) { var lastModTime time.Time - // Load initial config - config, err := parseConfigFile(path) - if err != nil { - logger.Error("failed to load config", "err", err) - } else { - onChange(config) - } - - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for range ticker.C { + for { fileInfo, err := os.Stat(path) if err != nil { logger.Error("Error checking config file", "err", err) + time.Sleep(interval) // retry after given interval continue } modTime := fileInfo.ModTime() if modTime.After(lastModTime) { - logger.Info("config file modified, reloading...") + logger.Info("reloading config file...") lastModTime = modTime newConfig, err := parseConfigFile(path) @@ -55,6 +46,13 @@ func WatchConfig(path string, interval time.Duration, onChange func(*mirror.Repo onChange(newConfig) } } + + t := time.NewTimer(interval) + select { + case <-t.C: + case <-ctx.Done(): + return + } } } diff --git a/main.go b/main.go index 3d389e6..a1cf80c 100644 --- a/main.go +++ b/main.go @@ -85,7 +85,7 @@ func main() { } // Start watching the config file - go WatchConfig(c.String("config"), 10*time.Second, onConfigChange) + go WatchConfig(ctx, c.String("config"), 10*time.Second, onConfigChange) //listenForShutdown stop := make(chan os.Signal, 2)