Skip to content

Commit

Permalink
Fixup that adds sctp detection and backwards compat
Browse files Browse the repository at this point in the history
Adds event recording for unsupported svc creation.

Signed-off-by: Tim Rozet <trozet@redhat.com>
  • Loading branch information
trozet committed Apr 2, 2020
1 parent 4f5ed91 commit 9fa2ec4
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 62 deletions.
3 changes: 1 addition & 2 deletions go-controller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBl
gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE=
gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/gemnasium/klog-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0=
gopkg.in/gemnasium/klog-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
Expand Down Expand Up @@ -370,6 +368,7 @@ k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8=
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20191030222137-2b95a09bc58d h1:1P0iBJsBzxRmR+dIFnM+Iu4aLxnoa7lBqozW/0uHbT8=
Expand Down
4 changes: 4 additions & 0 deletions go-controller/pkg/ovn/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
targetPort := lbEps.Port
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol == proto && svcPort.Name == svcPortName {
if !ovn.SCTPSupport && proto == kapi.ProtocolSCTP {
klog.Errorf("Rejecting endpoint creation for unsupported SCTP protocol: %s, %s", ep.Namespace, ep.Name)
continue
}
if util.ServiceTypeHasNodePort(svc) {
klog.V(5).Infof("Creating Gateways IP for NodePort: %d, %v", svcPort.NodePort, ips)
err = ovn.createGatewaysVIP(svcPort.Protocol, svcPort.NodePort, targetPort, ips)
Expand Down
63 changes: 46 additions & 17 deletions go-controller/pkg/ovn/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,34 @@ func (oc *Controller) SetupMaster(masterNodeName string) error {
return err
}

// Determine SCTP support
stdout, stderr, err = util.RunOVSDBClientOVNNB("list-columns", "--data=bare", "--no-heading",
"--format=json", "OVN_Northbound", "Load_Balancer")
if err != nil {
klog.Errorf("Failed to query OVN NB DB for SCTP support, "+
"stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
return err
}
type OvsdbData struct {
Data [][]interface{}
}
var lbData OvsdbData
err = json.Unmarshal([]byte(stdout), &lbData)
if err != nil {
return err
}
oc.SCTPSupport = false
for _, entry := range lbData.Data {
if entry[0].(string) == "protocol" && strings.Contains(fmt.Sprintf("%v", entry[1]), "sctp") {
oc.SCTPSupport = true
klog.Info("SCTP support detected in OVN")
break
}
}
if !oc.SCTPSupport {
klog.Warningf("SCTP unsupported by this version of OVN. Kubernetes service creation with SCTP will not work ")
}

// If supported, enable IGMP relay on the router to forward multicast
// traffic between nodes.
if oc.multicastSupport {
Expand Down Expand Up @@ -161,7 +189,7 @@ func (oc *Controller) SetupMaster(masterNodeName string) error {
klog.Errorf("Failed to get sctp load-balancer, stderr: %q, error: %v", stderr, err)
return err
}
if oc.SCTPLoadBalancerUUID == "" {
if oc.SCTPLoadBalancerUUID == "" && oc.SCTPSupport {
oc.SCTPLoadBalancerUUID, stderr, err = util.RunOVNNbctl("--", "create", "load_balancer", "external_ids:k8s-cluster-lb-sctp=yes", "protocol=sctp")
if err != nil {
klog.Errorf("Failed to create sctp load-balancer, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
Expand Down Expand Up @@ -285,7 +313,7 @@ func (oc *Controller) syncGatewayLogicalNetwork(node *kapi.Node, l3GatewayConfig
return err
}

err = util.GatewayInit(clusterSubnets, subnet, joinSubnet, node.Name, l3GatewayConfig)
err = util.GatewayInit(clusterSubnets, subnet, joinSubnet, node.Name, l3GatewayConfig, oc.SCTPSupport)
if err != nil {
return fmt.Errorf("failed to init shared interface gateway: %v", err)
}
Expand Down Expand Up @@ -453,24 +481,25 @@ func (oc *Controller) ensureNodeLogicalNetwork(nodeName string, hostsubnet *net.
}
}

if oc.SCTPLoadBalancerUUID == "" {
return fmt.Errorf("SCTP cluster load balancer not created")
}
stdout, stderr, err = util.RunOVNNbctl("add", "logical_switch", nodeName, "load_balancer", oc.SCTPLoadBalancerUUID)
if err != nil {
klog.Errorf("Failed to add logical switch %v's loadbalancer, stdout: %q, stderr: %q, error: %v", nodeName, stdout, stderr, err)
return err
}

// Add any service reject ACLs applicable for SCTP LB
acls = oc.getAllACLsForServiceLB(oc.SCTPLoadBalancerUUID)
if len(acls) > 0 {
_, _, err = util.RunOVNNbctl("add", "logical_switch", nodeName, "acls", strings.Join(acls, ","))
if oc.SCTPSupport {
if oc.SCTPLoadBalancerUUID == "" {
return fmt.Errorf("SCTP cluster load balancer not created")
}
stdout, stderr, err = util.RunOVNNbctl("add", "logical_switch", nodeName, "load_balancer", oc.SCTPLoadBalancerUUID)
if err != nil {
klog.Warningf("Unable to add SCTP reject ACLs: %s for switch: %s, error %v", acls, nodeName, err)
klog.Errorf("Failed to add logical switch %v's loadbalancer, stdout: %q, stderr: %q, error: %v", nodeName, stdout, stderr, err)
return err
}
}

// Add any service reject ACLs applicable for SCTP LB
acls = oc.getAllACLsForServiceLB(oc.SCTPLoadBalancerUUID)
if len(acls) > 0 {
_, _, err = util.RunOVNNbctl("add", "logical_switch", nodeName, "acls", strings.Join(acls, ","))
if err != nil {
klog.Warningf("Unable to add SCTP reject ACLs: %s for switch: %s, error %v", acls, nodeName, err)
}
}
}
// Add the node to the logical switch cache
oc.lsMutex.Lock()
defer oc.lsMutex.Unlock()
Expand Down
128 changes: 116 additions & 12 deletions go-controller/pkg/ovn/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func cleanupGateway(fexec *ovntest.FakeExec, nodeName string, nodeSubnet string,
})
}

func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, string, string) {
func defaultFakeExec(nodeSubnet, nodeName string, sctpSupport bool) (*ovntest.FakeExec, string, string, string) {
const (
tcpLBUUID string = "1a3dfc82-2749-4931-9190-c30e7c0ecea3"
udpLBUUID string = "6d3142fc-53e8-4ac1-88e6-46094a5a9957"
Expand All @@ -85,6 +85,19 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st
"ovn-nbctl --timeout=15 --columns=_uuid list port_group",
"ovn-sbctl --timeout=15 --columns=_uuid list IGMP_Group",
"ovn-nbctl --timeout=15 -- --may-exist lr-add ovn_cluster_router -- set logical_router ovn_cluster_router external_ids:k8s-cluster-router=yes",
})
if sctpSupport {
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovsdb-client list-columns --data=bare --no-heading --format=json OVN_Northbound Load_Balancer",
Output: `{"data":[["_version","uuid"],["health_check",{"key":{"refTable":"Load_Balancer_Health_Check","type":"uuid"},"max":"unlimited","min":0}],["name","string"],["protocol",{"key":{"enum":["set",["sctp","tcp","udp"]],"type":"string"},"min":0}]],"headings":["Column","Type"]}`,
})
} else {
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovsdb-client list-columns --data=bare --no-heading --format=json OVN_Northbound Load_Balancer",
Output: `{"data":[["_version","uuid"],["health_check",{"key":{"refTable":"Load_Balancer_Health_Check","type":"uuid"},"max":"unlimited","min":0}],["name","string"],["protocol",{"key":{"enum":["set",["tcp","udp"]],"type":"string"},"min":0}]],"headings":["Column","Type"]}`,
})
}
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 -- set logical_router ovn_cluster_router options:mcast_relay=\"true\"",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find port_group name=mcastPortGroupDeny",
"ovn-nbctl --timeout=15 create port_group name=mcastPortGroupDeny external-ids:name=mcastPortGroupDeny",
Expand Down Expand Up @@ -113,11 +126,12 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:k8s-cluster-lb-sctp=yes",
Output: "",
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:k8s-cluster-lb-sctp=yes protocol=sctp",
Output: sctpLBUUID,
})

