Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(notifications): isolate endpoint service #19818

Merged
merged 2 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
Expand Down Expand Up @@ -759,17 +759,16 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.boltClient)

var (
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationEndpointStore platform.NotificationEndpointService = m.kvService
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
)

tenantStore := tenant.NewStore(m.kvStore)
Expand Down Expand Up @@ -988,10 +987,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
checkSvc = middleware.NewCheckService(checkSvc, m.kvService, coordinator)
}

var notificationEndpointSvc platform.NotificationEndpointService
{
notificationEndpointSvc = endpointservice.New(endpointservice.NewStore(m.kvStore), secretSvc)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
notificationRuleSvc, err = ruleservice.New(m.log, m.kvStore, m.kvService, ts.OrganizationService, notificationEndpointSvc)
if err != nil {
return err
}
Expand Down Expand Up @@ -1145,7 +1149,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationEndpointFinder: notificationEndpointSvc,
NotificationRuleFinder: notificationRuleSvc,
}

Expand Down Expand Up @@ -1190,7 +1194,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TaskService: taskSvc,
TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UserResourceMappingService, ts.OrganizationService),
NotificationEndpointService: notificationEndpointSvc,
CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
Expand Down Expand Up @@ -1506,6 +1510,11 @@ func (m *Launcher) TaskControlService() taskbackend.TaskControlService {
return m.taskControlService
}

// CheckService returns the internal check service.
func (m *Launcher) CheckService() platform.CheckService {
return m.apibackend.CheckService
}

// KeyValueService returns the internal key-value service.
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService
Expand Down
4 changes: 0 additions & 4 deletions cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,6 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService {
return &http.BucketService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) CheckService() influxdb.CheckService {
Copy link
Contributor Author

@GeorgeMac GeorgeMac Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for the reader:

You'll see this has moved up onto the Launcher for now. I have been moving more of these helpers to use the clients as opposed to in-process service implementations. I went to move this to the http.CheckService implementation. But half the methods don't actually match the influxdb.CheckService interface. They return their own local check API domain model and accept slightly different arguments.
I started down the path of fixing this client, but It started to get unwieldy. I still want to deliver that though as its own PR later. So I have made a note to create an issue and fix this.
Until then this just updates the launcher test suite to interface with the same check service installed within the launcher itself. So that consistency is at least achieved, even if it doesn't exercise the front door as we would like.

return tl.kvService
}

func (tl *TestLauncher) DashboardService(tb testing.TB) *http.DashboardService {
tb.Helper()
return &http.DashboardService{Client: tl.HTTPClient(tb)}
Expand Down
12 changes: 0 additions & 12 deletions http/notification_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type NotificationEndpointBackend struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}

// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
Expand All @@ -38,7 +37,6 @@ func NewNotificationEndpointBackend(log *zap.Logger, b *APIBackend) *Notificatio
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
}

Expand All @@ -56,7 +54,6 @@ type NotificationEndpointHandler struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}

const (
Expand All @@ -81,7 +78,6 @@ func NewNotificationEndpointHandler(log *zap.Logger, b *NotificationEndpointBack
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
h.HandlerFunc("POST", prefixNotificationEndpoints, h.handlePostNotificationEndpoint)
h.HandlerFunc("GET", prefixNotificationEndpoints, h.handleGetNotificationEndpoints)
Expand Down Expand Up @@ -551,20 +547,12 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re
// NotificationEndpointService is an http client for the influxdb.NotificationEndpointService server implementation.
type NotificationEndpointService struct {
Client *httpc.Client
*UserResourceMappingService
*OrganizationService
}

// NewNotificationEndpointService constructs a new http NotificationEndpointService.
func NewNotificationEndpointService(client *httpc.Client) *NotificationEndpointService {
return &NotificationEndpointService{
Client: client,
UserResourceMappingService: &UserResourceMappingService{
Client: client,
},
OrganizationService: &OrganizationService{
Client: client,
},
}
}

Expand Down
63 changes: 38 additions & 25 deletions http/notification_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/endpoint/service"
endpointTesting "github.com/influxdata/influxdb/v2/notification/endpoint/service/testing"
"github.com/influxdata/influxdb/v2/pkg/testttp"
"github.com/influxdata/influxdb/v2/tenant"
influxTesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
Expand All @@ -32,7 +35,6 @@ func NewMockNotificationEndpointBackend(t *testing.T) *NotificationEndpointBacke
UserResourceMappingService: mock.NewUserResourceMappingService(),
LabelService: mock.NewLabelService(),
UserService: mock.NewUserService(),
OrganizationService: mock.NewOrganizationService(),
}
}

