Skip to content

Commit

Permalink
add -maxworker parameter to specify how many Goroutines are allowed to run in parallel for Git and Forge module resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
xorpaul committed Feb 24, 2017
1 parent 7dbdf8f commit db95104
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 49 deletions.
38 changes: 36 additions & 2 deletions forge.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,50 @@ func resolveForgeModules(modules map[string]ForgeModule) {
bar.PrependFunc(func(b *uiprogress.Bar) string {
return fmt.Sprintf("Resolving Forge modules (%d/%d)", b.Current(), len(modules))
})
// Dummy channel to coordinate the number of concurrent goroutines.
// This channel should be buffered otherwise we will be immediately blocked
// when trying to fill it.
concurrentGoroutines := make(chan struct{}, maxNbConcurrentGoroutines)
// Fill the dummy channel with maxNbConcurrentGoroutines empty struct.
for i := 0; i < maxNbConcurrentGoroutines; i++ {
concurrentGoroutines <- struct{}{}
}

// The done channel indicates when a single goroutine has
// finished its job.
done := make(chan bool)
// The waitForAllJobs channel allows the main program
// to wait until we have indeed done all the jobs.
waitForAllJobs := make(chan bool)
// Collect all the jobs, and since the job is finished, we can
// release another spot for a goroutine.
go func() {
for i := 1; i <= len(modules); i++ {
<-done
// Say that another goroutine can now start.
concurrentGoroutines <- struct{}{}
}
// We have collected all the jobs, so the program can now terminate.
// Timing note: 8.6s with git (13.7s sync, I/O 1.2s).
waitForAllJobs <- true
}()

for m, fm := range modules {
wgForge.Add(1)
go func(m string, fm ForgeModule, bar *uiprogress.Bar) {
defer wgForge.Done()
// Try to receive from the concurrentGoroutines channel. When we have something,
// it means we can start a new goroutine because another one finished.
// Otherwise, it will block the execution until an execution
// spot is available.
<-concurrentGoroutines
defer bar.Incr()
Debugf("resolveForgeModules(): Trying to get forge module " + m + " with Forge base url " + fm.baseUrl + " and CacheTtl set to " + fm.cacheTtl.String())
doModuleInstallOrNothing(m, fm)
done <- true
}(m, fm, bar)
}
wgForge.Wait()
// Wait for all jobs to finish
<-waitForAllJobs
}

