Skip to content

Commit

Permalink
add ready status for provider network
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Aug 24, 2021
1 parent 8906e45 commit ef9dbc5
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 63 deletions.
5 changes: 5 additions & 0 deletions dist/images/install-pre-1.16.sh
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ spec:
- name: DefaultInterface
type: string
JSONPath: .spec.defaultInterface
- name: Ready
type: boolean
JSONPath: .status.ready
validation:
openAPIV3Schema:
properties:
Expand Down Expand Up @@ -433,6 +436,8 @@ spec:
status:
type: object
properties:
ready:
type: boolean
readyNodes:
type: array
items:
Expand Down
5 changes: 5 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ spec:
status:
type: object
properties:
ready:
type: boolean
readyNodes:
type: array
items:
Expand Down Expand Up @@ -672,6 +674,9 @@ spec:
- name: DefaultInterface
type: string
jsonPath: .spec.defaultInterface
- name: Ready
type: boolean
jsonPath: .status.ready
scope: Cluster
names:
plural: provider-networks
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ type ProviderNetworkSpec struct {
}

type ProviderNetworkStatus struct {
// +optional
// +patchStrategy=merge
Ready bool `json:"ready" patchStrategy:"merge"`

// +optional
// +patchStrategy=merge
ReadyNodes []string `json:"readyNodes,omitempty" patchStrategy:"merge"`
Expand Down
35 changes: 22 additions & 13 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ type Controller struct {
vlansLister kubeovnlister.VlanLister
vlanSynced cache.InformerSynced

providerNetworksLister kubeovnlister.ProviderNetworkLister
providerNetworkSynced cache.InformerSynced
providerNetworksLister kubeovnlister.ProviderNetworkLister
providerNetworkSynced cache.InformerSynced
updateProviderNetworkQueue workqueue.RateLimitingInterface

addVlanQueue workqueue.RateLimitingInterface
delVlanQueue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -197,8 +198,9 @@ func NewController(config *Configuration) *Controller {
delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"),
updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"),

providerNetworksLister: providerNetworkInformer.Lister(),
providerNetworkSynced: providerNetworkInformer.Informer().HasSynced,
providerNetworksLister: providerNetworkInformer.Lister(),
providerNetworkSynced: providerNetworkInformer.Informer().HasSynced,
updateProviderNetworkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateProviderNetwork"),

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
Expand Down Expand Up @@ -306,6 +308,10 @@ func NewController(config *Configuration) *Controller {
UpdateFunc: controller.enqueueUpdateVlan,
})

providerNetworkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.enqueueUpdateProviderNetwork,
})

return controller
}

Expand All @@ -331,28 +337,28 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

if err := c.InitDefaultVpc(); err != nil {
klog.Fatalf("failed to init default vpc %v", err)
klog.Fatalf("failed to init default vpc: %v", err)
}

if err := c.InitOVN(); err != nil {
klog.Fatalf("failed to init ovn resource %v", err)
klog.Fatalf("failed to init ovn resource: %v", err)
}

if err := c.InitIPAM(); err != nil {
klog.Fatalf("failed to init ipam %v", err)
klog.Fatalf("failed to init ipam: %v", err)
}

// remove resources in ovndb that not exist any more in kubernetes resources
if err := c.gc(); err != nil {
klog.Fatalf("gc failed %v", err)
klog.Fatalf("gc failed: %v", err)
}

