Skip to content

Commit

Permalink
RavenDB-2627 Basic data subscriptions are working (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkadiusz Palinski committed Dec 5, 2014
1 parent dd63822 commit d95fbe5
Show file tree
Hide file tree
Showing 25 changed files with 747 additions and 40 deletions.
3 changes: 3 additions & 0 deletions Raven.Abstractions/Data/Constants.cs
Expand Up @@ -184,5 +184,8 @@ static Constants()

public const string RavenDatabasesPrefix = "Raven/Databases/";
public const string RavenFileSystemsPrefix = "Raven/FileSystems/";

// Subscriptions
public const string RavenSubscriptionsPrefix = "Raven/Subscriptions/";
}
}
24 changes: 24 additions & 0 deletions Raven.Abstractions/Data/SubscriptionBatchOptions.cs
@@ -0,0 +1,24 @@
// -----------------------------------------------------------------------
// <copyright file="SubscriptionBatchOptions.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System;

namespace Raven.Abstractions.Data
{
public class SubscriptionBatchOptions
{
public SubscriptionBatchOptions()
{
MaxDocCount = 4096;
AcknowledgementTimeout = TimeSpan.FromMinutes(1);
}

public int? MaxSize { get; set; }

public int MaxDocCount { get; set; }

public TimeSpan AcknowledgementTimeout { get; set; }
}
}
20 changes: 20 additions & 0 deletions Raven.Abstractions/Data/SubscriptionCriteria.cs
@@ -0,0 +1,20 @@
// -----------------------------------------------------------------------
// <copyright file="SubscriptionCriteria.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System.Collections.Generic;

namespace Raven.Abstractions.Data
{
public class SubscriptionCriteria
{
public string KeyStartsWith { get; set; }

public string BelongsToCollection { get; set; }

public Dictionary<string, object> PropertiesMatch { get; set; }

public Dictionary<string, object> PropertiesNotMatch { get; set; }
}
}
14 changes: 14 additions & 0 deletions Raven.Abstractions/Data/SubscriptionDocument.cs
@@ -0,0 +1,14 @@
// -----------------------------------------------------------------------
// <copyright file="SubscriptionDocument.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
namespace Raven.Abstractions.Data
{
public class SubscriptionDocument
{
public string Name { get; set; }
public SubscriptionCriteria Criteria { get; set; }
public Etag AckEtag { get; set; }
}
}
3 changes: 2 additions & 1 deletion Raven.Abstractions/Data/UuidType.cs
Expand Up @@ -15,6 +15,7 @@ public enum UuidType : byte
Queue = 7,
Tasks = 8,
Indexing = 9,
DocumentReferences = 11
DocumentReferences = 11,
Subscriptions = 12,
}
}
3 changes: 3 additions & 0 deletions Raven.Abstractions/Raven.Abstractions.csproj
Expand Up @@ -214,6 +214,9 @@
<Compile Include="Data\StorageStats.cs" />
<Compile Include="Data\StreamResult.cs" />
<Compile Include="Data\StringDistanceTypes.cs" />
<Compile Include="Data\SubscriptionCriteria.cs" />
<Compile Include="Data\SubscriptionDocument.cs" />
<Compile Include="Data\SubscriptionBatchOptions.cs" />
<Compile Include="Data\SuggestionQuery.cs" />
<Compile Include="Data\SuggestionQueryResult.cs" />
<Compile Include="Data\TaskBasedOperationState.cs" />
Expand Down
16 changes: 13 additions & 3 deletions Raven.Client.Lightweight/Connection/Async/AsyncServerClient.cs
Expand Up @@ -1769,6 +1769,7 @@ public class YieldStreamResults : IAsyncEnumerator<RavenJObject>
private readonly int pageSize;

private readonly RavenPagingInformation pagingInformation;
private readonly bool batchReadingMode;

private readonly Stream stream;
private readonly StreamReader streamReader;
Expand All @@ -1777,20 +1778,21 @@ public class YieldStreamResults : IAsyncEnumerator<RavenJObject>

private bool wasInitialized;

public YieldStreamResults(HttpJsonRequest request, Stream stream, int start = 0, int pageSize = 0, RavenPagingInformation pagingInformation = null)
public YieldStreamResults(HttpJsonRequest request, Stream stream, int start = 0, int pageSize = 0, RavenPagingInformation pagingInformation = null, JsonTextReaderAsync jsonReader = null, bool batchReadingMode = false)
{
this.request = request;
this.start = start;
this.pageSize = pageSize;
this.pagingInformation = pagingInformation;
this.batchReadingMode = batchReadingMode;
this.stream = stream;
streamReader = new StreamReader(stream);
reader = new JsonTextReaderAsync(streamReader);
reader = jsonReader ?? new JsonTextReaderAsync(streamReader);
}

