This repository has been archived by the owner on Feb 5, 2024. It is now read-only.
/
client.go
85 lines (78 loc) · 2.49 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Copyright 2018 The OpenPitrix Authors. All rights reserved.
// Use of this source code is governed by an Apache license
// that can be found in the LICENSE file.
package task
import (
"context"
"fmt"
"time"
"openpitrix.io/openpitrix/pkg/constants"
"openpitrix.io/openpitrix/pkg/logger"
"openpitrix.io/openpitrix/pkg/manager"
"openpitrix.io/openpitrix/pkg/models"
"openpitrix.io/openpitrix/pkg/pb"
"openpitrix.io/openpitrix/pkg/util/funcutil"
)
// Client is the task manager client used by this package.
//
// It embeds pb.TaskManagerClient, so that type's methods (for example
// DescribeTasks and CreateTask, both called in this file) can be invoked
// directly on a Client. WaitTask and SendTask are convenience helpers built
// on top of those calls.
type Client struct {
pb.TaskManagerClient
}
// NewClient builds a Client for the task manager service.
//
// It obtains a connection from manager.NewClient using
// constants.TaskManagerHost and constants.TaskManagerPort and wraps it with
// pb.NewTaskManagerClient. If the connection cannot be obtained, the error
// from manager.NewClient is returned unchanged and the Client is nil.
func NewClient() (*Client, error) {
	conn, err := manager.NewClient(constants.TaskManagerHost, constants.TaskManagerPort)
	if err != nil {
		return nil, err
	}

	taskClient := &Client{TaskManagerClient: pb.NewTaskManagerClient(conn)}
	return taskClient, nil
}
// WaitTask polls the task manager for the task identified by taskId until
// the task finishes, fails, or the wait gives up.
//
// The polling itself is delegated to funcutil.WaitForSpecificOrError, which
// receives the check callback together with timeout and waitInterval.
// NOTE(review): presumably the callback is re-run every waitInterval until it
// reports true, returns an error, or timeout elapses — confirm against
// funcutil.
//
// Each check reports:
//   - done (true, nil) when the task status is constants.StatusSuccessful;
//   - an error when ctx is cancelled or expired, when the task cannot be
//     found, or when its status is constants.StatusFailed;
//   - not done (false, nil) when the task is working or pending, when
//     DescribeTasks itself fails, or when the status is missing or unknown.
func (c *Client) WaitTask(ctx context.Context, taskId string, timeout time.Duration, waitInterval time.Duration) error {
	logger.Debug(ctx, "Waiting for task [%s] finished", taskId)

	return funcutil.WaitForSpecificOrError(func() (bool, error) {
		// Once the caller's context is done every further DescribeTasks call
		// would fail anyway; report it instead of polling until the timeout.
		if err := ctx.Err(); err != nil {
			return false, err
		}

		taskResponse, err := c.DescribeTasks(ctx, &pb.DescribeTasksRequest{
			TaskId: []string{taskId},
		})
		if err != nil {
			// Network or API error: not considered a task failure, so keep
			// waiting, but log it so a later timeout can be diagnosed.
			logger.Error(ctx, "Failed to describe task [%s], will retry: %+v", taskId, err)
			return false, nil
		}

		// Generated getters are nil-safe, so a nil response or a nil entry
		// in the set cannot cause a panic here.
		taskSet := taskResponse.GetTaskSet()
		if len(taskSet) == 0 {
			return false, fmt.Errorf("Can not find task [%s]. ", taskId)
		}

		status := taskSet[0].GetStatus()
		if status == nil {
			logger.Error(ctx, "Task [%s] status is nil", taskId)
			return false, nil
		}

		switch value := status.GetValue(); value {
		case constants.StatusWorking, constants.StatusPending:
			return false, nil
		case constants.StatusSuccessful:
			return true, nil
		case constants.StatusFailed:
			return false, fmt.Errorf("Task [%s] failed. ", taskId)
		default:
			// Unrecognized status: keep waiting rather than failing the task.
			logger.Error(ctx, "Unknown status [%s] for task [%s]. ", value, taskId)
			return false, nil
		}
	}, timeout, waitInterval)
}
// SendTask creates the given task in the task manager and returns the ID
// assigned to it.
//
// The task is converted with models.TaskToPb and its job ID, node ID, target,
// task action, directive, failure-allowed flag and status are copied into a
// pb.CreateTaskRequest. If CreateTask fails, the error is logged and returned
// together with an empty ID.
func (c *Client) SendTask(ctx context.Context, task *models.Task) (string, error) {
	pbTask := models.TaskToPb(task)
	taskRequest := &pb.CreateTaskRequest{
		JobId:          pbTask.JobId,
		NodeId:         pbTask.NodeId,
		Target:         pbTask.Target,
		TaskAction:     pbTask.TaskAction,
		Directive:      pbTask.Directive,
		FailureAllowed: pbTask.FailureAllowed,
		Status:         pbTask.Status,
	}

	response, err := c.CreateTask(ctx, taskRequest)
	if err != nil {
		// No task ID exists when creation fails, so the log carries only the
		// error rather than an always-empty ID placeholder.
		logger.Error(ctx, "Failed to create task: %+v", err)
		return "", err
	}

	// Read the ID only after the error check; the getters are nil-safe.
	return response.GetTaskId().GetValue(), nil
}