Skip to content

Commit

Permalink
refactor(notifications): isolate endpoint service (#19818)
Browse files Browse the repository at this point in the history
* refactor(notifications): isolate endpoint service

Following the ongoing effort to isolate behaviours into their own
packages and off of kv.Service, this change move the notification
endpoints service implementation into its own package. It removes the
endpoint behaviors from the kv service completely.

* chore(influxd): wire up the isolated check service in place of kv service
  • Loading branch information
GeorgeMac committed Oct 28, 2020
1 parent 9dd37a1 commit 78cafa8
Show file tree
Hide file tree
Showing 21 changed files with 967 additions and 3,878 deletions.
39 changes: 24 additions & 15 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
Expand Down Expand Up @@ -760,17 +760,16 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.boltClient)

var (
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationEndpointStore platform.NotificationEndpointService = m.kvService
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
)

tenantStore := tenant.NewStore(m.kvStore)
Expand Down Expand Up @@ -989,10 +988,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
checkSvc = middleware.NewCheckService(checkSvc, m.kvService, coordinator)
}

var notificationEndpointSvc platform.NotificationEndpointService
{
notificationEndpointSvc = endpointservice.New(endpointservice.NewStore(m.kvStore), secretSvc)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
notificationRuleSvc, err = ruleservice.New(m.log, m.kvStore, m.kvService, ts.OrganizationService, notificationEndpointSvc)
if err != nil {
return err
}
Expand Down Expand Up @@ -1146,7 +1150,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationEndpointFinder: notificationEndpointSvc,
NotificationRuleFinder: notificationRuleSvc,
}

Expand Down Expand Up @@ -1191,7 +1195,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TaskService: taskSvc,
TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UserResourceMappingService, ts.OrganizationService),
NotificationEndpointService: notificationEndpointSvc,
CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
Expand Down Expand Up @@ -1529,6 +1533,11 @@ func (m *Launcher) TaskControlService() taskbackend.TaskControlService {
return m.taskControlService
}

// CheckService returns the internal check service.
func (m *Launcher) CheckService() platform.CheckService {
return m.apibackend.CheckService
}

// KeyValueService returns the internal key-value service.
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService
Expand Down
4 changes: 0 additions & 4 deletions cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,6 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService {
return &http.BucketService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) CheckService() influxdb.CheckService {
return tl.kvService
}

func (tl *TestLauncher) DashboardService(tb testing.TB) *http.DashboardService {
tb.Helper()
return &http.DashboardService{Client: tl.HTTPClient(tb)}
Expand Down
12 changes: 0 additions & 12 deletions http/notification_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type NotificationEndpointBackend struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}

// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
Expand All @@ -38,7 +37,6 @@ func NewNotificationEndpointBackend(log *zap.Logger, b *APIBackend) *Notificatio
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
}

Expand All @@ -56,7 +54,6 @@ type NotificationEndpointHandler struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}

const (
Expand All @@ -81,7 +78,6 @@ func NewNotificationEndpointHandler(log *zap.Logger, b *NotificationEndpointBack
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
h.HandlerFunc("POST", prefixNotificationEndpoints, h.handlePostNotificationEndpoint)
h.HandlerFunc("GET", prefixNotificationEndpoints, h.handleGetNotificationEndpoints)
Expand Down Expand Up @@ -551,20 +547,12 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re
// NotificationEndpointService is an http client for the influxdb.NotificationEndpointService server implementation.
type NotificationEndpointService struct {
Client *httpc.Client
*UserResourceMappingService
*OrganizationService
}

// NewNotificationEndpointService constructs a new http NotificationEndpointService.
func NewNotificationEndpointService(client *httpc.Client) *NotificationEndpointService {
return &NotificationEndpointService{
Client: client,
UserResourceMappingService: &UserResourceMappingService{
Client: client,
},
OrganizationService: &OrganizationService{
Client: client,
},
}
}

Expand Down
63 changes: 38 additions & 25 deletions http/notification_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/endpoint/service"
endpointTesting "github.com/influxdata/influxdb/v2/notification/endpoint/service/testing"
"github.com/influxdata/influxdb/v2/pkg/testttp"
"github.com/influxdata/influxdb/v2/tenant"
influxTesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
Expand All @@ -32,7 +35,6 @@ func NewMockNotificationEndpointBackend(t *testing.T) *NotificationEndpointBacke
UserResourceMappingService: mock.NewUserResourceMappingService(),
LabelService: mock.NewLabelService(),
UserService: mock.NewUserService(),
OrganizationService: mock.NewOrganizationService(),
}
}

