Skip to content

Commit

Permalink
feat: support dynamic qos
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Mar 31, 2019
1 parent b57f441 commit 1fe8c91
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 127 deletions.
98 changes: 95 additions & 3 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package daemon

import (
"bitbucket.org/mathildetech/kube-ovn/pkg/ovs"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"net"
"time"

Expand All @@ -27,23 +29,37 @@ type Controller struct {
namespacesLister listerv1.NamespaceLister
namespacesSynced cache.InformerSynced
namespaceQueue workqueue.RateLimitingInterface

podsLister listerv1.PodLister
podsSynced cache.InformerSynced
podQueue workqueue.RateLimitingInterface
}

func NewController(config *Configuration, informerFactory informers.SharedInformerFactory) *Controller {
namespaceInformer := informerFactory.Core().V1().Namespaces()
podInformer := informerFactory.Core().V1().Pods()

controller := &Controller{
config: config,
kubeclientset: config.KubeClient,
namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
namespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Namespace"),

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
podQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pod"),
}

namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueNamespace,
DeleteFunc: controller.enqueueNamespace,
})

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.enqueuePod,
})

return controller
}

Expand Down Expand Up @@ -159,7 +175,9 @@ func (c *Controller) reconcileRouters() error {
toDel = append(toDel, route.Dst.String())
}
}
klog.Infof("route to del %v", toDel)
if len(toDel) > 0 {
klog.Infof("route to del %v", toDel)
}

toAdd := []string{}
for _, c := range cidrs {
Expand All @@ -174,7 +192,9 @@ func (c *Controller) reconcileRouters() error {
toAdd = append(toAdd, c)
}
}
klog.Infof("route to add %v", toAdd)
if len(toAdd) > 0 {
klog.Infof("route to add %v", toAdd)
}

