From 71e37bf020cbefef168ab411f69e2c7d31a3921d Mon Sep 17 00:00:00 2001 From: halspang <70976921+halspang@users.noreply.github.com> Date: Mon, 15 Aug 2022 17:31:14 -0700 Subject: [PATCH] Allow metadata to flow through service bus queue (#1911) * Allow metadata to flow through service bus queue This commit lets metadata bind to a message and be read when the message is received. This allows for the distributed tracing of events. Signed-off-by: Hal Spang * Remove saved fields from ApplicationProperties Signed-off-by: Hal Spang * Add a certification test for metadata Signed-off-by: Hal Spang Signed-off-by: Hal Spang Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Signed-off-by: Andrew Duss --- .../servicebusqueues/servicebusqueues.go | 20 +++++- .../bindings/azure/servicebusqueues/README.md | 5 ++ .../servicebusqueues/servicebusqueue_test.go | 69 +++++++++++++++++++ 3 files changed, 93 insertions(+), 1 deletion(-) diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index 275965a25a..4c88a5e9d5 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -155,7 +155,8 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke } msg := &servicebus.Message{ - Body: req.Data, + Body: req.Data, + ApplicationProperties: make(map[string]interface{}), } if val, ok := req.Metadata[id]; ok && val != "" { msg.MessageID = &val @@ -163,6 +164,16 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke if val, ok := req.Metadata[correlationID]; ok && val != "" { msg.CorrelationID = &val } + + // Include incoming metadata in the message to be used when it is read. + for k, v := range req.Metadata { + // Don't include the values that are saved in MessageID or CorrelationID. + if k == id || k == correlationID { + continue + } + msg.ApplicationProperties[k] = v + } + ttl, ok, err := contrib_metadata.TryGetTTL(req.Metadata) if err != nil { return nil, err @@ -262,6 +273,13 @@ func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.Ha metadata[label] = *msg.Subject } + // Passthrough any custom metadata to the handler. + for key, val := range msg.ApplicationProperties { + if stringVal, ok := val.(string); ok { + metadata[key] = stringVal + } + } + _, err := handler(a.ctx, &bindings.ReadResponse{ Data: msg.Body, Metadata: metadata, diff --git a/tests/certification/bindings/azure/servicebusqueues/README.md b/tests/certification/bindings/azure/servicebusqueues/README.md index 5e9a7da3d2..db96f997d3 100644 --- a/tests/certification/bindings/azure/servicebusqueues/README.md +++ b/tests/certification/bindings/azure/servicebusqueues/README.md @@ -31,6 +31,11 @@ The purpose of this module is to provide tests that certify the Azure Service Bu - Start an application that is guaranteed to fail - Ensure the binding continues to read incoming messages - Ensure the messages that are failed are retried +- Verify Metadata flows through event + - Create an output/input binding + - Run dapr application with components + - Invoke the output binding providing metadata + - Receive the message and validate the metadata ### Future Tests 1. Provide iterations around the different auth mechanisms supported by Azure Service Bus. diff --git a/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go b/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go index 5e30bdbd4f..58737d08a3 100644 --- a/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go +++ b/tests/certification/bindings/azure/servicebusqueues/servicebusqueue_test.go @@ -353,3 +353,72 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) { Step("send and wait", test). Run() } + +func TestServiceBusQueueMetadata(t *testing.T) { + log := logger.NewLogger("dapr.components") + messages := watcher.NewUnordered() + + ports, _ := dapr_testing.GetFreePorts(3) + grpcPort := ports[0] + httpPort := ports[1] + appPort := ports[2] + + test := func(ctx flow.Context) error { + client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort)) + require.NoError(t, err, "Could not initialize dapr client.") + + // Send events that the application above will observe. + ctx.Log("Invoking binding!") + req := &daprClient.InvokeBindingRequest{Name: "sb-binding-1", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"TestMetadata": "Some Metadata"}} + err = client.InvokeOutputBinding(ctx, req) + require.NoError(ctx, err, "error publishing message") + + // Do the messages we observed match what we expect? + messages.Assert(ctx, time.Minute) + + return nil + } + + // Application logic that tracks messages from a topic. + application := func(ctx flow.Context, s common.Service) (err error) { + // Setup the input binding endpoints + err = multierr.Combine(err, + s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + messages.Observe(string(in.Data)) + ctx.Logf("Got message: %s - %+v", string(in.Data), in.Metadata) + require.NotEmpty(t, in.Metadata) + require.Contains(t, in.Metadata, "TestMetadata") + require.Equal(t, "Some Metadata", in.Metadata["TestMetadata"]) + + return []byte("{}"), nil + })) + + return err + } + + flow.New(t, "servicebusqueue certification"). + // Run the application logic above. + Step(app.Run("metadataApp", fmt.Sprintf(":%d", appPort), application)). + Step(sidecar.Run("metadataSidecar", + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), + embedded.WithDaprGRPCPort(grpcPort), + embedded.WithDaprHTTPPort(httpPort), + embedded.WithComponentsPath("./components/standard"), + runtime.WithOutputBindings( + binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding { + return binding_asb.NewAzureServiceBusQueues(log) + }), + ), + runtime.WithInputBindings( + binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding { + return binding_asb.NewAzureServiceBusQueues(log) + }), + ), + runtime.WithSecretStores( + secretstores_loader.New("local.env", func() secretstores.SecretStore { + return secretstore_env.NewEnvSecretStore(log) + }), + ))). + Step("send and wait", test). + Run() +}