Skip to content

Commit

Permalink
Merge branch 'master' into fix/telegrafs-bucket-sort
Browse files Browse the repository at this point in the history
  • Loading branch information
asalem1 committed Oct 22, 2019
2 parents 6615b84 + 1e69c51 commit 07d8df2
Show file tree
Hide file tree
Showing 24 changed files with 568 additions and 400 deletions.
21 changes: 19 additions & 2 deletions authorizer/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in
return b, nil
}

// FindBucketByName returns a bucket by name for a particular organization.
func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, n string) (*influxdb.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

b, err := s.s.FindBucketByName(ctx, orgID, n)
if err != nil {
return nil, err
}

if err := authorizeReadBucket(ctx, b.OrgID, b.ID); err != nil {
return nil, err
}

return b, nil
}

// FindBucket retrieves the bucket and checks to see if the authorizer on context has read access to the bucket.
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
Expand Down Expand Up @@ -105,8 +122,8 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
buckets := bs[:0]
for _, b := range bs {
// temporary hack for system buckets
if b.IsSystem() {
// HACK: remove once system buckets are migrated away from hard coded values
if b.Type == influxdb.BucketTypeSystem {
buckets = append(buckets, b)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion bolt/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestID(t *testing.T) {
}
defer closeFn()

c.IDGenerator = mock.NewIDGenerator(testIDStr, t)
c.IDGenerator = mock.NewIDGenerator(testID.String(), t)

if err := c.Open(context.Background()); err != nil {
t.Fatalf("failed to open bolt client: %v", err)
Expand Down
34 changes: 21 additions & 13 deletions bolt/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

var (
testID = platform.ID(1)
testIDStr = testID.String()
testID = platform.ID(70000)
existingBucketID = platform.ID(mock.FirstMockID + 1)
firstMockID = platform.ID(mock.FirstMockID)
)

func TestClient_Name(t *testing.T) {
Expand Down Expand Up @@ -71,15 +72,21 @@ func TestClient_Name(t *testing.T) {
args: args{
resource: platform.Resource{
Type: platform.BucketsResourceType,
ID: platformtesting.IDPtr(testID),
ID: &existingBucketID,
},
init: func(ctx context.Context, s *bolt.Client) error {
_ = s.CreateOrganization(ctx, &platform.Organization{
o := platform.Organization{
Name: "o1",
})
}

err := s.CreateOrganization(ctx, &o)
if err != nil {
return err
}

return s.CreateBucket(ctx, &platform.Bucket{
Name: "b1",
OrgID: testID,
OrgID: platform.ID(mock.FirstMockID),
})
},
},
Expand All @@ -100,7 +107,7 @@ func TestClient_Name(t *testing.T) {
args: args{
resource: platform.Resource{
Type: platform.DashboardsResourceType,
ID: platformtesting.IDPtr(testID),
ID: &firstMockID,
},
init: func(ctx context.Context, s *bolt.Client) error {
return s.CreateDashboard(ctx, &platform.Dashboard{
Expand All @@ -126,7 +133,7 @@ func TestClient_Name(t *testing.T) {
args: args{
resource: platform.Resource{
Type: platform.OrgsResourceType,
ID: platformtesting.IDPtr(testID),
ID: &firstMockID,
},
init: func(ctx context.Context, s *bolt.Client) error {
return s.CreateOrganization(ctx, &platform.Organization{
Expand All @@ -151,7 +158,7 @@ func TestClient_Name(t *testing.T) {
args: args{
resource: platform.Resource{
Type: platform.SourcesResourceType,
ID: platformtesting.IDPtr(testID),
ID: &firstMockID,
},
init: func(ctx context.Context, s *bolt.Client) error {
return s.CreateSource(ctx, &platform.Source{
Expand All @@ -176,7 +183,7 @@ func TestClient_Name(t *testing.T) {
args: args{
resource: platform.Resource{
Type: platform.TelegrafsResourceType,
ID: platformtesting.IDPtr(testID),
ID: &firstMockID,
},
init: func(ctx context.Context, s *bolt.Client) error {
return s.CreateTelegrafConfig(ctx, &platform.TelegrafConfig{
Expand All @@ -202,7 +209,7 @@ func TestClient_Name(t *testing.T) {
args: args{
resource: platform.Resource{
Type: platform.UsersResourceType,
ID: platformtesting.IDPtr(testID),
ID: &firstMockID,
},
init: func(ctx context.Context, s *bolt.Client) error {
return s.CreateUser(ctx, &platform.User{
Expand Down Expand Up @@ -230,8 +237,8 @@ func TestClient_Name(t *testing.T) {
t.Fatalf("unable to create bolt test client: %v", err)
}
defer done()

c.IDGenerator = mock.NewIDGenerator(testIDStr, t)
mockIDGen := mock.NewMockIDGenerator()
c.IDGenerator = mockIDGen
ctx := context.Background()
if tt.args.init != nil {
if err := tt.args.init(ctx, c); err != nil {
Expand All @@ -242,6 +249,7 @@ func TestClient_Name(t *testing.T) {
if tt.args.resource.ID != nil {
id = *tt.args.resource.ID
}

got, err := c.Name(ctx, tt.args.resource.Type, id)
if (err != nil) != tt.wantErr {
t.Errorf("Service.Name() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
56 changes: 48 additions & 8 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package influxdb

import (
"context"
"fmt"
"strings"
"time"
)

// TasksSystemBucketID and MonitoringSystemBucketID are IDs that are reserved for system buckets.
// If any system bucket IDs are added, Bucket.IsSystem must be updated to include them.
const (
// TasksSystemBucketID is the fixed ID for our tasks system bucket
TasksSystemBucketID = ID(10)
Expand All @@ -18,6 +17,16 @@ const (
BucketTypeUser = BucketType(0)
// BucketTypeSystem is an internally created bucket that cannot be deleted/renamed.
BucketTypeSystem = BucketType(1)
// MonitoringSystemBucketRetention is the time we should retain monitoring system bucket information
MonitoringSystemBucketRetention = time.Hour * 24 * 7
// TasksSystemBucketRetention is the time we should retain task system bucket information
TasksSystemBucketRetention = time.Hour * 24 * 3
)

// Bucket names constants
const (
TasksSystemBucketName = "_tasks"
MonitoringSystemBucketName = "_monitoring"
)

// InfiniteRetention is default infinite retention period.
Expand Down Expand Up @@ -46,12 +55,6 @@ func (bt BucketType) String() string {
return "user"
}

// TODO(jade): move this logic to a type set directly on Bucket.
// IsSystem returns true if a bucket is a known system bucket
func (b *Bucket) IsSystem() bool {
return b.ID == TasksSystemBucketID || b.ID == MonitoringSystemBucketID
}

// ops for buckets error and buckets op logs.
var (
OpFindBucketByID = "FindBucketByID"
Expand Down Expand Up @@ -83,6 +86,7 @@ type BucketService interface {

// DeleteBucket removes a bucket by ID.
DeleteBucket(ctx context.Context, id ID) error
FindBucketByName(ctx context.Context, orgID ID, name string) (*Bucket, error)
}

// BucketUpdate represents updates to a bucket.
Expand Down Expand Up @@ -142,3 +146,39 @@ func (f BucketFilter) String() string {
}
return "[" + strings.Join(parts, ", ") + "]"
}

// FindSystemBucket finds the system bucket with a given name
func FindSystemBucket(ctx context.Context, bs BucketService, orgID ID, name string) (*Bucket, error) {
bucket, err := bs.FindBucketByName(ctx, orgID, name)
if err != nil {
return nil, err
}

if bucket != nil {
return bucket, nil
}

switch name {
case TasksSystemBucketName:
return &Bucket{
ID: TasksSystemBucketID,
Type: BucketTypeSystem,
Name: TasksSystemBucketName,
RetentionPeriod: TasksSystemBucketRetention,
Description: "System bucket for task logs",
}, nil
case MonitoringSystemBucketName:
return &Bucket{
ID: MonitoringSystemBucketID,
Type: BucketTypeSystem,
Name: MonitoringSystemBucketName,
RetentionPeriod: MonitoringSystemBucketRetention,
Description: "System bucket for monitoring logs",
}, nil
default:
return nil, &Error{
Code: ENotFound,
Msg: fmt.Sprintf("system bucket %q not found", name),
}
}
}
16 changes: 11 additions & 5 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
},
{
DestP: &vaultConfig.Address,
Flag: "vault-address",
Flag: "vault-addr",
Desc: "address of the Vault server expressed as a URL and port, for example: https://127.0.0.1:8200/.",
},
{
Expand All @@ -206,17 +206,17 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
},
{
DestP: &vaultConfig.MaxRetries,
Flag: "vault-client-max-retries",
Flag: "vault-max-retries",
Desc: "maximum number of retries when a 5xx error code is encountered. The default is 2, for three total attempts. Set this to 0 or less to disable retrying.",
},
{
DestP: &vaultConfig.CACert,
Flag: "vault-ca-cert",
Flag: "vault-cacert",
Desc: "path to a PEM-encoded CA certificate file on the local disk. This file is used to verify the Vault server's SSL certificate. This environment variable takes precedence over VAULT_CAPATH.",
},
{
DestP: &vaultConfig.CAPath,
Flag: "vault-ca-path",
Flag: "vault-capath",
Desc: "path to a directory of PEM-encoded CA certificate files on the local disk. These certificates are used to verify the Vault server's SSL certificate.",
},
{
Expand All @@ -239,6 +239,11 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
Flag: "vault-tls-server-name",
Desc: "name to use as the SNI host when connecting via TLS.",
},
{
DestP: &vaultConfig.Token,
Flag: "vault-token",
Desc: "vault authentication token",
},
{
DestP: &l.httpTlsCert,
Flag: "tls-cert",
Expand Down Expand Up @@ -616,7 +621,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// validation(coordinator(analyticalstore(kv.Service)))

// define the executor and build analytical storage middleware
combinedTaskService := taskbackend.NewAnalyticalStorage(m.logger.With(zap.String("service", "task-analytical-store")), m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController})
combinedTaskService := taskbackend.NewAnalyticalStorage(
m.logger.With(zap.String("service", "task-analytical-store")), m.kvService, m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController})
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, combinedTaskService)

// create the scheduler
Expand Down
Loading

0 comments on commit 07d8df2

Please sign in to comment.