-
Notifications
You must be signed in to change notification settings - Fork 5
/
masterclient.go
99 lines (88 loc) · 2.75 KB
/
masterclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package gomes
import (
"bytes"
"code.google.com/p/goprotobuf/proto"
"fmt"
mesos "github.com/vladimirvivien/gomes/mesosproto"
"net"
"net/http"
"time"
)
type masterClient struct {
address address
httpClient http.Client
}
func newMasterClient(master string) *masterClient {
return &masterClient{
address: address(master),
httpClient: http.Client{
Transport: &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, time.Second*7)
if err != nil {
return nil, err
}
return c, nil
},
DisableCompression: true,
},
},
}
}
func (client *masterClient) RegisterFramework(schedId schedProcID, framework *mesos.FrameworkInfo) error {
regMsg := &mesos.RegisterFrameworkMessage{Framework: framework}
return client.send(schedId, buildReqPath(REGISTER_FRAMEWORK_CALL), regMsg)
}
func (client *masterClient) UnregisterFramework(schedId schedProcID, frameworkId *mesos.FrameworkID) error {
msg := &mesos.UnregisterFrameworkMessage{FrameworkId: frameworkId}
return client.send(schedId, buildReqPath(UNREGISTER_FRAMEWORK_CALL), msg)
}
func (client *masterClient) DeactivateFramework(schedId schedProcID, frameworkId *mesos.FrameworkID) error {
msg := &mesos.DeactivateFrameworkMessage{FrameworkId: frameworkId}
return client.send(schedId, buildReqPath(DEACTIVATE_FRAMEWORK_CALL), msg)
}
func (client *masterClient) KillTask(schedId schedProcID, taskId *mesos.TaskID) error {
msg := &mesos.KillTaskMessage{TaskId: taskId}
return client.send(schedId, buildReqPath(KILL_TASK_CALL), msg)
}
func (client *masterClient) LaunchTasks(
schedId schedProcID,
frameworkId *mesos.FrameworkID,
offerIds []*mesos.OfferID,
tasks []*mesos.TaskInfo,
filters *mesos.Filters,
) error {
msg := &mesos.LaunchTasksMessage{
FrameworkId: frameworkId,
OfferIds: offerIds,
Tasks: tasks,
Filters: filters,
}
return client.send(schedId, buildReqPath(LAUNCH_TASKS_CALL), msg)
}
func (client *masterClient) send(from schedProcID, reqPath string, msg proto.Message) error {
u, err := client.address.AsHttpURL()
if err != nil {
return err
}
u.Path = reqPath
data, err := proto.Marshal(msg)
if err != nil {
return err
}
req, err := http.NewRequest(HTTP_POST_METHOD, u.String(), bytes.NewReader(data))
req.Header.Add("Content-Type", HTTP_CONTENT_TYPE)
req.Header.Add("Connection", "Keep-Alive")
req.Header.Add("Libprocess-From", from.value)
rsp, err := client.httpClient.Do(req)
if err != nil {
return err
}
if rsp.StatusCode != http.StatusAccepted {
return fmt.Errorf("Master did not accept request %s. Returned status %s.", u.String(), rsp.Status)
}
return nil
}
func buildReqPath(message string) string {
return "/" + HTTP_MASTER_PREFIX + "/" + MESOS_INTERNAL_PREFIX + message
}