Skip to content

Commit

Permalink
RavenDB-4031, optimization: after removing the documents from the queue,
Browse files Browse the repository at this point in the history
try creating a new future batch if:
1) we have a future batch, which was finished, try creating a future batch with its results.
2) we don't have any future batches and the prefetching queue isn't empty, try creating a new future batch using the last document in the queue
  • Loading branch information
grisha-kotler committed Nov 17, 2015
1 parent 73ce1f2 commit f1def39
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 26 deletions.
3 changes: 2 additions & 1 deletion Raven.Database/Indexing/IndexingExecuter.cs
Expand Up @@ -279,7 +279,8 @@ private PrefetchingBehavior GetPrefetcherFor(Etag fromEtag, ConcurrentSet<Prefet
{
foreach (var prefetchingBehavior in prefetchingBehaviors)
{
// at this point we've already checked for using the default prefetcher
// at this point we've already verified that we can't use the default prefetcher
// if it's empty, we don't need to use it
if (prefetchingBehavior.IsDefault == false && prefetchingBehavior.IsEmpty() && usedPrefetchers.TryAdd(prefetchingBehavior))
return prefetchingBehavior;
}
Expand Down
17 changes: 16 additions & 1 deletion Raven.Database/Prefetching/ConcurrentJsonDocumentSortedList.cs
Expand Up @@ -87,6 +87,22 @@ public bool TryPeek(out JsonDocument result)
}
}

public bool TryPeekLastDocument(out JsonDocument result)
{
try
{
slim.EnterReadLock();

result = innerList.Values.LastOrDefault();

return result != null;
}
finally
{
slim.ExitReadLock();
}
}

public bool TryDequeue(out JsonDocument result)
{
try
Expand All @@ -113,7 +129,6 @@ public Etag NextDocumentETag()
return TryPeek(out result) == false ? null : result.Etag;
}


public int LoadedSize
{
get
Expand Down
9 changes: 1 addition & 8 deletions Raven.Database/Prefetching/Prefetcher.cs
Expand Up @@ -34,8 +34,7 @@ public PrefetchingBehavior CreatePrefetchingBehavior(PrefetchingUser user, BaseB
prefetchingUserDescription,
isDefault,
GetPrefetchintBehavioursCount,
GetPrefetchingBehaviourSummary,
IsDefatultPrefetchingBehaviourBusy);
GetPrefetchingBehaviourSummary);

prefetchingBehaviors = new List<PrefetchingBehavior>(prefetchingBehaviors)
{
Expand Down Expand Up @@ -101,12 +100,6 @@ private PrefetchingSummary GetPrefetchingBehaviourSummary()
return summary;
}

private bool IsDefatultPrefetchingBehaviourBusy()
{
var defaultPrefetcher = prefetchingBehaviors.FirstOrDefault(x => x.IsDefault);
return defaultPrefetcher != null && defaultPrefetcher.IsEmpty() == false;
}

