Skip to content

Commit

Permalink
Merge pull request #3596 from telepresenceio/thallgren/workload-watcher
Browse files Browse the repository at this point in the history
Add WorkloadEventsWatcher to the gRPC API.
  • Loading branch information
thallgren committed May 16, 2024
2 parents 1d0e76a + 7afeacb commit b0b9b29
Show file tree
Hide file tree
Showing 7 changed files with 1,607 additions and 363 deletions.
264 changes: 264 additions & 0 deletions cmd/traffic/cmd/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manager

import (
"context"
"math"
"sort"
"strings"
"time"
Expand All @@ -12,7 +13,10 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
empty "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/datawire/dlib/derror"
Expand All @@ -25,6 +29,7 @@ import (
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/state"
"github.com/telepresenceio/telepresence/v2/pkg/agentmap"
"github.com/telepresenceio/telepresence/v2/pkg/dnsproxy"
"github.com/telepresenceio/telepresence/v2/pkg/iputil"
"github.com/telepresenceio/telepresence/v2/pkg/tracing"
Expand Down Expand Up @@ -891,6 +896,265 @@ func (s *service) WatchClusterInfo(session *rpc.SessionInfo, stream rpc.Manager_
return s.clusterInfo.Watch(ctx, stream)
}

// WatchWorkloads streams WorkloadEventsDelta messages to the client identified by the
// session in the request. It merges three sources:
//
//   - workload events from the state's workload watcher for the client's namespace,
//   - agent snapshots for agents in the client's namespace,
//   - intercept snapshots for intercepts owned by the client session.
//
// Each source contributes to a set of pending events, keyed by workload name. Pending events
// are sent either immediately (when a new event is added) or once the idle ticker fires after
// an in-place modification of an already pending event. Events that are proto-equal to the
// event most recently sent for the same workload are suppressed.
//
// The call returns nil when the session ends or when any of the watched channels is closed,
// a NotFound status when the client session is unknown, and an Internal status if a panic
// is recovered.
//
//nolint:cyclop,gocyclo,gocognit // complex to avoid extremely specialized functions
func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.Manager_WatchWorkloadsServer) (err error) {
	ctx := managerutil.WithSessionInfo(stream.Context(), request.SessionInfo)
	defer func() {
		if r := recover(); r != nil {
			err = derror.PanicToError(r)
			dlog.Errorf(ctx, "WatchWorkloads panic: %+v", err)
			// status.Error, not status.Errorf: the panic text is data, not a format string,
			// and may contain '%' characters.
			err = status.Error(codes.Internal, err.Error())
		}
	}()
	dlog.Debugf(ctx, "WatchWorkloads called")

	clientSession := request.SessionInfo.SessionId
	clientInfo := s.state.GetClient(clientSession)
	if clientInfo == nil {
		return status.Errorf(codes.NotFound, "Client session %q not found", clientSession)
	}
	ns := clientInfo.Namespace

	agentsCh := s.state.WatchAgents(ctx, func(_ string, info *rpc.AgentInfo) bool {
		return info.Namespace == ns
	})
	interceptsCh := s.state.WatchIntercepts(ctx, func(_ string, info *rpc.InterceptInfo) bool {
		return info.ClientSession.SessionId == clientSession
	})
	workloadsCh, err := s.state.WatchWorkloads(ctx, clientSession)
	if err != nil {
		return err
	}

	sessionDone, err := s.state.SessionDone(clientSession)
	if err != nil {
		return err
	}

	// interceptInfos is the most recent intercept snapshot received on interceptsCh.
	var interceptInfos map[string]*rpc.InterceptInfo

	// isIntercepted reports whether the latest intercept snapshot contains an ACTIVE
	// intercept on the agent with the given name and namespace.
	isIntercepted := func(name, namespace string) bool {
		for _, ii := range interceptInfos {
			if name == ii.Spec.Agent && namespace == ii.Spec.Namespace && ii.Disposition == rpc.InterceptDispositionType_ACTIVE {
				return true
			}
		}
		return false
	}

	// rpcKind maps a workload kind name (case-insensitive) to its rpc enum value.
	rpcKind := func(s string) rpc.WorkloadInfo_Kind {
		switch strings.ToLower(s) {
		case "deployment":
			return rpc.WorkloadInfo_DEPLOYMENT
		case "replicaset":
			return rpc.WorkloadInfo_REPLICASET
		case "statefulset":
			return rpc.WorkloadInfo_STATEFULSET
		default:
			return rpc.WorkloadInfo_UNSPECIFIED
		}
	}

	// Send events if we're idle longer than this, otherwise wait for more data
	const maxIdleTime = 5 * time.Millisecond

	// workloadEvents holds the events that are pending delivery, keyed by workload name.
	workloadEvents := make(map[string]*rpc.WorkloadEvent)

	// lastEvents holds the events of the most recent successful send.
	var lastEvents map[string]*rpc.WorkloadEvent

	// The ticker is created with the maximum duration so that it is effectively disarmed
	// until some code path resets it to maxIdleTime.
	ticker := time.NewTicker(time.Duration(math.MaxInt64))
	defer ticker.Stop()

	// agentInfos is the most recent agent snapshot received on agentsCh.
	var agentInfos map[string]*rpc.AgentInfo

	start := time.Now()

	// sendEvents disarms the ticker and sends all pending events that differ from what
	// was last sent. On success, the pending events become the last sent events. A send
	// failure is logged, and the pending events are retained.
	sendEvents := func() {
		// Time to send what we have
		ticker.Reset(time.Duration(math.MaxInt64))
		evs := make([]*rpc.WorkloadEvent, 0, len(workloadEvents))
		for k, rew := range workloadEvents {
			if lew, ok := lastEvents[k]; ok && proto.Equal(lew, rew) {
				continue
			}
			evs = append(evs, rew)
		}
		if len(evs) == 0 {
			return
		}
		dlog.Debugf(ctx, "Sending %d WorkloadEvents", len(evs))
		sendErr := stream.Send(&rpc.WorkloadEventsDelta{
			Since:  timestamppb.New(start),
			Events: evs,
		})
		if sendErr != nil {
			dlog.Warnf(ctx, "failed to send workload events delta: %v", sendErr)
			return
		}
		lastEvents = workloadEvents
		workloadEvents = make(map[string]*rpc.WorkloadEvent)
		start = time.Now()
	}

	// rpcWorkload converts a workload and an agent state into the rpc representation.
	rpcWorkload := func(wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState) *rpc.WorkloadInfo {
		return &rpc.WorkloadInfo{
			Kind:       rpcKind(wl.GetKind()),
			Name:       wl.GetName(),
			Namespace:  wl.GetNamespace(),
			AgentState: as,
		}
	}

	// addEvent registers a pending event for the workload and sends it right away.
	addEvent := func(eventType state.EventType, wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState) {
		workloadEvents[wl.GetName()] = &rpc.WorkloadEvent{
			Type:     rpc.WorkloadEvent_Type(eventType),
			Workload: rpcWorkload(wl, as),
		}
		sendEvents()
	}

	// setAgentState records the agent state for the named workload. If a pending event that
	// isn't a deletion exists for the workload, then that event is updated in place and the
	// idle ticker is armed when the state actually changed. Otherwise the workload is looked
	// up and an update event is added for it. The returned error is the lookup error, if any.
	setAgentState := func(name, namespace string, as rpc.WorkloadInfo_AgentState) error {
		if w, ok := workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED {
			if w.Workload.AgentState != as {
				w.Workload.AgentState = as
				ticker.Reset(maxIdleTime)
			}
			return nil
		}
		wl, wlErr := agentmap.GetWorkload(ctx, name, namespace, "")
		if wlErr != nil {
			return wlErr
		}
		addEvent(state.EventTypeUpdate, wl, as)
		return nil
	}

	for {
		select {
		case <-sessionDone:
			// The Manager believes this session has ended.
			return nil

		case <-ticker.C:
			sendEvents()

		// All events arriving at the workload channel are significant
		case wes, ok := <-workloadsCh:
			if !ok {
				return nil
			}
			for _, we := range wes {
				wl := we.Workload
				if w, ok := workloadEvents[wl.GetName()]; ok {
					// An event is already pending for this workload. Only a deletion changes it.
					if we.Type == state.EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED {
						w.Type = rpc.WorkloadEvent_DELETED
						dlog.Debugf(ctx, "WorkloadEvent DELETED %s.%s", wl.GetName(), wl.GetNamespace())
						ticker.Reset(maxIdleTime)
					}
					continue
				}

				as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED
				if s.state.HasAgent(wl.GetName(), wl.GetNamespace()) {
					if isIntercepted(wl.GetName(), wl.GetNamespace()) {
						as = rpc.WorkloadInfo_INTERCEPTED
					} else {
						as = rpc.WorkloadInfo_INSTALLED
					}
				}

				// If we've sent an ADDED event for this workload, and this is a MODIFIED event without any changes that
				// we care about, then just skip it.
				if we.Type == state.EventTypeUpdate {
					lew, ok := lastEvents[wl.GetName()]
					if ok && (lew.Type == rpc.WorkloadEvent_ADDED_UNSPECIFIED || lew.Type == rpc.WorkloadEvent_MODIFIED) && proto.Equal(lew.Workload, rpcWorkload(we.Workload, as)) {
						// continue, not break: a break would leave the range loop and drop
						// the remaining events of this batch.
						continue
					}
				}
				dlog.Debugf(ctx, "WorkloadEvent %d %s %s.%s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), as)
				addEvent(we.Type, wl, as)
			}

		// Events that arrive at the agent channel should be counted as modifications.
		case ass, ok := <-agentsCh:
			if !ok {
				return nil
			}
			oldAgentInfos := agentInfos
			agentInfos = ass.State

			// Agents that were present in the previous snapshot but are gone now.
			for k, a := range oldAgentInfos {
				if _, found := agentInfos[k]; found {
					continue
				}
				as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED
				dlog.Debugf(ctx, "AgentInfo %s.%s %s", a.Name, a.Namespace, as)
				if wlErr := setAgentState(a.Name, a.Namespace, as); wlErr != nil {
					dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", a.Name, a.Namespace, wlErr)
					if errors.IsNotFound(wlErr) {
						// The workload no longer exists, so report it as deleted.
						workloadEvents[a.Name] = &rpc.WorkloadEvent{
							Type: rpc.WorkloadEvent_DELETED,
							Workload: &rpc.WorkloadInfo{
								Name:       a.Name,
								Namespace:  a.Namespace,
								AgentState: as,
							},
						}
						sendEvents()
					}
				}
			}

			// Agents in the current snapshot.
			for _, a := range agentInfos {
				as := rpc.WorkloadInfo_INSTALLED
				if isIntercepted(a.Name, a.Namespace) {
					as = rpc.WorkloadInfo_INTERCEPTED
				}
				dlog.Debugf(ctx, "AgentInfo %s.%s %s", a.Name, a.Namespace, as)
				if wlErr := setAgentState(a.Name, a.Namespace, as); wlErr != nil {
					dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", a.Name, a.Namespace, wlErr)
				}
			}

		// Events that arrive at the intercept channel should be counted as modifications.
		case is, ok := <-interceptsCh:
			if !ok {
				return nil
			}
			oldInterceptInfos := interceptInfos
			interceptInfos = is.State

			// Intercepts that were present in the previous snapshot but are gone now.
			for k, ii := range oldInterceptInfos {
				if _, found := interceptInfos[k]; found {
					continue
				}
				name := ii.Spec.Agent
				as := rpc.WorkloadInfo_INSTALLED
				dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as)
				if wlErr := setAgentState(name, ii.Spec.Namespace, as); wlErr != nil {
					// Best effort: a workload that can't be resolved produces no event.
					dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, ii.Spec.Namespace, wlErr)
				}
			}

			// Intercepts in the current snapshot.
			for _, ii := range interceptInfos {
				name := ii.Spec.Agent
				as := rpc.WorkloadInfo_INSTALLED
				if ii.Disposition == rpc.InterceptDispositionType_ACTIVE {
					as = rpc.WorkloadInfo_INTERCEPTED
				}
				dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as)
				if wlErr := setAgentState(name, ii.Spec.Namespace, as); wlErr != nil {
					// Best effort: a workload that can't be resolved produces no event.
					dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, ii.Spec.Namespace, wlErr)
				}
			}
		}
	}
}

