Skip to content

Commit

Permalink
Improve dbt errors (#2895)
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-har committed Jan 31, 2022
1 parent 2d80946 commit 1f595c8
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
18 changes: 9 additions & 9 deletions cmd/lakectl/cmd/dbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ func copySchemaWithDbtTables(ctx context.Context, continueOnError, continueOnSch
DieFmt("schema %s exists, change schema or use %s flag", toSchema, continueOnSchemaFlag)
}

models, err := getDbtTables(projectRoot)
if err != nil {
DieErr(err)
}
models := getDbtTables(projectRoot)
if err := CopyModels(ctx, models, continueOnError, fromSchema, toSchema, branchName, dbfsLocation, client); err != nil {
DieErr(err)
}
Expand Down Expand Up @@ -135,18 +132,20 @@ func CopyModels(ctx context.Context, models []model, continueOnError bool, fromS
return err
}
fmt.Printf("copy %s.%s -> %s.%s failed: %s\n", m.Schema, m.Alias, toSchema, m.Alias, err)
} else {
fmt.Printf("copied %s.%s -> %s.%s\n", m.Schema, m.Alias, toSchema, m.Alias)
}
fmt.Printf("copied %s.%s -> %s.%s\n", m.Schema, m.Alias, toSchema, m.Alias)
}
return nil
}

func getDbtTables(projectRoot string) ([]model, error) {
func getDbtTables(projectRoot string) []model {
dbtCmd := exec.Command("dbt", "ls", "--resource-type", "model", "--select", "config.materialized:table config.materialized:incremental", "--output", "json", "--output-keys", "alias schema")
dbtCmd.Dir = projectRoot
output, err := dbtCmd.Output()
if err != nil {
return nil, err
fmt.Println(string(output))
DieErr(err)
}
models := make([]model, 0)
scan := bufio.NewScanner(bytes.NewReader(output))
Expand All @@ -155,11 +154,11 @@ func getDbtTables(projectRoot string) ([]model, error) {
var m model
err = json.Unmarshal(line, &m)
if err != nil {
return nil, err
DieErr(err)
}
models = append(models, m)
}
return models, nil
return models
}

// dbtDebug validates the dbt connection by running `dbt debug` and returns the schema configured by the target environment (configured in the dbt profiles file)
Expand All @@ -168,6 +167,7 @@ func dbtDebug(projectRoot string) string {
dbtCmd.Dir = projectRoot
output, err := dbtCmd.Output()
if err != nil {
fmt.Println(string(output))
DieErr(err)
}
submatch := schemaRegex.FindSubmatch(output)
Expand Down
26 changes: 20 additions & 6 deletions pkg/metastore/ms_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ type Client interface {
func CopyOrMerge(ctx context.Context, fromClient, toClient Client, fromDB, fromTable, toDB, toTable, toBranch, serde string, partition []string, fixSparkPlaceHolder bool, dbfsLocation string) error {
transformLocation := func(location string) (string, error) {
location = HandleDBFSLocation(location, dbfsLocation)
return ReplaceBranchName(location, toBranch)
transformedLocation, err := ReplaceBranchName(location, toBranch)
if err != nil {
return "", fmt.Errorf("failed to replace branch name with location: '%s' and branch: '%s': %w", location, toBranch, err)
}
return transformedLocation, nil
}
return copyOrMergeWithTransformLocation(ctx, fromClient, toClient, fromDB, fromTable, toDB, toTable, serde, false, partition, transformLocation, fixSparkPlaceHolder)
}
Expand All @@ -55,22 +59,30 @@ func CopyDB(ctx context.Context, fromClient, toClient Client, fromDB, toDB, toBr
return "", nil
}
location = HandleDBFSLocation(location, dbfsLocation)
return ReplaceBranchName(location, toBranch)
transformedLocation, err := ReplaceBranchName(location, toBranch)
if err != nil {
return "", fmt.Errorf("failed to replace branch name with location: '%s' and branch: '%s': %w", location, toBranch, err)
}
return transformedLocation, nil
}
return copyDBWithTransformLocation(ctx, fromClient, toClient, fromDB, toDB, transformLocation)
}

func copyDBWithTransformLocation(ctx context.Context, fromClient, toClient Client, fromDB string, toDB string, transformLocation func(location string) (string, error)) error {
schema, err := fromClient.GetDatabase(ctx, fromDB)
if err != nil {
return err
return fmt.Errorf("failed to get database on copy from '%s': %w", fromDB, err)
}
schema.Name = toDB
schema.LocationURI, err = transformLocation(schema.LocationURI)
if err != nil {
return err
}
return toClient.CreateDatabase(ctx, schema)
err = toClient.CreateDatabase(ctx, schema)
if err != nil {
return fmt.Errorf("failed to create database with name '%s' and location '%s': %w", schema.Name, schema.LocationURI, err)
}
return nil
}

func copyOrMergeWithTransformLocation(ctx context.Context, fromClient, toClient Client, fromDB, fromTable, toDB, toTable, serde string, setSymlink bool, partition []string, transformLocation func(location string) (string, error), fixSparkPlaceHolder bool) error {
Expand Down Expand Up @@ -146,10 +158,12 @@ func CopyOrMergeAll(ctx context.Context, fromClient, toClient Client, schemaFilt

// HandleDBFSLocation translates a Databricks File System (DBFS) path to the corresponding S3 path using dbfsLocation
func HandleDBFSLocation(location string, dbfsLocation string) string {
l := location
if dbfsLocation != "" && strings.HasPrefix(location, dbfsPrefix) {
location = strings.Replace(location, dbfsPrefix, dbfsLocation, 1)
l = strings.Replace(location, dbfsPrefix, dbfsLocation, 1)
}
return location
logging.Default().WithFields(logging.Fields{"dbfsLocation": dbfsLocation, "location": location, "new_location": l}).Info("translate databricks file system path to s3 path")
return l
}

func ImportAll(ctx context.Context, fromClient, toClient Client, schemaFilter, tableFilter, repo, toBranch string, continueOnError, fixSparkPlaceHolder bool, dbfsLocation string) error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/metastore/ms_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metastore_test

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -106,7 +107,7 @@ func TestMSClient_CopySchema(t *testing.T) {

err := metastore.CopyDB(nil, client, client, tt.SourceName, tt.DestinationName, tt.destBranch, "")

if err != tt.ExpectedError {
if !errors.Is(err, tt.ExpectedError) {
t.Fatalf("expected err:%s got:%v", tt.ExpectedError, err)
}
if tt.ExpectedError != nil {
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestMSClient_CopyAndMergeBack(t *testing.T) {
if err != nil {
t.Fatal(err)
}
//add columns to existing partition
// add columns to existing partition
partition19 := expectedPartitionsMap[mock.GetPartitionKey(toDBName, toTableName, []string{"part=19"})]
addColumn := &metastore.FieldSchema{
Name: "column_three",
Expand Down Expand Up @@ -236,7 +237,7 @@ func TestMSClient_CopyAndMergeBack(t *testing.T) {
if len(mergedPartitions) != numOfPartitions {
t.Fatalf("got wrong amount of partitions expected:%d, got:%d", numOfPartitions, len(mergedPartitions))
}
//check if partition 20 was added and has three columns, 19 was updated and 17 was deleted
// check if partition 20 was added and has three columns, 19 was updated and 17 was deleted
m := make(map[string]*metastore.Partition)
for _, partition := range mergedPartitions {
m[partition.Values[0]] = partition
Expand Down

0 comments on commit 1f595c8

Please sign in to comment.