From 170ebc5fc4a2d86a954bef255a9e4a9222fe8ed6 Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Thu, 22 Aug 2019 12:12:37 -0500 Subject: [PATCH] Add store event logic to replicated jobs orchestrator Signed-off-by: Drew Erny --- .../replicatedjob/orchestrator.go | 54 ++++--- .../replicatedjob/orchestrator_test.go | 132 ++++++++++++++---- .../orchestrator/replicatedjob/reconciler.go | 25 ++-- .../replicatedjob/reconciler_test.go | 40 +++++- 4 files changed, 193 insertions(+), 58 deletions(-) diff --git a/manager/orchestrator/replicatedjob/orchestrator.go b/manager/orchestrator/replicatedjob/orchestrator.go index 48fdbcd079..74ee306c80 100644 --- a/manager/orchestrator/replicatedjob/orchestrator.go +++ b/manager/orchestrator/replicatedjob/orchestrator.go @@ -15,10 +15,6 @@ type Orchestrator struct { // we need the store, of course, to do updates store *store.MemoryStore - // a copy of the cluster is needed, because we need it when creating tasks - // to set the default log driver - cluster *api.Cluster - // reconciler holds the logic of actually operating on a service. reconciler reconciler @@ -53,27 +49,21 @@ func (o *Orchestrator) Run(ctx context.Context) { services []*api.Service ) - o.store.View(func(tx store.ReadTx) { + watchChan, cancel, _ := store.ViewAndWatch(o.store, func(tx store.ReadTx) error { // TODO(dperny): figure out what to do about the error return value // from FindServices services, _ = store.FindServices(tx, store.All) - - // there should only ever be 1 cluster object, but for reasons - // forgotten by me, it needs to be retrieved in a rather roundabout way - // from the store - // TODO(dperny): figure out what to do with this error too - clusters, _ := store.FindClusters(tx, store.All) - if len(clusters) == 1 { - o.cluster = clusters[0] - } + return nil }) + defer cancel() + // for testing purposes, if a reconciler already exists on the // orchestrator, we will not set it up. this allows injecting a fake // reconciler. 
if o.reconciler == nil { // the cluster might be nil, but that doesn't matter. - o.reconciler = newReconciler(o.store, o.cluster) + o.reconciler = newReconciler(o.store) } for _, service := range services { @@ -84,9 +74,37 @@ func (o *Orchestrator) Run(ctx context.Context) { } } - // TODO(dperny): this will be a case in the main select loop, but for now - // just block until stopChan is closed. - <-o.stopChan + for { + // first, before taking any action, see if we should stop the + // orchestrator. if both the stop channel and the watch channel are + // available to read, the channel that gets read is picked at random, + // but we always want to stop if it's possible. + select { + case <-o.stopChan: + return + default: + } + + select { + case event := <-watchChan: + var service *api.Service + + switch ev := event.(type) { + case api.EventCreateService: + service = ev.Service + case api.EventUpdateService: + service = ev.Service + } + + if service != nil { + o.reconciler.ReconcileService(service.ID) + } + case <-o.stopChan: + // we also need to check for stop in here, in case there are no + // updates to cause the loop to turn over. + return + } + } } // Stop stops the Orchestrator diff --git a/manager/orchestrator/replicatedjob/orchestrator_test.go b/manager/orchestrator/replicatedjob/orchestrator_test.go index 2fb3cbce88..ddd9f6a720 100644 --- a/manager/orchestrator/replicatedjob/orchestrator_test.go +++ b/manager/orchestrator/replicatedjob/orchestrator_test.go @@ -6,6 +6,7 @@ import ( "context" "fmt" + "sync" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/orchestrator/testutils" @@ -15,6 +16,8 @@ import ( // fakeReconciler implements the reconciler interface for testing the // orchestrator. 
type fakeReconciler struct { + sync.Mutex + // serviceErrors contains a mapping of ids to errors that should be // returned if that ID is passed to reconcileService serviceErrors map[string]error @@ -28,6 +31,8 @@ type fakeReconciler struct { // just records what arguments it has been passed, and maybe also returns an // error if desired. func (f *fakeReconciler) ReconcileService(id string) error { + f.Lock() + defer f.Unlock() f.servicesReconciled = append(f.servicesReconciled, id) if err, ok := f.serviceErrors[id]; ok { return err @@ -35,6 +40,18 @@ func (f *fakeReconciler) ReconcileService(id string) error { return nil } +func (f *fakeReconciler) getServicesReconciled() []string { + f.Lock() + defer f.Unlock() + // we can't just return the slice, because then we'd be accessing it + // outside of the protection of the mutex anyway. instead, we'll copy its + // contents. this is fine because this is only the tests, and the slice is + // almost certainly rather short. + returnSet := make([]string, len(f.servicesReconciled)) + copy(returnSet, f.servicesReconciled) + return returnSet +} + var _ = Describe("Replicated job orchestrator", func() { var ( o *Orchestrator @@ -52,11 +69,14 @@ var _ = Describe("Replicated job orchestrator", func() { }) Describe("Starting and stopping", func() { - It("should stop when Stop is called", func(done Done) { + It("should stop when Stop is called", func() { stopped := testutils.EnsureRuns(func() { o.Run(context.Background()) }) o.Stop() - Expect(stopped).To(BeClosed()) - close(done) + // Eventually here will repeatedly run the matcher against the + // argument. This means that we will keep checking if stopped is + // closed until the test times out. Using Eventually instead of + // Expect ensures we can't race on "stopped". 
+ Eventually(stopped).Should(BeClosed()) }) }) @@ -94,20 +114,7 @@ var _ = Describe("Replicated job orchestrator", func() { }, }, } - if err := store.CreateService(tx, globalJob); err != nil { - return err - } - - cluster := &api.Cluster{ - ID: "someCluster", - Spec: api.ClusterSpec{ - Annotations: api.Annotations{ - Name: "someName", - }, - }, - } - - return store.CreateCluster(tx, cluster) + return store.CreateService(tx, globalJob) }) Expect(err).ToNot(HaveOccurred()) @@ -125,13 +132,6 @@ var _ = Describe("Replicated job orchestrator", func() { o.Stop() }) - It("should pick up the cluster object", func() { - // this is a white-box test which looks to see that o.cluster is - // set correctly. - Expect(o.cluster).ToNot(BeNil()) - Expect(o.cluster.ID).To(Equal("someCluster")) - }) - It("should reconcile each replicated job service that already exists", func() { Expect(f.servicesReconciled).To(ConsistOf( "service0", "service1", "service2", @@ -167,7 +167,89 @@ var _ = Describe("Replicated job orchestrator", func() { }) Describe("receiving events", func() { - It("should reconcile each time it receives an event", func() { + var stopped <-chan struct{} + BeforeEach(func() { + stopped = testutils.EnsureRuns(func() { o.Run(context.Background()) }) + }) + + AfterEach(func() { + // If a test needs to stop early, that's no problem, because + // repeated calls to Stop have no effect. + o.Stop() + Eventually(stopped).Should(BeClosed()) + }) + + It("should reconcile each replicated job service received", func() { + // Create some services. Wait a moment, and then check that they + // are reconciled. 
+ err := s.Update(func(tx store.Tx) error { + for i := 0; i < 3; i++ { + service := &api.Service{ + ID: fmt.Sprintf("service%v", i), + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: fmt.Sprintf("service%v", i), + }, + Mode: &api.ServiceSpec_ReplicatedJob{ + ReplicatedJob: &api.ReplicatedJob{}, + }, + }, + } + + if err := store.CreateService(tx, service); err != nil { + return err + } + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + + Eventually(f.getServicesReconciled).Should(ConsistOf( + "service0", "service1", "service2", + )) + }) + + It("should not reconcile anything after calling Stop", func() { + err := s.Update(func(tx store.Tx) error { + service := &api.Service{ + ID: fmt.Sprintf("service0"), + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: fmt.Sprintf("service0"), + }, + Mode: &api.ServiceSpec_ReplicatedJob{ + ReplicatedJob: &api.ReplicatedJob{}, + }, + }, + } + + return store.CreateService(tx, service) + }) + + Expect(err).ToNot(HaveOccurred()) + + Eventually(f.getServicesReconciled).Should(ConsistOf("service0")) + + o.Stop() + + err = s.Update(func(tx store.Tx) error { + service := &api.Service{ + ID: fmt.Sprintf("service1"), + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: fmt.Sprintf("service1"), + }, + Mode: &api.ServiceSpec_ReplicatedJob{ + ReplicatedJob: &api.ReplicatedJob{}, + }, + }, + } + + return store.CreateService(tx, service) + }) + + // service1 should never be reconciled. 
+ Consistently(f.getServicesReconciled).Should(ConsistOf("service0")) }) }) }) diff --git a/manager/orchestrator/replicatedjob/reconciler.go b/manager/orchestrator/replicatedjob/reconciler.go index cdc6db6937..f654bcfed9 100644 --- a/manager/orchestrator/replicatedjob/reconciler.go +++ b/manager/orchestrator/replicatedjob/reconciler.go @@ -19,17 +19,12 @@ type reconciler interface { type reconcilerObj struct { // we need the store, of course, to do updates store *store.MemoryStore - - // a copy of the cluster is needed, because we need it when creating tasks - // to set the default log driver - cluster *api.Cluster } // newReconciler creates a new reconciler object -func newReconciler(store *store.MemoryStore, cluster *api.Cluster) reconciler { +func newReconciler(store *store.MemoryStore) reconciler { return &reconcilerObj{ - store: store, - cluster: cluster, + store: store, } } @@ -41,6 +36,7 @@ func (r *reconcilerObj) ReconcileService(id string) error { var ( service *api.Service tasks []*api.Task + cluster *api.Cluster viewErr error ) // first, get the service and all of its tasks @@ -48,6 +44,19 @@ func (r *reconcilerObj) ReconcileService(id string) error { service = store.GetService(tx, id) tasks, viewErr = store.FindTasks(tx, store.ByServiceID(id)) + + // there should only ever be 1 cluster object, but for reasons + // forgotten by me, it needs to be retrieved in a rather roundabout way + // from the store + var clusters []*api.Cluster + clusters, viewErr = store.FindClusters(tx, store.All) + if len(clusters) == 1 { + cluster = clusters[0] + } else if len(clusters) > 1 { + // this should never happen, and indicates that the system is + // broken. 
+ panic("there should never be more than one cluster object") + } }) // errors during view should only happen in a few rather catastrophic @@ -181,7 +190,7 @@ func (r *reconcilerObj) ReconcileService(id string) error { } } - task := orchestrator.NewTask(r.cluster, service, slot, "") + task := orchestrator.NewTask(cluster, service, slot, "") // when we create the task, we also need to set the // JobIteration. task.JobIteration = &api.Version{Index: jobVersion} diff --git a/manager/orchestrator/replicatedjob/reconciler_test.go b/manager/orchestrator/replicatedjob/reconciler_test.go index 2a5db6ee2d..78d3cd75cb 100644 --- a/manager/orchestrator/replicatedjob/reconciler_test.go +++ b/manager/orchestrator/replicatedjob/reconciler_test.go @@ -56,8 +56,9 @@ func AllTasks(s *store.MemoryStore) []*api.Task { var _ = Describe("Replicated Job reconciler", func() { var ( - r *reconcilerObj - s *store.MemoryStore + r *reconcilerObj + s *store.MemoryStore + cluster *api.Cluster ) Describe("ReconcileService", func() { @@ -66,8 +67,7 @@ var _ = Describe("Replicated Job reconciler", func() { Expect(s).ToNot(BeNil()) r = &reconcilerObj{ - store: s, - cluster: nil, + store: s, } }) @@ -97,12 +97,31 @@ var _ = Describe("Replicated Job reconciler", func() { }, } + cluster = &api.Cluster{ + ID: "someCluster", + Spec: api.ClusterSpec{ + Annotations: api.Annotations{ + Name: "someCluster", + }, + TaskDefaults: api.TaskDefaults{ + LogDriver: &api.Driver{ + Name: "someDriver", + }, + }, + }, + } + }) JustBeforeEach(func() { err := s.Update(func(tx store.Tx) error { if service != nil { - return store.CreateService(tx, service) + if err := store.CreateService(tx, service); err != nil { + return err + } + } + if cluster != nil { + return store.CreateCluster(tx, cluster) } return nil }) @@ -141,6 +160,13 @@ var _ = Describe("Replicated Job reconciler", func() { Expect(task.DesiredState).To(Equal(api.TaskStateCompleted)) } }) + + It("should use the cluster to set the default log driver", func() 
{ + tasks := AllTasks(s) + Expect(len(tasks) >= 1).To(BeTrue()) + + Expect(tasks[0].LogDriver).To(Equal(cluster.Spec.TaskDefaults.LogDriver)) + }) }) When("the job has some tasks already in progress", func() { @@ -150,7 +176,7 @@ var _ = Describe("Replicated Job reconciler", func() { // also, to fully exercise the slot picking code, we'll // assign these tasks to every other slot for i := uint64(0); i < 12; i += 2 { - task := orchestrator.NewTask(r.cluster, service, i, "") + task := orchestrator.NewTask(cluster, service, i, "") task.JobIteration = &api.Version{} task.DesiredState = api.TaskStateCompleted @@ -179,7 +205,7 @@ var _ = Describe("Replicated Job reconciler", func() { BeforeEach(func() { err := s.Update(func(tx store.Tx) error { for i := uint64(0); i < maxConcurrent; i++ { - task := orchestrator.NewTask(r.cluster, service, i, "") + task := orchestrator.NewTask(cluster, service, i, "") task.JobIteration = &api.Version{} task.DesiredState = api.TaskStateShutdown