/
RPCClient.cs
84 lines (71 loc) · 2.75 KB
/
RPCClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
/// <summary>
/// Request/reply client on top of RabbitMQ: publishes a request to the
/// <c>rpc_queue</c> queue and completes the returned task when a reply carrying
/// the same correlation id arrives on this client's private reply queue.
/// </summary>
/// <remarks>
/// All calls share a single channel. NOTE(review): RabbitMQ channels are
/// generally not meant to be used from several threads at once, so concurrent
/// <see cref="CallAsync"/> invocations may need external serialization — confirm
/// against the client library version in use.
/// </remarks>
public class RpcClient : IDisposable
{
    private const string RequestQueueName = "rpc_queue";

    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly string _replyQueueName;

    // Pending calls keyed by correlation id; an entry is removed by whichever of
    // reply / cancellation / publish failure / dispose happens first.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();

    // 0 = live, 1 = disposed. An int so it can be flipped with Interlocked.
    private int _disposed;

    /// <summary>
    /// Connects to the broker on <c>localhost</c>, declares a server-named reply
    /// queue and starts consuming replies from it (auto-acknowledged).
    /// </summary>
    public RpcClient()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        _connection = factory.CreateConnection();

        try
        {
            _channel = _connection.CreateModel();

            // Declare a server-named queue; the broker picks a unique name.
            _replyQueueName = _channel.QueueDeclare().QueueName;

            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += OnReplyReceived;

            _channel.BasicConsume(consumer: consumer,
                                  queue: _replyQueueName,
                                  autoAck: true);
        }
        catch
        {
            // Do not leak the connection (or a half-initialized channel) when setup fails.
            _channel?.Dispose();
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> (UTF-8 encoded) as an RPC request and
    /// returns a task that yields the UTF-8 decoded reply body.
    /// </summary>
    /// <param name="message">Request payload.</param>
    /// <param name="cancellationToken">
    /// Cancels the wait for the reply. The returned task then transitions to the
    /// canceled state; a request that was already published is not recalled.
    /// </param>
    /// <returns>A task completing with the reply text.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        if (cancellationToken.IsCancellationRequested)
        {
            // Nothing to wait for, so do not bother the server with a request.
            return Task.FromCanceled<string>(cancellationToken);
        }

        var correlationId = Guid.NewGuid().ToString();

        IBasicProperties props = _channel.CreateBasicProperties();
        props.CorrelationId = correlationId;
        props.ReplyTo = _replyQueueName;

        var messageBytes = Encoding.UTF8.GetBytes(message);

        // RunContinuationsAsynchronously keeps the awaiting caller's code from
        // running inline inside the consumer callback that sets the result.
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _callbackMapper.TryAdd(correlationId, tcs);

        try
        {
            _channel.BasicPublish(exchange: string.Empty,
                                  routingKey: RequestQueueName,
                                  basicProperties: props,
                                  body: messageBytes);
        }
        catch
        {
            // The request never left; drop the orphaned entry before surfacing the error.
            _callbackMapper.TryRemove(correlationId, out _);
            throw;
        }

        if (cancellationToken.CanBeCanceled)
        {
            var registration = cancellationToken.Register(() =>
            {
                // Only cancel if the reply handler has not already claimed this call.
                if (_callbackMapper.TryRemove(correlationId, out var pending))
                {
                    pending.TrySetCanceled(cancellationToken);
                }
            });

            // Release the registration once the call is finished, however it ends.
            _ = tcs.Task.ContinueWith(
                t => registration.Dispose(),
                CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
        }

        return tcs.Task;
    }

    /// <summary>
    /// Closes and disposes the channel and connection, and cancels every call
    /// that is still waiting for a reply. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            if (_channel.IsOpen)
            {
                _channel.Close();
            }

            if (_connection.IsOpen)
            {
                _connection.Close();
            }
        }
        finally
        {
            _channel.Dispose();
            _connection.Dispose();

            // No reply can arrive any more; release callers instead of leaving them hanging.
            foreach (var correlationId in _callbackMapper.Keys)
            {
                if (_callbackMapper.TryRemove(correlationId, out var pending))
                {
                    pending.TrySetCanceled();
                }
            }
        }
    }

    // Routes a delivery on the reply queue to the pending call with the same correlation id.
    private void OnReplyReceived(object sender, BasicDeliverEventArgs ea)
    {
        var correlationId = ea.BasicProperties?.CorrelationId;

        // Ignore replies with no correlation id (TryRemove would throw on a null key)
        // and replies whose call is unknown or already finished.
        if (correlationId is null || !_callbackMapper.TryRemove(correlationId, out var tcs))
        {
            return;
        }

        var body = ea.Body.ToArray();
        var response = Encoding.UTF8.GetString(body);
        tcs.TrySetResult(response);
    }
}
/// <summary>
/// Console entry point: sends one RPC request and prints the reply.
/// </summary>
public class Rpc
{
    /// <summary>
    /// Uses the first command-line argument as the request payload, falling back
    /// to "30" when none is given, then waits for Enter before exiting.
    /// </summary>
    /// <param name="args">Command-line arguments; only the first one is read.</param>
    public static async Task Main(string[] args)
    {
        Console.WriteLine("RPC Client");

        string requestText;
        if (args.Length > 0)
        {
            requestText = args[0];
        }
        else
        {
            requestText = "30";
        }

        await InvokeAsync(requestText);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    // Creates a client, sends n as the request, prints the reply, then disposes the client.
    private static async Task InvokeAsync(string n)
    {
        using (var client = new RpcClient())
        {
            Console.WriteLine(" [x] Requesting fib({0})", n);
            string reply = await client.CallAsync(n);
            Console.WriteLine(" [.] Got '{0}'", reply);
        }
    }
}