Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport - 2.8] Garbage Collect Determined Pipeline Users (#9491) [CORE-2131] #9658

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/server/pps/server/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2322,7 +2322,7 @@ func (a *apiServer) createPipeline(ctx context.Context, req *pps.CreatePipelineV
// TODO: set up garbage collection for the records stored outside the DB
func (a *apiServer) CreateDetPipelineSideEffects(ctx context.Context, pipeline *pps.Pipeline, workspaces []string) error {
// check if pipeline's creds secret exists
secretName := pipeline.Project.Name + "-" + pipeline.Name + "-det"
secretName := detUserSecretName(pipeline)
password := uuid.NewWithoutDashes()
whoAmI, err := a.env.AuthServer.WhoAmI(ctx, &auth.WhoAmIRequest{})
if err != nil {
Expand Down Expand Up @@ -2351,7 +2351,10 @@ func (a *apiServer) CreateDetPipelineSideEffects(ctx context.Context, pipeline *
},
}
s.SetLabels(map[string]string{
"suite": "pachyderm",
"suite": "pachyderm",
"pipelineName": pipeline.Name,
"project": pipeline.GetProject().String(),
"determined": "true",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than determined=true, I wonder if this should be type=determined or similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great suggestion <3. I'll open a ticket for this and handle it in a separate PR

})
if _, err := a.env.KubeClient.CoreV1().Secrets(a.namespace).Create(ctx, s, metav1.CreateOptions{}); err != nil {
return errors.Wrapf(err, "failed to create pipeline's determined secret")
Expand Down
195 changes: 164 additions & 31 deletions src/server/pps/server/determined.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ import (
"strings"
"time"

det "github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/rbacv1"
"github.com/determined-ai/determined/proto/pkg/userv1"
"github.com/determined-ai/determined/proto/pkg/workspacev1"
"github.com/pachyderm/pachyderm/v2/src/internal/backoff"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
mlc "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging/client"
"github.com/pachyderm/pachyderm/v2/src/pfs"
"github.com/pachyderm/pachyderm/v2/src/pps"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -19,42 +25,70 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

det "github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/rbacv1"
"github.com/determined-ai/determined/proto/pkg/userv1"
"github.com/determined-ai/determined/proto/pkg/workspacev1"
mlc "github.com/pachyderm/pachyderm/v2/src/internal/middleware/logging/client"
"google.golang.org/protobuf/types/known/wrapperspb"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// detConfig carries the settings needed to reach and authenticate against a
// Determined master.
type detConfig struct {
	// MasterURL is parsed as a URL and its host portion is used as the dial
	// target. Username and Password are the credentials used to mint a
	// Determined token.
	MasterURL, Username, Password string
	// TLS selects TLS transport credentials instead of plaintext when dialing.
	TLS bool
}

// getDetConfig copies the Determined connection settings (URL, username,
// password and TLS flag) out of the server's environment configuration.
func (a *apiServer) getDetConfig() detConfig {
	var cfg detConfig
	cfg.MasterURL = a.env.Config.DeterminedURL
	cfg.Username = a.env.Config.DeterminedUsername
	cfg.Password = a.env.Config.DeterminedPassword
	cfg.TLS = a.env.Config.DeterminedTLS
	return cfg
}

// newInClusterDetClient dials the Determined master described by cfg and mints
// a token for cfg.Username / cfg.Password via mintDeterminedToken.
//
// On success it returns:
//   - the Determined gRPC client,
//   - a context derived from ctx whose outgoing metadata carries the minted
//     token under "x-user-token" (use this context for calls on the client),
//   - a function that closes the underlying connection; the caller must call it.
//
// On failure the client, context and close function are all nil, and any
// connection that was opened has already been closed.
func newInClusterDetClient(ctx context.Context, cfg detConfig) (det.DeterminedClient, context.Context, context.CancelFunc, error) {
	determinedURL, err := url.Parse(cfg.MasterURL)
	if err != nil {
		return nil, nil, nil, errors.Wrapf(err, "parsing determined url %q", cfg.MasterURL)
	}
	tlsOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
	if cfg.TLS {
		// NOTE(review): certificate verification is disabled here, so the TLS
		// connection is encrypted but the server is not authenticated. Confirm
		// this is acceptable for the in-cluster deployment.
		tlsOpt = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
			InsecureSkipVerify: true,
		}))
	}
	conn, err := grpc.DialContext(ctx, determinedURL.Host, tlsOpt, grpc.WithStreamInterceptor(mlc.LogStream), grpc.WithUnaryInterceptor(mlc.LogUnary))
	if err != nil {
		return nil, nil, nil, errors.Wrapf(err, "dialing determined at %q", determinedURL.Host)
	}
	dc := det.NewDeterminedClient(conn)
	tok, err := mintDeterminedToken(ctx, dc, cfg.Username, cfg.Password)
	if err != nil {
		// No close function is handed back on this path, so the connection
		// must be released here or it leaks on every failed attempt. The close
		// error is dropped: the token error is the one worth reporting.
		_ = conn.Close()
		return nil, nil, nil, err
	}
	ctx = metadata.AppendToOutgoingContext(ctx, "x-user-token", fmt.Sprintf("Bearer %s", tok))
	return dc, ctx, func() { conn.Close() }, nil
}

func (a *apiServer) hookDeterminedPipeline(ctx context.Context, p *pps.Pipeline, workspaces []string, pipPassword string, whoami string) error {
config := detConfig{
MasterURL: a.env.Config.DeterminedURL,
Username: a.env.Config.DeterminedUsername,
Password: a.env.Config.DeterminedPassword,
TLS: a.env.Config.DeterminedTLS,
}
var cf context.CancelFunc
ctx, cf = context.WithTimeout(ctx, 60*time.Second)
defer cf()
errCnt := 0
// right now the entire integration is specific to auth, so first check that auth is active
if err := backoff.RetryUntilCancel(ctx, func() error {
tlsOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
if a.env.Config.DeterminedTLS {
tlsOpt = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
}))
}
determinedURL, err := url.Parse(a.env.Config.DeterminedURL)
if err != nil {
return errors.Wrapf(err, "parsing determined url %q", a.env.Config.DeterminedURL)
}
conn, err := grpc.DialContext(ctx, determinedURL.Host, tlsOpt, grpc.WithStreamInterceptor(mlc.LogStream), grpc.WithUnaryInterceptor(mlc.LogUnary))
if err != nil {
return errors.Wrapf(err, "dialing determined at %q", determinedURL.Host)
}
defer conn.Close()
dc := det.NewDeterminedClient(conn)
tok, err := mintDeterminedToken(ctx, dc, a.env.Config.DeterminedUsername, a.env.Config.DeterminedPassword)
dc, ctx, cf, err := newInClusterDetClient(ctx, config)
if err != nil {
return err
return errors.Wrap(err, "set up in cluster determined client")
}
ctx = metadata.AppendToOutgoingContext(ctx, "x-user-token", fmt.Sprintf("Bearer %s", tok))
defer cf()
detWorkspaces, err := resolveDeterminedWorkspaces(ctx, dc, workspaces)
if err != nil {
return err
Expand Down Expand Up @@ -90,6 +124,83 @@ func (a *apiServer) hookDeterminedPipeline(ctx context.Context, p *pps.Pipeline,
return nil
}

// pipelineGetter is the minimal pipeline-listing capability that Determined
// user garbage collection depends on.
type pipelineGetter interface {
	// ListPipelineInfo passes pipeline infos to visit.
	// NOTE(review): presumably visit is called once per existing pipeline and
	// a non-nil error from visit aborts the listing; confirm against the
	// implementation.
	ListPipelineInfo(ctx context.Context, visit func(*pps.PipelineInfo) error) error
}

// gcDetUsers runs until ctx is cancelled, periodically cleaning up Determined
// users whose pipelines no longer exist.
//
// Every period it:
//  1. lists the current pipelines through pipGetter,
//  2. lists the Kubernetes secrets labelled "suite=pachyderm,determined=true",
//  3. for each such secret that is more than a minute old and whose
//     "project"/"pipelineName" labels do not match an existing pipeline,
//     deactivates the corresponding Determined user and deletes the secret.
//
// A failure while cleaning up a single pipeline is logged and does not stop
// the loop. A failure to build the Determined client or to list pipelines or
// secrets is returned to backoff.RetryUntilCancel, which is configured with a
// 5-minute retry interval. If config.MasterURL is empty the function logs and
// returns immediately.
//
// The function blocks, so it is meant to be run in its own goroutine.
func gcDetUsers(ctx context.Context, config detConfig, period time.Duration, pipGetter pipelineGetter, secrets corev1.SecretInterface) {
	if config.MasterURL == "" {
		log.Info(ctx, "Determined not configured. Skipping Determined user garbage collection.")
		return
	}
	log.Info(ctx, "Starting Determined user garbage collection.")
	err := backoff.RetryUntilCancel(ctx, func() error {
		dc, detCtx, cf, err := newInClusterDetClient(ctx, config)
		if err != nil {
			return errors.Wrap(err, "setup in cluster determined client for garbage collection")
		}
		defer cf()
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			// Set of pipelines that currently exist, keyed by Pipeline.String().
			pipelines := make(map[string]struct{})
			if err := pipGetter.ListPipelineInfo(ctx, func(pi *pps.PipelineInfo) error {
				pipelines[pi.Pipeline.String()] = struct{}{}
				return nil
			}); err != nil {
				return errors.Wrap(err, "list pipelines for determined user garbage collection")
			}
			ss, err := secrets.List(ctx, v1.ListOptions{
				TypeMeta: metav1.TypeMeta{
					Kind:       "ListOptions",
					APIVersion: "v1",
				},
				LabelSelector: "suite=pachyderm,determined=true"})
			if err != nil {
				return errors.Wrap(err, "list determined pipeline user secrets")
			}
			for _, s := range ss.Items {
				p := &pps.Pipeline{
					Name:    s.Labels["pipelineName"],
					Project: &pfs.Project{Name: s.Labels["project"]},
				}
				// Only consider secrets created more than a minute ago. A
				// younger secret may belong to a pipeline that is still in the
				// middle of being created and is not yet visible in the
				// pipeline listing.
				if s.CreationTimestamp.Add(time.Minute).After(time.Now()) {
					continue
				}
				if _, ok := pipelines[p.String()]; ok {
					continue
				}
				// The pipeline is gone: deactivate its Determined user, then
				// drop the secret. The secret is deleted last so that a
				// failed deactivation is retried on the next pass.
				ctx, end := log.SpanContextL(ctx, "", log.DebugLevel, zap.String("pipeline", p.String()))
				err := func() error {
					u, err := getDetPipelineUser(detCtx, dc, p)
					if err != nil {
						return err
					}
					if _, err := dc.PatchUser(detCtx, &det.PatchUserRequest{
						UserId: u.Id,
						User: &userv1.PatchUser{
							Active: &wrapperspb.BoolValue{Value: false},
						}}); err != nil {
						return errors.Wrapf(err, "inactivate user")
					}
					if err := secrets.Delete(ctx, detUserSecretName(p), v1.DeleteOptions{}); err != nil {
						return errors.Wrapf(err, "delete determined pipeline user secret %q", detUserSecretName(p))
					}
					return nil
				}()
				// End the span on success as well as on failure so it is
				// always closed; the error is attached only when non-nil.
				end(log.Errorp(&err))
			}
			select {
			case <-ctx.Done():
				return errors.Wrap(context.Cause(ctx), "determined user garbage collection")
			case <-ticker.C:
			}
		}
	}, backoff.RetryEvery(5*time.Minute), nil)
	log.Info(ctx, "determined user GC context cancelled", zap.Error(err))
}

func workspaceEditorRoleId(ctx context.Context, dc det.DeterminedClient) (int32, error) {
resp, err := dc.ListRoles(ctx, &det.ListRolesRequest{})
if err != nil {
Expand Down Expand Up @@ -226,26 +337,44 @@ func provisionDeterminedPipelineUser(ctx context.Context, dc det.DeterminedClien
})
if err != nil {
if status.Code(err) == codes.InvalidArgument && strings.Contains(err.Error(), "user already exists") {
usersResp, err := dc.GetUsers(ctx, &det.GetUsersRequest{Name: pipelineUserName(p)})
u, err := getDetPipelineUser(ctx, dc, p)
if err != nil {
return 0, errors.Wrapf(err, "get determined user %q", pipelineUserName(p))
return 0, err
}
if len(usersResp.Users) == 0 {
return 0, errors.Wrapf(err, "no determined users return for user %q", pipelineUserName(p))
if !u.Active {
if _, err := dc.PatchUser(ctx, &det.PatchUserRequest{
UserId: u.Id,
User: &userv1.PatchUser{
Active: &wrapperspb.BoolValue{Value: true},
},
}); err != nil {
return 0, errors.Wrapf(err, "reactivate user %q", pipelineUserName(p))
}
}
if _, err := dc.SetUserPassword(ctx, &det.SetUserPasswordRequest{
UserId: usersResp.Users[0].Id,
UserId: u.Id,
Password: password,
}); err != nil {
return 0, errors.Wrapf(err, "set password for user %q", pipelineUserName(p))
}
return usersResp.Users[0].Id, nil
return u.Id, nil
}
return 0, errors.Wrap(err, "provision determined user")
}
return resp.User.Id, nil
}

// getDetPipelineUser looks up the Determined user for pipeline p, using the
// name produced by pipelineUserName.
//
// It returns the first user in the GetUsers response. It returns a non-nil
// error if the request fails or the response contains no users; the returned
// user is never nil when the error is nil.
func getDetPipelineUser(ctx context.Context, dc det.DeterminedClient, p *pps.Pipeline) (*userv1.User, error) {
	name := pipelineUserName(p)
	usersResp, err := dc.GetUsers(ctx, &det.GetUsersRequest{Name: name})
	if err != nil {
		return nil, errors.Wrapf(err, "get determined user %q", name)
	}
	if len(usersResp.Users) == 0 {
		// err is nil on this path, and wrapping a nil error yields nil, which
		// would hand callers a nil user with no error. Create a new error.
		return nil, errors.Errorf("no determined users returned for user %q", name)
	}
	return usersResp.Users[0], nil
}

func assignDeterminedPipelineRole(ctx context.Context, dc det.DeterminedClient, userId int32, roleId int32, workspaces []*workspacev1.Workspace) error {
var roleAssignments []*rbacv1.UserRoleAssignment
for _, w := range workspaces {
Expand Down Expand Up @@ -273,3 +402,7 @@ func pipelineUserName(p *pps.Pipeline) string {
// users with names containing '/' don't work correctly in determined.
return strings.ReplaceAll(p.String(), "/", "_")
}

// detUserSecretName returns the name of the Kubernetes secret that stores the
// Determined credentials for pipeline p: "<project>-<pipeline>-det".
// p.Project must be non-nil.
func detUserSecretName(p *pps.Pipeline) string {
	parts := []string{p.Project.Name, p.Name, "det"}
	return strings.Join(parts, "-")
}
1 change: 1 addition & 0 deletions src/server/pps/server/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (a *apiServer) master(ctx context.Context) {
log.Info(ctx, "PPS master: launching master process")
kd := newKubeDriver(a.env.KubeClient, a.env.Config)
sd := newPipelineStateDriver(a.env.DB, a.pipelines, a.txnEnv, a.env.PFSServer)
go gcDetUsers(ctx, a.getDetConfig(), time.Minute, sd, a.env.KubeClient.CoreV1().Secrets(a.namespace))
m := newMaster(ctx, a.env, a.etcdPrefix, kd, sd)
m.run()
return errors.Wrapf(context.Cause(ctx), "ppsMaster.Run() exited unexpectedly")
Expand Down
13 changes: 12 additions & 1 deletion src/testing/deploy/determined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,17 @@ func TestDeterminedInstallAndIntegration(t *testing.T) {
require.NoError(t, err)
current := determinedGetUsers(t, *detUrl, userToken)
require.Equal(t, len(*previous.Users)+1, len(*current.Users), "the new pipeline has created an additional service user in Determined")

// once a pipeline is deleted the users should eventually be cleaned up in determined
_, err = c.PpsAPIClient.DeletePipeline(c.Ctx(), &pps.DeletePipelineRequest{Pipeline: client.NewPipeline(pfs.DefaultProjectName, pipelineName)})
require.NoError(t, err)
previous = current
require.NoErrorWithinTRetryConstant(t, 2*time.Minute, func() error {
current = determinedGetUsers(t, *detUrl, userToken)
if len(*previous.Users)-1 == len(*current.Users) {
return errors.Errorf("the new pipeline has created an additional service user in Determined")
}
return nil
}, 5*time.Second)
}

func determinedLogin(t testing.TB, detUrl url.URL, username string, password string) string {
Expand Down Expand Up @@ -154,6 +164,7 @@ func determinedCreateUser(t testing.TB, detUrl url.URL, authToken string) *Deter

func determinedGetUsers(t testing.TB, detUrl url.URL, authToken string) *DeterminedUserList {
detUrl.Path = detUserPath
detUrl.Query().Add("active", "true")
req, err := http.NewRequest("GET", detUrl.String(), strings.NewReader("{}"))
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authToken))
require.NoError(t, err, "Creating Determined get users request")
Expand Down