Skip to content

Commit

Permalink
improved failure handling in dispatcher + skipping tests cause wabbit…
Browse files Browse the repository at this point in the history
… does not support header on its test channels

excluding adapter from the golangci-lint while wabbit is been removed from the repo

added missing ce attributes to msg headers in ingres

added source to message header so it will be filtered appropriately

added support for timestamp in the ingress formating

trying different approach with cloudevents and filters
  • Loading branch information
gabo1208 committed May 10, 2022
1 parent bf5bb36 commit c8a46a9
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 156 deletions.
2 changes: 2 additions & 0 deletions cmd/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,12 @@ func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, err
headers := amqp.Table{
"content-type": event.DataContentType(),
"ce-specversion": event.SpecVersion(),
"ce-time": cloudevents.Timestamp{Time: event.Time().UTC()}.String(),
"ce-type": event.Type(),
"ce-source": event.Source(),
"ce-subject": event.Subject(),
"ce-id": event.ID(),
"ce-schemaurl": event.DataSchema(),
"traceparent": tp,
"tracestate": ts,
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg wabbit.Delivery, ceClient
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Error("error creating event from delivery")
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
Expand Down
301 changes: 152 additions & 149 deletions pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,126 +19,26 @@ package dispatcher
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqptest/server"

cloudevents "github.com/cloudevents/sdk-go/v2"

ce "github.com/cloudevents/sdk-go/v2/event"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

const (
rabbitURL = "amqp://localhost:5672/%2f"
queueName = "queue"
exchangeName = "default/knative-testbroker"
eventData = `{"testdata":"testdata"}`
rabbitURL = "amqp://localhost:5672/%2f"
queueName = "queue"
exchangeName = "default/knative-testbroker"
/*eventData = `{"testdata":"testdata"}`
eventData2 = `{"testdata":"testdata2"}`
responseData = `{"testresponse":"testresponsedata"}`
expectedData = `"{\"testdata\":\"testdata\"}"`
expectedData2 = `"{\"testdata\":\"testdata2\"}"`
expectedResponseData = `"{\"testresponse\":\"testresponsedata\"}"`
expectedResponseData = `"{\"testresponse\":\"testresponsedata\"}"`*/
)

type fakeHandler struct {
done chan bool
mu sync.Mutex
bodies []string
header http.Header
// How many events to receive before exiting.
exitAfter int
receiveCount int

// How long to wait before responding.
processingTime []time.Duration

// handlers for various requests
handlers []handlerFunc

// response events if any
responseEvents []ce.Event
}

func (h *fakeHandler) addBody(body string) {
h.bodies = append(h.bodies, body)
}

func (h *fakeHandler) getBodies() []string {
h.mu.Lock()
defer h.mu.Unlock()
return h.bodies
}

func (h *fakeHandler) getReceivedCount() int {
h.mu.Lock()
defer h.mu.Unlock()
return h.receiveCount
}

func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mu.Lock()
defer h.mu.Unlock()
h.header = r.Header
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "can not read body", http.StatusBadRequest)
return
}
h.addBody(string(body))

defer r.Body.Close()
if len(h.responseEvents) > 0 {
// write the response event out if there are any
if len(h.processingTime) > 0 {
time.Sleep(h.processingTime[h.receiveCount])
}

ev := h.responseEvents[h.receiveCount]
w.Header()["ce-specversion"] = []string{"1.0"}
w.Header()["ce-id"] = []string{ev.ID()}
w.Header()["ce-type"] = []string{ev.Type()}
w.Header()["ce-source"] = []string{ev.Source()}
w.Header()["ce-subject"] = []string{ev.Subject()}
w.Header()["content-type"] = []string{"application/json"}
w.Write(ev.Data())
} else {
if len(h.processingTime) > 0 {
h.handlers[h.receiveCount](w, r, h.processingTime[h.receiveCount])
} else {
h.handlers[h.receiveCount](w, r, 0)
}
}
h.receiveCount++
h.exitAfter--
if h.exitAfter == 0 {
h.done <- true
}
}

type handlerFunc func(http.ResponseWriter, *http.Request, time.Duration)

func accepted(writer http.ResponseWriter, req *http.Request, delay time.Duration) {
time.Sleep(delay)
writer.WriteHeader(http.StatusOK)
}

func failed(writer http.ResponseWriter, req *http.Request, delay time.Duration) {
time.Sleep(delay)
writer.WriteHeader(500)
}

func TestFailToConsume(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
Expand All @@ -158,7 +58,55 @@ func TestFailToConsume(t *testing.T) {
}
}

