Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ tmp/
expt/

entropy.dev.yaml

requests.http
4 changes: 4 additions & 0 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
return err
}

if err := asyncWorker.Register(core.JobKindScheduledSyncResource, service.HandleSyncJob); err != nil {
return err
}

return entropyserver.Serve(ctx, cfg.Service.addr(), nrApp, zapLog, service)
}

Expand Down
10 changes: 5 additions & 5 deletions core/mocks/loggable_module.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/mocks/module.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion core/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package module
import (
"context"
"encoding/json"
"time"

"github.com/odpf/entropy/core/resource"
"github.com/odpf/entropy/pkg/errors"
Expand All @@ -16,7 +17,7 @@ type Module interface {
// Plan SHOULD validate the action on the current version of the resource,
// return the resource with config/status/state changes (if any) applied.
// Plan SHOULD NOT have side effects on anything other than the resource.
Plan(ctx context.Context, spec Spec, act ActionRequest) (*resource.Resource, error)
Plan(ctx context.Context, spec Spec, act ActionRequest) (*Plan, error)

// Sync is called repeatedly by Entropy core until the returned state is
// a terminal status. Module implementation is free to execute an action
Expand All @@ -27,6 +28,12 @@ type Module interface {
Sync(ctx context.Context, spec Spec) (*resource.State, error)
}

// Plan represents the changes to be staged and later synced by module.
type Plan struct {
Resource resource.Resource
ScheduleRunAt time.Time
}

// Spec represents the context for Plan() or Sync() invocations.
type Spec struct {
Resource resource.Resource `json:"resource"`
Expand Down
2 changes: 1 addition & 1 deletion core/module/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (mr *Registry) get(kind string) (Descriptor, bool) {
return desc, found
}

func (mr *Registry) Plan(ctx context.Context, spec Spec, act ActionRequest) (*resource.Resource, error) {
func (mr *Registry) Plan(ctx context.Context, spec Spec, act ActionRequest) (*Plan, error) {
kind := spec.Resource.Kind

desc, found := mr.get(kind)
Expand Down
25 changes: 17 additions & 8 deletions core/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ import (
"fmt"
"time"

"github.com/odpf/entropy/core/module"
"github.com/odpf/entropy/core/resource"
"github.com/odpf/entropy/pkg/errors"
"github.com/odpf/entropy/pkg/worker"
)

const JobKindSyncResource = "sync_resource"
const (
JobKindSyncResource = "sync_resource"
JobKindScheduledSyncResource = "sched_sync_resource"
)

type syncJobPayload struct {
ResourceURN string `json:"resource_urn"`
UpdatedAt time.Time `json:"updated_at"`
}

func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource) error {
func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource, runAt time.Time, jobType string) error {
data := syncJobPayload{
ResourceURN: res.URN,
UpdatedAt: res.UpdatedAt,
Expand All @@ -29,12 +33,17 @@ func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource) err
return err
}

return s.worker.Enqueue(ctx, worker.Job{
ID: fmt.Sprintf("sync-%s-%d", res.URN, res.UpdatedAt.Unix()),
Kind: JobKindSyncResource,
RunAt: time.Now(),
job := worker.Job{
ID: fmt.Sprintf(jobType+"-%s-%d", res.URN, runAt.Unix()),
Kind: jobType,
RunAt: runAt,
Payload: payload,
})
}

if err := s.worker.Enqueue(ctx, job); err != nil && !errors.Is(err, worker.ErrJobExists) {
return err
}
return nil
}

// HandleSyncJob is meant to be invoked by asyncWorker when an enqueued job is
Expand Down Expand Up @@ -94,7 +103,7 @@ func (s *Service) syncChange(ctx context.Context, urn string) (*resource.Resourc
return nil, err
}
} else {
if err := s.upsert(ctx, *res, false); err != nil {
if err := s.upsert(ctx, module.Plan{Resource: *res}, false); err != nil {
return nil, err
}
}
Expand Down
45 changes: 26 additions & 19 deletions core/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,70 +55,77 @@ func (s *Service) ApplyAction(ctx context.Context, urn string, act module.Action
}

func (s *Service) execAction(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) {
plannedRes, err := s.planChange(ctx, res, act)
planned, err := s.planChange(ctx, res, act)
if err != nil {
return nil, err
}

if isCreate(act.Name) {
plannedRes.CreatedAt = s.clock()
plannedRes.UpdatedAt = plannedRes.CreatedAt
planned.Resource.CreatedAt = s.clock()
planned.Resource.UpdatedAt = planned.Resource.CreatedAt
} else {
plannedRes.CreatedAt = res.CreatedAt
plannedRes.UpdatedAt = s.clock()
planned.Resource.CreatedAt = res.CreatedAt
planned.Resource.UpdatedAt = s.clock()
}

if err := s.upsert(ctx, *plannedRes, isCreate(act.Name)); err != nil {
if err := s.upsert(ctx, *planned, isCreate(act.Name)); err != nil {
return nil, err
}
return plannedRes, nil
return &planned.Resource, nil
}

func isCreate(actionName string) bool {
return actionName == module.CreateAction
}

func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) {
func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*module.Plan, error) {
modSpec, err := s.generateModuleSpec(ctx, res)
if err != nil {
return nil, err
}

plannedRes, err := s.rootModule.Plan(ctx, *modSpec, act)
planned, err := s.rootModule.Plan(ctx, *modSpec, act)
if err != nil {
if errors.Is(err, errors.ErrInvalid) {
return nil, err
}
return nil, errors.ErrInternal.WithMsgf("plan() failed").WithCausef(err.Error())
} else if err := plannedRes.Validate(isCreate(act.Name)); err != nil {
} else if err := planned.Resource.Validate(isCreate(act.Name)); err != nil {
return nil, err
}

return plannedRes, nil
return planned, nil
}

func (s *Service) upsert(ctx context.Context, res resource.Resource, isCreate bool) error {
hook := func(ctx context.Context) error {
if res.State.IsTerminal() {
func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) error {
var hooks []resource.MutationHook
hooks = append(hooks, func(ctx context.Context) error {
if plan.Resource.State.IsTerminal() {
// no need to enqueue if resource has reached terminal state.
return nil
}

return s.enqueueSyncJob(ctx, res)
return s.enqueueSyncJob(ctx, plan.Resource, s.clock(), JobKindSyncResource)
})

if !plan.ScheduleRunAt.IsZero() {
hooks = append(hooks, func(ctx context.Context) error {
return s.enqueueSyncJob(ctx, plan.Resource, plan.ScheduleRunAt, JobKindScheduledSyncResource)
})
}

var err error
if isCreate {
err = s.store.Create(ctx, res, hook)
err = s.store.Create(ctx, plan.Resource, hooks...)
} else {
err = s.store.Update(ctx, res, hook)
err = s.store.Update(ctx, plan.Resource, hooks...)
}

if err != nil {
if isCreate && errors.Is(err, errors.ErrConflict) {
return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", res.URN)
return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", plan.Resource.URN)
} else if !isCreate && errors.Is(err, errors.ErrNotFound) {
return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", res.URN)
return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", plan.Resource.URN)
}
return errors.ErrInternal.WithCausef(err.Error())
}
Expand Down
Loading