Skip to content

Commit

Permalink
fix: allow creation of asset with owner identified by UUID
Browse files Browse the repository at this point in the history
  • Loading branch information
Chief-Rishab committed Apr 4, 2023
1 parent 0a04c07 commit 90af9ce
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 67 deletions.
1 change: 1 addition & 0 deletions core/asset/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func buildOwners(data interface{}) (owners []user.User) {
buildOwner := func(data map[string]interface{}) user.User {
return user.User{
ID: getString("id", data),
UUID: getString("uuid", data),
Email: getString("email", data),
Provider: getString("provider", data),
}
Expand Down
83 changes: 45 additions & 38 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,22 @@ func (r *AssetRepository) Upsert(ctx context.Context, ns *namespace.Namespace, a

if fetchedAsset.ID == "" {
// insert flow
fetchedAsset.ID, err = r.insert(ctx, ns, ast)
id, err := r.insert(ctx, ns, ast)
if err != nil {
return fetchedAsset.ID, fmt.Errorf("error inserting asset to DB: %w", err)
}
} else {
// update flow
changelog, err := fetchedAsset.Diff(ast)
if err != nil {
return "", fmt.Errorf("error diffing two assets: %w", err)
return id, fmt.Errorf("error inserting asset to DB: %w", err)
}
return id, nil
}

err = r.update(ctx, ns, fetchedAsset.ID, ast, &fetchedAsset, changelog)
if err != nil {
return "", fmt.Errorf("error updating asset to DB: %w", err)
}
// update flow
changelog, err := fetchedAsset.Diff(ast)
if err != nil {
return "", fmt.Errorf("error diffing two assets: %w", err)
}

err = r.update(ctx, ns, fetchedAsset.ID, ast, &fetchedAsset, changelog)
if err != nil {
return "", fmt.Errorf("error updating asset to DB: %w", err)
}

return fetchedAsset.ID, nil
Expand Down Expand Up @@ -484,6 +485,8 @@ func (r *AssetRepository) insert(ctx context.Context, ns *namespace.Namespace, a
return fmt.Errorf("error building insert query: %w", err)
}

ast.Version = asset.BaseVersion

err = tx.QueryRowContext(ctx, query, args...).Scan(&id)
if err != nil {
return fmt.Errorf("error running insert query: %w", err)
Expand Down Expand Up @@ -633,31 +636,31 @@ func (r *AssetRepository) getOwners(ctx context.Context, assetID string) (owners
}

// insertOwners inserts relation of asset id and user id
func (r *AssetRepository) insertOwners(ctx context.Context, execer sqlx.ExecerContext, assetID string, owners []user.User) (err error) {
func (r *AssetRepository) insertOwners(ctx context.Context, execer sqlx.ExecerContext, assetID string, owners []user.User) error {
if len(owners) == 0 {
return
return nil
}

if !isValidUUID(assetID) {
return asset.InvalidError{AssetID: assetID}
}

var values []string
var args = []interface{}{assetID}
for i, owner := range owners {
values = append(values, fmt.Sprintf("($1, $%d)", i+2))
args = append(args, owner.ID)
sqlb := sq.Insert("asset_owners").
Columns("asset_id", "user_id")
for _, o := range owners {
sqlb = sqlb.Values(assetID, o.ID)
}
query := fmt.Sprintf(`
INSERT INTO asset_owners
(asset_id, user_id)
VALUES %s`, strings.Join(values, ","))
err = r.execContext(ctx, execer, query, args...)

qry, args, err := sqlb.PlaceholderFormat(sq.Dollar).ToSql()
if err != nil {
err = fmt.Errorf("error running insert owners query: %w", err)
return fmt.Errorf("build insert owners SQL: %w", err)
}

return
if err := r.execContext(ctx, execer, qry, args...); err != nil {
return fmt.Errorf("error running insert owners query: %w", err)
}

return nil
}

func (r *AssetRepository) removeOwners(ctx context.Context, execer sqlx.ExecerContext, assetID string, owners []user.User) (err error) {
Expand Down Expand Up @@ -717,34 +720,38 @@ func (r *AssetRepository) compareOwners(current, newOwners []user.User) (toInser
return
}

func (r *AssetRepository) createOrFetchUsers(ctx context.Context, tx *sqlx.Tx, ns *namespace.Namespace, users []user.User) (results []user.User, err error) {
func (r *AssetRepository) createOrFetchUsers(ctx context.Context, tx *sqlx.Tx, ns *namespace.Namespace, users []user.User) ([]user.User, error) {
var results []user.User
for _, u := range users {
var (
userID string
fetchedUser user.User
err error
)
if u.UUID != "" {
results = append(results, u)
continue
fetchedUser, err = r.userRepo.GetByUUID(ctx, u.UUID)
} else {
fetchedUser, err = r.userRepo.GetByEmail(ctx, u.Email)
}
if err == nil {
userID = fetchedUser.ID
}
var userID string
var fetchedUser user.User
fetchedUser, err = r.userRepo.GetByEmail(ctx, u.Email)
userID = fetchedUser.ID
if errors.As(err, &user.NotFoundError{}) {
u.Provider = r.defaultUserProvider
userID, err = r.userRepo.CreateWithTx(ctx, tx, ns, &u)
if err != nil {
err = fmt.Errorf("error creating owner: %w", err)
return
return nil, fmt.Errorf("error creating owner: %w", err)
}
}
if err != nil {
err = fmt.Errorf("error getting owner's ID: %w", err)
return
return nil, fmt.Errorf("error getting owner's ID: %w", err)
}

u.ID = userID
results = append(results, u)
}

return
return results, nil
}

func (r *AssetRepository) execContext(ctx context.Context, execer sqlx.ExecerContext, query string, args ...interface{}) error {
Expand Down
58 changes: 29 additions & 29 deletions internal/store/postgres/asset_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,8 +976,8 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Owners: []user.User{
r.users[1],
r.users[2],
stripUserID(r.users[1]),
{Email: r.users[2].Email},
},
UpdatedBy: r.users[0],
}
Expand All @@ -991,9 +991,8 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
r.NoError(err)

r.Len(actual.Owners, len(ast.Owners))
for i, owner := range actual.Owners {
r.Equal(ast.Owners[i].ID, owner.ID)
}
r.Equal(r.users[1].ID, actual.Owners[0].ID)
r.Equal(r.users[2].ID, actual.Owners[1].ID)
})

r.Run("should create owners as users if they do not exist yet", func() {
Expand All @@ -1002,7 +1001,8 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Owners: []user.User{
{Email: "newuser@example.com", Provider: defaultProviderName},
{Email: "newuser@example.com"},
{UUID: "108151e5-4c9f-4951-a8e1-6966b5aa2bb6"},
},
UpdatedBy: r.users[0],
}
Expand All @@ -1015,10 +1015,8 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
r.NoError(err)

r.Len(actual.Owners, len(ast.Owners))
for i, owner := range actual.Owners {
r.Equal(ast.Owners[i].Email, owner.Email)
r.NotEmpty(id)
}
r.Equal(ast.Owners[0].Email, actual.Owners[0].Email)
r.Equal(ast.Owners[1].UUID, actual.Owners[1].UUID)
})
})

Expand Down Expand Up @@ -1081,14 +1079,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Owners: []user.User{
r.users[1],
r.users[2],
stripUserID(r.users[1]),
stripUserID(r.users[2]),
},
UpdatedBy: r.users[0],
}
newAsset := ast
newAsset.Owners = []user.User{
r.users[2],
stripUserID(r.users[2]),
}

id, err := r.repository.Upsert(r.ctx, r.ns, &ast)
Expand All @@ -1104,9 +1102,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
actual, err := r.repository.GetByID(r.ctx, ast.ID)
r.NoError(err)
r.Len(actual.Owners, len(newAsset.Owners))
for i, owner := range actual.Owners {
r.Equal(newAsset.Owners[i].ID, owner.ID)
}
r.Equal(r.users[2].ID, actual.Owners[0].ID)
})

