Skip to content

Commit

Permalink
pkg/scheduler: add a scheduler.
Browse files Browse the repository at this point in the history
Along with this add a example local CapacityProvider.
  • Loading branch information
fosslinux authored and the-maldridge committed Oct 19, 2021
1 parent 05c6020 commit 7b09104
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
build-packages/
void-packages/
state.json
main
Expand Down
7 changes: 7 additions & 0 deletions cmd/graph/main.go
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/the-maldridge/nbuild/pkg/config"
"github.com/the-maldridge/nbuild/pkg/graph"
"github.com/the-maldridge/nbuild/pkg/http"
"github.com/the-maldridge/nbuild/pkg/scheduler"
"github.com/the-maldridge/nbuild/pkg/scheduler/local"
"github.com/the-maldridge/nbuild/pkg/storage"

_ "github.com/the-maldridge/nbuild/pkg/storage/bc"
Expand Down Expand Up @@ -50,6 +52,11 @@ func main() {
mgr.Bootstrap()
mgr.SetIndexURLs(cfg.RepoDataURLs)

capacityProvider := local.NewLocalCapacityProvider(appLogger, "build-packages")
scheduler := scheduler.NewScheduler(appLogger, capacityProvider, "localhost:8080")
go scheduler.Run()

srv.Mount("/api/scheduler", scheduler.HTTPEntry())
srv.Mount("/api/graph", mgr.HTTPEntry())
go srv.Serve(":8080")

Expand Down
8 changes: 8 additions & 0 deletions pkg/scheduler/build.go
@@ -0,0 +1,8 @@
package scheduler

func (b *Build) Equal(c *Build) bool {
return b.Spec.Host == c.Spec.Host &&
b.Spec.Target == c.Spec.Target &&
b.Pkg == c.Pkg &&
b.Rev == c.Rev
}
26 changes: 26 additions & 0 deletions pkg/scheduler/http.go
@@ -0,0 +1,26 @@
package scheduler

import (
"net/http"

"github.com/go-chi/chi/v5"
)

// HTTPEntry provides the mountpoint for this service into the shared
// webserver routing tree.
func (s *Scheduler) HTTPEntry() chi.Router {
r := chi.NewRouter()

r.Get("/done", s.httpDone)
return r
}

func (s *Scheduler) httpDone(w http.ResponseWriter, r *http.Request) {
ok := s.Reconstruct()
if !ok {
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusNoContent)
}
86 changes: 86 additions & 0 deletions pkg/scheduler/local/localCapacity.go
@@ -0,0 +1,86 @@
package local

import (
"os"
"os/exec"
"path/filepath"

"github.com/hashicorp/go-hclog"

"github.com/the-maldridge/nbuild/pkg/scheduler"
"github.com/the-maldridge/nbuild/pkg/source"
)

func NewLocalCapacityProvider(l hclog.Logger, path string) *LocalCapacityProvider {
absPath, err := filepath.Abs(path)
if err != nil {
absPath = path
}
x := LocalCapacityProvider{
l: l.Named("capacityProvider"),
path: absPath,
ongoing: nil,
}
return &x
}

// Wrapper function for pkgCmd.Run()
func (c *LocalCapacityProvider) pkgRun(cmd *exec.Cmd) {
output, err := cmd.CombinedOutput()
c.ongoing = nil
if err != nil {
c.l.Warn("Error building pkg", "err", err)
}
c.l.Trace("Building package output", "output", string(output))
}

// Builds a package.
func (c *LocalCapacityProvider) DispatchBuild(b scheduler.Build) error {
if c.ongoing != nil {
return new(scheduler.NoCapacityError)
}
c.ongoing = &b

// Git checkout
repo := source.New(c.l)
repo.SetBasepath(c.path)
err := repo.Bootstrap()
if err != nil {
return err
}
_, err = repo.Checkout(b.Rev)
if err != nil {
return err
}

os.Chdir(c.path)
c.l.Info("Binary-bootstrapping", "path", c.path, "spec", b.Spec)
bootstrapCmd := exec.Command("./xbps-src", "binary-bootstrap", b.Spec.Host)
bootstrapCmd.Dir = c.path
err = bootstrapCmd.Run()
if err != nil {
c.l.Warn("Error running binary-bootstrap", "err", err)
return err
}

c.l.Debug("Building package", "build", b, "path", c.path)
args := []string{"pkg", b.Pkg}
if !b.Spec.Native() {
args = append(args, "-a", b.Spec.Target)
}
pkgCmd := exec.Command("./xbps-src", args...)
pkgCmd.Dir = c.path
go c.pkgRun(pkgCmd)
os.Chdir("..")

return nil
}

// Lists the ongoing build, if there is one.
func (c *LocalCapacityProvider) ListBuilds() ([]scheduler.Build, error) {
if c.ongoing == nil {
return nil, nil
} else {
return []scheduler.Build{*c.ongoing}, nil
}
}
15 changes: 15 additions & 0 deletions pkg/scheduler/local/types.go
@@ -0,0 +1,15 @@
package local

import (
"github.com/the-maldridge/nbuild/pkg/scheduler"

"github.com/hashicorp/go-hclog"
)

// LocalCapacityProvider is a capacity provider that builds one build at a time locally.
type LocalCapacityProvider struct {
l hclog.Logger
ongoing *scheduler.Build
path string
slots int
}
104 changes: 104 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -0,0 +1,104 @@
package scheduler

import (
"errors"
"sync"
"time"

"github.com/hashicorp/go-hclog"

"github.com/the-maldridge/nbuild/pkg/graph"
)

func NewScheduler(l hclog.Logger, c CapacityProvider, url string) *Scheduler {
x := Scheduler{
l: l.Named("scheduler"),
capacityProvider: c,
apiClient: graph.NewAPIClient(l),
queueMutex: new(sync.Mutex),
}
x.apiClient.Url = url

return &x
}

// Pops a build off the queue and hands it off to the CapacityProvider.
func (s *Scheduler) send() error {
s.queueMutex.Lock()
defer s.queueMutex.Unlock()

if len(s.queue) == 0 {
return errors.New("none in queue")
}
if err := s.capacityProvider.DispatchBuild(s.queue[0]); err != nil {
s.l.Trace("Unable to dispatch right now", "build", s.queue[0], "err", err)
return err
}
s.l.Trace("Dispatching", "build", s.queue[0])
s.queue = s.queue[1:]
return nil
}

// Reconstructs the queue from dispatchable.
func (s *Scheduler) Reconstruct() bool {
dispatchable, ok := s.apiClient.GetDispatchable()
if !ok {
return false
}

s.queueMutex.Lock()
defer s.queueMutex.Unlock()
s.queue = make([]Build, 0)

current, err := s.capacityProvider.ListBuilds()
if err != nil {
return false
}
for tuple, pkgs := range dispatchable.Pkgs {
for _, pkg := range pkgs {
b := Build{
Spec: tuple,
Pkg: pkg,
Rev: dispatchable.Rev,
}
alreadyBuilding := false
for _, curBuild := range current {
if b.Equal(&curBuild) {
alreadyBuilding = true
break
}
}
if !alreadyBuilding {
s.queue = append(s.queue, b)
}
}
s.tuples = append(s.tuples, tuple)
}
s.l.Info("Successfully reconstructed queue")
return true
}

// Update graph and then queue.
func (s *Scheduler) Update() bool {
ok := true
for _, tuple := range s.tuples {
cleanOk := s.apiClient.Clean(tuple.Target)
ok = ok && cleanOk
}
s.l.Info("Cleaned all targets in graph")
ok = ok && s.Reconstruct()
return ok
}

// Start the scheduler.
func (s *Scheduler) Run() {
s.Reconstruct() // Get tuples
s.Update() // Now get real dispatchable
for {
err := s.send()
if err != nil {
// Don't try to send too often
time.Sleep(time.Second)
}
}
}
41 changes: 41 additions & 0 deletions pkg/scheduler/types.go
@@ -0,0 +1,41 @@
package scheduler

import (
"sync"

"github.com/hashicorp/go-hclog"

"github.com/the-maldridge/nbuild/pkg/graph"
"github.com/the-maldridge/nbuild/pkg/types"
)

type NoCapacityError struct{}

func (e NoCapacityError) Error() string {
return "no capacity"
}

// A Build is all the information required for a build
type Build struct {
Spec types.SpecTuple
Pkg string
Rev string
}

// CapacityProviders are a way for packages to be built.
type CapacityProvider interface {
DispatchBuild(Build) error
ListBuilds() ([]Build, error)
}

// Scheduler makes builds ready + dispatches them using a CapacityProvider.
type Scheduler struct {
l hclog.Logger

queue []Build
queueMutex *sync.Mutex
tuples []types.SpecTuple

apiClient *graph.APIClient
capacityProvider CapacityProvider
}

0 comments on commit 7b09104

Please sign in to comment.