feat: separate ip pool pod and add parallelism to workers
oilbeater committed May 5, 2019
1 parent 29e4bcd commit b5ac7da
Showing 4 changed files with 181 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pkg/controller/config.go
@@ -35,6 +35,8 @@ type Configuration struct {

PodName string
PodNamespace string

WorkerNum int
}

// ParseFlags parses cmd args then init kubeclient and conf
@@ -58,6 +60,8 @@ func ParseFlags() (*Configuration, error) {

argClusterTcpLoadBalancer = pflag.String("cluster-tcp-loadbalancer", "cluster-tcp-loadbalancer", "The name for cluster tcp loadbalancer")
argClusterUdpLoadBalancer = pflag.String("cluster-udp-loadbalancer", "cluster-udp-loadbalancer", "The name for cluster udp loadbalancer")

argWorkerNum = pflag.Int("worker-num", 3, "The parallelism of each worker. Default: 3")
)

flag.Set("alsologtostderr", "true")
@@ -93,6 +97,7 @@ func ParseFlags() (*Configuration, error) {
NodeSwitchGateway: *argNodeSwitchGateway,
ClusterTcpLoadBalancer: *argClusterTcpLoadBalancer,
ClusterUdpLoadBalancer: *argClusterUdpLoadBalancer,
WorkerNum: *argWorkerNum,
PodName: os.Getenv("POD_NAME"),
PodNamespace: os.Getenv("KUBE_NAMESPACE"),
}
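The flag wiring follows the standard pflag pattern used throughout ParseFlags. As a minimal standalone sketch (illustrative names, not the controller's real entry point), this is how a worker-num flag with a default of 3 ends up on a config struct:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// Configuration mirrors only the WorkerNum field added in this commit;
// everything else is omitted for brevity.
type Configuration struct {
	WorkerNum int
}

func main() {
	// Same shape as the diff: define the flag with a default of 3, parse,
	// then copy the parsed value into the configuration.
	argWorkerNum := pflag.Int("worker-num", 3, "The parallelism of each worker. Default: 3")
	pflag.Parse()

	config := &Configuration{WorkerNum: *argWorkerNum}
	fmt.Printf("will run %d goroutines per work queue\n", config.WorkerNum)
}
```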
58 changes: 39 additions & 19 deletions pkg/controller/controller.go
@@ -38,9 +38,10 @@ type Controller struct {
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
	addPodQueue       workqueue.RateLimitingInterface
	addIpPoolPodQueue workqueue.RateLimitingInterface
	deletePodQueue    workqueue.RateLimitingInterface
	updatePodQueue    workqueue.RateLimitingInterface

namespacesLister v1.NamespaceLister
namespacesSynced cache.InformerSynced
@@ -95,11 +96,12 @@ func NewController(config *Configuration) *Controller {
ovnClient: ovs.NewClient(config.OvnNbHost, config.OvnNbPort, "", 0, config.ClusterRouter, config.ClusterTcpLoadBalancer, config.ClusterUdpLoadBalancer, config.NodeSwitch, config.NodeSwitchCIDR),
kubeclientset: config.KubeClient,

		podsLister:        podInformer.Lister(),
		podsSynced:        podInformer.Informer().HasSynced,
		addPodQueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
		addIpPoolPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddIpPoolPod"),
		deletePodQueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
		updatePodQueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
@@ -162,10 +164,24 @@ func NewController(config *Configuration) *Controller {
// workers to finish processing their current work items.
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()

defer c.addPodQueue.ShutDown()
defer c.addIpPoolPodQueue.ShutDown()
defer c.deletePodQueue.ShutDown()
defer c.updatePodQueue.ShutDown()

defer c.addNamespaceQueue.ShutDown()
defer c.updateNamespaceQueue.ShutDown()
defer c.deleteNamespaceQueue.ShutDown()

defer c.addNodeQueue.ShutDown()
defer c.deleteNodeQueue.ShutDown()

defer c.addServiceQueue.ShutDown()
defer c.updateServiceQueue.ShutDown()

defer c.updateEndpointQueue.ShutDown()

// Start the informer factories to begin populating the informer caches
klog.Info("Starting OVN controller")

@@ -195,21 +211,25 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
klog.Info("Starting workers")

	// Launch workers to process resources
	go wait.Until(c.runAddIpPoolPodWorker, time.Second, stopCh)

	for i := 0; i < c.config.WorkerNum; i++ {
		go wait.Until(c.runAddPodWorker, time.Second, stopCh)
		go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
		go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)

		go wait.Until(c.runAddNamespaceWorker, time.Second, stopCh)
		go wait.Until(c.runDeleteNamespaceWorker, time.Second, stopCh)
		go wait.Until(c.runUpdateNamespaceWorker, time.Second, stopCh)

		go wait.Until(c.runAddNodeWorker, time.Second, stopCh)
		go wait.Until(c.runDeleteNodeWorker, time.Second, stopCh)

		go wait.Until(c.runUpdateServiceWorker, time.Second, stopCh)
		go wait.Until(c.runAddServiceWorker, time.Second, stopCh)

		go wait.Until(c.runUpdateEndpointWorker, time.Second, stopCh)
	}

klog.Info("Started workers")
<-stopCh
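Two details of the Run change are worth calling out. Each pod, namespace, node, service, and endpoint queue is now drained by WorkerNum goroutines, while the new addIpPoolPodQueue is started once, outside the loop, so IP-pool address assignment stays serialized. Fanning one queue out to several workers is safe because, as the struct comment above notes, a workqueue never hands the same item to two workers at once. A standalone sketch of the idiom (plain client-go, not kube-ovn code):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Demo")
	stopCh := make(chan struct{})

	// worker drains the queue until ShutDown is called; the queue guarantees
	// a given item is never processed by two goroutines at the same time.
	worker := func() {
		for {
			item, shutdown := queue.Get()
			if shutdown {
				return
			}
			fmt.Println("processing", item)
			queue.Forget(item) // success: reset the item's rate-limit history
			queue.Done(item)   // mark processing finished
		}
	}

	const workerNum = 3 // plays the role of --worker-num
	for i := 0; i < workerNum; i++ {
		go wait.Until(worker, time.Second, stopCh)
	}

	for i := 0; i < 10; i++ {
		queue.Add(fmt.Sprintf("pod-%d", i))
	}

	time.Sleep(2 * time.Second)
	queue.ShutDown() // unblocks Get with shutdown == true
	close(stopCh)
}
```

ShutDown unblocking Get is also why the deferred ShutDown calls added to Run above are enough to stop all the workers.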
135 changes: 135 additions & 0 deletions pkg/controller/pod.go
@@ -30,6 +30,14 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
utilruntime.HandleError(err)
return
}

p := obj.(*v1.Pod)
if p.Annotations[util.IpPoolAnnotation] != "" && p.Annotations[util.IpAddressAnnotation] == "" {
klog.V(3).Infof("enqueue add ip pool address pod %s", key)
c.addIpPoolPodQueue.AddRateLimited(key)
return
}

klog.V(3).Infof("enqueue add pod %s", key)
c.addPodQueue.AddRateLimited(key)
}
@@ -78,6 +86,11 @@ func (c *Controller) runAddPodWorker() {
}
}

func (c *Controller) runAddIpPoolPodWorker() {
for c.processNextAddIpPoolPodWorkItem() {
}
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
@@ -145,6 +158,58 @@ func (c *Controller) processNextAddPodWorkItem() bool {
return true
}

func (c *Controller) processNextAddIpPoolPodWorkItem() bool {
obj, shutdown := c.addIpPoolPodQueue.Get()

if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.addIpPoolPodQueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date than when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.addIpPoolPodQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.handleAddIpPoolPod(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.addIpPoolPodQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.addIpPoolPodQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextDeletePodWorkItem() bool {
@@ -283,6 +348,76 @@ func (c *Controller) handleAddPod(key string) error {
ls = c.config.DefaultLogicalSwitch
}

// pod address info may already exist in ovn
ip := pod.Annotations[util.IpAddressAnnotation]
mac := pod.Annotations[util.MacAddressAnnotation]

nic, err := c.ovnClient.CreatePort(ls, ovs.PodNameToPortName(name, namespace), ip, mac)
if err != nil {
return err
}

op := "replace"
if len(pod.Annotations) == 0 {
op = "add"
}
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.IpAddressAnnotation] = nic.IpAddress
pod.Annotations[util.MacAddressAnnotation] = nic.MacAddress
pod.Annotations[util.CidrAnnotation] = nic.CIDR
pod.Annotations[util.GatewayAnnotation] = nic.Gateway
pod.Annotations[util.LogicalSwitchAnnotation] = ls

patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/annotations",
"value": %s
}]`

raw, _ := json.Marshal(pod.Annotations)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
_, err = c.kubeclientset.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, []byte(patchPayload))
if err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
}
return err
}
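For concreteness, the JSON patch built from patchPayloadTemplate looks roughly like the snippet below. The annotation keys and address values are made up for the example (the real keys come from the util constants referenced in the diff):

```go
package main

import "fmt"

func main() {
	// Illustration only: an example payload for a pod that already had
	// annotations (op == "replace"). Keys and values are assumptions,
	// not taken from this diff.
	patchPayload := `[{
	    "op": "replace",
	    "path": "/metadata/annotations",
	    "value": {
	        "ovn.kubernetes.io/ip_address": "10.16.0.10",
	        "ovn.kubernetes.io/mac_address": "00:00:00:53:4d:10",
	        "ovn.kubernetes.io/cidr": "10.16.0.0/16",
	        "ovn.kubernetes.io/gateway": "10.16.0.1",
	        "ovn.kubernetes.io/logical_switch": "ovn-default"
	    }
	}]`
	fmt.Println(patchPayload)
}
```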

func (c *Controller) handleAddIpPoolPod(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
pod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
// The Pod resource may no longer exist, in which case we stop
// processing.
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
klog.Infof("add ip pool address pod %s/%s", namespace, name)
if pod.Spec.HostNetwork {
klog.Infof("pod %s/%s in host network mode no need for ovn process", namespace, name)
return nil
}

ns, err := c.namespacesLister.Get(namespace)
if err != nil {
klog.Errorf("get namespace %s failed %v", namespace, err)
return err
}
ls := ns.Annotations[util.LogicalSwitchAnnotation]
if ls == "" {
ls = c.config.DefaultLogicalSwitch
}

ipPoolAnnotation := pod.Annotations[util.IpPoolAnnotation]

if ipPoolAnnotation != "" && pod.Annotations[util.IpAddressAnnotation] == "" {
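The hunk is truncated before the pool-handling logic, so the diff does not show how an address is picked. Purely as a toy illustration, assuming the ip_pool annotation holds a comma-separated list of candidate IPs (an assumption, not something this diff confirms), selection could look like:

```go
package main

import (
	"fmt"
	"strings"
)

// pickPoolAddress is a hypothetical helper, not part of the commit. It walks
// a comma-separated candidate list and returns the first address not in use.
func pickPoolAddress(ipPool string, inUse map[string]bool) (string, error) {
	for _, ip := range strings.Split(ipPool, ",") {
		ip = strings.TrimSpace(ip)
		if ip != "" && !inUse[ip] {
			return ip, nil
		}
	}
	return "", fmt.Errorf("no free address left in pool %q", ipPool)
}

func main() {
	ip, err := pickPoolAddress("10.16.0.15, 10.16.0.16", map[string]bool{"10.16.0.15": true})
	fmt.Println(ip, err) // prints: 10.16.0.16 <nil>
}
```

Whatever the real selection logic is, running it on a single worker, as Run arranges above, keeps two pods from racing for the same pool address.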
2 changes: 2 additions & 0 deletions pkg/daemon/controller.go
@@ -307,7 +307,9 @@ func (c *Controller) handlePod(key string) error {
// Run starts controller
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()

defer c.namespaceQueue.ShutDown()
defer c.podQueue.ShutDown()

klog.Info("start watching namespace changes")
if ok := cache.WaitForCacheSync(stopCh, c.namespacesSynced, c.podsSynced); !ok {
