Skip to content

Commit

Permalink
fix: Add a ByOrgID index to DBRP
Browse files Browse the repository at this point in the history
This commit adds a new index and migration to the DBRP service for
retrieving all database and retention policy mappings for a single
organization.

This change was required to resolve an invalid assumption of the DBRP
service, which relied on a prefix match of the byOrgAndDatabase kv.Index
when performing search operations by organization ID only.

Closes #20096
  • Loading branch information
stuartcarnie committed Nov 23, 2020
1 parent 78977eb commit 6e7a631
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 18 deletions.
20 changes: 20 additions & 0 deletions dbrp/index.go
@@ -0,0 +1,20 @@
package dbrp

import (
"encoding/json"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)

var (
ByOrgIDIndexMapping = kv.NewIndexMapping(bucket, byOrgIDIndexBucket, func(v []byte) ([]byte, error) {
var dbrp influxdb.DBRPMappingV2
if err := json.Unmarshal(v, &dbrp); err != nil {
return nil, err
}

id, _ := dbrp.OrganizationID.Encode()
return id, nil
})
)
54 changes: 40 additions & 14 deletions dbrp/service.go
Expand Up @@ -35,9 +35,10 @@ import (
)

var (
bucket = []byte("dbrpv1")
indexBucket = []byte("dbrpbyorganddbindexv1")
defaultBucket = []byte("dbrpdefaultv1")
bucket = []byte("dbrpv1")
indexBucket = []byte("dbrpbyorganddbindexv1")
byOrgIDIndexBucket = []byte("dbrpbyorgv1")
defaultBucket = []byte("dbrpdefaultv1")
)

var _ influxdb.DBRPMappingServiceV2 = (*AuthorizedService)(nil)
Expand All @@ -48,6 +49,7 @@ type Service struct {

bucketSvc influxdb.BucketService
byOrgAndDatabase *kv.Index
byOrg *kv.Index
}

func indexForeignKey(dbrp influxdb.DBRPMappingV2) []byte {
Expand All @@ -74,6 +76,7 @@ func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Sto
}
return indexForeignKey(dbrp), nil
}), kv.WithIndexReadPathEnabled),
byOrg: kv.NewIndex(ByOrgIDIndexMapping, kv.WithIndexReadPathEnabled),
}
}

Expand Down Expand Up @@ -277,17 +280,17 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
err := s.store.View(ctx, func(tx kv.Tx) error {
// Optimized path, use index.
if orgID := filter.OrgID; orgID != nil {
// The index performs a prefix search.
// The foreign key is `orgID + db`.
// If you want to look by orgID only, just pass orgID as prefix.
db := ""
if filter.Database != nil {
var (
db = ""
compKey []byte
index *kv.Index
)
if filter.Database != nil && len(*filter.Database) > 0 {
db = *filter.Database
}
compKey := composeForeignKey(*orgID, db)
if len(db) > 0 {
// Even more optimized, looking for the default given an orgID and database.
// No walking index needed.
compKey = composeForeignKey(*orgID, db)
index = s.byOrgAndDatabase

// Filtering by Org, Database and Default == true
if def := filter.Default; def != nil && *def {
defID, err := s.getDefault(tx, compKey)
if kv.IsNotFound(err) {
Expand All @@ -307,8 +310,12 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
_, err = add(tx)(defID, v)
return err
}
} else {
compKey, _ = orgID.Encode()
index = s.byOrg
}
return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx))

return index.Walk(ctx, tx, compKey, add(tx))
}
bucket, err := tx.Bucket(bucket)
if err != nil {
Expand Down Expand Up @@ -359,15 +366,25 @@ func (s *Service) Create(ctx context.Context, dbrp *influxdb.DBRPMappingV2) erro
return ErrInvalidDBRPID
}

// OrganizationID has been validated by Validate
orgID, _ := dbrp.OrganizationID.Encode()

return s.store.Update(ctx, func(tx kv.Tx) error {
bucket, err := tx.Bucket(bucket)
if err != nil {
return ErrInternalService(err)
}

// populate indices
compKey := indexForeignKey(*dbrp)
if err := s.byOrgAndDatabase.Insert(tx, compKey, encodedID); err != nil {
return err
}

if err := s.byOrg.Insert(tx, orgID, encodedID); err != nil {
return err
}

defSet, err := s.isDefaultSet(tx, compKey)
if err != nil {
return err
Expand Down Expand Up @@ -463,6 +480,12 @@ func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error {
if err != nil {
return ErrInternalService(err)
}

encodedOrgID, err := id.Encode()
if err != nil {
return ErrInternalService(err)
}

return s.store.Update(ctx, func(tx kv.Tx) error {
bucket, err := tx.Bucket(bucket)
if err != nil {
Expand All @@ -475,6 +498,9 @@ func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error {
if err := s.byOrgAndDatabase.Delete(tx, compKey, encodedID); err != nil {
return ErrInternalService(err)
}
if err := s.byOrg.Delete(tx, encodedOrgID, encodedID); err != nil {
return ErrInternalService(err)
}
// If this was the default, we need to set a new default.
var derr error
if dbrp.Default {
Expand Down
8 changes: 8 additions & 0 deletions kv/migration/all/0012_dbrp_by_org_index.go
@@ -0,0 +1,8 @@
package all

import (
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/kv"
)

var Migration0012_DBRPByOrgIndex = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping, kv.WithIndexMigrationCleanup)
2 changes: 2 additions & 0 deletions kv/migration/all/all.go
Expand Up @@ -29,5 +29,7 @@ var Migrations = [...]migration.Spec{
Migration0010_AddIndexTelegrafByOrg,
// populate dashboards owner id
Migration0011_PopulateDashboardsOwnerId,
// Populate the DBRP service ByOrg index
Migration0012_DBRPByOrgIndex,
// {{ do_not_edit . }}
}
8 changes: 4 additions & 4 deletions testing/dbrp_mapping_v2.go
Expand Up @@ -735,23 +735,23 @@ func FindManyDBRPMappingsV2(
fields: DBRPMappingFieldsV2{
DBRPMappingsV2: []*influxdb.DBRPMappingV2{
{
ID: 100,
ID: MustIDBase16("0000000000000100"),
Database: "database",
RetentionPolicy: "retention_policyA",
Default: false,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: MustIDBase16(dbrpBucketAID),
},
{
ID: 200,
ID: MustIDBase16("0000000000000200"),
Database: "database",
RetentionPolicy: "retention_policyB",
Default: true,
OrganizationID: MustIDBase16(dbrpOrg3ID),
BucketID: MustIDBase16(dbrpBucketBID),
},
{
ID: 300,
ID: MustIDBase16("0000000000000300"),
Database: "database",
RetentionPolicy: "retention_policyB",
Default: true,
Expand All @@ -770,7 +770,7 @@ func FindManyDBRPMappingsV2(
wants: wants{
dbrpMappings: []*influxdb.DBRPMappingV2{
{
ID: 200,
ID: MustIDBase16("0000000000000200"),
Database: "database",
RetentionPolicy: "retention_policyB",
Default: true,
Expand Down

0 comments on commit 6e7a631

Please sign in to comment.