
fix code reviews
Signed-off-by: Ram Lavi <ralavi@redhat.com>
RamLavi committed Jun 10, 2020
1 parent 7ee59eb commit a3708fe
Showing 7 changed files with 66 additions and 77 deletions.
43 changes: 19 additions & 24 deletions pkg/manager/leaderelection.go
@@ -3,6 +3,7 @@ package manager
import (
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -13,35 +14,32 @@ import (
poolmanager "github.com/k8snetworkplumbingwg/kubemacpool/pkg/pool-manager"
)

func (k *KubeMacPoolManager) waitToStartLeading(poolManager *poolmanager.PoolManager) error {
<-k.mgr.Elected()
func (k *KubeMacPoolManager) waitToStartLeading(poolManger *poolmanager.PoolManager) error {
<-k.runtimeManager.Elected()
// If we reach here then we are in the elected pod.

err := poolManager.Start()
err := poolManger.Start()
if err != nil {
log.Error(err, "failed to start pool manager routines")
return err
return errors.Wrap(err, "failed to start pool manager routines")
}

err = k.markPodAsLeader()
err = k.AddLeaderLabelToElectedPod()
if err != nil {
log.Error(err, "failed marking pod as leader")
return err
return errors.Wrap(err, "failed marking pod as leader")
}

err = k.setLeadershipConditions(corev1.ConditionTrue)
if err != nil {
log.Error(err, "failed changing leadership condition to true")
return err
return errors.Wrap(err, "failed changing leadership condition to true")
}
return nil
}

func (k *KubeMacPoolManager) markPodAsLeader() error {
func (k *KubeMacPoolManager) AddLeaderLabelToElectedPod() error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod, err := k.clientset.CoreV1().Pods(k.podNamespace).Get(context.TODO(), k.podName, metav1.GetOptions{})
if err != nil {
return err
return errors.Wrap(err, "failed to get currently running kubemacpool manager pod")
}

pod.Labels[names.LEADER_LABEL] = "true"
@@ -51,7 +49,7 @@ func (k *KubeMacPoolManager) markPodAsLeader() error {
})

if err != nil {
return err
return errors.Wrap(err, "failed to update leader label to elected kubemacpool manager pod")
}

log.Info("marked this manager as leader for webhook", "podName", k.podName)
@@ -61,28 +59,25 @@ func (k *KubeMacPoolManager) markPodAsLeader() error {
// By setting this status to true in all pods, we declare the kubemacpool as ready and allow the webhooks to start running.
func (k *KubeMacPoolManager) setLeadershipConditions(status corev1.ConditionStatus) error {
podList := corev1.PodList{}
err := k.mgr.GetClient().List(context.TODO(), &podList, &client.ListOptions{Namespace: k.podNamespace})
err := k.runtimeManager.GetClient().List(context.TODO(), &podList, &client.ListOptions{Namespace: k.podNamespace})
if err != nil {
return err
return errors.Wrap(err, "failed to list kubemacpool manager pods")
}
for _, pod := range podList.Items {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
podKey := types.NamespacedName{Namespace: k.podNamespace, Name: pod.Name}
err := k.mgr.GetClient().Get(context.TODO(), podKey, &pod)
err := k.runtimeManager.GetClient().Get(context.TODO(), podKey, &pod)
if err != nil {
return err
return errors.Wrap(err, "failed to get kubemacpool manager pods")
}

pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{Type: "kubemacpool.io/leader-ready", Status: status, LastProbeTime: metav1.Time{}})
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{Type: names.LEADER_READY_CONDITION_TYPE, Status: status, LastProbeTime: metav1.Time{}})

err = k.mgr.GetClient().Status().Update(context.TODO(), &pod)
if err != nil {
return err
}
return nil
err = k.runtimeManager.GetClient().Status().Update(context.TODO(), &pod)
return err
})
if err != nil {
return err
return errors.Wrap(err, "failed to update Leadership readiness gate status to kubemacpool manager pods")
}
}
return nil
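The label update in AddLeaderLabelToElectedPod follows client-go's conflict-retry idiom: the pod is re-read inside the retried closure, so a stale resourceVersion simply triggers another attempt instead of failing. A minimal, self-contained sketch of that pattern, wrapping errors the same way this commit does (the helper name, label key, and namespace handling are illustrative, not taken from the repository):

```go
package example

import (
	"context"

	"github.com/pkg/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

// labelPodAsLeader is a hypothetical helper mirroring AddLeaderLabelToElectedPod:
// it re-reads the pod on every attempt so an update conflict retries with fresh data.
func labelPodAsLeader(clientset kubernetes.Interface, namespace, podName string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return errors.Wrap(err, "failed to get pod")
		}
		if pod.Labels == nil {
			pod.Labels = map[string]string{}
		}
		pod.Labels["leader"] = "true" // illustrative key; the real code uses names.LEADER_LABEL
		_, err = clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
		return errors.Wrap(err, "failed to update pod")
	})
}
```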
45 changes: 26 additions & 19 deletions pkg/manager/manager.go
@@ -24,6 +24,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kubevirt_api "kubevirt.io/client-go/api/v1"
@@ -50,7 +51,7 @@ type KubeMacPoolManager struct {
podNamespace string // manager pod namespace
podName string // manager pod name
waitingTime int // Duration in second to free macs of allocated vms that failed to start.
mgr manager.Manager // Delegated controller-runtime manager
runtimeManager manager.Manager // Delegated controller-runtime manager
}

func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string, waitingTime int) *KubeMacPoolManager {
@@ -84,27 +85,20 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
}

for k.continueToRunManager {
log.Info("Setting up Manager")
mgr, err := manager.New(k.config, manager.Options{
MetricsBindAddress: k.metricsAddr,
LeaderElection: true,
LeaderElectionID: names.LEADER_ID,
LeaderElectionNamespace: k.podNamespace,
})
err = k.initRuntimeManager()
if err != nil {
return fmt.Errorf("unable to set up manager error %v", err)
return errors.Wrap(err, "unable to set up manager")
}
k.mgr = mgr

err = kubevirt_api.AddToScheme(mgr.GetScheme())
err = kubevirt_api.AddToScheme(k.runtimeManager.GetScheme())
if err != nil {
return fmt.Errorf("unable to register kubevirt scheme error %v", err)
errors.Wrap(err, "unable to register kubevirt scheme")
}

isKubevirtInstalled := checkForKubevirt(k.clientset)
poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, k.podNamespace, isKubevirtInstalled, k.waitingTime)
if err != nil {
return fmt.Errorf("unable to create pool manager error %v", err)
errors.Wrap(err, "unable to create pool manager")
}

if !isKubevirtInstalled {
@@ -116,20 +110,20 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
go k.waitToStartLeading(poolManager)

log.Info("Setting up controllers")
err = controller.AddToManager(mgr, poolManager)
err = controller.AddToManager(k.runtimeManager, poolManager)
if err != nil {
return fmt.Errorf("unable to register controllers to the manager error %v", err)
errors.Wrap(err, "unable to register controllers to the manager")
}

log.Info("Setting up webhooks")
err = webhook.AddToManager(mgr, poolManager)
err = webhook.AddToManager(k.runtimeManager, poolManager)
if err != nil {
return fmt.Errorf("unable to register webhooks to the manager error %v", err)
errors.Wrap(err, "unable to register webhooks to the manager")
}

err = mgr.Start(k.restartChannel)
err = k.runtimeManager.Start(k.restartChannel)
if err != nil {
return fmt.Errorf("unable to run the manager error %v", err)
errors.Wrap(err, "unable to run the manager")
}

// restart channels
@@ -149,6 +143,19 @@ func checkForKubevirt(kubeClient *kubernetes.Clientset) bool {
return false
}

func (k *KubeMacPoolManager) initRuntimeManager() error {
log.Info("Setting up Manager")
var err error
k.runtimeManager, err = manager.New(k.config, manager.Options{
MetricsBindAddress: k.metricsAddr,
LeaderElection: true,
LeaderElectionID: names.LEADER_ID,
LeaderElectionNamespace: k.podNamespace,
})

return err
}

// Check for Kubevirt CRD to be available
func (k *KubeMacPoolManager) waitForKubevirt() {
for _ = range time.Tick(5 * time.Second) {
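One note on the errors.Wrap calls introduced in this file: Wrap returns a new error value carrying both the added message and the original cause, so its result must be returned (as in the hunks above) rather than discarded. A small illustrative sketch, independent of this repository, showing the wrapped message and how the cause stays retrievable:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// loadConfig stands in for any failing call; the name is illustrative.
func loadConfig() error {
	return errors.New("connection refused")
}

func run() error {
	if err := loadConfig(); err != nil {
		// Wrap returns a *new* error; dropping its result would silently ignore the failure.
		return errors.Wrap(err, "unable to set up manager")
	}
	return nil
}

func main() {
	err := run()
	fmt.Println(err)               // unable to set up manager: connection refused
	fmt.Println(errors.Cause(err)) // connection refused
}
```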
2 changes: 2 additions & 0 deletions pkg/names/names.go
@@ -21,3 +21,5 @@ const OPENSHIFT_RUNLABEL = "openshift.io/run-level"
const WAITING_VMS_CONFIGMAP = "kubemacpool-vm-configmap"

const WAIT_TIME_ARG = "wait-time"

const LEADER_READY_CONDITION_TYPE = "kubemacpool.io/leader-ready"
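The new constant replaces the string literal previously used for the pod condition type. In Kubernetes, a custom condition like this only affects pod readiness if it is listed as a readiness gate on the pod spec, which is what lets setLeadershipConditions declare the managers ready and "allow the webhooks to start running". A hedged Go sketch of how such a gate is declared; whether the kubemacpool deployment actually wires it this way is not visible in this commit:

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
)

// withLeaderReadyGate adds the kubemacpool.io/leader-ready readiness gate to a pod spec,
// so the pod only reports Ready once that condition is set to True
// (as setLeadershipConditions does for the manager pods).
func withLeaderReadyGate(spec corev1.PodSpec) corev1.PodSpec {
	spec.ReadinessGates = append(spec.ReadinessGates, corev1.PodReadinessGate{
		ConditionType: "kubemacpool.io/leader-ready",
	})
	return spec
}
```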
4 changes: 2 additions & 2 deletions pkg/pool-manager/pool.go
@@ -95,11 +95,11 @@ func NewPoolManager(kubeClient kubernetes.Interface, rangeStart, rangeEnd net.Ha
func (p *PoolManager) Start() error {
err := p.InitMaps()
if err != nil {
return err
return errors.Wrap(err, "failed Init pool manager maps")
}

if p.isKubevirt {
go p.vmWaitingCleanupLook(p.waitTime)
go p.vmWaitingCleanupLook()
}
return nil
}
8 changes: 5 additions & 3 deletions pkg/pool-manager/pool_test.go
@@ -44,11 +44,13 @@ var _ = Describe("Pool", func() {
createPoolManager := func(startMacAddr, endMacAddr string, fakeObjectsForClient ...runtime.Object) *PoolManager {
fakeClient := fake.NewSimpleClientset(fakeObjectsForClient...)
startPoolRangeEnv, err := net.ParseMAC(startMacAddr)
Expect(err).ToNot(HaveOccurred())
Expect(err).ToNot(HaveOccurred(), "should successfully parse starting mac address range")
endPoolRangeEnv, err := net.ParseMAC(endMacAddr)
Expect(err).ToNot(HaveOccurred())
Expect(err).ToNot(HaveOccurred(), "should successfully parse ending mac address range")
poolManager, err := NewPoolManager(fakeClient, startPoolRangeEnv, endPoolRangeEnv, names.MANAGER_NAMESPACE, false, 10)
Expect(err).ToNot(HaveOccurred())
Expect(err).ToNot(HaveOccurred(), "should successfully initialize poolManager")
err = poolManager.Start()
Expect(err).ToNot(HaveOccurred(), "should successfully start poolManager routines")

return poolManager
}
16 changes: 12 additions & 4 deletions pkg/pool-manager/virtualmachine_pool.go
@@ -466,9 +466,9 @@ func (p *PoolManager) MarkVMAsReady(vm *kubevirt.VirtualMachine, parentLogger lo
// mutating webhook but we didn't get the creation event in the controller loop
// this means the creation was rejected by some other mutating or validating webhook
// so we release the virtual machine
func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
func (p *PoolManager) vmWaitingCleanupLook() {
//if we reach here then we are in the leader pod
logger := log.WithName("vmWaitingCleanupLook").WithValues("macPoolMap", p.macPoolMap)
logger := log.WithName("vmWaitingCleanupLook")
c := time.Tick(3 * time.Second)
logger.Info("starting cleanup loop for waiting mac addresses")
for _ = range c {
@@ -483,7 +483,7 @@ func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {

configMapUpdateNeeded := false
if configMap.Data == nil {
logger.Info("the configMap is empty", "configMapName", names.WAITING_VMS_CONFIGMAP)
logger.Info("the configMap is empty", "configMapName", names.WAITING_VMS_CONFIGMAP, "macPoolMap", p.macPoolMap)
p.poolMutex.Unlock()
continue
}
@@ -496,7 +496,9 @@ func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
continue
}

if time.Now().After(t.Add(time.Duration(waitTime) * time.Second)) {
logger.Info("data:", "configMapName", names.WAITING_VMS_CONFIGMAP, "configMap.Data", configMap.Data, "macPoolMap", p.macPoolMap)

if time.Now().After(t.Add(time.Duration(p.waitTime) * time.Second)) {
configMapUpdateNeeded = true
delete(configMap.Data, macAddress)
macAddress = strings.Replace(macAddress, "-", ":", 5)
@@ -509,6 +511,12 @@ func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
_, err = p.kubeClient.CoreV1().ConfigMaps(p.managerNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
}

if err == nil {
logger.Info("the configMap successfully updated", "configMapName", names.WAITING_VMS_CONFIGMAP, "macPoolMap", p.macPoolMap)
} else {
logger.Info("the configMap failed to update", "configMapName", names.WAITING_VMS_CONFIGMAP, "macPoolMap", p.macPoolMap)
}

p.poolMutex.Unlock()
}
}
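The heart of the cleanup loop is the expiry test above: each waiting MAC is keyed in the ConfigMap with the time it entered the waiting state, and is released only once p.waitTime seconds have elapsed. A minimal sketch of that check, assuming the timestamps are stored as RFC3339 strings (the actual format is parsed outside this hunk):

```go
package example

import (
	"time"
)

// macExpired reports whether a waiting MAC recorded at allocatedAt — assumed here to be
// an RFC3339 timestamp — has been waiting longer than waitTime seconds.
func macExpired(allocatedAt string, waitTime int, now time.Time) (bool, error) {
	t, err := time.Parse(time.RFC3339, allocatedAt)
	if err != nil {
		return false, err
	}
	// Same comparison as vmWaitingCleanupLook: expire once waitTime seconds have passed.
	return now.After(t.Add(time.Duration(waitTime) * time.Second)), nil
}
```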
25 changes: 0 additions & 25 deletions tests/tests.go
@@ -314,35 +314,10 @@ func changeManagerReplicas(numOfReplica int32) error {
return false
}

if managerDeployment.Status.Replicas != numOfReplica {
return false
}

if managerDeployment.Status.ReadyReplicas != numOfReplica {
return false
}

podsList, err := testClient.KubeClient.CoreV1().Pods(managerNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false
}

if len(podsList.Items) != int(numOfReplica) {
return false
}

numberOfReadyPods := int32(0)
for _, podObject := range podsList.Items {
for _, condition := range podObject.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
numberOfReadyPods += 1
}
}
}
if numberOfReadyPods < numOfReplica {
return false
}

return true

}, 2*time.Minute, 3*time.Second).Should(BeTrue(), "failed to change kubemacpool deployment number of replicas")
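The removed block trims the per-pod readiness checks out of this Eventually poll while keeping the 2-minute timeout and 3-second interval. For reference, a poll of this shape looks roughly like the sketch below; the deployment name, namespace handling, and the exact condition that survives this commit are assumptions, since the remaining body sits outside the hunk:

```go
package example

import (
	"context"
	"time"

	. "github.com/onsi/gomega"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// waitForReplicas polls until the deployment reports the requested number of updated replicas.
// The deployment and namespace names are placeholders, not values taken from this commit.
func waitForReplicas(client kubernetes.Interface, namespace, name string, replicas int32) {
	Eventually(func() bool {
		deployment, err := client.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return false
		}
		return deployment.Status.UpdatedReplicas == replicas
	}, 2*time.Minute, 3*time.Second).Should(BeTrue(), "failed to change deployment number of replicas")
}
```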