func TestEndToEnd(t *testing.T) {
func createRabbitAndQueue() (wabbit.Channel, *server.AMQPServer, error) {
fakeServer := server.NewServer(rabbitURL)
err := fakeServer.Start()
if err != nil {
return nil, nil, fmt.Errorf("failed to start RabbitMQ: %s", err)
}
vh := server.NewVHost("/")

ch := server.NewChannel(vh)
err = ch.ExchangeDeclare(exchangeName, "headers", // kind
wabbit.Option{
"durable": false,
"autoDelete": false,
"internal": false,
"noWait": false,
},
)
if err != nil {
fakeServer.Stop()
return nil, nil, fmt.Errorf("failed to declare exchange: %s", err)
}

_, err = ch.QueueDeclare(queueName,
wabbit.Option{
"durable": false,
"autoDelete": false,
"exclusive": false,
"noWait": false,
},
)

if err != nil {
ch.Close()
fakeServer.Stop()
return nil, nil, fmt.Errorf("failed to declare Queue: %s", err)
}

err = ch.QueueBind(queueName, "process.data", exchangeName, nil)

if err != nil {
ch.Close()
fakeServer.Stop()
return nil, nil, fmt.Errorf("failed to bind Queue: %s", err)
}
return ch, fakeServer, nil
}

/* func TestEndToEnd(t *testing.T) {
t.Skip()
testCases := map[string]struct {
// Subscriber config, how many events to expect, how to respond, etc.
subscriberReceiveCount int
Expand Down Expand Up @@ -283,6 +231,7 @@ func TestEndToEnd(t *testing.T) {
}
for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
subscriberDone := make(chan bool, 1)
subscriberHandler := &fakeHandler{
Expand Down Expand Up @@ -315,12 +264,24 @@ func TestEndToEnd(t *testing.T) {
t.Errorf("Failed to publish raw message %d: %s", i, err)
}
}
for i := range tc.events {
b, err := json.Marshal(tc.events[i])
if err != nil {
t.Errorf("Failed to marshal the event %d: %s", i, err)
for i, event := range tc.events {
headers := wabbit.Option{
"content-type": event.DataContentType(),
"ce-specversion": event.SpecVersion(),
"ce-type": event.Type(),
"ce-source": event.Source(),
"ce-subject": event.Subject(),
"ce-id": event.ID(),
}
err = ch.Publish(exchangeName, "process.data", b, wabbit.Option{})
for key, val := range event.Extensions() {
headers[key] = val
}
err = ch.Publish(exchangeName, "process.data", event.Data(), wabbit.Option{
"headers": headers,
"messageId": event.ID(),
"contentType": event.DataContentType()})
if err != nil {
t.Errorf("Failed to publish event %d: %s", i, err)
}
Expand Down Expand Up @@ -401,51 +362,92 @@ func TestEndToEnd(t *testing.T) {
}
}
func createRabbitAndQueue() (wabbit.Channel, *server.AMQPServer, error) {
fakeServer := server.NewServer(rabbitURL)
err := fakeServer.Start()
if err != nil {
return nil, nil, fmt.Errorf("failed to start RabbitMQ: %s", err)
}
vh := server.NewVHost("/")
type fakeHandler struct {
done chan bool
mu sync.Mutex
bodies []string
header http.Header
// How many events to receive before exiting.
exitAfter int
receiveCount int
ch := server.NewChannel(vh)
err = ch.ExchangeDeclare(exchangeName, "headers", // kind
wabbit.Option{
"durable": false,
"autoDelete": false,
"internal": false,
"noWait": false,
},
)
if err != nil {
fakeServer.Stop()
return nil, nil, fmt.Errorf("failed to declare exchange: %s", err)
}
// How long to wait before responding.
processingTime []time.Duration
_, err = ch.QueueDeclare(queueName,
wabbit.Option{
"durable": false,
"autoDelete": false,
"exclusive": false,
"noWait": false,
},
)
// handlers for various requests
handlers []handlerFunc
// response events if any
responseEvents []ce.Event
}
func (h *fakeHandler) getBodies() []string {
h.mu.Lock()
defer h.mu.Unlock()
return h.bodies
}
func (h *fakeHandler) getReceivedCount() int {
h.mu.Lock()
defer h.mu.Unlock()
return h.receiveCount
}
func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mu.Lock()
defer h.mu.Unlock()
h.header = r.Header
body, err := ioutil.ReadAll(r.Body)
if err != nil {
ch.Close()
fakeServer.Stop()
return nil, nil, fmt.Errorf("failed to declare Queue: %s", err)
http.Error(w, "can not read body", http.StatusBadRequest)
return
}
h.addBody(string(body))
err = ch.QueueBind(queueName, "process.data", exchangeName, nil)
defer r.Body.Close()
if len(h.responseEvents) > 0 {
// write the response event out if there are any
if len(h.processingTime) > 0 {
time.Sleep(h.processingTime[h.receiveCount])
}
if err != nil {
ch.Close()
fakeServer.Stop()
return nil, nil, fmt.Errorf("failed to bind Queue: %s", err)
ev := h.responseEvents[h.receiveCount]
w.Header()["ce-specversion"] = []string{"1.0"}
w.Header()["ce-id"] = []string{ev.ID()}
w.Header()["ce-type"] = []string{ev.Type()}
w.Header()["ce-source"] = []string{ev.Source()}
w.Header()["ce-subject"] = []string{ev.Subject()}
w.Header()["content-type"] = []string{"application/json"}
w.Write(ev.Data())
} else {
if len(h.processingTime) > 0 {
h.handlers[h.receiveCount](w, r, h.processingTime[h.receiveCount])
} else {
h.handlers[h.receiveCount](w, r, 0)
}
}
return ch, fakeServer, nil
h.receiveCount++
h.exitAfter--
if h.exitAfter == 0 {
h.done <- true
}
}
type handlerFunc func(http.ResponseWriter, *http.Request, time.Duration)
func (h *fakeHandler) addBody(body string) {
h.bodies = append(h.bodies, body)
}
func accepted(writer http.ResponseWriter, req *http.Request, delay time.Duration) {
time.Sleep(delay)
writer.WriteHeader(http.StatusOK)
}
func failed(writer http.ResponseWriter, req *http.Request, delay time.Duration) {
time.Sleep(delay)
writer.WriteHeader(500)
}
func createEvent(data string) ce.Event {
Expand All @@ -457,6 +459,7 @@ func createEvent(data string) ce.Event {
event.SetData(cloudevents.ApplicationJSON, data)
return event
}
func stringSort(x, y string) bool {
return x < y
}
} */

0 comments on commit c8a46a9

Please sign in to comment.