Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"os"
"path"
Expand All @@ -22,30 +23,20 @@ const (
var defaultRoot = path.Join(os.TempDir(), "git-mirror", "src")

// WatchConfig polls the config file every interval and reloads if modified
func WatchConfig(path string, interval time.Duration, onChange func(*mirror.RepoPoolConfig)) {
func WatchConfig(ctx context.Context, path string, interval time.Duration, onChange func(*mirror.RepoPoolConfig)) {
var lastModTime time.Time

// Load initial config
config, err := parseConfigFile(path)
if err != nil {
logger.Error("failed to load config", "err", err)
} else {
onChange(config)
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

for range ticker.C {
for {
fileInfo, err := os.Stat(path)
if err != nil {
logger.Error("Error checking config file", "err", err)
time.Sleep(interval) // retry after given interval
continue
}

modTime := fileInfo.ModTime()
if modTime.After(lastModTime) {
logger.Info("config file modified, reloading...")
logger.Info("reloading config file...")
lastModTime = modTime

newConfig, err := parseConfigFile(path)
Expand All @@ -55,6 +46,13 @@ func WatchConfig(path string, interval time.Duration, onChange func(*mirror.Repo
onChange(newConfig)
}
}

t := time.NewTimer(interval)
select {
case <-t.C:
case <-ctx.Done():
return
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Test_diffRepositories(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
applyGitDefaults(tt.initialConfig)
repoPool, err := mirror.NewRepoPool(*tt.initialConfig, nil, nil)
repoPool, err := mirror.NewRepoPool(t.Context(), *tt.initialConfig, nil, nil)
if err != nil {
t.Fatalf("could not create git mirror pool err:%v", err)
}
Expand Down
19 changes: 13 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
gitENV := []string{fmt.Sprintf("PATH=%s", os.Getenv("PATH"))}

// create empty repo pool which will be populated by watchConfig
repoPool, err := mirror.NewRepoPool(mirror.RepoPoolConfig{}, logger.With("logger", "git-mirror"), gitENV)
repoPool, err := mirror.NewRepoPool(ctx, mirror.RepoPoolConfig{}, logger.With("logger", "git-mirror"), gitENV)
if err != nil {
logger.Error("could not create git mirror pool", "err", err)
os.Exit(1)
Expand All @@ -81,23 +81,30 @@ func main() {
onConfigChange := func(config *mirror.RepoPoolConfig) {
ensureConfig(repoPool, config)
// start mirror Loop on newly added repos
repoPool.StartLoop(ctx)
repoPool.StartLoop()
}

// Start watching the config file
go WatchConfig(c.String("config"), 10*time.Second, onConfigChange)
go WatchConfig(ctx, c.String("config"), 10*time.Second, onConfigChange)

//listenForShutdown
stop := make(chan os.Signal, 1)
stop := make(chan os.Signal, 2)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

<-stop

logger.Info("Shutting down...")
logger.Info("shutting down...")
cancel()

// TODO: wait for all repo sync to terminate
select {
case <-repoPool.Stopped:
logger.Info("all repositories mirror loop is stopped")
os.Exit(0)

case <-stop:
logger.Info("second signal received, terminating")
os.Exit(1)
}
return nil
},
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mirror/example_noworktree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ repositories:
}
conf.Defaults.Root = tmpRoot

repos, err := mirror.NewRepoPool(conf, slog.Default(), nil)
repos, err := mirror.NewRepoPool(ctx, conf, slog.Default(), nil)
if err != nil {
panic(err)
}
Expand All @@ -53,7 +53,7 @@ repositories:
}

// start mirror Loop
repos.StartLoop(ctx)
repos.StartLoop()

hash, err := repos.Hash(ctx, "https://github.com/utilitywarehouse/git-mirror.git", "main", "")
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mirror/example_worktree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ repositories:

conf.Defaults.Root = tmpRoot

repos, err := mirror.NewRepoPool(conf, slog.Default(), nil)
repos, err := mirror.NewRepoPool(ctx, conf, slog.Default(), nil)
if err != nil {
panic(err)
}
Expand All @@ -56,7 +56,7 @@ repositories:
}

// start mirror Loop
repos.StartLoop(ctx)
repos.StartLoop()

hash, err := repos.Hash(ctx, "https://github.com/utilitywarehouse/git-mirror.git", "main", "")
if err != nil {
Expand Down
45 changes: 41 additions & 4 deletions pkg/mirror/repo_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,61 @@ var (
// it provides simple wrapper around Repository methods.
// A RepoPool is safe for concurrent use by multiple goroutines.
type RepoPool struct {
ctx context.Context
lock lock.RWMutex
log *slog.Logger
repos []*Repository
commonENVs []string
Stopped chan bool
}

// NewRepoPool will create mirror repositories based on given config.
// Remote repo will not be mirrored until either Mirror() or StartLoop() is called
func NewRepoPool(conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*RepoPool, error) {
func NewRepoPool(ctx context.Context, conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*RepoPool, error) {
if err := conf.ValidateAndApplyDefaults(); err != nil {
return nil, err
}

if log == nil {
log = slog.Default()
}
repoCtx, repoCancel := context.WithCancel(ctx)

rp := &RepoPool{log: log, commonENVs: commonENVs}
rp := &RepoPool{
ctx: repoCtx,
log: log,
commonENVs: commonENVs,
Stopped: make(chan bool),
}

// start shutdown watcher
go func() {
defer func() {
close(rp.Stopped)
}()

// wait for shutdown signal
<-ctx.Done()

// signal repository
repoCancel()

for {
time.Sleep(time.Second)
// check if any repo mirror is still running
var running bool
for _, repo := range rp.repos {
if repo.running {
running = true
break
}
}

if !running {
return
}
}
}()

for _, repoConf := range conf.Repositories {
if err := rp.AddRepository(repoConf); err != nil {
Expand Down Expand Up @@ -101,13 +138,13 @@ func (rp *RepoPool) Mirror(ctx context.Context, remote string) error {

// StartLoop will start mirror loop on all repositories
// if its not already started
func (rp *RepoPool) StartLoop(ctx context.Context) {
func (rp *RepoPool) StartLoop() {
rp.lock.RLock()
defer rp.lock.RUnlock()

for _, repo := range rp.repos {
if !repo.running {
go repo.StartLoop(ctx)
go repo.StartLoop(rp.ctx)
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mirror/repo_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestRepoPool_validateLinkPath(t *testing.T) {
},
}

rp, err := NewRepoPool(rpc, nil, testENVs)
rp, err := NewRepoPool(t.Context(), rpc, nil, testENVs)
if err != nil {
t.Fatalf("unexpected err:%s", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/mirror/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (r *Repository) StartLoop(ctx context.Context) {
select {
case <-t.C:
case <-ctx.Done():
r.log.Info("context cancelled stopping mirror loop")
return
case <-r.stop:
return
Expand Down
10 changes: 5 additions & 5 deletions pkg/mirror/z_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ func Test_RepoPool_Success(t *testing.T) {
},
}

rp, err := NewRepoPool(rpc, testLog, testENVs)
rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -1387,11 +1387,11 @@ func Test_RepoPool_Success(t *testing.T) {
t.Log("TEST-2: forward both upstream and test mirrors")

// start mirror loop
rp.StartLoop(t.Context())
rp.StartLoop()

time.Sleep(time.Second)
// start mirror loop again this should be no op
rp.StartLoop(t.Context())
rp.StartLoop()

fileU1SHA2 := mustCommit(t, upstream1, "file", t.Name()+"-u1-main-2")
fileU2SHA2 := mustCommit(t, upstream2, "file", t.Name()+"-u2-main-2")
Expand Down Expand Up @@ -1530,12 +1530,12 @@ func Test_RepoPool_Error(t *testing.T) {
},
}

rp, err := NewRepoPool(rpc, testLog, testENVs)
rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// start mirror loop
rp.StartLoop(t.Context())
rp.StartLoop()

time.Sleep(time.Second)

Expand Down
Loading