Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WorkloadEventsWatcher to the gRPC API. #3596

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
264 changes: 264 additions & 0 deletions cmd/traffic/cmd/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manager

import (
"context"
"math"
"sort"
"strings"
"time"
Expand All @@ -12,7 +13,10 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
empty "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/datawire/dlib/derror"
Expand All @@ -25,6 +29,7 @@ import (
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/state"
"github.com/telepresenceio/telepresence/v2/pkg/agentmap"
"github.com/telepresenceio/telepresence/v2/pkg/dnsproxy"
"github.com/telepresenceio/telepresence/v2/pkg/iputil"
"github.com/telepresenceio/telepresence/v2/pkg/tracing"
Expand Down Expand Up @@ -891,6 +896,265 @@ func (s *service) WatchClusterInfo(session *rpc.SessionInfo, stream rpc.Manager_
return s.clusterInfo.Watch(ctx, stream)
}

// WatchWorkloads streams WorkloadEventsDelta messages to the client identified by
// request.SessionInfo. It merges three sources:
//
//   - workload events from s.state.WatchWorkloads for the client's session,
//   - agent snapshots for agents in the client's namespace, and
//   - intercept snapshots for intercepts owned by the client's session.
//
// Agent and intercept changes are reported as modifications of the workload's AgentState
// (NO_AGENT_UNSPECIFIED, INSTALLED or INTERCEPTED). The call returns nil when the session is
// done or when any of the source channels is closed, a NotFound status when the client session
// is unknown, and an Internal status if the handler panics.
//
//nolint:cyclop,gocyclo,gocognit // complex to avoid extremely specialized functions
func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.Manager_WatchWorkloadsServer) (err error) {
	ctx := managerutil.WithSessionInfo(stream.Context(), request.SessionInfo)
	defer func() {
		if r := recover(); r != nil {
			err = derror.PanicToError(r)
			dlog.Errorf(ctx, "WatchWorkloads panic: %+v", err)
			// The panic text is data, not a format string, so it must not be passed to Errorf.
			err = status.Error(codes.Internal, err.Error())
		}
	}()
	dlog.Debugf(ctx, "WatchWorkloads called")

	clientSession := request.SessionInfo.SessionId
	clientInfo := s.state.GetClient(clientSession)
	if clientInfo == nil {
		return status.Errorf(codes.NotFound, "Client session %q not found", clientSession)
	}
	ns := clientInfo.Namespace

	// Only agents in the client's namespace and intercepts owned by the client are relevant.
	agentsCh := s.state.WatchAgents(ctx, func(_ string, info *rpc.AgentInfo) bool {
		return info.Namespace == ns
	})
	interceptsCh := s.state.WatchIntercepts(ctx, func(_ string, info *rpc.InterceptInfo) bool {
		return info.ClientSession.SessionId == clientSession
	})
	workloadsCh, err := s.state.WatchWorkloads(ctx, clientSession)
	if err != nil {
		return err
	}

	sessionDone, err := s.state.SessionDone(clientSession)
	if err != nil {
		return err
	}

	// interceptInfos holds the most recent intercept snapshot received on interceptsCh.
	var interceptInfos map[string]*rpc.InterceptInfo
	isIntercepted := func(name, namespace string) bool {
		for _, ii := range interceptInfos {
			if name == ii.Spec.Agent && namespace == ii.Spec.Namespace && ii.Disposition == rpc.InterceptDispositionType_ACTIVE {
				return true
			}
		}
		return false
	}

	// rpcKind maps a workload kind name (case-insensitive) to its rpc enum value.
	rpcKind := func(s string) rpc.WorkloadInfo_Kind {
		switch strings.ToLower(s) {
		case "deployment":
			return rpc.WorkloadInfo_DEPLOYMENT
		case "replicaset":
			return rpc.WorkloadInfo_REPLICASET
		case "statefulset":
			return rpc.WorkloadInfo_STATEFULSET
		default:
			return rpc.WorkloadInfo_UNSPECIFIED
		}
	}

	// Send events if we're idle longer than this, otherwise wait for more data
	const maxIdleTime = 5 * time.Millisecond

	// workloadEvents collects events that have not yet been sent, keyed by workload name.
	// lastEvents is the batch that was most recently sent successfully.
	workloadEvents := make(map[string]*rpc.WorkloadEvent)
	var lastEvents map[string]*rpc.WorkloadEvent

	// The ticker is parked at the maximum duration and re-armed with maxIdleTime whenever a
	// pending event is modified in place.
	ticker := time.NewTicker(time.Duration(math.MaxInt64))
	defer ticker.Stop()

	// agentInfos holds the most recent agent snapshot received on agentsCh.
	var agentInfos map[string]*rpc.AgentInfo

	start := time.Now()

	// sendEvents sends all pending events that differ from what was last sent. On a successful
	// send, the pending set becomes lastEvents and a fresh pending set is started. On a failed
	// send, the pending set is retained.
	sendEvents := func() {
		// Time to send what we have
		ticker.Reset(time.Duration(math.MaxInt64))
		evs := make([]*rpc.WorkloadEvent, 0, len(workloadEvents))
		for k, rew := range workloadEvents {
			if lew, ok := lastEvents[k]; ok {
				if proto.Equal(lew, rew) {
					continue
				}
			}
			evs = append(evs, rew)
		}
		if len(evs) == 0 {
			return
		}
		dlog.Debugf(ctx, "Sending %d WorkloadEvents", len(evs))
		err = stream.Send(&rpc.WorkloadEventsDelta{
			Since:  timestamppb.New(start),
			Events: evs,
		})
		if err != nil {
			dlog.Warnf(ctx, "failed to send workload events delta: %v", err)
			return
		}
		lastEvents = workloadEvents
		workloadEvents = make(map[string]*rpc.WorkloadEvent)
		start = time.Now()
	}

	rpcWorkload := func(wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState) *rpc.WorkloadInfo {
		return &rpc.WorkloadInfo{
			Kind:       rpcKind(wl.GetKind()),
			Name:       wl.GetName(),
			Namespace:  wl.GetNamespace(),
			AgentState: as,
		}
	}

	// addEvent registers a pending event for the workload and attempts to send right away.
	addEvent := func(eventType state.EventType, wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState) {
		workloadEvents[wl.GetName()] = &rpc.WorkloadEvent{
			Type:     rpc.WorkloadEvent_Type(eventType),
			Workload: rpcWorkload(wl, as),
		}
		sendEvents()
	}

	for {
		select {
		case <-sessionDone:
			// The Manager believes this session has ended.
			return nil

		case <-ticker.C:
			sendEvents()

		// All events arriving at the workload channel are significant
		case wes, ok := <-workloadsCh:
			if !ok {
				return nil
			}
			for _, we := range wes {
				wl := we.Workload
				if w, ok := workloadEvents[wl.GetName()]; ok {
					// An event for this workload is already pending. Only a delete can change it.
					if we.Type == state.EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED {
						w.Type = rpc.WorkloadEvent_DELETED
						dlog.Debugf(ctx, "WorkloadEvent DELETED %s.%s", wl.GetName(), wl.GetNamespace())
						ticker.Reset(maxIdleTime)
					}
				} else {
					as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED
					if s.state.HasAgent(wl.GetName(), wl.GetNamespace()) {
						if isIntercepted(wl.GetName(), wl.GetNamespace()) {
							as = rpc.WorkloadInfo_INTERCEPTED
						} else {
							as = rpc.WorkloadInfo_INSTALLED
						}
					}

					// If we've sent an ADDED event for this workload, and this is a MODIFIED event without any changes that
					// we care about, then just skip it.
					if we.Type == state.EventTypeUpdate {
						lew, ok := lastEvents[wl.GetName()]
						if ok && (lew.Type == rpc.WorkloadEvent_ADDED_UNSPECIFIED || lew.Type == rpc.WorkloadEvent_MODIFIED) && proto.Equal(lew.Workload, rpcWorkload(we.Workload, as)) {
							// Skip this event only. A break here would exit the range loop and
							// drop every remaining event in the batch.
							continue
						}
					}
					dlog.Debugf(ctx, "WorkloadEvent %d %s %s.%s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), as)
					addEvent(we.Type, wl, as)
				}
			}

		// Events that arrive at the agent channel should be counted as modifications.
		case ass, ok := <-agentsCh:
			if !ok {
				return nil
			}
			oldAgentInfos := agentInfos
			agentInfos = ass.State

			// Agents present in the previous snapshot but absent from this one have gone away.
			for k, a := range oldAgentInfos {
				if _, ok = agentInfos[k]; !ok {
					name := a.Name
					as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED
					dlog.Debugf(ctx, "AgentInfo %s.%s %s", a.Name, a.Namespace, as)
					if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
						wl := w.Workload
						if wl.AgentState != as {
							wl.AgentState = as
							ticker.Reset(maxIdleTime)
						}
					} else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil {
						addEvent(state.EventTypeUpdate, wl, as)
					} else {
						dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err)
						if errors.IsNotFound(err) {
							// The workload no longer exists, so report it as deleted.
							workloadEvents[name] = &rpc.WorkloadEvent{
								Type: rpc.WorkloadEvent_DELETED,
								Workload: &rpc.WorkloadInfo{
									Name:       name,
									Namespace:  a.Namespace,
									AgentState: as,
								},
							}
							sendEvents()
						}
					}
				}
			}

			// Every agent in the current snapshot is installed, and possibly intercepted.
			for _, a := range agentInfos {
				name := a.Name
				as := rpc.WorkloadInfo_INSTALLED
				if isIntercepted(name, a.Namespace) {
					as = rpc.WorkloadInfo_INTERCEPTED
				}
				dlog.Debugf(ctx, "AgentInfo %s.%s %s", a.Name, a.Namespace, as)
				if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
					wl := w.Workload
					if wl.AgentState != as {
						wl.AgentState = as
						ticker.Reset(maxIdleTime)
					}
				} else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil {
					addEvent(state.EventTypeUpdate, wl, as)
				} else {
					dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err)
				}
			}

		// Events that arrive at the intercept channel should be counted as modifications.
		case is, ok := <-interceptsCh:
			if !ok {
				return nil
			}
			oldInterceptInfos := interceptInfos
			interceptInfos = is.State

			// Intercepts present in the previous snapshot but absent from this one were removed.
			for k, ii := range oldInterceptInfos {
				if _, ok = interceptInfos[k]; !ok {
					name := ii.Spec.Agent
					as := rpc.WorkloadInfo_INSTALLED
					dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as)
					if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
						if w.Workload.AgentState != as {
							w.Workload.AgentState = as
							ticker.Reset(maxIdleTime)
						}
					} else if wl, err := agentmap.GetWorkload(ctx, name, ii.Spec.Namespace, ""); err == nil {
						addEvent(state.EventTypeUpdate, wl, as)
					}
				}
			}

			// An intercept only counts as INTERCEPTED while its disposition is ACTIVE.
			for _, ii := range interceptInfos {
				name := ii.Spec.Agent
				as := rpc.WorkloadInfo_INSTALLED
				if ii.Disposition == rpc.InterceptDispositionType_ACTIVE {
					as = rpc.WorkloadInfo_INTERCEPTED
				}
				dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as)
				if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
					if w.Workload.AgentState != as {
						w.Workload.AgentState = as
						ticker.Reset(maxIdleTime)
					}
				} else if wl, err := agentmap.GetWorkload(ctx, name, ii.Spec.Namespace, ""); err == nil {
					addEvent(state.EventTypeUpdate, wl, as)
				}
			}
		}
	}
}

