Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 24 additions & 84 deletions src/corelib/Core/Domain/Queues/QueuedMessageList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,110 +2,50 @@
{
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using net.openstack.Core.Collections;
using net.openstack.Core.Providers;
using Newtonsoft.Json;

/// <summary>
/// Represents a collection of messages stored in a queue in the <see cref="IQueueingService"/>.
/// This class extends the <see cref="ReadOnlyCollectionPage{T}"/> class
/// to provide access to the opaque marker used for paginating messages
/// in the <see cref="IQueueingService"/> (via the <see cref="NextPageId"/>
/// property.
/// </summary>
/// <threadsafety static="true" instance="false"/>
/// <preliminary/>
[JsonObject(MemberSerialization.OptIn)]
public class QueuedMessageList
public class QueuedMessageList : BasicReadOnlyCollectionPage<QueuedMessage>
{
/// <summary>
/// This is the backing field for the <see cref="Empty"/> property.
/// This is the backing field for the <see cref="NextPageId"/> property.
/// </summary>
private static readonly QueuedMessageList _empty = new QueuedMessageList(Enumerable.Empty<QueuedMessage>(), Enumerable.Empty<Link>());

/// <summary>
/// This is the backing field for the <see cref="Links"/> property.
/// </summary>
[JsonProperty("links")]
private Link[] _links;

/// <summary>
/// This is the backing field for the <see cref="Messages"/> property.
/// </summary>
[JsonProperty("messages")]
private QueuedMessage[] _messages;

/// <summary>
/// Initializes a new instance of the <see cref="QueuedMessageList"/> class during
/// JSON deserialization.
/// </summary>
[JsonConstructor]
protected QueuedMessageList()
{
}

private QueuedMessageListId _nextPageId;

/// <summary>
/// Initializes a new instance of the <see cref="QueuedMessageList"/> class.
/// Initializes a new instance of the <see cref="BasicReadOnlyCollectionPage{T}"/> class
/// that is a read-only wrapper around the specified list.
/// </summary>
/// <param name="messages">A collection of <see cref="QueuedMessage"/> objects describing the messages in a queue.</param>
/// <param name="links">A collection of <see cref="Link"/> objects describing resources related to the list of messages.</param>
/// <exception cref="ArgumentException">
/// If <paramref name="messages"/> contains any <see langword="null"/> values.
/// <para>-or-</para>
/// <para>If <paramref name="links"/> contains any <see langword="null"/> values.</para>
/// <param name="list">The list to wrap.</param>
/// <param name="getNextPageAsync">A function that returns a <see cref="Task{TResult}"/> representing the asynchronous operation to get the next page of items in the collection. If specified, this function implements <see cref="BasicReadOnlyCollectionPage{T}.GetNextPageAsync"/>. If the value is <see langword="null"/>, then <see cref="BasicReadOnlyCollectionPage{T}.CanHaveNextPage"/> will return <see langword="false"/>.</param>
/// <param name="nextPageId">The identifier of the next page in the message list.</param>
/// <exception cref="ArgumentNullException">
/// If <paramref name="list"/> is <see langword="null"/>.
/// </exception>
public QueuedMessageList(IEnumerable<QueuedMessage> messages, IEnumerable<Link> links)
public QueuedMessageList(IList<QueuedMessage> list, Func<CancellationToken, Task<ReadOnlyCollectionPage<QueuedMessage>>> getNextPageAsync, QueuedMessageListId nextPageId)
: base(list, getNextPageAsync)
{
if (messages != null)
{
_messages = messages.ToArray();
if (_messages.Contains(null))
throw new ArgumentException("messages cannot contain any null values", "messages");
}

if (links != null)
{
_links = links.ToArray();
if (_links.Contains(null))
throw new ArgumentException("links cannot contain any null values", "links");
}
_nextPageId = nextPageId;
}

/// <summary>
/// Gets an empty list of messages, which is not specific to any queue.
/// Gets the identifier of the next page of the message list.
/// </summary>
public static QueuedMessageList Empty
public QueuedMessageListId NextPageId
{
get
{
return _empty;
}
}

/// <summary>
/// Gets a collection of <see cref="Link"/> objects describing resources related
/// to this list of messages.
/// </summary>
public ReadOnlyCollection<Link> Links
{
get
{
if (_links == null)
return null;

return new ReadOnlyCollection<Link>(_links);
}
}

/// <summary>
/// Gets a list of <see cref="QueuedMessage"/> objects describing the messages in
/// the queue.
/// </summary>
public ReadOnlyCollection<QueuedMessage> Messages
{
get
{
if (_messages == null)
return null;

return new ReadOnlyCollection<QueuedMessage>(_messages);
return _nextPageId;
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions src/corelib/Core/Domain/Queues/QueuedMessageListId.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace net.openstack.Core.Domain.Queues
{
using System;
using net.openstack.Core;
using net.openstack.Core.Providers;
using Newtonsoft.Json;

/// <summary>
/// Represents the unique identifier of a message list page in the <see cref="IQueueingService"/>.
/// </summary>
/// <seealso cref="IQueueingService.ListMessagesAsync"/>
/// <seealso cref="QueuedMessageList.NextPageId"/>
/// <threadsafety static="true" instance="false"/>
/// <preliminary/>
[JsonConverter(typeof(QueuedMessageListId.Converter))]
public sealed class QueuedMessageListId : ResourceIdentifier<QueuedMessageListId>
{
/// <summary>
/// Initializes a new instance of the <see cref="QueuedMessageListId"/> class
/// with the specified identifier value.
/// </summary>
/// <param name="id">The identifier value.</param>
/// <exception cref="ArgumentNullException">If <paramref name="id"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">If <paramref name="id"/> is empty.</exception>
public QueuedMessageListId(string id)
: base(id)
{
}

/// <summary>
/// Provides support for serializing and deserializing <see cref="QueuedMessageListId"/>
/// objects to JSON string values.
/// </summary>
/// <threadsafety static="true" instance="false"/>
private sealed class Converter : ConverterBase
{
/// <inheritdoc/>
protected override QueuedMessageListId FromValue(string id)
{
return new QueuedMessageListId(id);
}
}
}
}
4 changes: 2 additions & 2 deletions src/corelib/Core/Providers/IQueueingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Task<T> GetQueueMetadataAsync<T>(QueueName queueName, CancellationToken cancella
/// Gets a list of messages currently in a queue.
/// </summary>
/// <param name="queueName">The queue name.</param>
/// <param name="marker">The <see cref="QueuedMessageList"/> object returned by a previous call to <see cref="ListMessagesAsync"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
/// <param name="marker">The identifier of the message list page to return. This is obtained from <see cref="QueuedMessageList.NextPageId"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
/// <param name="limit">The maximum number of messages to return. If this value is <see langword="null"/>, a provider-specific default value is used.</param>
/// <param name="echo"><see langword="true"/> to include messages created by the current client; otherwise, <see langword="false"/>.</param>
/// <param name="includeClaimed"><see langword="true"/> to include claimed messages; otherwise <see langword="false"/> to return only unclaimed messages.</param>
Expand All @@ -170,7 +170,7 @@ Task<T> GetQueueMetadataAsync<T>(QueueName queueName, CancellationToken cancella
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
/// <exception cref="WebException">If the REST request does not return successfully.</exception>
/// <seealso href="https://wiki.openstack.org/w/index.php?title=Marconi/specs/api/v1#List_Messages">List Messages (OpenStack Marconi API v1 Blueprint)</seealso>
Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken);
Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken);

/// <summary>
/// Gets detailed information about a specific queued message.
Expand Down
4 changes: 2 additions & 2 deletions src/corelib/Core/Synchronous/QueueingServiceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS
/// </summary>
/// <param name="queueingService">The queueing service instance.</param>
/// <param name="queueName">The queue name.</param>
/// <param name="marker">The <see cref="QueuedMessageList"/> object returned by a previous call to <see cref="IQueueingService.ListMessagesAsync"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
/// <param name="marker">The identifier of the message list page to return. This is obtained from <see cref="QueuedMessageList.NextPageId"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
/// <param name="limit">The maximum number of messages to return. If this value is <see langword="null"/>, a provider-specific default value is used.</param>
/// <param name="echo"><see langword="true"/> to include messages created by the current client; otherwise, <see langword="false"/>.</param>
/// <param name="includeClaimed"><see langword="true"/> to include claimed messages; otherwise <see langword="false"/> to return only unclaimed messages.</param>
Expand All @@ -348,7 +348,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
/// <exception cref="WebException">If the REST request does not return successfully.</exception>
/// <seealso href="https://wiki.openstack.org/w/index.php?title=Marconi/specs/api/v1#List_Messages">List Messages (OpenStack Marconi API v1 Blueprint)</seealso>
public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed)
public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed)
{
if (queueingService == null)
throw new ArgumentNullException("queueingService");
Expand Down
81 changes: 46 additions & 35 deletions src/corelib/Providers/Rackspace/CloudQueuesProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public Task<QueueStatistics> GetQueueStatisticsAsync(QueueName queueName, Cancel
}

/// <inheritdoc/>
public Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken)
public Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken)
{
if (queueName == null)
throw new ArgumentNullException("queueName");
Expand All @@ -369,49 +369,60 @@ public Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMess
{ "echo", echo.ToString() },
{ "include_claimed", includeClaimed.ToString() }
};

if (marker != null)
{
Link nextLink = null;
if (marker.Links != null)
parameters["marker"] = marker.Value;
if (limit != null)
parameters["limit"] = limit.ToString();

Func<Task<Tuple<IdentityToken, Uri>>, HttpWebRequest> prepareRequest =
PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters);

Func<Task<HttpWebRequest>, Task<ListCloudQueueMessagesResponse>> requestResource =
GetResponseAsyncFunc<ListCloudQueueMessagesResponse>(cancellationToken);

Func<Task<ListCloudQueueMessagesResponse>, QueuedMessageList> resultSelector =
task =>
{
foreach (Link link in marker.Links)
ReadOnlyCollection<QueuedMessage> messages = null;
if (task.Result != null)
messages = task.Result.Messages;

QueuedMessageListId nextMarker = null;
if (task.Result != null && task.Result.Links != null)
{
if (link.Rel == "next")
Link nextLink = task.Result.Links.FirstOrDefault(i => string.Equals(i.Rel, "next", StringComparison.OrdinalIgnoreCase));
if (nextLink != null)
{
nextLink = link;
break;
Uri baseUri = new Uri("https://example.com");
Uri absoluteUri;
if (nextLink.Href.StartsWith("/v1"))
absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length));
else
absoluteUri = new Uri(baseUri, nextLink.Href);

UriTemplateMatch match = template.Match(baseUri, absoluteUri);
if (!string.IsNullOrEmpty(match.BoundVariables["marker"]))
nextMarker = new QueuedMessageListId(match.BoundVariables["marker"]);
}
}
}

