From ea34baac3a427f0f60bf004dd59d1068bdb9a634 Mon Sep 17 00:00:00 2001 From: Sam Harwell Date: Fri, 3 Jan 2014 13:57:45 -0600 Subject: [PATCH] Updated QueuedMessageList to extend ReadOnlyCollectionPage --- .../Core/Domain/Queues/QueuedMessageList.cs | 108 ++++-------------- .../Core/Domain/Queues/QueuedMessageListId.cs | 44 +++++++ .../Core/Providers/IQueueingService.cs | 4 +- .../Synchronous/QueueingServiceExtensions.cs | 4 +- .../Rackspace/CloudQueuesProvider.cs | 81 +++++++------ src/corelib/corelib.v3.5.csproj | 1 + src/corelib/corelib.v4.0.csproj | 1 + .../Providers/Rackspace/UserQueuesTests.cs | 20 ++-- .../Rackspace/UserQueuesTestsSynchronous.cs | 20 ++-- 9 files changed, 140 insertions(+), 143 deletions(-) create mode 100644 src/corelib/Core/Domain/Queues/QueuedMessageListId.cs diff --git a/src/corelib/Core/Domain/Queues/QueuedMessageList.cs b/src/corelib/Core/Domain/Queues/QueuedMessageList.cs index 5251e04af..0b09cd617 100644 --- a/src/corelib/Core/Domain/Queues/QueuedMessageList.cs +++ b/src/corelib/Core/Domain/Queues/QueuedMessageList.cs @@ -2,110 +2,50 @@ { using System; using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using net.openstack.Core.Collections; using net.openstack.Core.Providers; - using Newtonsoft.Json; /// - /// Represents a collection of messages stored in a queue in the . + /// This class extends the class + /// to provide access to the opaque marker used for paginating messages + /// in the (via the + /// property. /// /// /// - [JsonObject(MemberSerialization.OptIn)] - public class QueuedMessageList + public class QueuedMessageList : BasicReadOnlyCollectionPage { /// - /// This is the backing field for the property. + /// This is the backing field for the property. /// - private static readonly QueuedMessageList _empty = new QueuedMessageList(Enumerable.Empty(), Enumerable.Empty()); - - /// - /// This is the backing field for the property. - /// - [JsonProperty("links")] - private Link[] _links; - - /// - /// This is the backing field for the property. - /// - [JsonProperty("messages")] - private QueuedMessage[] _messages; - - /// - /// Initializes a new instance of the class during - /// JSON deserialization. - /// - [JsonConstructor] - protected QueuedMessageList() - { - } - + private QueuedMessageListId _nextPageId; + /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class + /// that is a read-only wrapper around the specified list. /// - /// A collection of objects describing the messages in a queue. - /// A collection of objects describing resources related to the list of messages. - /// - /// If contains any values. - /// -or- - /// If contains any values. + /// The list to wrap. + /// A function that returns a representing the asynchronous operation to get the next page of items in the collection. If specified, this function implements . If the value is , then will return . + /// The identifier of the next page in the message list. + /// + /// If is . /// - public QueuedMessageList(IEnumerable messages, IEnumerable links) + public QueuedMessageList(IList list, Func>> getNextPageAsync, QueuedMessageListId nextPageId) + : base(list, getNextPageAsync) { - if (messages != null) - { - _messages = messages.ToArray(); - if (_messages.Contains(null)) - throw new ArgumentException("messages cannot contain any null values", "messages"); - } - - if (links != null) - { - _links = links.ToArray(); - if (_links.Contains(null)) - throw new ArgumentException("links cannot contain any null values", "links"); - } + _nextPageId = nextPageId; } /// - /// Gets an empty list of messages, which is not specific to any queue. + /// Gets the identifier of the next page of the message list. /// - public static QueuedMessageList Empty + public QueuedMessageListId NextPageId { get { - return _empty; - } - } - - /// - /// Gets a collection of objects describing resources related - /// to this list of messages. - /// - public ReadOnlyCollection Links - { - get - { - if (_links == null) - return null; - - return new ReadOnlyCollection(_links); - } - } - - /// - /// Gets a list of objects describing the messages in - /// the queue. - /// - public ReadOnlyCollection Messages - { - get - { - if (_messages == null) - return null; - - return new ReadOnlyCollection(_messages); + return _nextPageId; } } } diff --git a/src/corelib/Core/Domain/Queues/QueuedMessageListId.cs b/src/corelib/Core/Domain/Queues/QueuedMessageListId.cs new file mode 100644 index 000000000..f80219788 --- /dev/null +++ b/src/corelib/Core/Domain/Queues/QueuedMessageListId.cs @@ -0,0 +1,44 @@ +namespace net.openstack.Core.Domain.Queues +{ + using System; + using net.openstack.Core; + using net.openstack.Core.Providers; + using Newtonsoft.Json; + + /// + /// Represents the unique identifier of a message list page in the . + /// + /// + /// + /// + /// + [JsonConverter(typeof(QueuedMessageListId.Converter))] + public sealed class QueuedMessageListId : ResourceIdentifier + { + /// + /// Initializes a new instance of the class + /// with the specified identifier value. + /// + /// The identifier value. + /// If is null. + /// If is empty. + public QueuedMessageListId(string id) + : base(id) + { + } + + /// + /// Provides support for serializing and deserializing + /// objects to JSON string values. + /// + /// + private sealed class Converter : ConverterBase + { + /// + protected override QueuedMessageListId FromValue(string id) + { + return new QueuedMessageListId(id); + } + } + } +} diff --git a/src/corelib/Core/Providers/IQueueingService.cs b/src/corelib/Core/Providers/IQueueingService.cs index 7fdc8b881..6cdb743a3 100644 --- a/src/corelib/Core/Providers/IQueueingService.cs +++ b/src/corelib/Core/Providers/IQueueingService.cs @@ -160,7 +160,7 @@ Task GetQueueMetadataAsync(QueueName queueName, CancellationToken cancella /// Gets a list of messages currently in a queue. /// /// The queue name. - /// The object returned by a previous call to . If this value is , the list starts at the beginning. + /// The identifier of the message list page to return. This is obtained from . If this value is , the list starts at the beginning. /// The maximum number of messages to return. If this value is , a provider-specific default value is used. /// to include messages created by the current client; otherwise, . /// to include claimed messages; otherwise to return only unclaimed messages. @@ -170,7 +170,7 @@ Task GetQueueMetadataAsync(QueueName queueName, CancellationToken cancella /// If is less than or equal to 0. /// If the REST request does not return successfully. /// List Messages (OpenStack Marconi API v1 Blueprint) - Task ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken); + Task ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken); /// /// Gets detailed information about a specific queued message. diff --git a/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs b/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs index 7dfcf3f32..30b716677 100644 --- a/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs +++ b/src/corelib/Core/Synchronous/QueueingServiceExtensions.cs @@ -338,7 +338,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS /// /// The queueing service instance. /// The queue name. - /// The object returned by a previous call to . If this value is , the list starts at the beginning. + /// The identifier of the message list page to return. This is obtained from . If this value is , the list starts at the beginning. /// The maximum number of messages to return. If this value is , a provider-specific default value is used. /// to include messages created by the current client; otherwise, . /// to include claimed messages; otherwise to return only unclaimed messages. @@ -348,7 +348,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS /// If is less than or equal to 0. /// If the REST request does not return successfully. /// List Messages (OpenStack Marconi API v1 Blueprint) - public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed) + public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed) { if (queueingService == null) throw new ArgumentNullException("queueingService"); diff --git a/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs b/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs index 867270a78..d04c5890b 100644 --- a/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs +++ b/src/corelib/Providers/Rackspace/CloudQueuesProvider.cs @@ -353,7 +353,7 @@ public Task GetQueueStatisticsAsync(QueueName queueName, Cancel } /// - public Task ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken) + public Task ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken) { if (queueName == null) throw new ArgumentNullException("queueName"); @@ -369,49 +369,60 @@ public Task ListMessagesAsync(QueueName queueName, QueuedMess { "echo", echo.ToString() }, { "include_claimed", includeClaimed.ToString() } }; - if (marker != null) - { - Link nextLink = null; - if (marker.Links != null) + parameters["marker"] = marker.Value; + if (limit != null) + parameters["limit"] = limit.ToString(); + + Func>, HttpWebRequest> prepareRequest = + PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters); + + Func, Task> requestResource = + GetResponseAsyncFunc(cancellationToken); + + Func, QueuedMessageList> resultSelector = + task => { - foreach (Link link in marker.Links) + ReadOnlyCollection messages = null; + if (task.Result != null) + messages = task.Result.Messages; + + QueuedMessageListId nextMarker = null; + if (task.Result != null && task.Result.Links != null) { - if (link.Rel == "next") + Link nextLink = task.Result.Links.FirstOrDefault(i => string.Equals(i.Rel, "next", StringComparison.OrdinalIgnoreCase)); + if (nextLink != null) { - nextLink = link; - break; + Uri baseUri = new Uri("https://example.com"); + Uri absoluteUri; + if (nextLink.Href.StartsWith("/v1")) + absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length)); + else + absoluteUri = new Uri(baseUri, nextLink.Href); + + UriTemplateMatch match = template.Match(baseUri, absoluteUri); + if (!string.IsNullOrEmpty(match.BoundVariables["marker"])) + nextMarker = new QueuedMessageListId(match.BoundVariables["marker"]); } } - } - - if (nextLink == null) - { - return InternalTaskExtensions.CompletedTask(QueuedMessageList.Empty); - } - - Uri baseUri = new Uri("https://example.com"); - Uri absoluteUri; - if (nextLink.Href.StartsWith("/v1")) - absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length)); - else - absoluteUri = new Uri(baseUri, nextLink.Href); - - UriTemplateMatch match = template.Match(baseUri, absoluteUri); - parameters.Add("marker", match.BoundVariables["marker"]); - } - - if (limit != null) - parameters["limit"] = limit.ToString(); - Func>, HttpWebRequest> prepareRequest = - PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters); + if (messages == null || messages.Count == 0) + { + // use the same marker again + messages = messages ?? new ReadOnlyCollection(new QueuedMessage[0]); + nextMarker = marker; + } - Func, Task> requestResource = - GetResponseAsyncFunc(cancellationToken); + Func>> getNextPageAsync = null; + if (nextMarker != null || messages.Count == 0) + { + getNextPageAsync = + nextCancellationToken => ListMessagesAsync(queueName, nextMarker, limit, echo, includeClaimed, nextCancellationToken) + .ContinueWith(t => (ReadOnlyCollectionPage)t.Result, TaskContinuationOptions.ExecuteSynchronously); + } - Func, QueuedMessageList> resultSelector = - task => (task.Result != null && task.Result.Messages != null ? task.Result : null) ?? QueuedMessageList.Empty; + return new QueuedMessageList(messages, getNextPageAsync, nextMarker); + }; return AuthenticateServiceAsync(cancellationToken) .ContinueWith(prepareRequest) diff --git a/src/corelib/corelib.v3.5.csproj b/src/corelib/corelib.v3.5.csproj index 1ff256543..31828ab72 100644 --- a/src/corelib/corelib.v3.5.csproj +++ b/src/corelib/corelib.v3.5.csproj @@ -103,6 +103,7 @@ + diff --git a/src/corelib/corelib.v4.0.csproj b/src/corelib/corelib.v4.0.csproj index 0e63cd109..bf079ca75 100644 --- a/src/corelib/corelib.v4.0.csproj +++ b/src/corelib/corelib.v4.0.csproj @@ -93,6 +93,7 @@ + diff --git a/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs b/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs index 64736540f..e08c44917 100644 --- a/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs +++ b/src/testing/integration/Providers/Rackspace/UserQueuesTests.cs @@ -282,23 +282,23 @@ public async Task TestListAllQueueMessages() HashSet locatedMessages = new HashSet(); QueuedMessageList messages = await provider.ListMessagesAsync(queueName, null, null, true, false, cancellationTokenSource.Token); - foreach (QueuedMessage message in messages.Messages) + foreach (QueuedMessage message in messages) Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once."); - int deletedMessage = messages.Messages[0].Body.ToObject().ValueA; - await provider.DeleteMessageAsync(queueName, messages.Messages[0].Id, null, cancellationTokenSource.Token); + int deletedMessage = messages[0].Body.ToObject().ValueA; + await provider.DeleteMessageAsync(queueName, messages[0].Id, null, cancellationTokenSource.Token); - while (messages.Messages.Count > 0) + while (messages.Count > 0) { - QueuedMessageList tempList = await provider.ListMessagesAsync(queueName, messages, null, true, false, cancellationTokenSource.Token); - if (tempList.Messages.Count > 0) + QueuedMessageList tempList = await provider.ListMessagesAsync(queueName, messages.NextPageId, null, true, false, cancellationTokenSource.Token); + if (tempList.Count > 0) { - Assert.IsTrue(locatedMessages.Add(tempList.Messages[0].Body.ToObject().ValueA), "Received the same message more than once."); - await provider.DeleteMessageAsync(queueName, tempList.Messages[0].Id, null, cancellationTokenSource.Token); + Assert.IsTrue(locatedMessages.Add(tempList[0].Body.ToObject().ValueA), "Received the same message more than once."); + await provider.DeleteMessageAsync(queueName, tempList[0].Id, null, cancellationTokenSource.Token); } - messages = await provider.ListMessagesAsync(queueName, messages, null, true, false, cancellationTokenSource.Token); - foreach (QueuedMessage message in messages.Messages) + messages = await provider.ListMessagesAsync(queueName, messages.NextPageId, null, true, false, cancellationTokenSource.Token); + foreach (QueuedMessage message in messages) { Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once."); } diff --git a/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs b/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs index 9da071074..94fede3b9 100644 --- a/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs +++ b/src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs @@ -249,23 +249,23 @@ public void SynchronousTestListAllQueueMessages() HashSet locatedMessages = new HashSet(); QueuedMessageList messages = provider.ListMessages(queueName, null, null, true, false); - foreach (QueuedMessage message in messages.Messages) + foreach (QueuedMessage message in messages) Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once."); - int deletedMessage = messages.Messages[0].Body.ToObject().ValueA; - provider.DeleteMessage(queueName, messages.Messages[0].Id, null); + int deletedMessage = messages[0].Body.ToObject().ValueA; + provider.DeleteMessage(queueName, messages[0].Id, null); - while (messages.Messages.Count > 0) + while (messages.Count > 0) { - QueuedMessageList tempList = provider.ListMessages(queueName, messages, null, true, false); - if (tempList.Messages.Count > 0) + QueuedMessageList tempList = provider.ListMessages(queueName, messages.NextPageId, null, true, false); + if (tempList.Count > 0) { - Assert.IsTrue(locatedMessages.Add(tempList.Messages[0].Body.ToObject().ValueA), "Received the same message more than once."); - provider.DeleteMessage(queueName, tempList.Messages[0].Id, null); + Assert.IsTrue(locatedMessages.Add(tempList[0].Body.ToObject().ValueA), "Received the same message more than once."); + provider.DeleteMessage(queueName, tempList[0].Id, null); } - messages = provider.ListMessages(queueName, messages, null, true, false); - foreach (QueuedMessage message in messages.Messages) + messages = provider.ListMessages(queueName, messages.NextPageId, null, true, false); + foreach (QueuedMessage message in messages) { Assert.IsTrue(locatedMessages.Add(message.Body.ToObject().ValueA), "Received the same message more than once."); }