From 0e267480e1323cbd60901f7277ea5cba778c2b1c Mon Sep 17 00:00:00 2001 From: Adil Ansari Date: Sun, 4 Jun 2023 23:58:54 -0700 Subject: [PATCH] fix: Check for project existence before creating/deleting branch (#1260) --- server/metadata/tenant.go | 19 +++++++++---------- server/metadata/tenant_test.go | 3 +++ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/server/metadata/tenant.go b/server/metadata/tenant.go index a08fee2b..6ef6fa4e 100644 --- a/server/metadata/tenant.go +++ b/server/metadata/tenant.go @@ -1228,18 +1228,17 @@ func (m *TenantManager) DeleteTenant(ctx context.Context, tx transaction.Tx, ten func (tenant *Tenant) CreateBranch(ctx context.Context, tx transaction.Tx, projName string, dbName *DatabaseName) error { tenant.Lock() defer tenant.Unlock() - - dbMeta, err := tenant.MetaStore.Database().Get(ctx, tx, tenant.namespace.Id(), projName) - if err != nil { - return err - } - // first get the project proj, ok := tenant.projects[projName] if !ok { return NewProjectNotFoundErr(projName) } + dbMeta, err := tenant.MetaStore.Database().Get(ctx, tx, tenant.namespace.Id(), projName) + if err != nil { + return err + } + if _, ok = proj.databaseBranches[dbName.Name()]; ok { return NewDatabaseBranchExistsErr(dbName.Branch()) } @@ -1272,15 +1271,15 @@ func (tenant *Tenant) DeleteBranch(ctx context.Context, tx transaction.Tx, projN tenant.Lock() defer tenant.Unlock() - if dbBranch.IsMainBranch() { - return NewMetadataError(ErrCodeCannotDeleteBranch, "'main' database cannot be deleted.") - } - proj, found := tenant.projects[projName] if !found { return NewProjectNotFoundErr(projName) } + if dbBranch.IsMainBranch() { + return NewMetadataError(ErrCodeCannotDeleteBranch, "'main' database cannot be deleted.") + } + return tenant.deleteBranch(ctx, tx, proj, dbBranch) } diff --git a/server/metadata/tenant_test.go b/server/metadata/tenant_test.go index 2cafeaca..6ba55766 100644 --- a/server/metadata/tenant_test.go +++ b/server/metadata/tenant_test.go @@ -267,6 +267,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) { require.NoError(t, err) err = tenant.CreateProject(ctx, tx, tenantProj2, nil) require.NoError(t, err) + unknownProject := "project_not_exists" databases := (&Project{}).GetDatabaseWithBranches() require.Len(t, databases, 0) @@ -278,6 +279,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) { require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch2"))) require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj2, NewDatabaseNameWithBranch(tenantProj2, "branch2"))) require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch3"))) + require.ErrorContains(t, tenant.CreateBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist") // reload again to get all the branches require.NoError(t, tenant.reload(ctx, tx, nil, nil)) @@ -335,6 +337,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) { require.NoError(t, tenant.DeleteBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch1"))) require.NoError(t, tenant.DeleteBranch(ctx, tx, tenantProj2, NewDatabaseNameWithBranch(tenantProj2, "branch1"))) require.NoError(t, tenant.DeleteBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch2"))) + require.ErrorContains(t, tenant.DeleteBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist") require.NoError(t, tenant.reload(ctx, tx, nil, nil)) require.NoError(t, tx.Commit(ctx))