if sctpSupport {
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 -- create load_balancer external_ids:k8s-cluster-lb-sctp=yes protocol=sctp",
Output: sctpLBUUID,
})
}
// Node-related logical network stuff
ip, cidr, err := net.ParseCIDR(nodeSubnet)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -140,7 +154,13 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st
"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " stor-" + nodeName + " -- set logical_switch_port stor-" + nodeName + " type=router options:router-port=rtos-" + nodeName + " addresses=\"" + lrpMAC + "\"",
"ovn-nbctl --timeout=15 set logical_switch " + nodeName + " load_balancer=" + tcpLBUUID,
"ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + udpLBUUID,
"ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + sctpLBUUID,
})
if sctpSupport {
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + sctpLBUUID,
})
}
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " k8s-" + nodeName + " -- lsp-set-addresses " + "k8s-" + nodeName + " " + mgmtMAC + " " + nodeMgmtPortIP.String(),
"ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + nodeMgmtPortIP.String() + " allow-related",
})
Expand Down Expand Up @@ -174,9 +194,8 @@ func addNodeportLBs(fexec *ovntest.FakeExec, nodeName, tcpLBUUID, udpLBUUID, sct
Output: sctpLBUUID,
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 set logical_router " + util.GWRouterPrefix + nodeName + " load_balancer=" + tcpLBUUID,
"ovn-nbctl --timeout=15 add logical_router " + util.GWRouterPrefix + nodeName + " load_balancer " + udpLBUUID,
"ovn-nbctl --timeout=15 add logical_router " + util.GWRouterPrefix + nodeName + " load_balancer " + sctpLBUUID,
"ovn-nbctl --timeout=15 set logical_router " + util.GWRouterPrefix + nodeName + " load_balancer=" + tcpLBUUID +
"," + udpLBUUID + "," + sctpLBUUID,
})
}

