Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sctp service support #1137

Merged
merged 3 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions dist/images/Dockerfile.fedora
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ RUN INSTALL_PKGS=" \
dnf install --best --refresh -y --setopt=tsflags=nodocs $INSTALL_PKGS && \
dnf clean all && rm -rf /var/cache/dnf/*

# ensure we pick up ovn-20.03.0-2.fc31 for SCTP fixes/support
# this should have no effect in the future once the
# RPM has propagated to all stable mirrors and can be removed
RUN dnf install -y ovn --best --advisory=FEDORA-2020-b570bbc33b || true
trozet marked this conversation as resolved.
Show resolved Hide resolved

RUN mkdir -p /var/run/openvswitch && \
mkdir -p /usr/libexec/cni/

Expand Down
1 change: 1 addition & 0 deletions go-controller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8=
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f h1:GiPwtSzdP43eI1hpPCbROQCCIgCuiMMNF8YUVLF3vJo=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
Expand Down
10 changes: 9 additions & 1 deletion go-controller/pkg/node/gateway_init_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
. "github.com/onsi/gomega"
)

func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID string) {
func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID, sctpLBUUID string) {
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:TCP_lb_gateway_router=" + util.GWRouterPrefix + nodeName,
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:UDP_lb_gateway_router=" + util.GWRouterPrefix + nodeName,
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:SCTP_lb_gateway_router=" + util.GWRouterPrefix + nodeName,
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:TCP_lb_gateway_router=" + util.GWRouterPrefix + nodeName + " protocol=tcp",
Expand All @@ -41,9 +42,14 @@ func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID stri
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:UDP_lb_gateway_router=" + util.GWRouterPrefix + nodeName + " protocol=udp",
Output: udpLBUUID,
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:SCTP_lb_gateway_router=" + util.GWRouterPrefix + nodeName + " protocol=sctp",
Output: sctpLBUUID,
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 set logical_router " + util.GWRouterPrefix + nodeName + " load_balancer=" + tcpLBUUID,
"ovn-nbctl --timeout=15 add logical_router " + util.GWRouterPrefix + nodeName + " load_balancer " + udpLBUUID,
"ovn-nbctl --timeout=15 add logical_router " + util.GWRouterPrefix + nodeName + " load_balancer " + sctpLBUUID,
})
}

