Proper syncing of how we handle the etag for future batches
ayende committed Oct 28, 2012
1 parent 939e217 commit 187f63a
Showing 6 changed files with 124 additions and 47 deletions.
6 changes: 5 additions & 1 deletion Raven.Database/DocumentDatabase.cs
@@ -100,6 +100,10 @@ public class DocumentDatabase : IUuidGenerator, IDisposable

private readonly WorkContext workContext;
private readonly IndexingExecuter indexingExecuter;
public IndexingExecuter IndexingExecuter
{
get { return indexingExecuter; }
}

private readonly ConcurrentDictionary<Guid, CommittableTransaction> promotedTransactions = new ConcurrentDictionary<Guid, CommittableTransaction>();

@@ -1640,7 +1644,7 @@ public static string BuildVersion

static string productVersion;
private volatile bool disposed;
private ValidateLicense validateLicense;
private readonly ValidateLicense validateLicense;
public string ServerUrl
{
get
105 changes: 69 additions & 36 deletions Raven.Database/Indexing/IndexingExecuter.cs
@@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Raven.Abstractions;
@@ -59,10 +60,16 @@ protected override IndexToWorkOn GetIndexToWorkOn(IndexStats indexesStat)
private class FutureIndexBatch
{
public Guid StartingEtag;
public Task<JsonDocument[]> Task;
public Task<JsonResults> Task;
public int Age;
}

public class JsonResults
{
public JsonDocument[] Results;
public bool LoadedFromDisk;
}
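The new JsonResults wrapper pairs a batch of documents with a LoadedFromDisk flag: GetJsonDocs (below) marks batches read from storage as true, AfterCommit marks in-memory batches as false, and MaybeAddFutureBatch only schedules further prefetching off disk-loaded batches. A minimal sketch of how the flag propagates when batches are merged, mirroring the Aggregate call in GetFutureJsonDocuments (the empty arrays are placeholders):

// Sketch only: a merged batch counts as disk-loaded if any part was.
var fromDisk   = new JsonResults { Results = new JsonDocument[0], LoadedFromDisk = true };
var fromCommit = new JsonResults { Results = new JsonDocument[0], LoadedFromDisk = false };
bool anyFromDisk = new[] { fromDisk, fromCommit }
    .Aggregate(false, (prev, r) => prev | r.LoadedFromDisk); // true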

private int currentIndexingAge;

private readonly ConcurrentSet<FutureIndexBatch> futureIndexBatches = new ConcurrentSet<FutureIndexBatch>();
@@ -79,23 +86,26 @@ protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)

var operationCancelled = false;
TimeSpan indexingDuration = TimeSpan.Zero;
JsonDocument[] jsonDocs = null;
JsonResults jsonDocs = null;
try
{
jsonDocs = GetJsonDocuments(lastIndexedGuidForAllIndexes);

Log.Debug("Found a total of {0} documents that requires indexing since etag: {1}",
jsonDocs.Length, lastIndexedGuidForAllIndexes);
if(Log.IsDebugEnabled)
{
Log.Debug("Found a total of {0} documents that requires indexing since etag: {1}: ({2})",
jsonDocs.Results.Length, lastIndexedGuidForAllIndexes, string.Join(", ", jsonDocs.Results.Select(x=>x.Key)));
}

context.ReportIndexingActualBatchSize(jsonDocs.Length);
context.ReportIndexingActualBatchSize(jsonDocs.Results.Length);
context.CancellationToken.ThrowIfCancellationRequested();

MaybeAddFutureBatch(jsonDocs);

if (jsonDocs.Length > 0)
if (jsonDocs.Results.Length > 0)
{
context.IndexedPerSecIncreaseBy(jsonDocs.Length);
var result = FilterIndexes(indexesToWorkOn, jsonDocs).ToList();
context.IndexedPerSecIncreaseBy(jsonDocs.Results.Length);
var result = FilterIndexes(indexesToWorkOn, jsonDocs.Results).ToList();
indexesToWorkOn = result.Select(x => x.Item1).ToList();
var sw = Stopwatch.StartNew();
BackgroundTaskExecuter.Instance.ExecuteAll(context, result, (indexToWorkOn, _) =>
@@ -115,16 +125,16 @@ protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)
}
finally
{
if (operationCancelled == false && jsonDocs != null && jsonDocs.Length > 0)
if (operationCancelled == false && jsonDocs != null && jsonDocs.Results.Length > 0)
{
var lastByEtag = GetHighestEtag(jsonDocs);
var lastByEtag = GetHighestEtag(jsonDocs.Results);
var lastModified = lastByEtag.LastModified.Value;
var lastEtag = lastByEtag.Etag.Value;

if (Log.IsDebugEnabled)
{
Log.Debug("Aftering indexing {0} documents, the new last etag for is: {1} for {2}",
jsonDocs.Length,
jsonDocs.Results.Length,
lastEtag,
string.Join(", ", indexesToWorkOn.Select(x => x.IndexName))
);
@@ -140,7 +150,7 @@ protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)
}
});

UpdateAutoThrottler(jsonDocs, indexingDuration);
UpdateAutoThrottler(jsonDocs.Results, indexingDuration);
}

// make sure that we don't have too much "future cache" items
Expand All @@ -157,29 +167,29 @@ private void UpdateAutoThrottler(JsonDocument[] jsonDocs, TimeSpan indexingDurat
var futureLen = futureIndexBatches.Sum(x =>
{
if (x.Task.IsCompleted)
return x.Task.Result.Length;
return x.Task.Result.Results.Length;
return autoTuner.NumberOfItemsToIndexInSingleBatch / 15;
});

var futureSize = futureIndexBatches.Sum(x =>
{
if (x.Task.IsCompleted)
return x.Task.Result.Sum(s => s.SerializedSizeOnDisk);
return x.Task.Result.Results.Sum(s => s.SerializedSizeOnDisk);
return autoTuner.NumberOfItemsToIndexInSingleBatch * 256;
});
autoTuner.AutoThrottleBatchSize(jsonDocs.Length + futureLen, futureSize + jsonDocs.Sum(x => x.SerializedSizeOnDisk), indexingDuration);
}
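For a future batch whose task has not completed, the sums above fall back to fixed guesses: a fifteenth of the current batch size for the item count, and 256 bytes per slot for the serialized size. A worked example with an assumed batch size (the value 1024 is illustrative, not from the commit):

// Assumed tuner value; NumberOfItemsToIndexInSingleBatch changes at runtime.
int numberOfItemsToIndexInSingleBatch = 1024;
int estimatedItems = numberOfItemsToIndexInSingleBatch / 15;   // 68 documents
long estimatedSize = numberOfItemsToIndexInSingleBatch * 256L; // 262,144 bytes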

private JsonDocument[] GetJsonDocuments(Guid lastIndexedGuidForAllIndexes)
public JsonResults GetJsonDocuments(Guid lastIndexedGuidForAllIndexes)
{
var futureResults = GetFutureJsonDocuments(lastIndexedGuidForAllIndexes);
if (futureResults != null)
return futureResults;
return GetJsonDocs(lastIndexedGuidForAllIndexes);
}
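GetJsonDocuments is now public, and DocumentDatabase gained a public IndexingExecuter property in the first file above, so a batch can be pulled from outside the executer. A hedged usage sketch — database and lastIndexedEtag are placeholder names, not identifiers from the commit:

// Hypothetical caller enabled by the two visibility changes in this commit.
JsonResults batch = database.IndexingExecuter.GetJsonDocuments(lastIndexedEtag);
Console.WriteLine("Fetched {0} documents (loaded from disk: {1})",
    batch.Results.Length, batch.LoadedFromDisk);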

private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
private JsonResults GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
{
var nextBatch = futureIndexBatches.FirstOrDefault(x => x.StartingEtag == lastIndexedGuidForAllIndexes);
if (nextBatch == null)
@@ -195,13 +205,13 @@ private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
}
var timeToWait = 500;

var items = new List<JsonDocument[]>
var items = new List<JsonResults>
{
nextBatch.Task.Result
};
while (true)
{
var nextHighestEtag = GetNextHighestEtag(nextBatch.Task.Result);
var nextHighestEtag = GetNextHighestEtag(nextBatch.Task.Result.Results);
nextBatch = futureIndexBatches.FirstOrDefault(x => x.StartingEtag == nextHighestEtag);
if (nextBatch == null)
{
@@ -222,9 +232,13 @@ private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
// while we were fetching things, so we have several versions
// of the same doc loaded, this will make sure that we will only
// take one of them.
return items.SelectMany(x => x).GroupBy(x => x.Key)
.Select(g => g.OrderBy(x => x.Etag).First())
.ToArray();
return new JsonResults
{
Results = items.SelectMany(x => x.Results).GroupBy(x => x.Key)
.Select(g => g.OrderBy(x => x.Etag).First())
.ToArray(),
LoadedFromDisk = items.Aggregate(false, (prev, results) => prev | results.LoadedFromDisk)
};
}
catch (Exception e)
{
@@ -233,18 +247,20 @@ private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
}
}
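The comment above explains that overlapping future batches can hold several versions of one document. A self-contained sketch of that dedup step, using made-up keys and etags (JsonResults wrapping omitted for brevity):

// Two batches overlap on "users/1"; keep one version per key, picked by
// etag order exactly as the GroupBy in the return statement above does.
var older = new JsonDocument { Key = "users/1", Etag = Guid.Parse("00000000-0000-0000-0000-000000000001") };
var newer = new JsonDocument { Key = "users/1", Etag = Guid.Parse("00000000-0000-0000-0000-000000000002") };
var other = new JsonDocument { Key = "users/2", Etag = Guid.Parse("00000000-0000-0000-0000-000000000003") };

var merged = new[] { new[] { older, other }, new[] { newer } }
    .SelectMany(batch => batch)
    .GroupBy(doc => doc.Key)
    .Select(g => g.OrderBy(doc => doc.Etag).First())
    .ToArray(); // three documents in, two out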

private void MaybeAddFutureBatch(JsonDocument[] past)


private void MaybeAddFutureBatch(JsonResults past)
{
if (context.Configuration.MaxNumberOfParallelIndexTasks == 1)
return;
if (past.Length == 0)
if (past.Results.Length == 0 || past.LoadedFromDisk == false)
return;
if (futureIndexBatches.Count > 5) // we limit the number of future calls we do
{
var alreadyLoaded = futureIndexBatches.Sum(x =>
{
if (x.Task.IsCompleted)
return x.Task.Result.Length;
return x.Task.Result.Results.Length;
return 0;
});

@@ -257,7 +273,7 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
return;

// we loaded the maximum amount, there are probably more items to read now.
var nextEtag = GetNextHighestEtag(past);
var nextEtag = GetNextHighestEtag(past.Results);

var nextBatch = futureIndexBatches.FirstOrDefault(x => x.StartingEtag == nextEtag);

@@ -278,7 +294,7 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
{
var jsonDocuments = GetJsonDocs(nextEtag);
int localWork = workCounter;
while (jsonDocuments.Length == 0 && context.DoWork)
while (jsonDocuments.Results.Length == 0 && context.DoWork)
{
futureBatchStat.Retries++;
@@ -288,7 +304,7 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
jsonDocuments = GetJsonDocs(nextEtag);
}
futureBatchStat.Duration = sp.Elapsed;
futureBatchStat.Size = jsonDocuments.Length;
futureBatchStat.Size = jsonDocuments.Results.Length;
MaybeAddFutureBatch(jsonDocuments);
return jsonDocuments;
})
@@ -297,12 +313,10 @@ private void MaybeAddFutureBatch(JsonDocument[] past)

private static Guid GetNextHighestEtag(JsonDocument[] past)
{
var jsonDocument = GetHighestEtag(past);
var byteArray = jsonDocument.Etag.Value.ToByteArray();
var next = BitConverter.ToInt64(byteArray.Skip(8).Reverse().ToArray(), 0) + 1;
return new Guid(byteArray.Take(8).Concat(BitConverter.GetBytes(next).Reverse()).ToArray());
return GetHighestEtag(past).Etag.Value;
}


private static JsonDocument GetHighestEtag(JsonDocument[] past)
{
var highest = new ComparableByteArray(Guid.Empty);
@@ -341,7 +355,7 @@ protected override void Dispose()
futureIndexBatches.Clear();
}

private JsonDocument[] GetJsonDocs(Guid lastIndexed)
private JsonResults GetJsonDocs(Guid lastIndexed)
{
JsonDocument[] jsonDocs = null;
transactionalStorage.Batch(actions =>
@@ -359,7 +373,11 @@ private JsonDocument[] GetJsonDocs(Guid lastIndexed)
})
.ToArray();
});
return jsonDocs;
return new JsonResults
{
Results = jsonDocs,
LoadedFromDisk = true
};
}

private IEnumerable<Tuple<IndexToWorkOn, IndexingBatch>> FilterIndexes(IList<IndexToWorkOn> indexesToWorkOn, JsonDocument[] jsonDocs)
@@ -445,7 +463,10 @@ private JsonDocument[] GetJsonDocs(Guid lastIndexed)
actions[i] = accessor => accessor.Indexing.UpdateLastIndexed(indexName, lastEtag, lastModified);
return;
}
Log.Debug("Going to index {0} documents in {1}", batch.Ids.Count, indexToWorkOn);
if(Log.IsDebugEnabled)
{
Log.Debug("Going to index {0} documents in {1}: ({2})", batch.Ids.Count, indexToWorkOn, string.Join(", ", batch.Ids));
}
results[i] = Tuple.Create(indexToWorkOn, batch);
});
@@ -534,12 +555,24 @@ public void AfterCommit(JsonDocument[] docs)

futureIndexBatches.Add(new FutureIndexBatch
{
StartingEtag = GetLowestEtag(docs),
Task = new CompletedTask<JsonDocument[]>(docs),
StartingEtag = DecrementEtag(GetLowestEtag(docs)),
Task = new CompletedTask<JsonResults>(new JsonResults
{
Results = docs,
LoadedFromDisk = false
}),
Age = currentIndexingAge
});
}

private Guid DecrementEtag(Guid etag)
{
var bytes = etag.ToByteArray();
var part = BitConverter.ToInt64(bytes.Skip(8).Reverse().ToArray(), 0) - 1;

return new Guid(bytes.Take(8).Concat(BitConverter.GetBytes(part).Reverse()).ToArray());
}
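Etags here are Guids whose last eight bytes carry a big-endian sequence counter. GetNextHighestEtag above no longer adds one to the highest etag, and AfterCommit compensates by registering freshly committed batches under lowest-etag-minus-one; on this reading, StartingEtag now means "the etag after which this batch begins", which is exactly what the lookup x.StartingEtag == lastIndexedGuidForAllIndexes supplies. A minimal round-trip sketch of the counter arithmetic — IncrementEtag is a hypothetical mirror of DecrementEtag, not part of the commit:

using System;
using System.Linq;

static class EtagArithmeticSketch
{
    // Hypothetical inverse of DecrementEtag: the last 8 bytes of the Guid
    // hold a big-endian Int64 counter; add one instead of subtracting one.
    public static Guid IncrementEtag(Guid etag)
    {
        var bytes = etag.ToByteArray();
        var part = BitConverter.ToInt64(bytes.Skip(8).Reverse().ToArray(), 0) + 1;
        return new Guid(bytes.Take(8).Concat(BitConverter.GetBytes(part).Reverse()).ToArray());
    }

    static void Main()
    {
        var etag = Guid.Parse("00000000-0000-0000-0000-000000000007");
        // Incrementing, then applying DecrementEtag, returns the original.
        Console.WriteLine(IncrementEtag(etag)); // ...-000000000008
    }
}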


private static Guid GetLowestEtag(JsonDocument[] past)
{
2 changes: 2 additions & 0 deletions Raven.Tests/Bugs/FullTextSearchOnTags.cs
@@ -431,6 +431,7 @@ public void Can_search_inner_words()
}
}
}


[Fact]
public void Can_search_inner_words_with_extra_condition()
@@ -461,6 +462,7 @@ public void Can_search_inner_words_with_extra_condition()
.Search(x => x.Name, "Photo", options: SearchOptions.And)
.Where(x => x.Id == "1")
.ToList();
WaitForUserToContinueTheTest(store);
Assert.NotEmpty(images);
Assert.True(images.Count == 1);
}
1 change: 0 additions & 1 deletion Raven.Tests/Bugs/MultiOutputReduce.cs
@@ -46,7 +46,6 @@ public void CanGetCorrectResultsFromAllItems()
.Customize(x => x.WaitForNonStaleResults())
.Where(x => x.OrderId != null)
.ToList();

Assert.Equal(12*5, searchResults.Count);
foreach (var searchResult in searchResults)
{
8 changes: 4 additions & 4 deletions Raven.Tryouts/NLog.config
@@ -1,8 +1,8 @@
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.netfx35.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<targets>
<!--<target xsi:type="AsyncWrapper" name="AsyncLog">
<target xsi:type="AsyncWrapper" name="AsyncLog">
<target name="File" xsi:type="File"
fileName="${basedir}\Logs\${environment:Run}-${environment:Test}.csv">
fileName="${basedir}\Logs\${environment:Run}.csv">
<layout xsi:type="CsvLayout">
<column name="time" layout="${time}" />
<column name="logger" layout="${logger}"/>
@@ -13,9 +13,9 @@
<column name="thread id" layout="${threadid}" />
</layout>
</target>
</target>-->
</target>
</targets>
<rules>
<!--<logger name="Raven.*" writeTo="AsyncLog"/>-->
<logger name="Raven.*" writeTo="AsyncLog"/>
</rules>
</nlog>