Expand Down Expand Up @@ -207,7 +226,7 @@ var _ = Describe("Master Operations", func() {
mgmtMAC string = "01:02:03:04:05:06"
)

fexec, tcpLBUUID, udpLBUUID, sctpLBUUID := defaultFakeExec(nodeSubnet, nodeName)
fexec, tcpLBUUID, udpLBUUID, sctpLBUUID := defaultFakeExec(nodeSubnet, nodeName, true)
cleanupGateway(fexec, nodeName, nodeSubnet, clusterCIDR, nextHop)

testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -274,6 +293,88 @@ var _ = Describe("Master Operations", func() {
Expect(err).NotTo(HaveOccurred())
})

It("works without SCTP support", func() {
const (
clusterIPNet string = "10.1.0.0"
clusterCIDR string = clusterIPNet + "/16"
)

app.Action = func(ctx *cli.Context) error {
const (
nodeName string = "node1"
nodeSubnet string = "10.1.0.0/24"
clusterCIDR string = "10.1.0.0/16"
nextHop string = "10.1.0.2"
mgmtMAC string = "01:02:03:04:05:06"
)

fexec, tcpLBUUID, udpLBUUID, _ := defaultFakeExec(nodeSubnet, nodeName, false)
cleanupGateway(fexec, nodeName, nodeSubnet, clusterCIDR, nextHop)

testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
}}

fakeClient := fake.NewSimpleClientset(&v1.NodeList{
Items: []v1.Node{testNode},
})

err := util.SetExec(fexec)
Expect(err).NotTo(HaveOccurred())

_, err = config.InitConfig(ctx, fexec, nil)
Expect(err).NotTo(HaveOccurred())

nodeAnnotator := kube.NewNodeAnnotator(&kube.Kube{fakeClient}, &testNode)
err = util.SetDisabledL3GatewayConfig(nodeAnnotator)
Expect(err).NotTo(HaveOccurred())
err = util.SetNodeManagementPortMacAddr(nodeAnnotator, mgmtMAC)
Expect(err).NotTo(HaveOccurred())
err = nodeAnnotator.Run()
Expect(err).NotTo(HaveOccurred())

stopChan := make(chan struct{})
f, err := factory.NewWatchFactory(fakeClient, stopChan)
Expect(err).NotTo(HaveOccurred())
defer close(stopChan)

clusterController := NewOvnController(fakeClient, f, stopChan)
Expect(clusterController).NotTo(BeNil())
clusterController.TCPLoadBalancerUUID = tcpLBUUID
clusterController.UDPLoadBalancerUUID = udpLBUUID
clusterController.SCTPLoadBalancerUUID = ""

err = clusterController.StartClusterMaster("master")
Expect(err).NotTo(HaveOccurred())

err = clusterController.WatchNodes()
Expect(err).NotTo(HaveOccurred())

Expect(fexec.CalledMatchesExpected()).To(BeTrue(), fexec.ErrorDesc)
updatedNode, err := fakeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

subnetFromAnnotation, err := util.ParseNodeHostSubnetAnnotation(updatedNode)
Expect(err).NotTo(HaveOccurred())
Expect(subnetFromAnnotation.String()).To(Equal(nodeSubnet))

macFromAnnotation, err := util.ParseNodeManagementPortMacAddr(updatedNode)
Expect(err).NotTo(HaveOccurred())
Expect(macFromAnnotation).To(Equal(mgmtMAC))

Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
return nil
}

err := app.Run([]string{
app.Name,
"-cluster-subnets=" + clusterCIDR,
"-enable-multicast",
"-enable-hybrid-overlay",
})
Expect(err).NotTo(HaveOccurred())
})

