Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions pkg/cloudevents/clients/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package common

import (
certificatev1 "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

clusterv1 "open-cluster-management.io/api/cluster/v1"
workv1 "open-cluster-management.io/api/work/v1"
)

const (
// CloudEventsDataTypeAnnotationKey is the key of the cloudevents data type annotation.
CloudEventsDataTypeAnnotationKey = "cloudevents.open-cluster-management.io/datatype"

// CloudEventsResourceVersionAnnotationKey is the key of the resource resourceversion annotation.
//
// This annotation is used for tracing a resource specific changes, the value of this annotation
// should be a sequence number representing the resource specific generation.
CloudEventsResourceVersionAnnotationKey = "cloudevents.open-cluster-management.io/resourceversion"

// CloudEventsSequenceIDAnnotationKey is the key of the status update event sequence ID.
// The sequence id represents the order in which status update events occur on a single agent.
CloudEventsSequenceIDAnnotationKey = "cloudevents.open-cluster-management.io/sequenceid"
)

// CloudEventsOriginalSourceLabelKey is the key of the cloudevents original source label.
const CloudEventsOriginalSourceLabelKey = "cloudevents.open-cluster-management.io/originalsource"

const (
CreateRequestAction = "create_request"
UpdateRequestAction = "update_request"
DeleteRequestAction = "delete_request"
)

// ResourceDeleted represents a resource is deleted.
const ResourceDeleted = "Deleted"

const ResourceFinalizer = "cloudevents.open-cluster-management.io/resource-cleanup"

var ManagedClusterGK = schema.GroupKind{Group: clusterv1.GroupName, Kind: "ManagedCluster"}
var ManagedClusterGR = schema.GroupResource{Group: clusterv1.GroupName, Resource: "managedclusters"}

var ManifestWorkGK = schema.GroupKind{Group: workv1.GroupName, Kind: "ManifestWork"}
var ManifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "manifestworks"}

var CSRGK = schema.GroupKind{Group: certificatev1.GroupName, Kind: "CertificateSigningRequest"}
var CSRGR = schema.GroupResource{Group: certificatev1.GroupName, Resource: "certificatesigningrequests"}
36 changes: 36 additions & 0 deletions pkg/cloudevents/clients/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package errors

import (
"fmt"
"net/http"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const StatusReasonPublishError metav1.StatusReason = "PublishError"

// NewPublishError returns an error indicating a resource could not be published, and the client can try again.
func NewPublishError(qualifiedResource schema.GroupResource, name string, err error) *errors.StatusError {
return &errors.StatusError{
ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusInternalServerError,
Reason: StatusReasonPublishError,
Details: &metav1.StatusDetails{
Group: qualifiedResource.Group,
Kind: qualifiedResource.Resource,
Name: name,
Causes: []metav1.StatusCause{{Message: err.Error()}},
},
Message: fmt.Sprintf("Failed to publish work %s: %v", name, err),
},
}
}

// IsPublishError determines if err is a publish error which indicates that the request can be retried
// by the client.
func IsPublishError(err error) bool {
return errors.ReasonForError(err) == StatusReasonPublishError
}
15 changes: 15 additions & 0 deletions pkg/cloudevents/clients/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package errors

import (
"fmt"
"testing"

"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
)

func TestPublishError(t *testing.T) {
err := NewPublishError(common.ManagedClusterGR, "test", fmt.Errorf("failed to publish resource"))
if !IsPublishError(err) {
t.Errorf("expected publish error, but failed")
}
}
226 changes: 226 additions & 0 deletions pkg/cloudevents/clients/options/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package options

