Proper syncing of how we handle the etag for future batches

1 parent 939e217 commit 187f63a2e1bf303225e7be2a8b1c4113d06c84ad @ayende committed Oct 28, 2012
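
In short, the fix standardizes what a FutureIndexBatch.StartingEtag means: it is now always the etag of the last document already indexed (an exclusive lower bound), never the first etag the batch contains. Both places that create future batches are brought in line with the equality lookup that consumes them; condensed from the diff below, that lookup is:

    // A prefetched batch is found by exact match on the last indexed
    // etag, so every producer must key its batches the same way.
    var nextBatch = futureIndexBatches.FirstOrDefault(
        x => x.StartingEtag == lastIndexedGuidForAllIndexes);
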
@@ -100,6 +100,10 @@ public class DocumentDatabase : IUuidGenerator, IDisposable
private readonly WorkContext workContext;
private readonly IndexingExecuter indexingExecuter;
+ public IndexingExecuter IndexingExecuter
+ {
+ get { return indexingExecuter; }
+ }
private readonly ConcurrentDictionary<Guid, CommittableTransaction> promotedTransactions = new ConcurrentDictionary<Guid, CommittableTransaction>();
@@ -1640,7 +1644,7 @@ public static string BuildVersion
static string productVersion;
private volatile bool disposed;
- private ValidateLicense validateLicense;
+ private readonly ValidateLicense validateLicense;
public string ServerUrl
{
get
@@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
+using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Raven.Abstractions;
@@ -59,10 +60,16 @@ protected override IndexToWorkOn GetIndexToWorkOn(IndexStats indexesStat)
private class FutureIndexBatch
{
public Guid StartingEtag;
- public Task<JsonDocument[]> Task;
+ public Task<JsonResults> Task;
public int Age;
}
+ public class JsonResults
+ {
+ public JsonDocument[] Results;
+ public bool LoadedFromDisk;
+ }
+
private int currentIndexingAge;
private readonly ConcurrentSet<FutureIndexBatch> futureIndexBatches = new ConcurrentSet<FutureIndexBatch>();
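
The JsonResults wrapper added here lets callers tell where a batch came from. MaybeAddFutureBatch (further down) only schedules a speculative prefetch after a batch that was actually read from disk; a batch handed over in memory from AfterCommit says nothing about whether more documents are waiting in storage. The two construction sites, condensed from later hunks:

    // Read from storage (GetJsonDocs): more documents may follow,
    // so this result can justify prefetching the next range.
    return new JsonResults { Results = jsonDocs, LoadedFromDisk = true };

    // Handed over from AfterCommit: already in memory, and never
    // used as a reason to prefetch further.
    new JsonResults { Results = docs, LoadedFromDisk = false }
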
@@ -79,23 +86,26 @@ protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)
var operationCancelled = false;
TimeSpan indexingDuration = TimeSpan.Zero;
- JsonDocument[] jsonDocs = null;
+ JsonResults jsonDocs = null;
try
{
jsonDocs = GetJsonDocuments(lastIndexedGuidForAllIndexes);
- Log.Debug("Found a total of {0} documents that requires indexing since etag: {1}",
- jsonDocs.Length, lastIndexedGuidForAllIndexes);
+ if(Log.IsDebugEnabled)
+ {
+ Log.Debug("Found a total of {0} documents that requires indexing since etag: {1}: ({2})",
+ jsonDocs.Results.Length, lastIndexedGuidForAllIndexes, string.Join(", ", jsonDocs.Results.Select(x=>x.Key)));
+ }
- context.ReportIndexingActualBatchSize(jsonDocs.Length);
+ context.ReportIndexingActualBatchSize(jsonDocs.Results.Length);
context.CancellationToken.ThrowIfCancellationRequested();
MaybeAddFutureBatch(jsonDocs);
- if (jsonDocs.Length > 0)
+ if (jsonDocs.Results.Length > 0)
{
- context.IndexedPerSecIncreaseBy(jsonDocs.Length);
- var result = FilterIndexes(indexesToWorkOn, jsonDocs).ToList();
+ context.IndexedPerSecIncreaseBy(jsonDocs.Results.Length);
+ var result = FilterIndexes(indexesToWorkOn, jsonDocs.Results).ToList();
indexesToWorkOn = result.Select(x => x.Item1).ToList();
var sw = Stopwatch.StartNew();
BackgroundTaskExecuter.Instance.ExecuteAll(context, result, (indexToWorkOn, _) =>
@@ -115,16 +125,16 @@ protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)
}
finally
{
- if (operationCancelled == false && jsonDocs != null && jsonDocs.Length > 0)
+ if (operationCancelled == false && jsonDocs != null && jsonDocs.Results.Length > 0)
{
- var lastByEtag = GetHighestEtag(jsonDocs);
+ var lastByEtag = GetHighestEtag(jsonDocs.Results);
var lastModified = lastByEtag.LastModified.Value;
var lastEtag = lastByEtag.Etag.Value;
if (Log.IsDebugEnabled)
{
Log.Debug("Aftering indexing {0} documents, the new last etag for is: {1} for {2}",
- jsonDocs.Length,
+ jsonDocs.Results.Length,
lastEtag,
string.Join(", ", indexesToWorkOn.Select(x => x.IndexName))
);
@@ -140,7 +150,7 @@ protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)
}
});
- UpdateAutoThrottler(jsonDocs, indexingDuration);
+ UpdateAutoThrottler(jsonDocs.Results, indexingDuration);
}
// make sure that we don't have too many "future cache" items
@@ -157,29 +167,29 @@ private void UpdateAutoThrottler(JsonDocument[] jsonDocs, TimeSpan indexingDuration)
var futureLen = futureIndexBatches.Sum(x =>
{
if (x.Task.IsCompleted)
- return x.Task.Result.Length;
+ return x.Task.Result.Results.Length;
return autoTuner.NumberOfItemsToIndexInSingleBatch / 15;
});
var futureSize = futureIndexBatches.Sum(x =>
{
if (x.Task.IsCompleted)
- return x.Task.Result.Sum(s => s.SerializedSizeOnDisk);
+ return x.Task.Result.Results.Sum(s => s.SerializedSizeOnDisk);
return autoTuner.NumberOfItemsToIndexInSingleBatch * 256;
});
autoTuner.AutoThrottleBatchSize(jsonDocs.Length + futureLen, futureSize + jsonDocs.Sum(x => x.SerializedSizeOnDisk), indexingDuration);
}
- private JsonDocument[] GetJsonDocuments(Guid lastIndexedGuidForAllIndexes)
+ public JsonResults GetJsonDocuments(Guid lastIndexedGuidForAllIndexes)
{
var futureResults = GetFutureJsonDocuments(lastIndexedGuidForAllIndexes);
if (futureResults != null)
return futureResults;
return GetJsonDocs(lastIndexedGuidForAllIndexes);
}
- private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
+ private JsonResults GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
{
var nextBatch = futureIndexBatches.FirstOrDefault(x => x.StartingEtag == lastIndexedGuidForAllIndexes);
if (nextBatch == null)
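
One detail in the hunk above worth spelling out: for future tasks that have not completed yet, UpdateAutoThrottler substitutes estimates, one fifteenth of the current batch size in items and 256 bytes per item in size. A quick worked example with an invented batch size:

    // Assuming NumberOfItemsToIndexInSingleBatch = 1500 (illustrative):
    var estimatedItems = 1500 / 15;   // an in-flight batch counts as 100 items
    var estimatedBytes = 1500 * 256;  // and as 384,000 bytes
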
@@ -195,13 +205,13 @@ private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
}
var timeToWait = 500;
- var items = new List<JsonDocument[]>
+ var items = new List<JsonResults>
{
nextBatch.Task.Result
};
while (true)
{
- var nextHighestEtag = GetNextHighestEtag(nextBatch.Task.Result);
+ var nextHighestEtag = GetNextHighestEtag(nextBatch.Task.Result.Results);
nextBatch = futureIndexBatches.FirstOrDefault(x => x.StartingEtag == nextHighestEtag);
if (nextBatch == null)
{
@@ -222,9 +232,13 @@ private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
// while we were fetching things, so we have several versions
// of the same doc loaded, this will make sure that we will only
// take one of them.
- return items.SelectMany(x => x).GroupBy(x => x.Key)
- .Select(g => g.OrderBy(x => x.Etag).First())
- .ToArray();
+ return new JsonResults
+ {
+ Results = items.SelectMany(x => x.Results).GroupBy(x => x.Key)
+ .Select(g => g.OrderBy(x => x.Etag).First())
+ .ToArray(),
+ LoadedFromDisk = items.Aggregate(false, (prev, results) => prev | results.LoadedFromDisk)
+ };
}
catch (Exception e)
{
@@ -233,18 +247,20 @@ private JsonDocument[] GetFutureJsonDocuments(Guid lastIndexedGuidForAllIndexes)
}
}
- private void MaybeAddFutureBatch(JsonDocument[] past)
+
+
+ private void MaybeAddFutureBatch(JsonResults past)
{
if (context.Configuration.MaxNumberOfParallelIndexTasks == 1)
return;
- if (past.Length == 0)
+ if (past.Results.Length == 0 || past.LoadedFromDisk == false)
return;
if (futureIndexBatches.Count > 5) // we limit the number of future calls we do
{
var alreadyLoaded = futureIndexBatches.Sum(x =>
{
if (x.Task.IsCompleted)
- return x.Task.Result.Length;
+ return x.Task.Result.Results.Length;
return 0;
});
@@ -257,7 +273,7 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
return;
// we loaded the maximum amount, there are probably more items to read now.
- var nextEtag = GetNextHighestEtag(past);
+ var nextEtag = GetNextHighestEtag(past.Results);
var nextBatch = futureIndexBatches.FirstOrDefault(x => x.StartingEtag == nextEtag);
@@ -278,7 +294,7 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
{
var jsonDocuments = GetJsonDocs(nextEtag);
int localWork = workCounter;
- while (jsonDocuments.Length == 0 && context.DoWork)
+ while (jsonDocuments.Results.Length == 0 && context.DoWork)
{
futureBatchStat.Retries++;
@@ -288,7 +304,7 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
jsonDocuments = GetJsonDocs(nextEtag);
}
futureBatchStat.Duration = sp.Elapsed;
- futureBatchStat.Size = jsonDocuments.Length;
+ futureBatchStat.Size = jsonDocuments.Results.Length;
MaybeAddFutureBatch(jsonDocuments);
return jsonDocuments;
})
@@ -297,12 +313,10 @@ private void MaybeAddFutureBatch(JsonDocument[] past)
private static Guid GetNextHighestEtag(JsonDocument[] past)
{
- var jsonDocument = GetHighestEtag(past);
- var byteArray = jsonDocument.Etag.Value.ToByteArray();
- var next = BitConverter.ToInt64(byteArray.Skip(8).Reverse().ToArray(), 0) + 1;
- return new Guid(byteArray.Take(8).Concat(BitConverter.GetBytes(next).Reverse()).ToArray());
+ return GetHighestEtag(past).Etag.Value;
}
+
private static JsonDocument GetHighestEtag(JsonDocument[] past)
{
var highest = new ComparableByteArray(Guid.Empty);
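
The byte arithmetic deleted from GetNextHighestEtag is the substance of the commit: it used to return highest-etag-plus-one, so batches chained here were keyed one past the last indexed document, while GetFutureJsonDocuments matches on the last indexed etag itself. For reference, the retired increment relied on sequential etags keeping a big-endian counter in the Guid's last eight bytes (BitConverter reads little-endian on typical hardware, hence the Reverse calls); this standalone version assumes System and System.Linq:

    static Guid IncrementEtag(Guid etag) // the logic this commit removes
    {
        var bytes = etag.ToByteArray();
        var counter = BitConverter.ToInt64(bytes.Skip(8).Reverse().ToArray(), 0) + 1;
        return new Guid(bytes.Take(8).Concat(BitConverter.GetBytes(counter).Reverse()).ToArray());
    }
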
@@ -341,7 +355,7 @@ protected override void Dispose()
futureIndexBatches.Clear();
}
- private JsonDocument[] GetJsonDocs(Guid lastIndexed)
+ private JsonResults GetJsonDocs(Guid lastIndexed)
{
JsonDocument[] jsonDocs = null;
transactionalStorage.Batch(actions =>
@@ -359,7 +373,11 @@ private JsonDocument[] GetJsonDocs(Guid lastIndexed)
})
.ToArray();
});
- return jsonDocs;
+ return new JsonResults
+ {
+ Results = jsonDocs,
+ LoadedFromDisk = true
+ };
}
private IEnumerable<Tuple<IndexToWorkOn, IndexingBatch>> FilterIndexes(IList<IndexToWorkOn> indexesToWorkOn, JsonDocument[] jsonDocs)
@@ -445,7 +463,10 @@ private JsonDocument[] GetJsonDocs(Guid lastIndexed)
actions[i] = accessor => accessor.Indexing.UpdateLastIndexed(indexName, lastEtag, lastModified);
return;
}
- Log.Debug("Going to index {0} documents in {1}", batch.Ids.Count, indexToWorkOn);
+ if (Log.IsDebugEnabled)
+ {
+ Log.Debug("Going to index {0} documents in {1}: ({2})", batch.Ids.Count, indexToWorkOn, string.Join(", ", batch.Ids));
+ }
results[i] = Tuple.Create(indexToWorkOn, batch);
});
@@ -534,12 +555,24 @@ public void AfterCommit(JsonDocument[] docs)
futureIndexBatches.Add(new FutureIndexBatch
{
- StartingEtag = GetLowestEtag(docs),
- Task = new CompletedTask<JsonDocument[]>(docs),
+ StartingEtag = DecrementEtag(GetLowestEtag(docs)),
+ Task = new CompletedTask<JsonResults>(new JsonResults
+ {
+ Results = docs,
+ LoadedFromDisk = false
+ }),
Age = currentIndexingAge
});
}
+ private Guid DecrementEtag(Guid etag)
+ {
+ var bytes = etag.ToByteArray();
+ var part = BitConverter.ToInt64(bytes.Skip(8).Reverse().ToArray(), 0) - 1;
+
+ return new Guid(bytes.Take(8).Concat(BitConverter.GetBytes(part).Reverse()).ToArray());
+ }
+
private static Guid GetLowestEtag(JsonDocument[] past)
{
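
DecrementEtag is the mirror image of the GetNextHighestEtag change: a batch built from freshly committed documents must be keyed by the etag just below its lowest document, so the equality lookup for the last indexed etag still hits it. A worked example under that reading (the etag values are invented for illustration):

    // The indexer has processed up to ...0005; a commit then writes
    // documents ...0006 through ...0009.
    var lowest = new Guid("00000000-0000-0000-0000-000000000006");
    var startingEtag = DecrementEtag(lowest);
    // startingEtag is now ...0005: exactly the last indexed etag
    // that the next GetFutureJsonDocuments lookup will search for.
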
@@ -431,6 +431,7 @@ public void Can_search_inner_words()
}
}
}
+
[Fact]
public void Can_search_inner_words_with_extra_condition()
@@ -461,6 +462,7 @@ public void Can_search_inner_words_with_extra_condition()
.Search(x => x.Name, "Photo", options: SearchOptions.And)
.Where(x => x.Id == "1")
.ToList();
+ WaitForUserToContinueTheTest(store);
Assert.NotEmpty(images);
Assert.True(images.Count == 1);
}
@@ -46,7 +46,6 @@ public void CanGetCorrectResultsFromAllItems()
.Customize(x => x.WaitForNonStaleResults())
.Where(x => x.OrderId != null)
.ToList();
-
Assert.Equal(12*5, searchResults.Count);
foreach (var searchResult in searchResults)
{
@@ -1,8 +1,8 @@
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.netfx35.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<targets>
- <!--<target xsi:type="AsyncWrapper" name="AsyncLog">
+ <target xsi:type="AsyncWrapper" name="AsyncLog">
<target name="File" xsi:type="File"
- fileName="${basedir}\Logs\${environment:Run}-${environment:Test}.csv">
+ fileName="${basedir}\Logs\${environment:Run}.csv">
<layout xsi:type="CsvLayout">
<column name="time" layout="${time}" />
<column name="logger" layout="${logger}"/>
@@ -13,9 +13,9 @@
<column name="thread id" layout="${threadid}" />
</layout>
</target>
- </target>-->
+ </target>
</targets>
<rules>
- <!--<logger name="Raven.*" writeTo="AsyncLog"/>-->
+ <logger name="Raven.*" writeTo="AsyncLog"/>
</rules>
</nlog>