forked from coreos/fleet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
148 lines (117 loc) · 3.43 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package server
import (
"encoding/json"
"errors"
"net/http"
"time"
"github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/go-systemd/activation"
log "github.com/coreos/fleet/Godeps/_workspace/src/github.com/golang/glog"
"github.com/coreos/fleet/agent"
"github.com/coreos/fleet/api"
"github.com/coreos/fleet/config"
"github.com/coreos/fleet/engine"
"github.com/coreos/fleet/etcd"
"github.com/coreos/fleet/event"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
"github.com/coreos/fleet/sign"
"github.com/coreos/fleet/systemd"
"github.com/coreos/fleet/unit"
"github.com/coreos/fleet/version"
)
const (
// machineStateRefreshInterval is the amount of time the server will
// wait before each attempt to refresh the local machine state
machineStateRefreshInterval = time.Minute
)
type Server struct {
agent *agent.Agent
engine *engine.Engine
rStream *registry.EventStream
sStream *systemd.EventStream
eBus *event.EventBus
mach *machine.CoreOSMachine
stop chan bool
}
func New(cfg config.Config) (*Server, error) {
mach, err := newMachineFromConfig(cfg)
if err != nil {
return nil, err
}
mgr, err := systemd.NewSystemdUnitManager(systemd.DefaultUnitsDirectory)
if err != nil {
return nil, err
}
eClient, err := etcd.NewClient(cfg.EtcdServers, http.Transport{})
if err != nil {
return nil, err
}
reg := registry.New(eClient, cfg.EtcdKeyPrefix)
a, err := newAgentFromConfig(mach, reg, cfg, mgr)
if err != nil {
return nil, err
}
e := engine.New(reg, mach)
sStream := systemd.NewEventStream(mgr)
rStream, err := registry.NewEventStream(eClient, reg)
if err != nil {
return nil, err
}
aHandler := agent.NewEventHandler(a)
eHandler := engine.NewEventHandler(e)
eBus := event.NewEventBus()
eBus.AddListener("engine", eHandler)
eBus.AddListener("agent", aHandler)
listeners, err := activation.Listeners(false)
if err != nil {
return nil, err
}
mux := api.NewServeMux(reg)
for _, f := range listeners {
go http.Serve(f, mux)
}
return &Server{a, e, rStream, sStream, eBus, mach, nil}, nil
}
func newMachineFromConfig(cfg config.Config) (*machine.CoreOSMachine, error) {
state := machine.MachineState{
PublicIP: cfg.PublicIP,
Metadata: cfg.Metadata(),
Version: version.Version,
}
mach := machine.NewCoreOSMachine(state)
mach.Refresh()
if mach.State().ID == "" {
return nil, errors.New("unable to determine local machine ID")
}
return mach, nil
}
func newAgentFromConfig(mach machine.Machine, reg registry.Registry, cfg config.Config, mgr unit.UnitManager) (*agent.Agent, error) {
var verifier *sign.SignatureVerifier
if cfg.VerifyUnits {
var err error
verifier, err = sign.NewSignatureVerifierFromAuthorizedKeysFile(cfg.AuthorizedKeysFile)
if err != nil {
log.Errorln("Failed to get any key from authorized key file in verify_units mode:", err)
verifier = sign.NewSignatureVerifier()
}
}
return agent.New(mgr, reg, mach, cfg.AgentTTL, verifier)
}
func (s *Server) Run() {
idx := s.agent.Initialize()
s.stop = make(chan bool)
go s.mach.PeriodicRefresh(machineStateRefreshInterval, s.stop)
go s.rStream.Stream(idx, s.eBus.Dispatch, s.stop)
go s.sStream.Stream(s.eBus.Dispatch, s.stop)
go s.agent.Heartbeat(s.stop)
s.engine.CheckForWork()
}
func (s *Server) Stop() {
close(s.stop)
}
func (s *Server) Purge() {
s.agent.Purge()
}
func (s *Server) MarshalJSON() ([]byte, error) {
return json.Marshal(struct{ Agent *agent.Agent }{Agent: s.agent})
}