Skip to content

Commit

Permalink
Merge pull request #1137 from trozet/add_sctp_service_support
Browse files Browse the repository at this point in the history
Add sctp service support
  • Loading branch information
dcbw committed Apr 6, 2020
2 parents 9685534 + c878a37 commit 8dd8f5c
Show file tree
Hide file tree
Showing 17 changed files with 535 additions and 287 deletions.
5 changes: 5 additions & 0 deletions dist/images/Dockerfile.fedora
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ RUN INSTALL_PKGS=" \
dnf install --best --refresh -y --setopt=tsflags=nodocs $INSTALL_PKGS && \
dnf clean all && rm -rf /var/cache/dnf/*

# ensure we pick up ovn-20.03.0-2.fc31 for SCTP fixes/support
# this should have no effect in the future once the
# RPM has propagated to all stable mirrors and can be removed
RUN dnf install -y ovn --best --advisory=FEDORA-2020-b570bbc33b || true

RUN mkdir -p /var/run/openvswitch && \
mkdir -p /usr/libexec/cni/

Expand Down
1 change: 1 addition & 0 deletions go-controller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8=
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f h1:GiPwtSzdP43eI1hpPCbROQCCIgCuiMMNF8YUVLF3vJo=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
Expand Down
10 changes: 9 additions & 1 deletion go-controller/pkg/node/gateway_init_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
. "github.com/onsi/gomega"
)

func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID string) {
func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID, sctpLBUUID string) {
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:TCP_lb_gateway_router=" + util.GWRouterPrefix + nodeName,
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:UDP_lb_gateway_router=" + util.GWRouterPrefix + nodeName,
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:SCTP_lb_gateway_router=" + util.GWRouterPrefix + nodeName,
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:TCP_lb_gateway_router=" + util.GWRouterPrefix + nodeName + " protocol=tcp",
Expand All @@ -40,9 +41,14 @@ func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID stri
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:UDP_lb_gateway_router=" + util.GWRouterPrefix + nodeName + " protocol=udp",
Output: udpLBUUID,
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:SCTP_lb_gateway_router=" + util.GWRouterPrefix + nodeName + " protocol=sctp",
Output: sctpLBUUID,
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 set logical_router " + util.GWRouterPrefix + nodeName + " load_balancer=" + tcpLBUUID,
"ovn-nbctl --timeout=15 add logical_router " + util.GWRouterPrefix + nodeName + " load_balancer " + udpLBUUID,
"ovn-nbctl --timeout=15 add logical_router " + util.GWRouterPrefix + nodeName + " load_balancer " + sctpLBUUID,
})
}

