diff --git a/src/corelib/Core/Domain/Queues/QueuedMessageList.cs b/src/corelib/Core/Domain/Queues/QueuedMessageList.cs
index 5251e04af..0b09cd617 100644
--- a/src/corelib/Core/Domain/Queues/QueuedMessageList.cs
+++ b/src/corelib/Core/Domain/Queues/QueuedMessageList.cs
@@ -2,110 +2,50 @@
{
using System;
using System.Collections.Generic;
- using System.Collections.ObjectModel;
- using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using net.openstack.Core.Collections;
using net.openstack.Core.Providers;
- using Newtonsoft.Json;
///
- /// Represents a collection of messages stored in a queue in the .
+ /// This class extends the class
+ /// to provide access to the opaque marker used for paginating messages
+ /// in the (via the
+ /// property.
///
///
///
- [JsonObject(MemberSerialization.OptIn)]
- public class QueuedMessageList
+ public class QueuedMessageList : BasicReadOnlyCollectionPage
{
///
- /// This is the backing field for the property.
+ /// This is the backing field for the property.
///
- private static readonly QueuedMessageList _empty = new QueuedMessageList(Enumerable.Empty(), Enumerable.Empty());
-
- ///
- /// This is the backing field for the property.
- ///
- [JsonProperty("links")]
- private Link[] _links;
-
- ///
- /// This is the backing field for the property.
- ///
- [JsonProperty("messages")]
- private QueuedMessage[] _messages;
-
- ///
- /// Initializes a new instance of the class during
- /// JSON deserialization.
- ///
- [JsonConstructor]
- protected QueuedMessageList()
- {
- }
-
+ private QueuedMessageListId _nextPageId;
+
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class
+ /// that is a read-only wrapper around the specified list.
///
- /// A collection of objects describing the messages in a queue.
- /// A collection of objects describing resources related to the list of messages.
- ///
- /// If contains any values.
- /// -or-
- /// If contains any values.
+ /// The list to wrap.
+ /// A function that returns a representing the asynchronous operation to get the next page of items in the collection. If specified, this function implements . If the value is , then will return .
+ /// The identifier of the next page in the message list.
+ ///
+ /// If is .
///
- public QueuedMessageList(IEnumerable messages, IEnumerable links)
+ public QueuedMessageList(IList list, Func>> getNextPageAsync, QueuedMessageListId nextPageId)
+ : base(list, getNextPageAsync)
{
- if (messages != null)
- {
- _messages = messages.ToArray();
- if (_messages.Contains(null))
- throw new ArgumentException("messages cannot contain any null values", "messages");
- }
-
- if (links != null)
- {
- _links = links.ToArray();
- if (_links.Contains(null))
- throw new ArgumentException("links cannot contain any null values", "links");
- }
+ _nextPageId = nextPageId;
}
///
- /// Gets an empty list of messages, which is not specific to any queue.
+ /// Gets the identifier of the next page of the message list.
///
- public static QueuedMessageList Empty
+ public QueuedMessageListId NextPageId
{
get
{
- return _empty;
- }
- }
-
- ///
- /// Gets a collection of objects describing resources related
- /// to this list of messages.
- ///
- public ReadOnlyCollection Links
- {
- get
- {
- if (_links == null)
- return null;
-
- return new ReadOnlyCollection(_links);
- }
- }
-
- ///
- /// Gets a list of objects describing the messages in
- /// the queue.
- ///
- public ReadOnlyCollection Messages
- {
- get
- {
- if (_messages == null)
- return null;
-
- return new ReadOnlyCollection(_messages);
+ return _nextPageId;
}
}
}
diff --git a/src/corelib/Core/Domain/Queues/QueuedMessageListId.cs b/src/corelib/Core/Domain/Queues/QueuedMessageListId.cs
new file mode 100644
index 000000000..f80219788
--- /dev/null
+++ b/src/corelib/Core/Domain/Queues/QueuedMessageListId.cs
@@ -0,0 +1,44 @@
+namespace net.openstack.Core.Domain.Queues
+{
+ using System;
+ using net.openstack.Core;
+ using net.openstack.Core.Providers;
+ using Newtonsoft.Json;
+
+ ///
+ /// Represents the unique identifier of a message list page in the .
+ ///
+ ///
+ ///
+ ///
+ ///
+ [JsonConverter(typeof(QueuedMessageListId.Converter))]
+ public sealed class QueuedMessageListId : ResourceIdentifier
+ {
+ ///
+ /// Initializes a new instance of the class
+ /// with the specified identifier value.
+ ///
+ /// The identifier value.
+ /// If is null.
+ /// If is empty.
+ public QueuedMessageListId(string id)
+ : base(id)
+ {
+ }
+
+ ///
+ /// Provides support for serializing and deserializing
+ /// objects to JSON string values.
+ ///
+ ///
+ private sealed class Converter : ConverterBase
+ {
+ ///
+ protected override QueuedMessageListId FromValue(string id)
+ {
+ return new QueuedMessageListId(id);
+ }
+ }
+ }
+}
diff --git a/src/corelib/Core/Providers/IQueueingService.cs b/src/corelib/Core/Providers/IQueueingService.cs
index 7fdc8b881..6cdb743a3 100644
--- a/src/corelib/Core/Providers/IQueueingService.cs
+++ b/src/corelib/Core/Providers/IQueueingService.cs
@@ -160,7 +160,7 @@ Task GetQueueMetadataAsync(QueueName queueName, CancellationToken cancella
/// Gets a list of messages currently in a queue.
///
/// The queue name.
- /// The object returned by a previous call to . If this value is , the list starts at the beginning.
+ /// The identifier of the message list page to return. This is obtained from . If this value is , the list starts at the beginning.
/// The maximum number of messages to return. If this value is , a provider-specific default value is used.
/// to include messages created by the current client; otherwise, .
/// to include claimed messages; otherwise to return only unclaimed messages.
@@ -170,7 +170,7 @@ Task GetQueueMetadataAsync(QueueName queueName, CancellationToken cancella
/// If is less than or equal to 0.
/// If the REST request does not return successfully.
/// List Messages (OpenStack Marconi API v1 Blueprint)
- Task ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken);
+ Task ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken);
///
/// Gets detailed information about a specific queued message.
diff --git a/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs b/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs
index 7dfcf3f32..30b716677 100644
--- a/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs
+++ b/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs
@@ -338,7 +338,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS
///
/// The queueing service instance.
/// The queue name.
- /// The object returned by a previous call to . If this value is , the list starts at the beginning.
+ /// The identifier of the message list page to return. This is obtained from . If this value is , the list starts at the beginning.
/// The maximum number of messages to return. If this value is , a provider-specific default value is used.
/// to include messages created by the current client; otherwise, .
/// to include claimed messages; otherwise to return only unclaimed messages.
@@ -348,7 +348,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS
/// If is less than or equal to 0.
/// If the REST request does not return successfully.
/// List Messages (OpenStack Marconi API v1 Blueprint)
- public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed)
+ public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed)
{
if (queueingService == null)
throw new ArgumentNullException("queueingService");
diff --git a/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs b/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs
index 867270a78..d04c5890b 100644
--- a/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs
+++ b/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs
@@ -353,7 +353,7 @@ public Task GetQueueStatisticsAsync(QueueName queueName, Cancel
}
///
- public Task ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken)
+ public Task ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken)
{
if (queueName == null)
throw new ArgumentNullException("queueName");
@@ -369,49 +369,60 @@ public Task ListMessagesAsync(QueueName queueName, QueuedMess
{ "echo", echo.ToString() },
{ "include_claimed", includeClaimed.ToString() }
};
-
if (marker != null)
- {
- Link nextLink = null;
- if (marker.Links != null)
+ parameters["marker"] = marker.Value;
+ if (limit != null)
+ parameters["limit"] = limit.ToString();
+
+ Func>, HttpWebRequest> prepareRequest =
+ PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters);
+
+ Func, Task> requestResource =
+ GetResponseAsyncFunc(cancellationToken);
+
+ Func, QueuedMessageList> resultSelector =
+ task =>
{
- foreach (Link link in marker.Links)
+ ReadOnlyCollection messages = null;
+ if (task.Result != null)
+ messages = task.Result.Messages;
+
+ QueuedMessageListId nextMarker = null;
+ if (task.Result != null && task.Result.Links != null)
{
- if (link.Rel == "next")
+ Link nextLink = task.Result.Links.FirstOrDefault(i => string.Equals(i.Rel, "next", StringComparison.OrdinalIgnoreCase));
+ if (nextLink != null)
{
- nextLink = link;
- break;
+ Uri baseUri = new Uri("https://example.com");
+ Uri absoluteUri;
+ if (nextLink.Href.StartsWith("/v1"))
+ absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length));
+ else
+ absoluteUri = new Uri(baseUri, nextLink.Href);
+
+ UriTemplateMatch match = template.Match(baseUri, absoluteUri);
+ if (!string.IsNullOrEmpty(match.BoundVariables["marker"]))
+ nextMarker = new QueuedMessageListId(match.BoundVariables["marker"]);
}
}
- }
-
- if (nextLink == null)
- {
- return InternalTaskExtensions.CompletedTask(QueuedMessageList.Empty);
- }
-
- Uri baseUri = new Uri("https://example.com");
- Uri absoluteUri;
- if (nextLink.Href.StartsWith("/v1"))
- absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length));
- else
- absoluteUri = new Uri(baseUri, nextLink.Href);
-
- UriTemplateMatch match = template.Match(baseUri, absoluteUri);
- parameters.Add("marker", match.BoundVariables["marker"]);
- }
-
- if (limit != null)
- parameters["limit"] = limit.ToString();
- Func>, HttpWebRequest> prepareRequest =
- PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters);
+ if (messages == null || messages.Count == 0)
+ {
+ // use the same marker again
+ messages = messages ?? new ReadOnlyCollection(new QueuedMessage[0]);
+ nextMarker = marker;
+ }
- Func, Task> requestResource =
- GetResponseAsyncFunc(cancellationToken);
+ Func>> getNextPageAsync = null;
+ if (nextMarker != null || messages.Count == 0)
+ {
+ getNextPageAsync =
+ nextCancellationToken => ListMessagesAsync(queueName, nextMarker, limit, echo, includeClaimed, nextCancellationToken)
+ .ContinueWith(t => (ReadOnlyCollectionPage)t.Result, TaskContinuationOptions.ExecuteSynchronously);
+ }
- Func, QueuedMessageList> resultSelector =
- task => (task.Result != null && task.Result.Messages != null ? task.Result : null) ?? QueuedMessageList.Empty;
+ return new QueuedMessageList(messages, getNextPageAsync, nextMarker);
+ };
return AuthenticateServiceAsync(cancellationToken)
.ContinueWith(prepareRequest)
diff --git a/src/corelib/corelib.v3.5.csproj b/src/corelib/corelib.v3.5.csproj
index 1ff256543..31828ab72 100644
--- a/src/corelib/corelib.v3.5.csproj
+++ b/src/corelib/corelib.v3.5.csproj
@@ -103,6 +103,7 @@
+
diff --git a/src/corelib/corelib.v4.0.csproj b/src/corelib/corelib.v4.0.csproj
index 0e63cd109..bf079ca75 100644
--- a/src/corelib/corelib.v4.0.csproj
+++ b/src/corelib/corelib.v4.0.csproj
@@ -93,6 +93,7 @@
+
diff --git a/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs b/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs
index 64736540f..e08c44917 100644
--- a/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs
+++ b/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs
@@ -282,23 +282,23 @@ public async Task TestListAllQueueMessages()
HashSet locatedMessages = new HashSet();
QueuedMessageList messages = await provider.ListMessagesAsync(queueName, null, null, true, false, cancellationTokenSource.Token);
- foreach (QueuedMessage message in messages.Messages)
+ foreach (QueuedMessage message in messages)
Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once.");
- int deletedMessage = messages.Messages[0].Body.ToObject().ValueA;
- await provider.DeleteMessageAsync(queueName, messages.Messages[0].Id, null, cancellationTokenSource.Token);
+ int deletedMessage = messages[0].Body.ToObject().ValueA;
+ await provider.DeleteMessageAsync(queueName, messages[0].Id, null, cancellationTokenSource.Token);
- while (messages.Messages.Count > 0)
+ while (messages.Count > 0)
{
- QueuedMessageList tempList = await provider.ListMessagesAsync(queueName, messages, null, true, false, cancellationTokenSource.Token);
- if (tempList.Messages.Count > 0)
+ QueuedMessageList tempList = await provider.ListMessagesAsync(queueName, messages.NextPageId, null, true, false, cancellationTokenSource.Token);
+ if (tempList.Count > 0)
{
- Assert.IsTrue(locatedMessages.Add(tempList.Messages[0].Body.ToObject().ValueA), "Received the same message more than once.");
- await provider.DeleteMessageAsync(queueName, tempList.Messages[0].Id, null, cancellationTokenSource.Token);
+ Assert.IsTrue(locatedMessages.Add(tempList[0].Body.ToObject().ValueA), "Received the same message more than once.");
+ await provider.DeleteMessageAsync(queueName, tempList[0].Id, null, cancellationTokenSource.Token);
}
- messages = await provider.ListMessagesAsync(queueName, messages, null, true, false, cancellationTokenSource.Token);
- foreach (QueuedMessage message in messages.Messages)
+ messages = await provider.ListMessagesAsync(queueName, messages.NextPageId, null, true, false, cancellationTokenSource.Token);
+ foreach (QueuedMessage message in messages)
{
Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once.");
}
diff --git a/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs b/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs
index 9da071074..94fede3b9 100644
--- a/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs
+++ b/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs
@@ -249,23 +249,23 @@ public void SynchronousTestListAllQueueMessages()
HashSet locatedMessages = new HashSet();
QueuedMessageList messages = provider.ListMessages(queueName, null, null, true, false);
- foreach (QueuedMessage message in messages.Messages)
+ foreach (QueuedMessage message in messages)
Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once.");
- int deletedMessage = messages.Messages[0].Body.ToObject().ValueA;
- provider.DeleteMessage(queueName, messages.Messages[0].Id, null);
+ int deletedMessage = messages[0].Body.ToObject().ValueA;
+ provider.DeleteMessage(queueName, messages[0].Id, null);
- while (messages.Messages.Count > 0)
+ while (messages.Count > 0)
{
- QueuedMessageList tempList = provider.ListMessages(queueName, messages, null, true, false);
- if (tempList.Messages.Count > 0)
+ QueuedMessageList tempList = provider.ListMessages(queueName, messages.NextPageId, null, true, false);
+ if (tempList.Count > 0)
{
- Assert.IsTrue(locatedMessages.Add(tempList.Messages[0].Body.ToObject().ValueA), "Received the same message more than once.");
- provider.DeleteMessage(queueName, tempList.Messages[0].Id, null);
+ Assert.IsTrue(locatedMessages.Add(tempList[0].Body.ToObject().ValueA), "Received the same message more than once.");
+ provider.DeleteMessage(queueName, tempList[0].Id, null);
}
- messages = provider.ListMessages(queueName, messages, null, true, false);
- foreach (QueuedMessage message in messages.Messages)
+ messages = provider.ListMessages(queueName, messages.NextPageId, null, true, false);
+ foreach (QueuedMessage message in messages)
{
Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once.");
}