Skip to content

Commit

Permalink
added tests to protocol spec
Browse files Browse the repository at this point in the history
using ce protocol binding to get msg from a rabbitmq msg delivery
  • Loading branch information
gabo1208 committed May 10, 2022
1 parent ad60f0d commit bf5bb36
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (a *Adapter) postMessage(msg wabbit.Delivery) error {
req,
a.logger)
if err != nil {
a.logger.Error("error writting event to http", zap.Error(err))
a.logger.Error("error writing event to http", zap.Error(err))
return err
}

Expand Down
20 changes: 8 additions & 12 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ package dispatcher

import (
"context"
"encoding/json"
"net/http"
"sync"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/pkg/errors"
amqperr "github.com/rabbitmq/amqp091-go"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing-rabbitmq/pkg/rabbit"
"knative.dev/eventing-rabbitmq/pkg/utils"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
Expand Down Expand Up @@ -137,24 +138,19 @@ func isSuccess(ctx context.Context, result protocol.Result) bool {
}

func (d *Dispatcher) dispatch(ctx context.Context, msg wabbit.Delivery, ceClient cloudevents.Client) {
event := cloudevents.NewEvent()
err := json.Unmarshal(msg.Body(), &event)
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed to unmarshal event (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
logging.FromContext(ctx).Error("error creating event from delivery")
}

ctx, span := readSpan(ctx, msg)
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(&event)...)
span.AddAttributes(client.EventTraceAttributes(event)...)
}

logging.FromContext(ctx).Debugf("Got event as: %+v", event)
logging.FromContext(ctx).Debugf("Got event as: %+v", *event)
ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)

if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
Expand All @@ -163,7 +159,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg wabbit.Delivery, ceClient
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, event)
response, result := ceClient.Request(ctx, *event)
if !isSuccess(ctx, result) {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/rabbit/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ const (
contentTypeHeader = "content-type"
)

var (
specs = spec.WithPrefix(prefix)
)
var specs = spec.WithPrefix(prefix)

// Message holds a rabbitmq message.
// this message *can* be read several times safely
Expand Down
Loading

0 comments on commit bf5bb36

Please sign in to comment.