Skip to content

Commit

Permalink
feat(clients/go): add optional list of variables to activate jobs/job…
Browse files Browse the repository at this point in the history
… worker

- when activating jobs a list of variable names can be specified
- the payload of activated jobs will only contain variables from the specified
  list of variable names
- this list can also be specified when creating a job worker
  • Loading branch information
menski committed Jan 9, 2019
1 parent 6eabff2 commit 911aeea
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 1 deletion.
6 changes: 6 additions & 0 deletions clients/go/commands/activateJobs_command.go
Expand Up @@ -45,6 +45,7 @@ type ActivateJobsCommandStep3 interface {

Timeout(time.Duration) ActivateJobsCommandStep3
WorkerName(string) ActivateJobsCommandStep3
FetchVariables(...string) ActivateJobsCommandStep3
}

type ActivateJobsCommand struct {
Expand Down Expand Up @@ -73,6 +74,11 @@ func (cmd *ActivateJobsCommand) WorkerName(workerName string) ActivateJobsComman
return cmd
}

func (cmd *ActivateJobsCommand) FetchVariables(fetchVariables ...string) ActivateJobsCommandStep3 {
cmd.request.FetchVariable = fetchVariables
return cmd
}

func (cmd *ActivateJobsCommand) Send() ([]entities.Job, error) {
ctx, cancel := context.WithTimeout(context.Background(), cmd.requestTimeout)
defer cancel()
Expand Down
31 changes: 31 additions & 0 deletions clients/go/commands/activateJobs_command_test.go
Expand Up @@ -188,3 +188,34 @@ func TestActivateJobsCommandWithWorkerName(t *testing.T) {
t.Errorf("Failed to receive response")
}
}

func TestActivateJobsCommandWithFetchVariables(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

client := mock_pb.NewMockGatewayClient(ctrl)
stream := mock_pb.NewMockGateway_ActivateJobsClient(ctrl)

fetchVariables := []string{"foo", "bar", "baz"}

request := &pb.ActivateJobsRequest{
Type: "foo",
Amount: 5,
Worker: DefaultJobWorkerName,
Timeout: DefaultJobTimeoutInMs,
FetchVariable: fetchVariables,
}

stream.EXPECT().Recv().Return(nil, io.EOF)
client.EXPECT().ActivateJobs(gomock.Any(), &utils.RpcTestMsg{Msg: request}).Return(stream, nil)

jobs, err := NewActivateJobsCommand(client, utils.DefaultTestTimeout).JobType("foo").Amount(5).FetchVariables(fetchVariables...).Send()

if err != nil {
t.Errorf("Failed to send request")
}

if len(jobs) != 0 {
t.Errorf("Failed to receive response")
}
}
6 changes: 6 additions & 0 deletions clients/go/worker/jobWorker_builder.go
Expand Up @@ -70,6 +70,8 @@ type JobWorkerBuilderStep3 interface {
PollInterval(time.Duration) JobWorkerBuilderStep3
// Set the threshold of buffered activated jobs before polling for new jobs, i.e. threshold * BufferSize(int)
PollThreshold(float64) JobWorkerBuilderStep3
// Set list of variable names which should be fetched on job activation
FetchVariables(...string) JobWorkerBuilderStep3
// Open the job worker and start polling and handling jobs
Open() JobWorker
}
Expand Down Expand Up @@ -124,7 +126,11 @@ func (builder *JobWorkerBuilder) PollThreshold(pollThreshold float64) JobWorkerB
log.Println("Ignoring invalid poll threshold", pollThreshold, "which should be greater then zero for job worker and using instead", builder.concurrency)
}
return builder
}

func (builder *JobWorkerBuilder) FetchVariables(fetchVariables ...string) JobWorkerBuilderStep3 {
builder.request.FetchVariable = fetchVariables
return builder
}

func (builder *JobWorkerBuilder) Open() JobWorker {
Expand Down
8 changes: 8 additions & 0 deletions clients/go/worker/jobWorker_builder_test.go
Expand Up @@ -84,3 +84,11 @@ func TestJobWorkerBuilder_PollThreshold(t *testing.T) {
builder.PollThreshold(0)
assert.Equal(t, 0.12, builder.pollThreshold)
}

func TestJobWorkerBuilder_FetchVariables(t *testing.T) {
fetchVariables := []string{"foo", "bar", "baz"}

builder := JobWorkerBuilder{}
builder.FetchVariables(fetchVariables...)
assert.Equal(t, fetchVariables, builder.request.FetchVariable)
}
4 changes: 3 additions & 1 deletion clients/zbctl/cmd/activateJobs.go
Expand Up @@ -29,6 +29,7 @@ var (
activateJobsAmountFlag int32
activateJobsWorkerFlag string
activateJobsTimeoutFlag time.Duration
activateJobsFetchVariablesFlag []string
)

var activateJobsCmd = &cobra.Command{
Expand All @@ -38,7 +39,7 @@ var activateJobsCmd = &cobra.Command{
PreRunE: initClient,
RunE: func(cmd *cobra.Command, args []string) error {
jobType := args[0]
jobs, err := client.NewActivateJobsCommand().JobType(jobType).Amount(activateJobsAmountFlag).WorkerName(activateJobsWorkerFlag).Timeout(activateJobsTimeoutFlag).Send()
jobs, err := client.NewActivateJobsCommand().JobType(jobType).Amount(activateJobsAmountFlag).WorkerName(activateJobsWorkerFlag).Timeout(activateJobsTimeoutFlag).FetchVariables(activateJobsFetchVariablesFlag...).Send()
if err != nil {
return err
}
Expand All @@ -63,4 +64,5 @@ func init() {
activateJobsCmd.Flags().Int32Var(&activateJobsAmountFlag, "amount", 1, "Specify amount of jobs to activate")
activateJobsCmd.Flags().StringVar(&activateJobsWorkerFlag, "worker", DefaultJobWorkerName, "Specify the name of the worker")
activateJobsCmd.Flags().DurationVar(&activateJobsTimeoutFlag, "timeout", commands.DefaultJobTimeout, "Specify the timeout of the activated job")
activateJobsCmd.Flags().StringSliceVar(&activateJobsFetchVariablesFlag, "variables", []string{}, "Specify the list of variable names which should be fetch on job activation (comma-separated)")
}

0 comments on commit 911aeea

Please sign in to comment.