for _, r := range toDel {
_, cidr, _ := net.ParseCIDR(r)
Expand All @@ -197,16 +217,88 @@ func (c *Controller) reconcileRouters() error {
return nil
}

func (c *Controller) enqueuePod(old, new interface{}) {
oldPod := old.(*v1.Pod)
newPod := new.(*v1.Pod)
if newPod.Spec.NodeName != c.config.NodeName {
return
}
if oldPod.Annotations[util.IngressRateAnnotation] != newPod.Annotations[util.IngressRateAnnotation] ||
oldPod.Annotations[util.EgressRateAnnotation] != newPod.Annotations[util.EgressRateAnnotation] {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(new); err != nil {
utilruntime.HandleError(err)
return
}
c.podQueue.AddRateLimited(key)
}
}

func (c *Controller) runPodWorker() {
for c.processNextPodWorkItem() {
}
}

func (c *Controller) processNextPodWorkItem() bool {
obj, shutdown := c.podQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.podQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.podQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handlePod(key); err != nil {
c.podQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.podQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) handlePod(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
klog.Infof("handle qos update for pod %s/%s", namespace, name)
pod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return ovs.SetPodBandwidth(pod.Name, pod.Namespace, pod.Annotations[util.IngressRateAnnotation], pod.Annotations[util.EgressRateAnnotation])
}

func (c *Controller) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.namespaceQueue.ShutDown()

klog.Info("start watching namespace changes")
if ok := cache.WaitForCacheSync(stopCh, c.namespacesSynced); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.namespacesSynced, c.podsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

go wait.Until(c.runNamespaceWorker, time.Second, stopCh)
go wait.Until(c.runPodWorker, time.Second, stopCh)

klog.Info("Started workers")
<-stopCh
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (csh CniServerHandler) handleDel(req *restful.Request, resp *restful.Respon
return
}
klog.Infof("delete port request %v", podRequest)
err = csh.deleteNic(podRequest.NetNs, podRequest.ContainerID)
err = csh.deleteNic(podRequest.NetNs, podRequest.PodName, podRequest.PodNamespace, podRequest.ContainerID)
if err != nil {
klog.Errorf("del nic failed %v", err)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, err)
Expand Down
127 changes: 8 additions & 119 deletions pkg/daemon/ovs.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package daemon

import (
"bitbucket.org/mathildetech/kube-ovn/pkg/ovs"
"bitbucket.org/mathildetech/kube-ovn/pkg/util"
"fmt"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/vishvananda/netlink"
"k8s.io/klog"
"net"
"os/exec"
"strconv"
"strings"
)

func (csh CniServerHandler) configureNic(podName, podNamespace, netns, containerID, mac, ip, gateway, ingress, egress string) error {
Expand Down Expand Up @@ -50,7 +49,7 @@ func (csh CniServerHandler) configureNic(podName, podNamespace, netns, container
return err
}

err = setPodBandwidth(containerID, hostNicName, ingress, egress)
err = ovs.SetPodBandwidth(podName, podNamespace, ingress, egress)
if err != nil {
return err
}
Expand All @@ -67,7 +66,12 @@ func (csh CniServerHandler) configureNic(podName, podNamespace, netns, container
return nil
}

func (csh CniServerHandler) deleteNic(netns, containerID string) error {
func (csh CniServerHandler) deleteNic(netns, podName, podNamespace, containerID string) error {
err := ovs.ClearPodBandwidth(podName, podNamespace)
if err != nil {
return err
}

hostNicName, _ := generateNicName(containerID)
// Remove ovs port
output, err := exec.Command("ovs-vsctl", "--if-exists", "--with-iface", "del-port", "br-int", hostNicName).CombinedOutput()
Expand All @@ -88,11 +92,6 @@ func (csh CniServerHandler) deleteNic(netns, containerID string) error {
return fmt.Errorf("delete host link %s failed %v", hostLink, err)
}

err = clearPodBandwidth(containerID)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -215,113 +214,3 @@ func configureNodeNic(portName, ip, mac string) error {
}
return nil
}

func clearPodBandwidth(sandboxID string) error {
// interfaces will have the same name as ports
portList, err := ovsFind("interface", "name", "external-ids:sandbox="+sandboxID)
if err != nil {
return err
}

// Clear the QoS for any ports of this sandbox
for _, port := range portList {
if err = ovsClear("port", port, "qos"); err != nil {
return err
}
}

// Now that the QoS is unused remove it
qosList, err := ovsFind("qos", "_uuid", "external-ids:sandbox="+sandboxID)
if err != nil {
return err
}
for _, qos := range qosList {
if err := ovsDestroy("qos", qos); err != nil {
return err
}
}

return nil
}

func setPodBandwidth(sandboxID, ifname string, ingress, egress string) error {
ingressMPS, _ := strconv.Atoi(ingress)
ingressKPS := ingressMPS * 1000
if ingressKPS > 0 {
// ingress_policing_rate is in Kbps
err := ovsSet("interface", ifname, fmt.Sprintf("ingress_policing_rate=%d", ingressKPS))
if err != nil {
return err
}
}
egressMPS, _ := strconv.Atoi(egress)
egressBPS := egressMPS * 1000 * 1000
if egressBPS > 0 {
qos, err := ovsCreate("qos", "type=linux-htb", fmt.Sprintf("other-config:max-rate=%d", egressBPS), "external-ids=sandbox="+sandboxID)
if err != nil {
return err
}
err = ovsSet("port", ifname, fmt.Sprintf("qos=%s", qos))
if err != nil {
return err
}
}
return nil
}

func ovsExec(args ...string) (string, error) {
args = append([]string{"--timeout=30"}, args...)
output, err := exec.Command("ovs-vsctl", args...).CombinedOutput()
if err != nil {
return "", fmt.Errorf("failed to run 'ovs-vsctl %s': %v\n %q", strings.Join(args, " "), err, string(output))
}

outStr := string(output)
trimmed := strings.TrimSpace(outStr)
// If output is a single line, strip the trailing newline
if strings.Count(trimmed, "\n") == 0 {
outStr = trimmed
}

return outStr, nil
}

func ovsCreate(table string, values ...string) (string, error) {
args := append([]string{"create", table}, values...)
return ovsExec(args...)
}

func ovsDestroy(table, record string) error {
_, err := ovsExec("--if-exists", "destroy", table, record)
return err
}

func ovsSet(table, record string, values ...string) error {
args := append([]string{"set", table, record}, values...)
_, err := ovsExec(args...)
return err
}

// Returns the given column of records that match the condition
func ovsFind(table, column, condition string) ([]string, error) {
output, err := ovsExec("--no-heading", "--columns="+column, "find", table, condition)
if err != nil {
return nil, err
}
values := strings.Split(output, "\n\n")
// We want "bare" values for strings, but we can't pass --bare to ovs-vsctl because
// it breaks more complicated types. So try passing each value through Unquote();
// if it fails, that means the value wasn't a quoted string, so use it as-is.
for i, val := range values {
if unquoted, err := strconv.Unquote(val); err == nil {
values[i] = unquoted
}
}
return values, nil
}

func ovsClear(table, record string, columns ...string) error {
args := append([]string{"--if-exists", "clear", table, record}, columns...)
_, err := ovsExec(args...)
return err
}
8 changes: 4 additions & 4 deletions pkg/daemon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"
)

var RequestLogString = "[%s] Incoming %s %s %s request from %s"
var ResponseLogString = "[%s] Outcoming response to %s %s %s with %d status code in %vms"
var RequestLogString = "[%s] Incoming %s %s %s request"
var ResponseLogString = "[%s] Outcoming response %s %s with %d status code in %vms"

func RunServer(config *Configuration) {
csh, err := createCniServerHandler(config)
Expand Down Expand Up @@ -75,7 +75,7 @@ func formatRequestLog(request *restful.Request) string {
}

return fmt.Sprintf(RequestLogString, time.Now().Format(time.RFC3339), request.Request.Proto,
request.Request.Method, uri, request.Request.RemoteAddr)
request.Request.Method, uri)
}

// formatResponseLog formats response log string.
Expand All @@ -85,5 +85,5 @@ func formatResponseLog(response *restful.Response, request *restful.Request, req
uri = request.Request.URL.RequestURI()
}
return fmt.Sprintf(ResponseLogString, time.Now().Format(time.RFC3339),
request.Request.RemoteAddr, request.Request.Method, uri, response.StatusCode(), reqTime)
request.Request.Method, uri, response.StatusCode(), reqTime)
}
File renamed without changes.

0 comments on commit 1fe8c91

Please sign in to comment.