/
event.go
193 lines (174 loc) · 5.91 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package principal
import (
"context"
"fmt"
"time"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/jannfis/argocd-agent/internal/event"
"github.com/jannfis/argocd-agent/internal/namedlock"
"github.com/jannfis/argocd-agent/pkg/types"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"k8s.io/client-go/util/workqueue"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
// processRecvQueue processes a single entry from the receiver queue q, which
// holds the events received from the agent agentName. Depending on the
// event's target, it dispatches the event to the matching handler, which in
// turn triggers updates of resources in the server's backend.
//
// The entry taken from the queue is always marked as done, whether or not
// processing succeeded. An error is returned when the queue has been shut
// down, when the entry is not a *cloudevents.Event, when the event's target
// is unknown, or when the handler for the event fails.
func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workqueue.RateLimitingInterface) error {
	i, shutdown := q.Get()
	if shutdown {
		// Nothing was handed out, so there is nothing to mark as done.
		return fmt.Errorf("receiver queue for agent %s has been shut down", agentName)
	}
	// Tell the queue we are finished with this item on every return path,
	// including the early return for unexpected data below.
	defer q.Done(i)

	ev, ok := i.(*cloudevents.Event)
	if !ok {
		return fmt.Errorf("invalid data in queue: have:%T want:%T", i, ev)
	}

	agentMode := s.agentMode(agentName)
	logCtx := log().WithFields(logrus.Fields{
		"module": "QueueProcessor",
		"client": agentName,
		"mode":   agentMode.String(),
		"event":  ev.Type(),
	})
	logCtx.Debugf("Processing event")

	var err error
	target := event.Target(ev)
	switch target {
	case event.TargetApplication:
		err = s.processApplicationEvent(ctx, agentName, ev)
	case event.TargetAppProject:
		// No handler is wired up for AppProject events here; the event is
		// acknowledged and dropped without an error.
		logCtx.Debugf("Discarding AppProject event, because no handler is implemented")
	default:
		err = fmt.Errorf("unable to process event of unknown type %s", target)
	}
	return err
}
// processApplicationEvent handles a single Application event ev that was sent
// by the agent agentName. The event's payload is decoded into an Application
// and then handled according to the event type and the agent's mode:
//
//   - Create: in autonomous mode, the application is placed in the namespace
//     named after the agent and created through the application manager. In
//     any other mode, the event is discarded without error.
//   - StatusUpdate: only allowed in autonomous mode; calls UpdateAutonomousApp.
//   - SpecUpdate: only allowed in managed mode; calls UpdateStatus.
//
// Any other event type, a payload that cannot be decoded, or a failing
// application manager call results in an error.
//
// NOTE(review): the SpecUpdate branch calls UpdateStatus and reports failures
// as status update failures, while its success log mentions the spec --
// confirm that this is intended.
func (s *Server) processApplicationEvent(ctx context.Context, agentName string, ev *cloudevents.Event) error {
	app := &v1alpha1.Application{}
	if decodeErr := ev.DataAs(app); decodeErr != nil {
		return decodeErr
	}

	mode := s.agentMode(agentName)
	logger := log().WithFields(logrus.Fields{
		"module":   "QueueProcessor",
		"client":   agentName,
		"mode":     mode.String(),
		"event":    ev.Type(),
		"incoming": app.QualifiedName(),
	})

	// Both update flavours report failures with the same prefix. The name is
	// resolved at the time of the failure, not up front, because it is read
	// after the application manager has been handed the object.
	updateFailed := func(cause error) error {
		return fmt.Errorf("could not update application status for %s: %w", app.QualifiedName(), cause)
	}

	switch evType := ev.Type(); evType {
	case event.Create.String():
		if mode != types.AgentModeAutonomous {
			logger.Debugf("Discarding event, because agent is not in autonomous mode")
			return nil
		}
		app.SetNamespace(agentName)
		if _, createErr := s.appManager.Create(ctx, app); createErr != nil {
			return fmt.Errorf("could not create application %s: %w", app.QualifiedName(), createErr)
		}

	case event.StatusUpdate.String():
		if mode != types.AgentModeAutonomous {
			return updateFailed(fmt.Errorf("event type not allowed when mode is not autonomous"))
		}
		if _, updErr := s.appManager.UpdateAutonomousApp(ctx, agentName, app); updErr != nil {
			return updateFailed(updErr)
		}
		logger.Infof("Updated application status %s", app.QualifiedName())

	case event.SpecUpdate.String():
		if mode != types.AgentModeManaged {
			return updateFailed(fmt.Errorf("event type not allowed when mode is not managed"))
		}
		if _, updErr := s.appManager.UpdateStatus(ctx, agentName, app); updErr != nil {
			return updateFailed(updErr)
		}
		logger.Infof("Updated application spec %s", app.QualifiedName())

	default:
		return fmt.Errorf("unable to process event of type %s", evType)
	}

	return nil
}
// eventProcessor is the main loop to process events from the receiver queues,
// i.e. events coming from the connected agents. It will process events from
// different agents in parallel, but it will not parallelize processing of
// events from the same queue. These latter events need to be processed in
// sequential order, in any case.
//
// The number of events processed at the same time is bounded by the
// eventProcessors option. The loop runs until ctx is done, at which point
// the function returns nil. Cancellation is observed both between queues
// and while idling between rounds, so the processor also shuts down when
// no queues exist at all.
func (s *Server) eventProcessor(ctx context.Context) error {
	sem := semaphore.NewWeighted(s.options.eventProcessors)
	queueLock := namedlock.NewNamedLock()
	logCtx := log().WithField("module", "EventProcessor")
	for {
		for _, queueName := range s.queues.Names() {
			if ctx.Err() != nil {
				logCtx.Infof("Shutting down event processor")
				return nil
			}

			// Though unlikely, the agent might have disconnected, and
			// the queue will be gone. In this case, we'll just skip.
			q := s.queues.RecvQ(queueName)
			if q == nil {
				logCtx.Debugf("Queue disappeared -- client probably has disconnected")
				continue
			}

			// Since q.Get() is blocking, we want to make sure something is
			// actually in the queue before we try to grab it.
			if q.Len() == 0 {
				continue
			}

			// We lock this specific queue, so that we won't process two
			// items of the same queue at the same time. Queues must be
			// processed in FIFO order, always.
			//
			// If it's not possible to get a lock (i.e. a lock is already
			// being held elsewhere), we continue with the next queue.
			if !queueLock.TryLock(queueName) {
				continue
			}
			logCtx.Trace("Acquired queue lock")

			if err := sem.Acquire(ctx, 1); err != nil {
				logCtx.Tracef("Error acquiring semaphore: %v", err)
				queueLock.Unlock(queueName)
				continue
			}
			logCtx.Trace("Acquired semaphore")

			// The worker owns both the semaphore slot and the queue lock
			// until it has finished processing one item.
			go func(agentName string, q workqueue.RateLimitingInterface) {
				defer func() {
					sem.Release(1)
					queueLock.Unlock(agentName)
				}()
				err := s.processRecvQueue(ctx, agentName, q)
				if err != nil {
					logCtx.WithField("client", agentName).WithError(err).Errorf("Could not process agent recveiver queue")
				}
			}(queueName, q)
		}

		// Give the CPU a little rest between rounds, but wake up right away
		// when the context is done. This is also the only place where
		// cancellation is noticed while no queues exist.
		select {
		case <-ctx.Done():
			logCtx.Infof("Shutting down event processor")
			return nil
		case <-time.After(10 * time.Millisecond):
		}
	}
}
// StartEventProcessor will start the event processor, which processes items
// from all queues as the items appear in the queues. Processing will be
// performed in parallel, and in the background, until the context ctx is done.
//
// The processor is launched in its own goroutine and this function returns
// immediately. It currently always returns nil; the error return is kept for
// callers and for start-up checks that may be added later. An error returned
// by the processor itself after it has been started is logged.
func (s *Server) StartEventProcessor(ctx context.Context) error {
	go func() {
		log().Infof("Starting event processor")
		// The result must be handled inside the goroutine: the caller has
		// long returned by the time the processor terminates.
		if err := s.eventProcessor(ctx); err != nil {
			log().WithError(err).Errorf("Event processor terminated with error")
		}
	}()
	return nil
}