func check4ForgeUpdate(moduleName string, currentVersion string, latestVersion string) {
Expand Down
85 changes: 44 additions & 41 deletions g10k.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,35 @@ import (
)

var (
debug bool
verbose bool
info bool
force bool
usemove bool
pfMode bool
dryRun bool
check4update bool
checkSum bool
moduleDirParam string
config ConfigSettings
wg sync.WaitGroup
mutex sync.Mutex
empty struct{}
syncGitCount int
syncForgeCount int
needSyncGitCount int
needSyncForgeCount int
syncGitTime float64
syncForgeTime float64
ioGitTime float64
ioForgeTime float64
forgeJsonParseTime float64
metadataJsonParseTime float64
gmetadataJsonParseTime float64
buildtime string
uniqueForgeModules map[string]ForgeModule
latestForgeModules LatestForgeModules
debug bool
verbose bool
info bool
force bool
usemove bool
pfMode bool
dryRun bool
check4update bool
checkSum bool
moduleDirParam string
config ConfigSettings
wg sync.WaitGroup
mutex sync.Mutex
empty struct{}
syncGitCount int
syncForgeCount int
needSyncGitCount int
needSyncForgeCount int
syncGitTime float64
syncForgeTime float64
ioGitTime float64
ioForgeTime float64
forgeJsonParseTime float64
metadataJsonParseTime float64
gmetadataJsonParseTime float64
buildtime string
uniqueForgeModules map[string]ForgeModule
latestForgeModules LatestForgeModules
maxNbConcurrentGoroutines int
)

type LatestForgeModules struct {
Expand Down Expand Up @@ -130,19 +131,20 @@ type ExecResult struct {
func main() {

var (
configFile = flag.String("config", "", "which config file to use")
envBranchFlag = flag.String("branch", "", "which git branch of the Puppet environment to update, e.g. core_foobar")
moduleDirFlag = flag.String("moduledir", "", "allows overriding of Puppetfile specific moduledir setting, the folder in which Puppet modules will be extracted")
pfFlag = flag.Bool("puppetfile", false, "install all modules from Puppetfile in cwd")
forceFlag = flag.Bool("force", false, "purge the Puppet environment directory and do a full sync")
dryRunFlag = flag.Bool("dryrun", false, "do not modify anything, just print what would be changed")
usemoveFlag = flag.Bool("usemove", false, "do not use hardlinks to populate your Puppet environments with Puppetlabs Forge modules. Instead uses simple move commands and purges the Forge cache directory after each run! (Useful for g10k runs inside a Docker container)")
check4updateFlag = flag.Bool("check4update", false, "only check if the is newer version of the Puppet module avaialable. Does implicitly set dryrun to true")
checkSumFlag = flag.Bool("checksum", false, "get the md5 check sum for each Puppetlabs Forge module and verify the integrity of the downloaded archive. Increases g10k run time!")
debugFlag = flag.Bool("debug", false, "log debug output, defaults to false")
verboseFlag = flag.Bool("verbose", false, "log verbose output, defaults to false")
infoFlag = flag.Bool("info", false, "log info output, defaults to false")
versionFlag = flag.Bool("version", false, "show build time and version number")
configFile = flag.String("config", "", "which config file to use")
envBranchFlag = flag.String("branch", "", "which git branch of the Puppet environment to update, e.g. core_foobar")
moduleDirFlag = flag.String("moduledir", "", "allows overriding of Puppetfile specific moduledir setting, the folder in which Puppet modules will be extracted")
maxConcurrentFlag = flag.Int("maxworker", 100, "how many Goroutines are allowed to run in parallel for Git and Forge module resolving")
pfFlag = flag.Bool("puppetfile", false, "install all modules from Puppetfile in cwd")
forceFlag = flag.Bool("force", false, "purge the Puppet environment directory and do a full sync")
dryRunFlag = flag.Bool("dryrun", false, "do not modify anything, just print what would be changed")
usemoveFlag = flag.Bool("usemove", false, "do not use hardlinks to populate your Puppet environments with Puppetlabs Forge modules. Instead uses simple move commands and purges the Forge cache directory after each run! (Useful for g10k runs inside a Docker container)")
check4updateFlag = flag.Bool("check4update", false, "only check if the is newer version of the Puppet module avaialable. Does implicitly set dryrun to true")
checkSumFlag = flag.Bool("checksum", false, "get the md5 check sum for each Puppetlabs Forge module and verify the integrity of the downloaded archive. Increases g10k run time!")
debugFlag = flag.Bool("debug", false, "log debug output, defaults to false")
verboseFlag = flag.Bool("verbose", false, "log verbose output, defaults to false")
infoFlag = flag.Bool("info", false, "log info output, defaults to false")
versionFlag = flag.Bool("version", false, "show build time and version number")
)
flag.Parse()

Expand All @@ -156,6 +158,7 @@ func main() {
pfMode = *pfFlag
checkSum = *checkSumFlag
moduleDirParam = *moduleDirFlag
maxNbConcurrentGoroutines = *maxConcurrentFlag

if *versionFlag {
fmt.Println("g10k Version 1.0 Build time:", buildtime, "UTC")
Expand Down
49 changes: 43 additions & 6 deletions git.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os/exec"
"strconv"
"strings"
"sync"
"time"

"github.com/xorpaul/uiprogress"
Expand All @@ -18,17 +17,52 @@ func resolveGitRepositories(uniqueGitModules map[string]GitModule) {
Debugf("uniqueGitModules[] is empty, skipping...")
return
}
var wgGit sync.WaitGroup
bar := uiprogress.AddBar(len(uniqueGitModules)).AppendCompleted().PrependElapsed()
bar.PrependFunc(func(b *uiprogress.Bar) string {
return fmt.Sprintf("Resolving Git modules (%d/%d)", b.Current(), len(uniqueGitModules))
})
// Dummy channel to coordinate the number of concurrent goroutines.
// This channel should be buffered otherwise we will be immediately blocked
// when trying to fill it.
maxNbConcurrentGoroutines := 1
concurrentGoroutines := make(chan struct{}, maxNbConcurrentGoroutines)
// Fill the dummy channel with maxNbConcurrentGoroutines empty struct.
for i := 0; i < maxNbConcurrentGoroutines; i++ {
concurrentGoroutines <- struct{}{}
}

// The done channel indicates when a single goroutine has
// finished its job.
done := make(chan bool)
// The waitForAllJobs channel allows the main program
// to wait until we have indeed done all the jobs.
waitForAllJobs := make(chan bool)
// Collect all the jobs, and since the job is finished, we can
// release another spot for a goroutine.
go func() {
for _, gm := range uniqueGitModules {
go func(gm GitModule) {
<-done
// Say that another goroutine can now start.
concurrentGoroutines <- struct{}{}
}(gm)
}
// We have collected all the jobs, so the program can now terminate.
// Timing note: 8.6s with git (13.7s sync, I/O 1.2s).
waitForAllJobs <- true
}()

for url, gm := range uniqueGitModules {
wgGit.Add(1)
Debugf("git repo url " + url)
privateKey := gm.privateKey
go func(url string, privateKey string, gm GitModule) {
defer wgGit.Done()
// Try to receive from the concurrentGoroutines channel. When we have something,
// it means we can start a new goroutine because another one finished.
// Otherwise, it will block the execution until an execution
// spot is available.
<-concurrentGoroutines
defer bar.Incr()

if len(gm.privateKey) > 0 {
Debugf("git repo url " + url + " with ssh key " + privateKey)
} else {
Expand All @@ -44,8 +78,11 @@ func resolveGitRepositories(uniqueGitModules map[string]GitModule) {
// doCloneOrPull(source, workDir, targetDir, sa.Remote, branch, sa.PrivateKey)

}(url, privateKey, gm)
done <- true
}
wgGit.Wait()

// Wait for all jobs to finish
<-waitForAllJobs
}

func doMirrorOrUpdate(url string, workDir string, sshPrivateKey string, allowFail bool) bool {
Expand Down Expand Up @@ -77,7 +114,7 @@ func syncToModuleDir(srcDir string, targetDir string, tree string, allowFail boo
mutex.Lock()
syncGitCount++
mutex.Unlock()
logCmd := "git --git-dir " + srcDir + " log -n1 --pretty=format:%H " + tree
logCmd := "git --git-dir " + srcDir + " rev-parse --verify '" + tree + "'"
er := executeCommand(logCmd, config.Timeout, allowFail)
hashFile := targetDir + "/.latest_commit"
needToSync := true
Expand Down
6 changes: 6 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func executeCommand(command string, timeout int, allowFail bool) ExecResult {
out, err := exec.Command(cmd, cmdArgs...).CombinedOutput()
duration := time.Since(before).Seconds()
er := ExecResult{0, string(out)}
if err != nil {
msg, ok := err.(*exec.ExitError)
er.returnCode = msg.Sys().(syscall.WaitStatus).ExitStatus()
Debugf(fmt.Sprintf("Error message: %v", err))
Debugf(fmt.Sprintf("er.returnCode, ok: %q, %q", er.returnCode, ok))
}
if msg, ok := err.(*exec.ExitError); ok { // there is error code
er.returnCode = msg.Sys().(syscall.WaitStatus).ExitStatus()
}
Expand Down

0 comments on commit db95104

Please sign in to comment.