const agentSessionTTL = 15 * time.Second

// expire removes stale sessions.
Expand Down
43 changes: 34 additions & 9 deletions cmd/traffic/cmd/manager/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type State interface {
CountTunnelIngress() uint64
CountTunnelEgress() uint64
ExpireSessions(context.Context, time.Time, time.Time)
GetAgent(string) *rpc.AgentInfo
GetActiveAgent(string) *rpc.AgentInfo
GetAgent(sessionID string) *rpc.AgentInfo
GetActiveAgent(sessionID string) *rpc.AgentInfo
GetAllClients() map[string]*rpc.ClientInfo
GetClient(string) *rpc.ClientInfo
GetClient(sessionID string) *rpc.ClientInfo
GetSession(string) SessionState
GetSessionConsumptionMetrics(string) *SessionConsumptionMetrics
GetAllSessionConsumptionMetrics() map[string]*SessionConsumptionMetrics
Expand All @@ -61,6 +61,7 @@ type State interface {
GetConnectActiveStatus() *prometheus.GaugeVec
GetInterceptCounter() *prometheus.CounterVec
GetInterceptActiveStatus() *prometheus.GaugeVec
HasAgent(name, namespace string) bool
MarkSession(*rpc.RemainRequest, time.Time) bool
NewInterceptInfo(string, *rpc.SessionInfo, *rpc.CreateInterceptRequest) *rpc.InterceptInfo
PostLookupDNSResponse(context.Context, *rpc.DNSAgentResponse)
Expand Down Expand Up @@ -89,6 +90,7 @@ type State interface {
WatchAgents(context.Context, func(sessionID string, agent *rpc.AgentInfo) bool) <-chan watchable.Snapshot[*rpc.AgentInfo]
WatchDial(sessionID string) <-chan *rpc.DialRequest
WatchIntercepts(context.Context, func(sessionID string, intercept *rpc.InterceptInfo) bool) <-chan watchable.Snapshot[*rpc.InterceptInfo]
WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error)
WatchLookupDNS(string) <-chan *rpc.DNSRequest
ValidateCreateAgent(context.Context, k8sapi.Workload, agentconfig.SidecarExt) error
}
Expand Down Expand Up @@ -132,6 +134,7 @@ type state struct {
interceptStates *xsync.MapOf[string, *interceptState]
timedLogLevel log.TimedLevel
llSubs *loglevelSubscribers
workloadWatchers *xsync.MapOf[string, WorkloadWatcher] // workload watchers, created on demand and keyed by namespace
tunnelCounter int32
tunnelIngressCounter uint64
tunnelEgressCounter uint64
Expand All @@ -149,12 +152,13 @@ var NewStateFunc = NewState //nolint:gochecknoglobals // extension point
// NewState creates a state that uses ctx as its background context. It reads the LOG_LEVEL
// environment variable to seed the timed log level, allocates the session, agent, intercept
// and workload-watcher maps, and points the state's self reference at itself.
//
// NOTE(review): this is a rendered diff hunk, so the field list below appears twice — it looks
// like the pre-change fields followed by the post-change fields that add workloadWatchers.
// Confirm against the real file.
func NewState(ctx context.Context) State {
loglevel := os.Getenv("LOG_LEVEL")
s := &state{
backgroundCtx: ctx,
sessions: xsync.NewMapOf[string, SessionState](),
agentsByName: xsync.NewMapOf[string, *xsync.MapOf[string, *rpc.AgentInfo]](),
interceptStates: xsync.NewMapOf[string, *interceptState](),
timedLogLevel: log.NewTimedLevel(loglevel, log.SetLevel),
llSubs: newLoglevelSubscribers(),
backgroundCtx: ctx,
sessions: xsync.NewMapOf[string, SessionState](),
agentsByName: xsync.NewMapOf[string, *xsync.MapOf[string, *rpc.AgentInfo]](),
interceptStates: xsync.NewMapOf[string, *interceptState](),
workloadWatchers: xsync.NewMapOf[string, WorkloadWatcher](),
timedLogLevel: log.NewTimedLevel(loglevel, log.SetLevel),
llSubs: newLoglevelSubscribers(),
}
s.self = s
return s
return s.agents.LoadAll()
}

// HasAgent reports whether at least one registered agent has the given name and namespace.
// The agentsByName index is keyed by name only, so the entries for that name are scanned
// for a matching namespace. A name that exists only in another namespace, or whose entry
// map is empty, yields false.
func (s *state) HasAgent(name, namespace string) bool {
	agn, ok := s.agentsByName.Load(name)
	if !ok {
		return false
	}
	found := false
	agn.Range(func(_ string, ai *rpc.AgentInfo) bool {
		found = ai.Namespace == namespace
		// Returning false stops the iteration as soon as a match is found.
		return !found
	})
	return found
}

func (s *state) getAgentsByName(name, namespace string) map[string]*rpc.AgentInfo {
agn, ok := s.agentsByName.Load(name)
if !ok {
Expand All @@ -477,6 +486,22 @@ func (s *state) WatchAgents(
}
}

// WatchWorkloads subscribes the caller to workload events for the namespace of the client
// identified by sessionID. One WorkloadWatcher is kept per namespace; it is created on first
// use with the state's background context, and later callers subscribe to the same watcher.
//
// It returns a NotFound status when the session is unknown, the error from NewWorkloadWatcher
// when the watcher cannot be created, and an Unavailable status when a concurrent creation
// attempt for the namespace has failed and not yet been cleaned up.
func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) {
	client := s.GetClient(sessionID)
	if client == nil {
		return nil, status.Errorf(codes.NotFound, "session %q not found", sessionID)
	}
	ns := client.Namespace
	ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww WorkloadWatcher) {
		// err is the enclosing function's named result. It is only set when this call
		// is the one that runs the compute function.
		ww, err = NewWorkloadWatcher(s.backgroundCtx, ns)
		return ww
	})
	if err != nil {
		// LoadOrCompute stored the zero-value watcher under ns. Remove it so the next call
		// retries instead of loading an unusable entry.
		s.workloadWatchers.Delete(ns)
		return nil, err
	}
	// NOTE(review): assumes WorkloadWatcher is an interface (or pointer) type so that the
	// placeholder from a failed concurrent creation is nil — confirm against its declaration.
	if ww == nil {
		return nil, status.Errorf(codes.Unavailable, "workload watcher for namespace %q is not available", ns)
	}
	return ww.Subscribe(ctx), nil
}

// Intercepts //////////////////////////////////////////////////////////////////////////////////////

func (s *state) AddIntercept(ctx context.Context, sessionID, clusterID string, cir *rpc.CreateInterceptRequest) (client *rpc.ClientInfo, ret *rpc.InterceptInfo, err error) {
Expand Down