r.Run("should create new owners if it does not exist on old asset", func() {
Expand All @@ -1115,14 +1111,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Owners: []user.User{
r.users[1],
stripUserID(r.users[1]),
},
UpdatedBy: r.users[0],
}
newAsset := ast
newAsset.Owners = []user.User{
r.users[1],
r.users[2],
stripUserID(r.users[1]),
stripUserID(r.users[2]),
}

id, err := r.repository.Upsert(r.ctx, r.ns, &ast)
Expand All @@ -1138,9 +1134,8 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
actual, err := r.repository.GetByID(r.ctx, ast.ID)
r.NoError(err)
r.Len(actual.Owners, len(newAsset.Owners))
for i, owner := range actual.Owners {
r.Equal(newAsset.Owners[i].ID, owner.ID)
}
r.Equal(r.users[1].ID, actual.Owners[0].ID)
r.Equal(r.users[2].ID, actual.Owners[1].ID)
})

r.Run("should create users from owners if owner emails do not exist yet", func() {
Expand All @@ -1149,14 +1144,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Owners: []user.User{
r.users[1],
stripUserID(r.users[1]),
},
UpdatedBy: r.users[0],
}
newAsset := ast
newAsset.Owners = []user.User{
r.users[1],
{Email: "newuser@example.com", Provider: defaultProviderName},
stripUserID(r.users[1]),
{Email: "newuser@example.com"},
}

id, err := r.repository.Upsert(r.ctx, r.ns, &ast)
Expand All @@ -1172,10 +1167,10 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
actual, err := r.repository.GetByID(r.ctx, ast.ID)
r.NoError(err)
r.Len(actual.Owners, len(newAsset.Owners))
for i, owner := range actual.Owners {
r.Equal(newAsset.Owners[i].Email, owner.Email)
r.NotEmpty(id)
}
r.NotEmpty(actual.Owners[0].ID)
r.Equal(r.users[1].ID, actual.Owners[0].ID)
r.NotEmpty(actual.Owners[1].ID)
r.Equal(newAsset.Owners[1].Email, actual.Owners[1].Email)
})
})
}
Expand Down Expand Up @@ -1855,3 +1850,8 @@ func (r *AssetRepositoryTestSuite) assertProbe(t *testing.T, expected asset.Prob
func TestAssetRepository(t *testing.T) {
suite.Run(t, &AssetRepositoryTestSuite{})
}

func stripUserID(u user.User) user.User {
u.ID = ""
return u
}

0 comments on commit 90af9ce

Please sign in to comment.