diff --git a/.gitignore b/.gitignore index 4cad88f6..38bb345a 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ tmp/ expt/ entropy.dev.yaml + +requests.http \ No newline at end of file diff --git a/cli/serve.go b/cli/serve.go index 904c9234..065a1f7f 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -90,6 +90,10 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap return err } + if err := asyncWorker.Register(core.JobKindScheduledSyncResource, service.HandleSyncJob); err != nil { + return err + } + return entropyserver.Serve(ctx, cfg.Service.addr(), nrApp, zapLog, service) } diff --git a/core/mocks/loggable_module.go b/core/mocks/loggable_module.go index e43b35c0..cebeb9c2 100644 --- a/core/mocks/loggable_module.go +++ b/core/mocks/loggable_module.go @@ -73,15 +73,15 @@ func (_c *LoggableModule_Log_Call) Return(_a0 <-chan module.LogChunk, _a1 error) } // Plan provides a mock function with given fields: ctx, spec, act -func (_m *LoggableModule) Plan(ctx context.Context, spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (_m *LoggableModule) Plan(ctx context.Context, spec module.Spec, act module.ActionRequest) (*module.Plan, error) { ret := _m.Called(ctx, spec, act) - var r0 *resource.Resource - if rf, ok := ret.Get(0).(func(context.Context, module.Spec, module.ActionRequest) *resource.Resource); ok { + var r0 *module.Plan + if rf, ok := ret.Get(0).(func(context.Context, module.Spec, module.ActionRequest) *module.Plan); ok { r0 = rf(ctx, spec, act) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*resource.Resource) + r0 = ret.Get(0).(*module.Plan) } } @@ -115,7 +115,7 @@ func (_c *LoggableModule_Plan_Call) Run(run func(ctx context.Context, spec modul return _c } -func (_c *LoggableModule_Plan_Call) Return(_a0 *resource.Resource, _a1 error) *LoggableModule_Plan_Call { +func (_c *LoggableModule_Plan_Call) Return(_a0 *module.Plan, _a1 error) *LoggableModule_Plan_Call { _c.Call.Return(_a0, _a1) return _c } diff --git a/core/mocks/module.go b/core/mocks/module.go index 8d56274f..e0dc2151 100644 --- a/core/mocks/module.go +++ b/core/mocks/module.go @@ -25,15 +25,15 @@ func (_m *Module) EXPECT() *Module_Expecter { } // Plan provides a mock function with given fields: ctx, spec, act -func (_m *Module) Plan(ctx context.Context, spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (_m *Module) Plan(ctx context.Context, spec module.Spec, act module.ActionRequest) (*module.Plan, error) { ret := _m.Called(ctx, spec, act) - var r0 *resource.Resource - if rf, ok := ret.Get(0).(func(context.Context, module.Spec, module.ActionRequest) *resource.Resource); ok { + var r0 *module.Plan + if rf, ok := ret.Get(0).(func(context.Context, module.Spec, module.ActionRequest) *module.Plan); ok { r0 = rf(ctx, spec, act) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*resource.Resource) + r0 = ret.Get(0).(*module.Plan) } } @@ -67,7 +67,7 @@ func (_c *Module_Plan_Call) Run(run func(ctx context.Context, spec module.Spec, return _c } -func (_c *Module_Plan_Call) Return(_a0 *resource.Resource, _a1 error) *Module_Plan_Call { +func (_c *Module_Plan_Call) Return(_a0 *module.Plan, _a1 error) *Module_Plan_Call { _c.Call.Return(_a0, _a1) return _c } diff --git a/core/module/module.go b/core/module/module.go index 110b4507..6b49baf9 100644 --- a/core/module/module.go +++ b/core/module/module.go @@ -5,6 +5,7 @@ package module import ( "context" "encoding/json" + "time" "github.com/odpf/entropy/core/resource" "github.com/odpf/entropy/pkg/errors" @@ -16,7 +17,7 @@ type Module interface { // Plan SHOULD validate the action on the current version of the resource, // return the resource with config/status/state changes (if any) applied. // Plan SHOULD NOT have side effects on anything other than the resource. - Plan(ctx context.Context, spec Spec, act ActionRequest) (*resource.Resource, error) + Plan(ctx context.Context, spec Spec, act ActionRequest) (*Plan, error) // Sync is called repeatedly by Entropy core until the returned state is // a terminal status. Module implementation is free to execute an action @@ -27,6 +28,12 @@ type Module interface { Sync(ctx context.Context, spec Spec) (*resource.State, error) } +// Plan represents the changes to be staged and later synced by module. +type Plan struct { + Resource resource.Resource + ScheduleRunAt time.Time +} + // Spec represents the context for Plan() or Sync() invocations. type Spec struct { Resource resource.Resource `json:"resource"` diff --git a/core/module/registry.go b/core/module/registry.go index 44de68bf..a516b7d3 100644 --- a/core/module/registry.go +++ b/core/module/registry.go @@ -59,7 +59,7 @@ func (mr *Registry) get(kind string) (Descriptor, bool) { return desc, found } -func (mr *Registry) Plan(ctx context.Context, spec Spec, act ActionRequest) (*resource.Resource, error) { +func (mr *Registry) Plan(ctx context.Context, spec Spec, act ActionRequest) (*Plan, error) { kind := spec.Resource.Kind desc, found := mr.get(kind) diff --git a/core/sync.go b/core/sync.go index f9c67583..bbc34437 100644 --- a/core/sync.go +++ b/core/sync.go @@ -6,19 +6,23 @@ import ( "fmt" "time" + "github.com/odpf/entropy/core/module" "github.com/odpf/entropy/core/resource" "github.com/odpf/entropy/pkg/errors" "github.com/odpf/entropy/pkg/worker" ) -const JobKindSyncResource = "sync_resource" +const ( + JobKindSyncResource = "sync_resource" + JobKindScheduledSyncResource = "sched_sync_resource" +) type syncJobPayload struct { ResourceURN string `json:"resource_urn"` UpdatedAt time.Time `json:"updated_at"` } -func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource) error { +func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource, runAt time.Time, jobType string) error { data := syncJobPayload{ ResourceURN: res.URN, UpdatedAt: res.UpdatedAt, @@ -29,12 +33,17 @@ func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource) err return err } - return s.worker.Enqueue(ctx, worker.Job{ - ID: fmt.Sprintf("sync-%s-%d", res.URN, res.UpdatedAt.Unix()), - Kind: JobKindSyncResource, - RunAt: time.Now(), + job := worker.Job{ + ID: fmt.Sprintf(jobType+"-%s-%d", res.URN, runAt.Unix()), + Kind: jobType, + RunAt: runAt, Payload: payload, - }) + } + + if err := s.worker.Enqueue(ctx, job); err != nil && !errors.Is(err, worker.ErrJobExists) { + return err + } + return nil } // HandleSyncJob is meant to be invoked by asyncWorker when an enqueued job is @@ -94,7 +103,7 @@ func (s *Service) syncChange(ctx context.Context, urn string) (*resource.Resourc return nil, err } } else { - if err := s.upsert(ctx, *res, false); err != nil { + if err := s.upsert(ctx, module.Plan{Resource: *res}, false); err != nil { return nil, err } } diff --git a/core/write.go b/core/write.go index 263e501f..b87627f4 100644 --- a/core/write.go +++ b/core/write.go @@ -55,70 +55,77 @@ func (s *Service) ApplyAction(ctx context.Context, urn string, act module.Action } func (s *Service) execAction(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) { - plannedRes, err := s.planChange(ctx, res, act) + planned, err := s.planChange(ctx, res, act) if err != nil { return nil, err } if isCreate(act.Name) { - plannedRes.CreatedAt = s.clock() - plannedRes.UpdatedAt = plannedRes.CreatedAt + planned.Resource.CreatedAt = s.clock() + planned.Resource.UpdatedAt = planned.Resource.CreatedAt } else { - plannedRes.CreatedAt = res.CreatedAt - plannedRes.UpdatedAt = s.clock() + planned.Resource.CreatedAt = res.CreatedAt + planned.Resource.UpdatedAt = s.clock() } - if err := s.upsert(ctx, *plannedRes, isCreate(act.Name)); err != nil { + if err := s.upsert(ctx, *planned, isCreate(act.Name)); err != nil { return nil, err } - return plannedRes, nil + return &planned.Resource, nil } func isCreate(actionName string) bool { return actionName == module.CreateAction } -func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) { +func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*module.Plan, error) { modSpec, err := s.generateModuleSpec(ctx, res) if err != nil { return nil, err } - plannedRes, err := s.rootModule.Plan(ctx, *modSpec, act) + planned, err := s.rootModule.Plan(ctx, *modSpec, act) if err != nil { if errors.Is(err, errors.ErrInvalid) { return nil, err } return nil, errors.ErrInternal.WithMsgf("plan() failed").WithCausef(err.Error()) - } else if err := plannedRes.Validate(isCreate(act.Name)); err != nil { + } else if err := planned.Resource.Validate(isCreate(act.Name)); err != nil { return nil, err } - return plannedRes, nil + return planned, nil } -func (s *Service) upsert(ctx context.Context, res resource.Resource, isCreate bool) error { - hook := func(ctx context.Context) error { - if res.State.IsTerminal() { +func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) error { + var hooks []resource.MutationHook + hooks = append(hooks, func(ctx context.Context) error { + if plan.Resource.State.IsTerminal() { // no need to enqueue if resource has reached terminal state. return nil } - return s.enqueueSyncJob(ctx, res) + return s.enqueueSyncJob(ctx, plan.Resource, s.clock(), JobKindSyncResource) + }) + + if !plan.ScheduleRunAt.IsZero() { + hooks = append(hooks, func(ctx context.Context) error { + return s.enqueueSyncJob(ctx, plan.Resource, plan.ScheduleRunAt, JobKindScheduledSyncResource) + }) } var err error if isCreate { - err = s.store.Create(ctx, res, hook) + err = s.store.Create(ctx, plan.Resource, hooks...) } else { - err = s.store.Update(ctx, res, hook) + err = s.store.Update(ctx, plan.Resource, hooks...) } if err != nil { if isCreate && errors.Is(err, errors.ErrConflict) { - return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", res.URN) + return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", plan.Resource.URN) } else if !isCreate && errors.Is(err, errors.ErrNotFound) { - return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", res.URN) + return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", plan.Resource.URN) } return errors.ErrInternal.WithCausef(err.Error()) } diff --git a/core/write_test.go b/core/write_test.go index fac8d067..0db5e586 100644 --- a/core/write_test.go +++ b/core/write_test.go @@ -150,10 +150,12 @@ func TestService_CreateResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&resource.Resource{ - Kind: "mock", - Name: "child", - Project: "project", + Return(&module.Plan{ + Resource: resource.Resource{ + Kind: "mock", + Name: "child", + Project: "project", + }, }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -179,10 +181,12 @@ func TestService_CreateResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&resource.Resource{ - Kind: "mock", - Name: "child", - Project: "project", + Return(&module.Plan{ + Resource: resource.Resource{ + Kind: "mock", + Name: "child", + Project: "project", + }, }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -207,11 +211,13 @@ func TestService_CreateResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&resource.Resource{ - Kind: "mock", - Name: "child", - Project: "project", - State: resource.State{Status: resource.StatusCompleted}, + Return(&module.Plan{ + Resource: resource.Resource{ + Kind: "mock", + Name: "child", + Project: "project", + State: resource.State{Status: resource.StatusCompleted}, + }, }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -353,7 +359,7 @@ func TestService_UpdateResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&testResource, nil).Once() + Return(&module.Plan{Resource: testResource}, nil).Once() resourceRepo := &mocks.ResourceStore{} resourceRepo.EXPECT(). @@ -394,16 +400,18 @@ func TestService_UpdateResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&resource.Resource{ - URN: "orn:entropy:mock:project:child", - Kind: "mock", - Name: "child", - Project: "project", - Spec: resource.Spec{ - Configs: []byte(`{"foo": "bar"}`), + Return(&module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:mock:project:child", + Kind: "mock", + Name: "child", + Project: "project", + Spec: resource.Spec{ + Configs: []byte(`{"foo": "bar"}`), + }, + State: resource.State{Status: resource.StatusPending}, + CreatedAt: frozenTime, }, - State: resource.State{Status: resource.StatusPending}, - CreatedAt: frozenTime, }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -426,7 +434,7 @@ func TestService_UpdateResource(t *testing.T) { Return(nil). Run(func(ctx context.Context, jobs ...worker.Job) { assert.Len(t, jobs, 1) - assert.Equal(t, jobs[0].ID, "sync-orn:entropy:mock:project:child-1650536955") + assert.Equal(t, jobs[0].ID, "sync_resource-orn:entropy:mock:project:child-1650536955") assert.Equal(t, jobs[0].Kind, "sync_resource") }). Once() @@ -502,14 +510,16 @@ func TestService_DeleteResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&resource.Resource{ - URN: "orn:entropy:mock:project:child", - Kind: "mock", - Name: "child", - Project: "project", - State: resource.State{Status: resource.StatusPending}, - CreatedAt: frozenTime, - UpdatedAt: frozenTime, + Return(&module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:mock:project:child", + Kind: "mock", + Name: "child", + Project: "project", + State: resource.State{Status: resource.StatusPending}, + CreatedAt: frozenTime, + UpdatedAt: frozenTime, + }, }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -543,14 +553,16 @@ func TestService_DeleteResource(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, mock.Anything). - Return(&resource.Resource{ - URN: "orn:entropy:mock:project:child", - Kind: "mock", - Name: "child", - Project: "project", - State: resource.State{Status: resource.StatusPending}, - CreatedAt: frozenTime, - UpdatedAt: frozenTime, + Return(&module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:mock:project:child", + Kind: "mock", + Name: "child", + Project: "project", + State: resource.State{Status: resource.StatusPending}, + CreatedAt: frozenTime, + UpdatedAt: frozenTime, + }, }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -687,14 +699,15 @@ func TestService_ApplyAction(t *testing.T) { mod := &mocks.Module{} mod.EXPECT(). Plan(mock.Anything, mock.Anything, sampleAction). - Return(&resource.Resource{ - URN: "orn:entropy:mock:foo:bar", - Kind: "mock", - Project: "foo", - Name: "bar", - State: resource.State{Status: resource.StatusPending}, - }, nil). - Once() + Return(&module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:mock:foo:bar", + Kind: "mock", + Project: "foo", + Name: "bar", + State: resource.State{Status: resource.StatusPending}, + }, + }, nil).Once() resourceRepo := &mocks.ResourceStore{} resourceRepo.EXPECT(). diff --git a/modules/firehose/config.go b/modules/firehose/config.go index 27afce7c..aa05d474 100644 --- a/modules/firehose/config.go +++ b/modules/firehose/config.go @@ -4,8 +4,10 @@ import ( _ "embed" "encoding/json" "fmt" + "time" "github.com/odpf/entropy/core/resource" + "github.com/odpf/entropy/pkg/errors" "github.com/odpf/entropy/pkg/helm" ) @@ -33,6 +35,7 @@ var ( type moduleConfig struct { State string `json:"state"` ChartVersion string `json:"chart_version"` + StopTime *time.Time `json:"stop_time"` Telegraf map[string]interface{} `json:"telegraf"` Firehose struct { Replicas int `json:"replicas"` @@ -43,10 +46,15 @@ type moduleConfig struct { } `json:"firehose"` } -func (mc *moduleConfig) sanitiseAndValidate() { +func (mc *moduleConfig) sanitiseAndValidate() error { + if mc.StopTime != nil && mc.StopTime.Before(time.Now()) { + return errors.ErrInvalid. + WithMsgf("value for stop_time must be greater than current time") + } if mc.ChartVersion == "" { mc.ChartVersion = defaultVersionString } + return nil } func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseConfig { diff --git a/modules/firehose/plan.go b/modules/firehose/plan.go index 1a09080c..1d34cf66 100644 --- a/modules/firehose/plan.go +++ b/modules/firehose/plan.go @@ -9,7 +9,7 @@ import ( "github.com/odpf/entropy/pkg/errors" ) -func (m *firehoseModule) Plan(_ context.Context, spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (m *firehoseModule) Plan(_ context.Context, spec module.Spec, act module.ActionRequest) (*module.Plan, error) { switch act.Name { case module.CreateAction: return m.planCreate(spec, act) @@ -20,14 +20,17 @@ func (m *firehoseModule) Plan(_ context.Context, spec module.Spec, act module.Ac } } -func (*firehoseModule) planCreate(spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (*firehoseModule) planCreate(spec module.Spec, act module.ActionRequest) (*module.Plan, error) { + var plan module.Plan r := spec.Resource var reqConf moduleConfig if err := json.Unmarshal(act.Params, &reqConf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } - reqConf.sanitiseAndValidate() + if err := reqConf.sanitiseAndValidate(); err != nil { + return nil, err + } r.Spec.Configs = reqConf.JSON() r.State = resource.State{ @@ -36,10 +39,16 @@ func (*firehoseModule) planCreate(spec module.Spec, act module.ActionRequest) (* PendingSteps: []string{releaseCreate}, }.JSON(), } - return &r, nil + + plan.Resource = r + if reqConf.StopTime != nil { + plan.ScheduleRunAt = *reqConf.StopTime + } + return &plan, nil } -func (*firehoseModule) planChange(spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (*firehoseModule) planChange(spec module.Spec, act module.ActionRequest) (*module.Plan, error) { + var plan module.Plan r := spec.Resource var conf moduleConfig @@ -53,9 +62,15 @@ func (*firehoseModule) planChange(spec module.Spec, act module.ActionRequest) (* if err := json.Unmarshal(act.Params, &reqConf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } - reqConf.sanitiseAndValidate() + if err := reqConf.sanitiseAndValidate(); err != nil { + return nil, err + } conf = reqConf + if conf.StopTime != nil { + plan.ScheduleRunAt = *conf.StopTime + } + case ScaleAction: var scaleParams struct { Replicas int `json:"replicas"` @@ -79,10 +94,11 @@ func (*firehoseModule) planChange(spec module.Spec, act module.ActionRequest) (* PendingSteps: []string{releaseUpdate}, }.JSON(), } - return &r, nil + plan.Resource = r + return &plan, nil } -func (*firehoseModule) planReset(spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (*firehoseModule) planReset(spec module.Spec, act module.ActionRequest) (*module.Plan, error) { r := spec.Resource var conf moduleConfig @@ -115,5 +131,6 @@ func (*firehoseModule) planReset(spec module.Spec, act module.ActionRequest) (*r StateOverride: stateStopped, }.JSON(), } - return &r, nil + + return &module.Plan{Resource: r}, nil } diff --git a/modules/firehose/plan_test.go b/modules/firehose/plan_test.go index 799c98c5..8fd43597 100644 --- a/modules/firehose/plan_test.go +++ b/modules/firehose/plan_test.go @@ -3,6 +3,7 @@ package firehose import ( "context" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -30,7 +31,7 @@ func TestFirehoseModule_Plan(t *testing.T) { title string spec module.Spec act module.ActionRequest - want *resource.Resource + want *module.Plan wantErr error }{ { @@ -49,17 +50,19 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: module.CreateAction, Params: []byte(`{"state":"RUNNING","firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), }, - want: &resource.Resource{ - URN: "orn:entropy:firehose:test", - Kind: "firehose", - Name: "test", - Project: "demo", - Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), - }, - State: resource.State{ - Status: resource.StatusPending, - ModuleData: []byte(`{"pending_steps":["release_create"]}`), + want: &module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:firehose:test", + Kind: "firehose", + Name: "test", + Project: "demo", + Spec: resource.Spec{ + Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + }, + State: resource.State{ + Status: resource.StatusPending, + ModuleData: []byte(`{"pending_steps":["release_create"]}`), + }, }, }, }, @@ -79,17 +82,19 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: ScaleAction, Params: []byte(`{"replicas": 5}`), }, - want: &resource.Resource{ - URN: "orn:entropy:firehose:test", - Kind: "firehose", - Name: "test", - Project: "demo", - Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","telegraf":null,"firehose":{"replicas":5,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), - }, - State: resource.State{ - Status: resource.StatusPending, - ModuleData: []byte(`{"pending_steps":["release_update"]}`), + want: &module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:firehose:test", + Kind: "firehose", + Name: "test", + Project: "demo", + Spec: resource.Spec{ + Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":5,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + }, + State: resource.State{ + Status: resource.StatusPending, + ModuleData: []byte(`{"pending_steps":["release_update"]}`), + }, }, }, }, @@ -100,18 +105,44 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: ResetAction, Params: []byte(`{"to":"DATETIME","datetime":"2022-06-22T00:00:00+00:00"}`), }, - want: &resource.Resource{ - URN: "orn:entropy:firehose:test", - Kind: "firehose", - Name: "test", - Project: "demo", - Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + want: &module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:firehose:test", + Kind: "firehose", + Name: "test", + Project: "demo", + Spec: resource.Spec{ + Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + }, + State: resource.State{ + Status: resource.StatusPending, + ModuleData: []byte(`{"pending_steps":["release_update","consumer_reset","release_update"],"reset_to":"2022-06-22T00:00:00+00:00","state_override":"STOPPED"}`), + }, }, - State: resource.State{ - Status: resource.StatusPending, - ModuleData: []byte(`{"pending_steps":["release_update","consumer_reset","release_update"],"reset_to":"2022-06-22T00:00:00+00:00","state_override":"STOPPED"}`), + }, + }, + { + title: "WithStopTimeConfiguration", + spec: module.Spec{Resource: res}, + act: module.ActionRequest{ + Name: module.CreateAction, + Params: []byte(`{"state":"RUNNING","stop_time":"3022-07-13T00:40:14.028016Z","firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + }, + want: &module.Plan{ + Resource: resource.Resource{ + URN: "orn:entropy:firehose:test", + Kind: "firehose", + Name: "test", + Project: "demo", + Spec: resource.Spec{ + Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":"3022-07-13T00:40:14.028016Z","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + }, + State: resource.State{ + Status: resource.StatusPending, + ModuleData: []byte(`{"pending_steps":["release_create"]}`), + }, }, + ScheduleRunAt: parseTime("3022-07-13T00:40:14.028016Z"), }, }, } @@ -123,7 +154,7 @@ func TestFirehoseModule_Plan(t *testing.T) { m := firehoseModule{} got, err := m.Plan(context.Background(), tt.spec, tt.act) - if tt.wantErr != nil { + if tt.wantErr != nil || err != nil { assert.Error(t, err) assert.True(t, errors.Is(err, tt.wantErr)) assert.Nil(t, got) @@ -134,3 +165,11 @@ func TestFirehoseModule_Plan(t *testing.T) { }) } } + +func parseTime(timeString string) time.Time { + t, err := time.Parse(time.RFC3339, timeString) + if err != nil { + panic(err) + } + return t +} diff --git a/modules/firehose/schema/config.json b/modules/firehose/schema/config.json index 17a6260a..23765598 100644 --- a/modules/firehose/schema/config.json +++ b/modules/firehose/schema/config.json @@ -14,6 +14,10 @@ "chart_version": { "type": "string" }, + "stop_time": { + "type": "string", + "format": "date-time" + }, "firehose": { "type": "object", "properties": { diff --git a/modules/firehose/sync.go b/modules/firehose/sync.go index 17aa5dfe..360310d5 100644 --- a/modules/firehose/sync.go +++ b/modules/firehose/sync.go @@ -29,21 +29,16 @@ func (m *firehoseModule) Sync(ctx context.Context, spec module.Spec) (*resource. r := spec.Resource var data moduleData + var pendingStep string if err := json.Unmarshal(r.State.ModuleData, &data); err != nil { return nil, err } - if len(data.PendingSteps) == 0 { - return &resource.State{ - Status: resource.StatusCompleted, - Output: r.State.Output, - ModuleData: r.State.ModuleData, - }, nil + if len(data.PendingSteps) != 0 { + pendingStep = data.PendingSteps[0] + data.PendingSteps = data.PendingSteps[1:] } - pendingStep := data.PendingSteps[0] - data.PendingSteps = data.PendingSteps[1:] - var conf moduleConfig if err := json.Unmarshal(r.Spec.Configs, &conf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) @@ -72,6 +67,10 @@ func (m *firehoseModule) Sync(ctx context.Context, spec module.Spec) (*resource. return nil, err } data.StateOverride = "" + default: + if err := m.releaseSync(pendingStep == releaseCreate, conf, r, kubeOut); err != nil { + return nil, err + } } finalStatus := resource.StatusCompleted @@ -92,7 +91,7 @@ func (m *firehoseModule) Sync(ctx context.Context, spec module.Spec) (*resource. func (*firehoseModule) releaseSync(isCreate bool, conf moduleConfig, r resource.Resource, kube kubernetes.Output) error { helmCl := helm.NewClient(&helm.Config{Kubernetes: kube.Configs}) - if conf.State == stateStopped { + if conf.State == stateStopped || (conf.StopTime != nil && conf.StopTime.Before(time.Now())) { conf.Firehose.Replicas = 0 } diff --git a/modules/kubernetes/kubernetes.go b/modules/kubernetes/kubernetes.go index c7c4fe6e..fbbb732b 100644 --- a/modules/kubernetes/kubernetes.go +++ b/modules/kubernetes/kubernetes.go @@ -39,7 +39,7 @@ type Output struct { ServerInfo version.Info `json:"server_info"` } -func (*kubeModule) Plan(_ context.Context, spec module.Spec, act module.ActionRequest) (*resource.Resource, error) { +func (*kubeModule) Plan(_ context.Context, spec module.Spec, act module.ActionRequest) (*module.Plan, error) { res := spec.Resource conf := kube.DefaultClientConfig() @@ -68,7 +68,7 @@ func (*kubeModule) Plan(_ context.Context, spec module.Spec, act module.ActionRe ServerInfo: *info, }.JSON(), } - return &res, nil + return &module.Plan{Resource: res}, nil } func (*kubeModule) Sync(_ context.Context, spec module.Spec) (*resource.State, error) { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6ddf5e72..c5822957 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -112,7 +112,7 @@ func (w *Worker) runWorker(ctx context.Context) { if len(kinds) == 0 { w.logger.Warn("no job-handler registered, skipping dequeue") } else { - w.logger.Info("looking for a job", zap.Strings("kinds", kinds)) + w.logger.Debug("looking for a job", zap.Strings("kinds", kinds)) if err := w.queue.Dequeue(ctx, kinds, w.handleJob); err != nil { w.logger.Error("dequeue failed", zap.Error(err)) }