Skip to content

Commit

Permalink
Make data encoding configurable (#252)
Browse files Browse the repository at this point in the history
This change adds support for configurable CloudEvent dataencoding
schemes. Currently, only application/xml and application/JSON are
supported.

The default encoding (if unspecified) remains application/xml to
preserve backwards-compatibility.

The kn plugin has a new flag to configure this behavior. For ease of
use, "xml" or "json" are accepted, i.e. "application/" must not be
specified.

Closes: #222
Signed-off-by: Michael Gasch <mgasch@vmware.com>
  • Loading branch information
Michael Gasch committed Jun 2, 2021
1 parent 0ea3f14 commit 8b252bc
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 28 deletions.
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ To see examples of the Source and Binding in action, check out our
The `VSphereSource` provides a simple mechanism to enable users to react to
vSphere events.

In order to receive events from vSphere (i.e. vCenter) there are **three key
In order to receive events from vSphere (i.e. vCenter) these are the **key
parts** in the configuration:

1. The vCenter address and secret information.
1. Where to send the events.
1. Checkpoint behavior.
1. Payload encoding scheme

```yaml
apiVersion: sources.tanzu.vmware.com/v1alpha1
Expand All @@ -70,6 +71,9 @@ spec:
checkpointConfig:
maxAgeSeconds: 300
periodSeconds: 10

# Set the CloudEvent data encoding scheme to JSON
payloadEncoding: application/json
```

Let's walk through each of these.
Expand Down Expand Up @@ -154,7 +158,7 @@ sink:

### Configuring Checkpoint and Event Replay

Let's focus on the last section of the sample source:
Let's focus on this section of the sample source:

```yaml
# Adjust checkpointing and event replay behavior
Expand Down Expand Up @@ -228,6 +232,21 @@ kubectl get cm vc-source-configmap -o jsonpath='{.data}'
}
```

### Configuring CloudEvent Payload Encoding

Let's focus on this section of the sample source:

```yaml
# Set the CloudEvent data encoding scheme to JSON
payloadEncoding: application/json
```

The default CloudEvent payload encoding scheme, i.e.
[`datacontenttype`](https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#datacontenttype),
produced by a `VSphereSource` in the `v1alpha1` API is `application/xml`.
Alternatively, this can be changed to `application/json` as shown in the sample
above. Other encoding schemes are currently **not implemented**.

## Basic `VSphereBinding` Example

The `VSphereBinding` provides a simple mechanism for a user application to call
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/sources/v1alpha1/vspheresource_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package v1alpha1

import (
"context"
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
"knative.dev/pkg/apis"

"github.com/vmware-tanzu/sources-for-knative/pkg/vsphere"
Expand All @@ -23,4 +25,11 @@ func (vs *VSphereSource) SetDefaults(ctx context.Context) {
if vs.Spec.CheckpointConfig.PeriodSeconds == 0 {
vs.Spec.CheckpointConfig.PeriodSeconds = int64(vsphere.CheckpointDefaultPeriod.Seconds())
}

// preserve backward-compatibility
if vs.Spec.PayloadEncoding == "" {
vs.Spec.PayloadEncoding = cloudevents.ApplicationXML
} else {
vs.Spec.PayloadEncoding = strings.ToLower(vs.Spec.PayloadEncoding)
}
}
32 changes: 31 additions & 1 deletion pkg/apis/sources/v1alpha1/vspheresource_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -22,7 +23,7 @@ func TestVSphereSourceDefaulting(t *testing.T) {
c *VSphereSource
want *VSphereSource
}{{
name: "no change",
name: "CheckpointConfig and PayloadEncoding not set",
c: &VSphereSource{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
Expand All @@ -43,6 +44,33 @@ func TestVSphereSourceDefaulting(t *testing.T) {
MaxAgeSeconds: 0,
PeriodSeconds: int64(vsphere.CheckpointDefaultPeriod.Seconds()),
},
PayloadEncoding: cloudevents.ApplicationXML,
},
},
}, {
name: "payloadEncoding set to JSON",
c: &VSphereSource{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
},
Spec: VSphereSourceSpec{
SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
PayloadEncoding: cloudevents.ApplicationJSON,
},
},
want: &VSphereSource{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
},
Spec: VSphereSourceSpec{
SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
CheckpointConfig: VCheckpointSpec{
MaxAgeSeconds: 0,
PeriodSeconds: int64(vsphere.CheckpointDefaultPeriod.Seconds()),
},
PayloadEncoding: cloudevents.ApplicationJSON,
},
},
}, {
Expand Down Expand Up @@ -86,6 +114,7 @@ func TestVSphereSourceDefaulting(t *testing.T) {
MaxAgeSeconds: 0,
PeriodSeconds: int64(vsphere.CheckpointDefaultPeriod.Seconds()),
},
PayloadEncoding: cloudevents.ApplicationXML,
},
},
}, {
Expand Down Expand Up @@ -114,6 +143,7 @@ func TestVSphereSourceDefaulting(t *testing.T) {
MaxAgeSeconds: 3600,
PeriodSeconds: 60,
},
PayloadEncoding: cloudevents.ApplicationXML,
},
},
}}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/sources/v1alpha1/vspheresource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type VSphereSourceSpec struct {

VAuthSpec `json:",inline"`
CheckpointConfig VCheckpointSpec `json:"checkpointConfig"`
PayloadEncoding string `json:"payloadEncoding"`
}

