From 7b09104e152eadb72792e5c926526f9c29104914 Mon Sep 17 00:00:00 2001 From: fosslinux Date: Mon, 18 Oct 2021 12:50:27 +1100 Subject: [PATCH] pkg/scheduler: add a scheduler. Along with this add a example local CapacityProvider. --- .gitignore | 1 + cmd/graph/main.go | 7 ++ pkg/scheduler/build.go | 8 +++ pkg/scheduler/http.go | 26 +++++++ pkg/scheduler/local/localCapacity.go | 86 ++++++++++++++++++++++ pkg/scheduler/local/types.go | 15 ++++ pkg/scheduler/scheduler.go | 104 +++++++++++++++++++++++++++ pkg/scheduler/types.go | 41 +++++++++++ 8 files changed, 288 insertions(+) create mode 100644 pkg/scheduler/build.go create mode 100644 pkg/scheduler/http.go create mode 100644 pkg/scheduler/local/localCapacity.go create mode 100644 pkg/scheduler/local/types.go create mode 100644 pkg/scheduler/scheduler.go create mode 100644 pkg/scheduler/types.go diff --git a/.gitignore b/.gitignore index e02d43a..0156add 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +build-packages/ void-packages/ state.json main diff --git a/cmd/graph/main.go b/cmd/graph/main.go index bd4e4e1..20ec505 100644 --- a/cmd/graph/main.go +++ b/cmd/graph/main.go @@ -10,6 +10,8 @@ import ( "github.com/the-maldridge/nbuild/pkg/config" "github.com/the-maldridge/nbuild/pkg/graph" "github.com/the-maldridge/nbuild/pkg/http" + "github.com/the-maldridge/nbuild/pkg/scheduler" + "github.com/the-maldridge/nbuild/pkg/scheduler/local" "github.com/the-maldridge/nbuild/pkg/storage" _ "github.com/the-maldridge/nbuild/pkg/storage/bc" @@ -50,6 +52,11 @@ func main() { mgr.Bootstrap() mgr.SetIndexURLs(cfg.RepoDataURLs) + capacityProvider := local.NewLocalCapacityProvider(appLogger, "build-packages") + scheduler := scheduler.NewScheduler(appLogger, capacityProvider, "localhost:8080") + go scheduler.Run() + + srv.Mount("/api/scheduler", scheduler.HTTPEntry()) srv.Mount("/api/graph", mgr.HTTPEntry()) go srv.Serve(":8080") diff --git a/pkg/scheduler/build.go b/pkg/scheduler/build.go new file mode 100644 index 0000000..cfdfcb8 --- /dev/null +++ b/pkg/scheduler/build.go @@ -0,0 +1,8 @@ +package scheduler + +func (b *Build) Equal(c *Build) bool { + return b.Spec.Host == c.Spec.Host && + b.Spec.Target == c.Spec.Target && + b.Pkg == c.Pkg && + b.Rev == c.Rev +} diff --git a/pkg/scheduler/http.go b/pkg/scheduler/http.go new file mode 100644 index 0000000..b5805b6 --- /dev/null +++ b/pkg/scheduler/http.go @@ -0,0 +1,26 @@ +package scheduler + +import ( + "net/http" + + "github.com/go-chi/chi/v5" +) + +// HTTPEntry provides the mountpoint for this service into the shared +// webserver routing tree. +func (s *Scheduler) HTTPEntry() chi.Router { + r := chi.NewRouter() + + r.Get("/done", s.httpDone) + return r +} + +func (s *Scheduler) httpDone(w http.ResponseWriter, r *http.Request) { + ok := s.Reconstruct() + if !ok { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} diff --git a/pkg/scheduler/local/localCapacity.go b/pkg/scheduler/local/localCapacity.go new file mode 100644 index 0000000..cc4a9ee --- /dev/null +++ b/pkg/scheduler/local/localCapacity.go @@ -0,0 +1,86 @@ +package local + +import ( + "os" + "os/exec" + "path/filepath" + + "github.com/hashicorp/go-hclog" + + "github.com/the-maldridge/nbuild/pkg/scheduler" + "github.com/the-maldridge/nbuild/pkg/source" +) + +func NewLocalCapacityProvider(l hclog.Logger, path string) *LocalCapacityProvider { + absPath, err := filepath.Abs(path) + if err != nil { + absPath = path + } + x := LocalCapacityProvider{ + l: l.Named("capacityProvider"), + path: absPath, + ongoing: nil, + } + return &x +} + +// Wrapper function for pkgCmd.Run() +func (c *LocalCapacityProvider) pkgRun(cmd *exec.Cmd) { + output, err := cmd.CombinedOutput() + c.ongoing = nil + if err != nil { + c.l.Warn("Error building pkg", "err", err) + } + c.l.Trace("Building package output", "output", string(output)) +} + +// Builds a package. +func (c *LocalCapacityProvider) DispatchBuild(b scheduler.Build) error { + if c.ongoing != nil { + return new(scheduler.NoCapacityError) + } + c.ongoing = &b + + // Git checkout + repo := source.New(c.l) + repo.SetBasepath(c.path) + err := repo.Bootstrap() + if err != nil { + return err + } + _, err = repo.Checkout(b.Rev) + if err != nil { + return err + } + + os.Chdir(c.path) + c.l.Info("Binary-bootstrapping", "path", c.path, "spec", b.Spec) + bootstrapCmd := exec.Command("./xbps-src", "binary-bootstrap", b.Spec.Host) + bootstrapCmd.Dir = c.path + err = bootstrapCmd.Run() + if err != nil { + c.l.Warn("Error running binary-bootstrap", "err", err) + return err + } + + c.l.Debug("Building package", "build", b, "path", c.path) + args := []string{"pkg", b.Pkg} + if !b.Spec.Native() { + args = append(args, "-a", b.Spec.Target) + } + pkgCmd := exec.Command("./xbps-src", args...) + pkgCmd.Dir = c.path + go c.pkgRun(pkgCmd) + os.Chdir("..") + + return nil +} + +// Lists the ongoing build, if there is one. +func (c *LocalCapacityProvider) ListBuilds() ([]scheduler.Build, error) { + if c.ongoing == nil { + return nil, nil + } else { + return []scheduler.Build{*c.ongoing}, nil + } +} diff --git a/pkg/scheduler/local/types.go b/pkg/scheduler/local/types.go new file mode 100644 index 0000000..dd0fb21 --- /dev/null +++ b/pkg/scheduler/local/types.go @@ -0,0 +1,15 @@ +package local + +import ( + "github.com/the-maldridge/nbuild/pkg/scheduler" + + "github.com/hashicorp/go-hclog" +) + +// LocalCapacityProvider is a capacity provider that builds one build at a time locally. +type LocalCapacityProvider struct { + l hclog.Logger + ongoing *scheduler.Build + path string + slots int +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 0000000..439d400 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,104 @@ +package scheduler + +import ( + "errors" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + + "github.com/the-maldridge/nbuild/pkg/graph" +) + +func NewScheduler(l hclog.Logger, c CapacityProvider, url string) *Scheduler { + x := Scheduler{ + l: l.Named("scheduler"), + capacityProvider: c, + apiClient: graph.NewAPIClient(l), + queueMutex: new(sync.Mutex), + } + x.apiClient.Url = url + + return &x +} + +// Pops a build off the queue and hands it off to the CapacityProvider. +func (s *Scheduler) send() error { + s.queueMutex.Lock() + defer s.queueMutex.Unlock() + + if len(s.queue) == 0 { + return errors.New("none in queue") + } + if err := s.capacityProvider.DispatchBuild(s.queue[0]); err != nil { + s.l.Trace("Unable to dispatch right now", "build", s.queue[0], "err", err) + return err + } + s.l.Trace("Dispatching", "build", s.queue[0]) + s.queue = s.queue[1:] + return nil +} + +// Reconstructs the queue from dispatchable. +func (s *Scheduler) Reconstruct() bool { + dispatchable, ok := s.apiClient.GetDispatchable() + if !ok { + return false + } + + s.queueMutex.Lock() + defer s.queueMutex.Unlock() + s.queue = make([]Build, 0) + + current, err := s.capacityProvider.ListBuilds() + if err != nil { + return false + } + for tuple, pkgs := range dispatchable.Pkgs { + for _, pkg := range pkgs { + b := Build{ + Spec: tuple, + Pkg: pkg, + Rev: dispatchable.Rev, + } + alreadyBuilding := false + for _, curBuild := range current { + if b.Equal(&curBuild) { + alreadyBuilding = true + break + } + } + if !alreadyBuilding { + s.queue = append(s.queue, b) + } + } + s.tuples = append(s.tuples, tuple) + } + s.l.Info("Successfully reconstructed queue") + return true +} + +// Update graph and then queue. +func (s *Scheduler) Update() bool { + ok := true + for _, tuple := range s.tuples { + cleanOk := s.apiClient.Clean(tuple.Target) + ok = ok && cleanOk + } + s.l.Info("Cleaned all targets in graph") + ok = ok && s.Reconstruct() + return ok +} + +// Start the scheduler. +func (s *Scheduler) Run() { + s.Reconstruct() // Get tuples + s.Update() // Now get real dispatchable + for { + err := s.send() + if err != nil { + // Don't try to send too often + time.Sleep(time.Second) + } + } +} diff --git a/pkg/scheduler/types.go b/pkg/scheduler/types.go new file mode 100644 index 0000000..c0a4b50 --- /dev/null +++ b/pkg/scheduler/types.go @@ -0,0 +1,41 @@ +package scheduler + +import ( + "sync" + + "github.com/hashicorp/go-hclog" + + "github.com/the-maldridge/nbuild/pkg/graph" + "github.com/the-maldridge/nbuild/pkg/types" +) + +type NoCapacityError struct{} + +func (e NoCapacityError) Error() string { + return "no capacity" +} + +// A Build is all the information required for a build +type Build struct { + Spec types.SpecTuple + Pkg string + Rev string +} + +// CapacityProviders are a way for packages to be built. +type CapacityProvider interface { + DispatchBuild(Build) error + ListBuilds() ([]Build, error) +} + +// Scheduler makes builds ready + dispatches them using a CapacityProvider. +type Scheduler struct { + l hclog.Logger + + queue []Build + queueMutex *sync.Mutex + tuples []types.SpecTuple + + apiClient *graph.APIClient + capacityProvider CapacityProvider +}