Skip to content

Commit

Permalink
bug: use spinning logs correctly in flink deploy command (#800)
Browse files Browse the repository at this point in the history
* bug: use spinning logs correctly in flink deploy command
  • Loading branch information
janelletavares committed Aug 8, 2023
1 parent d9175e0 commit a666404
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions cmd/meroxa/root/flink/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/meroxa/cli/log"
"github.com/meroxa/cli/utils"
"github.com/meroxa/meroxa-go/pkg/meroxa"
"github.com/meroxa/turbine-core/pkg/ir"
)

type deployFlinkJobClient interface {
Expand Down Expand Up @@ -104,18 +105,18 @@ func (d *Deploy) Execute(ctx context.Context) error {
return fmt.Errorf("please provide a JAR file to the --jar flag")
}

name := d.args.Name
if name == "" {
return fmt.Errorf("the name of your Flink Job be provided as an argument")
}

secrets := utils.StringSliceToStringMap(d.flags.Secrets)
spec, err := flink.GetIRSpec(ctx, jarPath, secrets, d.logger)
if err != nil {
fmt.Printf("failed to extract IR spec: %v\n", err)
d.logger.Warnf(ctx, "failed to extract IR spec: %v\n", err)
// non-blocking
}

name := d.args.Name
if name == "" {
return fmt.Errorf("the name of your Flink Job be provided as an argument")
}

filename := filepath.Base(jarPath)
d.logger.StartSpinner("\t", "Fetching Meroxa Platform source...")
source, err := d.client.CreateSourceV2(ctx, &meroxa.CreateSourceInputV2{Filename: filename})
Expand All @@ -126,17 +127,37 @@ func (d *Deploy) Execute(ctx context.Context) error {
}
d.logger.StopSpinnerWithStatus("Platform source fetched", log.Successful)

// Logging happens inside UploadFile
err = turbine.UploadFile(ctx, d.logger, jarPath, source.PutUrl)
if err != nil {
return err
}

d.logger.StartSpinner("\t", "Creating Flink job...")
input := &meroxa.CreateFlinkJobInput{Name: name, JarURL: source.GetUrl}
err = d.addIntegrations(ctx, spec, input)
if err != nil {
return err
}

d.logger.StartSpinner("\t", "Creating Flink job...")
fj, err := d.client.CreateFlinkJob(ctx, input)
if err != nil {
d.logger.Errorf(ctx, "\t 𐄂 Unable to create Flink job")
d.logger.StopSpinnerWithStatus("\t", log.Failed)
return err
}

d.logger.StopSpinnerWithStatus("Flink job created", log.Successful)
d.logger.JSON(ctx, fj)
return nil
}

func (d *Deploy) addIntegrations(ctx context.Context, spec *ir.DeploymentSpec, input *meroxa.CreateFlinkJobInput) error {
d.logger.StartSpinner("\t", "Checking Meroxa integrations...")
successMsg := "Finished checking Meroxa integrations"
if spec != nil {
d.logger.StartSpinner("\t", "Adding Meroxa integrations to request...")
var bytes []byte
bytes, err = json.Marshal(spec)
bytes, err := json.Marshal(spec)
if err != nil {
d.logger.Errorf(ctx, "\t 𐄂 Unable to add Meroxa integrations to request")
d.logger.StopSpinnerWithStatus("\t", log.Failed)
Expand All @@ -148,19 +169,10 @@ func (d *Deploy) Execute(ctx context.Context) error {
d.logger.StopSpinnerWithStatus("\t", log.Failed)
return unmarshalErr
}

successMsg = "Added Meroxa integrations to request"
input.Spec = inputSpec
input.SpecVersion = spec.Definition.Metadata.SpecVersion
}

fj, err := d.client.CreateFlinkJob(ctx, input)
if err != nil {
d.logger.Errorf(ctx, "\t 𐄂 Unable to create Flink job")
d.logger.StopSpinnerWithStatus("\t", log.Failed)
return err
}

d.logger.StopSpinnerWithStatus("Flink job created", log.Successful)
d.logger.JSON(ctx, fj)
d.logger.StopSpinnerWithStatus(successMsg, log.Successful)
return nil
}

0 comments on commit a666404

Please sign in to comment.