type VCheckpointSpec struct {
Expand Down
14 changes: 12 additions & 2 deletions pkg/apis/sources/v1alpha1/vspheresource_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package v1alpha1

import (
"context"
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
"knative.dev/pkg/apis"
)

Expand All @@ -18,8 +20,16 @@ func (vs *VSphereSource) Validate(ctx context.Context) *apis.FieldError {

// Validate implements apis.Validatable
func (vsss *VSphereSourceSpec) Validate(ctx context.Context) *apis.FieldError {
return vsss.Sink.Validate(ctx).ViaField("sink").Also(vsss.VAuthSpec.Validate(ctx)).Also(vsss.CheckpointConfig.
Validate(ctx))
err := vsss.Sink.Validate(ctx).ViaField("sink").
Also(vsss.VAuthSpec.Validate(ctx)).
Also(vsss.CheckpointConfig.
Validate(ctx))

encoding := strings.ToLower(vsss.PayloadEncoding)
if (encoding != cloudevents.ApplicationJSON) && (encoding != cloudevents.ApplicationXML) {
err = err.Also(apis.ErrInvalidValue(encoding, "payloadEncoding"))
}
return err
}

func (vcs VCheckpointSpec) Validate(ctx context.Context) (err *apis.FieldError) {
Expand Down
37 changes: 34 additions & 3 deletions pkg/apis/sources/v1alpha1/vspheresource_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

Expand Down Expand Up @@ -39,11 +40,38 @@ func TestVSphereSourceValidation(t *testing.T) {
Name: "valid",
},
Spec: VSphereSourceSpec{
SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
PayloadEncoding: cloudevents.ApplicationXML,
},
},
want: nil,
}, {
name: "valid with JSON payloadEncoding",
c: &VSphereSource{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
},
Spec: VSphereSourceSpec{
SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
PayloadEncoding: cloudevents.ApplicationJSON,
},
},
want: nil,
}, {
name: "invalid payloadEncoding",
c: &VSphereSource{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
},
Spec: VSphereSourceSpec{
SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
PayloadEncoding: "application/text",
},
},
want: apis.ErrInvalidValue("application/text", "spec.payloadEncoding"),
}, {
name: "missing VAuthSpec",
c: &VSphereSource{
Expand All @@ -53,6 +81,7 @@ func TestVSphereSourceValidation(t *testing.T) {
Spec: VSphereSourceSpec{
SourceSpec: validSourceSpec,
// VAuthSpec: validVAuthSpec,
PayloadEncoding: cloudevents.ApplicationXML,
},
},
want: apis.ErrMissingField("spec.address.host", "spec.secretRef.name"),
Expand All @@ -64,7 +93,8 @@ func TestVSphereSourceValidation(t *testing.T) {
},
Spec: VSphereSourceSpec{
// SourceSpec: validSourceSpec,
VAuthSpec: validVAuthSpec,
VAuthSpec: validVAuthSpec,
PayloadEncoding: cloudevents.ApplicationXML,
},
},
want: apis.ErrGeneric("expected at least one, got none", "spec.sink.ref", "spec.sink.uri"),
Expand All @@ -81,6 +111,7 @@ func TestVSphereSourceValidation(t *testing.T) {
MaxAgeSeconds: -10,
PeriodSeconds: -5,
},
PayloadEncoding: cloudevents.ApplicationXML,
},
},
want: apis.ErrInvalidValue("-10", "spec.checkpointConfig.maxAgeSeconds").Also(apis.ErrInvalidValue("-5",
Expand Down
4 changes: 4 additions & 0 deletions pkg/reconciler/vspheresource/resources/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -94,6 +95,9 @@ func MakeDeployment(ctx context.Context, vms *v1alpha1.VSphereSource, adapterIma
}, {
Name: "VSPHERE_CHECKPOINT_CONFIG",
Value: string(jsonBytes),
}, {
Name: "VSPHERE_PAYLOAD_ENCODING",
Value: strings.ToLower(vms.Spec.PayloadEncoding),
}, {
Name: "K_CE_OVERRIDES",
Value: ceOverrides,
Expand Down
36 changes: 20 additions & 16 deletions pkg/vsphere/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type envConfig struct {

// CheckpointConfig configures the checkpoint behavior of this controller
CheckpointConfig string `envconfig:"VSPHERE_CHECKPOINT_CONFIG" default:"{}"`

// PayloadEncoding configures the encoding format for the cloud event payload
PayloadEncoding string `envconfig:"VSPHERE_PAYLOAD_ENCODING" default:"application/xml"`
}

func NewEnvConfig() adapter.EnvConfigAccessor {
Expand All @@ -45,13 +48,14 @@ func NewEnvConfig() adapter.EnvConfigAccessor {

// vAdapter implements the vSphereSource adapter to trigger a Sink.
type vAdapter struct {
Logger *zap.SugaredLogger
Namespace string
Source string
VClient *govmomi.Client
CEClient cloudevents.Client
KVStore kvstore.Interface
CpConfig CheckpointConfig
Logger *zap.SugaredLogger
Namespace string
Source string
VClient *govmomi.Client
CEClient cloudevents.Client
KVStore kvstore.Interface
CpConfig CheckpointConfig
PayloadEncoding string
}

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
Expand Down Expand Up @@ -87,13 +91,14 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
}

return &vAdapter{
Logger: logger,
Namespace: env.Namespace,
Source: source,
VClient: vClient,
CEClient: ceClient,
KVStore: store,
CpConfig: *cpconf,
Logger: logger,
Namespace: env.Namespace,
Source: source,
VClient: vClient,
CEClient: ceClient,
KVStore: store,
CpConfig: *cpconf,
PayloadEncoding: env.PayloadEncoding,
}
}

Expand Down Expand Up @@ -247,8 +252,7 @@ func (a *vAdapter) sendEvents(ctx context.Context, baseEvents []types.BaseEvent)

// TODO(mattmoor): Consider setting the subject

// TODO: make encoding configurable?
if err := ev.SetData(cloudevents.ApplicationXML, be); err != nil {
if err := ev.SetData(a.PayloadEncoding, be); err != nil {
return success, fmt.Errorf("set data on event: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/vsphere/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestSendEvents(t *testing.T) {
}
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))

adapter := vAdapter{Logger: logger.Sugar(), CEClient: c, Source: source}
adapter := vAdapter{Logger: logger.Sugar(), CEClient: c, Source: source, PayloadEncoding: cloudevents.ApplicationXML}
count, sendResult := adapter.sendEvents(ctx, tc.baseEvents)

if count != tc.result.count {
Expand Down
1 change: 1 addition & 0 deletions plugins/vsphere/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Flags:
-a, --address string URL of ESXi or vCenter instance to connect to (same as VC_URL)
--checkpoint-age duration maximum allowed age for replaying events determined by last successful event in checkpoint (default 5m0s)
--checkpoint-period duration period between saving checkpoints (default 10s)
--encoding string CloudEvent data encoding scheme (xml or json) (default "xml")
-h, --help help for source
--name string name of the source to create
-n, --namespace string namespace of the source to create (default namespace if omitted)
Expand Down

0 comments on commit 8b252bc

Please sign in to comment.