if (nextLink == null)
{
return InternalTaskExtensions.CompletedTask(QueuedMessageList.Empty);
}

Uri baseUri = new Uri("https://example.com");
Uri absoluteUri;
if (nextLink.Href.StartsWith("/v1"))
absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length));
else
absoluteUri = new Uri(baseUri, nextLink.Href);

UriTemplateMatch match = template.Match(baseUri, absoluteUri);
parameters.Add("marker", match.BoundVariables["marker"]);
}

if (limit != null)
parameters["limit"] = limit.ToString();

Func<Task<Tuple<IdentityToken, Uri>>, HttpWebRequest> prepareRequest =
PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters);
if (messages == null || messages.Count == 0)
{
// use the same marker again
messages = messages ?? new ReadOnlyCollection<QueuedMessage>(new QueuedMessage[0]);
nextMarker = marker;
}

Func<Task<HttpWebRequest>, Task<QueuedMessageList>> requestResource =
GetResponseAsyncFunc<QueuedMessageList>(cancellationToken);
Func<CancellationToken, Task<ReadOnlyCollectionPage<QueuedMessage>>> getNextPageAsync = null;
if (nextMarker != null || messages.Count == 0)
{
getNextPageAsync =
nextCancellationToken => ListMessagesAsync(queueName, nextMarker, limit, echo, includeClaimed, nextCancellationToken)
.ContinueWith(t => (ReadOnlyCollectionPage<QueuedMessage>)t.Result, TaskContinuationOptions.ExecuteSynchronously);
}