Expand All @@ -60,6 +66,7 @@ func shareGatewayInterfaceTest(app *cli.App, testNS ns.NetNS,
systemID string = "cb9ec8fa-b409-4ef3-9f42-d9283c47aac6"
tcpLBUUID string = "d2e858b2-cb5a-441b-a670-ed450f79a91f"
udpLBUUID string = "12832f14-eb0f-44d4-b8db-4cccbc73c792"
sctpLBUUID string = "0514c521-a120-4756-aec6-883fe5db7139"
nodeSubnet string = "10.1.1.0/24"
gwRouter string = util.GWRouterPrefix + nodeName
mgtPortName string = "k8s-" + nodeName
Expand Down Expand Up @@ -293,6 +300,7 @@ var _ = Describe("Gateway Init Operations", func() {
systemID string = "cb9ec8fa-b409-4ef3-9f42-d9283c47aac6"
tcpLBUUID string = "d2e858b2-cb5a-441b-a670-ed450f79a91f"
udpLBUUID string = "12832f14-eb0f-44d4-b8db-4cccbc73c792"
sctpLBUUID string = "0514c521-a120-4756-aec6-883fe5db7139"
nodeSubnet string = "10.1.1.0/24"
gwRouter string = util.GWRouterPrefix + nodeName
clusterIPNet string = "10.1.0.0"
Expand Down
7 changes: 4 additions & 3 deletions go-controller/pkg/node/gateway_localnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ func localnetIPTablesHelper(subnet string) (util.IPTablesHelper, error) {
func localnetIptRules(svc *kapi.Service, gatewayIP string) []iptRule {
rules := make([]iptRule, 0)
for _, svcPort := range svc.Spec.Ports {
protocol := svcPort.Protocol
if protocol != kapi.ProtocolUDP && protocol != kapi.ProtocolTCP {
protocol = kapi.ProtocolTCP
protocol, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("Invalid service port %s: %v", svcPort.Name, err)
continue
}

nodePort := fmt.Sprintf("%d", svcPort.NodePort)
Expand Down
19 changes: 12 additions & 7 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ func addService(service *kapi.Service, inport, outport, gwBridge string) {
}

for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != kapi.ProtocolTCP &&
svcPort.Protocol != kapi.ProtocolUDP {
_, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("Skipping service add. Invalid service port %s: %v", svcPort.Name, err)
continue
}
protocol := strings.ToLower(string(svcPort.Protocol))
Expand All @@ -45,8 +46,9 @@ func deleteService(service *kapi.Service, inport, gwBridge string) {
}

for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != kapi.ProtocolTCP &&
svcPort.Protocol != kapi.ProtocolUDP {
_, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("Skipping service delete. Invalid service port %s: %v", svcPort.Name, err)
continue
}

Expand Down Expand Up @@ -84,11 +86,12 @@ func syncServices(services []interface{}, inport, gwBridge string) {
continue
}

prot := svcPort.Protocol
if prot != kapi.ProtocolTCP && prot != kapi.ProtocolUDP {
proto, err := util.ValidateProtocol(svcPort.Protocol)
if err != nil {
klog.Errorf("syncServices error for service port %s: %v", svcPort.Name, err)
continue
}
protocol := strings.ToLower(string(prot))
protocol := strings.ToLower(string(proto))
nodePortKey := fmt.Sprintf("%s_%d", protocol, port)
nodePorts[nodePortKey] = true
}
Expand Down Expand Up @@ -119,6 +122,8 @@ func syncServices(services []interface{}, inport, gwBridge string) {
key = fmt.Sprintf("tcp_%s", group[1])
} else if strings.Contains(flow, "udp") {
key = fmt.Sprintf("udp_%s", group[1])
} else if strings.Contains(flow, "sctp") {
key = fmt.Sprintf("sctp_%s", group[1])
} else {
continue
}
Expand Down
205 changes: 85 additions & 120 deletions go-controller/pkg/ovn/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,36 @@ type lbEndpoints struct {
Port int32
}

func (ovn *Controller) getLbEndpoints(ep *kapi.Endpoints, tcpPortMap, udpPortMap map[string]lbEndpoints) {
func (ovn *Controller) getLbEndpoints(ep *kapi.Endpoints) map[kapi.Protocol]map[string]lbEndpoints {
protoPortMap := map[kapi.Protocol]map[string]lbEndpoints{
kapi.ProtocolTCP: make(map[string]lbEndpoints),
kapi.ProtocolUDP: make(map[string]lbEndpoints),
kapi.ProtocolSCTP: make(map[string]lbEndpoints),
}
for _, s := range ep.Subsets {
for _, ip := range s.Addresses {
for _, port := range s.Ports {
var ips []string
var portMap map[string]lbEndpoints
if port.Protocol == kapi.ProtocolUDP {
portMap = udpPortMap
} else if port.Protocol == kapi.ProtocolTCP {
portMap = tcpPortMap
if _, err := util.ValidateProtocol(port.Protocol); err != nil {
klog.Errorf("Invalid endpoint port: %s: %v", port.Name, err)
continue
}
if lbEps, ok := portMap[port.Name]; ok {
ips = lbEps.IPs
if lbEps, ok := protoPortMap[port.Protocol][port.Name]; ok {
ips = append(lbEps.IPs, ip.IP)
} else {
ips = make([]string, 0)
ips = []string{ip.IP}
}
ips = append(ips, ip.IP)
portMap[port.Name] = lbEndpoints{IPs: ips, Port: port.Port}
protoPortMap[port.Protocol][port.Name] = lbEndpoints{IPs: ips, Port: port.Port}
}
}
}
klog.V(5).Infof("Tcp table: %v\nUdp table: %v", tcpPortMap, udpPortMap)
klog.V(5).Infof("Endpoint Protocol Map is: %v", protoPortMap)
return protoPortMap
}

// AddEndpoints adds endpoints and creates corresponding resources in OVN
func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
klog.V(5).Infof("Adding endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
// get service
// TODO: cache the service
svc, err := ovn.watchFactory.GetService(ep.Namespace, ep.Name)
Expand All @@ -55,71 +59,47 @@ func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
svc.Name, svc.Spec.ClusterIP)
return nil
}
tcpPortMap := make(map[string]lbEndpoints)
udpPortMap := make(map[string]lbEndpoints)
ovn.getLbEndpoints(ep, tcpPortMap, udpPortMap)
for svcPortName, lbEps := range tcpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolTCP && svcPort.Name == svcPortName {
if util.ServiceTypeHasNodePort(svc) {
klog.V(5).Infof("Creating Gateways IP for NodePort: %d, %v", svcPort.NodePort, ips)
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
if err != nil {
klog.Errorf("Error in creating Node Port for svc %s, node port: %d - %v\n", svc.Name, svcPort.NodePort, err)
continue
}
}
if util.ServiceTypeHasClusterIP(svc) {
var loadBalancer string
loadBalancer, err = ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Errorf("Failed to get loadbalancer for %s (%v)",
svcPort.Protocol, err)
continue
}
err = ovn.createLoadBalancerVIP(loadBalancer,
svc.Spec.ClusterIP, svcPort.Port, ips, targetPort)
if err != nil {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, targetPort, err)
continue
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
}
}
for svcPortName, lbEps := range udpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolUDP && svcPort.Name == svcPortName {
if util.ServiceTypeHasNodePort(svc) {
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
if err != nil {
klog.Errorf("Error in creating Node Port for svc %s, node port: %d - %v\n", svc.Name, svcPort.NodePort, err)
klog.V(5).Infof("Matching service %s found for ep: %s, with cluster IP: %s", svc.Name, ep.Name,
svc.Spec.ClusterIP)

protoPortMap := ovn.getLbEndpoints(ep)
klog.V(5).Infof("Matching service %s ports: %v", svc.Name, svc.Spec.Ports)
for proto, portMap := range protoPortMap {
for svcPortName, lbEps := range portMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
if !ovn.SCTPSupport && proto == kapi.ProtocolSCTP {
klog.Errorf("Rejecting endpoint creation for unsupported SCTP protocol: %s, %s", ep.Namespace, ep.Name)
continue
}
}
if util.ServiceTypeHasClusterIP(svc) {
var loadBalancer string
loadBalancer, err = ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Errorf("Failed to get loadbalancer for %s (%v)",
svcPort.Protocol, err)
continue
if util.ServiceTypeHasNodePort(svc) {
klog.V(5).Infof("Creating Gateways IP for NodePort: %d, %v", svcPort.NodePort, ips)
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
if err != nil {
klog.Errorf("Error in creating Node Port for svc %s, node port: %d - %v\n", svc.Name, svcPort.NodePort, err)
continue
}
}
err = ovn.createLoadBalancerVIP(loadBalancer, svc.Spec.ClusterIP, svcPort.Port, ips, targetPort)
if err != nil {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, targetPort, err)
continue
if util.ServiceTypeHasClusterIP(svc) {
var loadBalancer string
loadBalancer, err = ovn.getLoadBalancer(svcPort.Protocol)
if err != nil {
klog.Errorf("Failed to get loadbalancer for %s (%v)",
svcPort.Protocol, err)
continue
}
err = ovn.createLoadBalancerVIP(loadBalancer,
svc.Spec.ClusterIP, svcPort.Port, ips, targetPort)
if err != nil {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, targetPort, err)
continue
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
}
Expand All @@ -129,13 +109,7 @@ func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {

func (ovn *Controller) handleNodePortLB(node *kapi.Node) error {
physicalGateway := util.GWRouterPrefix + node.Name
var k8sNSLbTCP, k8sNSLbUDP, physicalIP string
if k8sNSLbTCP, _ = ovn.getGatewayLoadBalancer(physicalGateway, TCP); k8sNSLbTCP == "" {
return fmt.Errorf("TCP load balancer for node %q does not yet exist", node.Name)
}
if k8sNSLbUDP, _ = ovn.getGatewayLoadBalancer(physicalGateway, UDP); k8sNSLbUDP == "" {
return fmt.Errorf("UDP load balancer for node %q does not yet exist", node.Name)
}
var physicalIP string
if physicalIP, _ = ovn.getGatewayPhysicalIP(physicalGateway); physicalIP == "" {
return fmt.Errorf("gateway physical IP for node %q does not yet exist", node.Name)
}
Expand All @@ -157,31 +131,23 @@ func (ovn *Controller) handleNodePortLB(node *kapi.Node) error {
if !util.ServiceTypeHasNodePort(svc) {
continue
}
tcpPortMap := make(map[string]lbEndpoints)
udpPortMap := make(map[string]lbEndpoints)
ovn.getLbEndpoints(ep, tcpPortMap, udpPortMap)
for svcPortName, lbEps := range tcpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolTCP && svcPort.Name == svcPortName {
err = ovn.createLoadBalancerVIP(k8sNSLbTCP, physicalIP, svcPort.NodePort, ips, targetPort)
if err != nil {
klog.Errorf("failed to create VIP in load balancer %s - %v", k8sNSLbTCP, err)
continue
}
}
}
}
for svcPortName, lbEps := range udpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolUDP && svcPort.Name == svcPortName {
err = ovn.createLoadBalancerVIP(k8sNSLbUDP, physicalIP, svcPort.NodePort, ips, targetPort)
if err != nil {
klog.Errorf("failed to create VIP in load balancer %s - %v", k8sNSLbUDP, err)
continue
protoPortMap := ovn.getLbEndpoints(ep)
for proto, portMap := range protoPortMap {
for svcPortName, lbEps := range portMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
k8sNSLb, _ := ovn.getGatewayLoadBalancer(physicalGateway, proto)
if k8sNSLb == "" {
return fmt.Errorf("%s load balancer for node %q does not yet exist",
proto, node.Name)
}
err = ovn.createLoadBalancerVIP(k8sNSLb, physicalIP, svcPort.NodePort, ips, targetPort)
if err != nil {
klog.Errorf("failed to create VIP in load balancer %s - %v", k8sNSLb, err)
continue
}
}
}
}
Expand Down Expand Up @@ -215,22 +181,20 @@ func (ovn *Controller) updateExternalIPsLB() {
}
tcpPortMap := make(map[string]lbEndpoints)
udpPortMap := make(map[string]lbEndpoints)
ovn.getLbEndpoints(ep, tcpPortMap, udpPortMap)
for svcPortName, lbEps := range tcpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolTCP && svcPort.Name == svcPortName {
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
sctpPortMap := make(map[string]lbEndpoints)
protoPortMap := map[kapi.Protocol]map[string]lbEndpoints{
kapi.ProtocolTCP: tcpPortMap,
kapi.ProtocolUDP: udpPortMap,
kapi.ProtocolSCTP: sctpPortMap,
}
for svcPortName, lbEps := range udpPortMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == kapi.ProtocolUDP && svcPort.Name == svcPortName {
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
for proto, portMap := range protoPortMap {
for svcPortName, lbEps := range portMap {
ips := lbEps.IPs
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
ovn.handleExternalIPs(svc, svcPort, ips, targetPort, false)
}
}
}
}
Expand Down Expand Up @@ -266,6 +230,7 @@ func (ovn *Controller) handleExternalIPs(svc *kapi.Service, svcPort kapi.Service
}

func (ovn *Controller) deleteEndpoints(ep *kapi.Endpoints) error {
klog.V(5).Infof("Deleting endpoints: %s for namespace: %s", ep.Name, ep.Namespace)
svc, err := ovn.watchFactory.GetService(ep.Namespace, ep.Name)
if err != nil {
// This is not necessarily an error. For e.g when a service is deleted,
Expand Down