Skip to content

Commit

Permalink
feat: session service
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Jul 6, 2020
1 parent 5ca78e8 commit 8e03239
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 52 deletions.
68 changes: 37 additions & 31 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ type Configuration struct {
NodeSwitchCIDR string
NodeSwitchGateway string

ClusterTcpLoadBalancer string
ClusterUdpLoadBalancer string
ClusterTcpLoadBalancer string
ClusterUdpLoadBalancer string
ClusterTcpSessionLoadBalancer string
ClusterUdpSessionLoadBalancer string

PodName string
PodNamespace string
Expand Down Expand Up @@ -75,8 +77,10 @@ func ParseFlags() (*Configuration, error) {
argNodeSwitchCIDR = pflag.String("node-switch-cidr", "100.64.0.0/16", "The cidr for node switch, default: 100.64.0.0/16")
argNodeSwitchGateway = pflag.String("node-switch-gateway", "", "The gateway for node switch, default the first ip in node-switch-cidr")

argClusterTcpLoadBalancer = pflag.String("cluster-tcp-loadbalancer", "cluster-tcp-loadbalancer", "The name for cluster tcp loadbalancer")
argClusterUdpLoadBalancer = pflag.String("cluster-udp-loadbalancer", "cluster-udp-loadbalancer", "The name for cluster udp loadbalancer")
argClusterTcpLoadBalancer = pflag.String("cluster-tcp-loadbalancer", "cluster-tcp-loadbalancer", "The name for cluster tcp loadbalancer")
argClusterUdpLoadBalancer = pflag.String("cluster-udp-loadbalancer", "cluster-udp-loadbalancer", "The name for cluster udp loadbalancer")
argClusterTcpSessionLoadBalancer = pflag.String("cluster-tcp-session-loadbalancer", "cluster-tcp-session-loadbalancer", "The name for cluster tcp session loadbalancer")
argClusterUdpSessionLoadBalancer = pflag.String("cluster-udp-session-loadbalancer", "cluster-udp-session-loadbalancer", "The name for cluster udp session loadbalancer")

argWorkerNum = pflag.Int("worker-num", 3, "The parallelism of each worker, default: 3")
argPprofPort = pflag.Int("pprof-port", 10660, "The port to get profiling data, default 10660")
Expand Down Expand Up @@ -108,33 +112,35 @@ func ParseFlags() (*Configuration, error) {
pflag.Parse()

config := &Configuration{
OvnNbSocket: *argOvnNbSocket,
OvnNbHost: *argOvnNbHost,
OvnNbPort: *argOvnNbPort,
OvnSbHost: *argOvnSbHost,
OvnSbPort: *argOvnSbPort,
OvnTimeout: *argOvnTimeout,
KubeConfigFile: *argKubeConfigFile,
DefaultLogicalSwitch: *argDefaultLogicalSwitch,
DefaultCIDR: *argDefaultCIDR,
DefaultGateway: *argDefaultGateway,
DefaultExcludeIps: *argDefaultExcludeIps,
ClusterRouter: *argClusterRouter,
NodeSwitch: *argNodeSwitch,
NodeSwitchCIDR: *argNodeSwitchCIDR,
NodeSwitchGateway: *argNodeSwitchGateway,
ClusterTcpLoadBalancer: *argClusterTcpLoadBalancer,
ClusterUdpLoadBalancer: *argClusterUdpLoadBalancer,
WorkerNum: *argWorkerNum,
PprofPort: *argPprofPort,
NetworkType: *argsNetworkType,
DefaultVlanID: *argsDefaultVlanID,
DefaultProviderName: *argsDefaultProviderName,
DefaultHostInterface: *argsDefaultInterfaceName,
DefaultVlanName: *argsDefaultVlanName,
DefaultVlanRange: *argsDefaultVlanRange,
PodName: os.Getenv("POD_NAME"),
PodNamespace: os.Getenv("KUBE_NAMESPACE"),
OvnNbSocket: *argOvnNbSocket,
OvnNbHost: *argOvnNbHost,
OvnNbPort: *argOvnNbPort,
OvnSbHost: *argOvnSbHost,
OvnSbPort: *argOvnSbPort,
OvnTimeout: *argOvnTimeout,
KubeConfigFile: *argKubeConfigFile,
DefaultLogicalSwitch: *argDefaultLogicalSwitch,
DefaultCIDR: *argDefaultCIDR,
DefaultGateway: *argDefaultGateway,
DefaultExcludeIps: *argDefaultExcludeIps,
ClusterRouter: *argClusterRouter,
NodeSwitch: *argNodeSwitch,
NodeSwitchCIDR: *argNodeSwitchCIDR,
NodeSwitchGateway: *argNodeSwitchGateway,
ClusterTcpLoadBalancer: *argClusterTcpLoadBalancer,
ClusterUdpLoadBalancer: *argClusterUdpLoadBalancer,
ClusterTcpSessionLoadBalancer: *argClusterTcpSessionLoadBalancer,
ClusterUdpSessionLoadBalancer: *argClusterUdpSessionLoadBalancer,
WorkerNum: *argWorkerNum,
PprofPort: *argPprofPort,
NetworkType: *argsNetworkType,
DefaultVlanID: *argsDefaultVlanID,
DefaultProviderName: *argsDefaultProviderName,
DefaultHostInterface: *argsDefaultInterfaceName,
DefaultVlanName: *argsDefaultVlanName,
DefaultVlanRange: *argsDefaultVlanRange,
PodName: os.Getenv("POD_NAME"),
PodNamespace: os.Getenv("KUBE_NAMESPACE"),
}

if config.DefaultGateway == "" {
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
return nil
}

tcpLb, udpLb := c.config.ClusterTcpLoadBalancer, c.config.ClusterUdpLoadBalancer
if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
tcpLb, udpLb = c.config.ClusterTcpSessionLoadBalancer, c.config.ClusterUdpSessionLoadBalancer
}

for _, port := range svc.Spec.Ports {
var vip string
if util.CheckProtocol(clusterIP) == kubeovnv1.ProtocolIPv6 {
Expand All @@ -128,27 +133,27 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
if port.Protocol == v1.ProtocolTCP {
// for performance reason delete lb with no backends
if len(backends) > 0 {
err = c.ovnClient.CreateLoadBalancerRule(c.config.ClusterTcpLoadBalancer, vip, getServicePortBackends(ep, port, clusterIP), string(port.Protocol))
err = c.ovnClient.CreateLoadBalancerRule(tcpLb, vip, getServicePortBackends(ep, port, clusterIP), string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to tcp lb, %v", vip, err)
return err
}
} else {
err = c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
err = c.ovnClient.DeleteLoadBalancerVip(vip, tcpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at tcp lb, %v", vip, err)
return err
}
}
} else {
if len(backends) > 0 {
err = c.ovnClient.CreateLoadBalancerRule(c.config.ClusterUdpLoadBalancer, vip, getServicePortBackends(ep, port, clusterIP), string(port.Protocol))
err = c.ovnClient.CreateLoadBalancerRule(udpLb, vip, getServicePortBackends(ep, port, clusterIP), string(port.Protocol))
if err != nil {
klog.Errorf("failed to update vip %s to udp lb, %v", vip, err)
return err
}
} else {
err = c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
err = c.ovnClient.DeleteLoadBalancerVip(vip, udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s at udp lb, %v", vip, err)
return err
Expand Down
56 changes: 53 additions & 3 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,23 @@ func (c *Controller) gcLoadBalancer() error {
}
tcpVips := []string{}
udpVips := []string{}
tcpSessionVips := []string{}
udpSessionVips := []string{}
for _, svc := range svcs {
ip := svc.Spec.ClusterIP
for _, port := range svc.Spec.Ports {
if port.Protocol == corev1.ProtocolTCP {
tcpVips = append(tcpVips, fmt.Sprintf("%s:%d", ip, port.Port))
if svc.Spec.SessionAffinity == corev1.ServiceAffinityClientIP {
tcpSessionVips = append(tcpSessionVips, fmt.Sprintf("%s:%d", ip, port.Port))
} else {
tcpVips = append(tcpVips, fmt.Sprintf("%s:%d", ip, port.Port))
}
} else {
udpVips = append(udpVips, fmt.Sprintf("%s:%d", ip, port.Port))
if svc.Spec.SessionAffinity == corev1.ServiceAffinityClientIP {
tcpSessionVips = append(udpSessionVips, fmt.Sprintf("%s:%d", ip, port.Port))
} else {
tcpVips = append(udpVips, fmt.Sprintf("%s:%d", ip, port.Port))
}
}
}
}
Expand All @@ -164,7 +174,7 @@ func (c *Controller) gcLoadBalancer() error {
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
klog.Errorf("failed to get tcp lb vips %v", err)
return err
}
for vip := range vips {
Expand All @@ -177,6 +187,25 @@ func (c *Controller) gcLoadBalancer() error {
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterTcpSessionLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get tcp session lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, tcpSessionVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpSessionLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp session lb, %v", vip, err)
return err
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
Expand All @@ -196,6 +225,27 @@ func (c *Controller) gcLoadBalancer() error {
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpSessionLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp session lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, udpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpSessionLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from udp session lb, %v", vip, err)
return err
}
}
}

return nil
}

Expand Down
35 changes: 33 additions & 2 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *Controller) initLoadBalancer() error {
}
if tcpLb == "" {
klog.Infof("init cluster tcp load balancer %s", c.config.ClusterTcpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpLoadBalancer, util.ProtocolTCP)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpLoadBalancer, util.ProtocolTCP, "")
if err != nil {
klog.Errorf("failed to crate cluster tcp load balancer %v", err)
return err
Expand All @@ -138,20 +138,51 @@ func (c *Controller) initLoadBalancer() error {
klog.Infof("tcp load balancer %s exists", tcpLb)
}

tcpSessionLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpSessionLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find tcp session lb %v", err)
}
if tcpSessionLb == "" {
klog.Infof("init cluster tcp session load balancer %s", c.config.ClusterTcpSessionLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpSessionLoadBalancer, util.ProtocolTCP, "ip_src")
if err != nil {
klog.Errorf("failed to crate cluster tcp session load balancer %v", err)
return err
}
} else {
klog.Infof("tcp session load balancer %s exists", tcpSessionLb)
}

udpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp lb %v", err)
}
if udpLb == "" {
klog.Infof("init cluster udp load balancer %s", c.config.ClusterUdpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpLoadBalancer, util.ProtocolUDP)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpLoadBalancer, util.ProtocolUDP, "")
if err != nil {
klog.Errorf("failed to crate cluster udp load balancer %v", err)
return err
}
} else {
klog.Infof("udp load balancer %s exists", udpLb)
}

udpSessionLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpSessionLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp session lb %v", err)
}
if udpSessionLb == "" {
klog.Infof("init cluster udp session load balancer %s", c.config.ClusterUdpSessionLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpSessionLoadBalancer, util.ProtocolUDP, "ip_src")
if err != nil {
klog.Errorf("failed to crate cluster udp session load balancer %v", err)
return err
}
} else {
klog.Infof("udp session load balancer %s exists", udpSessionLb)
}

return nil
}

Expand Down
35 changes: 26 additions & 9 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,17 @@ func (c *Controller) handleDeleteService(vip string, protocol v1.Protocol) error
}

