diff --git a/api/handlers/v1/resource.go b/api/handlers/v1/server.go similarity index 65% rename from api/handlers/v1/resource.go rename to api/handlers/v1/server.go index 099d478c..567d36f5 100644 --- a/api/handlers/v1/resource.go +++ b/api/handlers/v1/server.go @@ -6,6 +6,7 @@ import ( "github.com/odpf/entropy/domain" "github.com/odpf/entropy/pkg/module" + "github.com/odpf/entropy/pkg/provider" "github.com/odpf/entropy/pkg/resource" "github.com/odpf/entropy/store" entropyv1beta1 "go.buf.build/odpf/gwv/odpf/proton/odpf/entropy/v1beta1" @@ -19,14 +20,17 @@ var ErrInternal = status.Error(codes.Internal, "internal server error") type APIServer struct { entropyv1beta1.UnimplementedResourceServiceServer + entropyv1beta1.UnimplementedProviderServiceServer resourceService resource.ServiceInterface moduleService module.ServiceInterface + providerService provider.ServiceInterface } -func NewApiServer(resourceService resource.ServiceInterface, moduleService module.ServiceInterface) *APIServer { +func NewApiServer(resourceService resource.ServiceInterface, moduleService module.ServiceInterface, providerService provider.ServiceInterface) *APIServer { return &APIServer{ resourceService: resourceService, moduleService: moduleService, + providerService: providerService, } } @@ -39,7 +43,7 @@ func (server APIServer) CreateResource(ctx context.Context, request *entropyv1be } createdResource, err := server.resourceService.CreateResource(ctx, res) if err != nil { - if errors.Is(err, store.ResourceAlreadyExistsError) { + if errors.Is(err, store.ErrResourceAlreadyExists) { return nil, status.Error(codes.AlreadyExists, "resource already exists") } return nil, ErrInternal @@ -61,7 +65,7 @@ func (server APIServer) CreateResource(ctx context.Context, request *entropyv1be func (server APIServer) UpdateResource(ctx context.Context, request *entropyv1beta1.UpdateResourceRequest) (*entropyv1beta1.UpdateResourceResponse, error) { res, err := server.resourceService.GetResource(ctx, request.GetUrn()) if err != nil { - if errors.Is(err, store.ResourceNotFoundError) { + if errors.Is(err, store.ErrResourceNotFound) { return nil, status.Error(codes.NotFound, "could not find resource with given urn") } return nil, ErrInternal @@ -93,7 +97,7 @@ func (server APIServer) UpdateResource(ctx context.Context, request *entropyv1be func (server APIServer) GetResource(ctx context.Context, request *entropyv1beta1.GetResourceRequest) (*entropyv1beta1.GetResourceResponse, error) { res, err := server.resourceService.GetResource(ctx, request.GetUrn()) if err != nil { - if errors.Is(err, store.ResourceNotFoundError) { + if errors.Is(err, store.ErrResourceNotFound) { return nil, status.Error(codes.NotFound, "could not find resource with given urn") } return nil, ErrInternal @@ -134,7 +138,7 @@ func (server APIServer) DeleteResource(ctx context.Context, request *entropyv1be urn := request.GetUrn() _, err := server.resourceService.GetResource(ctx, urn) if err != nil { - if errors.Is(err, store.ResourceNotFoundError) { + if errors.Is(err, store.ErrResourceNotFound) { return nil, status.Error(codes.NotFound, "could not find resource with given urn") } return nil, ErrInternal @@ -152,7 +156,7 @@ func (server APIServer) DeleteResource(ctx context.Context, request *entropyv1be func (server APIServer) ApplyAction(ctx context.Context, request *entropyv1beta1.ApplyActionRequest) (*entropyv1beta1.ApplyActionResponse, error) { res, err := server.resourceService.GetResource(ctx, request.GetUrn()) if err != nil { - if errors.Is(err, store.ResourceNotFoundError) { + if errors.Is(err, store.ErrResourceNotFound) { return nil, status.Error(codes.NotFound, "could not find resource with given urn") } return nil, ErrInternal @@ -178,6 +182,50 @@ func (server APIServer) ApplyAction(ctx context.Context, request *entropyv1beta1 return response, nil } +func (server APIServer) CreateProvider(ctx context.Context, request *entropyv1beta1.CreateProviderRequest) (*entropyv1beta1.CreateProviderResponse, error) { + pro := providerFromProto(request.Provider) + pro.Urn = domain.GenerateProviderUrn(pro) + // TODO: add provider validation + + createdProvider, err := server.providerService.CreateProvider(ctx, pro) + if err != nil { + if errors.Is(err, store.ErrProviderAlreadyExists) { + return nil, status.Error(codes.AlreadyExists, "provider already exists") + } + return nil, ErrInternal + } + + responseProvider, err := providerToProto(createdProvider) + if err != nil { + return nil, ErrInternal + } + response := entropyv1beta1.CreateProviderResponse{ + Provider: responseProvider, + } + return &response, nil +} + +func (server APIServer) ListProviders(ctx context.Context, request *entropyv1beta1.ListProvidersRequest) (*entropyv1beta1.ListProvidersResponse, error) { + var responseProviders []*entropyv1beta1.Provider + providers, err := server.providerService.ListProviders(ctx, request.GetParent(), request.GetKind()) + if err != nil { + return nil, ErrInternal + } + + for _, pro := range providers { + responseProvider, err := providerToProto(pro) + if err != nil { + return nil, ErrInternal + } + responseProviders = append(responseProviders, responseProvider) + } + + response := entropyv1beta1.ListProvidersResponse{ + Providers: responseProviders, + } + return &response, nil +} + func (server APIServer) syncResource(ctx context.Context, updatedResource *domain.Resource) (*domain.Resource, error) { syncedResource, err := server.moduleService.Sync(ctx, updatedResource) if err != nil { @@ -193,10 +241,10 @@ func (server APIServer) syncResource(ctx context.Context, updatedResource *domai func (server APIServer) validateResource(ctx context.Context, res *domain.Resource) error { err := server.moduleService.Validate(ctx, res) if err != nil { - if errors.Is(err, store.ModuleNotFoundError) { + if errors.Is(err, store.ErrModuleNotFound) { return status.Errorf(codes.InvalidArgument, "failed to find module to deploy this kind") } - if errors.Is(err, domain.ModuleConfigParseFailed) { + if errors.Is(err, domain.ErrModuleConfigParseFailed) { return status.Errorf(codes.InvalidArgument, "failed to parse configs") } return status.Errorf(codes.InvalidArgument, err.Error()) @@ -216,12 +264,43 @@ func resourceToProto(res *domain.Resource) (*entropyv1beta1.Resource, error) { Kind: res.Kind, Configs: conf, Labels: res.Labels, + Providers: resourceProvidersToProto(res.Providers), Status: resourceStatusToProto(string(res.Status)), CreatedAt: timestamppb.New(res.CreatedAt), UpdatedAt: timestamppb.New(res.UpdatedAt), }, nil } +func resourceProvidersToProto(ps []domain.ProviderSelector) []*entropyv1beta1.ProviderSelector { + var providerSelectors []*entropyv1beta1.ProviderSelector + + for _, p := range ps { + selector := &entropyv1beta1.ProviderSelector{ + Urn: p.Urn, + Target: p.Target, + } + providerSelectors = append(providerSelectors, selector) + } + return providerSelectors +} + +func providerToProto(pro *domain.Provider) (*entropyv1beta1.Provider, error) { + conf, err := structpb.NewValue(pro.Configs) + if err != nil { + return nil, err + } + return &entropyv1beta1.Provider{ + Urn: pro.Urn, + Name: pro.Name, + Parent: pro.Parent, + Kind: pro.Kind, + Configs: conf, + Labels: pro.Labels, + CreatedAt: timestamppb.New(pro.CreatedAt), + UpdatedAt: timestamppb.New(pro.UpdatedAt), + }, nil +} + func resourceStatusToProto(status string) entropyv1beta1.Resource_Status { if resourceStatus, ok := entropyv1beta1.Resource_Status_value[status]; ok { return entropyv1beta1.Resource_Status(resourceStatus) @@ -231,11 +310,36 @@ func resourceStatusToProto(status string) entropyv1beta1.Resource_Status { func resourceFromProto(res *entropyv1beta1.Resource) *domain.Resource { return &domain.Resource{ - Urn: res.GetUrn(), - Name: res.GetName(), - Parent: res.GetParent(), - Kind: res.GetKind(), - Configs: res.GetConfigs().GetStructValue().AsMap(), - Labels: res.GetLabels(), + Urn: res.GetUrn(), + Name: res.GetName(), + Parent: res.GetParent(), + Kind: res.GetKind(), + Configs: res.GetConfigs().GetStructValue().AsMap(), + Labels: res.GetLabels(), + Providers: providerSelectorFromProto(res.GetProviders()), + } +} + +func providerSelectorFromProto(ps []*entropyv1beta1.ProviderSelector) []domain.ProviderSelector { + var providerSelectors []domain.ProviderSelector + + for _, p := range ps { + selector := domain.ProviderSelector{ + Urn: p.GetUrn(), + Target: p.GetTarget(), + } + providerSelectors = append(providerSelectors, selector) + } + return providerSelectors +} + +func providerFromProto(pro *entropyv1beta1.Provider) *domain.Provider { + return &domain.Provider{ + Urn: pro.GetUrn(), + Name: pro.GetName(), + Parent: pro.GetParent(), + Kind: pro.GetKind(), + Configs: pro.GetConfigs().GetStructValue().AsMap(), + Labels: pro.GetLabels(), } } diff --git a/api/handlers/v1/resource_test.go b/api/handlers/v1/server_test.go similarity index 93% rename from api/handlers/v1/resource_test.go rename to api/handlers/v1/server_test.go index e019ed18..6cfbebb3 100644 --- a/api/handlers/v1/resource_test.go +++ b/api/handlers/v1/server_test.go @@ -102,7 +102,8 @@ func TestAPIServer_CreateResource(t *testing.T) { UpdatedAt: createdAt, }, nil) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.CreateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("CreateResource() error = %v, wantErr %v", err, wantErr) @@ -135,13 +136,14 @@ func TestAPIServer_CreateResource(t *testing.T) { resourceService.EXPECT(). CreateResource(mock.Anything, mock.Anything). - Return(nil, store.ResourceAlreadyExistsError). + Return(nil, store.ErrResourceAlreadyExists). Once() moduleService := &mocks.ModuleService{} moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(nil) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.CreateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("CreateResource() error = %v, wantErr %v", err, wantErr) @@ -189,9 +191,10 @@ func TestAPIServer_CreateResource(t *testing.T) { }, nil).Once() moduleService := &mocks.ModuleService{} - moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(store.ModuleNotFoundError) + moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(store.ErrModuleNotFound) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.CreateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("CreateResource() error = %v, wantErr %v", err, wantErr) @@ -239,9 +242,10 @@ func TestAPIServer_CreateResource(t *testing.T) { }, nil).Once() moduleService := &mocks.ModuleService{} - moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(domain.ModuleConfigParseFailed) + moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(domain.ErrModuleConfigParseFailed) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.CreateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("CreateResource() error = %v, wantErr %v", err, wantErr) @@ -352,7 +356,8 @@ func TestAPIServer_UpdateResource(t *testing.T) { UpdatedAt: updatedAt, }, nil) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.UpdateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("UpdateResource() error = %v, wantErr %v", err, wantErr) @@ -380,11 +385,12 @@ func TestAPIServer_UpdateResource(t *testing.T) { resourceService.EXPECT(). GetResource(mock.Anything, mock.Anything). - Return(nil, store.ResourceNotFoundError).Once() + Return(nil, store.ErrResourceNotFound).Once() moduleService := &mocks.ModuleService{} - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.UpdateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("UpdateResource() error = %v, wantErr %v", err, wantErr) @@ -421,9 +427,10 @@ func TestAPIServer_UpdateResource(t *testing.T) { }, nil).Once() moduleService := &mocks.ModuleService{} - moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(store.ModuleNotFoundError) + moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(store.ErrModuleNotFound) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.UpdateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("UpdateResource() error = %v, wantErr %v", err, wantErr) @@ -460,9 +467,10 @@ func TestAPIServer_UpdateResource(t *testing.T) { }, nil).Once() moduleService := &mocks.ModuleService{} - moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(domain.ModuleConfigParseFailed) + moduleService.EXPECT().Validate(mock.Anything, mock.Anything).Return(domain.ErrModuleConfigParseFailed) - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.UpdateResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("UpdateResource() error = %v, wantErr %v", err, wantErr) @@ -523,7 +531,7 @@ func TestAPIServer_GetResource(t *testing.T) { wantErr := status.Error(codes.NotFound, "could not find resource with given urn") mockResourceService := &mocks.ResourceService{} - mockResourceService.EXPECT().GetResource(mock.Anything, mock.Anything).Return(nil, store.ResourceNotFoundError).Once() + mockResourceService.EXPECT().GetResource(mock.Anything, mock.Anything).Return(nil, store.ErrResourceNotFound).Once() mockModuleService := &mocks.ModuleService{} @@ -620,7 +628,8 @@ func TestAPIServer_DeleteResource(t *testing.T) { moduleService := &mocks.ModuleService{} - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.DeleteResource(ctx, request) if !errors.Is(err, wantErr) { t.Errorf("DeleteResource() error = %v, wantErr %v", err, wantErr) @@ -640,11 +649,12 @@ func TestAPIServer_DeleteResource(t *testing.T) { resourceService := &mocks.ResourceService{} resourceService.EXPECT(). GetResource(mock.Anything, "p-testdata-gl-testname-log"). - Return(nil, store.ResourceNotFoundError).Once() + Return(nil, store.ErrResourceNotFound).Once() moduleService := &mocks.ModuleService{} - server := NewApiServer(resourceService, moduleService) + providerService := &mocks.ProviderService{} + server := NewApiServer(resourceService, moduleService, providerService) got, err := server.DeleteResource(ctx, request) if errors.Is(err, nil) { t.Errorf("DeleteResource() got nil error") @@ -751,7 +761,7 @@ func TestAPIServer_ApplyAction(t *testing.T) { resourceService := &mocks.ResourceService{} resourceService.EXPECT(). GetResource(mock.Anything, "p-testdata-gl-testname-log"). - Return(nil, store.ResourceNotFoundError).Once() + Return(nil, store.ErrResourceNotFound).Once() moduleService := &mocks.ModuleService{} diff --git a/app/app.go b/app/app.go index 6204a3fc..702c49b0 100644 --- a/app/app.go +++ b/app/app.go @@ -3,16 +3,18 @@ package app import ( "context" "fmt" + "net/http" + "time" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "github.com/odpf/entropy/modules/firehose" "github.com/odpf/entropy/modules/log" "github.com/odpf/entropy/pkg/module" + "github.com/odpf/entropy/pkg/provider" "github.com/odpf/entropy/pkg/resource" "github.com/odpf/entropy/store" "github.com/odpf/entropy/store/inmemory" "github.com/odpf/entropy/store/mongodb" - "net/http" - "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" @@ -72,6 +74,10 @@ func RunServer(c *Config) error { mongoStore.Collection(store.ResourceRepositoryName), ) + providerRepository := mongodb.NewProviderRepository( + mongoStore.Collection(store.ProviderRepositoryName), + ) + moduleRepository := inmemory.NewModuleRepository() err = moduleRepository.Register(log.New(loggerInstance)) @@ -79,13 +85,14 @@ func RunServer(c *Config) error { return err } - err = moduleRepository.Register(firehose.New()) + err = moduleRepository.Register(firehose.New(providerRepository)) if err != nil { return err } resourceService := resource.NewService(resourceRepository) moduleService := module.NewService(moduleRepository) + providerService := provider.NewService(providerRepository) muxServer, err := server.NewMux(server.Config{ Port: c.Service.Port, @@ -116,6 +123,11 @@ func RunServer(c *Config) error { return err } + err = gw.RegisterHandler(ctx, entropyv1beta1.RegisterProviderServiceHandlerFromEndpoint) + if err != nil { + return err + } + muxServer.SetGateway("/api", gw) muxServer.RegisterService( @@ -124,7 +136,12 @@ func RunServer(c *Config) error { ) muxServer.RegisterService( &entropyv1beta1.ResourceService_ServiceDesc, - handlersv1.NewApiServer(resourceService, moduleService), + handlersv1.NewApiServer(resourceService, moduleService, providerService), + ) + + muxServer.RegisterService( + &entropyv1beta1.ProviderService_ServiceDesc, + handlersv1.NewApiServer(resourceService, moduleService, providerService), ) muxServer.RegisterHandler("/ping", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -160,5 +177,19 @@ func RunMigrations(c *Config) error { mongoStore.Collection(store.ResourceRepositoryName), ) - return resourceRepository.Migrate() + providerRepository := mongodb.NewProviderRepository( + mongoStore.Collection(store.ProviderRepositoryName), + ) + + err = resourceRepository.Migrate() + if err != nil { + return err + } + + err = providerRepository.Migrate() + if err != nil { + return err + } + + return nil } diff --git a/domain/module.go b/domain/module.go index 22f3e925..e03b4bc8 100644 --- a/domain/module.go +++ b/domain/module.go @@ -5,7 +5,7 @@ package domain import "errors" var ( - ModuleConfigParseFailed = errors.New("unable to load and validate config") + ErrModuleConfigParseFailed = errors.New("unable to load and validate config") ) type Module interface { diff --git a/domain/provider.go b/domain/provider.go new file mode 100644 index 00000000..5159678e --- /dev/null +++ b/domain/provider.go @@ -0,0 +1,24 @@ +package domain + +import ( + "strings" + "time" +) + +type Provider struct { + Urn string `bson:"urn"` + Name string `bson:"name"` + Kind string `bson:"kind"` + Parent string `bson:"parent"` + Configs map[string]interface{} `bson:"configs"` + Labels map[string]string `bson:"labels"` + CreatedAt time.Time `bson:"created_at"` + UpdatedAt time.Time `bson:"updated_at"` +} + +func GenerateProviderUrn(pro *Provider) string { + return strings.Join([]string{ + sanitizeString(pro.Parent), + sanitizeString(pro.Name), + }, "-") +} diff --git a/domain/resource.go b/domain/resource.go index 09356635..9871d74a 100644 --- a/domain/resource.go +++ b/domain/resource.go @@ -16,6 +16,11 @@ const ( ResourceStatusCompleted ResourceStatus = "STATUS_COMPLETED" ) +type ProviderSelector struct { + Urn string `bson:"urn"` + Target string `bson:"target"` +} + type Resource struct { Urn string `bson:"urn"` Name string `bson:"name"` @@ -23,6 +28,7 @@ type Resource struct { Kind string `bson:"kind"` Configs map[string]interface{} `bson:"configs"` Labels map[string]string `bson:"labels"` + Providers []ProviderSelector `bson:"providers"` Status ResourceStatus `bson:"status"` CreatedAt time.Time `bson:"created_at"` UpdatedAt time.Time `bson:"updated_at"` diff --git a/go.mod b/go.mod index d549929f..fedc76d4 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.17 require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/mcuadros/go-defaults v1.2.0 + github.com/mitchellh/mapstructure v1.4.3 github.com/newrelic/go-agent/v3 v3.12.0 github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.1 github.com/odpf/salt v0.0.0-20210929215807-5e1f68b4ec91 @@ -12,7 +13,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/xeipuuv/gojsonschema v1.2.0 go.buf.build/odpf/gw/odpf/proton v1.1.9 - go.buf.build/odpf/gwv/odpf/proton v1.1.65 + go.buf.build/odpf/gwv/odpf/proton v1.1.77 go.mongodb.org/mongo-driver v1.5.1 go.uber.org/zap v1.19.0 google.golang.org/grpc v1.43.0 @@ -93,7 +94,6 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-wordwrap v1.0.0 // indirect - github.com/mitchellh/mapstructure v1.4.3 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/locker v1.0.1 // indirect github.com/moby/spdystream v0.2.0 // indirect diff --git a/go.sum b/go.sum index 89c2b7bc..5b1be5c0 100644 --- a/go.sum +++ b/go.sum @@ -1162,8 +1162,8 @@ go.buf.build/odpf/gw/odpf/proton v1.1.9 h1:iEdRUVVc/HwOqB7WRXjhXjR2pza2gyUbQ74G2 go.buf.build/odpf/gw/odpf/proton v1.1.9/go.mod h1:I9E8CF7w/690vRNWqBU6qDcUbi3Pi2THdn1yycBVTDQ= go.buf.build/odpf/gwv/envoyproxy/protoc-gen-validate v1.1.3/go.mod h1:2Tg6rYIoDhpl39Zd2+WBOF9uG4XxAOs0bK2Z2/bwTOc= go.buf.build/odpf/gwv/grpc-ecosystem/grpc-gateway v1.1.37/go.mod h1:UrBCdmHgaY/pLapYUMOq01c1yuzwT8AEBTsgpmzq2zo= -go.buf.build/odpf/gwv/odpf/proton v1.1.65 h1:A0jh+7kMfiGWoSXpIoMOpO1sDTETLEo09y2wY7e40ZA= -go.buf.build/odpf/gwv/odpf/proton v1.1.65/go.mod h1:3VlD6BkZ7cmhyEb+pOdyjier/UO5QNT5rfBIgHyWzLM= +go.buf.build/odpf/gwv/odpf/proton v1.1.77 h1:vp0KIoAYYq46VF1f1/7WWzeSPYjvmNqRBIq3cqMbCLA= +go.buf.build/odpf/gwv/odpf/proton v1.1.77/go.mod h1:3VlD6BkZ7cmhyEb+pOdyjier/UO5QNT5rfBIgHyWzLM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= diff --git a/mocks/provider_repository.go b/mocks/provider_repository.go new file mode 100644 index 00000000..ea79ec63 --- /dev/null +++ b/mocks/provider_repository.go @@ -0,0 +1,186 @@ +// Code generated by mockery v2.10.0. DO NOT EDIT. + +package mocks + +import ( + domain "github.com/odpf/entropy/domain" + mock "github.com/stretchr/testify/mock" +) + +// ProviderRepository is an autogenerated mock type for the ProviderRepository type +type ProviderRepository struct { + mock.Mock +} + +type ProviderRepository_Expecter struct { + mock *mock.Mock +} + +func (_m *ProviderRepository) EXPECT() *ProviderRepository_Expecter { + return &ProviderRepository_Expecter{mock: &_m.Mock} +} + +// Create provides a mock function with given fields: r +func (_m *ProviderRepository) Create(r *domain.Provider) error { + ret := _m.Called(r) + + var r0 error + if rf, ok := ret.Get(0).(func(*domain.Provider) error); ok { + r0 = rf(r) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ProviderRepository_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' +type ProviderRepository_Create_Call struct { + *mock.Call +} + +// Create is a helper method to define mock.On call +// - r *domain.Provider +func (_e *ProviderRepository_Expecter) Create(r interface{}) *ProviderRepository_Create_Call { + return &ProviderRepository_Create_Call{Call: _e.mock.On("Create", r)} +} + +func (_c *ProviderRepository_Create_Call) Run(run func(r *domain.Provider)) *ProviderRepository_Create_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*domain.Provider)) + }) + return _c +} + +func (_c *ProviderRepository_Create_Call) Return(_a0 error) *ProviderRepository_Create_Call { + _c.Call.Return(_a0) + return _c +} + +// GetByURN provides a mock function with given fields: urn +func (_m *ProviderRepository) GetByURN(urn string) (*domain.Provider, error) { + ret := _m.Called(urn) + + var r0 *domain.Provider + if rf, ok := ret.Get(0).(func(string) *domain.Provider); ok { + r0 = rf(urn) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.Provider) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(urn) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ProviderRepository_GetByURN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetByURN' +type ProviderRepository_GetByURN_Call struct { + *mock.Call +} + +// GetByURN is a helper method to define mock.On call +// - urn string +func (_e *ProviderRepository_Expecter) GetByURN(urn interface{}) *ProviderRepository_GetByURN_Call { + return &ProviderRepository_GetByURN_Call{Call: _e.mock.On("GetByURN", urn)} +} + +func (_c *ProviderRepository_GetByURN_Call) Run(run func(urn string)) *ProviderRepository_GetByURN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *ProviderRepository_GetByURN_Call) Return(_a0 *domain.Provider, _a1 error) *ProviderRepository_GetByURN_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// List provides a mock function with given fields: filter +func (_m *ProviderRepository) List(filter map[string]string) ([]*domain.Provider, error) { + ret := _m.Called(filter) + + var r0 []*domain.Provider + if rf, ok := ret.Get(0).(func(map[string]string) []*domain.Provider); ok { + r0 = rf(filter) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*domain.Provider) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(map[string]string) error); ok { + r1 = rf(filter) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ProviderRepository_List_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'List' +type ProviderRepository_List_Call struct { + *mock.Call +} + +// List is a helper method to define mock.On call +// - filter map[string]string +func (_e *ProviderRepository_Expecter) List(filter interface{}) *ProviderRepository_List_Call { + return &ProviderRepository_List_Call{Call: _e.mock.On("List", filter)} +} + +func (_c *ProviderRepository_List_Call) Run(run func(filter map[string]string)) *ProviderRepository_List_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string)) + }) + return _c +} + +func (_c *ProviderRepository_List_Call) Return(_a0 []*domain.Provider, _a1 error) *ProviderRepository_List_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// Migrate provides a mock function with given fields: +func (_m *ProviderRepository) Migrate() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ProviderRepository_Migrate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Migrate' +type ProviderRepository_Migrate_Call struct { + *mock.Call +} + +// Migrate is a helper method to define mock.On call +func (_e *ProviderRepository_Expecter) Migrate() *ProviderRepository_Migrate_Call { + return &ProviderRepository_Migrate_Call{Call: _e.mock.On("Migrate")} +} + +func (_c *ProviderRepository_Migrate_Call) Run(run func()) *ProviderRepository_Migrate_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *ProviderRepository_Migrate_Call) Return(_a0 error) *ProviderRepository_Migrate_Call { + _c.Call.Return(_a0) + return _c +} diff --git a/mocks/provider_service.go b/mocks/provider_service.go new file mode 100644 index 00000000..dcfa87bc --- /dev/null +++ b/mocks/provider_service.go @@ -0,0 +1,118 @@ +// Code generated by mockery v2.10.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + domain "github.com/odpf/entropy/domain" + mock "github.com/stretchr/testify/mock" +) + +// ProviderService is an autogenerated mock type for the ServiceInterface type +type ProviderService struct { + mock.Mock +} + +type ProviderService_Expecter struct { + mock *mock.Mock +} + +func (_m *ProviderService) EXPECT() *ProviderService_Expecter { + return &ProviderService_Expecter{mock: &_m.Mock} +} + +// CreateProvider provides a mock function with given fields: ctx, res +func (_m *ProviderService) CreateProvider(ctx context.Context, res *domain.Provider) (*domain.Provider, error) { + ret := _m.Called(ctx, res) + + var r0 *domain.Provider + if rf, ok := ret.Get(0).(func(context.Context, *domain.Provider) *domain.Provider); ok { + r0 = rf(ctx, res) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*domain.Provider) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *domain.Provider) error); ok { + r1 = rf(ctx, res) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ProviderService_CreateProvider_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateProvider' +type ProviderService_CreateProvider_Call struct { + *mock.Call +} + +// CreateProvider is a helper method to define mock.On call +// - ctx context.Context +// - res *domain.Provider +func (_e *ProviderService_Expecter) CreateProvider(ctx interface{}, res interface{}) *ProviderService_CreateProvider_Call { + return &ProviderService_CreateProvider_Call{Call: _e.mock.On("CreateProvider", ctx, res)} +} + +func (_c *ProviderService_CreateProvider_Call) Run(run func(ctx context.Context, res *domain.Provider)) *ProviderService_CreateProvider_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*domain.Provider)) + }) + return _c +} + +func (_c *ProviderService_CreateProvider_Call) Return(_a0 *domain.Provider, _a1 error) *ProviderService_CreateProvider_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// ListProviders provides a mock function with given fields: ctx, parent, kind +func (_m *ProviderService) ListProviders(ctx context.Context, parent string, kind string) ([]*domain.Provider, error) { + ret := _m.Called(ctx, parent, kind) + + var r0 []*domain.Provider + if rf, ok := ret.Get(0).(func(context.Context, string, string) []*domain.Provider); ok { + r0 = rf(ctx, parent, kind) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*domain.Provider) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, parent, kind) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ProviderService_ListProviders_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListProviders' +type ProviderService_ListProviders_Call struct { + *mock.Call +} + +// ListProviders is a helper method to define mock.On call +// - ctx context.Context +// - parent string +// - kind string +func (_e *ProviderService_Expecter) ListProviders(ctx interface{}, parent interface{}, kind interface{}) *ProviderService_ListProviders_Call { + return &ProviderService_ListProviders_Call{Call: _e.mock.On("ListProviders", ctx, parent, kind)} +} + +func (_c *ProviderService_ListProviders_Call) Run(run func(ctx context.Context, parent string, kind string)) *ProviderService_ListProviders_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *ProviderService_ListProviders_Call) Return(_a0 []*domain.Provider, _a1 error) *ProviderService_ListProviders_Call { + _c.Call.Return(_a0, _a1) + return _c +} diff --git a/modules/firehose/module.go b/modules/firehose/module.go index 893f44f0..27a9f2cf 100644 --- a/modules/firehose/module.go +++ b/modules/firehose/module.go @@ -5,248 +5,340 @@ import ( "fmt" "strings" + "github.com/mitchellh/mapstructure" "github.com/odpf/entropy/domain" + "github.com/odpf/entropy/plugins/providers/helm" + "github.com/odpf/entropy/store/mongodb" gjs "github.com/xeipuuv/gojsonschema" ) +const ( + releaseConfigString = "release_configs" + KUBERNETES = "kubernetes" + defaultRepositoryString = "https://odpf.github.io/charts/" + defaultChartString = "firehose" + defaultVersionString = "0.1.1" +) + const configSchemaString = ` { "$schema": "http://json-schema.org/draft-07/schema#", "$id": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { - "image": { - "type": "string" - }, - "replicas": { - "type": "number" - }, - "namespace": { - "type": "string" - }, - "cluster": { - "type": "string" - }, - "sink_type": { - "type": "string", - "enum": [ - "LOG", - "HTTP" - ] - }, - "state": { - "type": "string", - "enum": [ - "RUNNING", - "STOPPED" - ] - }, - "stop_date": { - "type": "string", - "format": "date-time" - }, - "description": { - "type": "string" - } - }, - "allOf": [ - { - "if": { - "properties": { - "sink_type": { - "const": "LOG" - } + "release_configs": { + "type": "object", + "properties": { + "name": { + "type": "string" }, - "required": [ - "sink_type" - ] - }, - "then": { - "properties": { - "configuration": { - "type": "object", - "properties": { - "KAFKA_RECORD_PARSER_MODE": { - "type": "string" - }, - "SOURCE_KAFKA_BROKERS": { - "type": "string" - }, - "SOURCE_KAFKA_TOPIC": { - "type": "string" - }, - "SOURCE_KAFKA_CONSUMER_GROUP_ID": { - "type": "string" - }, - "INPUT_SCHEMA_PROTO_CLASS": { - "type": "string" - } - }, - "required": [ - "KAFKA_RECORD_PARSER_MODE", - "SOURCE_KAFKA_BROKERS", - "SOURCE_KAFKA_TOPIC", - "SOURCE_KAFKA_CONSUMER_GROUP_ID", - "INPUT_SCHEMA_PROTO_CLASS" - ] - } - } - } - }, - { - "if": { - "properties": { - "sink_type": { - "const": "HTTP" - } + "repository": { + "type": "string" }, - "required": [ - "sink_type" - ] - }, - "then": { - "properties": { - "configuration": { - "type": "object", - "properties": { - "SOURCE_KAFKA_BROKERS": { - "type": "string" - }, - "SOURCE_KAFKA_TOPIC": { - "type": "string" - }, - "SOURCE_KAFKA_CONSUMER_GROUP_ID": { - "type": "string" - }, - "INPUT_SCHEMA_PROTO_CLASS": { - "type": "string" - }, - "SINK_HTTP_RETRY_STATUS_CODE_RANGES": { - "type": "string" - }, - "SINK_HTTP_REQUEST_LOG_STATUS_CODE_RANGES": { - "type": "string" - }, - "SINK_HTTP_REQUEST_TIMEOUT_MS": { - "type": "number" - }, - "SINK_HTTP_REQUEST_METHOD": { - "type": "string", - "enum": [ - "put", - "post" - ] - }, - "SINK_HTTP_MAX_CONNECTIONS": { - "type": "number" - }, - "SINK_HTTP_SERVICE_URL": { - "type": "string" - }, - "SINK_HTTP_HEADERS": { - "type": "string" - }, - "SINK_HTTP_PARAMETER_SOURCE": { - "type": "string", - "enum": [ - "key", - "message", - "disabled" - ] - }, - "SINK_HTTP_DATA_FORMAT": { - "type": "string", - "enum": [ - "proto", - "json" + "chart": { + "type": "string" + }, + "version": { + "type": "string" + }, + "namespace": { + "type": "string" + }, + "timeout": { + "type": "number" + }, + "force_update": { + "type": "boolean" + }, + "recreate_pods": { + "type": "boolean" + }, + "wait": { + "type": "boolean" + }, + "wait_for_jobs": { + "type": "boolean" + }, + "replace": { + "type": "boolean" + }, + "description": { + "type": "string" + }, + "create_namespace": { + "type": "boolean" + }, + "state": { + "type": "string", + "enum": [ + "RUNNING", + "STOPPED" + ] + }, + "values": { + "type": "object", + "properties": { + "image": { + "type": "string" + }, + "replicas": { + "type": "number" + }, + "namespace": { + "type": "string" + }, + "cluster": { + "type": "string" + }, + "sink_type": { + "type": "string", + "enum": [ + "LOG", + "HTTP" + ] + }, + "stop_date": { + "type": "string", + "format": "date-time" + }, + "description": { + "type": "string" + } + }, + "allOf": [ + { + "if": { + "properties": { + "sink_type": { + "const": "LOG" + } + }, + "required": [ + "sink_type" ] }, - "SINK_HTTP_OAUTH2_ENABLE": { - "type": "boolean" - }, - "SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL": { - "type": "string" - }, - "SINK_HTTP_OAUTH2_CLIENT_NAME": { - "type": "string" - }, - "SINK_HTTP_OAUTH2_CLIENT_SECRET": { - "type": "string" - }, - "SINK_HTTP_OAUTH2_SCOPE": { - "type": "string" - }, - "SINK_HTTP_JSON_BODY_TEMPLATE": { - "type": "string" - }, - "SINK_HTTP_PARAMETER_PLACEMENT": { - "type": "string", - "enum": [ - "query", - "header" + "then": { + "properties": { + "configuration": { + "type": "object", + "properties": { + "KAFKA_RECORD_PARSER_MODE": { + "type": "string" + }, + "SOURCE_KAFKA_BROKERS": { + "type": "string" + }, + "SOURCE_KAFKA_TOPIC": { + "type": "string" + }, + "SOURCE_KAFKA_CONSUMER_GROUP_ID": { + "type": "string" + }, + "INPUT_SCHEMA_PROTO_CLASS": { + "type": "string" + } + }, + "required": [ + "KAFKA_RECORD_PARSER_MODE", + "SOURCE_KAFKA_BROKERS", + "SOURCE_KAFKA_TOPIC", + "SOURCE_KAFKA_CONSUMER_GROUP_ID", + "INPUT_SCHEMA_PROTO_CLASS" + ] + } + } + } + }, + { + "if": { + "properties": { + "sink_type": { + "const": "HTTP" + } + }, + "required": [ + "sink_type" ] }, - "SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS": { - "type": "string" + "then": { + "properties": { + "configuration": { + "type": "object", + "properties": { + "SOURCE_KAFKA_BROKERS": { + "type": "string" + }, + "SOURCE_KAFKA_TOPIC": { + "type": "string" + }, + "SOURCE_KAFKA_CONSUMER_GROUP_ID": { + "type": "string" + }, + "INPUT_SCHEMA_PROTO_CLASS": { + "type": "string" + }, + "SINK_HTTP_RETRY_STATUS_CODE_RANGES": { + "type": "string" + }, + "SINK_HTTP_REQUEST_LOG_STATUS_CODE_RANGES": { + "type": "string" + }, + "SINK_HTTP_REQUEST_TIMEOUT_MS": { + "type": "number" + }, + "SINK_HTTP_REQUEST_METHOD": { + "type": "string", + "enum": [ + "put", + "post" + ] + }, + "SINK_HTTP_MAX_CONNECTIONS": { + "type": "number" + }, + "SINK_HTTP_SERVICE_URL": { + "type": "string" + }, + "SINK_HTTP_HEADERS": { + "type": "string" + }, + "SINK_HTTP_PARAMETER_SOURCE": { + "type": "string", + "enum": [ + "key", + "message", + "disabled" + ] + }, + "SINK_HTTP_DATA_FORMAT": { + "type": "string", + "enum": [ + "proto", + "json" + ] + }, + "SINK_HTTP_OAUTH2_ENABLE": { + "type": "boolean" + }, + "SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL": { + "type": "string" + }, + "SINK_HTTP_OAUTH2_CLIENT_NAME": { + "type": "string" + }, + "SINK_HTTP_OAUTH2_CLIENT_SECRET": { + "type": "string" + }, + "SINK_HTTP_OAUTH2_SCOPE": { + "type": "string" + }, + "SINK_HTTP_JSON_BODY_TEMPLATE": { + "type": "string" + }, + "SINK_HTTP_PARAMETER_PLACEMENT": { + "type": "string", + "enum": [ + "query", + "header" + ] + }, + "SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS": { + "type": "string" + } + }, + "required": [ + "SOURCE_KAFKA_BROKERS", + "SOURCE_KAFKA_TOPIC", + "SOURCE_KAFKA_CONSUMER_GROUP_ID", + "INPUT_SCHEMA_PROTO_CLASS", + "SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS", + "SINK_HTTP_PARAMETER_PLACEMENT", + "SINK_HTTP_JSON_BODY_TEMPLATE", + "SINK_HTTP_OAUTH2_SCOPE", + "SINK_HTTP_OAUTH2_CLIENT_SECRET", + "SINK_HTTP_OAUTH2_CLIENT_NAME", + "SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL", + "SINK_HTTP_OAUTH2_ENABLE", + "SINK_HTTP_DATA_FORMAT", + "SINK_HTTP_PARAMETER_SOURCE", + "SINK_HTTP_HEADERS", + "SINK_HTTP_SERVICE_URL", + "SINK_HTTP_MAX_CONNECTIONS", + "SINK_HTTP_REQUEST_METHOD", + "SINK_HTTP_REQUEST_TIMEOUT_MS", + "SINK_HTTP_REQUEST_LOG_STATUS_CODE_RANGES", + "SINK_HTTP_RETRY_STATUS_CODE_RANGES" + ] + } + } } - }, - "required": [ - "SOURCE_KAFKA_BROKERS", - "SOURCE_KAFKA_TOPIC", - "SOURCE_KAFKA_CONSUMER_GROUP_ID", - "INPUT_SCHEMA_PROTO_CLASS", - "SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS", - "SINK_HTTP_PARAMETER_PLACEMENT", - "SINK_HTTP_JSON_BODY_TEMPLATE", - "SINK_HTTP_OAUTH2_SCOPE", - "SINK_HTTP_OAUTH2_CLIENT_SECRET", - "SINK_HTTP_OAUTH2_CLIENT_NAME", - "SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL", - "SINK_HTTP_OAUTH2_ENABLE", - "SINK_HTTP_DATA_FORMAT", - "SINK_HTTP_PARAMETER_SOURCE", - "SINK_HTTP_HEADERS", - "SINK_HTTP_SERVICE_URL", - "SINK_HTTP_MAX_CONNECTIONS", - "SINK_HTTP_REQUEST_METHOD", - "SINK_HTTP_REQUEST_TIMEOUT_MS", - "SINK_HTTP_REQUEST_LOG_STATUS_CODE_RANGES", - "SINK_HTTP_RETRY_STATUS_CODE_RANGES" - ] - } + } + ], + "required": [ + "image", + "replicas", + "namespace" + ] } - } + }, + "required": [ + "state" + ] } - ], - "required": [ - "image", - "replicas", - "namespace", - "state" - ] + } } ` type Module struct { - schema *gjs.Schema + schema *gjs.Schema + providerRepository *mongodb.ProviderRepository } func (m *Module) ID() string { return "firehose" } -func New() *Module { +func New(providerRepository *mongodb.ProviderRepository) *Module { schemaLoader := gjs.NewStringLoader(configSchemaString) schema, err := gjs.NewSchema(schemaLoader) if err != nil { return nil } return &Module{ - schema: schema, + schema: schema, + providerRepository: providerRepository, } } func (m *Module) Apply(r *domain.Resource) (domain.ResourceStatus, error) { + for _, p := range r.Providers { + provider, err := m.providerRepository.GetByURN(p.Urn) + if err != nil { + return domain.ResourceStatusError, err + } + + if provider.Kind == KUBERNETES { + releaseConfig := helm.DefaultReleaseConfig() + releaseConfig.Repository = defaultRepositoryString + releaseConfig.Chart = defaultChartString + releaseConfig.Version = defaultVersionString + err := mapstructure.Decode(r.Configs[releaseConfigString], &releaseConfig) + if err != nil { + return domain.ResourceStatusError, err + } + + kubeConfig := helm.ToKubeConfig(provider.Configs) + helmConfig := &helm.ProviderConfig{ + Kubernetes: kubeConfig, + } + helmProvider := helm.NewProvider(helmConfig) + _, err = helmProvider.Release(releaseConfig) + if err != nil { + return domain.ResourceStatusError, nil + } + } + } + return domain.ResourceStatusCompleted, nil } @@ -254,7 +346,7 @@ func (m *Module) Validate(r *domain.Resource) error { resourceLoader := gjs.NewGoLoader(r.Configs) result, err := m.schema.Validate(resourceLoader) if err != nil { - return fmt.Errorf("%w: %s", domain.ModuleConfigParseFailed, err) + return fmt.Errorf("%w: %s", domain.ErrModuleConfigParseFailed, err) } if !result.Valid() { var errorStrings []string diff --git a/modules/log/module.go b/modules/log/module.go index a034771a..90e39338 100644 --- a/modules/log/module.go +++ b/modules/log/module.go @@ -3,10 +3,11 @@ package log import ( "errors" "fmt" + "strings" + "github.com/odpf/entropy/domain" gjs "github.com/xeipuuv/gojsonschema" "go.uber.org/zap" - "strings" ) type Level string @@ -77,7 +78,7 @@ func (m *Module) Validate(r *domain.Resource) error { resourceLoader := gjs.NewGoLoader(r.Configs) result, err := m.schema.Validate(resourceLoader) if err != nil { - return fmt.Errorf("%w: %s", domain.ModuleConfigParseFailed, err) + return fmt.Errorf("%w: %s", domain.ErrModuleConfigParseFailed, err) } if !result.Valid() { var errorStrings []string diff --git a/pkg/module/service_test.go b/pkg/module/service_test.go index ca30a7d2..704ee383 100644 --- a/pkg/module/service_test.go +++ b/pkg/module/service_test.go @@ -3,13 +3,14 @@ package module import ( "context" "errors" + "reflect" + "testing" + "time" + "github.com/odpf/entropy/domain" "github.com/odpf/entropy/mocks" "github.com/odpf/entropy/store" "github.com/stretchr/testify/mock" - "reflect" - "testing" - "time" ) func TestService_Sync(t *testing.T) { @@ -75,7 +76,7 @@ func TestService_Sync(t *testing.T) { { name: "test sync module not found error", setup: func(t *testing.T) { - mockModuleRepo.EXPECT().Get("mock").Return(nil, store.ModuleNotFoundError).Once() + mockModuleRepo.EXPECT().Get("mock").Return(nil, store.ErrModuleNotFound).Once() }, fields: fields{ moduleRepository: mockModuleRepo, @@ -95,7 +96,7 @@ func TestService_Sync(t *testing.T) { CreatedAt: currentTime, UpdatedAt: currentTime, }, - wantErr: store.ModuleNotFoundError, + wantErr: store.ErrModuleNotFound, }, { name: "test sync module error while applying", @@ -195,7 +196,7 @@ func TestService_Validate(t *testing.T) { { name: "test validate module not found error", setup: func(t *testing.T) { - mockModuleRepo.EXPECT().Get("mock").Return(nil, store.ModuleNotFoundError).Once() + mockModuleRepo.EXPECT().Get("mock").Return(nil, store.ErrModuleNotFound).Once() }, fields: fields{ moduleRepository: mockModuleRepo, @@ -204,7 +205,7 @@ func TestService_Validate(t *testing.T) { ctx: nil, r: r, }, - wantErr: store.ModuleNotFoundError, + wantErr: store.ErrModuleNotFound, }, { name: "test validation failed", diff --git a/pkg/provider/service.go b/pkg/provider/service.go new file mode 100644 index 00000000..e327f2f2 --- /dev/null +++ b/pkg/provider/service.go @@ -0,0 +1,46 @@ +package provider + +import ( + "context" + + "github.com/odpf/entropy/domain" + "github.com/odpf/entropy/store" +) + +type ServiceInterface interface { + CreateProvider(ctx context.Context, res *domain.Provider) (*domain.Provider, error) + ListProviders(ctx context.Context, parent string, kind string) ([]*domain.Provider, error) +} + +type Service struct { + providerRepository store.ProviderRepository +} + +func NewService(repository store.ProviderRepository) *Service { + return &Service{ + providerRepository: repository, + } +} + +func (s *Service) CreateProvider(ctx context.Context, pro *domain.Provider) (*domain.Provider, error) { + err := s.providerRepository.Create(pro) + if err != nil { + return nil, err + } + createdProvider, err := s.providerRepository.GetByURN(pro.Urn) + if err != nil { + return nil, err + } + return createdProvider, nil +} + +func (s *Service) ListProviders(ctx context.Context, parent string, kind string) ([]*domain.Provider, error) { + filter := map[string]string{} + if kind != "" { + filter["kind"] = kind + } + if parent != "" { + filter["parent"] = parent + } + return s.providerRepository.List(filter) +} diff --git a/pkg/resource/service_test.go b/pkg/resource/service_test.go index eebc2420..287b60df 100644 --- a/pkg/resource/service_test.go +++ b/pkg/resource/service_test.go @@ -3,14 +3,15 @@ package resource import ( "context" "errors" + "reflect" + "testing" + "time" + "github.com/odpf/entropy/domain" "github.com/odpf/entropy/mocks" "github.com/odpf/entropy/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "reflect" - "testing" - "time" ) func TestService_CreateResource(t *testing.T) { @@ -76,10 +77,10 @@ func TestService_CreateResource(t *testing.T) { Labels: map[string]string{}, } want := (*domain.Resource)(nil) - wantErr := store.ResourceAlreadyExistsError + wantErr := store.ErrResourceAlreadyExists mockRepo.EXPECT().Create(mock.Anything).Run(func(r *domain.Resource) { assert.Equal(t, domain.ResourceStatusPending, r.Status) - }).Return(store.ResourceAlreadyExistsError).Once() + }).Return(store.ErrResourceAlreadyExists).Once() s := NewService(mockRepo) got, err := s.CreateResource(context.Background(), argResource) @@ -158,11 +159,11 @@ func TestService_UpdateResource(t *testing.T) { mockRepo := &mocks.ResourceRepository{} want := (*domain.Resource)(nil) - wantErr := store.ResourceNotFoundError + wantErr := store.ErrResourceNotFound mockRepo.EXPECT(). Update(mock.Anything). - Return(store.ResourceNotFoundError). + Return(store.ErrResourceNotFound). Once() s := NewService(mockRepo) @@ -236,11 +237,11 @@ func TestService_GetResource(t *testing.T) { mockRepo := &mocks.ResourceRepository{} want := (*domain.Resource)(nil) - wantErr := store.ResourceNotFoundError + wantErr := store.ErrResourceNotFound mockRepo.EXPECT(). GetByURN(mock.Anything). - Return(nil, store.ResourceNotFoundError). + Return(nil, store.ErrResourceNotFound). Once() s := NewService(mockRepo) diff --git a/pkg/provider/helm/kube_rest.go b/plugins/providers/helm/kube_rest.go similarity index 100% rename from pkg/provider/helm/kube_rest.go rename to plugins/providers/helm/kube_rest.go diff --git a/pkg/provider/helm/provider.go b/plugins/providers/helm/provider.go similarity index 80% rename from pkg/provider/helm/provider.go rename to plugins/providers/helm/provider.go index a3ea42e0..43fb48bd 100644 --- a/pkg/provider/helm/provider.go +++ b/plugins/providers/helm/provider.go @@ -10,15 +10,15 @@ import ( "k8s.io/client-go/tools/clientcmd/api" ) -type providerConfig struct { +type ProviderConfig struct { // HelmDriver - The backend storage driver. Values are - configmap, secret, memory, sql HelmDriver string `default:"secret"` // Kubernetes configuration. Kubernetes KubernetesConfig } -func DefaultProviderConfig() *providerConfig { - defaultProviderConfig := new(providerConfig) +func DefaultProviderConfig() *ProviderConfig { + defaultProviderConfig := new(ProviderConfig) defaults.SetDefaults(defaultProviderConfig) return defaultProviderConfig } @@ -41,11 +41,11 @@ type KubernetesConfig struct { } type Provider struct { - config *providerConfig + config *ProviderConfig cliSettings *cli.EnvSettings } -func NewProvider(config *providerConfig) *Provider { +func NewProvider(config *ProviderConfig) *Provider { return &Provider{config: config, cliSettings: cli.New()} } @@ -80,3 +80,13 @@ func (p *Provider) getActionConfiguration(namespace string) (*action.Configurati } return actionConfig, nil } + +func ToKubeConfig(providerConfig map[string]interface{}) KubernetesConfig { + return KubernetesConfig{ + Host: providerConfig["host"].(string), + Insecure: providerConfig["insecure"].(bool), + ClientCertificate: providerConfig["clientCertificate"].(string), + ClientKey: providerConfig["clientKey"].(string), + ClusterCACertificate: providerConfig["clusterCACertificate"].(string), + } +} diff --git a/pkg/provider/helm/release.go b/plugins/providers/helm/release.go similarity index 84% rename from pkg/provider/helm/release.go rename to plugins/providers/helm/release.go index 371c4db9..e874db09 100644 --- a/pkg/provider/helm/release.go +++ b/plugins/providers/helm/release.go @@ -18,45 +18,46 @@ import ( var ErrReleaseNotFound = errors.New("release not found") var ErrChartNotApplication = errors.New("helm chart is not an application chart") -type releaseConfig struct { +type ReleaseConfig struct { // Name - Release Name - Name string `valid:"required"` + Name string `json:"name" mapstructure:"name"` // Repository - Repository where to locate the requested chart. If is a URL the chart is installed without installing the repository. - Repository string `valid:"required"` + Repository string `json:"repository" mapstructure:"repository"` // Chart - Chart name to be installed. A path may be used. - Chart string `valid:"required"` + Chart string `json:"chart" mapstructure:"chart"` // Version - Specify the exact chart version to install. If this is not specified, the latest version is installed. - Version string + Version string `json:"version" mapstructure:"version"` // Values - Map of values in to pass to helm. - Values map[string]interface{} + Values map[string]interface{} `json:"values" mapstructure:"values"` // Namespace - Namespace to install the release into. - Namespace string `default:"default"` + Namespace string `json:"namespace" mapstructure:"namespace" default:"default"` // Timeout - Time in seconds to wait for any individual kubernetes operation. - Timeout int `default:"300"` + Timeout int `json:"timeout" mapstructure:"timeout" default:"300"` // ForceUpdate - Force resource update through delete/recreate if needed. - ForceUpdate bool `default:"false"` + ForceUpdate bool `json:"force_update" mapstructure:"force_update" default:"false"` // RecreatePods - Perform pods restart during upgrade/rollback - RecreatePods bool `default:"false"` + RecreatePods bool `json:"recreate_pods" mapstructure:"recreate_pods" default:"false"` // Wait - Will wait until all resources are in a ready state before marking the release as successful. - Wait bool `default:"true"` + Wait bool `json:"wait" mapstructure:"wait" default:"true"` // WaitForJobs - If wait is enabled, will wait until all Jobs have been completed before marking the release as successful. - WaitForJobs bool `default:"false"` + WaitForJobs bool `json:"wait_for_jobs" mapstructure:"wait_for_jobs" default:"false"` // Replace - Re-use the given name, even if that name is already used. This is unsafe in production - Replace bool `default:"false"` + Replace bool `json:"replace" mapstructure:"replace" default:"false"` // Description - Add a custom description - Description string + Description string `json:"description" mapstructure:"description"` // CreateNamespace - Create the namespace if it does not exist - CreateNamespace bool `default:"false"` + CreateNamespace bool `json:"create_namespace" mapstructure:"create_namespace" default:"false"` + State string `json:"state" mapstructure:"state"` } -func DefaultReleaseConfig() *releaseConfig { - defaultReleaseConfig := new(releaseConfig) +func DefaultReleaseConfig() *ReleaseConfig { + defaultReleaseConfig := new(ReleaseConfig) defaults.SetDefaults(defaultReleaseConfig) return defaultReleaseConfig } type Release struct { - Config *releaseConfig + Config *ReleaseConfig Output ReleaseOutput } @@ -68,7 +69,7 @@ type ReleaseOutput struct { } // Release - creates or updates a helm release with its configs -func (p *Provider) Release(config *releaseConfig) (*Release, error) { +func (p *Provider) Release(config *ReleaseConfig) (*Release, error) { releaseExists, _ := p.resourceReleaseExists(config.Name, config.Namespace) if releaseExists { return p.update(config) @@ -76,7 +77,7 @@ func (p *Provider) Release(config *releaseConfig) (*Release, error) { return p.create(config) } -func (p *Provider) create(config *releaseConfig) (*Release, error) { +func (p *Provider) create(config *ReleaseConfig) (*Release, error) { actionConfig, err := p.getActionConfiguration(config.Namespace) if err != nil { return nil, fmt.Errorf("error while getting action configuration : %w", err) @@ -155,7 +156,7 @@ func (p *Provider) create(config *releaseConfig) (*Release, error) { }, nil } -func (p *Provider) update(config *releaseConfig) (*Release, error) { +func (p *Provider) update(config *ReleaseConfig) (*Release, error) { var rel *release.Release var err error @@ -230,7 +231,7 @@ func (p *Provider) update(config *releaseConfig) (*Release, error) { }, nil } -func (p *Provider) chartPathOptions(config *releaseConfig) (*action.ChartPathOptions, string) { +func (p *Provider) chartPathOptions(config *ReleaseConfig) (*action.ChartPathOptions, string) { repositoryURL, chartName := resolveChartName(config.Repository, strings.TrimSpace(config.Chart)) version := getVersion(config.Version) @@ -261,7 +262,7 @@ func getVersion(version string) string { return strings.TrimSpace(version) } -func (p *Provider) getChart(config *releaseConfig, name string, cpo *action.ChartPathOptions) (*chart.Chart, string, error) { +func (p *Provider) getChart(config *ReleaseConfig, name string, cpo *action.ChartPathOptions) (*chart.Chart, string, error) { // TODO: Add a lock as Load function blows up if accessed concurrently path, err := cpo.LocateChart(name, p.cliSettings) diff --git a/pkg/provider/helm/release_test.go b/plugins/providers/helm/release_test.go similarity index 100% rename from pkg/provider/helm/release_test.go rename to plugins/providers/helm/release_test.go diff --git a/pkg/provider/helm/status.go b/plugins/providers/helm/status.go similarity index 100% rename from pkg/provider/helm/status.go rename to plugins/providers/helm/status.go diff --git a/store/inmemory/module_repository.go b/store/inmemory/module_repository.go index c3a7742b..923844ad 100644 --- a/store/inmemory/module_repository.go +++ b/store/inmemory/module_repository.go @@ -17,7 +17,7 @@ func NewModuleRepository() *ModuleRepository { func (mr *ModuleRepository) Register(module domain.Module) error { if _, exists := mr.collection[module.ID()]; exists { - return store.ModuleAlreadyExistsError + return store.ErrModuleAlreadyExists } mr.collection[module.ID()] = module return nil @@ -27,5 +27,5 @@ func (mr *ModuleRepository) Get(id string) (domain.Module, error) { if module, exists := mr.collection[id]; exists { return module, nil } - return nil, store.ModuleNotFoundError + return nil, store.ErrModuleNotFound } diff --git a/store/inmemory/module_repository_test.go b/store/inmemory/module_repository_test.go index 8e53ea28..cb6fbbae 100644 --- a/store/inmemory/module_repository_test.go +++ b/store/inmemory/module_repository_test.go @@ -2,11 +2,12 @@ package inmemory import ( "errors" + "reflect" + "testing" + "github.com/odpf/entropy/domain" "github.com/odpf/entropy/modules/log" "github.com/odpf/entropy/store" - "reflect" - "testing" ) func TestModuleRepository_Get(t *testing.T) { @@ -48,7 +49,7 @@ func TestModuleRepository_Get(t *testing.T) { id: "notlog", }, want: nil, - wantErr: store.ModuleNotFoundError, + wantErr: store.ErrModuleNotFound, }, } for _, tt := range tests { @@ -102,7 +103,7 @@ func TestModuleRepository_Register(t *testing.T) { args: args{ module: mod, }, - wantErr: store.ModuleAlreadyExistsError, + wantErr: store.ErrModuleAlreadyExists, }, } for _, tt := range tests { diff --git a/store/mongodb/provider_repository.go b/store/mongodb/provider_repository.go new file mode 100644 index 00000000..e893fb77 --- /dev/null +++ b/store/mongodb/provider_repository.go @@ -0,0 +1,69 @@ +package mongodb + +import ( + "context" + "fmt" + "time" + + "github.com/odpf/entropy/store" + + "github.com/odpf/entropy/domain" + "go.mongodb.org/mongo-driver/mongo" +) + +type ProviderRepository struct { + collection *mongo.Collection +} + +func NewProviderRepository(collection *mongo.Collection) *ProviderRepository { + return &ProviderRepository{ + collection: collection, + } +} + +func (rc *ProviderRepository) Migrate() error { + return createUniqueIndex(rc.collection, "urn", 1) +} + +func (rc *ProviderRepository) Create(Provider *domain.Provider) error { + Provider.Urn = domain.GenerateProviderUrn(Provider) + Provider.CreatedAt = time.Now() + Provider.UpdatedAt = time.Now() + + _, err := rc.collection.InsertOne(context.TODO(), Provider) + if err != nil { + if mongo.IsDuplicateKeyError(err) { + return fmt.Errorf("%w: %s", store.ErrProviderAlreadyExists, err) + } + return err + } + return nil +} + +func (rc *ProviderRepository) GetByURN(urn string) (*domain.Provider, error) { + pro := &domain.Provider{} + err := rc.collection.FindOne(context.TODO(), map[string]interface{}{"urn": urn}).Decode(pro) + if err != nil { + if err == mongo.ErrNoDocuments { + return nil, fmt.Errorf("%w: %s", store.ErrProviderNotFound, err) + } + return nil, err + } + return pro, nil +} + +func (rc *ProviderRepository) List(filter map[string]string) ([]*domain.Provider, error) { + var pro []*domain.Provider + cur, err := rc.collection.Find(context.TODO(), filter) + if err != nil { + return nil, err + } + err = cur.All(context.TODO(), &pro) + if err != nil { + if err == mongo.ErrNoDocuments { + return pro, nil + } + return nil, err + } + return pro, nil +} diff --git a/store/mongodb/resource_repository.go b/store/mongodb/resource_repository.go index aaf3d80f..6f351353 100644 --- a/store/mongodb/resource_repository.go +++ b/store/mongodb/resource_repository.go @@ -45,7 +45,7 @@ func (rc *ResourceRepository) Create(resource *domain.Resource) error { _, err := rc.collection.InsertOne(context.TODO(), resource) if err != nil { if mongo.IsDuplicateKeyError(err) { - return fmt.Errorf("%w: %s", store.ResourceAlreadyExistsError, err) + return fmt.Errorf("%w: %s", store.ErrResourceAlreadyExists, err) } return err } @@ -61,7 +61,7 @@ func (rc *ResourceRepository) Update(r *domain.Resource) error { err := singleResult.Err() if err != nil { if err == mongo.ErrNoDocuments { - return fmt.Errorf("%w: urn = %s", store.ResourceNotFoundError, r.Urn) + return fmt.Errorf("%w: urn = %s", store.ErrResourceNotFound, r.Urn) } return err } @@ -73,7 +73,7 @@ func (rc *ResourceRepository) GetByURN(urn string) (*domain.Resource, error) { err := rc.collection.FindOne(context.TODO(), map[string]interface{}{"urn": urn}).Decode(res) if err != nil { if err == mongo.ErrNoDocuments { - return nil, fmt.Errorf("%w: %s", store.ResourceNotFoundError, err) + return nil, fmt.Errorf("%w: %s", store.ErrResourceNotFound, err) } return nil, err } @@ -100,7 +100,7 @@ func (rc *ResourceRepository) Delete(urn string) error { _, err := rc.collection.DeleteOne(context.TODO(), map[string]interface{}{"urn": urn}) if err != nil { if err == mongo.ErrNoDocuments { - return fmt.Errorf("%w: %s", store.ResourceNotFoundError, err) + return fmt.Errorf("%w: %s", store.ErrResourceNotFound, err) } return err } diff --git a/store/mongodb/resource_repository_test.go b/store/mongodb/resource_repository_test.go index 0d4094fc..ef1318e0 100644 --- a/store/mongodb/resource_repository_test.go +++ b/store/mongodb/resource_repository_test.go @@ -2,14 +2,15 @@ package mongodb import ( "errors" + "reflect" + "testing" + "time" + "github.com/odpf/entropy/domain" "github.com/odpf/entropy/store" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/integration/mtest" - "reflect" - "testing" - "time" ) func TestNewResourceRepository(t *testing.T) { @@ -94,7 +95,7 @@ func TestResourceRepository_Create(t *testing.T) { UpdatedAt: time.Now(), }} }, - wantErr: store.ResourceAlreadyExistsError, + wantErr: store.ErrResourceAlreadyExists, }, } for _, tt := range tests { @@ -152,7 +153,7 @@ func TestResourceRepository_GetByURN(t *testing.T) { fields: func(mt *mtest.T) fields { return fields{mt.Coll} }, args: func(mt *mtest.T) args { return args{"p-testdata-gl-unknown-log"} }, want: func(mt *mtest.T) *domain.Resource { return nil }, - wantErr: store.ResourceNotFoundError, + wantErr: store.ErrResourceNotFound, }, } for _, tt := range tests { diff --git a/store/store.go b/store/store.go index 05ae9f5e..174271b3 100644 --- a/store/store.go +++ b/store/store.go @@ -11,13 +11,16 @@ import ( // Custom errors which can be used by multiple DB vendors var ( - ResourceAlreadyExistsError = errors.New("resource already exists") - ResourceNotFoundError = errors.New("no resource(s) found") - ModuleAlreadyExistsError = errors.New("module already exists") - ModuleNotFoundError = errors.New("no module(s) found") + ErrResourceAlreadyExists = errors.New("resource already exists") + ErrResourceNotFound = errors.New("no resource(s) found") + ErrModuleAlreadyExists = errors.New("module already exists") + ErrModuleNotFound = errors.New("no module(s) found") + ErrProviderAlreadyExists = errors.New("provider already exists") + ErrProviderNotFound = errors.New("no provider(s) found") ) var ResourceRepositoryName = "resources" +var ProviderRepositoryName = "providers" type ResourceRepository interface { Create(r *domain.Resource) error @@ -28,6 +31,13 @@ type ResourceRepository interface { Delete(urn string) error } +type ProviderRepository interface { + Create(r *domain.Provider) error + GetByURN(urn string) (*domain.Provider, error) + List(filter map[string]string) ([]*domain.Provider, error) + Migrate() error +} + type ModuleRepository interface { Register(module domain.Module) error Get(id string) (domain.Module, error)