
Moving things around so map/reduce can now parallelize a lot more of its internal operations and do more parallel writes to disk
ayende committed Feb 16, 2013
1 parent 0c56df6 commit f36c5da
Showing 13 changed files with 283 additions and 210 deletions.
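
Taken together, the changes below follow one shape: fan the work out over buffered partitions, let each partition accumulate private state, merge that state once all partitions finish, and then flush the merged work to disk in parallel storage batches. A minimal, self-contained C# sketch of that shape (the class, method, and parameter names are illustrative stand-ins, not RavenDB APIs; Parallel.ForEach merely plays the role of BackgroundTaskExecuter.ExecuteAllBuffered):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

internal static class FanOutFanInSketch
{
    public static void Run(IList<string> mappedReduceKeys)
    {
        // Fan out: every partition owns its own Dictionary<string, int>, so counting
        // needs no locks. This stands in for ExecuteAllBuffered collecting
        // per-partition state into a ConcurrentQueue, as the diff below does.
        var perPartitionStats = new ConcurrentQueue<Dictionary<string, int>>();
        Parallel.ForEach(
            Partitioner.Create(mappedReduceKeys, loadBalance: true),
            () => new Dictionary<string, int>(),
            (reduceKey, loopState, local) =>
            {
                int current;
                local[reduceKey] = local.TryGetValue(reduceKey, out current) ? current + 1 : 1;
                return local;
            },
            local => perPartitionStats.Enqueue(local));

        // Fan in: merge the per-partition counters into one list, mirroring the
        // allState.SelectMany(...).GroupBy(...) step in MapReduceIndex.IndexDocuments.
        var reduceKeyStats = perPartitionStats
            .SelectMany(stats => stats)
            .GroupBy(kvp => kvp.Key)
            .Select(g => new { ReduceKey = g.Key, Count = g.Sum(kvp => kvp.Value) })
            .ToList();

        // Parallel writes: each merged counter is flushed independently, standing in
        // for the parallel IncrementReduceKeyCounter / ScheduleReductions batches
        // added by this commit.
        Parallel.ForEach(Partitioner.Create(reduceKeyStats, loadBalance: true), stat =>
            Console.WriteLine("increment {0} by {1}", stat.ReduceKey, stat.Count));
    }
}
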
67 changes: 47 additions & 20 deletions Raven.Database/Indexing/MapReduceIndex.cs
@@ -21,13 +21,15 @@
using Raven.Database.Plugins;
using Raven.Imports.Newtonsoft.Json;
using Raven.Abstractions;
using Raven.Abstractions.Extensions;
using Raven.Abstractions.Data;
using Raven.Abstractions.Indexing;
using Raven.Abstractions.Linq;
using Raven.Database.Data;
using Raven.Database.Linq;
using Raven.Database.Storage;
using Raven.Json.Linq;
using Raven.Abstractions.Extensions;

namespace Raven.Database.Indexing
{
@@ -70,27 +72,28 @@ private class MapResultItem
var sourceCount = 0;
var sw = Stopwatch.StartNew();
var start = SystemTime.UtcNow;
var changed = new HashSet<ReduceKeyAndBucket>();
var deleted = new HashSet<ReduceKeyAndBucket>();
var documentsWrapped = batch.Docs.Select(doc =>
{
sourceCount++;
var documentId = doc.__document_id;
actions.MapReduce.DeleteMappedResultsForDocumentId((string)documentId, name, changed);
actions.MapReduce.DeleteMappedResultsForDocumentId((string)documentId, name, deleted);
return doc;
})
.Where(x => x is FilteredDocument == false)
.ToList();
var allReferencedDocs = new ConcurrentQueue<IDictionary<string, HashSet<string>>>();

if (documentsWrapped.Count > 0)
actions.MapReduce.UpdateRemovedMapReduceStats(name, changed);
actions.MapReduce.UpdateRemovedMapReduceStats(name, deleted);

var allState = new ConcurrentQueue<Tuple<HashSet<ReduceKeyAndBucket>, IndexingWorkStats>>();
var allState = new ConcurrentQueue<Tuple<HashSet<ReduceKeyAndBucket>, IndexingWorkStats, Dictionary<string, int>>>();
BackgroundTaskExecuter.Instance.ExecuteAllBuffered(context, documentsWrapped, partition =>
{
var localStats = new IndexingWorkStats();
var localChanges = new HashSet<ReduceKeyAndBucket>();
allState.Enqueue(Tuple.Create(localChanges, localStats));
var statsPerKey = new Dictionary<string, int>();
allState.Enqueue(Tuple.Create(localChanges, localStats, statsPerKey));
using (CurrentIndexingScope.Current = new CurrentIndexingScope(LoadDocument, allReferencedDocs.Enqueue))
{
@@ -106,18 +109,18 @@ private class MapResultItem
var documentId = GetDocumentId(currentDoc);
if (documentId != currentKey)
{
count += ProcessBatch(viewGenerator, currentDocumentResults, currentKey, localChanges, accessor);
count += ProcessBatch(viewGenerator, currentDocumentResults, currentKey, localChanges, accessor, statsPerKey);
currentDocumentResults.Clear();
currentKey = documentId;
}
currentDocumentResults.Add(new DynamicJsonObject(RavenJObject.FromObject(currentDoc, jsonSerializer)));
Interlocked.Increment(ref localStats.IndexingSuccesses);
}
count += ProcessBatch(viewGenerator, currentDocumentResults, currentKey, localChanges, accessor);
count += ProcessBatch(viewGenerator, currentDocumentResults, currentKey, localChanges, accessor, statsPerKey);
});
}
});


IDictionary<string, HashSet<string>> result;
while (allReferencedDocs.TryDequeue(out result))
@@ -129,12 +132,35 @@ private class MapResultItem
}
}

changed.UnionWith(allState.SelectMany(x => x.Item1));
var changed = allState.SelectMany(x => x.Item1).Concat(deleted)
.Distinct()
.ToList();

var stats = new IndexingWorkStats(allState.Select(x=>x.Item2));
var stats = new IndexingWorkStats(allState.Select(x => x.Item2));
var reduceKeyStats = allState.SelectMany(x => x.Item3)
.GroupBy(x => x.Key)
.Select(g => new {g.Key, Count = g.Sum(x => x.Value)})
.ToList();

BackgroundTaskExecuter.Instance.ExecuteAllBuffered(context, reduceKeyStats, enumerator => context.TransactionalStorage.Batch(accessor =>
{
while (enumerator.MoveNext())
{
var reduceKeyStat = enumerator.Current;
accessor.MapReduce.IncrementReduceKeyCounter(name, reduceKeyStat.Key, reduceKeyStat.Count);
}
}));

BackgroundTaskExecuter.Instance.ExecuteAllBuffered(context, changed, enumerator => context.TransactionalStorage.Batch(accessor =>
{
while (enumerator.MoveNext())
{
accessor.MapReduce.ScheduleReductions(name, 0, enumerator.Current);
}
}));


UpdateIndexingStats(context, stats);
actions.MapReduce.ScheduleReductions(name, 0, changed);
AddindexingPerformanceStat(new IndexingPerformanceStats
{
OutputCount = count,
@@ -146,12 +172,9 @@ private class MapResultItem
logIndexing.Debug("Mapped {0} documents for {1}", count, name);
}

private int ProcessBatch(
AbstractViewGenerator viewGenerator,
List<object> currentDocumentResults,
string currentKey,
HashSet<ReduceKeyAndBucket> changes,
IStorageActionsAccessor actions)
private int ProcessBatch(AbstractViewGenerator viewGenerator, List<object> currentDocumentResults, string currentKey, HashSet<ReduceKeyAndBucket> changes,
IStorageActionsAccessor actions,
IDictionary<string, int> statsPerKey)
{
if (currentKey == null || currentDocumentResults.Count == 0)
return 0;
@@ -166,14 +189,15 @@ private class MapResultItem
if (reduceValue == null)
{
logIndexing.Debug("Field {0} is used as the reduce key and cannot be null, skipping document {1}",
viewGenerator.GroupByExtraction, currentKey);
viewGenerator.GroupByExtraction, currentKey);
continue;
}
var reduceKey = ReduceKeyToString(reduceValue);
string reduceKey = ReduceKeyToString(reduceValue);

var data = GetMappedData(doc);

actions.MapReduce.PutMappedResult(name, currentKey, reduceKey, data);
statsPerKey[reduceKey] = statsPerKey.GetOrDefault(reduceKey) + 1;
actions.General.MaybePulseTransaction();
changes.Add(new ReduceKeyAndBucket(IndexingUtil.MapBucket(currentKey), reduceKey));
}
@@ -243,7 +267,10 @@ public override void Remove(string[] keys, WorkContext context)
}
actions.MapReduce.UpdateRemovedMapReduceStats(name, reduceKeyAndBuckets);
actions.MapReduce.ScheduleReductions(name, 0, reduceKeyAndBuckets);
foreach (var reduceKeyAndBucket in reduceKeyAndBuckets)
{
actions.MapReduce.ScheduleReductions(name, 0, reduceKeyAndBucket);
}
});
Write((writer, analyzer, stats) =>
{
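For context on the statsPerKey counting added to ProcessBatch above: it relies on a GetOrDefault dictionary extension, which is why the diff pulls in Raven.Abstractions.Extensions. A minimal sketch of such an extension, assuming the conventional semantics (return the stored value when the key exists, otherwise the type's default), would be:

using System.Collections.Generic;

public static class DictionaryExtensionsSketch
{
    // Hypothetical stand-in for the GetOrDefault extension referenced above: with
    // default(int) == 0, statsPerKey[reduceKey] = statsPerKey.GetOrDefault(reduceKey) + 1
    // counts mapped results per reduce key without a prior ContainsKey check.
    public static TValue GetOrDefault<TKey, TValue>(this IDictionary<TKey, TValue> self, TKey key)
    {
        TValue value;
        return self.TryGetValue(key, out value) ? value : default(TValue);
    }
}
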
138 changes: 77 additions & 61 deletions Raven.Database/Indexing/ReducingExecuter.cs
@@ -1,7 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Raven.Abstractions.Data;
using Raven.Abstractions.Logging;
using Raven.Database.Json;
@@ -32,7 +34,7 @@ protected void HandleReduceForIndex(IndexToWorkOn indexToWorkOn)
IList<ReduceTypePerKey> mappedResultsInfo = null;
transactionalStorage.Batch(actions =>
{
mappedResultsInfo = actions.MapReduce.GetReduceTypesPerKeys(indexToWorkOn.IndexName,
mappedResultsInfo = actions.MapReduce.GetReduceTypesPerKeys(indexToWorkOn.IndexName,
context.CurrentNumberOfItemsToReduceInSingleBatch,
context.NumberOfItemsToExecuteReduceInSingleStep).ToList();
});
@@ -66,7 +68,7 @@ protected void HandleReduceForIndex(IndexToWorkOn indexToWorkOn)
{
var latest = actions.MapReduce.DeleteScheduledReduction(itemsToDelete);
if(latest == null)
if (latest == null)
return;
actions.Indexing.UpdateLastReduced(indexToWorkOn.IndexName, latest.Etag, latest.Timestamp);
});
@@ -91,8 +93,10 @@ private void MultiStepReduce(IndexToWorkOn index, string[] keysToReduce, Abstrac
// we exceeded the limit of items to reduce in single step
// now we need to schedule reductions at level 0 for all map results with given reduce key
var mappedItems = actions.MapReduce.GetMappedBuckets(index.IndexName, localReduceKey).ToList();
actions.MapReduce.ScheduleReductions(index.IndexName, 0,
mappedItems.Select(x => new ReduceKeyAndBucket(x, localReduceKey)));
foreach (var result in mappedItems.Select(x => new ReduceKeyAndBucket(x, localReduceKey)))
{
actions.MapReduce.ScheduleReductions(index.IndexName, 0, result);
}
}
});

@@ -101,8 +105,8 @@ private void MultiStepReduce(IndexToWorkOn index, string[] keysToReduce, Abstrac
var level = i;

var reduceParams = new GetItemsToReduceParams(
index.IndexName,
keysToReduce,
index.IndexName,
keysToReduce,
level,
true,
itemsToDelete);
@@ -114,7 +118,7 @@ private void MultiStepReduce(IndexToWorkOn index, string[] keysToReduce, Abstrac
{
context.CancellationToken.ThrowIfCancellationRequested();
var batchTimeWatcher = Stopwatch.StartNew();
var batchTimeWatcher = Stopwatch.StartNew();
reduceParams.Take = context.CurrentNumberOfItemsToReduceInSingleBatch;
var persistedResults = actions.MapReduce.GetItemsToReduce(reduceParams).ToList();
@@ -156,7 +160,10 @@ private void MultiStepReduce(IndexToWorkOn index, string[] keysToReduce, Abstrac
.Select(x => new ReduceKeyAndBucket(x.Bucket / 1024, x.ReduceKey))
.Distinct()
.ToArray();
actions.MapReduce.ScheduleReductions(index.IndexName, level + 1, reduceKeysAndBuckets);
foreach (var reduceKeysAndBucket in reduceKeysAndBuckets)
{
actions.MapReduce.ScheduleReductions(index.IndexName, level + 1, reduceKeysAndBucket);
}
}
var results = persistedResults
@@ -184,83 +191,92 @@ private void MultiStepReduce(IndexToWorkOn index, string[] keysToReduce, Abstrac
{
string localReduceKey = reduceKey;
transactionalStorage.Batch(actions =>
actions.MapReduce.UpdatePerformedReduceType(index.IndexName, localReduceKey,
ReduceType.MultiStep));
actions.MapReduce.UpdatePerformedReduceType(index.IndexName, localReduceKey,
ReduceType.MultiStep));
}
}

private void SingleStepReduce(IndexToWorkOn index, string[] keysToReduce, AbstractViewGenerator viewGenerator,
List<object> itemsToDelete)
List<object> itemsToDelete)
{
var needToMoveToSingleStep = new HashSet<string>();

Log.Debug(() => string.Format("Executing single step reducing for {0} keys [{1}]", keysToReduce.Length, string.Join(", ", keysToReduce)));
transactionalStorage.Batch(actions =>
var batchTimeWatcher = Stopwatch.StartNew();
var count = 0;
var size = 0;
var state = new ConcurrentQueue <Tuple<HashSet<string>, List<MappedResultInfo>>>();
BackgroundTaskExecuter.Instance.ExecuteAllBuffered(context, keysToReduce, enumerator =>
{
var batchTimeWatcher = Stopwatch.StartNew();
var getItemsToReduceParams = new GetItemsToReduceParams(index: index.IndexName, reduceKeys: keysToReduce, level: 0,
loadData: false,
itemsToDelete: itemsToDelete)
var localKeys = new HashSet<string>();
while (enumerator.MoveNext())
{
Take = int.MaxValue// just get all, we do the rate limit when we load the number of keys to reduce, anyway
};
var scheduledItems = actions.MapReduce.GetItemsToReduce(getItemsToReduceParams).ToList();
// Only look at the scheduled batch for this run, not the entire set of pending reductions.
//var batchKeys = scheduledItems.Select(x => x.ReduceKey).ToArray();
foreach (var reduceKey in keysToReduce)
localKeys.Add(enumerator.Current);
}
transactionalStorage.Batch(actions =>
{
var lastPerformedReduceType = actions.MapReduce.GetLastPerformedReduceType(index.IndexName, reduceKey);
var getItemsToReduceParams = new GetItemsToReduceParams(index: index.IndexName, reduceKeys: localKeys, level: 0,
loadData: false,
itemsToDelete: itemsToDelete)
{
Take = int.MaxValue// just get all, we do the rate limit when we load the number of keys to reduce, anyway
};
var scheduledItems = actions.MapReduce.GetItemsToReduce(getItemsToReduceParams).ToList();
if (lastPerformedReduceType != ReduceType.SingleStep)
needToMoveToSingleStep.Add(reduceKey);
foreach (var reduceKey in localKeys)
{
var lastPerformedReduceType = actions.MapReduce.GetLastPerformedReduceType(index.IndexName, reduceKey);
if (lastPerformedReduceType != ReduceType.MultiStep)
continue;
if (lastPerformedReduceType != ReduceType.SingleStep)
needToMoveToSingleStep.Add(reduceKey);
Log.Debug("Key {0} was moved from multi step to single step reduce, removing existing reduce results records",
reduceKey);
if (lastPerformedReduceType != ReduceType.MultiStep)
continue;
// now we are in single step but previously multi step reduce was performed for the given key
var mappedBuckets = actions.MapReduce.GetMappedBuckets(index.IndexName, reduceKey).ToList();
Log.Debug("Key {0} was moved from multi step to single step reduce, removing existing reduce results records",
reduceKey);
// add scheduled items too to be sure we will delete reduce results of already deleted documents
mappedBuckets.AddRange(scheduledItems.Select(x => x.Bucket));
// now we are in single step but previously multi step reduce was performed for the given key
var mappedBuckets = actions.MapReduce.GetMappedBuckets(index.IndexName, reduceKey).ToList();
foreach (var mappedBucket in mappedBuckets.Distinct())
{
actions.MapReduce.RemoveReduceResults(index.IndexName, 1, reduceKey, mappedBucket);
actions.MapReduce.RemoveReduceResults(index.IndexName, 2, reduceKey, mappedBucket / 1024);
}
}
// add scheduled items too to be sure we will delete reduce results of already deleted documents
mappedBuckets.AddRange(scheduledItems.Select(x => x.Bucket));
var mappedResults = actions.MapReduce.GetMappedResults(
index.IndexName,
keysToReduce,
loadData: true
).ToList();
foreach (var mappedBucket in mappedBuckets.Distinct())
{
actions.MapReduce.RemoveReduceResults(index.IndexName, 1, reduceKey, mappedBucket);
actions.MapReduce.RemoveReduceResults(index.IndexName, 2, reduceKey, mappedBucket / 1024);
}
}
var count = mappedResults.Count;
var size = mappedResults.Sum(x => x.Size);
var mappedResults = actions.MapReduce.GetMappedResults(
index.IndexName,
localKeys,
loadData: true
).ToList();
var reduceKeys = new HashSet<string>(keysToReduce);
Interlocked.Add(ref count, mappedResults.Count);
Interlocked.Add(ref size, mappedResults.Sum(x => x.Size));
mappedResults.ApplyIfNotNull(x => x.Bucket = 0);
mappedResults.ApplyIfNotNull(x => x.Bucket = 0);
var results = mappedResults
.Where(x => x.Data != null)
.GroupBy(x => x.Bucket, x => JsonToExpando.Convert(x.Data))
.ToArray();
state.Enqueue(Tuple.Create(localKeys, mappedResults));
});
});

context.ReducedPerSecIncreaseBy(results.Length);
var reduceKeys = new HashSet<string>(state.SelectMany(x=>x.Item1));

context.IndexStorage.Reduce(index.IndexName, viewGenerator, results, 2, context, actions, reduceKeys);
var results = state.SelectMany(x=>x.Item2)
.Where(x => x.Data != null)
.GroupBy(x => x.Bucket, x => JsonToExpando.Convert(x.Data))
.ToArray();
context.ReducedPerSecIncreaseBy(results.Length);

autoTuner.AutoThrottleBatchSize(count, size, batchTimeWatcher.Elapsed);
});
context.TransactionalStorage.Batch(actions =>
context.IndexStorage.Reduce(index.IndexName, viewGenerator, results, 2, context, actions, reduceKeys)
);

autoTuner.AutoThrottleBatchSize(count, size, batchTimeWatcher.Elapsed);

foreach (var reduceKey in needToMoveToSingleStep)
{
@@ -296,7 +312,7 @@ protected override IndexToWorkOn GetIndexToWorkOn(IndexStats indexesStat)

protected override void ExecuteIndexingWork(IList<IndexToWorkOn> indexesToWorkOn)
{
BackgroundTaskExecuter.Instance.ExecuteAllInterleaved(context, indexesToWorkOn,
BackgroundTaskExecuter.Instance.ExecuteAllInterleaved(context, indexesToWorkOn,
HandleReduceForIndex);
}

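The SingleStepReduce rewrite above follows the same gather-then-merge shape: each partition of reduce keys loads its mapped results inside its own storage batch and enqueues a (keys, results) tuple onto a ConcurrentQueue, while count and size are aggregated with Interlocked.Add for the auto-tuner; once every partition has finished, the tuples are flattened and handed to a single IndexStorage.Reduce call. A simplified, self-contained sketch of that merge step (types and names here are illustrative, not the real MappedResultInfo or storage accessors):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

internal static class SingleStepMergeSketch
{
    // Simplified stand-in for a mapped-result row; the real MappedResultInfo carries more.
    private class MappedRow
    {
        public int Bucket { get; set; }
        public int Size { get; set; }
        public object Data { get; set; }
    }

    public static void Run(string[] keysToReduce, Func<IEnumerable<string>, List<MappedRow>> loadMappedResults)
    {
        if (keysToReduce.Length == 0)
            return;

        var state = new ConcurrentQueue<Tuple<HashSet<string>, List<MappedRow>>>();
        int count = 0, size = 0;

        // Fan out: each range of keys loads its own slice of mapped results
        // (in the real code this happens inside a transactionalStorage.Batch).
        Parallel.ForEach(Partitioner.Create(0, keysToReduce.Length), range =>
        {
            var localKeys = new HashSet<string>(
                keysToReduce.Skip(range.Item1).Take(range.Item2 - range.Item1));
            var mapped = loadMappedResults(localKeys);
            Interlocked.Add(ref count, mapped.Count);
            Interlocked.Add(ref size, mapped.Sum(x => x.Size));
            state.Enqueue(Tuple.Create(localKeys, mapped));
        });

        // Fan in: one set of reduce keys and one grouped result set for a single
        // reduce call. The diff also forces every Bucket to 0 before grouping,
        // since single-step reduce collapses everything into one bucket.
        var reduceKeys = new HashSet<string>(state.SelectMany(x => x.Item1));
        var results = state.SelectMany(x => x.Item2)
            .Where(x => x.Data != null)
            .GroupBy(x => x.Bucket, x => x.Data)
            .ToArray();

        Console.WriteLine("reducing {0} keys, {1} items, {2} bytes", reduceKeys.Count, count, size);
        // Here the real code runs context.IndexStorage.Reduce(...) once and then
        // autoTuner.AutoThrottleBatchSize(count, size, elapsed).
    }
}
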
1 change: 1 addition & 0 deletions Raven.Database/Raven.Database.csproj
@@ -688,6 +688,7 @@
<Compile Include="Storage\Esent\StorageActions\Indexing.cs" />
<Compile Include="Storage\Esent\StorageActions\Lists.cs" />
<Compile Include="Storage\Esent\StorageActions\MappedResults.cs" />
<Compile Include="Storage\Esent\StorageActions\OptimizedDeleter.cs" />
<Compile Include="Storage\Esent\StorageActions\OptimizedIndexReader.cs" />
<Compile Include="Storage\Esent\StorageActions\Queue.cs" />
<Compile Include="Storage\Esent\StorageActions\Staleness.cs" />
