/
client.go
134 lines (123 loc) · 4.12 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package worker
import (
"context"
"fmt"
"os"
"path"
"strconv"
"github.com/gogo/protobuf/types"
"github.com/pachyderm/pachyderm/src/client"
"github.com/pachyderm/pachyderm/src/client/debug"
"github.com/pachyderm/pachyderm/src/client/pps"
etcd "github.com/coreos/etcd/clientv3"
"google.golang.org/grpc"
)
const (
// WorkerEtcdPrefix is the prefix in etcd that we use to store worker information.
WorkerEtcdPrefix = "workers"
)
// Status returns the statuses of workers referenced by pipelineRcName.
// pipelineRcName is the name of the pipeline's RC and can be gotten with
// ppsutil.PipelineRcName. You can also pass "" for pipelineRcName to get all
// clients for all workers.
func Status(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16) ([]*pps.WorkerStatus, error) {
workerClients, err := Clients(ctx, pipelineRcName, etcdClient, etcdPrefix, workerGrpcPort)
if err != nil {
return nil, err
}
var result []*pps.WorkerStatus
for _, workerClient := range workerClients {
status, err := workerClient.Status(ctx, &types.Empty{})
if err != nil {
return nil, err
}
result = append(result, status)
}
return result, nil
}
// Cancel cancels a set of datums running on workers.
// pipelineRcName is the name of the pipeline's RC and can be gotten with
// ppsutil.PipelineRcName.
func Cancel(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client,
etcdPrefix string, workerGrpcPort uint16, jobID string, dataFilter []string) error {
workerClients, err := Clients(ctx, pipelineRcName, etcdClient, etcdPrefix, workerGrpcPort)
if err != nil {
return err
}
success := false
for _, workerClient := range workerClients {
resp, err := workerClient.Cancel(ctx, &CancelRequest{
JobID: jobID,
DataFilters: dataFilter,
})
if err != nil {
return err
}
if resp.Success {
success = true
}
}
if !success {
return fmt.Errorf("datum matching filter %+v could not be found for jobID %s", dataFilter, jobID)
}
return nil
}
// Conns returns a slice of connections to worker servers.
// pipelineRcName is the name of the pipeline's RC and can be gotten with
// ppsutil.PipelineRcName. You can also pass "" for pipelineRcName to get all
// clients for all workers.
func Conns(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16) ([]*grpc.ClientConn, error) {
resp, err := etcdClient.Get(ctx, path.Join(etcdPrefix, WorkerEtcdPrefix, pipelineRcName), etcd.WithPrefix())
if err != nil {
return nil, err
}
var result []*grpc.ClientConn
for _, kv := range resp.Kvs {
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", path.Base(string(kv.Key)), workerGrpcPort),
append(client.DefaultDialOptions(), grpc.WithInsecure())...)
if err != nil {
return nil, err
}
result = append(result, conn)
}
return result, nil
}
// Client combines the WorkerAPI and the DebugAPI into a single client.
type Client struct {
WorkerClient
debug.DebugClient
}
func newClient(conn *grpc.ClientConn) Client {
return Client{
NewWorkerClient(conn),
debug.NewDebugClient(conn),
}
}
// Clients returns a slice of worker clients for a pipeline.
// pipelineRcName is the name of the pipeline's RC and can be gotten with
// ppsutil.PipelineRcName. You can also pass "" for pipelineRcName to get all
// clients for all workers.
func Clients(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16) ([]Client, error) {
conns, err := Conns(ctx, pipelineRcName, etcdClient, etcdPrefix, workerGrpcPort)
if err != nil {
return nil, err
}
var result []Client
for _, conn := range conns {
result = append(result, newClient(conn))
}
return result, nil
}
// NewClient returns a worker client for the worker at the IP address passed in.
func NewClient(address string) (Client, error) {
port, err := strconv.Atoi(os.Getenv(client.PPSWorkerPortEnv))
if err != nil {
return Client{}, err
}
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", address, port),
append(client.DefaultDialOptions(), grpc.WithInsecure())...)
if err != nil {
return Client{}, err
}
return newClient(conn), nil
}