adapt: pkg/deploy informers
sttts authored and deads2k committed Apr 27, 2017
1 parent 34472b5 commit 360cb58
Showing 9 changed files with 129 additions and 131 deletions.
21 changes: 11 additions & 10 deletions pkg/deploy/controller/deployment/controller.go
@@ -15,6 +15,7 @@ import (
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/api"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"

deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
@@ -65,14 +66,14 @@ type DeploymentController struct {
// queue contains replication controllers that need to be synced.
queue workqueue.RateLimitingInterface

// rcStore is a store of replication controllers.
rcStore cache.StoreToReplicationControllerLister
// podStore is a store of pods.
podStore cache.StoreToPodLister
// rcStoreSynced makes sure the rc store is synced before reconciling any deployment.
rcStoreSynced func() bool
// podStoreSynced makes sure the pod store is synced before reconciling any deployment.
podStoreSynced func() bool
// rcLister can list/get replication controllers from a shared informer's cache
rcLister kcorelisters.ReplicationControllerLister
// rcListerSynced makes sure the rc store is synced before reconciling any deployment.
rcListerSynced cache.InformerSynced
// podLister can list/get pods from a shared informer's cache
podLister kcorelisters.PodLister
// podListerSynced makes sure the pod store is synced before reconciling any deployment.
podListerSynced cache.InformerSynced

// deployerImage specifies which Docker image can support the default strategies.
deployerImage string
@@ -102,7 +103,7 @@ func (c *DeploymentController) handle(deployment *kapi.ReplicationController, wi
nextStatus := currentStatus

deployerPodName := deployutil.DeployerPodNameForDeployment(deployment.Name)
deployer, deployerErr := c.podStore.Pods(deployment.Namespace).Get(deployerPodName, metav1.GetOptions{})
deployer, deployerErr := c.podLister.Pods(deployment.Namespace).Get(deployerPodName)
if deployerErr == nil {
nextStatus = c.nextStatus(deployer, deployment, updatedAnnotations)
}
@@ -415,7 +416,7 @@ func (c *DeploymentController) makeDeployerContainer(strategy *deployapi.Deploym

func (c *DeploymentController) cleanupDeployerPods(deployment *kapi.ReplicationController) error {
selector := deployutil.DeployerPodSelector(deployment.Name)
deployerList, err := c.podStore.Pods(deployment.Namespace).List(selector)
deployerList, err := c.podLister.Pods(deployment.Namespace).List(selector)
if err != nil {
return fmt.Errorf("couldn't fetch deployer pods for %q: %v", deployutil.LabelForDeployment(deployment), err)
}
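The substance of this file's change is the swap from the hand-rolled `cache.StoreToReplicationControllerLister`/`cache.StoreToPodLister` stores to the generated internal-version listers. The visible API difference: a lister `Get` takes only a name (no `metav1.GetOptions`) and reads from the informer cache, and a missing object surfaces as a standard `NotFound` error rather than an `exists` boolean. A minimal sketch of the read pattern, assuming the lister API imported above (the helper names are illustrative, not part of the commit):

```go
import (
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	kapi "k8s.io/kubernetes/pkg/api"
	kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
)

// getDeployerPod reads one pod from the informer cache. Note: no
// metav1.GetOptions argument, and no (obj, exists, err) triple.
func getDeployerPod(pods kcorelisters.PodLister, namespace, name string) (*kapi.Pod, error) {
	pod, err := pods.Pods(namespace).Get(name)
	if kerrors.IsNotFound(err) {
		return nil, nil // absent from the cache; the caller decides what that means
	}
	return pod, err
}

// listDeployerPods lists cached pods matching a selector. The returned
// objects are pointers into the shared cache and must not be mutated.
func listDeployerPods(pods kcorelisters.PodLister, namespace string, selector labels.Selector) ([]*kapi.Pod, error) {
	return pods.Pods(namespace).List(selector)
}
```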
29 changes: 21 additions & 8 deletions pkg/deploy/controller/deployment/controller_test.go
@@ -5,7 +5,6 @@ import (
"reflect"
"sort"
"testing"
"time"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -16,6 +15,7 @@ import (
kapi "k8s.io/kubernetes/pkg/api"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
kinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"

deployapi "github.com/openshift/origin/pkg/deploy/api"
_ "github.com/openshift/origin/pkg/deploy/api/install"
@@ -29,26 +29,39 @@ var (
codec = kapi.Codecs.LegacyCodec(deployapiv1.SchemeGroupVersion)
)

func okDeploymentController(client kclientset.Interface, deployment *kapi.ReplicationController, hookPodNames []string, related bool, deployerStatus kapi.PodPhase) *DeploymentController {
rcInformer := cache.NewSharedIndexInformer(&cache.ListWatch{}, &kapi.ReplicationController{}, 2*time.Minute, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := cache.NewSharedIndexInformer(&cache.ListWatch{}, &kapi.Pod{}, 2*time.Minute, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
func alwaysReady() bool { return true }

type deploymentController struct {
*DeploymentController
podIndexer cache.Indexer
}

func okDeploymentController(client kclientset.Interface, deployment *kapi.ReplicationController, hookPodNames []string, related bool, deployerStatus kapi.PodPhase) *deploymentController {
informerFactory := kinformers.NewSharedInformerFactory(client, 0)
rcInformer := informerFactory.Core().InternalVersion().ReplicationControllers()
podInformer := informerFactory.Core().InternalVersion().Pods()

c := NewDeploymentController(rcInformer, podInformer, client, "sa:test", "openshift/origin-deployer", env, codec)
c.podListerSynced = alwaysReady
c.rcListerSynced = alwaysReady

// deployer pod
if deployment != nil {
pod := deployerPod(deployment, "", related)
pod.Status.Phase = deployerStatus
c.podStore.Indexer.Add(pod)
podInformer.Informer().GetIndexer().Add(pod)
}

// hook pods
for _, name := range hookPodNames {
pod := deployerPod(deployment, name, related)
c.podStore.Indexer.Add(pod)
podInformer.Informer().GetIndexer().Add(pod)
}

return c
return &deploymentController{
c,
podInformer.Informer().GetIndexer(),
}
}

func deployerPod(deployment *kapi.ReplicationController, alternateName string, related bool) *kapi.Pod {
@@ -580,7 +593,7 @@ func TestHandle_cleanupPodNoop(t *testing.T) {
controller := okDeploymentController(client, deployment, nil, true, kapi.PodSucceeded)
pod := deployerPod(deployment, "", true)
pod.Labels[deployapi.DeployerPodForDeploymentLabel] = "unrelated"
controller.podStore.Indexer.Update(pod)
controller.podIndexer.Update(pod)

if err := controller.handle(deployment, false); err != nil {
t.Fatalf("unexpected error: %v", err)
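The fixture rewrite here is the standard way to unit-test an informer-based controller: build the typed informers from a `SharedInformerFactory` with a zero resync period, stub the `*ListerSynced` checks with `alwaysReady` since no reflector ever runs, and seed objects straight into the informer's indexer, which is exactly what the listers read from. The small wrapper struct keeps a handle on the pod indexer so tests like `TestHandle_cleanupPodNoop` can update cache state mid-test. A condensed sketch of the same wiring, using the imports already in this file (the `newFixture` helper name is illustrative):

```go
// newFixture wires a controller against informer caches that are never
// started; tests populate the caches by hand instead of running a reflector.
func newFixture(client kclientset.Interface, seedPods ...*kapi.Pod) *deploymentController {
	// A zero resync period disables periodic re-listing; the factory is never
	// started, so the caches hold only what the test puts there.
	factory := kinformers.NewSharedInformerFactory(client, 0)
	rcInformer := factory.Core().InternalVersion().ReplicationControllers()
	podInformer := factory.Core().InternalVersion().Pods()

	c := NewDeploymentController(rcInformer, podInformer, client,
		"sa:test", "openshift/origin-deployer", nil, codec)
	c.rcListerSynced = alwaysReady // no reflector runs in unit tests
	c.podListerSynced = alwaysReady

	podIndexer := podInformer.Informer().GetIndexer()
	for _, pod := range seedPods {
		podIndexer.Add(pod) // immediately visible through c.podLister
	}
	return &deploymentController{c, podIndexer}
}
```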
72 changes: 38 additions & 34 deletions pkg/deploy/controller/deployment/factory.go
@@ -5,15 +5,18 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
kclientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/api"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcoreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
kcontroller "k8s.io/kubernetes/pkg/controller"

deployutil "github.com/openshift/origin/pkg/deploy/util"
@@ -27,75 +30,71 @@ const (
)

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(rcInformer, podInformer cache.SharedIndexInformer, kc kclientset.Interface, sa, image string, env []kapi.EnvVar, codec runtime.Codec) *DeploymentController {
func NewDeploymentController(
rcInformer kcoreinformers.ReplicationControllerInformer,
podInformer kcoreinformers.PodInformer,
kc kclientset.Interface,
sa,
image string,
env []kapi.EnvVar,
codec runtime.Codec,
) *DeploymentController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deployer-controller"})
eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kv1core.New(kc.Core().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(kapi.Scheme, kclientv1.EventSource{Component: "deployer-controller"})

c := &DeploymentController{
rn: kc.Core(),
pn: kc.Core(),

queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

rcLister: rcInformer.Lister(),
rcListerSynced: rcInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podListerSynced: podInformer.Informer().HasSynced,

serviceAccount: sa,
deployerImage: image,
environment: env,
recorder: recorder,
codec: codec,
}

c.rcStore.Indexer = rcInformer.GetIndexer()
rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addReplicationController,
UpdateFunc: c.updateReplicationController,
})

c.podStore.Indexer = podInformer.GetIndexer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: c.updatePod,
DeleteFunc: c.deletePod,
})

c.rcStoreSynced = rcInformer.HasSynced
c.podStoreSynced = podInformer.HasSynced

return c
}

// Run begins watching and syncing.
func (c *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

glog.Infof("Starting deployer controller")

// Wait for the dc store to sync before starting any work in this controller.
ready := make(chan struct{})
go c.waitForSyncedStores(ready, stopCh)
select {
case <-ready:
case <-stopCh:
if !cache.WaitForCacheSync(stopCh, c.rcListerSynced, c.podListerSynced) {
return
}

glog.Infof("Deployer controller caches are synced. Starting workers.")

for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployer controller")
c.queue.ShutDown()
}

func (c *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
<-stopCh

for !c.rcStoreSynced() || !c.podStoreSynced() {
glog.V(4).Infof("Waiting for the rc and pod caches to sync before starting the deployer controller workers")
select {
case <-time.After(storeSyncedPollPeriod):
case <-stopCh:
return
}
}
close(ready)
glog.Infof("Shutting down deployer controller")
}

func (c *DeploymentController) addReplicationController(obj interface{}) {
@@ -211,15 +210,20 @@ func (c *DeploymentController) work() bool {
}

func (c *DeploymentController) getByKey(key string) (*kapi.ReplicationController, error) {
obj, exists, err := c.rcStore.Indexer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.queue.AddRateLimited(key)
return nil, err
}
if !exists {
rc, err := c.rcLister.ReplicationControllers(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(4).Infof("Replication controller %q has been deleted", key)
return nil, nil
}
if err != nil {
glog.Infof("Unable to retrieve replication controller %q from store: %v", key, err)
c.queue.AddRateLimited(key)
return nil, err
}

return obj.(*kapi.ReplicationController), nil
return rc, nil
}
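Two idioms in this file are the standard shared-informer bootstrapping of this era. `Run` now blocks on `cache.WaitForCacheSync` before spawning workers, replacing the hand-rolled `waitForSyncedStores` polling loop, and `getByKey` resolves a workqueue key with `cache.SplitMetaNamespaceKey` followed by a namespaced lister `Get`, treating `NotFound` as "already deleted, nothing to do". A condensed skeleton of both, using this file's imports; the method names are hypothetical to avoid colliding with the real ones, and the rate-limited requeue on error is elided:

```go
// runSketch blocks until the rc and pod informer caches have synced, then
// starts the workers; it shuts the queue down when stopCh closes.
func (c *DeploymentController) runSketch(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	if !cache.WaitForCacheSync(stopCh, c.rcListerSynced, c.podListerSynced) {
		return // stopCh closed before the caches caught up
	}
	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}
	<-stopCh
	c.queue.ShutDown() // lets in-flight workers drain and exit
}

// getByKeySketch resolves a workqueue key ("namespace/name") through the
// lister; NotFound means the object was deleted after it was enqueued.
func (c *DeploymentController) getByKeySketch(key string) (*kapi.ReplicationController, error) {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return nil, err
	}
	rc, err := c.rcLister.ReplicationControllers(namespace).Get(name)
	if errors.IsNotFound(err) {
		return nil, nil // already gone: nothing to reconcile
	}
	return rc, err
}
```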
28 changes: 13 additions & 15 deletions pkg/deploy/controller/deploymentconfig/controller.go
@@ -7,16 +7,15 @@ import (
"github.com/golang/glog"

kapierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/api"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/client/retry"

osclient "github.com/openshift/origin/pkg/client"
@@ -63,17 +62,16 @@ type DeploymentConfigController struct {

// dcStore provides a local cache for deployment configs.
dcStore oscache.StoreToDeploymentConfigLister
// rcStore provides a local cache for replication controllers.
rcStore cache.StoreToReplicationControllerLister
// podStore provides a local cache for pods.
podStore cache.StoreToPodLister

// dcStoreSynced makes sure the dc store is synced before reconciling any deployment config.
dcStoreSynced func() bool
// rcStoreSynced makes sure the rc store is synced before reconciling any deployment config.
rcStoreSynced func() bool
// podStoreSynced makes sure the pod store is synced before reconciling any deployment config.
podStoreSynced func() bool
// rcLister can list/get replication controllers from a shared informer's cache
rcLister kcorelisters.ReplicationControllerLister
// rcListerSynced makes sure the rc shared informer is synced before reconciling any deployment config.
rcListerSynced func() bool
// podLister can list/get pods from a shared informer's cache
podLister kcorelisters.PodLister
// podListerSynced makes sure the pod shared informer is synced before reconciling any deployment config.
podListerSynced func() bool

// codec is used to build deployments from configs.
codec runtime.Codec
@@ -92,7 +90,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)

// Find all deployments owned by the deployment config.
selector := deployutil.ConfigSelector(config.Name)
existingDeployments, err := c.rcStore.ReplicationControllers(config.Namespace).List(selector)
existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector)
if err != nil {
return err
}
@@ -124,7 +122,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
// Retry faster on conflicts
var updatedDeployment *kapi.ReplicationController
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if kapierrors.IsNotFound(err) {
return nil
}
@@ -241,7 +239,7 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []
if newReplicaCount != oldReplicaCount {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// refresh the replication controller version
rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if err != nil {
return err
}
@@ -310,7 +308,7 @@ func (c *DeploymentConfigController) updateStatus(config *deployapi.DeploymentCo
func (c *DeploymentConfigController) calculateStatus(config deployapi.DeploymentConfig, deployments []*kapi.ReplicationController, additional ...deployapi.DeploymentCondition) (deployapi.DeploymentConfigStatus, error) {
selector := labels.Set(config.Spec.Selector).AsSelector()
// TODO: Replace with using rc.status.availableReplicas that comes with the next rebase.
pods, err := c.podStore.Pods(config.Namespace).List(selector)
pods, err := c.podLister.Pods(config.Namespace).List(selector)
if err != nil {
return config.Status, err
}
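This file gets the same treatment: every `rcStore`/`podStore` read becomes a lister read, with `Get` dropping its `metav1.GetOptions` argument (hence the removed import). One caveat the `RetryOnConflict` blocks above rely on: a lister `Get` returns a pointer into the shared informer cache, so the object should be deep-copied before mutation and the write sent through the API client. A hedged sketch of the refresh-and-update pattern; the `scaleDeployment` helper and the `rn` client field are assumptions modeled on the deployer controller, and the copy step assumes the `kapi.Scheme.Copy` API of this era of the codebase:

```go
func (c *DeploymentConfigController) scaleDeployment(deployment *kapi.ReplicationController, replicas int32) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Re-read from the cache on every attempt so a conflict retries
		// against the latest resourceVersion.
		rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
		if kapierrors.IsNotFound(err) {
			return nil // deployment is gone; nothing to scale
		}
		if err != nil {
			return err
		}
		obj, err := kapi.Scheme.Copy(rc) // never mutate a cached object in place
		if err != nil {
			return err
		}
		copied := obj.(*kapi.ReplicationController)
		copied.Spec.Replicas = replicas
		_, err = c.rn.ReplicationControllers(copied.Namespace).Update(copied)
		return err
	})
}
```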