Skip to content

Commit

Permalink
feature: reflect remote events to the local cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli authored and adamjensenbot committed Jun 8, 2023
1 parent 88a7cc2 commit 3c2b522
Show file tree
Hide file tree
Showing 13 changed files with 633 additions and 1 deletion.
1 change: 1 addition & 0 deletions cmd/virtual-kubelet/root/flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func InstallFlags(flags *pflag.FlagSet, o *Opts) {
"The number of service account reflection workers (applies only if API server support is enabled in token API mode)")
flags.UintVar(&o.PersistentVolumeClaimWorkers, "persistentvolumeclaim-reflection-workers", o.PersistentVolumeClaimWorkers,
"The number of persistentvolumeclaim reflection workers")
flags.UintVar(&o.EventWorkers, "event-reflection-workers", o.EventWorkers, "The number of event reflection workers")

flags.DurationVar(&o.NodeLeaseDuration, "node-lease-duration", o.NodeLeaseDuration, "The duration of the node leases")
flags.DurationVar(&o.NodePingInterval, "node-ping-interval", o.NodePingInterval,
Expand Down
3 changes: 3 additions & 0 deletions cmd/virtual-kubelet/root/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
DefaultSecretWorkers = 3
DefaultServiceAccountWorkers = 3
DefaultPersistenVolumeClaimWorkers = 3
DefaultEventWorkers = 3

DefaultNodePingTimeout = 1 * time.Second
DefaultNodeCheckNetwork = true
Expand Down Expand Up @@ -83,6 +84,7 @@ type Opts struct {
SecretWorkers uint
ServiceAccountWorkers uint
PersistentVolumeClaimWorkers uint
EventWorkers uint

NodeLeaseDuration time.Duration
NodePingInterval time.Duration
Expand Down Expand Up @@ -125,6 +127,7 @@ func NewOpts() *Opts {
SecretWorkers: DefaultSecretWorkers,
ServiceAccountWorkers: DefaultServiceAccountWorkers,
PersistentVolumeClaimWorkers: DefaultPersistenVolumeClaimWorkers,
EventWorkers: DefaultEventWorkers,

NodeLeaseDuration: node.DefaultLeaseDuration * time.Second,
NodePingInterval: node.DefaultPingInterval,
Expand Down
1 change: 1 addition & 0 deletions cmd/virtual-kubelet/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func runRootCommand(ctx context.Context, c *Opts) error {
SecretWorkers: c.SecretWorkers,
ServiceAccountWorkers: c.ServiceAccountWorkers,
PersistenVolumeClaimWorkers: c.PersistentVolumeClaimWorkers,
EventWorkers: c.EventWorkers,

EnableAPIServerSupport: c.EnableAPIServerSupport,
EnableStorage: c.EnableStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ rules:
- events
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
Expand Down
3 changes: 3 additions & 0 deletions pkg/virtualKubelet/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/liqotech/liqo/pkg/liqonet/ipam"
"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/configuration"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/event"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/exposition"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/manager"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/namespacemap"
Expand Down Expand Up @@ -70,6 +71,7 @@ type InitConfig struct {
ConfigMapWorkers uint
SecretWorkers uint
ServiceAccountWorkers uint
EventWorkers uint

EnableAPIServerSupport bool
EnableStorage bool
Expand Down Expand Up @@ -142,6 +144,7 @@ func NewLiqoProvider(ctx context.Context, cfg *InitConfig, eb record.EventBroadc
With(podreflector).
With(storage.NewPersistentVolumeClaimReflector(cfg.PersistenVolumeClaimWorkers,
cfg.VirtualStorageClassName, cfg.RemoteRealStorageClassName, cfg.EnableStorage)).
With(event.NewEventReflector(cfg.EventWorkers)).
WithNamespaceHandler(namespaceMapHandler)

if !cfg.DisableIPReflection {
Expand Down
16 changes: 16 additions & 0 deletions pkg/virtualKubelet/reflection/event/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package event implements the reflection logic for events.
package event
234 changes: 234 additions & 0 deletions pkg/virtualKubelet/reflection/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package event

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
netv1 "k8s.io/api/networking/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
discoveryv1listers "k8s.io/client-go/listers/discovery/v1"
netv1listers "k8s.io/client-go/listers/networking/v1"
"k8s.io/klog/v2"
"k8s.io/utils/trace"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/generic"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/manager"
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/options"
)

var _ manager.NamespacedReflector = (*NamespacedEventReflector)(nil)

const (
// EventReflectorName -> The name associated with the Event reflector.
EventReflectorName = "Event"
)

// NamespacedEventReflector manages the Event reflection for a given pair of local and remote namespaces.
type NamespacedEventReflector struct {
generic.NamespacedReflector

localEvents corev1listers.EventNamespaceLister
remoteEvents corev1listers.EventNamespaceLister
localEventsClient v1.EventInterface

localConfigMaps corev1listers.ConfigMapNamespaceLister
localSecrets corev1listers.SecretNamespaceLister
localEndpointSlices discoveryv1listers.EndpointSliceNamespaceLister
localIngresses netv1listers.IngressNamespaceLister
localServices corev1listers.ServiceNamespaceLister
localPvcs corev1listers.PersistentVolumeClaimNamespaceLister
localPods corev1listers.PodNamespaceLister
}

// NewEventReflector returns a new EventReflector instance.
func NewEventReflector(workers uint) manager.Reflector {
return generic.NewReflector(EventReflectorName, NewNamespacedEventReflector, generic.WithoutFallback(), workers)
}

// NewNamespacedEventReflector returns a new NamespacedEventReflector instance.
func NewNamespacedEventReflector(opts *options.NamespacedOpts) manager.NamespacedReflector {
local := opts.LocalFactory.Core().V1().Events()
remote := opts.RemoteFactory.Core().V1().Events()

localConfigMaps := opts.LocalFactory.Core().V1().ConfigMaps()
localSecrets := opts.LocalFactory.Core().V1().Secrets()
localEndpointSlices := opts.LocalFactory.Discovery().V1().EndpointSlices()
localIngresses := opts.LocalFactory.Networking().V1().Ingresses()
localServices := opts.LocalFactory.Core().V1().Services()
localPvcs := opts.LocalFactory.Core().V1().PersistentVolumeClaims()
localPods := opts.LocalFactory.Core().V1().Pods()

_, err := local.Informer().AddEventHandler(opts.HandlerFactory(generic.NamespacedKeyer(opts.LocalNamespace)))
utilruntime.Must(err)
_, err = remote.Informer().AddEventHandler(opts.HandlerFactory(generic.NamespacedKeyer(opts.LocalNamespace)))
utilruntime.Must(err)

return &NamespacedEventReflector{
NamespacedReflector: generic.NewNamespacedReflector(opts, EventReflectorName),
localEvents: local.Lister().Events(opts.LocalNamespace),
remoteEvents: remote.Lister().Events(opts.RemoteNamespace),
localEventsClient: opts.LocalClient.CoreV1().Events(opts.LocalNamespace),

localConfigMaps: localConfigMaps.Lister().ConfigMaps(opts.LocalNamespace),
localSecrets: localSecrets.Lister().Secrets(opts.LocalNamespace),
localEndpointSlices: localEndpointSlices.Lister().EndpointSlices(opts.LocalNamespace),
localIngresses: localIngresses.Lister().Ingresses(opts.LocalNamespace),
localServices: localServices.Lister().Services(opts.LocalNamespace),
localPvcs: localPvcs.Lister().PersistentVolumeClaims(opts.LocalNamespace),
localPods: localPods.Lister().Pods(opts.LocalNamespace),
}
}

// Handle reconciles Event objects.
func (ner *NamespacedEventReflector) Handle(ctx context.Context, name string) error {
tracer := trace.FromContext(ctx)

// Retrieve the local and remote objects (only not found errors can occur).
klog.V(4).Infof("Handling reflection of local Event %q (remote: %q)", ner.LocalRef(name), ner.RemoteRef(name))
local, lerr := ner.localEvents.Get(name)
utilruntime.Must(client.IgnoreNotFound(lerr))
remote, rerr := ner.remoteEvents.Get(name)
utilruntime.Must(client.IgnoreNotFound(rerr))
tracer.Step("Retrieved the local and remote objects")

// Abort the reflection if the local object is not managed by us, as we do not want to mutate others' objects.
if lerr == nil && !forge.IsReflected(local) {
if rerr == nil { // Do not output the warning event in case the event was triggered by the local object (i.e., the remote one does not exists).
klog.Infof("Skipping reflection of remote Event %q as local already exists and is not managed by us", ner.RemoteRef(name))
}
return nil
}

// Abort the reflection if the remote object has the "skip-reflection" annotation.
if !kerrors.IsNotFound(rerr) && ner.ShouldSkipReflection(remote) {
klog.Infof("Skipping reflection of remote Event %q as marked with the skip annotation", ner.RemoteRef(name))
if kerrors.IsNotFound(lerr) { // The remote object does not already exist, hence no further action is required.
return nil
}

// Otherwise, let pretend the remote object does not exist, so that the local one gets deleted.
rerr = kerrors.NewNotFound(corev1.Resource("Event"), local.GetName())
}

tracer.Step("Performed the sanity checks")

// The remote Event does no longer exist. Ensure it is also absent from the local cluster.
if kerrors.IsNotFound(rerr) {
defer tracer.Step("Ensured the absence of the local object")
if !kerrors.IsNotFound(lerr) {
klog.V(4).Infof("Deleting local Event %q, since remote %q does no longer exist", ner.LocalRef(name), ner.RemoteRef(name))
return ner.DeleteLocal(ctx, ner.localEventsClient, EventReflectorName, name, local.GetUID())
}

klog.V(4).Infof("Local Event %q and remote Event %q both vanished", ner.LocalRef(name), ner.RemoteRef(name))
return nil
}

// The local Event does not exist yet. Forge it and create it on the local cluster.
if kerrors.IsNotFound(lerr) {
defer tracer.Step("Ensured the presence of the local object")
klog.V(4).Infof("Creating local Event %q, since remote %q does not exist yet", ner.LocalRef(name), ner.RemoteRef(name))

localInvolvedObject, err := ner.getLocalObject(remote.InvolvedObject.Kind, remote.InvolvedObject.APIVersion, remote.InvolvedObject.Name)
if err != nil {
// Skip the reflection of the event if the involved object does not exist in the local cluster
klog.V(4).Infof("Unable to get local object %q: %v", remote.InvolvedObject.Name, err)
return nil
}

source := corev1.EventSource{
Component: remote.Source.Component + " (remote)",
Host: remote.Source.Host,
}

e := corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: remote.Name,
Namespace: ner.LocalNamespace(),
Labels: labels.Merge(remote.GetLabels(), forge.ReflectionLabels()),
Annotations: remote.GetAnnotations(),
},
InvolvedObject: corev1.ObjectReference{
APIVersion: remote.InvolvedObject.APIVersion,
Kind: remote.InvolvedObject.Kind,
Name: remote.InvolvedObject.Name,
Namespace: ner.LocalNamespace(),
FieldPath: remote.InvolvedObject.FieldPath,
UID: localInvolvedObject.GetUID(),
},
Reason: remote.Reason,
Message: remote.Message,
Type: remote.Type,
Source: source,
FirstTimestamp: remote.FirstTimestamp,
LastTimestamp: remote.LastTimestamp,
Count: remote.Count,
EventTime: remote.EventTime,
Series: remote.Series,
Action: remote.Action,
Related: remote.Related,
ReportingController: remote.ReportingController,
ReportingInstance: remote.ReportingInstance,
}

_, err = ner.localEventsClient.Create(ctx, &e, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Unable to create local Event %q: %v", ner.LocalRef(name), err)
return err
}
}

klog.Infof("Local Event %q successfully enforced (remote: %q)", ner.LocalRef(name), ner.RemoteRef(name))

return nil
}

func (ner *NamespacedEventReflector) getLocalObject(kind, apiVersion, name string) (client.Object, error) {
gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
return nil, err
}

switch {
case gv.Group == corev1.GroupName && gv.Version == corev1.SchemeGroupVersion.Version && kind == "ConfigMap":
return ner.localConfigMaps.Get(name)
case gv.Group == corev1.GroupName && gv.Version == corev1.SchemeGroupVersion.Version && kind == "Secret":
return ner.localSecrets.Get(name)
case gv.Group == discoveryv1.GroupName && gv.Version == discoveryv1.SchemeGroupVersion.Version && kind == "EndpointSlice":
return ner.localEndpointSlices.Get(name)
case gv.Group == netv1.GroupName && gv.Version == netv1.SchemeGroupVersion.Version && kind == "Ingress":
return ner.localIngresses.Get(name)
case gv.Group == corev1.GroupName && gv.Version == corev1.SchemeGroupVersion.Version && kind == "Service":
return ner.localServices.Get(name)
case gv.Group == corev1.GroupName && gv.Version == corev1.SchemeGroupVersion.Version && kind == "PersistentVolumeClaim":
return ner.localPvcs.Get(name)
case gv.Group == corev1.GroupName && gv.Version == corev1.SchemeGroupVersion.Version && kind == "Pod":
return ner.localPods.Get(name)
default:
return nil, fmt.Errorf("unable to get local object %q: kind %q and apiVersion %q not supported", name, kind, apiVersion)
}
}
Loading

0 comments on commit 3c2b522

Please sign in to comment.