Expand All @@ -59,6 +65,7 @@ func shareGatewayInterfaceTest(app *cli.App, testNS ns.NetNS,
systemID string = "cb9ec8fa-b409-4ef3-9f42-d9283c47aac6"
tcpLBUUID string = "d2e858b2-cb5a-441b-a670-ed450f79a91f"
udpLBUUID string = "12832f14-eb0f-44d4-b8db-4cccbc73c792"
sctpLBUUID string = "0514c521-a120-4756-aec6-883fe5db7139"
nodeSubnet string = "10.1.1.0/24"
gwRouter string = util.GWRouterPrefix + nodeName
mgtPortName string = "k8s-" + nodeName
Expand Down Expand Up @@ -289,6 +296,7 @@ var _ = Describe("Gateway Init Operations", func() {
systemID string = "cb9ec8fa-b409-4ef3-9f42-d9283c47aac6"
tcpLBUUID string = "d2e858b2-cb5a-441b-a670-ed450f79a91f"
udpLBUUID string = "12832f14-eb0f-44d4-b8db-4cccbc73c792"
sctpLBUUID string = "0514c521-a120-4756-aec6-883fe5db7139"
nodeSubnet string = "10.1.1.0/24"
gwRouter string = util.GWRouterPrefix + nodeName
clusterIPNet string = "10.1.0.0"
Expand Down
7 changes: 4 additions & 3 deletions go-controller/pkg/node/gateway_localnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ func localnetIPTablesHelper(subnet string) (util.IPTablesHelper, error) {
func localnetIptRules(svc *kapi.Service, gatewayIP string) []iptRule {
rules := make([]iptRule, 0)
for _, svcPort := range svc.Spec.Ports {
protocol := svcPort.Protocol
if protocol != kapi.ProtocolUDP && protocol != kapi.ProtocolTCP {
protocol = kapi.ProtocolTCP
protocol, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("Invalid service port %s: %v", svcPort.Name, err)
continue
}

nodePort := fmt.Sprintf("%d", svcPort.NodePort)
Expand Down
19 changes: 12 additions & 7 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ func addService(service *kapi.Service, inport, outport, gwBridge string) {
}

for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != kapi.ProtocolTCP &&
svcPort.Protocol != kapi.ProtocolUDP {
_, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("Skipping service add. Invalid service port %s: %v", svcPort.Name, err)
continue
}
protocol := strings.ToLower(string(svcPort.Protocol))
Expand All @@ -45,8 +46,9 @@ func deleteService(service *kapi.Service, inport, gwBridge string) {
}

for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != kapi.ProtocolTCP &&
svcPort.Protocol != kapi.ProtocolUDP {
_, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("Skipping service delete. Invalid service port %s: %v", svcPort.Name, err)
continue
}

Expand Down Expand Up @@ -84,11 +86,12 @@ func syncServices(services []interface{}, inport, gwBridge string) {
continue
}

prot := svcPort.Protocol
if prot != kapi.ProtocolTCP && prot != kapi.ProtocolUDP {
proto, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("syncServices error for service port %s: %v", svcPort.Name, err)
continue
}
protocol := strings.ToLower(string(prot))
protocol := strings.ToLower(string(proto))
nodePortKey := fmt.Sprintf("%s_%d", protocol, port)
nodePorts[nodePortKey] = true
}
Expand Down Expand Up @@ -119,6 +122,8 @@ func syncServices(services []interface{}, inport, gwBridge string) {
key = fmt.Sprintf("tcp_%s", group[1])
} else if strings.Contains(flow, "udp") {
key = fmt.Sprintf("udp_%s", group[1])
} else if strings.Contains(flow, "sctp") {
key = fmt.Sprintf("sctp_%s", group[1])
} else {
continue
}
Expand Down
205 changes: 85 additions & 120 deletions go-controller/pkg/ovn/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,36 @@ type lbEndpoints struct {
Port int32
}

func (ovn *Controller) getLbEndpoints(ep *kapi.Endpoints, tcpPortMap, udpPortMap map[string]lbEndpoints) {
func (ovn *Controller) getLbEndpoints(ep *kapi.Endpoints) map[kapi.Protocol]map[string]lbEndpoints {
protoPortMap := map[kapi.Protocol]map[string]lbEndpoints{
kapi.ProtocolTCP: make(map[string]lbEndpoints),
kapi.ProtocolUDP: make(map[string]lbEndpoints),
kapi.ProtocolSCTP: make(map[string]lbEndpoints),
}
for _, s := range ep.Subsets {
for _, ip := range s.Addresses {
for _, port := range s.Ports {
var ips []string
var portMap map[string]lbEndpoints
if port.Protocol == kapi.ProtocolUDP {
portMap = udpPortMap
} else if port.Protocol == kapi.ProtocolTCP {
portMap = tcpPortMap
if _, err := util.ValidateProtocol(port.Protocol); err != nil {
klog.Errorf("Invalid endpoint port: %s: %v", port.Name, err)
continue
}
if lbEps, ok := portMap[port.Name]; ok {
ips = lbEps.IPs
if lbEps, ok := protoPortMap[port.Protocol][port.Name]; ok {
ips = append(lbEps.IPs, ip.IP)
} else {
ips = make([]string, 0)
ips = []string{ip.IP}
}
ips = append(ips, ip.IP)
portMap[port.Name] = lbEndpoints{IPs: ips, Port: port.Port}
protoPortMap[port.Protocol][port.Name] = lbEndpoints{IPs: ips, Port: port.Port}
}
}
}
klog.V(5).Infof("Tcp table: %v\nUdp table: %v", tcpPortMap, udpPortMap)
klog.V(5).Infof("Endpoint Protocol Map is: %v", protoPortMap)
return protoPortMap
}

// AddEndpoints adds endpoints and creates corresponding resources in OVN
func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
klog.V(5).Infof("Adding endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
// get service
// TODO: cache the service
svc, err := ovn.watchFactory.GetService(ep.Namespace, ep.Name)
Expand All @@ -55,71 +59,47 @@ func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
svc.Name, svc.Spec.ClusterIP)
return nil
}
tcpPortMap := make(map[string]lbEndpoints)
udpPortMap := make(map[string]lbEndpoints)
ovn.getLbEndpoints(ep, tcpPortMap, udpPortMap)
for svcPortName, lbEps := range tcpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolTCP && svcPort.Name == svcPortName {
if util.ServiceTypeHasNodePort(svc) {
klog.V(5).Infof("Creating Gateways IP for NodePort: %d, %v", svcPort.NodePort, ips)
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
if err != nil {
klog.Errorf("Error in creating Node Port for svc %s, node port: %d - %v\n", svc.Name, svcPort.NodePort, err)
continue
}
}
if util.ServiceTypeHasClusterIP(svc) {
var loadBalancer string
loadBalancer, err = ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Errorf("Failed to get loadbalancer for %s (%v)",
svcPort.Protocol, err)
continue
}
err = ovn.createLoadBalancerVIP(loadBalancer,
svc.Spec.ClusterIP, svcPort.Port, ips, targetPort)
if err != nil {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, targetPort, err)
continue
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
}
}
for svcPortName, lbEps := range udpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolUDP && svcPort.Name == svcPortName {
if util.ServiceTypeHasNodePort(svc) {
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
if err != nil {
klog.Errorf("Error in creating Node Port for svc %s, node port: %d - %v\n", svc.Name, svcPort.NodePort, err)
klog.V(5).Infof("Matching service %s found for ep: %s, with cluster IP: %s", svc.Name, ep.Name,
svc.Spec.ClusterIP)

protoPortMap := ovn.getLbEndpoints(ep)
klog.V(5).Infof("Matching service %s ports: %v", svc.Name, svc.Spec.Ports)
for proto, portMap := range protoPortMap {
for svcPortName, lbEps := range portMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
if !ovn.SCTPSupport && proto == kapi.ProtocolSCTP {
klog.Errorf("Rejecting endpoint creation for unsupported SCTP protocol: %s, %s", ep.Namespace, ep.Name)
continue
}
}
if util.ServiceTypeHasClusterIP(svc) {
var loadBalancer string
loadBalancer, err = ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Errorf("Failed to get loadbalancer for %s (%v)",
svcPort.Protocol, err)
continue
if util.ServiceTypeHasNodePort(svc) {
klog.V(5).Infof("Creating Gateways IP for NodePort: %d, %v", svcPort.NodePort, ips)
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
if err != nil {
klog.Errorf("Error in creating Node Port for svc %s, node port: %d - %v\n", svc.Name, svcPort.NodePort, err)
continue
}
}
err = ovn.createLoadBalancerVIP(loadBalancer, svc.Spec.ClusterIP, svcPort.Port, ips, targetPort)
if err != nil {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, targetPort, err)
continue
if util.ServiceTypeHasClusterIP(svc) {
var loadBalancer string
loadBalancer, err = ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Errorf("Failed to get loadbalancer for %s (%v)",
svcPort.Protocol, err)
continue
}
err = ovn.createLoadBalancerVIP(loadBalancer,
svc.Spec.ClusterIP, svcPort.Port, ips, targetPort)
if err != nil {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, targetPort, err)
continue
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
}
Expand All @@ -129,13 +109,7 @@ func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {

func (ovn *Controller) handleNodePortLB(node *kapi.Node) error {
physicalGateway := util.GWRouterPrefix + node.Name
var k8sNSLbTCP, k8sNSLbUDP, physicalIP string
if k8sNSLbTCP, _ = ovn.getGatewayLoadBalancer(physicalGateway, TCP); k8sNSLbTCP == "" {
return fmt.Errorf("TCP load balancer for node %q does not yet exist", node.Name)
}
if k8sNSLbUDP, _ = ovn.getGatewayLoadBalancer(physicalGateway, UDP); k8sNSLbUDP == "" {
return fmt.Errorf("UDP load balancer for node %q does not yet exist", node.Name)
}
var physicalIP string
if physicalIP, _ = ovn.getGatewayPhysicalIP(physicalGateway); physicalIP == "" {
return fmt.Errorf("gateway physical IP for node %q does not yet exist", node.Name)
}
Expand All @@ -157,31 +131,23 @@ func (ovn *Controller) handleNodePortLB(node *kapi.Node) error {
if !util.ServiceTypeHasNodePort(svc) {
continue
}
tcpPortMap := make(map[string]lbEndpoints)
udpPortMap := make(map[string]lbEndpoints)
ovn.getLbEndpoints(ep, tcpPortMap, udpPortMap)
for svcPortName, lbEps := range tcpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolTCP && svcPort.Name == svcPortName {
err = ovn.createLoadBalancerVIP(k8sNSLbTCP, physicalIP, svcPort.NodePort, ips, targetPort)
if err != nil {
klog.Errorf("failed to create VIP in load balancer %s - %v", k8sNSLbTCP, err)
continue
}
}
}
}
for svcPortName, lbEps := range udpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolUDP && svcPort.Name == svcPortName {
err = ovn.createLoadBalancerVIP(k8sNSLbUDP, physicalIP, svcPort.NodePort, ips, targetPort)
if err != nil {
klog.Errorf("failed to create VIP in load balancer %s - %v", k8sNSLbUDP, err)
continue
protoPortMap := ovn.getLbEndpoints(ep)
for proto, portMap := range protoPortMap {
for svcPortName, lbEps := range portMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
k8sNSLb, _ := ovn.getGatewayLoadBalancer(physicalGateway, proto)
if k8sNSLb == "" {
return fmt.Errorf("%s load balancer for node %q does not yet exist",
proto, node.Name)
}
err = ovn.createLoadBalancerVIP(k8sNSLb, physicalIP, svcPort.NodePort, ips, targetPort)
if err != nil {
klog.Errorf("failed to create VIP in load balancer %s - %v", k8sNSLb, err)
continue
}
}
}
}
Expand Down Expand Up @@ -215,22 +181,20 @@ func (ovn *Controller) updateExternalIPsLB() {
}
tcpPortMap := make(map[string]lbEndpoints)
udpPortMap := make(map[string]lbEndpoints)
ovn.getLbEndpoints(ep, tcpPortMap, udpPortMap)
for svcPortName, lbEps := range tcpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolTCP && svcPort.Name == svcPortName {
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
sctpPortMap := make(map[string]lbEndpoints)
protoPortMap := map[kapi.Protocol]map[string]lbEndpoints{
kapi.ProtocolTCP: tcpPortMap,
kapi.ProtocolUDP: udpPortMap,
kapi.ProtocolSCTP: sctpPortMap,
}
for svcPortName, lbEps := range udpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolUDP && svcPort.Name == svcPortName {
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
for proto, portMap := range protoPortMap {
for svcPortName, lbEps := range portMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
}
}
Expand Down Expand Up @@ -266,6 +230,7 @@ func (ovn *Controller) handleExternalIPs(svc *kapi.Service, svcPort kapi.Service
}

func (ovn *Controller) deleteEndpoints(ep *kapi.Endpoints) error {
klog.V(5).Infof("Deleting endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
svc, err := ovn.watchFactory.GetService(ep.Namespace, ep.Name)
if err != nil {
// This is not necessarily an error. For e.g when a service is deleted,
Expand Down

0 comments on commit 8dd8f5c

Please sign in to comment.