Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve dbt errors #2895

Merged
merged 4 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/lakectl/cmd/dbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ func CopyModels(ctx context.Context, models []model, continueOnError bool, fromS
return err
}
fmt.Printf("copy %s.%s -> %s.%s failed: %s\n", m.Schema, m.Alias, toSchema, m.Alias, err)
} else {
fmt.Printf("copied %s.%s -> %s.%s\n", m.Schema, m.Alias, toSchema, m.Alias)
}
fmt.Printf("copied %s.%s -> %s.%s\n", m.Schema, m.Alias, toSchema, m.Alias)
}
return nil
}
Expand All @@ -146,7 +147,7 @@ func getDbtTables(projectRoot string) ([]model, error) {
dbtCmd.Dir = projectRoot
output, err := dbtCmd.Output()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to run dbt debug with output:\n %s\n and error: %s", output, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about this one - it looks like an error that goes to the output not something that failed - it capture the output.
is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, when dbt debug returns an error, the information for the error is printed to the standard outpout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it, now, in case of error: the output is first printed as output and the error as error

}
models := make([]model, 0)
scan := bufio.NewScanner(bytes.NewReader(output))
Expand All @@ -155,7 +156,7 @@ func getDbtTables(projectRoot string) ([]model, error) {
var m model
err = json.Unmarshal(line, &m)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}
models = append(models, m)
}
Expand All @@ -168,7 +169,7 @@ func dbtDebug(projectRoot string) string {
dbtCmd.Dir = projectRoot
output, err := dbtCmd.Output()
if err != nil {
DieErr(err)
DieFmt("failed to run dbt debug with output:\n %s\n and error: %s", output, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DieFmt("failed to run dbt debug with output:\n %s\n and error: %s", output, err)
DieFmt("Failed to run dbt debug with output:\n%s\nError: %s", output, err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
submatch := schemaRegex.FindSubmatch(output)
if submatch == nil || len(submatch) < 2 {
Expand Down
26 changes: 20 additions & 6 deletions pkg/metastore/ms_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ type Client interface {
func CopyOrMerge(ctx context.Context, fromClient, toClient Client, fromDB, fromTable, toDB, toTable, toBranch, serde string, partition []string, fixSparkPlaceHolder bool, dbfsLocation string) error {
transformLocation := func(location string) (string, error) {
location = HandleDBFSLocation(location, dbfsLocation)
return ReplaceBranchName(location, toBranch)
transformedLocation, err := ReplaceBranchName(location, toBranch)
if err != nil {
return "", fmt.Errorf("failed to replace branch name with location: %s and branch: %s : %w", location, toBranch, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "", fmt.Errorf("failed to replace branch name with location: %s and branch: %s : %w", location, toBranch, err)
return "", fmt.Errorf("failed to replace branch name with location '%s' and branch '%s': %w", location, toBranch, err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
return transformedLocation, nil
}
return copyOrMergeWithTransformLocation(ctx, fromClient, toClient, fromDB, fromTable, toDB, toTable, serde, false, partition, transformLocation, fixSparkPlaceHolder)
}
Expand All @@ -55,22 +59,30 @@ func CopyDB(ctx context.Context, fromClient, toClient Client, fromDB, toDB, toBr
return "", nil
}
location = HandleDBFSLocation(location, dbfsLocation)
return ReplaceBranchName(location, toBranch)
transformedLocation, err := ReplaceBranchName(location, toBranch)
if err != nil {
return "", fmt.Errorf("failed to replace branch name with location: %s and branch: %s : %w", location, toBranch, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional:

Suggested change
return "", fmt.Errorf("failed to replace branch name with location: %s and branch: %s : %w", location, toBranch, err)
return "", fmt.Errorf("failed to replace branch name with location '%s' and branch '%s': %w", location, toBranch, err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
return transformedLocation, nil
}
return copyDBWithTransformLocation(ctx, fromClient, toClient, fromDB, toDB, transformLocation)
}

func copyDBWithTransformLocation(ctx context.Context, fromClient, toClient Client, fromDB string, toDB string, transformLocation func(location string) (string, error)) error {
schema, err := fromClient.GetDatabase(ctx, fromDB)
if err != nil {
return err
return fmt.Errorf("failed to get database on copy: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would add the fromDB to the error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
schema.Name = toDB
schema.LocationURI, err = transformLocation(schema.LocationURI)
if err != nil {
return err
}
return toClient.CreateDatabase(ctx, schema)
err = toClient.CreateDatabase(ctx, schema)
if err != nil {
return fmt.Errorf("failed to create database with name %s and location %s :%w", schema.Name, schema.LocationURI, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • fix the spaces
  • prefer to use '%s' to quote names/values

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
return nil
}

func copyOrMergeWithTransformLocation(ctx context.Context, fromClient, toClient Client, fromDB, fromTable, toDB, toTable, serde string, setSymlink bool, partition []string, transformLocation func(location string) (string, error), fixSparkPlaceHolder bool) error {
Expand Down Expand Up @@ -146,10 +158,12 @@ func CopyOrMergeAll(ctx context.Context, fromClient, toClient Client, schemaFilt

// HandleDBFSLocation translates Data Bricks File system path to the S3 path using the dbfsLocation
func HandleDBFSLocation(location string, dbfsLocation string) string {
l := location
if dbfsLocation != "" && strings.HasPrefix(location, dbfsPrefix) {
location = strings.Replace(location, dbfsPrefix, dbfsLocation, 1)
l = strings.Replace(location, dbfsPrefix, dbfsLocation, 1)
}
return location
logging.Default().WithFields(logging.Fields{"dbfsLocation": dbfsLocation, "location": location, "new_location": l}).Info("translate databricks file system path to s3 path")
return l
}

func ImportAll(ctx context.Context, fromClient, toClient Client, schemaFilter, tableFilter, repo, toBranch string, continueOnError, fixSparkPlaceHolder bool, dbfsLocation string) error {
Expand Down