Skip to content

Commit

Permalink
Fix missing events when intercept or agent is removed.
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Hallgren <thomas@datawire.io>
  • Loading branch information
thallgren committed May 16, 2024
1 parent 1d52509 commit f16110f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
45 changes: 43 additions & 2 deletions cmd/traffic/cmd/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"google.golang.org/protobuf/proto"
empty "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/datawire/dlib/derror"
Expand Down Expand Up @@ -961,6 +962,8 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.
ticker := time.NewTicker(time.Duration(math.MaxInt64))
defer ticker.Stop()

var agentInfos map[string]*rpc.AgentInfo

start := time.Now()

sendEvents := func() {
Expand Down Expand Up @@ -1059,8 +1062,30 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.
if !ok {
return nil
}
agm := ass.State
for _, a := range agm {
oldAgentInfos := agentInfos
agentInfos = ass.State
for k, a := range oldAgentInfos {
if _, ok = agentInfos[k]; !ok {
name := a.Name
as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED
dlog.Debugf(ctx, "AgentInfo %s.%s %s", a.Name, a.Namespace, as)
if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
wl := w.Workload
if wl.AgentState != as {
wl.AgentState = as
ticker.Reset(maxIdleTime)
}
} else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil {
addEvent(state.EventTypeUpdate, wl, as)
} else {
dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err)
if errors.IsNotFound(err) {
addEvent(state.EventTypeDelete, wl, as)
}
}
}
}
for _, a := range agentInfos {
name := a.Name
as := rpc.WorkloadInfo_INSTALLED
if isIntercepted(name, a.Namespace) {
Expand All @@ -1085,7 +1110,23 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.
if !ok {
return nil
}
oldInterceptInfos := interceptInfos
interceptInfos = is.State
for k, ii := range oldInterceptInfos {
if _, ok = interceptInfos[k]; !ok {
name := ii.Spec.Agent
as := rpc.WorkloadInfo_INSTALLED
dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as)
if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
if w.Workload.AgentState != as {
w.Workload.AgentState = as
ticker.Reset(maxIdleTime)
}
} else if wl, err := agentmap.GetWorkload(ctx, name, ii.Spec.Namespace, ""); err == nil {
addEvent(state.EventTypeUpdate, wl, as)
}
}
}
for _, ii := range interceptInfos {
name := ii.Spec.Agent
as := rpc.WorkloadInfo_INSTALLED
Expand Down
1 change: 0 additions & 1 deletion integration_test/workspace_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (s *notConnectedSuite) Test_WorkspaceListener() {
// 5. Remove the deployment
go func() {
defer cancel()
defer s.DeleteSvcAndWorkload(ctx, "deploy", "echo-easy")
s.ApplyApp(ctx, "echo-easy", "deploy/echo-easy")
ir := &manager.CreateInterceptRequest{
Session: clientSession,
Expand Down

0 comments on commit f16110f

Please sign in to comment.