Func<Task<QueuedMessageList>, QueuedMessageList> resultSelector =
task => (task.Result != null && task.Result.Messages != null ? task.Result : null) ?? QueuedMessageList.Empty;
return new QueuedMessageList(messages, getNextPageAsync, nextMarker);
};

return AuthenticateServiceAsync(cancellationToken)
.ContinueWith(prepareRequest)
Expand Down
1 change: 1 addition & 0 deletions src/corelib/corelib.v3.5.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
<Compile Include="Core\Domain\Queues\NamespaceDoc.cs" />
<Compile Include="Core\Domain\Queues\QueuedMessage.cs" />
<Compile Include="Core\Domain\Queues\QueuedMessageList.cs" />
<Compile Include="Core\Domain\Queues\QueuedMessageListId.cs" />
<Compile Include="Core\Domain\Queues\QueueMessagesStatistics.cs" />
<Compile Include="Core\Domain\Queues\QueueName.cs" />
<Compile Include="Core\Domain\Queues\QueueStatistics.cs" />
Expand Down
1 change: 1 addition & 0 deletions src/corelib/corelib.v4.0.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<Compile Include="Core\Domain\Queues\NamespaceDoc.cs" />
<Compile Include="Core\Domain\Queues\QueuedMessage.cs" />
<Compile Include="Core\Domain\Queues\QueuedMessageList.cs" />
<Compile Include="Core\Domain\Queues\QueuedMessageListId.cs" />
<Compile Include="Core\Domain\Queues\QueueMessagesStatistics.cs" />
<Compile Include="Core\Domain\Queues\QueueName.cs" />
<Compile Include="Core\Domain\Queues\QueueStatistics.cs" />
Expand Down
Loading