Expand Down Expand Up @@ -489,7 +491,6 @@ func TestService_handlePostNotificationEndpoint(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
notificationEndpointBackend := NewMockNotificationEndpointBackend(t)
notificationEndpointBackend.NotificationEndpointService = tt.fields.NotificationEndpointService
notificationEndpointBackend.OrganizationService = tt.fields.OrganizationService

testttp.
PostJSON(t, prefixNotificationEndpoints, tt.args.endpoint).
Expand Down Expand Up @@ -1060,37 +1061,40 @@ func TestService_handlePostNotificationEndpointOwner(t *testing.T) {
}
}

func initNotificationEndpointService(f influxTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
func initNotificationEndpointService(f endpointTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
store := NewTestInmemStore(t)
logger := zaptest.NewLogger(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator

for _, v := range f.Orgs {
if err := svc.PutOrganization(ctx, v); err != nil {
t.Fatalf("failed to replace org: %v", err)
}
}

for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
kvSvc := kv.NewService(logger, store)
kvSvc.IDGenerator = f.IDGenerator
kvSvc.TimeGenerator = f.TimeGenerator

endpointStore := service.NewStore(store)
endpointStore.IDGenerator = f.IDGenerator
endpointStore.TimeGenerator = f.TimeGenerator
endpointService := service.New(endpointStore, kvSvc)

tenantStore := tenant.NewStore(store)
tenantService := tenant.NewService(tenantStore)

for _, o := range f.Orgs {
withOrgID(tenantStore, o.ID, func() {
if err := tenantService.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}

for _, v := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, v); err != nil {
if err := endpointStore.PutNotificationEndpoint(ctx, v); err != nil {
t.Fatalf("failed to update endpoint: %v", err)
}
}

fakeBackend := NewMockNotificationEndpointBackend(t)
fakeBackend.NotificationEndpointService = svc
fakeBackend.UserService = svc
fakeBackend.UserResourceMappingService = svc
fakeBackend.OrganizationService = svc
fakeBackend.NotificationEndpointService = endpointService
fakeBackend.UserResourceMappingService = tenantService
fakeBackend.UserService = tenantService

handler := NewNotificationEndpointHandler(zaptest.NewLogger(t), fakeBackend)
auth := func(next http.Handler) http.HandlerFunc {
Expand All @@ -1103,19 +1107,19 @@ func initNotificationEndpointService(f influxTesting.NotificationEndpointFields,
done := server.Close

client := mustNewHTTPClient(t, server.URL, "")
return NewNotificationEndpointService(client), svc, done
return NewNotificationEndpointService(client), kvSvc, done
}

func TestNotificationEndpointService(t *testing.T) {
t.Skip("wonky")

tests := []struct {
name string
testFn func(init func(influxTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
testFn func(init func(endpointTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
}{
{
name: "CreateNotificationEndpoint",
testFn: influxTesting.CreateNotificationEndpoint,
testFn: endpointTesting.CreateNotificationEndpoint,
},
}

Expand All @@ -1131,3 +1135,12 @@ func authCtxFn(userID influxdb.ID) func(context.Context) context.Context {
return pcontext.SetAuthorizer(ctx, &influxdb.Session{UserID: userID})
}
}

func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()

store.OrgIDGen = mock.NewStaticIDGenerator(orgID)

fn()
}
Loading

0 comments on commit 78cafa8

Please sign in to comment.