public void Dispose()
{
foreach (var prefetchingBehavior in prefetchingBehaviors)
Expand Down
61 changes: 45 additions & 16 deletions Raven.Database/Prefetching/PrefetchingBehavior.cs
Expand Up @@ -67,8 +67,7 @@ private class DiskFetchPerformanceStats
string prefetchingUserDescription,
bool isDefault = false,
Func<int> getPrefetchintBehavioursCount = null,
Func<PrefetchingSummary> getPrefetcherSummary = null,
Func<bool> isDefaultBusy = null)
Func<PrefetchingSummary> getPrefetcherSummary = null)
{
this.context = context;
this.autoTuner = autoTuner;
Expand All @@ -77,7 +76,6 @@ private class DiskFetchPerformanceStats
this.IsDefault = isDefault;
this.getPrefetchintBehavioursCount = getPrefetchintBehavioursCount ?? (() => 1);
this.getPrefetcherSummary = getPrefetcherSummary ?? GetSummary;
this.isDefaultBusy = isDefaultBusy ?? (() => IsDefault);
MemoryStatistics.RegisterLowMemoryHandler(this);
LastTimeUsed = DateTime.MinValue;

Expand All @@ -100,7 +98,6 @@ private class DiskFetchPerformanceStats
public bool IsDefault { get; private set; }
private readonly Func<int> getPrefetchintBehavioursCount;
private readonly Func<PrefetchingSummary> getPrefetcherSummary;
private readonly Func<bool> isDefaultBusy;
public List<IndexToWorkOn> Indexes { get; set; }
public string LastIndexedEtag { get; set; }
public DateTime LastTimeUsed { get; private set; }
Expand Down Expand Up @@ -329,6 +326,10 @@ private List<JsonDocument> GetDocsFromBatchWithPossibleDuplicates(Etag etag, int

docsLoaded = TryGetDocumentsFromQueue(nextEtagToIndex, result, take);

// we removed some documents from the queue
// we'll try to create a new future batch, if possible
MaybeAddFutureBatch();

if (docsLoaded)
{
etag = result[result.Count - 1].Etag;
Expand Down Expand Up @@ -658,6 +659,27 @@ public PrefetchingSummary GetSummary()
};
}

private void MaybeAddFutureBatch()
{
var maxFutureBatch = GetCompletedFutureBatchWithMaxStartingEtag();
if (maxFutureBatch != null)
{
// we found a future batch with the latest starting etag (which completed fetching documents)
// we'll try to create a new future batch using the latest results
MaybeAddFutureBatch(maxFutureBatch.Task.Result);
}
else if (futureIndexBatches.Count == 0 && prefetchingQueue.Count > 0)
{
// we don't have any future batches, but have some documents in the queue
// we'll try to create a new future batch using the latest document in the queue
JsonDocument lastDocument;
if (prefetchingQueue.TryPeekLastDocument(out lastDocument))
{
MaybeAddFutureBatch(new List<JsonDocument> { lastDocument });
}
}
}

private void MaybeAddFutureBatch(List<JsonDocument> past)
{
if (context.Configuration.DisableDocumentPreFetching || context.RunIndexing == false)
Expand All @@ -667,7 +689,7 @@ private void MaybeAddFutureBatch(List<JsonDocument> past)
if (past.Count == 0)
return;

var numberOfSplitTasks = Math.Max(2, Environment.ProcessorCount / 2); ;
var numberOfSplitTasks = Math.Max(2, Environment.ProcessorCount / 2);
var actualFutureIndexBatchesCount = GetActualFutureIndexBatchesCount(numberOfSplitTasks);
// no need to load more than 5 future batches
if (actualFutureIndexBatchesCount > 5)
Expand Down Expand Up @@ -719,11 +741,7 @@ private bool CanPrefetchMoreDocs(bool isFutureBatch = false)
}

var localSummary = GetSummary();
// the default prefetcher will hold the updated or added documents
// if we have more than one prefetcher, we can allow prefetching more,
// if the default prefetcher isn't 'busy'.
var numberOfPrefetchingBehaviors =
Math.Max(1, getPrefetchintBehavioursCount() - ((IsDefault || isDefaultBusy() == false) ? 1 : 0));
var numberOfPrefetchingBehaviors = getPrefetchintBehavioursCount();

loadedSizeInBytes = localSummary.PrefetchingQueueLoadedSize + localSummary.FutureIndexBatchesLoadedSize;
var maxLoadedSizeInBytesInASingleBatch = maxAllowedToLoadInBytes/numberOfPrefetchingBehaviors;
Expand Down Expand Up @@ -787,12 +805,7 @@ private void TryScheduleFutureIndexBatch(List<JsonDocument> past, int numberOfSp

if (futureIndexBatches.ContainsKey(nextEtag))
{
FutureIndexBatch maxFutureIndexBatch = null;
foreach (var futureIndexBatch in futureIndexBatches.Values)
{
if (maxFutureIndexBatch == null || futureIndexBatch.StartingEtag.CompareTo(maxFutureIndexBatch.StartingEtag) > 0)
maxFutureIndexBatch = futureIndexBatch;
}
var maxFutureIndexBatch = GetCompletedFutureBatchWithMaxStartingEtag();

if (maxFutureIndexBatch == null || nextEtag.CompareTo(maxFutureIndexBatch.StartingEtag) >= 0 ||
maxFutureIndexBatch.Task.IsCompleted == false || maxFutureIndexBatch.Task.Status != TaskStatus.RanToCompletion)
Expand Down Expand Up @@ -872,6 +885,22 @@ private void TryScheduleFutureIndexBatch(List<JsonDocument> past, int numberOfSp
});
}

private FutureIndexBatch GetCompletedFutureBatchWithMaxStartingEtag()
{
FutureIndexBatch maxFutureIndexBatch = null;

foreach (var futureIndexBatch in futureIndexBatches.Values)
{
if (futureIndexBatch.Task.IsCompleted == false || futureIndexBatch.Task.Status != TaskStatus.RanToCompletion)
continue;

if (maxFutureIndexBatch == null || futureIndexBatch.StartingEtag.CompareTo(maxFutureIndexBatch.StartingEtag) > 0)
maxFutureIndexBatch = futureIndexBatch;
}

return maxFutureIndexBatch;
}

private void CalculateAverageLoadTimes(out double loadTimePerDocMs, out long largestDocSize, out string largestDocKey)
{
var docCount = 0;
Expand Down

0 comments on commit f1def39

Please sign in to comment.