diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 8bddc329d39..94e70e5d43e 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -925,7 +925,13 @@ func (m *Launcher) run(ctx context.Context) (err error) { var sessionSvc platform.SessionService { - sessionSvc = session.NewService(session.NewStorage(inmem.NewSessionStore()), userSvc, userResourceSvc, authSvc, time.Duration(m.sessionLength)*time.Minute) + sessionSvc = session.NewService( + session.NewStorage(inmem.NewSessionStore()), + userSvc, + userResourceSvc, + authSvc, + session.WithSessionLength(time.Duration(m.sessionLength)*time.Minute), + ) sessionSvc = session.NewSessionMetrics(m.reg, sessionSvc) sessionSvc = session.NewSessionLogger(m.log.With(zap.String("service", "session")), sessionSvc) } diff --git a/kv/initial_migration.go b/kv/initial_migration.go index e273bfad698..4fa5ac78635 100644 --- a/kv/initial_migration.go +++ b/kv/initial_migration.go @@ -43,7 +43,6 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error { userpasswordBucket, scrapersBucket, secretBucket, - sessionBucket, telegrafBucket, telegrafPluginsBucket, urmBucket, @@ -62,6 +61,8 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error { variableBucket, variableIndexBucket, variableOrgsIndex, + // deprecated: removed in later migration + []byte("sessionsv1"), } { if err := store.CreateBucket(ctx, bucket); err != nil { return err diff --git a/kv/migration/all/0006_delete-bucket-sessionsv1.go b/kv/migration/all/0006_delete-bucket-sessionsv1.go new file mode 100644 index 00000000000..8d83ea9af00 --- /dev/null +++ b/kv/migration/all/0006_delete-bucket-sessionsv1.go @@ -0,0 +1,7 @@ +package all + +import "github.com/influxdata/influxdb/v2/kv/migration" + +// Migration0006_DeleteBucketSessionsv1 removes the sessionsv1 bucket +// from the backing kv store. +var Migration0006_DeleteBucketSessionsv1 = migration.DeleteBuckets("delete sessionsv1 bucket", []byte("sessionsv1")) diff --git a/kv/migration/all/all.go b/kv/migration/all/all.go index 86b30989d43..55336329166 100644 --- a/kv/migration/all/all.go +++ b/kv/migration/all/all.go @@ -17,5 +17,7 @@ var Migrations = [...]migration.Spec{ Migration0004_AddDbrpBuckets, // add pkger buckets Migration0005_AddPkgerBuckets, + // delete bucket sessionsv1 + Migration0006_DeleteBucketSessionsv1, // {{ do_not_edit . }} } diff --git a/kv/migration/buckets.go b/kv/migration/buckets.go index 54424414d70..f0098e6946f 100644 --- a/kv/migration/buckets.go +++ b/kv/migration/buckets.go @@ -6,41 +6,78 @@ import ( "github.com/influxdata/influxdb/v2/kv" ) -// CreateBucketsMigration is a migration Spec which creates +type bucketMigrationType string + +const ( + createBucketMigration = bucketMigrationType("create") + deleteBucketMigration = bucketMigrationType("delete") +) + +// BucketsMigration is a migration Spec which creates // the provided list of buckets on a store when Up is called // and deletes them on Down. -type CreateBucketsMigration struct { +type BucketsMigration struct { + typ bucketMigrationType name string buckets [][]byte } -// CreateBuckets returns a new CreateBucketsMigration Spec. -func CreateBuckets(name string, buckets ...[]byte) Spec { - return CreateBucketsMigration{name, buckets} +// CreateBuckets returns a new BucketsMigration Spec. +func CreateBuckets(name string, bucket []byte, extraBuckets ...[]byte) Spec { + buckets := append([][]byte{bucket}, extraBuckets...) + return BucketsMigration{createBucketMigration, name, buckets} +} + +// DeleteBuckets returns a new BucketsMigration Spec. +func DeleteBuckets(name string, bucket []byte, extraBuckets ...[]byte) Spec { + buckets := append([][]byte{bucket}, extraBuckets...) + return BucketsMigration{deleteBucketMigration, name, buckets} } // MigrationName returns the name of the migration. -func (c CreateBucketsMigration) MigrationName() string { - return c.name +func (m BucketsMigration) MigrationName() string { + return m.name } // Up creates the buckets on the store. -func (c CreateBucketsMigration) Up(ctx context.Context, store kv.SchemaStore) error { - for _, bucket := range c.buckets { - if err := store.CreateBucket(ctx, bucket); err != nil { +func (m BucketsMigration) Up(ctx context.Context, store kv.SchemaStore) error { + var fn func(context.Context, []byte) error + switch m.typ { + case createBucketMigration: + fn = store.CreateBucket + case deleteBucketMigration: + fn = store.DeleteBucket + default: + panic("unrecognized buckets migration type") + } + + for _, bucket := range m.buckets { + if err := fn(ctx, bucket); err != nil { return err } + } return nil } // Down delets the buckets on the store. -func (c CreateBucketsMigration) Down(ctx context.Context, store kv.SchemaStore) error { - for _, bucket := range c.buckets { - if err := store.DeleteBucket(ctx, bucket); err != nil { +func (m BucketsMigration) Down(ctx context.Context, store kv.SchemaStore) error { + var fn func(context.Context, []byte) error + switch m.typ { + case createBucketMigration: + fn = store.DeleteBucket + case deleteBucketMigration: + fn = store.CreateBucket + default: + panic("unrecognized buckets migration type") + } + + for _, bucket := range m.buckets { + if err := fn(ctx, bucket); err != nil { return err } + } return nil diff --git a/kv/migration/buckets_test.go b/kv/migration/buckets_test.go new file mode 100644 index 00000000000..ae8017098b0 --- /dev/null +++ b/kv/migration/buckets_test.go @@ -0,0 +1,97 @@ +package migration + +import ( + "context" + "errors" + "testing" + + "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kv" +) + +func Test_BucketsMigration_CreateBuckets(t *testing.T) { + var ( + ctx = context.Background() + bucket = []byte("some_bucket") + store = inmem.NewKVStore() + ) + + // bucket should not exist + bucketShouldNotExist(t, store, bucket) + + // build new create buckets migration + migration := CreateBuckets("create bucket some_bucket", bucket) + + // apply migration up + if err := migration.Up(ctx, store); err != nil { + t.Fatal("unexpected error", err) + } + + // bucket should now exist + bucketShouldExist(t, store, bucket) + + // apply migration down + if err := migration.Down(ctx, store); err != nil { + t.Fatal("unexpected error", err) + } + + // bucket should no longer exist + bucketShouldNotExist(t, store, bucket) +} + +func Test_BucketsMigration_DeleteBuckets(t *testing.T) { + var ( + ctx = context.Background() + bucket = []byte("some_bucket") + store = inmem.NewKVStore() + ) + + // initially create bucket + if err := store.CreateBucket(ctx, bucket); err != nil { + t.Fatal("unexpected error", err) + } + + // ensure bucket is there to start with + bucketShouldExist(t, store, bucket) + + // build new delete buckets migration + migration := DeleteBuckets("delete bucket some_bucket", bucket) + + // apply migration up + if err := migration.Up(ctx, store); err != nil { + t.Fatal("unexpected error", err) + } + + // bucket should have been removed + bucketShouldNotExist(t, store, bucket) + + // apply migration down + if err := migration.Down(ctx, store); err != nil { + t.Fatal("unexpected error", err) + } + + // bucket should exist again + bucketShouldExist(t, store, bucket) +} + +func bucketShouldExist(t *testing.T, store kv.Store, bucket []byte) { + t.Helper() + + if err := store.View(context.Background(), func(tx kv.Tx) error { + _, err := tx.Bucket(bucket) + return err + }); err != nil { + t.Fatal("unexpected error", err) + } +} + +func bucketShouldNotExist(t *testing.T, store kv.Store, bucket []byte) { + t.Helper() + + if err := store.View(context.Background(), func(tx kv.Tx) error { + _, err := tx.Bucket(bucket) + return err + }); !errors.Is(err, kv.ErrBucketNotFound) { + t.Fatalf("expected bucket not found, got %q", err) + } +} diff --git a/kv/session.go b/kv/session.go deleted file mode 100644 index e43bd877830..00000000000 --- a/kv/session.go +++ /dev/null @@ -1,250 +0,0 @@ -package kv - -import ( - "context" - "encoding/json" - "time" - - "github.com/influxdata/influxdb/v2" -) - -var ( - sessionBucket = []byte("sessionsv1") -) - -var _ influxdb.SessionService = (*Service)(nil) - -// RenewSession extends the expire time to newExpiration. -func (s *Service) RenewSession(ctx context.Context, session *influxdb.Session, newExpiration time.Time) error { - if session == nil { - return &influxdb.Error{ - Msg: "session is nil", - } - } - - // session already has longer expiration - if newExpiration.Before(session.ExpiresAt) { - return nil - } - - return s.kv.Update(ctx, func(tx Tx) error { - sess, err := s.findSession(ctx, tx, session.Key) - if err != nil { - return err - } - - // session already has longer expiration - if newExpiration.Before(session.ExpiresAt) { - return nil - } - - sess.ExpiresAt = newExpiration - - if err := s.putSession(ctx, tx, sess); err != nil { - return &influxdb.Error{ - Err: err, - } - } - - *session = *sess - - return nil - }) -} - -// FindSession retrieves the session found at the provided key. -func (s *Service) FindSession(ctx context.Context, key string) (*influxdb.Session, error) { - var sess *influxdb.Session - err := s.kv.View(ctx, func(tx Tx) error { - s, err := s.findSession(ctx, tx, key) - if err != nil { - return err - } - - sess = s - return nil - }) - - if err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - - if err := sess.Expired(); err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - return sess, nil -} - -func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb.Session, error) { - b, err := tx.Bucket(sessionBucket) - if err != nil { - return nil, err - } - - v, err := b.Get([]byte(key)) - if IsNotFound(err) { - return nil, &influxdb.Error{ - Code: influxdb.ENotFound, - Msg: influxdb.ErrSessionNotFound, - } - } - - if err != nil { - return nil, err - } - - sn := &influxdb.Session{} - if err := json.Unmarshal(v, sn); err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - - ps, err := s.maxPermissions(ctx, tx, sn.UserID) - if err != nil { - return nil, err - } - - sn.Permissions = ps - return sn, nil -} - -func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) { - // TODO(desa): these values should be cached so it's not so expensive to lookup each time. - f := influxdb.UserResourceMappingFilter{UserID: userID} - mappings, err := s.findUserResourceMappings(ctx, tx, f) - if err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - - ps := make([]influxdb.Permission, 0, len(mappings)) - for _, m := range mappings { - p, err := m.ToPermissions() - if err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - - ps = append(ps, p...) - } - ps = append(ps, influxdb.MePermissions(userID)...) - - if !s.disableAuthorizationsForMaxPermissions(ctx) { - // TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere - // we did this so that the oper token would be used in a users permissions. - af := influxdb.AuthorizationFilter{UserID: &userID} - as, err := s.findAuthorizations(ctx, tx, af) - if err != nil { - return nil, err - } - for _, a := range as { - ps = append(ps, a.Permissions...) - } - } - - return ps, nil -} - -// PutSession puts the session at key. -func (s *Service) PutSession(ctx context.Context, sn *influxdb.Session) error { - return s.kv.Update(ctx, func(tx Tx) error { - if err := s.putSession(ctx, tx, sn); err != nil { - return err - } - return nil - }) -} - -func (s *Service) putSession(ctx context.Context, tx Tx, sn *influxdb.Session) error { - v, err := json.Marshal(sn) - if err != nil { - return &influxdb.Error{ - Err: err, - } - } - - b, err := tx.Bucket(sessionBucket) - if err != nil { - return err - } - - if err := b.Put([]byte(sn.Key), v); err != nil { - return &influxdb.Error{ - Err: err, - } - } - return nil -} - -// ExpireSession expires the session at the provided key. -func (s *Service) ExpireSession(ctx context.Context, key string) error { - return s.kv.Update(ctx, func(tx Tx) error { - sn, err := s.findSession(ctx, tx, key) - if err != nil { - return err - } - - sn.ExpiresAt = time.Now() - - if err := s.putSession(ctx, tx, sn); err != nil { - return err - } - return nil - }) -} - -// CreateSession creates a session for a user with the users maximal privileges. -func (s *Service) CreateSession(ctx context.Context, user string) (*influxdb.Session, error) { - var sess *influxdb.Session - err := s.kv.Update(ctx, func(tx Tx) error { - sn, err := s.createSession(ctx, tx, user) - if err != nil { - return err - } - - sess = sn - - return nil - }) - - if err != nil { - return nil, err - } - - return sess, nil -} - -func (s *Service) createSession(ctx context.Context, tx Tx, user string) (*influxdb.Session, error) { - u, pe := s.findUserByName(ctx, tx, user) - if pe != nil { - return nil, pe - } - - sn := &influxdb.Session{} - sn.ID = s.IDGenerator.ID() - k, err := s.TokenGenerator.Token() - if err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - sn.Key = k - sn.UserID = u.ID - sn.CreatedAt = time.Now() - sn.ExpiresAt = sn.CreatedAt.Add(s.Config.SessionLength) - // TODO(desa): not totally sure what to do here. Possibly we should have a maximal privilege permission. - sn.Permissions = []influxdb.Permission{} - - if err := s.putSession(ctx, tx, sn); err != nil { - return nil, err - } - - return sn, nil -} diff --git a/kv/session_test.go b/kv/session_test.go deleted file mode 100644 index c3a5acb2185..00000000000 --- a/kv/session_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package kv_test - -import ( - "context" - "testing" - - "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kv" - influxdbtesting "github.com/influxdata/influxdb/v2/testing" - "go.uber.org/zap/zaptest" -) - -func TestBoltSessionService(t *testing.T) { - influxdbtesting.SessionService(initBoltSessionService, t) -} - -func initBoltSessionService(f influxdbtesting.SessionFields, t *testing.T) (influxdb.SessionService, string, func()) { - s, closeBolt, err := NewTestBoltStore(t) - if err != nil { - t.Fatalf("failed to create new kv store: %v", err) - } - - svc, op, closeSvc := initSessionService(s, f, t) - return svc, op, func() { - closeSvc() - closeBolt() - } -} - -func initSessionService(s kv.SchemaStore, f influxdbtesting.SessionFields, t *testing.T) (influxdb.SessionService, string, func()) { - ctx := context.Background() - svc := kv.NewService(zaptest.NewLogger(t), s) - svc.IDGenerator = f.IDGenerator - svc.TokenGenerator = f.TokenGenerator - - for _, u := range f.Users { - if err := svc.PutUser(ctx, u); err != nil { - t.Fatalf("failed to populate users") - } - } - for _, s := range f.Sessions { - if err := svc.PutSession(ctx, s); err != nil { - t.Fatalf("failed to populate sessions") - } - } - return svc, kv.OpPrefix, func() { - for _, u := range f.Users { - if err := svc.DeleteUser(ctx, u.ID); err != nil { - t.Logf("failed to remove users: %v", err) - } - } - } -} diff --git a/kv/task.go b/kv/task.go index e9960d090c7..d8b6ad839c2 100644 --- a/kv/task.go +++ b/kv/task.go @@ -1798,3 +1798,42 @@ func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, } return options.FromScript(lang, matches[0]) } + +func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) { + // TODO(desa): these values should be cached so it's not so expensive to lookup each time. + f := influxdb.UserResourceMappingFilter{UserID: userID} + mappings, err := s.findUserResourceMappings(ctx, tx, f) + if err != nil { + return nil, &influxdb.Error{ + Err: err, + } + } + + ps := make([]influxdb.Permission, 0, len(mappings)) + for _, m := range mappings { + p, err := m.ToPermissions() + if err != nil { + return nil, &influxdb.Error{ + Err: err, + } + } + + ps = append(ps, p...) + } + ps = append(ps, influxdb.MePermissions(userID)...) + + if !s.disableAuthorizationsForMaxPermissions(ctx) { + // TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere + // we did this so that the oper token would be used in a users permissions. + af := influxdb.AuthorizationFilter{UserID: &userID} + as, err := s.findAuthorizations(ctx, tx, af) + if err != nil { + return nil, err + } + for _, a := range as { + ps = append(ps, a.Permissions...) + } + } + + return ps, nil +} diff --git a/session/service.go b/session/service.go index 2df6bcd156c..70005750b1e 100644 --- a/session/service.go +++ b/session/service.go @@ -24,23 +24,53 @@ type Service struct { disableAuthorizationsForMaxPermissions func(context.Context) bool } -// NewService creates a new session service -func NewService(store *Storage, userService influxdb.UserService, urmService influxdb.UserResourceMappingService, authSvc influxdb.AuthorizationService, sessionLength time.Duration) *Service { - if sessionLength <= 0 { - sessionLength = time.Hour +// ServiceOption is a functional option for configuring a *Service +type ServiceOption func(*Service) + +// WithSessionLength configures the length of the session with the provided +// duration when the resulting option is called on a *Service. +func WithSessionLength(length time.Duration) ServiceOption { + return func(s *Service) { + s.sessionLength = length + } +} + +// WithIDGenerator overrides the default ID generator with the one +// provided to this function when called on a *Service +func WithIDGenerator(gen influxdb.IDGenerator) ServiceOption { + return func(s *Service) { + s.idGen = gen + } +} + +// WithTokenGenerator overrides the default token generator with the one +// provided to this function when called on a *Service +func WithTokenGenerator(gen influxdb.TokenGenerator) ServiceOption { + return func(s *Service) { + s.tokenGen = gen } - return &Service{ +} + +// NewService creates a new session service +func NewService(store *Storage, userService influxdb.UserService, urmService influxdb.UserResourceMappingService, authSvc influxdb.AuthorizationService, opts ...ServiceOption) *Service { + service := &Service{ store: store, userService: userService, urmService: urmService, authService: authSvc, - sessionLength: sessionLength, + sessionLength: time.Hour, idGen: snowflake.NewIDGenerator(), tokenGen: rand.NewTokenGenerator(64), disableAuthorizationsForMaxPermissions: func(context.Context) bool { return false }, } + + for _, opt := range opts { + opt(service) + } + + return service } // WithMaxPermissionFunc sets the useAuthorizationsForMaxPermissions function diff --git a/session/service_test.go b/session/service_test.go index f6aca7eae60..937d909b914 100644 --- a/session/service_test.go +++ b/session/service_test.go @@ -34,14 +34,14 @@ func initSessionService(f influxdbtesting.SessionFields, t *testing.T) (influxdb FindAuthorizationsFn: func(context.Context, influxdb.AuthorizationFilter, ...influxdb.FindOptions) ([]*influxdb.Authorization, int, error) { return []*influxdb.Authorization{}, 0, nil }, - }, time.Minute) + }, WithSessionLength(time.Minute)) if f.IDGenerator != nil { - svc.idGen = f.IDGenerator + WithIDGenerator(f.IDGenerator)(svc) } if f.TokenGenerator != nil { - svc.tokenGen = f.TokenGenerator + WithTokenGenerator(f.TokenGenerator)(svc) } for _, u := range f.Users {