-
Notifications
You must be signed in to change notification settings - Fork 6
/
deviceagent.go
137 lines (116 loc) · 4.28 KB
/
deviceagent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package device_agent
import (
"context"
"sync"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/nais/device/internal/notify"
"github.com/nais/device/internal/device-agent/config"
"github.com/nais/device/internal/device-agent/runtimeconfig"
"github.com/nais/device/internal/device-agent/statemachine"
"github.com/nais/device/internal/pb"
)
type DeviceAgentServer struct {
pb.UnimplementedDeviceAgentServer
AgentStatus *pb.AgentStatus
lock sync.RWMutex
statusChannels map[uuid.UUID]chan *pb.AgentStatus
Config *config.Config
notifier notify.Notifier
rc runtimeconfig.RuntimeConfig
log *logrus.Entry
sendEvent func(statemachine.EventWithSpan)
}
func (das *DeviceAgentServer) Login(ctx context.Context, request *pb.LoginRequest) (*pb.LoginResponse, error) {
das.sendEvent(statemachine.SpanEvent(ctx, statemachine.EventLogin))
return &pb.LoginResponse{}, nil
}
func (das *DeviceAgentServer) Logout(ctx context.Context, request *pb.LogoutRequest) (*pb.LogoutResponse, error) {
das.sendEvent(statemachine.SpanEvent(ctx, statemachine.EventDisconnect))
return &pb.LogoutResponse{}, nil
}
func (das *DeviceAgentServer) Status(request *pb.AgentStatusRequest, statusServer pb.DeviceAgent_StatusServer) error {
id := uuid.New()
das.log.Debug("grpc: client connection established to device helper")
agentStatusChan := make(chan *pb.AgentStatus, 8)
agentStatusChan <- das.AgentStatus
das.lock.Lock()
das.statusChannels[id] = agentStatusChan
das.lock.Unlock()
defer func() {
das.log.Debugf("grpc: client connection with device helper closed")
if !request.GetKeepConnectionOnComplete() {
das.log.Debugf("grpc: keepalive not requested, tearing down connections...")
das.sendEvent(statemachine.SpanEvent(statusServer.Context(), statemachine.EventDisconnect))
}
close(agentStatusChan)
das.lock.Lock()
delete(das.statusChannels, id)
das.lock.Unlock()
}()
for {
select {
case <-statusServer.Context().Done():
return nil
case status := <-agentStatusChan:
err := statusServer.Send(status)
if err != nil {
das.log.Errorf("while sending agent status: %s", err)
}
}
}
}
func (das *DeviceAgentServer) ConfigureJITA(context.Context, *pb.ConfigureJITARequest) (*pb.ConfigureJITAResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ConfigureJITA not implemented")
}
func (das *DeviceAgentServer) UpdateAgentStatus(status *pb.AgentStatus) {
das.AgentStatus = status
das.lock.RLock()
for _, c := range das.statusChannels {
select {
case c <- status:
default:
das.log.Errorf("BUG: update agent status: channel is full")
}
}
das.lock.RUnlock()
}
func (das *DeviceAgentServer) SetAgentConfiguration(ctx context.Context, req *pb.SetAgentConfigurationRequest) (*pb.SetAgentConfigurationResponse, error) {
das.Config.AgentConfiguration = req.Config
das.Config.PersistAgentConfiguration(das.log)
return &pb.SetAgentConfigurationResponse{}, nil
}
func (das *DeviceAgentServer) GetAgentConfiguration(ctx context.Context, req *pb.GetAgentConfigurationRequest) (*pb.GetAgentConfigurationResponse, error) {
return &pb.GetAgentConfigurationResponse{
Config: das.Config.AgentConfiguration,
}, nil
}
func (das *DeviceAgentServer) SetActiveTenant(ctx context.Context, req *pb.SetActiveTenantRequest) (*pb.SetActiveTenantResponse, error) {
if err := das.rc.SetActiveTenant(req.Name); err != nil {
das.notifier.Errorf("while activating tenant: %s", err)
das.sendEvent(statemachine.SpanEvent(ctx, statemachine.EventDisconnect))
return &pb.SetActiveTenantResponse{}, nil
}
das.sendEvent(statemachine.SpanEvent(ctx, statemachine.EventDisconnect))
das.log.Infof("activated tenant: %s", req.Name)
return &pb.SetActiveTenantResponse{}, nil
}
func NewServer(ctx context.Context,
log *logrus.Entry,
cfg *config.Config,
rc runtimeconfig.RuntimeConfig,
notifier notify.Notifier,
sendEvent func(statemachine.EventWithSpan),
) *DeviceAgentServer {
return &DeviceAgentServer{
log: log,
AgentStatus: &pb.AgentStatus{ConnectionState: pb.AgentState_Disconnected},
statusChannels: make(map[uuid.UUID]chan *pb.AgentStatus),
Config: cfg,
rc: rc,
notifier: notifier,
sendEvent: sendEvent,
}
}