Skip to content

Commit

Permalink
Fix plugin caching
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Buchanan committed Apr 14, 2017
1 parent daa353d commit b5c5719
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/funnel/scheduler/condor/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var log = logger.New("condor")
const prefix = "condor-worker-"

// Plugin provides the HTCondor scheduler backend plugin.
var Plugin = scheduler.BackendPlugin{
var Plugin = *scheduler.BackendPlugin{
Name: "condor",
Create: NewBackend,
}
Expand Down
2 changes: 1 addition & 1 deletion src/funnel/scheduler/gce/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var log = logger.New("gce")

// Plugin provides the Google Cloud Compute scheduler backend plugin.
var Plugin = scheduler.BackendPlugin{
var Plugin = &scheduler.BackendPlugin{
Name: "gce",
Create: NewBackend,
}
Expand Down
2 changes: 1 addition & 1 deletion src/funnel/scheduler/gce/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func setup() *harness {
h := &harness{conf, srv, gce, gce}

// Add mock backend
h.srv.Scheduler.AddBackend(scheduler.BackendPlugin{
h.srv.Scheduler.AddBackend(&scheduler.BackendPlugin{
Name: "gce-mock",
Create: func(conf config.Config) (scheduler.Backend, error) {
log.Debug("Creating mock scheduler backend")
Expand Down
2 changes: 1 addition & 1 deletion src/funnel/scheduler/local/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
var log = logger.New("local")

// Plugin provides the local scheduler backend plugin
var Plugin = scheduler.BackendPlugin{
var Plugin = &scheduler.BackendPlugin{
Name: "local",
Create: NewBackend,
}
Expand Down
93 changes: 93 additions & 0 deletions src/funnel/scheduler/mocks/Database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package mocks

import context "golang.org/x/net/context"
import funnel "funnel/proto/funnel"
import mock "github.com/stretchr/testify/mock"

import tes "funnel/proto/tes"

// Database is an autogenerated mock type for the Database type
type Database struct {
mock.Mock
}

// AssignJob provides a mock function with given fields: _a0, _a1
func (_m *Database) AssignJob(_a0 *tes.Job, _a1 *funnel.Worker) {
_m.Called(_a0, _a1)
}

// CheckWorkers provides a mock function with given fields:
func (_m *Database) CheckWorkers() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}

return r0
}

// GetWorkers provides a mock function with given fields: _a0, _a1
func (_m *Database) GetWorkers(_a0 context.Context, _a1 *funnel.GetWorkersRequest) (*funnel.GetWorkersResponse, error) {
ret := _m.Called(_a0, _a1)

var r0 *funnel.GetWorkersResponse
if rf, ok := ret.Get(0).(func(context.Context, *funnel.GetWorkersRequest) *funnel.GetWorkersResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*funnel.GetWorkersResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *funnel.GetWorkersRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ReadQueue provides a mock function with given fields: n
func (_m *Database) ReadQueue(n int) []*tes.Job {
ret := _m.Called(n)

var r0 []*tes.Job
if rf, ok := ret.Get(0).(func(int) []*tes.Job); ok {
r0 = rf(n)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*tes.Job)
}
}

return r0
}

// UpdateWorker provides a mock function with given fields: _a0, _a1
func (_m *Database) UpdateWorker(_a0 context.Context, _a1 *funnel.Worker) (*funnel.UpdateWorkerResponse, error) {
ret := _m.Called(_a0, _a1)

var r0 *funnel.UpdateWorkerResponse
if rf, ok := ret.Get(0).(func(context.Context, *funnel.Worker) *funnel.UpdateWorkerResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*funnel.UpdateWorkerResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *funnel.Worker) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}

return r0, r1
}
2 changes: 1 addition & 1 deletion src/funnel/scheduler/openstack/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var log = logger.New("openstack")

// Plugin provides the OpenStack scheduler backend plugin.
var Plugin = scheduler.BackendPlugin{
var Plugin = &scheduler.BackendPlugin{
Name: "openstack",
Create: NewBackend,
}
Expand Down
8 changes: 5 additions & 3 deletions src/funnel/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Database interface {

// NewScheduler returns a new Scheduler instance.
func NewScheduler(db Database, conf config.Config) (*Scheduler, error) {
backends := map[string]BackendPlugin{}
backends := map[string]*BackendPlugin{}

err := util.EnsureDir(conf.WorkDir)
if err != nil {
Expand All @@ -37,11 +37,11 @@ func NewScheduler(db Database, conf config.Config) (*Scheduler, error) {
type Scheduler struct {
db Database
conf config.Config
backends map[string]BackendPlugin
backends map[string]*BackendPlugin
}

// AddBackend adds a backend plugin.
func (s *Scheduler) AddBackend(plugin BackendPlugin) {
func (s *Scheduler) AddBackend(plugin *BackendPlugin) {
s.backends[plugin.Name] = plugin
}

Expand All @@ -60,10 +60,12 @@ func (s *Scheduler) Start(ctx context.Context) error {
var err error
err = s.Schedule(ctx)
if err != nil {
log.Error("Schedule error", err)
return err
}
err = s.Scale(ctx)
if err != nil {
log.Error("Scale error", err)
return err
}
}
Expand Down
47 changes: 47 additions & 0 deletions src/funnel/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package scheduler

import (
"funnel/config"
"funnel/logger"
"funnel/proto/tes"
"funnel/scheduler/mocks"
"golang.org/x/net/context"
"testing"
)

func init() {
logger.ForceColors()
}

type BlankBackend struct{}

func (b *BlankBackend) Schedule(*tes.Job) *Offer {
return nil
}

func TestBackendCaching(t *testing.T) {
conf := config.DefaultConfig()
conf.Scheduler = "test"

db := &mocks.Database{}
db.On("CheckWorkers").Return(nil)
db.On("ReadQueue", 10).Return([]*tes.Job{})

s, _ := NewScheduler(db, conf)
calls := 0
s.AddBackend(&BackendPlugin{
Name: "test",
Create: func(config.Config) (Backend, error) {
calls++
return &BlankBackend{}, nil
},
})
ctx := context.Background()
s.Schedule(ctx)
s.Schedule(ctx)
if calls != 1 {
logger.Debug("Calls", calls)
t.Error("Expected one call")
return
}
}
2 changes: 1 addition & 1 deletion src/funnel/server/mocks/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (m *Server) Start() {
m.stop = stop
m.Server.Start(ctx)
m.NoopWorker = NewNoopWorker(m.Conf)
m.Scheduler.AddBackend(scheduler.BackendPlugin{
m.Scheduler.AddBackend(&scheduler.BackendPlugin{
Name: "noop",
Create: func(conf config.Config) (scheduler.Backend, error) {
return scheduler.Backend(&NoopBackend{m.NoopWorker, conf}), nil
Expand Down
1 change: 0 additions & 1 deletion src/funnel/tests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func TestBasicWorker(t *testing.T) {
}

// Test a scheduled job is removed from the job queue.
// TODO doesn't this belong more in the scheduler?
func TestScheduledJobRemovedFromQueue(t *testing.T) {
srv := server_mocks.NewServer(server_mocks.NewConfig())
srv.Start()
Expand Down

0 comments on commit b5c5719

Please sign in to comment.