Skip to content

Commit

Permalink
Merge pull request #2549 from grisha-kotler/RavenDB-3758
Browse files Browse the repository at this point in the history
RavenDB-3758 Side by Side indexing bug
  • Loading branch information
ayende committed Aug 13, 2015
2 parents 97078ec + 5ecaffc commit 599758f
Show file tree
Hide file tree
Showing 25 changed files with 762 additions and 417 deletions.
1 change: 1 addition & 0 deletions Raven.Abstractions/Data/Constants.cs
Expand Up @@ -76,6 +76,7 @@ static Constants()
public const string MaxNumberOfItemsToProcessInTestIndexes = "Raven/Indexing/MaxNumberOfItemsToProcessInTestIndexes";

public const string IndexReplacePrefix = "Raven/Indexes/Replace/";
public const string SideBySideIndexNamePrefix = "ReplacementOf/";

//Paths
public const string RavenDataDir = "Raven/DataDir";
Expand Down
27 changes: 27 additions & 0 deletions Raven.Abstractions/Data/SideBySideIndexes.cs
@@ -0,0 +1,27 @@
// -----------------------------------------------------------------------
// <copyright file="IndexToAdd.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System;

namespace Raven.Abstractions.Data
{
public class SideBySideIndexes
{
/// <summary>
/// The name of an index that will be added
/// </summary>
public IndexToAdd[] IndexesToAdd { get; set; }

/// <summary>
/// Definition of an index
/// </summary>
public Etag MinimumEtagBeforeReplace { get; set; }

/// <summary>
/// Priority of an index
/// </summary>
public DateTime? ReplaceTimeUtc { get; set; }
}
}
1 change: 1 addition & 0 deletions Raven.Abstractions/Raven.Abstractions.csproj
Expand Up @@ -174,6 +174,7 @@
<Compile Include="Data\BulkInsertOptions.cs" />
<Compile Include="Data\Etag.cs" />
<Compile Include="Data\IndexFailureInformation.cs" />
<Compile Include="Data\SideBySideIndexes.cs" />
<Compile Include="Data\IndexToAdd.cs" />
<Compile Include="Data\IOperationState.cs" />
<Compile Include="Data\MoreLikeThisQueryResult.cs" />
Expand Down
48 changes: 38 additions & 10 deletions Raven.Client.Lightweight/Connection/Async/AsyncServerClient.cs
Expand Up @@ -254,10 +254,17 @@ public Task<string> PutIndexAsync(string name, IndexDefinition indexDef, bool ov
{
return ExecuteWithReplication("PUT", operationMetadata => DirectPutIndexAsync(name, indexDef, overwrite, operationMetadata, token), token);
}

public Task<string[]> PutIndexesAsync(IndexToAdd[] indexesToAdd, CancellationToken token = default(CancellationToken))
{
return ExecuteWithReplication("PUT", operationMetadata => DirectPutIndexesAsync(indexesToAdd, operationMetadata, token), token);
}

public Task<string[]> PutSideBySideIndexesAsync(IndexToAdd[] indexesToAdd, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null, CancellationToken token = default(CancellationToken))
{
return ExecuteWithReplication("PUT", operationMetadata => DirectPutSideBySideIndexesAsync(indexesToAdd, operationMetadata, minimumEtagBeforeReplace, replaceTimeUtc, token), token);
}

public Task<string> PutTransformerAsync(string name, TransformerDefinition transformerDefinition, CancellationToken token = default(CancellationToken))
{
return ExecuteWithReplication("PUT", operationMetadata => DirectPutTransformerAsync(name, transformerDefinition, operationMetadata, token), token);
Expand Down Expand Up @@ -309,14 +316,32 @@ public async Task<string> DirectPutIndexAsync(string name, IndexDefinition index
public async Task<string[]> DirectPutIndexesAsync(IndexToAdd[] indexesToAdd, OperationMetadata operationMetadata, CancellationToken token = default(CancellationToken))
{
var requestUri = operationMetadata.Url + "/indexes";
return await PutIndexes(operationMetadata, token, requestUri, indexesToAdd);
}

public async Task<string[]> DirectPutSideBySideIndexesAsync(IndexToAdd[] indexesToAdd, OperationMetadata operationMetadata, Etag minimumEtagBeforeReplace, DateTime? replaceTimeUtc, CancellationToken token = default(CancellationToken))
{
var sideBySideIndexes = new SideBySideIndexes
{
IndexesToAdd = indexesToAdd,
MinimumEtagBeforeReplace = minimumEtagBeforeReplace,
ReplaceTimeUtc = replaceTimeUtc
};

var requestUri = operationMetadata.Url + "/side-by-side-indexes";
return await PutIndexes(operationMetadata, token, requestUri, sideBySideIndexes);
}

private async Task<string[]> PutIndexes(OperationMetadata operationMetadata, CancellationToken token, string requestUri, object obj)
{
using (var request = jsonRequestFactory.CreateHttpJsonRequest(new CreateHttpJsonRequestParams(this, requestUri, "PUT", operationMetadata.Credentials, convention).AddOperationHeaders(OperationsHeaders)))
{
var serializeObject = JsonConvert.SerializeObject(indexesToAdd, Default.Converters);
var serializeObject = JsonConvert.SerializeObject(obj, Default.Converters);

ErrorResponseException responseException;
try
{
await request.WriteAsync(serializeObject).ConfigureAwait(false);
await request.WriteAsync(serializeObject).WithCancellation(token).ConfigureAwait(false);
var result = await request.ReadResponseJsonAsync().ConfigureAwait(false);
return result
.Value<RavenJArray>("Indexes")
Expand All @@ -325,13 +350,15 @@ public async Task<string[]> DirectPutIndexesAsync(IndexToAdd[] indexesToAdd, Ope
}
catch (ErrorResponseException e)
{
if (e.StatusCode != HttpStatusCode.BadRequest) throw;
if (e.StatusCode != HttpStatusCode.BadRequest)
throw;
responseException = e;
}
var error = await responseException.TryReadErrorResponseObject(new { Error = "", Message = "", IndexDefinitionProperty = "", ProblematicText = "" }).ConfigureAwait(false);
if (error == null) throw responseException;
var error = await responseException.TryReadErrorResponseObject(new {Error = "", Message = "", IndexDefinitionProperty = "", ProblematicText = ""}).ConfigureAwait(false);
if (error == null)
throw responseException;

throw new IndexCompilationException(error.Message) { IndexDefinitionProperty = error.IndexDefinitionProperty, ProblematicText = error.ProblematicText };
throw new IndexCompilationException(error.Message) {IndexDefinitionProperty = error.IndexDefinitionProperty, ProblematicText = error.ProblematicText};
}
}

Expand All @@ -357,12 +384,13 @@ public async Task<string[]> DirectPutIndexesAsync(IndexToAdd[] indexesToAdd, Ope
}
catch (ErrorResponseException e)
{
if (e.StatusCode != HttpStatusCode.BadRequest) throw;

if (e.StatusCode != HttpStatusCode.BadRequest)
throw;
responseException = e;
}
var error = await responseException.TryReadErrorResponseObject(new { Error = "", Message = "" }).ConfigureAwait(false);
if (error == null) throw responseException;
if (error == null)
throw responseException;

throw new TransformCompilationException(error.Message);
}
Expand Down Expand Up @@ -2104,7 +2132,7 @@ public string UrlFor(string documentKey)
public ILowLevelBulkInsertOperation GetBulkInsertOperation(BulkInsertOptions options, IDatabaseChanges changes)
{
if (options.ChunkedBulkInsertOptions != null)
return new ChunkedRemoteBulkInsertOperation(options,this,changes);
return new ChunkedRemoteBulkInsertOperation(options, this, changes);
return new RemoteBulkInsertOperation(options, this, changes);
}

Expand Down
Expand Up @@ -515,6 +515,15 @@ public interface IAsyncDatabaseCommands : IDisposable, IHoldProfilingInformation
/// <param name="token">The cancellation token.</param>
Task<string[]> PutIndexesAsync(IndexToAdd[] indexesToAdd, CancellationToken token = default(CancellationToken));

/// <summary>
/// Creates multiple side by side indexes with the specified name, using given index definitions and priorities
/// </summary>
/// <param name="indexesToAdd">indexes to add</param>
/// <param name="minimumEtagBeforeReplace">minimum index etag before replace</param>
/// <param name="replaceTimeUtc">replace time in utc</param>
/// <param name="token">The cancellation token.</param>
Task<string[]> PutSideBySideIndexesAsync(IndexToAdd[] indexesToAdd, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null, CancellationToken token = default(CancellationToken));

/// <summary>
/// Creates an index with the specified name, based on an index definition
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions Raven.Client.Lightweight/Connection/IDatabaseCommands.cs
Expand Up @@ -471,6 +471,12 @@ public interface IDatabaseCommands : IHoldProfilingInformation
/// <param name="indexesToAdd">indexes to add</param>
string[] PutIndexes(IndexToAdd[] indexesToAdd);

/// <summary>
/// Creates multiple side by side indexes with the specified name, using given index definitions and priorities
/// </summary>
/// <param name="indexesToAdd">indexes to add</param>
string[] PutSideBySideIndexes(IndexToAdd[] indexesToAdd, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null);

/// <summary>
/// Creates an index with the specified name, based on an index definition
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions Raven.Client.Lightweight/Connection/ServerClient.cs
Expand Up @@ -233,6 +233,11 @@ public string[] PutIndexes(IndexToAdd[] indexesToAdd)
return AsyncHelpers.RunSync(() => asyncServerClient.PutIndexesAsync(indexesToAdd));
}

public string[] PutSideBySideIndexes(IndexToAdd[] indexesToAdd, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null)
{
return AsyncHelpers.RunSync(() => asyncServerClient.PutSideBySideIndexesAsync(indexesToAdd, minimumEtagBeforeReplace, replaceTimeUtc));
}

public bool IndexHasChanged(string name, IndexDefinition indexDef)
{
return AsyncHelpers.RunSync(() => asyncServerClient.IndexHasChangedAsync(name, indexDef));
Expand Down
36 changes: 35 additions & 1 deletion Raven.Client.Lightweight/DocumentStoreBase.cs
Expand Up @@ -90,7 +90,7 @@ public void SetListeners(DocumentSessionListeners newListeners)
public abstract IDocumentSession OpenSession(string database);
public abstract IDocumentSession OpenSession(OpenSessionOptions sessionOptions);
public abstract IDatabaseCommands DatabaseCommands { get; }

/// <summary>
/// Executes the index creation.
/// </summary>
Expand All @@ -116,6 +116,40 @@ public virtual void ExecuteIndexes(List<AbstractIndexCreationTask> indexCreation
task.AfterExecute(DatabaseCommands, Conventions);
}

public virtual void SideBySideExecuteIndexes(List<AbstractIndexCreationTask> indexCreationTasks, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null)
{
var indexesToAdd = indexCreationTasks
.Select(x => new IndexToAdd
{
Definition = x.CreateIndexDefinition(),
Name = x.IndexName,
Priority = x.Priority ?? IndexingPriority.Normal
})
.ToArray();

DatabaseCommands.PutSideBySideIndexes(indexesToAdd, minimumEtagBeforeReplace, replaceTimeUtc);

foreach (var task in indexCreationTasks)
task.AfterExecute(DatabaseCommands, Conventions);
}

public virtual async Task SideBySideExecuteIndexesAsync(List<AbstractIndexCreationTask> indexCreationTasks, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null)
{
var indexesToAdd = indexCreationTasks
.Select(x => new IndexToAdd
{
Definition = x.CreateIndexDefinition(),
Name = x.IndexName,
Priority = x.Priority ?? IndexingPriority.Normal
})
.ToArray();

await AsyncDatabaseCommands.PutSideBySideIndexesAsync(indexesToAdd, minimumEtagBeforeReplace, replaceTimeUtc).ConfigureAwait(false);

foreach (var task in indexCreationTasks)
task.AfterExecute(DatabaseCommands, Conventions);
}

/// <summary>
/// Executes the index creation.
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions Raven.Client.Lightweight/IDocumentStore.cs
Expand Up @@ -150,6 +150,10 @@ public interface IDocumentStore : IDisposalNotification
/// </summary>
Task SideBySideExecuteIndexAsync(AbstractIndexCreationTask indexCreationTask, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null);

void SideBySideExecuteIndexes(List<AbstractIndexCreationTask> indexCreationTasks, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null);

Task SideBySideExecuteIndexesAsync(List<AbstractIndexCreationTask> indexCreationTasks, Etag minimumEtagBeforeReplace = null, DateTime? replaceTimeUtc = null);

/// <summary>
/// Executes the index creation.
/// </summary>
Expand Down
28 changes: 17 additions & 11 deletions Raven.Client.Lightweight/Indexes/AbstractIndexCreationTask.cs
Expand Up @@ -226,16 +226,16 @@ public virtual void SideBySideExecute(IDatabaseCommands databaseCommands, Docume
{
Conventions = documentConvention;
var indexDefinition = CreateIndexDefinition();
var replaceIndexName = "ReplacementOf/" + IndexName;

var replaceIndexName = Constants.SideBySideIndexNamePrefix + IndexName;
//check if side by side index exists
var sideBySideDef = databaseCommands.GetIndex(replaceIndexName);
if (sideBySideDef != null)
{
if (CurrentOrLegacyIndexDefinitionEquals(documentConvention, sideBySideDef, indexDefinition))
return;

UpdateSideBySideIndex(databaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, replaceIndexName, indexDefinition);
UpdateSideBySideIndex(databaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, replaceIndexName, indexDefinition, documentConvention);
return;
}

Expand All @@ -246,16 +246,17 @@ public virtual void SideBySideExecute(IDatabaseCommands databaseCommands, Docume
if (CurrentOrLegacyIndexDefinitionEquals(documentConvention, serverDef, indexDefinition))
return;

UpdateSideBySideIndex(databaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, replaceIndexName, indexDefinition);
UpdateSideBySideIndex(databaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, replaceIndexName, indexDefinition, documentConvention);
}
else
{
// since index doesn't exist yet - create it in normal mode
databaseCommands.PutIndex(IndexName, indexDefinition);
AfterExecute(databaseCommands, documentConvention);
}
}

private void UpdateSideBySideIndex(IDatabaseCommands databaseCommands, Etag minimumEtagBeforeReplace, DateTime? replaceTimeUtc, string replaceIndexName, IndexDefinition indexDefinition)
private void UpdateSideBySideIndex(IDatabaseCommands databaseCommands, Etag minimumEtagBeforeReplace, DateTime? replaceTimeUtc, string replaceIndexName, IndexDefinition indexDefinition, DocumentConvention documentConvention)
{
databaseCommands.PutIndex(replaceIndexName, indexDefinition, true);

Expand All @@ -264,6 +265,8 @@ private void UpdateSideBySideIndex(IDatabaseCommands databaseCommands, Etag mini
null,
RavenJObject.FromObject(new IndexReplaceDocument { IndexToReplace = IndexName, MinimumEtagBeforeReplace = minimumEtagBeforeReplace, ReplaceTimeUtc = replaceTimeUtc }),
new RavenJObject());

AfterExecute(databaseCommands, documentConvention);
}

/// <summary>
Expand Down Expand Up @@ -297,13 +300,13 @@ public virtual void Execute(IDatabaseCommands databaseCommands, DocumentConventi

public virtual void AfterExecute(IDatabaseCommands databaseCommands, DocumentConvention documentConvention)
{
if (Conventions.IndexAndTransformerReplicationMode.HasFlag(IndexAndTransformerReplicationMode.Indexes))
if (documentConvention.IndexAndTransformerReplicationMode.HasFlag(IndexAndTransformerReplicationMode.Indexes))
ReplicateIndexesIfNeeded(databaseCommands);
}

public virtual async Task AfterExecuteAsync(IAsyncDatabaseCommands asyncDatabaseCommands, DocumentConvention documentConvention, CancellationToken token = default(CancellationToken))
{
if (Conventions.IndexAndTransformerReplicationMode.HasFlag(IndexAndTransformerReplicationMode.Indexes))
if (documentConvention.IndexAndTransformerReplicationMode.HasFlag(IndexAndTransformerReplicationMode.Indexes))
await ReplicateIndexesIfNeededAsync(asyncDatabaseCommands).ConfigureAwait(false);
}

Expand Down Expand Up @@ -408,15 +411,15 @@ public virtual async Task SideBySideExecuteAsync(IAsyncDatabaseCommands asyncDat
Conventions = documentConvention;
var indexDefinition = CreateIndexDefinition();

var replaceIndexName = "ReplacementOf/" + IndexName;
var replaceIndexName = Constants.SideBySideIndexNamePrefix + IndexName;
//check if side by side index exists
var sideBySideDef = await asyncDatabaseCommands.GetIndexAsync(replaceIndexName, token).ConfigureAwait(false);
if (sideBySideDef != null)
{
if (CurrentOrLegacyIndexDefinitionEquals(documentConvention, sideBySideDef, indexDefinition))
return;

await UpdateSideBySideIndexAsync(asyncDatabaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, token, replaceIndexName, indexDefinition);
await UpdateSideBySideIndexAsync(asyncDatabaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, token, replaceIndexName, indexDefinition, documentConvention);
return;
}

Expand All @@ -426,16 +429,17 @@ public virtual async Task SideBySideExecuteAsync(IAsyncDatabaseCommands asyncDat
if (CurrentOrLegacyIndexDefinitionEquals(documentConvention, serverDef, indexDefinition))
return;

await UpdateSideBySideIndexAsync(asyncDatabaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, token, replaceIndexName, indexDefinition);
await UpdateSideBySideIndexAsync(asyncDatabaseCommands, minimumEtagBeforeReplace, replaceTimeUtc, token, replaceIndexName, indexDefinition, documentConvention);
}
else
{
// since index doesn't exist yet - create it in normal mode
await asyncDatabaseCommands.PutIndexAsync(IndexName, indexDefinition, token).ConfigureAwait(false);
await AfterExecuteAsync(asyncDatabaseCommands, documentConvention, token);
}
}

private async Task UpdateSideBySideIndexAsync(IAsyncDatabaseCommands asyncDatabaseCommands, Etag minimumEtagBeforeReplace, DateTime? replaceTimeUtc, CancellationToken token, string replaceIndexName, IndexDefinition indexDefinition)
private async Task UpdateSideBySideIndexAsync(IAsyncDatabaseCommands asyncDatabaseCommands, Etag minimumEtagBeforeReplace, DateTime? replaceTimeUtc, CancellationToken token, string replaceIndexName, IndexDefinition indexDefinition, DocumentConvention documentConvention)
{
await asyncDatabaseCommands.PutIndexAsync(replaceIndexName, indexDefinition, true, token).ConfigureAwait(false);

Expand All @@ -445,6 +449,8 @@ await asyncDatabaseCommands
RavenJObject.FromObject(new IndexReplaceDocument {IndexToReplace = IndexName, MinimumEtagBeforeReplace = minimumEtagBeforeReplace, ReplaceTimeUtc = replaceTimeUtc}),
new RavenJObject(),
token).ConfigureAwait(false);

await AfterExecuteAsync(asyncDatabaseCommands, documentConvention, token);
}

/// <summary>
Expand Down

0 comments on commit 599758f

Please sign in to comment.