Expand Down Expand Up @@ -489,7 +491,6 @@ func TestService_handlePostNotificationEndpoint(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
notificationEndpointBackend := NewMockNotificationEndpointBackend(t)
notificationEndpointBackend.NotificationEndpointService = tt.fields.NotificationEndpointService
notificationEndpointBackend.OrganizationService = tt.fields.OrganizationService

testttp.
PostJSON(t, prefixNotificationEndpoints, tt.args.endpoint).
Expand Down Expand Up @@ -1060,37 +1061,40 @@ func TestService_handlePostNotificationEndpointOwner(t *testing.T) {
}
}

func initNotificationEndpointService(f influxTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
func initNotificationEndpointService(f endpointTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
store := NewTestInmemStore(t)
logger := zaptest.NewLogger(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator

for _, v := range f.Orgs {
if err := svc.PutOrganization(ctx, v); err != nil {
t.Fatalf("failed to replace org: %v", err)
}
}

for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
kvSvc := kv.NewService(logger, store)
kvSvc.IDGenerator = f.IDGenerator
kvSvc.TimeGenerator = f.TimeGenerator

endpointStore := service.NewStore(store)
endpointStore.IDGenerator = f.IDGenerator
endpointStore.TimeGenerator = f.TimeGenerator
endpointService := service.New(endpointStore, kvSvc)

tenantStore := tenant.NewStore(store)
tenantService := tenant.NewService(tenantStore)

for _, o := range f.Orgs {
withOrgID(tenantStore, o.ID, func() {
if err := tenantService.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}

for _, v := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, v); err != nil {
if err := endpointStore.PutNotificationEndpoint(ctx, v); err != nil {
t.Fatalf("failed to update endpoint: %v", err)
}
}

fakeBackend := NewMockNotificationEndpointBackend(t)
fakeBackend.NotificationEndpointService = svc
fakeBackend.UserService = svc
fakeBackend.UserResourceMappingService = svc
fakeBackend.OrganizationService = svc
fakeBackend.NotificationEndpointService = endpointService
fakeBackend.UserResourceMappingService = tenantService
fakeBackend.UserService = tenantService

handler := NewNotificationEndpointHandler(zaptest.NewLogger(t), fakeBackend)
auth := func(next http.Handler) http.HandlerFunc {
Expand All @@ -1103,19 +1107,19 @@ func initNotificationEndpointService(f influxTesting.NotificationEndpointFields,
done := server.Close

client := mustNewHTTPClient(t, server.URL, "")
return NewNotificationEndpointService(client), svc, done
return NewNotificationEndpointService(client), kvSvc, done
}

func TestNotificationEndpointService(t *testing.T) {
t.Skip("wonky")

tests := []struct {
name string
testFn func(init func(influxTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
testFn func(init func(endpointTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
}{
{
name: "CreateNotificationEndpoint",
testFn: influxTesting.CreateNotificationEndpoint,
testFn: endpointTesting.CreateNotificationEndpoint,
},
}

Expand All @@ -1131,3 +1135,12 @@ func authCtxFn(userID influxdb.ID) func(context.Context) context.Context {
return pcontext.SetAuthorizer(ctx, &influxdb.Session{UserID: userID})
}
}

func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()

store.OrgIDGen = mock.NewStaticIDGenerator(orgID)

fn()
}
Loading