This repository has been archived by the owner on Jun 18, 2020. It is now read-only.
forked from waffle-iron/core0
/
local.go
130 lines (110 loc) · 2.6 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"encoding/json"
"fmt"
"github.com/threefoldtech/0-core/base/pm"
"github.com/threefoldtech/0-core/base/utils"
"github.com/threefoldtech/0-core/apps/core0/subsys/containers"
"net"
"os"
"strconv"
"strings"
)
// Local is a transport that accepts commands over a unix domain socket and
// runs them either through the process manager or inside a container.
type Local struct {
	// mgr resolves container selectors and dispatches commands to containers.
	mgr containers.ContainerManager
	// listener is the unix socket on which client connections are accepted.
	listener *net.UnixListener
}
// LocalCmd is the JSON envelope a client sends over the local socket.
type LocalCmd struct {
	// Sync asks the server to wait for the job result before replying; the
	// server only consults it when no container is targeted.
	Sync bool `json:"sync"`

	// Container selects the target container, either as a numeric ID or as a
	// comma-separated list of tags. Empty means no container is targeted.
	Container string `json:"container"`

	// Content is the raw JSON of the command itself; decoding is deferred to
	// pm.LoadCmd.
	Content json.RawMessage `json:"content"`
}
// NewLocal creates a Local transport listening on the unix socket path s.
//
// If something already exists at s it is removed first, so the socket can be
// bound again. mgr is stored and used later to resolve and dispatch to
// containers.
//
// It returns an error if the existing path cannot be removed, if s cannot be
// resolved as a unix address, or if listening on it fails.
func NewLocal(mgr containers.ContainerManager, s string) (*Local, error) {
	if utils.Exists(s) {
		// Surface removal failures instead of letting the bind fail later
		// with a less helpful error. A not-exist error only means the path
		// vanished between the check and the removal, which is fine.
		if err := os.Remove(s); err != nil && !os.IsNotExist(err) {
			return nil, err
		}
	}

	addr, err := net.ResolveUnixAddr("unix", s)
	if err != nil {
		return nil, err
	}

	listener, err := net.ListenUnix("unix", addr)
	if err != nil {
		return nil, err
	}

	return &Local{
		mgr:      mgr,
		listener: listener,
	}, nil
}
// container resolves the selector x to a container.
//
// An empty selector yields nil. A selector that parses as a base-10 unsigned
// 16-bit integer is looked up by ID through the manager's Of method. Anything
// else is split on commas and looked up with GetOneWithTags.
func (l *Local) container(x string) containers.Container {
	if len(x) == 0 {
		return nil
	}

	// Numeric selectors take precedence over tag matching.
	if n, err := strconv.ParseUint(x, 10, 16); err == nil {
		return l.mgr.Of(uint16(n))
	}

	return l.mgr.GetOneWithTags(strings.Split(x, ",")...)
}
// server handles a single client connection: it decodes one LocalCmd, runs
// the embedded command, writes one JSON-encoded job result back and closes
// the connection.
//
// The reply starts out as an error-state result and is replaced only when a
// real result is obtained. Every failure path fills the second stream of
// that result with a description and returns, leaving the deferred writer
// to send it.
//
// When a container selector is given, the command is dispatched to that
// container and the dispatch result is returned; Sync is not consulted on
// that path. Without a selector the command is run through pm.Run, and the
// job is waited for only when Sync is set.
func (l *Local) server(con net.Conn) {
	// Registered first so it runs last, after the reply has been written.
	defer con.Close()

	result := &pm.JobResult{
		State: pm.StateError,
	}

	// The closure reads result when it runs, so it sends whatever the code
	// below last assigned.
	defer func() {
		m, err := json.Marshal(result)
		if err != nil {
			log.Errorf("Failed to encode response for local transport: %s", err)
			return
		}
		if _, err := con.Write(m); err != nil {
			log.Errorf("Failed to write response to local transport: %s", err)
		}
	}()

	var lcmd LocalCmd
	if err := json.NewDecoder(con).Decode(&lcmd); err != nil {
		result.Streams = pm.Streams{"", fmt.Sprintf("Failed to decode message: %s", err)}
		return
	}

	cmd, err := pm.LoadCmd(lcmd.Content)
	if err != nil {
		result.Streams = pm.Streams{"", fmt.Sprintf("Failed to extract command: %s", err)}
		return
	}

	container := l.container(lcmd.Container)
	if lcmd.Container != "" && container == nil {
		result.Streams = pm.Streams{"", fmt.Sprintf("couldn't match any containers with '%s'", lcmd.Container)}
		return
	}

	if container != nil {
		contjob, err := l.mgr.Dispatch(container.ID(), cmd)
		if err != nil {
			result.Streams = pm.Streams{"", fmt.Sprintf("Failed to dispatch command (%s): %s", cmd.Command, err)}
			return
		}
		result = contjob
		return
	}

	job, err := pm.Run(cmd)
	if err != nil {
		result.Streams = pm.Streams{"", fmt.Sprintf("Failed to get result job for command(%s): %s", cmd.Command, err)}
		return
	}

	// NOTE(review): when Sync is false the client still receives the initial
	// error-state result even though the job was started — confirm this is
	// what clients expect.
	if lcmd.Sync {
		result = job.Wait()
	}
}
// start runs the accept loop: every accepted connection is handed to server
// in its own goroutine. The listener is closed when the loop exits.
//
// Accept errors are logged. An error that reports itself as temporary is
// skipped and the loop keeps accepting; any other error ends the loop, since
// retrying would only fail again. A failed Accept is never passed on to
// server, because its connection value is nil.
func (l *Local) start() {
	defer l.listener.Close()

	for {
		con, err := l.listener.Accept()
		if err != nil {
			log.Errorf("local transport error: %s", err)
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				continue
			}
			return
		}

		go l.server(con)
	}
}
// Start launches the accept loop in a background goroutine and returns
// immediately.
func (l *Local) Start() {
	go l.start()
}