private async Task InitAsync()
{
if (await reader.ReadAsync().ConfigureAwait(false) == false || reader.TokenType != JsonToken.StartObject)
if (batchReadingMode == false && (await reader.ReadAsync().ConfigureAwait(false) == false || reader.TokenType != JsonToken.StartObject))
throw new InvalidOperationException("Unexpected data at start of stream");

if (await reader.ReadAsync().ConfigureAwait(false) == false || reader.TokenType != JsonToken.PropertyName || Equals("Results", reader.Value) == false)
Expand All @@ -1802,6 +1804,9 @@ private async Task InitAsync()

public void Dispose()
{
if(batchReadingMode)
return;

reader.Close();
streamReader.Close();
stream.Close();
Expand Down Expand Up @@ -1861,13 +1866,18 @@ private async Task TryReadNextPageStart()
var err = await reader.ReadAsString().ConfigureAwait(false);
throw new InvalidOperationException("Server error" + Environment.NewLine + err);
default:
if(batchReadingMode)
break;
throw new InvalidOperationException("Unexpected property name: " + reader.Value);
}

}

private async Task EnsureValidEndOfResponse()
{
if (batchReadingMode)
return;

if (reader.TokenType != JsonToken.EndObject && await reader.ReadAsync().ConfigureAwait(false) == false)
throw new InvalidOperationException("Unexpected end of response - missing EndObject token");

Expand Down
Expand Up @@ -696,42 +696,32 @@ public async Task WriteAsync(string data)
}).ConfigureAwait(false);
}