if protocol == v1.ProtocolTCP {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
if err != nil {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
return err
}
if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpSessionLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from tcp session lb, %v", vip, err)
return err
}
} else {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from udp lb, %v", vip, err)
if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpSessionLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from udp session lb, %v", vip, err)
return err
}
}
Expand Down Expand Up @@ -208,6 +210,13 @@ func (c *Controller) handleUpdateService(key string) error {

tcpVips := []string{}
udpVips := []string{}
tcpLb, udpLb := c.config.ClusterTcpLoadBalancer, c.config.ClusterUdpLoadBalancer
oTcpLb, oUdpLb := c.config.ClusterTcpSessionLoadBalancer, c.config.ClusterUdpSessionLoadBalancer
if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
tcpLb, udpLb = c.config.ClusterTcpSessionLoadBalancer, c.config.ClusterUdpSessionLoadBalancer
oTcpLb, oUdpLb = c.config.ClusterTcpLoadBalancer, c.config.ClusterUdpLoadBalancer

}

for _, port := range svc.Spec.Ports {
if port.Protocol == v1.ProtocolTCP {
Expand All @@ -217,7 +226,7 @@ func (c *Controller) handleUpdateService(key string) error {
}
}
// for service update
lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
lbUuid, err := c.ovnClient.FindLoadbalancer(tcpLb)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
Expand All @@ -229,6 +238,10 @@ func (c *Controller) handleUpdateService(key string) error {
}
klog.V(3).Infof("exist tcp vips are %v", vips)
for _, vip := range tcpVips {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, oTcpLb); err != nil {
klog.Errorf("failed to delete lb %s form %s, %v", vip, oTcpLb, err)
return err
}
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to tcp lb", vip)
c.updateEndpointQueue.Add(key)
Expand All @@ -239,15 +252,15 @@ func (c *Controller) handleUpdateService(key string) error {
for vip := range vips {
if strings.Split(vip, ":")[0] == ip && !util.IsStringIn(vip, tcpVips) {
klog.Infof("remove stall vip %s", vip)
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
err := c.ovnClient.DeleteLoadBalancerVip(vip, tcpLb)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb %v", vip, err)
return err
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
lbUuid, err = c.ovnClient.FindLoadbalancer(udpLb)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
Expand All @@ -259,6 +272,10 @@ func (c *Controller) handleUpdateService(key string) error {
}
klog.Infof("exist udp vips are %v", vips)
for _, vip := range udpVips {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, oUdpLb); err != nil {
klog.Errorf("failed to delete lb %s form %s, %v", vip, oUdpLb, err)
return err
}
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to udp lb", vip)
c.updateEndpointQueue.Add(key)
Expand All @@ -269,7 +286,7 @@ func (c *Controller) handleUpdateService(key string) error {
for vip := range vips {
if strings.Split(vip, ":")[0] == ip && !util.IsStringIn(vip, udpVips) {
klog.Infof("remove stall vip %s", vip)
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
err := c.ovnClient.DeleteLoadBalancerVip(vip, udpLb)
if err != nil {
klog.Errorf("failed to delete vip %s from udp lb %v", vip, err)
return err
Expand Down
13 changes: 10 additions & 3 deletions pkg/ovs/ovn-nbctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,16 @@ func (c Client) FindLoadbalancer(lb string) (string, error) {
}

// CreateLoadBalancer create loadbalancer in ovn
func (c Client) CreateLoadBalancer(lb, protocol string) error {
_, err := c.ovnNbCommand("create", "load_balancer",
fmt.Sprintf("name=%s", lb), fmt.Sprintf("protocol=%s", protocol))
func (c Client) CreateLoadBalancer(lb, protocol, selectFields string) error {
var err error
if selectFields == "" {
_, err = c.ovnNbCommand("create", "load_balancer",
fmt.Sprintf("name=%s", lb), fmt.Sprintf("protocol=%s", protocol))
} else {
_, err = c.ovnNbCommand("create", "load_balancer",
fmt.Sprintf("name=%s", lb), fmt.Sprintf("protocol=%s", protocol), fmt.Sprintf("selection_fields=%s", selectFields))
}

return err
}

Expand Down

0 comments on commit 8e03239

Please sign in to comment.