-
Notifications
You must be signed in to change notification settings - Fork 0
/
AzureQueue.cs
executable file
·126 lines (105 loc) · 4.18 KB
/
AzureQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using Communication.AzureQueueDependencies;
using Cryptography;
namespace Communication
{
/// <summary>
/// <see cref="IQueue"/> implementation that pushes messages to, and polls messages from, the queue
/// obtained from an <see cref="ICloudQueueClientWrapper"/>. <see cref="InitializeAsync"/> must complete
/// before enqueuing, dequeuing or cancelling; otherwise those members throw
/// <see cref="SecureCommunicationException"/>.
/// </summary>
public class AzureQueue : BaseQueue, IQueue
{
    #region private members

    // Passed (as a TimeSpan in seconds) as the first argument of GetMessageAsync on every poll.
    // NOTE(review): presumably the visibility timeout of a retrieved message - confirm against ICloudQueueWrapper.
    private const int MessagePeekTimeInSeconds = 60;

    // Assigned by InitializeAsync; null until then.
    private ICloudQueueWrapper m_queue;
    private readonly ICloudQueueClientWrapper m_queueClient;

    // Written by CancelListeningOnQueue on the caller's thread and read by the polling loop on a
    // thread-pool thread, hence volatile so the loop always observes the latest value.
    private volatile bool m_isActive;
    private bool m_isInitialized;
    private readonly bool m_isEncrypted;
    private readonly string m_queueName;

    #endregion

    /// <summary>
    /// Creates a queue object. No queue call is made here; call <see cref="InitializeAsync"/> before use.
    /// </summary>
    /// <param name="queueName">Name used to obtain the queue reference during initialization.</param>
    /// <param name="queueClient">Client wrapper used to obtain the queue reference.</param>
    /// <param name="cryptoActions">Crypto implementation handed to the base class and used when creating messages.</param>
    /// <param name="isEncrypted">Forwarded to message creation; per the enqueue contract, chooses whether messages are encrypted.</param>
    public AzureQueue(string queueName, ICloudQueueClientWrapper queueClient, ICryptoActions cryptoActions,
        bool isEncrypted) : base(cryptoActions)
    {
        m_queueClient = queueClient;
        m_isEncrypted = isEncrypted;
        m_isActive = false;
        m_queueName = queueName;
        m_isInitialized = false;
    }

    /// <summary>
    /// Obtains the queue reference, creates the queue if it does not exist and marks this object as initialized.
    /// </summary>
    public async Task InitializeAsync()
    {
        m_queue = m_queueClient.GetQueueReference(m_queueName);
        await m_queue.CreateIfNotExistsAsync();
        m_isInitialized = true;
    }

    /// <summary>
    /// Enqueues a message, it will be automatically signed and if chosen (ctor) encrypted as well
    /// </summary>
    /// <param name="msg">Message.</param>
    /// <exception cref="SecureCommunicationException">The object was not initialized.</exception>
    /// <exception cref="StorageException">Adding the message failed; the exception is written to the console and rethrown.</exception>
    public async Task EnqueueAsync(byte[] msg)
    {
        ThrowIfNotInitialized();

        var messageInBytes = CreateMessage(msg, m_cryptoActions, m_isEncrypted);
        try
        {
            await m_queue.AddMessageAsync(new CloudQueueMessage(messageInBytes));
        }
        catch (StorageException ex)
        {
            Console.WriteLine($"Exception was thrown when trying to push message to queue, exception: {ex}");
            throw;
        }
    }

    /// <summary>
    /// Not supported by this implementation; always throws. Use the overload that takes a wait time.
    /// </summary>
    /// <param name="callbackOnSuccess">Unused.</param>
    /// <param name="callbackOnFailure">Unused.</param>
    /// <exception cref="SecureCommunicationException">Always thrown (synchronously).</exception>
    public Task<string> DequeueAsync(Action<byte[]> callbackOnSuccess, Action<Message> callbackOnFailure)
    {
        throw new SecureCommunicationException(
            "This method signature is not supported for the Azure BaseQueue implementation");
    }

    /// <summary>
    /// Dequeues a message. The signature will be verified, in case of a verification failure a failure callback will be called.
    /// The callback receives a single argument which is the decrypted and verified message
    /// </summary>
    /// <remarks>
    /// Starts a background polling loop that keeps running until <see cref="CancelListeningOnQueue"/> is called.
    /// Every retrieved message is processed and then deleted from the queue.
    /// </remarks>
    /// <param name="callbackOnSuccess">Callback when message is verified</param>
    /// <param name="callbackOnFailure">Callback when verification failed</param>
    /// <param name="waitTime">Time to wait between dequeues</param>
    /// <returns>The task running the polling loop; it completes once listening has been cancelled.</returns>
    /// <exception cref="SecureCommunicationException">The object was not initialized.</exception>
    public Task DequeueAsync(Action<byte[]> callbackOnSuccess, Action<Message> callbackOnFailure, TimeSpan waitTime)
    {
        ThrowIfNotInitialized();

        m_isActive = true;

        var dequeueTask = Task.Run(async () =>
        {
            while (m_isActive)
            {
                CloudQueueMessage retrievedMessage = await m_queue.GetMessageAsync(
                    TimeSpan.FromSeconds(MessagePeekTimeInSeconds),
                    new QueueRequestOptions(), new OperationContext());

                if (retrievedMessage != null)
                {
                    ProccessMessage(callbackOnSuccess, callbackOnFailure, retrievedMessage.AsBytes);
                    await m_queue.DeleteMessageAsync(retrievedMessage);
                }

                // Asynchronous delay: does not block a thread-pool thread between polls and avoids
                // the unchecked double -> int narrowing of the wait time.
                await Task.Delay(waitTime);
            }
        });

        return dequeueTask;
    }

    /// <summary>
    /// Stops the dequeuing process
    /// </summary>
    /// <exception cref="SecureCommunicationException">The object was not initialized.</exception>
    public void CancelListeningOnQueue()
    {
        ThrowIfNotInitialized();
        m_isActive = false;
    }

    #region privateMethods

    /// <summary>
    /// Throws when <see cref="InitializeAsync"/> has not completed yet.
    /// </summary>
    private void ThrowIfNotInitialized()
    {
        if (!m_isInitialized)
        {
            throw new SecureCommunicationException("Object was not initialized");
        }
    }

    #endregion
}
}