public async Task<HttpResponseMessage> ExecuteRawResponseAsync(string data)
public Task<HttpResponseMessage> ExecuteRawResponseAsync(RavenJToken data)
{
Response = await RunWithAuthRetry(async () =>
{
var rawRequestMessage = new HttpRequestMessage(new HttpMethod(Method), Url)
{
Content = new CompressedStringContent(data, factory.DisableRequestCompression),
};
CopyHeadersToHttpRequestMessage(rawRequestMessage);
var response = await httpClient.SendAsync(rawRequestMessage, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
if (response.IsSuccessStatusCode == false &&
(response.StatusCode == HttpStatusCode.PreconditionFailed ||
response.StatusCode == HttpStatusCode.Forbidden ||
response.StatusCode == HttpStatusCode.Unauthorized))
{
throw new ErrorResponseException(response, "Failed request");
}
return response;
}).ConfigureAwait(false);

return ExecuteRawResponseInternalAsync(new JsonContent(data));
}

ResponseStatusCode = Response.StatusCode;
public Task<HttpResponseMessage> ExecuteRawResponseAsync(string data)
{
return ExecuteRawResponseInternalAsync(new CompressedStringContent(data, factory.DisableRequestCompression));
}

return Response;
public Task<HttpResponseMessage> ExecuteRawResponseAsync()
{
return ExecuteRawResponseInternalAsync(null);
}

public async Task<HttpResponseMessage> ExecuteRawResponseAsync()
private async Task<HttpResponseMessage> ExecuteRawResponseInternalAsync(HttpContent content)
{
Response = await RunWithAuthRetry(async () =>
{
var rawRequestMessage = new HttpRequestMessage(new HttpMethod(Method), Url);
if (content != null)
{
rawRequestMessage.Content = content;
}
CopyHeadersToHttpRequestMessage(rawRequestMessage);
var response = await httpClient.SendAsync(rawRequestMessage, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
Expand Down
9 changes: 9 additions & 0 deletions Raven.Client.Lightweight/Document/IReliableSubscriptions.cs
@@ -0,0 +1,9 @@
using Raven.Abstractions.Data;

namespace Raven.Client.Document
{
public interface IReliableSubscriptions
{
Subscription Create(string name, SubscriptionCriteria criteria, SubscriptionBatchOptions options, string database = null);
}
}
166 changes: 166 additions & 0 deletions Raven.Client.Lightweight/Document/Subscription.cs
@@ -0,0 +1,166 @@
// -----------------------------------------------------------------------
// <copyright file="Subscription.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.IO;
using System.Threading.Tasks;
using Raven.Abstractions.Connection;
using Raven.Abstractions.Data;
using Raven.Abstractions.Extensions;
using Raven.Client.Changes;
using Raven.Client.Connection;
using Raven.Client.Connection.Async;
using Raven.Client.Extensions;
using Raven.Database.Util;
using Raven.Imports.Newtonsoft.Json;
using Raven.Json.Linq;

namespace Raven.Client.Document
{
public class Subscription : IObservable<JsonDocument>, IDisposable
{
private readonly SubscriptionBatchOptions options;
private readonly IAsyncDatabaseCommands commands;
private readonly ConcurrentSet<IObserver<JsonDocument>> subscribers = new ConcurrentSet<IObserver<JsonDocument>>();
private bool open;

public Subscription(string name, SubscriptionBatchOptions options, string database, IDocumentStore documentStore)
{
Name = name;
this.options = options;

commands = database == null ?
documentStore.AsyncDatabaseCommands.ForSystemDatabase() :
documentStore.AsyncDatabaseCommands.ForDatabase(database);
}

public string Name { get; private set; }

public Task Task { get; private set; }

private void Open()
{
Task = OpenConnection()
.ObserveException()
.ContinueWith(task => task.AssertNotFailed());

open = true;
}

private async Task OpenConnection()
{
if (string.IsNullOrEmpty(Name))
throw new InvalidOperationException("Subscription does not have any name");

var subscriptionRequest = commands.CreateRequest("/subscriptions/open?name=" + Name, "POST", disableRequestCompression: true, disableAuthentication: true);

using (var response = await subscriptionRequest.ExecuteRawResponseAsync(RavenJObject.FromObject(options)).ConfigureAwait(false))
{
await response.AssertNotFailingResponse().ConfigureAwait(false);

var responseStream = await response.GetResponseStreamWithHttpDecompression().ConfigureAwait(false);

using (var streamReader = new StreamReader(responseStream))
{
do
{
var jsonReader = new JsonTextReaderAsync(streamReader);

string type = await TryReadType(jsonReader);

if (type == null)
continue;

switch (type)
{
case "Data":
using (var streamedDocs = new AsyncServerClient.YieldStreamResults(subscriptionRequest, responseStream, jsonReader: jsonReader, batchReadingMode: true))
{
while (await streamedDocs.MoveNextAsync().ConfigureAwait(false))
{
var jsonDoc = SerializationHelper.RavenJObjectToJsonDocument(streamedDocs.Current);
foreach (var subscriber in subscribers)
{
subscriber.OnNext(jsonDoc);
}
}
}

var lastProcessedEtagInBatch = await ReadLastProcessedEtag(jsonReader);

await EnsureValidEndOfMessage(jsonReader, streamReader);

using (var acknowledgmentRequest = commands.CreateRequest(
string.Format("/subscriptions/acknowledgeBatch?name={0}&lastEtag={1}", Name, lastProcessedEtagInBatch), "POST"))
{
acknowledgmentRequest.ExecuteRequest();
}

break;
case "Heartbeat":
await EnsureValidEndOfMessage(jsonReader, streamReader);
break;
default:
throw new InvalidOperationException("Unknown type of stream part: " + type);
}
} while (true);
}
}
}

private static async Task EnsureValidEndOfMessage(JsonTextReaderAsync reader, StreamReader streamReader)
{
if (reader.TokenType != JsonToken.EndObject && await reader.ReadAsync().ConfigureAwait(false) == false)
throw new InvalidOperationException("Unexpected end of message - missing EndObject token");

if (reader.TokenType != JsonToken.EndObject)
throw new InvalidOperationException(string.Format("Unexpected token type at the end of the message: {0}. Error: {1}", reader.TokenType, streamReader.ReadToEnd()));
}

private static async Task<string> TryReadType(JsonTextReaderAsync reader)
{
if (await reader.ReadAsync().ConfigureAwait(false) == false || reader.TokenType == JsonToken.None)
return null;

if (reader.TokenType != JsonToken.StartObject)
throw new InvalidOperationException("Invalid subscription stream format. Unexpected toke type:" + reader.TokenType);

if (await reader.ReadAsync().ConfigureAwait(false) && reader.TokenType != JsonToken.PropertyName)
throw new InvalidOperationException("Invalid subscription stream format. Unexpected toke type:" + reader.TokenType);

if (Equals("Type", reader.Value) == false)
throw new InvalidOperationException("Unexpected property name. Got: '" + reader.TokenType + "' instead of 'Type'");

return await reader.ReadAsString().ConfigureAwait(false);
}

private static async Task<Etag> ReadLastProcessedEtag(JsonTextReaderAsync reader)
{
if (Equals("LastProcessedEtag", reader.Value) == false)
throw new InvalidOperationException("Unexpected property name. Got: '" + reader.TokenType + "' instead of 'LastProcessedEtag'");

return Etag.Parse(await reader.ReadAsString().ConfigureAwait(false));
}

public IDisposable Subscribe(IObserver<JsonDocument> observer)
{
if (subscribers.TryAdd(observer))
{
if (!open)
Open();
}

return new DisposableAction(() => subscribers.TryRemove(observer));
}

public void Dispose()
{
foreach (var subscriber in subscribers)
{
subscriber.OnCompleted();
}
}
}
}

0 comments on commit d95fbe5

Please sign in to comment.