Skip to content

Commit

Permalink
refactor(notifications): isolate endpoint service
Browse files Browse the repository at this point in the history
Following the ongoing effort to isolate behaviours into their own
packages and off of kv.Service, this change move the notification
endpoints service implementation into its own package. It removes the
endpoint behaviors from the kv service completely.
  • Loading branch information
GeorgeMac committed Oct 26, 2020
1 parent 764c8a5 commit edf0d1b
Show file tree
Hide file tree
Showing 15 changed files with 911 additions and 1,336 deletions.
34 changes: 19 additions & 15 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
Expand Down Expand Up @@ -759,17 +759,16 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.boltClient)

var (
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationEndpointStore platform.NotificationEndpointService = m.kvService
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
)

tenantStore := tenant.NewStore(m.kvStore)
Expand Down Expand Up @@ -988,10 +987,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
checkSvc = middleware.NewCheckService(checkSvc, m.kvService, coordinator)
}

var notificationEndpointSvc platform.NotificationEndpointService
{
notificationEndpointSvc = endpointservice.New(endpointservice.NewStore(m.kvStore), secretSvc)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, notificationEndpointSvc)
if err != nil {
return err
}
Expand Down Expand Up @@ -1145,7 +1149,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationEndpointFinder: notificationEndpointSvc,
NotificationRuleFinder: notificationRuleSvc,
}

Expand Down Expand Up @@ -1190,7 +1194,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TaskService: taskSvc,
TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UserResourceMappingService, ts.OrganizationService),
NotificationEndpointService: notificationEndpointSvc,
CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
Expand Down
38 changes: 27 additions & 11 deletions kv/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/tenant"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
Expand All @@ -15,20 +17,20 @@ func TestBoltCheckService(t *testing.T) {
influxdbtesting.CheckService(initBoltCheckService, t)
}

func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, *kv.Service, string, func()) {
func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, influxdb.UserResourceMappingService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}

svc, op, closeSvc := initCheckService(s, f, t)
return svc, svc, op, func() {
svc, urmSvc, op, closeSvc := initCheckService(s, f, t)
return svc, urmSvc, op, func() {
closeSvc()
closeBolt()
}
}

func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testing.T) (*kv.Service, string, func()) {
func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, influxdb.UserResourceMappingService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
Expand All @@ -39,15 +41,20 @@ func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testin
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}

tenantStore := tenant.NewStore(s)
tenantSvc := tenant.NewService(tenantStore)

for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
if err := tenantSvc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
withOrgID(tenantStore, o.ID, func() {
if err := tenantSvc.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}
for _, c := range f.Checks {
if err := svc.PutCheck(ctx, c); err != nil {
Expand All @@ -59,14 +66,14 @@ func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testin
t.Fatalf("failed to populate tasks: %v", err)
}
}
return svc, kv.OpPrefix, func() {
return svc, tenantSvc, kv.OpPrefix, func() {
for _, o := range f.Organizations {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
if err := tenantSvc.DeleteOrganization(ctx, o.ID); err != nil {
t.Logf("failed to remove organization: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
if err := tenantSvc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
Expand All @@ -77,3 +84,12 @@ func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testin
}
}
}

func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()

store.OrgIDGen = mock.NewStaticIDGenerator(orgID)

fn()
}
4 changes: 2 additions & 2 deletions kv/initial_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
// store base backed services
checkBucket,
checkIndexBucket,
notificationEndpointBucket,
notificationEndpointIndexBucket,
[]byte("notificationEndpointv1"),
[]byte("notificationEndpointIndexv1"),
variableBucket,
variableIndexBucket,
variableOrgsIndex,
Expand Down
Loading

0 comments on commit edf0d1b

Please sign in to comment.