fix code reviews
Signed-off-by: Ram Lavi <ralavi@redhat.com>
RamLavi committed Jun 10, 2020
1 parent 7ee59eb commit a445b80
Showing 5 changed files with 49 additions and 41 deletions.
30 changes: 14 additions & 16 deletions pkg/manager/leaderelection.go
@@ -3,6 +3,7 @@ package manager
import (
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -13,17 +14,17 @@ import (
poolmanager "github.com/k8snetworkplumbingwg/kubemacpool/pkg/pool-manager"
)

func (k *KubeMacPoolManager) waitToStartLeading(poolManager *poolmanager.PoolManager) error {
<-k.mgr.Elected()
func (k *KubeMacPoolManager) waitToStartLeading(poolManger *poolmanager.PoolManager) error {
<-k.runtimeManager.Elected()
// If we reach here then we are in the elected pod.

err := poolManager.Start()
err := poolManger.Start()
if err != nil {
log.Error(err, "failed to start pool manager routines")
return err
}

err = k.markPodAsLeader()
err = k.AddLeaderLabelToElectedPod()
if err != nil {
log.Error(err, "failed marking pod as leader")
return err
@@ -37,7 +38,7 @@ func (k *KubeMacPoolManager) waitToStartLeading(poolManager *poolmanager.PoolMan
return nil
}

func (k *KubeMacPoolManager) markPodAsLeader() error {
func (k *KubeMacPoolManager) AddLeaderLabelToElectedPod() error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod, err := k.clientset.CoreV1().Pods(k.podNamespace).Get(context.TODO(), k.podName, metav1.GetOptions{})
if err != nil {
@@ -61,28 +62,25 @@ func (k *KubeMacPoolManager) markPodAsLeader() error {
// By setting this status to true on all pods, we declare kubemacpool as ready and allow the webhooks to start running.
func (k *KubeMacPoolManager) setLeadershipConditions(status corev1.ConditionStatus) error {
podList := corev1.PodList{}
err := k.mgr.GetClient().List(context.TODO(), &podList, &client.ListOptions{Namespace: k.podNamespace})
err := k.runtimeManager.GetClient().List(context.TODO(), &podList, &client.ListOptions{Namespace: k.podNamespace})
if err != nil {
return err
return errors.Wrap(err, "failed to list kubemacpool manager pods")
}
for _, pod := range podList.Items {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
podKey := types.NamespacedName{Namespace: k.podNamespace, Name: pod.Name}
err := k.mgr.GetClient().Get(context.TODO(), podKey, &pod)
err := k.runtimeManager.GetClient().Get(context.TODO(), podKey, &pod)
if err != nil {
return err
return errors.Wrap(err, "failed to get kubemacpool manager pods")
}

pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{Type: "kubemacpool.io/leader-ready", Status: status, LastProbeTime: metav1.Time{}})
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{Type: names.READINESS_GATE_LEADERSHIP_LABEL, Status: status, LastProbeTime: metav1.Time{}})

err = k.mgr.GetClient().Status().Update(context.TODO(), &pod)
if err != nil {
return err
}
return nil
err = k.runtimeManager.GetClient().Status().Update(context.TODO(), &pod)
return err
})
if err != nil {
return err
return errors.Wrap(err, "failed to update Leadership readiness gate status to kubemacpool manager pods")
}
}
return nil
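The visible lines of AddLeaderLabelToElectedPod show a pod Get wrapped in retry.RetryOnConflict; the rest of the body is collapsed above. Purely as an illustrative sketch of that get-modify-update pattern (the helper name and label key below are invented for the example, not taken from the repository):

package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

// addLeaderLabel fetches the pod, adds a leader label, and updates it,
// retrying whenever the API server reports an optimistic-concurrency conflict.
func addLeaderLabel(clientset kubernetes.Interface, namespace, podName string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if pod.Labels == nil {
			pod.Labels = map[string]string{}
		}
		pod.Labels["kubemacpool-leader"] = "true" // label key is an assumption
		_, err = clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
		return err
	})
}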
42 changes: 23 additions & 19 deletions pkg/manager/manager.go
@@ -24,6 +24,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kubevirt_api "kubevirt.io/client-go/api/v1"
@@ -50,7 +51,7 @@ type KubeMacPoolManager struct {
podNamespace string // manager pod namespace
podName string // manager pod name
waitingTime int // Duration in second to free macs of allocated vms that failed to start.
mgr manager.Manager // Delegated controller-runtime manager
runtimeManager manager.Manager // Delegated controller-runtime manager
}

func NewKubeMacPoolManager(podNamespace, podName, metricsAddr string, waitingTime int) *KubeMacPoolManager {
@@ -84,27 +85,20 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
}

for k.continueToRunManager {
log.Info("Setting up Manager")
mgr, err := manager.New(k.config, manager.Options{
MetricsBindAddress: k.metricsAddr,
LeaderElection: true,
LeaderElectionID: names.LEADER_ID,
LeaderElectionNamespace: k.podNamespace,
})
k.runtimeManager, err = k.initRuntimeManager()
if err != nil {
return fmt.Errorf("unable to set up manager error %v", err)
return errors.Wrap(err, "unable to set up manager")
}
k.mgr = mgr

err = kubevirt_api.AddToScheme(mgr.GetScheme())
err = kubevirt_api.AddToScheme(k.runtimeManager.GetScheme())
if err != nil {
return fmt.Errorf("unable to register kubevirt scheme error %v", err)
return errors.Wrap(err, "unable to register kubevirt scheme")
}

isKubevirtInstalled := checkForKubevirt(k.clientset)
poolManager, err := poolmanager.NewPoolManager(k.clientset, rangeStart, rangeEnd, k.podNamespace, isKubevirtInstalled, k.waitingTime)
if err != nil {
return fmt.Errorf("unable to create pool manager error %v", err)
return errors.Wrap(err, "unable to create pool manager")
}

if !isKubevirtInstalled {
@@ -116,20 +110,20 @@ func (k *KubeMacPoolManager) Run(rangeStart, rangeEnd net.HardwareAddr) error {
go k.waitToStartLeading(poolManager)

log.Info("Setting up controllers")
err = controller.AddToManager(mgr, poolManager)
err = controller.AddToManager(k.runtimeManager, poolManager)
if err != nil {
return fmt.Errorf("unable to register controllers to the manager error %v", err)
return errors.Wrap(err, "unable to register controllers to the manager")
}

log.Info("Setting up webhooks")
err = webhook.AddToManager(mgr, poolManager)
err = webhook.AddToManager(k.runtimeManager, poolManager)
if err != nil {
return fmt.Errorf("unable to register webhooks to the manager error %v", err)
return errors.Wrap(err, "unable to register webhooks to the manager")
}

err = mgr.Start(k.restartChannel)
err = k.runtimeManager.Start(k.restartChannel)
if err != nil {
return fmt.Errorf("unable to run the manager error %v", err)
return errors.Wrap(err, "unable to run the manager")
}

// restart channels
@@ -149,6 +143,16 @@ func checkForKubevirt(kubeClient *kubernetes.Clientset) bool {
return false
}

func (k *KubeMacPoolManager) initRuntimeManager() (manager.Manager, error) {
log.Info("Setting up Manager")
return manager.New(k.config, manager.Options{
MetricsBindAddress: k.metricsAddr,
LeaderElection: true,
LeaderElectionID: names.LEADER_ID,
LeaderElectionNamespace: k.podNamespace,
})
}

// Check for Kubevirt CRD to be available
func (k *KubeMacPoolManager) waitForKubevirt() {
for _ = range time.Tick(5 * time.Second) {
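The recurring change in this file is swapping fmt.Errorf("... error %v", err) for errors.Wrap from github.com/pkg/errors, which prepends context while keeping the original cause retrievable. A small standalone illustration (not code from the repository):

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	base := fmt.Errorf("connection refused")
	wrapped := errors.Wrap(base, "unable to set up manager")

	fmt.Println(wrapped)               // unable to set up manager: connection refused
	fmt.Println(errors.Cause(wrapped)) // connection refused (the original error survives wrapping)
	fmt.Println(errors.Wrap(nil, "no-op") == nil) // Wrap returns nil when err is nil
}

Note that Wrap only builds the wrapped error; each call site still has to return it.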
2 changes: 2 additions & 0 deletions pkg/names/names.go
@@ -21,3 +21,5 @@ const OPENSHIFT_RUNLABEL = "openshift.io/run-level"
const WAITING_VMS_CONFIGMAP = "kubemacpool-vm-configmap"

const WAIT_TIME_ARG = "wait-time"

const READINESS_GATE_LEADERSHIP_LABEL = "kubemacpool.io/leader-ready"
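The new constant is used as a pod condition type (see setLeadershipConditions above), which lets the manager pods surface leadership readiness through a Kubernetes readiness gate. A hedged sketch of how a pod spec could declare such a gate; the deployment wiring and image name are assumptions, not part of this commit:

package sketch

import (
	corev1 "k8s.io/api/core/v1"
)

// podSpecWithLeadershipGate builds a pod spec that only reports Ready once the
// kubemacpool.io/leader-ready condition has been set to True on the pod.
func podSpecWithLeadershipGate() corev1.PodSpec {
	return corev1.PodSpec{
		ReadinessGates: []corev1.PodReadinessGate{
			{ConditionType: corev1.PodConditionType("kubemacpool.io/leader-ready")},
		},
		Containers: []corev1.Container{
			{Name: "manager", Image: "quay.io/kubevirt/kubemacpool"}, // image is an assumption
		},
	}
}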
12 changes: 10 additions & 2 deletions pkg/pool-manager/virtualmachine_pool.go
@@ -468,7 +468,7 @@ func (p *PoolManager) MarkVMAsReady(vm *kubevirt.VirtualMachine, parentLogger lo
// so we release the virtual machine
func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
//if we reach here then we are in the leader pod
logger := log.WithName("vmWaitingCleanupLook").WithValues("macPoolMap", p.macPoolMap)
logger := log.WithName("vmWaitingCleanupLook")
c := time.Tick(3 * time.Second)
logger.Info("starting cleanup loop for waiting mac addresses")
for _ = range c {
@@ -483,7 +483,7 @@ func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {

configMapUpdateNeeded := false
if configMap.Data == nil {
logger.Info("the configMap is empty", "configMapName", names.WAITING_VMS_CONFIGMAP)
logger.Info("the configMap is empty", "configMapName", names.WAITING_VMS_CONFIGMAP, "macPoolMap", p.macPoolMap)
p.poolMutex.Unlock()
continue
}
@@ -496,6 +496,8 @@ func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
continue
}

logger.Info("data:", "configMapName", names.WAITING_VMS_CONFIGMAP, "configMap.Data", configMap.Data, "macPoolMap", p.macPoolMap)

if time.Now().After(t.Add(time.Duration(waitTime) * time.Second)) {
configMapUpdateNeeded = true
delete(configMap.Data, macAddress)
@@ -509,6 +511,12 @@ func (p *PoolManager) vmWaitingCleanupLook(waitTime int) {
_, err = p.kubeClient.CoreV1().ConfigMaps(p.managerNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
}

if err == nil {
logger.Info("the configMap successfully updated", "configMapName", names.WAITING_VMS_CONFIGMAP, "macPoolMap", p.macPoolMap)
} else {
logger.Info("the configMap failed to update", "configMapName", names.WAITING_VMS_CONFIGMAP, "macPoolMap", p.macPoolMap)
}

p.poolMutex.Unlock()
}
}
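For context on the expiry check inside this loop: a waiting MAC is released once its stored allocation timestamp is older than waitTime seconds. A minimal sketch of that comparison, assuming the configMap stores timestamps in RFC3339 form (the parsing step is outside the visible hunks):

package sketch

import "time"

// hasWaitTimeExpired reports whether a MAC allocated at storedTimestamp has been
// waiting longer than waitTime seconds and can therefore be released.
func hasWaitTimeExpired(storedTimestamp string, waitTime int) (bool, error) {
	t, err := time.Parse(time.RFC3339, storedTimestamp) // timestamp format is an assumption
	if err != nil {
		return false, err
	}
	return time.Now().After(t.Add(time.Duration(waitTime) * time.Second)), nil
}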
4 changes: 0 additions & 4 deletions tests/tests.go
@@ -314,10 +314,6 @@ func changeManagerReplicas(numOfReplica int32) error {
return false
}

if managerDeployment.Status.Replicas != numOfReplica {
return false
}

if managerDeployment.Status.ReadyReplicas != numOfReplica {
return false
}