Skip to content

Commit

Permalink
remove pod dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
mkabilov committed Jul 27, 2017
1 parent 1f8b37f commit d195fde
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 28 deletions.
1 change: 0 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (c *Controller) runInformers(stopCh <-chan struct{}) {
go c.postgresqlInformer.Run(stopCh)
go c.podInformer.Run(stopCh)
go c.podEventsDispatcher(stopCh)
go c.clusterResync(stopCh)

<-stopCh
Expand Down
40 changes: 14 additions & 26 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,30 @@ func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface,
return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts)
}

func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spec.PodEvent) {
c.clustersMu.RLock()
cluster, ok := c.clusters[clusterName]
c.clustersMu.RUnlock()
if ok {
c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, clusterName)
cluster.ReceivePodEvent(event)
}
}

func (c *Controller) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
}

podEvent := spec.PodEvent{
ClusterName: c.podClusterName(pod),
PodName: util.NameFromMeta(pod.ObjectMeta),
CurPod: pod,
EventType: spec.EventAdd,
ResourceVersion: pod.ResourceVersion,
}

c.podCh <- podEvent
c.dispatchPodEvent(c.podClusterName(pod), podEvent)
}

func (c *Controller) podUpdate(prev, cur interface{}) {
Expand All @@ -69,15 +78,14 @@ func (c *Controller) podUpdate(prev, cur interface{}) {
}

podEvent := spec.PodEvent{
ClusterName: c.podClusterName(curPod),
PodName: util.NameFromMeta(curPod.ObjectMeta),
PrevPod: prevPod,
CurPod: curPod,
EventType: spec.EventUpdate,
ResourceVersion: curPod.ResourceVersion,
}

c.podCh <- podEvent
c.dispatchPodEvent(c.podClusterName(curPod), podEvent)
}

func (c *Controller) podDelete(obj interface{}) {
Expand All @@ -87,31 +95,11 @@ func (c *Controller) podDelete(obj interface{}) {
}

podEvent := spec.PodEvent{
ClusterName: c.podClusterName(pod),
PodName: util.NameFromMeta(pod.ObjectMeta),
CurPod: pod,
EventType: spec.EventDelete,
ResourceVersion: pod.ResourceVersion,
}

c.podCh <- podEvent
}

func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) {
c.logger.Debugln("Watching all pod events")
for {
select {
case event := <-c.podCh:
c.clustersMu.RLock()
cluster, ok := c.clusters[event.ClusterName]
c.clustersMu.RUnlock()

if ok {
c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName)
cluster.ReceivePodEvent(event)
}
case <-stopCh:
return
}
}
}
c.dispatchPodEvent(c.podClusterName(pod), podEvent)
}
1 change: 0 additions & 1 deletion pkg/spec/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
// PodEvent describes the event for a single Pod
type PodEvent struct {
ResourceVersion string
ClusterName NamespacedName
PodName NamespacedName
PrevPod *v1.Pod
CurPod *v1.Pod
Expand Down

0 comments on commit d195fde

Please sign in to comment.