Skip to content

Commit

Permalink
feat: Add scheduler with annotations (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
RealAnna committed Sep 21, 2022
1 parent 722e128 commit 9e29019
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 26 deletions.
64 changes: 38 additions & 26 deletions lfc-scheduler/pkg/klcpermit/permit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,22 @@ import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"time"
)

// Name is the name of the plugin used in the plugin registry and configurations.
const (
Name = "KLCPermit"
Wait = "Wait"
Success = "Success"
Name = "KLCPermit"
)

// Permit is a plugin that implements a wait for pre-deployment checks
type Permit struct {
recorder record.EventRecorder
handler framework.Handle
svcManager *ServiceManager
}

var _ framework.PermitPlugin = &Permit{}
Expand All @@ -35,33 +31,49 @@ func (pl *Permit) Name() string {

func (pl *Permit) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {

klog.InfoS("[Keptn Permit Plugin] waiting for pre-deployment checks on", klog.KObj(p.GetObjectMeta()))
klog.InfoS("[Keptn Permit Plugin] waiting for pre-deployment checks on", p.GetObjectMeta().GetName())

pl.recorder.Event(p, v1.EventTypeNormal, "SomeReason", "Waiting Pre-Deployment")
switch pl.svcManager.Permit(ctx, p) {

case Wait:
klog.Infof("[Keptn Permit Plugin] waiting for pre-deployment checks on", p.GetObjectMeta().GetName())
return framework.NewStatus(framework.Wait), 30 * time.Second
case Failure:
klog.Infof("[Keptn Permit Plugin] failed pre-deployment checks on", p.GetObjectMeta().GetName())
return framework.NewStatus(framework.Error), 0 * time.Second
case Success:
klog.Infof("[Keptn Permit Plugin] passed pre-deployment checks on", p.GetObjectMeta().GetName())
return framework.NewStatus(framework.Success), 0 * time.Second
default:
klog.Infof("[Keptn Permit Plugin] unknown status of pre-deployment checks for", p.GetObjectMeta().GetName())
return framework.NewStatus(framework.Wait), 30 * time.Second //TODO what makes sense here?
}

retStatus := framework.NewStatus(framework.Success)
waitTime := 0 * time.Second
return retStatus, waitTime
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
client, err := newClient()
if err != nil {
return nil, err
}

return &Permit{
recorder: setupRecorder(),
svcManager: NewServiceManager(client),
handler: h,
}, nil
}

func setupRecorder() record.EventRecorder {

var config *rest.Config
config, _ = rest.InClusterConfig()
func newClient() (dynamic.Interface, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, _ := kubernetes.NewForConfig(config)
eventSink := &clientcorev1.EventSinkImpl{
Interface: clientset.CoreV1().Events(v1.NamespaceAll),
dynClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(eventSink)
r := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "keptn-scheduler"})
return r

return dynClient, nil
}
95 changes: 95 additions & 0 deletions lfc-scheduler/pkg/klcpermit/service_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package klcpermit

import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
)

var serviceResource = schema.GroupVersionResource{Group: "lifecycle.keptn.sh", Version: "v1alpha1", Resource: "serviceruns"} //TODO change this resource name with workloadinstance and eventually appinstance :)

type Status string

const (
ServiceRunStatusNotSpecified Status = "Service run status not specified"
ServiceRunNotFound Status = "Service run not found"
Success Status = "Success"
Failure Status = "Failure"
Wait Status = "Wait"
)

const (
// ServiceRunPending means the application has been accepted by the system, but one or more of its
// serviceRuns has not been started.
ServiceRunPending string = "Pending"
// ServiceRunRunning means that serviceRun has been started.
ServiceRunRunning string = "Running"
// ServiceRunSucceeded means that serviceRun has been finished successfully.
ServiceRunSucceeded string = "Succeeded"
// ServiceRunFailed means that one or more pre-deployment checks was not successful and terminated.
ServiceRunFailed string = "Failed"
// ServiceRunUnknown means that for some reason the state of the application could not be obtained.
ServiceRunUnknown string = "Unknown"
)

type Manager interface {
Permit(context.Context, *corev1.Pod) Status
}

type ServiceManager struct {
dynamicClient dynamic.Interface
}

func NewServiceManager(d dynamic.Interface) *ServiceManager {
sMgr := &ServiceManager{
dynamicClient: d,
}
return sMgr
}

func (sMgr *ServiceManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
//List service run CRDs
name := GetCRDName(pod)
crd, err := sMgr.GetCRD(ctx, metav1.NamespaceDefault, name)

if err != nil {
klog.Infof("[Keptn Permit Plugin] could not find service crd %s, err:%s", name, err.Error())
return ServiceRunNotFound
}
//check CRD status
phase, found, err := unstructured.NestedString(crd.UnstructuredContent(), "status", "phase")
klog.Infof("[Keptn Permit Plugin] service crd %s, found %s with phase %s ", crd, found, phase)
if err == nil && found {
switch phase {
case ServiceRunPending:
return Wait
case ServiceRunFailed:
return Failure
case ServiceRunSucceeded:
return Success
case ServiceRunRunning:
return Wait
case ServiceRunUnknown:
return Wait
}

}
return ServiceRunStatusNotSpecified
}

//GetCRD returns unstructured to avoid tight coupling with the CRD resource
func (sMgr *ServiceManager) GetCRD(ctx context.Context, namespace string, name string) (*unstructured.Unstructured, error) {
// GET /apis/lifecycle.keptn.sh/v1/namespaces/{namespace}/servicerun/name
return sMgr.dynamicClient.Resource(serviceResource).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
}

func GetCRDName(pod *corev1.Pod) string {
application := pod.Annotations["keptn.sh/application"]
service := pod.Annotations["keptn.sh/service"]
version := pod.Annotations["keptn.sh/version"]
return application + "-" + service + "-" + version
}

0 comments on commit 9e29019

Please sign in to comment.