Skip to content

Commit

Permalink
Allow metadata to flow through service bus queue (dapr#1911)
Browse files Browse the repository at this point in the history
* Allow metadata to flow through service bus queue

This commit lets metadata bind to a message and be read when
the message is received. This allows for the distributed tracing
of events.

Signed-off-by: Hal Spang <halspang@microsoft.com>

* Remove saved fields from ApplicationProperties

Signed-off-by: Hal Spang <halspang@microsoft.com>

* Add a certification test for metadata

Signed-off-by: Hal Spang <halspang@microsoft.com>

Signed-off-by: Hal Spang <halspang@microsoft.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
Signed-off-by: Andrew Duss <andy.duss@storable.com>
  • Loading branch information
3 people authored and Andrew Duss committed Aug 18, 2022
1 parent 1b67380 commit 71e37bf
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
20 changes: 19 additions & 1 deletion bindings/azure/servicebusqueues/servicebusqueues.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,25 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke
}

msg := &servicebus.Message{
Body: req.Data,
Body: req.Data,
ApplicationProperties: make(map[string]interface{}),
}
if val, ok := req.Metadata[id]; ok && val != "" {
msg.MessageID = &val
}
if val, ok := req.Metadata[correlationID]; ok && val != "" {
msg.CorrelationID = &val
}

// Include incoming metadata in the message to be used when it is read.
for k, v := range req.Metadata {
// Don't include the values that are saved in MessageID or CorrelationID.
if k == id || k == correlationID {
continue
}
msg.ApplicationProperties[k] = v
}

ttl, ok, err := contrib_metadata.TryGetTTL(req.Metadata)
if err != nil {
return nil, err
Expand Down Expand Up @@ -262,6 +273,13 @@ func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.Ha
metadata[label] = *msg.Subject
}

// Passthrough any custom metadata to the handler.
for key, val := range msg.ApplicationProperties {
if stringVal, ok := val.(string); ok {
metadata[key] = stringVal
}
}

_, err := handler(a.ctx, &bindings.ReadResponse{
Data: msg.Body,
Metadata: metadata,
Expand Down
5 changes: 5 additions & 0 deletions tests/certification/bindings/azure/servicebusqueues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ The purpose of this module is to provide tests that certify the Azure Service Bu
- Start an application that is guaranteed to fail
- Ensure the binding continues to read incoming messages
- Ensure the messages that are failed are retried
- Verify Metadata flows through event
- Create an output/input binding
- Run dapr application with components
- Invoke the output binding providing metadata
- Receive the message and validate the metadata

### Future Tests
1. Provide iterations around the different auth mechanisms supported by Azure Service Bus.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,72 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
Step("send and wait", test).
Run()
}

func TestServiceBusQueueMetadata(t *testing.T) {
log := logger.NewLogger("dapr.components")
messages := watcher.NewUnordered()

ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]

test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")

// Send events that the application above will observe.
ctx.Log("Invoking binding!")
req := &daprClient.InvokeBindingRequest{Name: "sb-binding-1", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"TestMetadata": "Some Metadata"}}
err = client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")

// Do the messages we observed match what we expect?
messages.Assert(ctx, time.Minute)

return nil
}

// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s - %+v", string(in.Data), in.Metadata)
require.NotEmpty(t, in.Metadata)
require.Contains(t, in.Metadata, "TestMetadata")
require.Equal(t, "Some Metadata", in.Metadata["TestMetadata"])

return []byte("{}"), nil
}))

return err
}

flow.New(t, "servicebusqueue certification").
// Run the application logic above.
Step(app.Run("metadataApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("metadataSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/standard"),
runtime.WithOutputBindings(
binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithInputBindings(
binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithSecretStores(
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(log)
}),
))).
Step("send and wait", test).
Run()
}

0 comments on commit 71e37bf

Please sign in to comment.