W.I.P: WorkloadInfo subscription.
Initial draft of the gRPC for the `WatchWorkloads` call.

Signed-off-by: Thomas Hallgren <thomas@datawire.io>
thallgren committed May 13, 2024
1 parent a943eb0 commit 0b58294
Showing 6 changed files with 694 additions and 457 deletions.
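
The new `WatchWorkloads` RPC is a server-streaming call: the client sends one `WorkloadInfoRequest` and then receives `WorkloadEventsDelta` messages as workloads in its namespace change. A minimal client-side consumer, sketched under the assumption that the draft proto generates the usual gRPC Go client and that the request carries the session (the `SessionInfo` field is a guess, not confirmed by this commit):

package main

import (
	"context"
	"fmt"

	rpc "github.com/telepresenceio/telepresence/rpc/v2/manager"
)

// watchWorkloads consumes the stream until the context ends or the
// manager closes it. delta.Events mirrors the WorkloadEventsDelta sent
// from service.go below.
func watchWorkloads(ctx context.Context, client rpc.ManagerClient, session *rpc.SessionInfo) error {
	stream, err := client.WatchWorkloads(ctx, &rpc.WorkloadInfoRequest{SessionInfo: session}) // SessionInfo: assumed field
	if err != nil {
		return err
	}
	for {
		delta, err := stream.Recv() // blocks until the next delta arrives
		if err != nil {
			return err // io.EOF when the stream ends normally
		}
		for _, ev := range delta.Events {
			fmt.Printf("workload event: %v\n", ev)
		}
	}
}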
18 changes: 17 additions & 1 deletion cmd/traffic/cmd/manager/service.go
@@ -908,6 +908,11 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadInfoRequest, stream rpc.Ma
interceptsCh := s.state.WatchIntercepts(ctx, func(_ string, info *rpc.InterceptInfo) bool {
return info.ClientSession.SessionId == clientSession
})
workloadsCh, err := s.state.WatchWorkloads(ctx, clientSession)
if err != nil {
return err
}

sessionDone, err := s.state.SessionDone(clientSession)
if err != nil {
return err
@@ -922,13 +927,24 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadInfoRequest, stream rpc.Ma
}
return false
}

workloadEvents := make(map[string]*rpc.WorkloadEvent)
var agents []*rpc.AgentPodInfo
var agentNames []string
for {
select {
case <-sessionDone:
// Manager believes this session has ended.
return nil
case wes, ok := <-workloadsCh:
if !ok {
return nil
}
for _, we := range wes {
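// W.I.P: the lookup below is a stub; merging we into workloadEvents is sketched after this file's diff.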
w, ok := workloadEvents[we.Workload.GetName()]

}

case as, ok := <-agentsCh:
if !ok {
return nil
@@ -959,7 +975,7 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadInfoRequest, stream rpc.Ma
}
}
if agents != nil {
-	if err = stream.Send(&rpc.WorkloadInfoDelta{Agents: agents}); err != nil {
+	if err = stream.Send(&rpc.WorkloadEventsDelta{Events: workloads}); err != nil {
return err
}
}
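The `workloadsCh` case above is still a stub in this draft: it looks up any pending event for the workload's name but never merges the new one, and the final `stream.Send` still keys off `agents`. One plausible coalescing rule — keep at most one pending event per workload, let a delete cancel an unsent add, and never downgrade an unsent add to an update — sketched as standalone Go (the `Event` type and `coalesce` helper are illustrative, not from the commit):

package main

import "fmt"

type EventType int

const (
	EventTypeAdd EventType = iota
	EventTypeUpdate
	EventTypeDelete
)

type Event struct {
	Type EventType
	Name string
}

// coalesce folds a batch into the pending map so that each workload
// contributes at most one event per delta.
func coalesce(pending map[string]Event, batch []Event) {
	for _, e := range batch {
		prev, ok := pending[e.Name]
		switch {
		case !ok:
			pending[e.Name] = e
		case prev.Type == EventTypeAdd && e.Type == EventTypeDelete:
			// Created and deleted within the same quiet window:
			// subscribers never need to hear about it.
			delete(pending, e.Name)
		case prev.Type == EventTypeAdd && e.Type == EventTypeUpdate:
			// The add has not been sent yet, so it stays an add
			// (the workload payload would be refreshed here).
		default:
			pending[e.Name] = e
		}
	}
}

func main() {
	pending := map[string]Event{}
	coalesce(pending, []Event{
		{EventTypeAdd, "echo"},
		{EventTypeUpdate, "echo"},
		{EventTypeAdd, "tmp"},
		{EventTypeDelete, "tmp"},
	})
	fmt.Println(pending) // map[echo:{0 echo}]: a single pending add
}

Coalescing by name keeps each delta minimal: a workload that flaps during the 50 ms quiet window (see workloads.go below) produces at most one event.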
19 changes: 19 additions & 0 deletions cmd/traffic/cmd/manager/state/state.go
@@ -89,6 +89,7 @@ type State interface {
WatchAgents(context.Context, func(sessionID string, agent *rpc.AgentInfo) bool) <-chan watchable.Snapshot[*rpc.AgentInfo]
WatchDial(sessionID string) <-chan *rpc.DialRequest
WatchIntercepts(context.Context, func(sessionID string, intercept *rpc.InterceptInfo) bool) <-chan watchable.Snapshot[*rpc.InterceptInfo]
WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error)
WatchLookupDNS(string) <-chan *rpc.DNSRequest
ValidateCreateAgent(context.Context, k8sapi.Workload, agentconfig.SidecarExt) error
}
@@ -124,6 +125,7 @@ type state struct {
// 7. `cfgMapLocks` access must be concurrency protected
// 8. `cachedAgentImage` access must be concurrency protected
// 9. `interceptState` must be concurrency protected and updated/deleted in sync with intercepts
// 10. `workloadWatchers` access must be concurrency protected because watchers are created on demand
intercepts watchable.Map[*rpc.InterceptInfo] // info for intercepts, keyed by intercept id
agents watchable.Map[*rpc.AgentInfo] // info for agent sessions, keyed by session id
clients watchable.Map[*rpc.ClientInfo] // info for client sessions, keyed by session id
Expand All @@ -132,6 +134,7 @@ type state struct {
interceptStates *xsync.MapOf[string, *interceptState]
timedLogLevel log.TimedLevel
llSubs *loglevelSubscribers
workloadWatchers *xsync.MapOf[string, WorkloadWatcher] // workload watchers, created on demand and keyed by namespace
tunnelCounter int32
tunnelIngressCounter uint64
tunnelEgressCounter uint64
@@ -477,6 +480,22 @@ func (s *state) WatchAgents(
}
}

func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) {
client := s.GetClient(sessionID)
if client == nil {
return nil, status.Errorf(codes.NotFound, "session %q not found", sessionID)
}
ns := client.Namespace
ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww WorkloadWatcher) {
ww, err = NewWorkloadWatcher(s.backgroundCtx, ns)
return ww
})
if err != nil {
return nil, err
}
return ww.Subscribe(ctx), nil
}

// Intercepts //////////////////////////////////////////////////////////////////////////////////////

func (s *state) AddIntercept(ctx context.Context, sessionID, clusterID string, cir *rpc.CreateInterceptRequest) (client *rpc.ClientInfo, ret *rpc.InterceptInfo, err error) {
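`WatchWorkloads` creates one watcher per namespace, lazily: `xsync.MapOf.LoadOrCompute` runs the constructor only when the key is absent, and the constructor's error escapes through the closed-over `err`. The same pattern in isolation (assuming `github.com/puzpuzpuz/xsync/v3`; the `watcher` type is illustrative):

package main

import (
	"errors"
	"fmt"

	"github.com/puzpuzpuz/xsync/v3"
)

type watcher struct{ ns string }

func newWatcher(ns string) (*watcher, error) {
	if ns == "" {
		return nil, errors.New("namespace must not be empty")
	}
	return &watcher{ns: ns}, nil
}

var watchers = xsync.NewMapOf[string, *watcher]()

// watcherFor returns the watcher for ns, constructing it on first use.
// The constructor error is smuggled out through the enclosing scope,
// mirroring state.WatchWorkloads above.
func watcherFor(ns string) (*watcher, error) {
	var err error
	w, _ := watchers.LoadOrCompute(ns, func() *watcher {
		var nw *watcher
		nw, err = newWatcher(ns)
		return nw
	})
	if err != nil {
		return nil, err
	}
	return w, nil
}

func main() {
	w, err := watcherFor("default")
	fmt.Println(w.ns, err) // default <nil>
}

One detail a later revision may want to revisit: if the constructor fails, the nil it returns is still stored under the key, so a second call for the same namespace would get a nil watcher back without an error.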
93 changes: 88 additions & 5 deletions cmd/traffic/cmd/manager/state/workloads.go
@@ -2,38 +2,121 @@ package state

import (
"context"
"math"
"sync"
"time"

"github.com/google/uuid"
apps "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/cache"

"github.com/datawire/k8sapi/pkg/k8sapi"
"github.com/telepresenceio/telepresence/v2/pkg/informer"
)

func WatchWorkloads(ctx context.Context, ns string, addHandler, modifyHandler, removeHandler func(workload k8sapi.Workload)) error {
type EventType int

const (
EventTypeAdd EventType = iota
EventTypeUpdate
EventTypeDelete
)

type WorkloadEvent struct {
Type EventType
Workload k8sapi.Workload
}

type WorkloadWatcher interface {
Subscribe(ctx context.Context) <-chan []WorkloadEvent
}

type wlWatcher struct {
sync.Mutex
subscriptions map[uuid.UUID]chan<- []WorkloadEvent
timer *time.Timer
events []WorkloadEvent
}

func NewWorkloadWatcher(ctx context.Context, ns string) (WorkloadWatcher, error) {
w := new(wlWatcher)
w.subscriptions = make(map[uuid.UUID]chan<- []WorkloadEvent)
w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() {
w.Lock()
ss := make([]chan<- []WorkloadEvent, len(w.subscriptions))
i := 0
for _, sub := range w.subscriptions {
ss[i] = sub
i++
}
events := w.events
w.events = nil
w.Unlock()
for _, s := range ss {
select {
case <-ctx.Done():
return
case s <- events:
}
}
})

err := w.addEventHandler(ctx, ns)
if err != nil {
return nil, err
}
return w, nil
}

func (w *wlWatcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent {
ch := make(chan []WorkloadEvent)
id := uuid.New()
w.Lock()
w.subscriptions[id] = ch
w.Unlock()
go func() {
<-ctx.Done()
close(ch)
w.Lock()
delete(w.subscriptions, id)
w.Unlock()
}()
return ch
}

func (w *wlWatcher) addEventHandler(ctx context.Context, ns string) error {
// TODO: Potentially watch ReplicaSets and StatefulSets too, perhaps configurable, since it's fairly uncommon to not have a Deployment.
ix := informer.GetFactory(ctx, ns).Apps().V1().Deployments().Informer()
_, err := ix.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
if d, ok := obj.(*apps.Deployment); ok {
addHandler(k8sapi.Deployment(d))
w.handleEvent(WorkloadEvent{Type: EventTypeAdd, Workload: k8sapi.Deployment(d)})
}
},
DeleteFunc: func(obj any) {
if d, ok := obj.(*apps.Deployment); ok {
removeHandler(k8sapi.Deployment(d))
w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: k8sapi.Deployment(d)})
} else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok {
if d, ok := dfsu.Obj.(*apps.Deployment); ok {
removeHandler(k8sapi.Deployment(d))
w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: k8sapi.Deployment(d)})
}
}
},
UpdateFunc: func(oldObj, newObj any) {
if d, ok := newObj.(*apps.Deployment); ok {
modifyHandler(k8sapi.Deployment(d))
w.handleEvent(WorkloadEvent{Type: EventTypeUpdate, Workload: k8sapi.Deployment(d)})
}
},
})
return err
}

func (w *wlWatcher) handleEvent(we WorkloadEvent) {
w.Lock()
w.events = append(w.events, we)
w.Unlock()

// Defer sending until things have been quiet for a while
w.timer.Reset(50 * time.Millisecond)
}
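
The timer idiom above is a compact debounce: `time.AfterFunc` with a `math.MaxInt64` duration creates the timer effectively disarmed, and every event rewinds it to 50 ms, so subscribers hear nothing until events stop arriving for that long. The same idiom, self-contained (all names illustrative):

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// debouncer collects items and flushes them in one batch once no new
// item has arrived for the quiet period.
type debouncer struct {
	mu    sync.Mutex
	timer *time.Timer
	quiet time.Duration
	items []string
}

func newDebouncer(quiet time.Duration, flush func([]string)) *debouncer {
	d := &debouncer{quiet: quiet}
	// Effectively disarmed until the first Add resets it, just like the
	// wlWatcher timer above.
	d.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() {
		d.mu.Lock()
		items := d.items
		d.items = nil
		d.mu.Unlock()
		flush(items)
	})
	return d
}

func (d *debouncer) Add(item string) {
	d.mu.Lock()
	d.items = append(d.items, item)
	d.mu.Unlock()
	d.timer.Reset(d.quiet) // restart the quiet-period countdown
}

func main() {
	done := make(chan struct{})
	d := newDebouncer(50*time.Millisecond, func(items []string) {
		fmt.Println("flush:", items) // flush: [a b c]
		close(done)
	})
	d.Add("a")
	d.Add("b")
	d.Add("c")
	<-done
}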
