Permalink
Browse files

Moving a lot of the background calculation support to use the new Bac…

…kgroundTaskExecuter abstraction

Properly handling unloading of tenant databases
  • Loading branch information...
1 parent 41c2a70 commit f378557b71b5b358dab0c58d7da04e036f2eec77 @ayende ayende committed Feb 19, 2012
@@ -298,13 +298,19 @@ public InMemoryRavenConfiguration Configuration
public IndexStorage IndexStorage { get; private set; }
+ public event EventHandler Disposing;
+
#region IDisposable Members
public void Dispose()
{
if (disposed)
return;
+ var onDisposing = Disposing;
+ if(onDisposing!=null)
+ onDisposing(this, EventArgs.Empty);
+
var exceptionAggregator = new ExceptionAggregator(log, "Could not properly dispose of DatabaseDocument");
exceptionAggregator.Execute(() =>
@@ -0,0 +1,9 @@
+using Raven.Abstractions.Extensions;
+
+namespace Raven.Database.Indexing
+{
+ public class BackgroundTaskExecuter
+ {
+ public static IBackgroundTaskExecuter Instance = new DefaultBackgroundTaskExecuter();
+ }
+}
@@ -0,0 +1,116 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using NLog;
+using Raven.Database.Config;
+using Raven.Abstractions.Extensions;
+using Raven.Database.Util;
+
+namespace Raven.Database.Indexing
+{
+ public class DefaultBackgroundTaskExecuter : IBackgroundTaskExecuter
+ {
+ private static Logger logger = LogManager.GetCurrentClassLogger();
+
+ public IList<TResult> Apply<T, TResult>(IEnumerable<T> source, Func<T, TResult> func)
+ where TResult : class
+ {
+ return source.AsParallel()
+ .Select(func)
+ .Where(x => x != null)
+ .ToList();
+ }
+
+ private readonly ConcurrentDictionary<TimeSpan, Tuple<Timer, ConcurrentSet<IRepeatedAction>>> timers =
+ new ConcurrentDictionary<TimeSpan, Tuple<Timer, ConcurrentSet<IRepeatedAction>>>();
+
+ public void Repeat(IRepeatedAction action)
+ {
+ var tuple = timers.GetOrAddAtomically(action.RepeatDuration,
+ span =>
+ {
+ var repeatedActions = new ConcurrentSet<IRepeatedAction>
+ {
+ action
+ };
+ var timer = new Timer(ExecuteTimer, action.RepeatDuration,
+ action.RepeatDuration,
+ action.RepeatDuration);
+ return Tuple.Create(timer, repeatedActions);
+ });
+ tuple.Item2.TryAdd(action);
+ }
+
+ private void ExecuteTimer(object state)
+ {
+ var span = (TimeSpan) state;
+ Tuple<Timer, ConcurrentSet<IRepeatedAction>> tuple;
+ if (timers.TryGetValue(span, out tuple) == false)
+ return;
+
+ foreach (var repeatedAction in tuple.Item2)
+ {
+ if (repeatedAction.IsValid == false)
+ tuple.Item2.TryRemove(repeatedAction);
+
+ try
+ {
+ repeatedAction.Execute();
+ }
+ catch (Exception e)
+ {
+ logger.ErrorException("Could not execute repeated task", e);
+ }
+ }
+
+ if (tuple.Item2.Count != 0)
+ return;
+
+ if (timers.TryRemove(span, out tuple) == false)
+ return;
+
+ tuple.Item1.Dispose();
+ }
+
+ /// <summary>
+ /// Note that we assume that source is a relatively small number, expected to be
+ /// the number of indexes, not the number of documents.
+ /// </summary>
+ public void ExecuteAll<T>(InMemoryRavenConfiguration configuration, TaskScheduler scheduler, IList<T> source, Action<T, long> action)
+ {
+ if(configuration.MaxNumberOfParallelIndexTasks == 1)
+ {
+ long i = 0;
+ foreach (var item in source)
+ {
+ action(item, i++);
+ }
+ return;
+ }
+
+ var partitioneds = Partition(source, configuration.MaxNumberOfParallelIndexTasks).ToList();
+ int start = 0;
+ foreach (var partitioned in partitioneds)
+ {
+ var currentStart = start;
+ Parallel.ForEach(partitioned, new ParallelOptions
+ {
+ TaskScheduler = scheduler,
+ MaxDegreeOfParallelism = configuration.MaxNumberOfParallelIndexTasks
+ },(item,_,index)=>action(item, currentStart + index));
+ start += partitioned.Count;
+ }
+ }
+
+ static IEnumerable<IList<T>> Partition<T>(IList<T> source, int size)
+ {
+ for (int i = 0; i < source.Count; i+=size)
+ {
+ yield return source.Skip(i).Take(size).ToList();
+ }
+ }
+ }
+}
@@ -1,58 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Raven.Database.Config;
-
-namespace Raven.Database.Indexing
-{
- public class DefaultIndexingTaskExecuter : IIndexingTaskExecuter
- {
- public IList<TResult> Apply<T, TResult>(IEnumerable<T> source, Func<T, TResult> func)
- where TResult : class
- {
- return source.AsParallel()
- .Select(func)
- .Where(x => x != null)
- .ToList();
- }
-
- /// <summary>
- /// Note that we assume that source is a relatively small number, expected to be
- /// the number of indexes, not the number of documents.
- /// </summary>
- public void ExecuteAll<T>(InMemoryRavenConfiguration configuration, TaskScheduler scheduler, IList<T> source, Action<T, long> action)
- {
- if(configuration.MaxNumberOfParallelIndexTasks == 1)
- {
- long i = 0;
- foreach (var item in source)
- {
- action(item, i++);
- }
- return;
- }
-
- var partitioneds = Partition(source, configuration.MaxNumberOfParallelIndexTasks).ToList();
- int start = 0;
- foreach (var partitioned in partitioneds)
- {
- var currentStart = start;
- Parallel.ForEach(partitioned, new ParallelOptions
- {
- TaskScheduler = scheduler,
- MaxDegreeOfParallelism = configuration.MaxNumberOfParallelIndexTasks
- },(item,_,index)=>action(item, currentStart + index));
- start += partitioned.Count;
- }
- }
-
- static IEnumerable<IList<T>> Partition<T>(IList<T> source, int size)
- {
- for (int i = 0; i < source.Count; i+=size)
- {
- yield return source.Skip(i).Take(size).ToList();
- }
- }
- }
-}
@@ -5,10 +5,20 @@
namespace Raven.Database.Indexing
{
- public interface IIndexingTaskExecuter
+ public interface IBackgroundTaskExecuter
{
IList<TResult> Apply<T, TResult>(IEnumerable<T> source, Func<T, TResult> func)
where TResult : class;
+
+ void Repeat(IRepeatedAction action);
+
void ExecuteAll<T>(InMemoryRavenConfiguration configuration, TaskScheduler scheduler, IList<T> source, Action<T, long> action);
}
+
+ public interface IRepeatedAction
+ {
+ TimeSpan RepeatDuration { get; }
+ bool IsValid { get; }
+ void Execute();
+ }
}
@@ -73,7 +73,7 @@ protected override void ExecuteIndxingWork(IList<IndexToWorkOn> indexesToWorkOn)
{
var result = FilterIndexes(indexesToWorkOn, jsonDocs).ToList();
indexesToWorkOn = result.Select(x => x.Item1).ToList();
- IndexingTaskExecuter.Instance.ExecuteAll(context.Configuration, scheduler, result, (indexToWorkOn,_) =>
+ BackgroundTaskExecuter.Instance.ExecuteAll(context.Configuration, scheduler, result, (indexToWorkOn,_) =>
{
var index = indexToWorkOn.Item1;
var docs = indexToWorkOn.Item2;
@@ -133,7 +133,7 @@ private void MarkIndexes(IndexToWorkOn indexToWorkOn, ComparableByteArray lastIn
var documentRetriever = new DocumentRetriever(null, context.ReadTriggers);
var filteredDocs =
- IndexingTaskExecuter.Instance.Apply(jsonDocs, doc =>
+ BackgroundTaskExecuter.Instance.Apply(jsonDocs, doc =>
{
doc = documentRetriever.ExecuteReadTriggers(doc, null, ReadOperation.Index);
return doc == null ? null : new {Doc = doc, Json = JsonToExpando.Convert(doc.ToJson())};
@@ -144,7 +144,7 @@ private void MarkIndexes(IndexToWorkOn indexToWorkOn, ComparableByteArray lastIn
var results = new Tuple<IndexToWorkOn, IndexingBatch>[indexesToWorkOn.Count];
var actions = new Action<IStorageActionsAccessor>[indexesToWorkOn.Count];
- IndexingTaskExecuter.Instance.ExecuteAll(context.Configuration, scheduler, indexesToWorkOn, (indexToWorkOn, i) =>
+ BackgroundTaskExecuter.Instance.ExecuteAll(context.Configuration, scheduler, indexesToWorkOn, (indexToWorkOn, i) =>
{
var indexLastInedexEtag = new ComparableByteArray(indexToWorkOn.LastIndexedEtag.ToByteArray());
if (indexLastInedexEtag.CompareTo(lastIndexedEtag) >= 0)
@@ -1,9 +0,0 @@
-using Raven.Abstractions.Extensions;
-
-namespace Raven.Database.Indexing
-{
- public class IndexingTaskExecuter
- {
- public static IIndexingTaskExecuter Instance = new DefaultIndexingTaskExecuter();
- }
-}
@@ -106,7 +106,7 @@ protected override IndexToWorkOn GetIndexToWorkOn(IndexStats indexesStat)
protected override void ExecuteIndxingWork(IList<IndexToWorkOn> indexesToWorkOn)
{
- IndexingTaskExecuter.Instance.ExecuteAll(context.Configuration, scheduler, indexesToWorkOn, (indexToWorkOn, l) => HandleReduceForIndex(indexToWorkOn));
+ BackgroundTaskExecuter.Instance.ExecuteAll(context.Configuration, scheduler, indexesToWorkOn, (indexToWorkOn, l) => HandleReduceForIndex(indexToWorkOn));
}
protected override bool IsValidIndex(IndexStats indexesStat)
@@ -4,27 +4,43 @@
// </copyright>
//-----------------------------------------------------------------------
using System;
-using System.Threading;
using System.Linq;
+using Raven.Database.Indexing;
using Raven.Database.Queries;
namespace Raven.Database.Plugins.Builtins
{
- public class CleanupOldDynamicIndexes : AbstractBackgroundTask
+ public class CleanupOldDynamicIndexes : IStartupTask, IRepeatedAction
{
- protected override bool HandleWork()
+ private DocumentDatabase database;
+
+ public void Execute(DocumentDatabase theDatabase)
{
- var dynamicQueryRunner = Database.ExtensionsState.Values.OfType<DynamicQueryRunner>().FirstOrDefault();
- if (dynamicQueryRunner == null)
- return false;
+ if (theDatabase == null)
+ throw new ArgumentNullException("theDatabase");
+
+ database = theDatabase;
- dynamicQueryRunner.CleanupCache();
- return false;
+ BackgroundTaskExecuter.Instance.Repeat(this);
+ }
+
+ public TimeSpan RepeatDuration
+ {
+ get { return database.Configuration.TempIndexCleanupPeriod; }
}
- protected override TimeSpan TimeoutForNextWork()
+ public bool IsValid
{
- return Database.Configuration.TempIndexCleanupPeriod;
+ get { return database.Disposed == false; }
+ }
+
+ public void Execute()
+ {
+ var dynamicQueryRunner = database.ExtensionsState.Values.OfType<DynamicQueryRunner>().FirstOrDefault();
+ if (dynamicQueryRunner == null)
+ return;
+
+ dynamicQueryRunner.CleanupCache();
}
}
}
@@ -0,0 +1,21 @@
+using System;
+
+namespace Raven.Database.Plugins.Builtins.Tenants
+{
+ public class ModifiedTenantDatabase : AbstractPutTrigger
+ {
+ private const string RavenDatabasesPrefix = "Raven/Databases/";
+
+ public override void AfterCommit(string key, Raven.Json.Linq.RavenJObject document, Raven.Json.Linq.RavenJObject metadata, Guid etag)
+ {
+ if (key.StartsWith(RavenDatabasesPrefix, StringComparison.InvariantCultureIgnoreCase) == false)
+ return;
+
+ TenantDatabaseModified.Invoke(this, new TenantDatabaseModified.Event
+ {
+ Database = Database,
+ Name = key.Substring(RavenDatabasesPrefix.Length)
+ });
+ }
+ }
+}
@@ -1,29 +1,21 @@
using System;
-using Raven.Database.Util;
-namespace Raven.Database.Plugins.Builtins
+namespace Raven.Database.Plugins.Builtins.Tenants
{
public class RemoveTenantDatabase : AbstractDeleteTrigger
{
private const string RavenDatabasesPrefix = "Raven/Databases/";
- public static WeakEvent<Event> Occured = new WeakEvent<Event>();
public override void AfterCommit(string key)
{
if (key.StartsWith(RavenDatabasesPrefix, StringComparison.InvariantCultureIgnoreCase) == false)
return;
- Occured.Invoke(this, new Event
+ TenantDatabaseModified.Invoke(this, new TenantDatabaseModified.Event
{
Database = Database,
Name = key.Substring(RavenDatabasesPrefix.Length)
});
}
-
- public class Event : EventArgs
- {
- public DocumentDatabase Database { get; set; }
- public string Name { get; set; }
- }
}
}
Oops, something went wrong.

0 comments on commit f378557

Please sign in to comment.