diff --git a/src/funnel/scheduler/condor/sched.go b/src/funnel/scheduler/condor/sched.go index 9d5a9d858..eed65196b 100644 --- a/src/funnel/scheduler/condor/sched.go +++ b/src/funnel/scheduler/condor/sched.go @@ -24,7 +24,7 @@ var log = logger.New("condor") const prefix = "condor-worker-" // Plugin provides the HTCondor scheduler backend plugin. -var Plugin = scheduler.BackendPlugin{ +var Plugin = *scheduler.BackendPlugin{ Name: "condor", Create: NewBackend, } diff --git a/src/funnel/scheduler/gce/sched.go b/src/funnel/scheduler/gce/sched.go index 45091db03..fcd6f26e1 100644 --- a/src/funnel/scheduler/gce/sched.go +++ b/src/funnel/scheduler/gce/sched.go @@ -20,7 +20,7 @@ import ( var log = logger.New("gce") // Plugin provides the Google Cloud Compute scheduler backend plugin. -var Plugin = scheduler.BackendPlugin{ +var Plugin = &scheduler.BackendPlugin{ Name: "gce", Create: NewBackend, } diff --git a/src/funnel/scheduler/gce/util_test.go b/src/funnel/scheduler/gce/util_test.go index d89fb3abc..32f49b599 100644 --- a/src/funnel/scheduler/gce/util_test.go +++ b/src/funnel/scheduler/gce/util_test.go @@ -44,7 +44,7 @@ func setup() *harness { h := &harness{conf, srv, gce, gce} // Add mock backend - h.srv.Scheduler.AddBackend(scheduler.BackendPlugin{ + h.srv.Scheduler.AddBackend(&scheduler.BackendPlugin{ Name: "gce-mock", Create: func(conf config.Config) (scheduler.Backend, error) { log.Debug("Creating mock scheduler backend") diff --git a/src/funnel/scheduler/local/sched.go b/src/funnel/scheduler/local/sched.go index 617f07c1f..4f60580f5 100644 --- a/src/funnel/scheduler/local/sched.go +++ b/src/funnel/scheduler/local/sched.go @@ -13,7 +13,7 @@ import ( var log = logger.New("local") // Plugin provides the local scheduler backend plugin -var Plugin = scheduler.BackendPlugin{ +var Plugin = &scheduler.BackendPlugin{ Name: "local", Create: NewBackend, } diff --git a/src/funnel/scheduler/mocks/Database.go b/src/funnel/scheduler/mocks/Database.go new file mode 100644 index 000000000..eb158d37c --- /dev/null +++ b/src/funnel/scheduler/mocks/Database.go @@ -0,0 +1,93 @@ +package mocks + +import context "golang.org/x/net/context" +import funnel "funnel/proto/funnel" +import mock "github.com/stretchr/testify/mock" + +import tes "funnel/proto/tes" + +// Database is an autogenerated mock type for the Database type +type Database struct { + mock.Mock +} + +// AssignJob provides a mock function with given fields: _a0, _a1 +func (_m *Database) AssignJob(_a0 *tes.Job, _a1 *funnel.Worker) { + _m.Called(_a0, _a1) +} + +// CheckWorkers provides a mock function with given fields: +func (_m *Database) CheckWorkers() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetWorkers provides a mock function with given fields: _a0, _a1 +func (_m *Database) GetWorkers(_a0 context.Context, _a1 *funnel.GetWorkersRequest) (*funnel.GetWorkersResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *funnel.GetWorkersResponse + if rf, ok := ret.Get(0).(func(context.Context, *funnel.GetWorkersRequest) *funnel.GetWorkersResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*funnel.GetWorkersResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *funnel.GetWorkersRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ReadQueue provides a mock function with given fields: n +func (_m *Database) ReadQueue(n int) []*tes.Job { + ret := _m.Called(n) + + var r0 []*tes.Job + if rf, ok := ret.Get(0).(func(int) []*tes.Job); ok { + r0 = rf(n) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*tes.Job) + } + } + + return r0 +} + +// UpdateWorker provides a mock function with given fields: _a0, _a1 +func (_m *Database) UpdateWorker(_a0 context.Context, _a1 *funnel.Worker) (*funnel.UpdateWorkerResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *funnel.UpdateWorkerResponse + if rf, ok := ret.Get(0).(func(context.Context, *funnel.Worker) *funnel.UpdateWorkerResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*funnel.UpdateWorkerResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *funnel.Worker) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/src/funnel/scheduler/openstack/sched.go b/src/funnel/scheduler/openstack/sched.go index a8f4adc94..39e2e63b5 100644 --- a/src/funnel/scheduler/openstack/sched.go +++ b/src/funnel/scheduler/openstack/sched.go @@ -12,7 +12,7 @@ import ( var log = logger.New("openstack") // Plugin provides the OpenStack scheduler backend plugin. -var Plugin = scheduler.BackendPlugin{ +var Plugin = &scheduler.BackendPlugin{ Name: "openstack", Create: NewBackend, } diff --git a/src/funnel/scheduler/scheduler.go b/src/funnel/scheduler/scheduler.go index 433495a53..8e3cc8cfd 100644 --- a/src/funnel/scheduler/scheduler.go +++ b/src/funnel/scheduler/scheduler.go @@ -23,7 +23,7 @@ type Database interface { // NewScheduler returns a new Scheduler instance. func NewScheduler(db Database, conf config.Config) (*Scheduler, error) { - backends := map[string]BackendPlugin{} + backends := map[string]*BackendPlugin{} err := util.EnsureDir(conf.WorkDir) if err != nil { @@ -37,11 +37,11 @@ func NewScheduler(db Database, conf config.Config) (*Scheduler, error) { type Scheduler struct { db Database conf config.Config - backends map[string]BackendPlugin + backends map[string]*BackendPlugin } // AddBackend adds a backend plugin. -func (s *Scheduler) AddBackend(plugin BackendPlugin) { +func (s *Scheduler) AddBackend(plugin *BackendPlugin) { s.backends[plugin.Name] = plugin } @@ -60,10 +60,12 @@ func (s *Scheduler) Start(ctx context.Context) error { var err error err = s.Schedule(ctx) if err != nil { + log.Error("Schedule error", err) return err } err = s.Scale(ctx) if err != nil { + log.Error("Scale error", err) return err } } diff --git a/src/funnel/scheduler/scheduler_test.go b/src/funnel/scheduler/scheduler_test.go new file mode 100644 index 000000000..d532fb1c1 --- /dev/null +++ b/src/funnel/scheduler/scheduler_test.go @@ -0,0 +1,47 @@ +package scheduler + +import ( + "funnel/config" + "funnel/logger" + "funnel/proto/tes" + "funnel/scheduler/mocks" + "golang.org/x/net/context" + "testing" +) + +func init() { + logger.ForceColors() +} + +type BlankBackend struct{} + +func (b *BlankBackend) Schedule(*tes.Job) *Offer { + return nil +} + +func TestBackendCaching(t *testing.T) { + conf := config.DefaultConfig() + conf.Scheduler = "test" + + db := &mocks.Database{} + db.On("CheckWorkers").Return(nil) + db.On("ReadQueue", 10).Return([]*tes.Job{}) + + s, _ := NewScheduler(db, conf) + calls := 0 + s.AddBackend(&BackendPlugin{ + Name: "test", + Create: func(config.Config) (Backend, error) { + calls++ + return &BlankBackend{}, nil + }, + }) + ctx := context.Background() + s.Schedule(ctx) + s.Schedule(ctx) + if calls != 1 { + logger.Debug("Calls", calls) + t.Error("Expected one call") + return + } +} diff --git a/src/funnel/server/mocks/server.go b/src/funnel/server/mocks/server.go index 0fb307c00..1620a4932 100644 --- a/src/funnel/server/mocks/server.go +++ b/src/funnel/server/mocks/server.go @@ -90,7 +90,7 @@ func (m *Server) Start() { m.stop = stop m.Server.Start(ctx) m.NoopWorker = NewNoopWorker(m.Conf) - m.Scheduler.AddBackend(scheduler.BackendPlugin{ + m.Scheduler.AddBackend(&scheduler.BackendPlugin{ Name: "noop", Create: func(conf config.Config) (scheduler.Backend, error) { return scheduler.Backend(&NoopBackend{m.NoopWorker, conf}), nil diff --git a/src/funnel/tests/worker_test.go b/src/funnel/tests/worker_test.go index 0241c13eb..f17dd2d08 100644 --- a/src/funnel/tests/worker_test.go +++ b/src/funnel/tests/worker_test.go @@ -67,7 +67,6 @@ func TestBasicWorker(t *testing.T) { } // Test a scheduled job is removed from the job queue. -// TODO doesn't this belong more in the scheduler? func TestScheduledJobRemovedFromQueue(t *testing.T) { srv := server_mocks.NewServer(server_mocks.NewConfig()) srv.Start()