Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-har committed Jan 31, 2022
1 parent ac8f464 commit daddbfb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
17 changes: 8 additions & 9 deletions cmd/lakectl/cmd/dbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ func copySchemaWithDbtTables(ctx context.Context, continueOnError, continueOnSch
DieFmt("schema %s exists, change schema or use %s flag", toSchema, continueOnSchemaFlag)
}

models, err := getDbtTables(projectRoot)
if err != nil {
DieErr(err)
}
models := getDbtTables(projectRoot)
if err := CopyModels(ctx, models, continueOnError, fromSchema, toSchema, branchName, dbfsLocation, client); err != nil {
DieErr(err)
}
Expand Down Expand Up @@ -142,12 +139,13 @@ func CopyModels(ctx context.Context, models []model, continueOnError bool, fromS
return nil
}

func getDbtTables(projectRoot string) ([]model, error) {
func getDbtTables(projectRoot string) []model {
dbtCmd := exec.Command("dbt", "ls", "--resource-type", "model", "--select", "config.materialized:table config.materialized:incremental", "--output", "json", "--output-keys", "alias schema")
dbtCmd.Dir = projectRoot
output, err := dbtCmd.Output()
if err != nil {
return nil, fmt.Errorf("failed to run dbt debug with output:\n %s\n and error: %s", output, err)
fmt.Println(string(output))
DieErr(err)
}
models := make([]model, 0)
scan := bufio.NewScanner(bytes.NewReader(output))
Expand All @@ -156,11 +154,11 @@ func getDbtTables(projectRoot string) ([]model, error) {
var m model
err = json.Unmarshal(line, &m)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
DieErr(err)
}
models = append(models, m)
}
return models, nil
return models
}

// dbtDebug validates dbt connection using dbt debug, return the schema configured by the target environment (configured in the dbt profiles file)
Expand All @@ -169,7 +167,8 @@ func dbtDebug(projectRoot string) string {
dbtCmd.Dir = projectRoot
output, err := dbtCmd.Output()
if err != nil {
DieFmt("failed to run dbt debug with output:\n %s\n and error: %s", output, err)
fmt.Println(string(output))
DieErr(err)
}
submatch := schemaRegex.FindSubmatch(output)
if submatch == nil || len(submatch) < 2 {
Expand Down
8 changes: 4 additions & 4 deletions pkg/metastore/ms_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func CopyOrMerge(ctx context.Context, fromClient, toClient Client, fromDB, fromT
location = HandleDBFSLocation(location, dbfsLocation)
transformedLocation, err := ReplaceBranchName(location, toBranch)
if err != nil {
return "", fmt.Errorf("failed to replace branch name with location: %s and branch: %s : %w", location, toBranch, err)
return "", fmt.Errorf("failed to replace branch name with location: '%s' and branch: '%s': %w", location, toBranch, err)
}
return transformedLocation, nil
}
Expand All @@ -61,7 +61,7 @@ func CopyDB(ctx context.Context, fromClient, toClient Client, fromDB, toDB, toBr
location = HandleDBFSLocation(location, dbfsLocation)
transformedLocation, err := ReplaceBranchName(location, toBranch)
if err != nil {
return "", fmt.Errorf("failed to replace branch name with location: %s and branch: %s : %w", location, toBranch, err)
return "", fmt.Errorf("failed to replace branch name with location: '%s' and branch: '%s': %w", location, toBranch, err)
}
return transformedLocation, nil
}
Expand All @@ -71,7 +71,7 @@ func CopyDB(ctx context.Context, fromClient, toClient Client, fromDB, toDB, toBr
func copyDBWithTransformLocation(ctx context.Context, fromClient, toClient Client, fromDB string, toDB string, transformLocation func(location string) (string, error)) error {
schema, err := fromClient.GetDatabase(ctx, fromDB)
if err != nil {
return fmt.Errorf("failed to get database on copy: %w", err)
return fmt.Errorf("failed to get database on copy from '%s': %w", fromDB, err)
}
schema.Name = toDB
schema.LocationURI, err = transformLocation(schema.LocationURI)
Expand All @@ -80,7 +80,7 @@ func copyDBWithTransformLocation(ctx context.Context, fromClient, toClient Clien
}
err = toClient.CreateDatabase(ctx, schema)
if err != nil {
return fmt.Errorf("failed to create database with name %s and location %s :%w", schema.Name, schema.LocationURI, err)
return fmt.Errorf("failed to create database with name '%s' and location '%s': %w", schema.Name, schema.LocationURI, err)
}
return nil
}
Expand Down

0 comments on commit daddbfb

Please sign in to comment.