// agentSessionTTL is a duration of 15 seconds.
// NOTE(review): presumably the time-to-live applied to agent sessions when stale
// sessions are expired — its use is not visible here, confirm against the caller.
const agentSessionTTL = 15 * time.Second

// expire removes stale sessions.
Expand Down
43 changes: 34 additions & 9 deletions cmd/traffic/cmd/manager/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type State interface {
CountTunnelIngress() uint64
CountTunnelEgress() uint64
ExpireSessions(context.Context, time.Time, time.Time)
GetAgent(string) *rpc.AgentInfo
GetActiveAgent(string) *rpc.AgentInfo
GetAgent(sessionID string) *rpc.AgentInfo
GetActiveAgent(sessionID string) *rpc.AgentInfo
GetAllClients() map[string]*rpc.ClientInfo
GetClient(string) *rpc.ClientInfo
GetClient(sessionID string) *rpc.ClientInfo
GetSession(string) SessionState
GetSessionConsumptionMetrics(string) *SessionConsumptionMetrics
GetAllSessionConsumptionMetrics() map[string]*SessionConsumptionMetrics
Expand All @@ -61,6 +61,7 @@ type State interface {
GetConnectActiveStatus() *prometheus.GaugeVec
GetInterceptCounter() *prometheus.CounterVec
GetInterceptActiveStatus() *prometheus.GaugeVec
HasAgent(name, namespace string) bool
MarkSession(*rpc.RemainRequest, time.Time) bool
NewInterceptInfo(string, *rpc.SessionInfo, *rpc.CreateInterceptRequest) *rpc.InterceptInfo
PostLookupDNSResponse(context.Context, *rpc.DNSAgentResponse)
Expand Down Expand Up @@ -89,6 +90,7 @@ type State interface {
WatchAgents(context.Context, func(sessionID string, agent *rpc.AgentInfo) bool) <-chan watchable.Snapshot[*rpc.AgentInfo]
WatchDial(sessionID string) <-chan *rpc.DialRequest
WatchIntercepts(context.Context, func(sessionID string, intercept *rpc.InterceptInfo) bool) <-chan watchable.Snapshot[*rpc.InterceptInfo]
WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error)
WatchLookupDNS(string) <-chan *rpc.DNSRequest
ValidateCreateAgent(context.Context, k8sapi.Workload, agentconfig.SidecarExt) error
}
Expand Down Expand Up @@ -132,6 +134,7 @@ type state struct {
interceptStates *xsync.MapOf[string, *interceptState]
timedLogLevel log.TimedLevel
llSubs *loglevelSubscribers
workloadWatchers *xsync.MapOf[string, WorkloadWatcher] // workload watchers, created on demand and keyed by namespace
tunnelCounter int32
tunnelIngressCounter uint64
tunnelEgressCounter uint64
Expand All @@ -149,12 +152,13 @@ var NewStateFunc = NewState //nolint:gochecknoglobals // extension point
func NewState(ctx context.Context) State {
loglevel := os.Getenv("LOG_LEVEL")
s := &state{
backgroundCtx: ctx,
sessions: xsync.NewMapOf[string, SessionState](),
agentsByName: xsync.NewMapOf[string, *xsync.MapOf[string, *rpc.AgentInfo]](),
interceptStates: xsync.NewMapOf[string, *interceptState](),
timedLogLevel: log.NewTimedLevel(loglevel, log.SetLevel),
llSubs: newLoglevelSubscribers(),
backgroundCtx: ctx,
sessions: xsync.NewMapOf[string, SessionState](),
agentsByName: xsync.NewMapOf[string, *xsync.MapOf[string, *rpc.AgentInfo]](),
interceptStates: xsync.NewMapOf[string, *interceptState](),
workloadWatchers: xsync.NewMapOf[string, WorkloadWatcher](),
timedLogLevel: log.NewTimedLevel(loglevel, log.SetLevel),
llSubs: newLoglevelSubscribers(),
}
s.self = s
return s
Expand Down Expand Up @@ -451,6 +455,11 @@ func (s *state) getAllAgents() map[string]*rpc.AgentInfo {
return s.agents.LoadAll()
}

// HasAgent returns true if at least one agent with the given name is registered
// in the given namespace.
func (s *state) HasAgent(name, namespace string) bool {
	agn, ok := s.agentsByName.Load(name)
	if !ok {
		return false
	}
	// agentsByName is keyed by name only, so agents from other namespaces may share
	// the entry. Check the namespace of each agent and stop at the first match.
	found := false
	agn.Range(func(_ string, ai *rpc.AgentInfo) bool {
		if ai.Namespace == namespace {
			found = true
			return false
		}
		return true
	})
	return found
}

func (s *state) getAgentsByName(name, namespace string) map[string]*rpc.AgentInfo {
agn, ok := s.agentsByName.Load(name)
if !ok {
Expand All @@ -477,6 +486,22 @@ func (s *state) WatchAgents(
}
}

// WatchWorkloads returns a channel that receives workload events for the namespace of the
// client that owns the given session. The workload watcher for the namespace is created on
// demand and then shared by subsequent calls for the same namespace. The subscription is
// made using the given context, whereas the watcher itself is created using the state's
// background context.
//
// A NotFound status is returned if the session doesn't belong to a known client, and the
// error from NewWorkloadWatcher is returned if the watcher cannot be created.
func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) {
	client := s.GetClient(sessionID)
	if client == nil {
		return nil, status.Errorf(codes.NotFound, "session %q not found", sessionID)
	}
	ns := client.Namespace
	ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww WorkloadWatcher) {
		ww, err = NewWorkloadWatcher(s.backgroundCtx, ns)
		return ww
	})
	if err != nil {
		// LoadOrCompute stores whatever the compute function returned, even when the
		// creation failed. Remove that entry so that it isn't handed out to subsequent
		// callers, and so that the next call makes a new attempt to create the watcher.
		s.workloadWatchers.Delete(ns)
		return nil, err
	}
	return ww.Subscribe(ctx), nil
}

// Intercepts //////////////////////////////////////////////////////////////////////////////////////

func (s *state) AddIntercept(ctx context.Context, sessionID, clusterID string, cir *rpc.CreateInterceptRequest) (client *rpc.ClientInfo, ret *rpc.InterceptInfo, err error) {
Expand Down

0 comments on commit b0b9b29

Please sign in to comment.