c.registerSubnetMetrics()
if err := c.initSyncCrdIPs(); err != nil {
klog.Errorf("failed to sync crd ips %v", err)
klog.Errorf("failed to sync crd ips: %v", err)
}
if err := c.initSyncCrdSubnets(); err != nil {
klog.Errorf("failed to sync crd subnets %v", err)
klog.Errorf("failed to sync crd subnets: %v", err)
}
if err := c.initSyncCrdVlans(); err != nil {
klog.Errorf("failed to sync crd vlans: %v", err)
Expand Down Expand Up @@ -394,6 +400,8 @@ func (c *Controller) shutdown() {
c.delVlanQueue.ShutDown()
c.updateVlanQueue.ShutDown()

c.updateProviderNetworkQueue.ShutDown()

c.addOrUpdateVpcQueue.ShutDown()
c.updateVpcStatusQueue.ShutDown()
c.delVpcQueue.ShutDown()
Expand Down Expand Up @@ -430,7 +438,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {
time.Sleep(3 * time.Second)
lss, err := c.ovnClient.ListLogicalSwitch()
if err != nil {
klog.Fatalf("failed to list logical switch, %v", err)
klog.Fatalf("failed to list logical switch: %v", err)
}

if util.IsStringIn(c.config.DefaultLogicalSwitch, lss) && util.IsStringIn(c.config.NodeSwitch, lss) {
Expand All @@ -449,7 +457,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {
time.Sleep(3 * time.Second)
nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Fatalf("failed to list nodes, %v", err)
klog.Fatalf("failed to list nodes: %v", err)
}
for _, node := range nodes {
if node.Annotations[util.AllocatedAnnotation] != "true" {
Expand All @@ -468,6 +476,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {

go wait.Until(c.runDelVpcWorker, time.Second, stopCh)
go wait.Until(c.runUpdateVpcStatusWorker, time.Second, stopCh)
go wait.Until(c.runUpdateProviderNetworkWorker, time.Second, stopCh)

// run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
go wait.Until(c.runDeleteTcpServiceWorker, time.Second, stopCh)
Expand Down Expand Up @@ -505,7 +514,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {

go wait.Until(func() {
if err := c.markAndCleanLSP(); err != nil {
klog.Errorf("gc lsp error %v", err)
klog.Errorf("gc lsp error: %v", err)
}
}, 6*time.Minute, stopCh)

Expand Down
56 changes: 28 additions & 28 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ import (

func (c *Controller) InitOVN() error {
if err := c.initClusterRouter(); err != nil {
klog.Errorf("init cluster router failed %v", err)
klog.Errorf("init cluster router failed: %v", err)
return err
}

if err := c.initLoadBalancer(); err != nil {
klog.Errorf("init load balancer failed %v", err)
klog.Errorf("init load balancer failed: %v", err)
return err
}

if err := c.initDefaultVlan(); err != nil {
klog.Errorf("init default vlan failed %v", err)
klog.Errorf("init default vlan failed: %v", err)
return err
}

if err := c.initNodeSwitch(); err != nil {
klog.Errorf("init node switch failed %v", err)
klog.Errorf("init node switch failed: %v", err)
return err
}

if err := c.initDefaultLogicalSwitch(); err != nil {
klog.Errorf("init default switch failed %v", err)
klog.Errorf("init default switch failed: %v", err)
return err
}

Expand All @@ -51,7 +51,7 @@ func (c *Controller) InitDefaultVpc() error {
vpc.Name = util.DefaultVpc
vpc, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Create(context.Background(), vpc, metav1.CreateOptions{})
if err != nil {
klog.Errorf("init default vpc failed %v", err)
klog.Errorf("init default vpc failed: %v", err)
return err
}
}
Expand All @@ -71,7 +71,7 @@ func (c *Controller) InitDefaultVpc() error {
}
_, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Patch(context.Background(), vpc.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("init default vpc failed %v", err)
klog.Errorf("init default vpc failed: %v", err)
return err
}
return nil
Expand All @@ -86,7 +86,7 @@ func (c *Controller) initDefaultLogicalSwitch() error {
if util.CheckProtocol(c.config.DefaultCIDR) == kubeovnv1.ProtocolDual {
subnet.Spec.CIDRBlock = c.config.DefaultCIDR
if err := formatSubnet(subnet, c); err != nil {
klog.Errorf("init format subnet %s failed %v", c.config.DefaultLogicalSwitch, err)
klog.Errorf("init format subnet %s failed: %v", c.config.DefaultLogicalSwitch, err)
return err
}
}
Expand All @@ -95,7 +95,7 @@ func (c *Controller) initDefaultLogicalSwitch() error {
}

if !k8serrors.IsNotFound(err) {
klog.Errorf("get default subnet %s failed %v", c.config.DefaultLogicalSwitch, err)
klog.Errorf("get default subnet %s failed: %v", c.config.DefaultLogicalSwitch, err)
return err
}

Expand Down Expand Up @@ -131,7 +131,7 @@ func (c *Controller) initNodeSwitch() error {
if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolDual {
subnet.Spec.CIDRBlock = c.config.NodeSwitchCIDR
if err := formatSubnet(subnet, c); err != nil {
klog.Errorf("init format subnet %s failed %v", c.config.NodeSwitch, err)
klog.Errorf("init format subnet %s failed: %v", c.config.NodeSwitch, err)
return err
}
}
Expand All @@ -140,7 +140,7 @@ func (c *Controller) initNodeSwitch() error {
}

if !k8serrors.IsNotFound(err) {
klog.Errorf("get node subnet %s failed %v", c.config.NodeSwitch, err)
klog.Errorf("get node subnet %s failed: %v", c.config.NodeSwitch, err)
return err
}

Expand Down Expand Up @@ -168,7 +168,7 @@ func (c *Controller) initClusterRouter() error {
if err != nil {
return err
}
klog.Infof("exists routers %v", lrs)
klog.Infof("exists routers: %v", lrs)
for _, r := range lrs {
if c.config.ClusterRouter == r {
return nil
Expand All @@ -181,13 +181,13 @@ func (c *Controller) initClusterRouter() error {
func (c *Controller) initLoadBalancer() error {
tcpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find tcp lb %v", err)
return fmt.Errorf("failed to find tcp lb: %v", err)
}
if tcpLb == "" {
klog.Infof("init cluster tcp load balancer %s", c.config.ClusterTcpLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpLoadBalancer, util.ProtocolTCP, "")
if err != nil {
klog.Errorf("failed to crate cluster tcp load balancer %v", err)
klog.Errorf("failed to crate cluster tcp load balancer: %v", err)
return err
}
} else {
Expand All @@ -196,13 +196,13 @@ func (c *Controller) initLoadBalancer() error {

tcpSessionLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpSessionLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find tcp session lb %v", err)
return fmt.Errorf("failed to find tcp session lb: %v", err)
}
if tcpSessionLb == "" {
klog.Infof("init cluster tcp session load balancer %s", c.config.ClusterTcpSessionLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterTcpSessionLoadBalancer, util.ProtocolTCP, "ip_src")
if err != nil {
klog.Errorf("failed to crate cluster tcp session load balancer %v", err)
klog.Errorf("failed to crate cluster tcp session load balancer: %v", err)
return err
}
} else {
Expand All @@ -211,7 +211,7 @@ func (c *Controller) initLoadBalancer() error {

udpLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp lb %v", err)
return fmt.Errorf("failed to find udp lb: %v", err)
}
if udpLb == "" {
klog.Infof("init cluster udp load balancer %s", c.config.ClusterUdpLoadBalancer)
Expand All @@ -226,13 +226,13 @@ func (c *Controller) initLoadBalancer() error {

udpSessionLb, err := c.ovnClient.FindLoadbalancer(c.config.ClusterUdpSessionLoadBalancer)
if err != nil {
return fmt.Errorf("failed to find udp session lb %v", err)
return fmt.Errorf("failed to find udp session lb: %v", err)
}
if udpSessionLb == "" {
klog.Infof("init cluster udp session load balancer %s", c.config.ClusterUdpSessionLoadBalancer)
err := c.ovnClient.CreateLoadBalancer(c.config.ClusterUdpSessionLoadBalancer, util.ProtocolUDP, "ip_src")
if err != nil {
klog.Errorf("failed to crate cluster udp session load balancer %v", err)
klog.Errorf("failed to crate cluster udp session load balancer: %v", err)
return err
}
} else {
Expand All @@ -245,18 +245,18 @@ func (c *Controller) initLoadBalancer() error {
func (c *Controller) InitIPAM() error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnet, %v", err)
klog.Errorf("failed to list subnet: %v", err)
return err
}
for _, subnet := range subnets {
if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.ExcludeIps); err != nil {
klog.Errorf("failed to init subnet %s, %v", subnet.Name, err)
klog.Errorf("failed to init subnet %s: %v", subnet.Name, err)
}
}

pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list pods, %v", err)
klog.Errorf("failed to list pods: %v", err)
return err
}
for _, pod := range pods {
Expand All @@ -269,14 +269,14 @@ func (c *Controller) InitIPAM() error {
pod.Annotations[util.MacAddressAnnotation],
pod.Annotations[util.LogicalSwitchAnnotation])
if err != nil {
klog.Errorf("failed to init pod %s.%s address %s, %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
klog.Errorf("failed to init pod %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
}
}
}

nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list nodes, %v", err)
klog.Errorf("failed to list nodes: %v", err)
return err
}
for _, node := range nodes {
Expand All @@ -286,7 +286,7 @@ func (c *Controller) InitIPAM() error {
node.Annotations[util.MacAddressAnnotation],
node.Annotations[util.LogicalSwitchAnnotation])
if err != nil {
klog.Errorf("failed to init node %s.%s address %s, %v", node.Name, node.Namespace, node.Annotations[util.IpAddressAnnotation], err)
klog.Errorf("failed to init node %s.%s address %s: %v", node.Name, node.Namespace, node.Annotations[util.IpAddressAnnotation], err)
}
if v4IP != "" && v6IP != "" {
node.Annotations[util.IpAddressAnnotation] = util.GetStringIP(v4IP, v6IP)
Expand Down Expand Up @@ -335,7 +335,7 @@ func (c *Controller) initDefaultVlan() error {
}

if !k8serrors.IsNotFound(err) {
klog.Errorf("get default vlan %s failed %v", c.config.DefaultVlanName, err)
klog.Errorf("get default vlan %s failed: %v", c.config.DefaultVlanName, err)
return err
}

Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *Controller) initSyncCrdIPs() error {

_, err := c.config.KubeOvnClient.KubeovnV1().IPs().Update(context.Background(), &ip, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to sync crd ip %s, %v", ip.Spec.IPAddress, err)
klog.Errorf("failed to sync crd ip %s: %v", ip.Spec.IPAddress, err)
return err
}
}
Expand All @@ -399,7 +399,7 @@ func (c *Controller) initSyncCrdSubnets() error {
err = calcSubnetStatusIP(subnet, c)
}
if err != nil {
klog.Errorf("failed to calculate subnet %s used ip, %v", subnet.Name, err)
klog.Errorf("failed to calculate subnet %s used ip: %v", subnet.Name, err)
return err
}
}
Expand Down

0 comments on commit ef9dbc5

Please sign in to comment.