It("does not allocate a hostsubnet for a node that already has one", func() {
const (
clusterIPNet string = "10.1.0.0"
Expand All @@ -297,7 +398,7 @@ var _ = Describe("Master Operations", func() {
Items: []v1.Node{testNode},
})

fexec, tcpLBUUID, udpLBUUID, sctpLBUUID := defaultFakeExec(nodeSubnet, nodeName)
fexec, tcpLBUUID, udpLBUUID, sctpLBUUID := defaultFakeExec(nodeSubnet, nodeName, true)
err := util.SetExec(fexec)
Expect(err).NotTo(HaveOccurred())
cleanupGateway(fexec, nodeName, nodeSubnet, clusterCIDR, nextHop)
Expand Down Expand Up @@ -515,6 +616,7 @@ subnet=%s
clusterController.TCPLoadBalancerUUID = tcpLBUUID
clusterController.UDPLoadBalancerUUID = udpLBUUID
clusterController.SCTPLoadBalancerUUID = sctpLBUUID
clusterController.SCTPSupport = true
_ = clusterController.joinSubnetAllocator.AddNetworkRange("100.64.0.0/16", 3)

// Let the real code run and ensure OVN database sync
Expand Down Expand Up @@ -710,6 +812,7 @@ var _ = Describe("Gateway Init Operations", func() {
clusterController.TCPLoadBalancerUUID = tcpLBUUID
clusterController.UDPLoadBalancerUUID = udpLBUUID
clusterController.SCTPLoadBalancerUUID = sctpLBUUID
clusterController.SCTPSupport = true
_ = clusterController.joinSubnetAllocator.AddNetworkRange("100.64.0.0/16", 3)

// Let the real code run and ensure OVN database sync
Expand Down Expand Up @@ -902,6 +1005,7 @@ var _ = Describe("Gateway Init Operations", func() {
clusterController.TCPLoadBalancerUUID = tcpLBUUID
clusterController.UDPLoadBalancerUUID = udpLBUUID
clusterController.SCTPLoadBalancerUUID = sctpLBUUID
clusterController.SCTPSupport = true
_ = clusterController.joinSubnetAllocator.AddNetworkRange("100.64.0.0/16", 3)

// Let the real code run and ensure OVN database sync
Expand Down
13 changes: 6 additions & 7 deletions go-controller/pkg/ovn/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
Expand Down Expand Up @@ -57,6 +55,7 @@ type Controller struct {
TCPLoadBalancerUUID string
UDPLoadBalancerUUID string
SCTPLoadBalancerUUID string
SCTPSupport bool

// For TCP, UDP, and SCTP type traffic, cache OVN load-balancers used for the
// cluster's east-west traffic.
Expand Down Expand Up @@ -121,6 +120,9 @@ type Controller struct {
serviceLBMap map[string]map[string]*loadBalancerConf

serviceLBLock sync.Mutex

// event recorder used to post events to k8s
recorder record.EventRecorder
}

const (
Expand Down Expand Up @@ -160,6 +162,7 @@ func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory,
serviceVIPToNameLock: sync.Mutex{},
serviceLBMap: make(map[string]map[string]*loadBalancerConf),
serviceLBLock: sync.Mutex{},
recorder: util.EventRecorder(kubeClient),
}
}

Expand Down Expand Up @@ -321,10 +324,6 @@ func (oc *Controller) ovnControllerEventChecker(stopChan chan struct{}) {
return
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: oc.kube.Events()})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, kapi.EventSource{Component: "kube-proxy"})

for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -352,7 +351,7 @@ func (oc *Controller) ovnControllerEventChecker(stopChan chan struct{}) {
Name: serviceName.Name,
}
klog.V(5).Infof("Sending a NeedPods event for service %s in namespace %s.", serviceName.Name, serviceName.Namespace)
recorder.Eventf(&serviceRef, kapi.EventTypeNormal, "NeedPods", "The service %s needs pods", serviceName.Name)
oc.recorder.Eventf(&serviceRef, kapi.EventTypeNormal, "NeedPods", "The service %s needs pods", serviceName.Name)
}
}
case <-stopChan:
Expand Down

0 comments on commit 9fa2ec4

Please sign in to comment.