Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"context"
"log"
"fmt"
"os"
"os/signal"
"regexp"
"runtime"
"strings"
"time"
Expand Down Expand Up @@ -114,26 +115,32 @@ this repository has new commits, Pico will automatically reconfigure.`,
case err = <-errs:
}

if strings.ToLower(os.Getenv("LOG_LEVEL")) == "debug" {
doTrace()
}

return
},
},
}

if strings.ToLower(os.Getenv("LOG_LEVEL")) == "debug" {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
buf := make([]byte, 1<<20)
for {
<-sigs
stacklen := runtime.Stack(buf, true)
log.Printf("\nPrinting goroutine stack trace because `DEBUG` was set.\n%s\n", buf[:stacklen])
}
}()
}

err := app.Run(os.Args)
if err != nil {
zap.L().Fatal("exit", zap.Error(err))
}
}

var waitpoints = regexp.MustCompile(`__waitpoint__(.+)\(`)

func doTrace() {
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)

fmt.Printf("\nPrinting goroutine stack trace because `DEBUG` was set.\n%s\n", buf[:stacklen])
fmt.Println("Code that was waiting:")

for _, s := range waitpoints.FindAllStringSubmatch(string(buf[:stacklen]), 1) {
fmt.Printf(" - %s\n", s[1])
}
fmt.Println("\nSee the docs on https://pico.sh/ for more information.")
}
11 changes: 8 additions & 3 deletions reconfigurer/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ func (p *GitProvider) watchConfig() (err error) {
}()
zap.L().Debug("created new config watcher, awaiting setup")

err = p.__waitpoint__watch_config(errs)

zap.L().Debug("config watcher initialised")

return
}

func (p *GitProvider) __waitpoint__watch_config(errs chan error) (err error) {
select {
case <-p.configWatcher.InitialDone:
case err = <-errs:
}

zap.L().Debug("config watcher initialised")

return
}

Expand Down
77 changes: 43 additions & 34 deletions watcher/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,45 +60,49 @@ func NewGitWatcher(
}
}

// Start runs the watcher loop and blocks until a fatal error occurs
func (w *GitWatcher) Start() error {
zap.L().Debug("git watcher initialising, waiting for first state to be set")

// wait for the first config event to set the initial state
func (w *GitWatcher) __waitpoint__start_wait_init() {
<-w.initialise
}

zap.L().Debug("git watcher initialised", zap.Any("initial_state", w.state))

f := func() (err error) {
select {
case newState := <-w.newState:
zap.L().Debug("git watcher received new state",
zap.Any("new_state", newState))

return w.doReconfigure(newState)
func (w *GitWatcher) __waitpoint__start_select_states() (err error) {
select {
case newState := <-w.newState:
zap.L().Debug("git watcher received new state",
zap.Any("new_state", newState))

case <-w.stateReq:
w.stateRes <- w.state
return w.doReconfigure(newState)

case event := <-w.targetsWatcher.Events:
zap.L().Debug("git watcher received a target event",
zap.Any("new_state", event))
case <-w.stateReq:
w.stateRes <- w.state

if e := w.handle(event); e != nil {
zap.L().Error("failed to handle event",
zap.String("url", event.URL),
zap.Error(e))
}
case event := <-w.targetsWatcher.Events:
zap.L().Debug("git watcher received a target event",
zap.Any("new_state", event))

case e := <-errorMultiplex(w.errors, w.targetsWatcher.Errors):
zap.L().Error("git error",
if e := w.handle(event); e != nil {
zap.L().Error("failed to handle event",
zap.String("url", event.URL),
zap.Error(e))
}
return

case e := <-errorMultiplex(w.errors, w.targetsWatcher.Errors):
zap.L().Error("git error",
zap.Error(e))
}
return
}

// Start runs the watcher loop and blocks until a fatal error occurs
func (w *GitWatcher) Start() error {
zap.L().Debug("git watcher initialising, waiting for first state to be set")

// wait for the first config event to set the initial state
w.__waitpoint__start_wait_init()

zap.L().Debug("git watcher initialised", zap.Any("initial_state", w.state))

for {
err := f()
err := w.__waitpoint__start_select_states()
if err != nil {
return err
}
Expand Down Expand Up @@ -203,13 +207,18 @@ func (w *GitWatcher) watchTargets() (err error) {
}()
zap.L().Debug("created targets watcher, awaiting setup")

err = w.__waitpoint__watch_targets(errs)

zap.L().Debug("targets watcher initialised")

return
}

func (w *GitWatcher) __waitpoint__watch_targets(errs chan error) (err error) {
select {
case <-w.targetsWatcher.InitialDone:
case err = <-errs:
}

zap.L().Debug("targets watcher initialised")

return
}

Expand All @@ -222,7 +231,7 @@ func (w *GitWatcher) handle(e gitwatch.Event) (err error) {
zap.String("target", target.Name),
zap.String("url", target.RepoURL),
zap.Time("timestamp", e.Timestamp))
w.send(target, e.Path, false)
w.__waitpoint__send_target_task(target, e.Path, false)
return nil
}

Expand Down Expand Up @@ -257,7 +266,7 @@ func (w GitWatcher) executeTargets(targets []task.Target, shutdown bool) {
zap.Int("targets", len(targets)))

for _, t := range targets {
w.send(t, filepath.Join(w.directory, t.Name), shutdown)
w.__waitpoint__send_target_task(t, filepath.Join(w.directory, t.Name), shutdown)
}
}

Expand All @@ -270,7 +279,7 @@ func (w GitWatcher) getTarget(url string) (target task.Target, exists bool) {
return
}

func (w GitWatcher) send(target task.Target, path string, shutdown bool) {
func (w GitWatcher) __waitpoint__send_target_task(target task.Target, path string, shutdown bool) {
w.bus <- task.ExecutionTask{
Target: target,
Path: path,
Expand Down