Skip to content

Commit

Permalink
refactor(notifications): isolate endpoint service
Browse files Browse the repository at this point in the history
Following the ongoing effort to isolate behaviours into their own
packages and off of kv.Service, this change move the notification
endpoints service implementation into its own package. It removes the
endpoint behaviors from the kv service completely.
  • Loading branch information
GeorgeMac committed Oct 27, 2020
1 parent 3d643e0 commit f0c3c82
Show file tree
Hide file tree
Showing 19 changed files with 970 additions and 1,379 deletions.
34 changes: 19 additions & 15 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
Expand Down Expand Up @@ -759,17 +759,16 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.boltClient)

var (
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationEndpointStore platform.NotificationEndpointService = m.kvService
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
)

tenantStore := tenant.NewStore(m.kvStore)
Expand Down Expand Up @@ -988,10 +987,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
checkSvc = middleware.NewCheckService(checkSvc, m.kvService, coordinator)
}

var notificationEndpointSvc platform.NotificationEndpointService
{
notificationEndpointSvc = endpointservice.New(endpointservice.NewStore(m.kvStore), secretSvc)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
notificationRuleSvc, err = ruleservice.New(m.log, m.kvStore, m.kvService, ts.OrganizationService, notificationEndpointSvc)
if err != nil {
return err
}
Expand Down Expand Up @@ -1145,7 +1149,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationEndpointFinder: notificationEndpointSvc,
NotificationRuleFinder: notificationRuleSvc,
}

Expand Down Expand Up @@ -1190,7 +1194,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TaskService: taskSvc,
TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UserResourceMappingService, ts.OrganizationService),
NotificationEndpointService: notificationEndpointSvc,
CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
Expand Down
12 changes: 0 additions & 12 deletions http/notification_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type NotificationEndpointBackend struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}

// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
Expand All @@ -38,7 +37,6 @@ func NewNotificationEndpointBackend(log *zap.Logger, b *APIBackend) *Notificatio
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
}

Expand All @@ -56,7 +54,6 @@ type NotificationEndpointHandler struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}

const (
Expand All @@ -81,7 +78,6 @@ func NewNotificationEndpointHandler(log *zap.Logger, b *NotificationEndpointBack
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
h.HandlerFunc("POST", prefixNotificationEndpoints, h.handlePostNotificationEndpoint)
h.HandlerFunc("GET", prefixNotificationEndpoints, h.handleGetNotificationEndpoints)
Expand Down Expand Up @@ -551,20 +547,12 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re
// NotificationEndpointService is an http client for the influxdb.NotificationEndpointService server implementation.
type NotificationEndpointService struct {
Client *httpc.Client
*UserResourceMappingService
*OrganizationService
}

// NewNotificationEndpointService constructs a new http NotificationEndpointService.
func NewNotificationEndpointService(client *httpc.Client) *NotificationEndpointService {
return &NotificationEndpointService{
Client: client,
UserResourceMappingService: &UserResourceMappingService{
Client: client,
},
OrganizationService: &OrganizationService{
Client: client,
},
}
}

Expand Down
63 changes: 38 additions & 25 deletions http/notification_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/endpoint/service"
endpointTesting "github.com/influxdata/influxdb/v2/notification/endpoint/service/testing"
"github.com/influxdata/influxdb/v2/pkg/testttp"
"github.com/influxdata/influxdb/v2/tenant"
influxTesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
Expand All @@ -32,7 +35,6 @@ func NewMockNotificationEndpointBackend(t *testing.T) *NotificationEndpointBacke
UserResourceMappingService: mock.NewUserResourceMappingService(),
LabelService: mock.NewLabelService(),
UserService: mock.NewUserService(),
OrganizationService: mock.NewOrganizationService(),
}
}

Expand Down Expand Up @@ -489,7 +491,6 @@ func TestService_handlePostNotificationEndpoint(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
notificationEndpointBackend := NewMockNotificationEndpointBackend(t)
notificationEndpointBackend.NotificationEndpointService = tt.fields.NotificationEndpointService
notificationEndpointBackend.OrganizationService = tt.fields.OrganizationService

testttp.
PostJSON(t, prefixNotificationEndpoints, tt.args.endpoint).
Expand Down Expand Up @@ -1060,37 +1061,40 @@ func TestService_handlePostNotificationEndpointOwner(t *testing.T) {
}
}

