Skip to content

Commit

Permalink
fix virtual-kubelet provider to use the vk external module
Browse files Browse the repository at this point in the history
- Make NotifyPods a strongly typed function.
- Wipe out GetPodStatus erroneous implementation.
  • Loading branch information
fprojetto authored and Filippo Projetto (Rebase PR Action) committed Aug 3, 2021
1 parent 72b0396 commit 2281d82
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 28 deletions.
5 changes: 3 additions & 2 deletions pkg/virtualKubelet/apiReflection/controller/controller.go
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

Expand All @@ -21,7 +22,7 @@ const (

// APIController defines the interface exposed by a controller implementing API reflection.
type APIController interface {
SetInformingFunc(apiReflection.ApiType, func(interface{}))
SetInformingFunc(apiReflection.ApiType, func(*corev1.Pod))
CacheManager() storage.CacheManagerReaderAdder
StartController()
StopController() error
Expand Down Expand Up @@ -114,7 +115,7 @@ func (c *Controller) incomingReflectionControlLoop() {
}

// SetInformingFunc configures the handlers triggered for a certain API type by incoming reflection events.
func (c *Controller) SetInformingFunc(api apiReflection.ApiType, handler func(interface{})) {
func (c *Controller) SetInformingFunc(api apiReflection.ApiType, handler func(*corev1.Pod)) {
c.incomingReflectorsController.SetInforming(api, handler)
}

Expand Down
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection"
Expand Down Expand Up @@ -69,7 +70,8 @@ func (c *IncomingReflectorsController) Start() {
}
}

func (c *IncomingReflectorsController) SetInforming(api apimgmt.ApiType, handler func(interface{})) {
// SetInforming configures the handlers triggered for a certain API type by incoming reflection events.
func (c *IncomingReflectorsController) SetInforming(api apimgmt.ApiType, handler func(*corev1.Pod)) {
c.apiReflectors[api].(ri.APIReflector).SetInforming(handler)
}

Expand Down
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"sync"

apicorev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

Expand Down Expand Up @@ -36,7 +37,7 @@ type IncomingAPIReflectorsController interface {
SpecializedAPIReflectorsController

buildIncomingReflector(api apimgmt.ApiType, opts map[options.OptionKey]options.Option) ri.IncomingAPIReflector
SetInforming(api apimgmt.ApiType, handler func(interface{}))
SetInforming(api apimgmt.ApiType, handler func(*apicorev1.Pod))
}

type ReflectorsController struct {
Expand Down
@@ -1,6 +1,8 @@
package test

import (
corev1 "k8s.io/api/core/v1"

apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection"
"github.com/liqotech/liqo/pkg/virtualKubelet/storage"
)
Expand All @@ -9,7 +11,8 @@ type MockController struct {
Manager storage.CacheManagerReaderAdder
}

func (m MockController) SetInformingFunc(apiType apimgmt.ApiType, f func(interface{})) {
// SetInformingFunc implementation.
func (m MockController) SetInformingFunc(apiType apimgmt.ApiType, f func(*corev1.Pod)) {
panic("implement me")
}

Expand Down
11 changes: 7 additions & 4 deletions pkg/virtualKubelet/apiReflection/reflectors/apiReflector.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand All @@ -21,7 +22,7 @@ type GenericAPIReflector struct {
Api apimgmt.ApiType
PreProcessingHandlers ri.PreProcessingHandlers
OutputChan chan apimgmt.ApiEvent
informingFunc func(obj interface{})
informingFunc func(pod *corev1.Pod)

ForeignClient kubernetes.Interface
HomeClient kubernetes.Interface
Expand Down Expand Up @@ -143,13 +144,15 @@ func (r *GenericAPIReflector) Inform(obj apimgmt.ApiEvent) {
r.OutputChan <- obj
}

func (r *GenericAPIReflector) SetInforming(handler func(interface{})) {
// SetInforming configures the handlers triggered for a certain API type by incoming reflection events.
func (r *GenericAPIReflector) SetInforming(handler func(*corev1.Pod)) {
r.informingFunc = handler
}

func (r *GenericAPIReflector) PushToInforming(obj interface{}) {
// PushToInforming pushes a pod to the informing function.
func (r *GenericAPIReflector) PushToInforming(pod *corev1.Pod) {
if r.informingFunc != nil {
r.informingFunc(obj)
r.informingFunc(pod)
} else {
klog.V(3).Info("cannot push object to informing function, not existing yet")
}
Expand Down
Expand Up @@ -3,6 +3,7 @@ package reflectorsInterfaces
import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -38,8 +39,8 @@ type APIReflector interface {
SetupHandlers(api apimgmt.ApiType, reflectionType ReflectionType, namespace, nattedNs string)
SetPreProcessingHandlers(PreProcessingHandlers)

SetInforming(handler func(interface{}))
PushToInforming(interface{})
SetInforming(handler func(*corev1.Pod))
PushToInforming(*corev1.Pod)
}

type SpecializedAPIReflector interface {
Expand Down
22 changes: 5 additions & 17 deletions pkg/virtualKubelet/provider/pods.go
Expand Up @@ -176,23 +176,11 @@ func (p *LiqoProvider) GetPod(ctx context.Context, namespace, name string) (pod
return homePod.(*corev1.Pod), nil
}

// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
// GetPodStatus not implemented, panic if the method gets invoked.
// GetPodStatus should only be called by the virtual kubelet if the provider does not implement the PodNotifier interface.
// The LiqoProvider implements PodNotifier interface so we don't expect GetPodStatus to get called.
func (p *LiqoProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
klog.V(3).Infof("PROVIDER: pod %s/%s status requested to the provider", namespace, name)

foreignNamespace, err := p.namespaceMapper.NatNamespace(namespace)

if err != nil {
return nil, nil
}

foreignPod, err := p.apiController.CacheManager().GetForeignAPIByIndex(apimgmgt.Pods, foreignNamespace, name)
if err != nil {
return nil, errors.Wrap(err, "error while retrieving foreign pod")
}

return &foreignPod.(*corev1.Pod).Status, nil
panic("Virtual Kubelet called GetPodStatus unexpectedly.")
}

// GetPods returns a list of all pods known to be "running".
Expand Down Expand Up @@ -444,7 +432,7 @@ func (p *LiqoProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, err

// NotifyPods is called to set a pod informing callback function. This should be called before any operations are ready
// within the provider.
func (p *LiqoProvider) NotifyPods(ctx context.Context, notifier func(interface{})) {
func (p *LiqoProvider) NotifyPods(ctx context.Context, notifier func(*corev1.Pod)) {
p.apiController.SetInformingFunc(apimgmgt.Pods, notifier)
p.apiController.SetInformingFunc(apimgmgt.ReplicaSets, notifier)
}

0 comments on commit 2281d82

Please sign in to comment.