-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
64 lines (55 loc) · 2.98 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Consumer;
using RabbitMQ.Next.Publisher;
using RabbitMQ.Next.Serialization.Dynamic;
using RabbitMQ.Next.Serialization.MessagePack;
using RabbitMQ.Next.Serialization.SystemJson;
namespace RabbitMQ.Next.Examples.DynamicSerializer;
internal static class Program
{
private static async Task Main()
{
await using var connection = ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.UseDynamicSerializer(serializer => serializer
.When(m => "application/json".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseSystemJsonSerializer()
.When(m => "application/msgpack".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseMessagePackSerializer()
.When(_ => true).UseSystemJsonSerializer()
)
.Build();
Console.WriteLine("Connection opened");
await PublishMessagesAsync(connection);
Console.WriteLine("Messages were publisher, press any key to consume the messages.");
Console.ReadKey();
await ConsumeMessagesAsync(connection);
Console.WriteLine("Done");
}
private static async Task PublishMessagesAsync(IConnection connection)
{
await using var publisher = connection.Publisher("amq.fanout");
// The message will be formatted using MessagePackSerializer, because there is corresponding registration
await publisher.PublishAsync(new DummyDto { SomeProperty = "some message with msgpack content type"},
message => message.SetContentType("application/msgpack"));
// The message will be formatted using SystemJsonSerializer, because there is corresponding registration
await publisher.PublishAsync(new DummyDto { SomeProperty = "some message with json content type"},
message => message.SetContentType("application/json"));
// The last two messages will be formatted using SystemJsonSerializer, because the last registered serializer accept any messages
await publisher.PublishAsync(new DummyDto { SomeProperty = "some message without specified content type"});
await publisher.PublishAsync(new DummyDto { SomeProperty = "some message with unknown content type"},
message => message.SetContentType("application/x-unknown"));
}
private static async Task ConsumeMessagesAsync(IConnection connection)
{
await using var consumer = connection.Consumer(
builder => builder
.BindToQueue("my-queue")
.PrefetchCount(10));
var cancellation = new CancellationTokenSource(10_000); // simply cancel after 10 seconds
await consumer.ConsumeAsync((message, content) =>
{
Console.WriteLine($"Message content-type: {message.ContentType}, {content.Get<DummyDto>().SomeProperty}");
} ,cancellation.Token);
}
}