func initNotificationEndpointService(f influxTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
func initNotificationEndpointService(f endpointTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
store := NewTestInmemStore(t)
logger := zaptest.NewLogger(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator

for _, v := range f.Orgs {
if err := svc.PutOrganization(ctx, v); err != nil {
t.Fatalf("failed to replace org: %v", err)
}
}

for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
kvSvc := kv.NewService(logger, store)
kvSvc.IDGenerator = f.IDGenerator
kvSvc.TimeGenerator = f.TimeGenerator

endpointStore := service.NewStore(store)
endpointStore.IDGenerator = f.IDGenerator
endpointStore.TimeGenerator = f.TimeGenerator
endpointService := service.New(endpointStore, kvSvc)

tenantStore := tenant.NewStore(store)
tenantService := tenant.NewService(tenantStore)

for _, o := range f.Orgs {
withOrgID(tenantStore, o.ID, func() {
if err := tenantService.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}

for _, v := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, v); err != nil {
if err := endpointStore.PutNotificationEndpoint(ctx, v); err != nil {
t.Fatalf("failed to update endpoint: %v", err)
}
}

fakeBackend := NewMockNotificationEndpointBackend(t)
fakeBackend.NotificationEndpointService = svc
fakeBackend.UserService = svc
fakeBackend.UserResourceMappingService = svc
fakeBackend.OrganizationService = svc
fakeBackend.NotificationEndpointService = endpointService
fakeBackend.UserResourceMappingService = tenantService
fakeBackend.UserService = tenantService

handler := NewNotificationEndpointHandler(zaptest.NewLogger(t), fakeBackend)
auth := func(next http.Handler) http.HandlerFunc {
Expand All @@ -1103,19 +1107,19 @@ func initNotificationEndpointService(f influxTesting.NotificationEndpointFields,
done := server.Close

client := mustNewHTTPClient(t, server.URL, "")
return NewNotificationEndpointService(client), svc, done
return NewNotificationEndpointService(client), kvSvc, done
}

func TestNotificationEndpointService(t *testing.T) {
t.Skip("wonky")

tests := []struct {
name string
testFn func(init func(influxTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
testFn func(init func(endpointTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
}{
{
name: "CreateNotificationEndpoint",
testFn: influxTesting.CreateNotificationEndpoint,
testFn: endpointTesting.CreateNotificationEndpoint,
},
}

Expand All @@ -1131,3 +1135,12 @@ func authCtxFn(userID influxdb.ID) func(context.Context) context.Context {
return pcontext.SetAuthorizer(ctx, &influxdb.Session{UserID: userID})
}
}

func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()

store.OrgIDGen = mock.NewStaticIDGenerator(orgID)

fn()
}
38 changes: 27 additions & 11 deletions kv/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/tenant"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
Expand All @@ -15,20 +17,20 @@ func TestBoltCheckService(t *testing.T) {
influxdbtesting.CheckService(initBoltCheckService, t)
}

func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, *kv.Service, string, func()) {
func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, influxdb.UserResourceMappingService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}

svc, op, closeSvc := initCheckService(s, f, t)
return svc, svc, op, func() {
svc, urmSvc, op, closeSvc := initCheckService(s, f, t)
return svc, urmSvc, op, func() {
closeSvc()
closeBolt()
}
}

func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testing.T) (*kv.Service, string, func()) {
func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, influxdb.UserResourceMappingService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
Expand All @@ -39,15 +41,20 @@ func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testin
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}

tenantStore := tenant.NewStore(s)
tenantSvc := tenant.NewService(tenantStore)

for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
if err := tenantSvc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
withOrgID(tenantStore, o.ID, func() {
if err := tenantSvc.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}
for _, c := range f.Checks {
if err := svc.PutCheck(ctx, c); err != nil {
Expand All @@ -59,14 +66,14 @@ func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testin
t.Fatalf("failed to populate tasks: %v", err)
}
}
return svc, kv.OpPrefix, func() {
return svc, tenantSvc, kv.OpPrefix, func() {
for _, o := range f.Organizations {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
if err := tenantSvc.DeleteOrganization(ctx, o.ID); err != nil {
t.Logf("failed to remove organization: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
if err := tenantSvc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
Expand All @@ -77,3 +84,12 @@ func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testin
}
}
}

func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()

store.OrgIDGen = mock.NewStaticIDGenerator(orgID)

fn()
}
4 changes: 2 additions & 2 deletions kv/initial_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
// store base backed services
checkBucket,
checkIndexBucket,
notificationEndpointBucket,
notificationEndpointIndexBucket,
[]byte("notificationEndpointv1"),
[]byte("notificationEndpointIndexv1"),
variableBucket,
variableIndexBucket,
variableOrgsIndex,
Expand Down
Loading

0 comments on commit f0c3c82

Please sign in to comment.