import (
"context"
"fmt"

"k8s.io/klog/v2"

"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

type GenericClientOptions[T generic.ResourceObject] struct {
config any
codec generic.Codec[T]
watcherStore store.ClientWatcherStore[T]
clientID string
sourceID string
clusterName string
resync bool
}

// NewGenericClientOptions create a GenericClientOptions
//
// - config, available configurations:
//
// MQTTOptions (*mqtt.MQTTOptions): builds a generic cloudevents client with MQTT
//
// GRPCOptions (*grpc.GRPCOptions): builds a generic cloudevents client with GRPC
//
// KafkaOptions (*kafka.KafkaOptions): builds a generic cloudevents client with Kafka
//
// - codec, the codec for resource
//
// - clientID, the client ID for generic cloudevents client.
//
// TODO using a specified config instead of any
func NewGenericClientOptions[T generic.ResourceObject](config any,
codec generic.Codec[T],
clientID string) *GenericClientOptions[T] {
return &GenericClientOptions[T]{
config: config,
codec: codec,
clientID: clientID,
resync: true,
}
}

// WithClientWatcherStore set the ClientWatcherStore. The client uses this store to caches the resources and
// watch the resource events. For agent, the AgentInformerWatcherStore is used by default
//
// TODO provide a default ClientWatcherStore for source.
func (o *GenericClientOptions[T]) WithClientWatcherStore(store store.ClientWatcherStore[T]) *GenericClientOptions[T] {
o.watcherStore = store
return o
}

// WithSourceID set the source ID when building a client for a source.
func (o *GenericClientOptions[T]) WithSourceID(sourceID string) *GenericClientOptions[T] {
o.sourceID = sourceID
return o
}

// WithClusterName set the managed cluster name when building a client for an agent.
func (o *GenericClientOptions[T]) WithClusterName(clusterName string) *GenericClientOptions[T] {
o.clusterName = clusterName
return o
}

// WithResyncEnabled control the client resync (Default is true), if it's true, the resync happens when
// 1. after the client's store is initiated
// 2. the client reconnected
func (o *GenericClientOptions[T]) WithResyncEnabled(resync bool) *GenericClientOptions[T] {
o.resync = resync
return o
}

func (o *GenericClientOptions[T]) ClusterName() string {
return o.clusterName
}

func (o *GenericClientOptions[T]) SourceID() string {
return o.sourceID
}

func (o *GenericClientOptions[T]) WatcherStore() store.ClientWatcherStore[T] {
return o.watcherStore
}

func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (*generic.CloudEventAgentClient[T], error) {
if len(o.clientID) == 0 {
return nil, fmt.Errorf("client id is required")
}

if len(o.clusterName) == 0 {
return nil, fmt.Errorf("cluster name is required")
}

if o.watcherStore == nil {
o.watcherStore = store.NewAgentInformerWatcherStore[T]()
}

options, err := generic.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID)
if err != nil {
return nil, err
}

cloudEventsClient, err := generic.NewCloudEventAgentClient(
ctx,
options,
store.NewAgentWatcherStoreLister(o.watcherStore),
statushash.StatusHash,
o.codec,
)
if err != nil {
return nil, err
}

// start to subscribe
cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)

// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
if !o.resync {
klog.V(4).Infof("resync is disabled, do nothing")
continue
}

// when receiving a client reconnected signal, we resync all sources for this agent
// TODO after supporting multiple sources, we should only resync agent known sources
if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}
}()

if !o.resync {
return cloudEventsClient, nil
}

// start a go routine to resync the works after this client's store is initiated
go func() {
if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}()

return cloudEventsClient, nil
}

func (o *GenericClientOptions[T]) SourceClient(ctx context.Context) (*generic.CloudEventSourceClient[T], error) {
if len(o.clientID) == 0 {
return nil, fmt.Errorf("client id is required")
}

if len(o.sourceID) == 0 {
return nil, fmt.Errorf("source id is required")
}

if o.watcherStore == nil {
return nil, fmt.Errorf("a watcher store is required")
}

options, err := generic.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID)
if err != nil {
return nil, err
}

cloudEventsClient, err := generic.NewCloudEventSourceClient(
ctx,
options,
store.NewSourceWatcherStoreLister(o.watcherStore),
statushash.StatusHash,
o.codec,
)
if err != nil {
return nil, err
}

// start to subscribe
cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)
// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
if !o.resync {
klog.V(4).Infof("resync is disabled, do nothing")
continue
}

// when receiving a client reconnected signal, we resync all clusters for this source
if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}
}()

if !o.resync {
return cloudEventsClient, nil
}

// start a go routine to resync the works after this client's store is initiated
go func() {
if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}()

return cloudEventsClient, nil
}
33 changes: 33 additions & 0 deletions pkg/cloudevents/clients/statushash/statushash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package statushash

import (
"crypto/sha256"
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
)

// StatusHash returns the SHA256 checksum of a resource status.
func StatusHash[T generic.ResourceObject](resource T) (string, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource)
if err != nil {
return "", err
}

status, found, err := unstructured.NestedMap(u, "status")
if err != nil {
return "", err
}
if !found {
return "", fmt.Errorf("no status for the resource %s", resource.GetUID())
}

statusBytes, err := json.Marshal(status)
if err != nil {
return "", fmt.Errorf("failed to marshal resource status, %v", err)
}
return fmt.Sprintf("%x", sha256.Sum256(statusBytes)), nil
}
Loading