Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Updating things so we can have enough contextual information for the …

…error handler
  • Loading branch information...
commit 5026d4ebe845f135dea28568522d4cd1ef4daa04 1 parent 82ec0c4
@ayende ayende authored
View
9 Raven.Client.Lightweight/Document/ShardedDocumentQuery.cs
@@ -122,11 +122,16 @@ protected override void ExecuteActualQuery()
while (true)
{
var currentCopy = results;
- results = shardStrategy.ShardAccessStrategy.Apply(ShardDatabaseCommands, (dbCmd, i) =>
+ results = shardStrategy.ShardAccessStrategy.Apply(ShardDatabaseCommands,
+ new ShardRequestData
+ {
+ EntityType = typeof(T),
+ Query = IndexQuery
+ }, (dbCmd, i) =>
{
if (currentCopy[i]) // if we already got a good result here, do nothing
return true;
-
+
var queryOp = shardQueryOperations[i];
using (queryOp.EnterQueryContext())
View
27 Raven.Client.Lightweight/Shard/DefaultShardResolutionStrategy.cs
@@ -139,21 +139,28 @@ public virtual IList<string> PotentialShardsFor(ShardRequestData requestData)
return potentialShardsFor;
}
- if (requestData.Key == null)
- return null; // we are only optimized for keys
+ if(requestData.Keys.Count == 0) // we are only optimized for keys
+ return null;
+
// we are looking for search by key, let us see if we can narrow it down by using the
// embedded shard id.
+ var list = new List<string>();
+ foreach (var key in requestData.Keys)
+ {
+ var start = key.IndexOf(shardStrategy.Conventions.IdentityPartsSeparator, StringComparison.InvariantCultureIgnoreCase);
+ if (start == -1)
+ return null; // if we couldn't figure it out, select from all
- var start = requestData.Key.IndexOf(shardStrategy.Conventions.IdentityPartsSeparator, StringComparison.InvariantCultureIgnoreCase);
- if (start == -1)
- return null;
-
- var maybeShardId = requestData.Key.Substring(0, start);
+ var maybeShardId = key.Substring(0, start);
- return ShardIds.Any(x => string.Equals(maybeShardId, x, StringComparison.InvariantCultureIgnoreCase)) ?
- new[] {maybeShardId} : // we found a matching shard
- null; // couldn't find a matching shard, let us try all of them
+ if (ShardIds.Any(x => string.Equals(maybeShardId, x, StringComparison.InvariantCultureIgnoreCase)))
+ list.Add(maybeShardId);
+ else
+ return null; // we couldn't find it there, select from all
+
+ }
+ return list.ToArray();
}
}
}
View
10 Raven.Client.Lightweight/Shard/IShardAccessStrategy.cs
@@ -16,9 +16,17 @@ namespace Raven.Client.Shard
public interface IShardAccessStrategy
{
/// <summary>
+ /// Occurs on error, allows to handle an error one (or more) of the nodes
+ /// is failing
+ /// </summary>
+ event ShardingErrorHandle OnError;
+
+ /// <summary>
/// Applies the specified action to all shard sessions.
/// </summary>
- T[] Apply<T>(ICollection<IDatabaseCommands> commands, Func<IDatabaseCommands, int, T> operation);
+ T[] Apply<T>(IList<IDatabaseCommands> commands, ShardRequestData request, Func<IDatabaseCommands, int, T> operation);
}
+
+ public delegate bool ShardingErrorHandle(IDatabaseCommands failingCommands, ShardRequestData request, Exception exception);
}
#endif
View
30 Raven.Client.Lightweight/Shard/ParallelShardAccessStrategy.cs
@@ -16,27 +16,43 @@ namespace Raven.Client.Shard
/// <summary>
/// Apply an operation to all the shard session in parallel
/// </summary>
- public class ParallelShardAccessStrategy: IShardAccessStrategy
+ public class ParallelShardAccessStrategy : IShardAccessStrategy
{
/// <summary>
/// Applies the specified action to all shard sessions in parallel
/// </summary>
- public T[] Apply<T>(ICollection<IDatabaseCommands> commands, Func<IDatabaseCommands,int, T> operation)
+ public T[] Apply<T>(IList<IDatabaseCommands> commands, ShardRequestData request, Func<IDatabaseCommands, int, T> operation)
{
var returnedLists = new T[commands.Count];
-
+ var valueSet = new bool[commands.Count];
commands
.Select((cmd, i) =>
Task.Factory.StartNew(() => operation(cmd, i))
.ContinueWith(task =>
- {
- returnedLists[i] = task.Result;
- })
+ {
+ try
+ {
+ returnedLists[i] = task.Result;
+ valueSet[i] = true;
+ }
+ catch (Exception e)
+ {
+ var error = OnError;
+ if (error == null)
+ throw;
+ if (error(commands[i], request, e) == false)
+ {
+ throw;
+ }
+ }
+ })
)
.WaitAll();
- return returnedLists.ToArray();
+ return returnedLists.Where((t, i) => valueSet[i]).ToArray();
}
+
+ public event ShardingErrorHandle OnError;
}
}
#endif
View
25 Raven.Client.Lightweight/Shard/SequentialShardAccessStrategy.cs
@@ -23,10 +23,31 @@ public class SequentialShardAccessStrategy : IShardAccessStrategy
/// <param name="commands">The shard sessions.</param>
/// <param name="operation">The operation.</param>
/// <returns></returns>
- public T[] Apply<T>(ICollection<IDatabaseCommands> commands, Func<IDatabaseCommands, int, T> operation)
+ public T[] Apply<T>(IList<IDatabaseCommands> commands, ShardRequestData request, Func<IDatabaseCommands, int, T> operation)
{
- return commands.Select(operation).ToArray();
+ var list = new List<T>();
+ for (int i = 0; i < commands.Count; i++)
+ {
+ try
+ {
+ list.Add(operation(commands[i], i));
+ }
+ catch (Exception e)
+ {
+ var error = OnError;
+ if (error == null)
+ throw;
+ if(error(commands[i], request, e) == false)
+ {
+ throw;
+ }
+ }
+ }
+
+ return list.ToArray();
}
+
+ public event ShardingErrorHandle OnError;
}
}
#endif
View
8 Raven.Client.Lightweight/Shard/ShardRequestData.cs
@@ -5,6 +5,7 @@
//-----------------------------------------------------------------------
using System;
+using System.Collections.Generic;
using Raven.Abstractions.Data;
namespace Raven.Client.Shard
@@ -18,7 +19,7 @@ public class ShardRequestData
/// Gets or sets the key.
/// </summary>
/// <value>The key.</value>
- public string Key { get; set; }
+ public List<string> Keys { get; set; }
/// <summary>
/// Gets or sets the type of the entity.
@@ -30,5 +31,10 @@ public class ShardRequestData
/// Gets or sets the query being executed
/// </summary>
public IndexQuery Query { get; set; }
+
+ public ShardRequestData()
+ {
+ Keys = new List<string>();
+ }
}
}
View
80 Raven.Client.Lightweight/Shard/ShardedDocumentSession.cs
@@ -89,13 +89,14 @@ private IList<IDatabaseCommands> GetCommandsToOperateOn(ShardRequestData resulti
protected override JsonDocument GetJsonDocument(string documentKey)
{
- var dbCommands = GetCommandsToOperateOn(new ShardRequestData
+ var shardRequestData = new ShardRequestData
{
- EntityType = typeof(object),
- Key = documentKey
- });
+ EntityType = typeof (object), Keys = {documentKey}
+ };
+ var dbCommands = GetCommandsToOperateOn(shardRequestData);
var documents = shardStrategy.ShardAccessStrategy.Apply(dbCommands,
+ shardRequestData,
(commands, i) => commands.Get(documentKey));
var document = documents.FirstOrDefault(x => x != null);
@@ -171,9 +172,10 @@ Lazy<TResult> ILazySessionOperations.Load<TResult>(string id)
public Lazy<TResult> Load<TResult>(string id, Action<TResult> onEval)
{
var cmds = GetCommandsToOperateOn(new ShardRequestData
- {
- Key = id, EntityType = typeof (TResult)
- });
+ {
+ Keys = {id},
+ EntityType = typeof (TResult)
+ });
var lazyLoadOperation = new LazyLoadOperation<TResult>(id, new LoadOperation(this, () =>
{
@@ -269,8 +271,8 @@ public Lazy<T[]> LazyLoadInternal<T>(string[] ids, string[] includes, Action<T[]
id,
shards = GetCommandsToOperateOn(new ShardRequestData
{
- Key = id,
- EntityType = typeof(T),
+ Keys = {id},
+ EntityType = typeof (T),
})
})
.GroupBy(x => x.shards, new DbCmdsListComparer());
@@ -322,7 +324,7 @@ private bool ExecuteLazyOperationsSingleStep()
{
var lazyOperations = operationPerShard.Select(x => x.Item1).ToArray();
var requests = lazyOperations.Select(x => x.CraeteRequest()).ToArray();
- var multiResponses = shardStrategy.ShardAccessStrategy.Apply(operationPerShard.Key, (commands, i) => commands.MultiGet(requests));
+ var multiResponses = shardStrategy.ShardAccessStrategy.Apply(operationPerShard.Key, new ShardRequestData(), (commands, i) => commands.MultiGet(requests));
var sb = new StringBuilder();
foreach (var response in from shardReponses in multiResponses
@@ -361,12 +363,12 @@ public T Load<T>(string id)
}
IncrementRequestCount();
- var dbCommands = GetCommandsToOperateOn(new ShardRequestData
- {
- EntityType = typeof (T),
- Key = id
- });
- var results = shardStrategy.ShardAccessStrategy.Apply(dbCommands, (commands, i) =>
+ var shardRequestData = new ShardRequestData
+ {
+ EntityType = typeof (T), Keys = {id}
+ };
+ var dbCommands = GetCommandsToOperateOn(shardRequestData);
+ var results = shardStrategy.ShardAccessStrategy.Apply(dbCommands, shardRequestData, (commands, i) =>
{
var loadOperation = new LoadOperation(this, commands.DisableAllCaching, id);
bool retry;
@@ -474,7 +476,7 @@ public override void Defer(params ICommandData[] commands)
{
var shardsToOperateOn = GetShardsToOperateOn(new ShardRequestData
{
- Key = cmd.Key
+ Keys = {cmd.Key}
}).Select(x => x.Item1).ToList();
if (shardsToOperateOn.Count == 0)
@@ -579,32 +581,34 @@ public void Refresh<T>(T entity)
IncrementRequestCount();
- var dbCommands = GetCommandsToOperateOn(new ShardRequestData
+ var shardRequestData = new ShardRequestData
{
- EntityType = typeof(T),
- Key = value.Key
- });
- //TODO: shard access
- foreach (var dbCmd in dbCommands)
+ EntityType = typeof (T), Keys = {value.Key}
+ };
+ var dbCommands = GetCommandsToOperateOn(shardRequestData);
+
+ var results = shardStrategy.ShardAccessStrategy.Apply(dbCommands, shardRequestData, (dbCmd, i) =>
{
var jsonDocument = dbCmd.Get(value.Key);
if (jsonDocument == null)
- continue;
+ return false;
value.Metadata = jsonDocument.Metadata;
- value.OriginalMetadata = (RavenJObject)jsonDocument.Metadata.CloneToken();
+ value.OriginalMetadata = (RavenJObject) jsonDocument.Metadata.CloneToken();
value.ETag = jsonDocument.Etag;
value.OriginalValue = jsonDocument.DataAsJson;
var newEntity = ConvertToEntity<T>(value.Key, jsonDocument.DataAsJson, jsonDocument.Metadata);
- foreach (var property in entity.GetType().GetProperties())
+ foreach (var property in entity.GetType().GetProperties().Where(property => property.CanWrite && property.CanRead && property.GetIndexParameters().Length == 0))
{
- if (!property.CanWrite || !property.CanRead || property.GetIndexParameters().Length != 0)
- continue;
property.SetValue(entity, property.GetValue(newEntity, null), null);
}
- }
+ return true;
+ });
- throw new InvalidOperationException("Document '" + value.Key + "' no longer exists and was probably deleted");
+ if (results.All(x => x == false))
+ {
+ throw new InvalidOperationException("Document '" + value.Key + "' no longer exists and was probably deleted");
+ }
}
IDatabaseCommands ISyncAdvancedSessionOperation.DatabaseCommands
@@ -690,7 +694,7 @@ public T[] LoadInternal<T>(string[] ids, string[] includes)
shards = GetCommandsToOperateOn(new ShardRequestData
{
EntityType = typeof (T),
- Key = id
+ Keys = {id}
})
})
.GroupBy(x => x.shards, new DbCmdsListComparer());
@@ -699,7 +703,11 @@ public T[] LoadInternal<T>(string[] ids, string[] includes)
foreach (var shard in idsAndShards)
{
var currentShardIds = shard.Select(x => x.id).ToArray();
- var multiLoadOperations = shardStrategy.ShardAccessStrategy.Apply(shard.Key, (dbCmd, i) =>
+ var multiLoadOperations = shardStrategy.ShardAccessStrategy.Apply(shard.Key, new ShardRequestData
+ {
+ EntityType = typeof(T),
+ Keys = currentShardIds.ToList()
+ }, (dbCmd, i) =>
{
var multiLoadOperation = new MultiLoadOperation(this, dbCmd.DisableAllCaching, currentShardIds);
MultiLoadResult multiLoadResult;
@@ -753,7 +761,7 @@ public T[] LoadInternal<T>(string[] ids)
shards = GetCommandsToOperateOn(new ShardRequestData
{
EntityType = typeof(T),
- Key = id
+ Keys = {id}
})
})
.GroupBy(x => x.shards, new DbCmdsListComparer());
@@ -761,7 +769,11 @@ public T[] LoadInternal<T>(string[] ids)
foreach (var shard in idsAndShards)
{
var currentShardIds = shard.Select(x => x.id).ToArray();
- var multiLoadOperations = shardStrategy.ShardAccessStrategy.Apply(shard.Key, (dbCmd, i) =>
+ var multiLoadOperations = shardStrategy.ShardAccessStrategy.Apply(shard.Key, new ShardRequestData
+ {
+ EntityType = typeof(T),
+ Keys = currentShardIds.ToList()
+ }, (dbCmd, i) =>
{
var multiLoadOperation = new MultiLoadOperation(this, dbCmd.DisableAllCaching, currentShardIds);
MultiLoadResult multiLoadResult;
View
12 Raven.Client.Lightweight/Shard/ShardedDocumentStore.cs
@@ -281,11 +281,13 @@ public IDatabaseCommands DatabaseCommandsFor(string shardId)
public override void ExecuteIndex(AbstractIndexCreationTask indexCreationTask)
{
var list = ShardStrategy.Shards.Values.Select(x => x.DatabaseCommands).ToList();
- ShardStrategy.ShardAccessStrategy.Apply<object>(list, (commands, i) =>
- {
- indexCreationTask.Execute(commands, Conventions);
- return null;
- });
+ ShardStrategy.ShardAccessStrategy.Apply<object>(list,
+ new ShardRequestData()
+ , (commands, i) =>
+ {
+ indexCreationTask.Execute(commands, Conventions);
+ return null;
+ });
}
}
}
View
1  Raven.Tests/Shard/RoundRobinSharding.cs
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using Raven.Client;
+using Raven.Client.Connection;
using Raven.Client.Document;
using Raven.Client.Shard;
using Raven.Server;
View
4 Raven.Tests/Shard/WhenUsingParallelAccessStrategy.cs
@@ -38,7 +38,7 @@ public void NullResultIsNotAnException()
using (var shard1 = new DocumentStore { Url = "http://localhost:8079" }.Initialize())
using (var session = shard1.OpenSession())
{
- var results = new ParallelShardAccessStrategy().Apply(new[] { session.Advanced.DatabaseCommands }, (x, i) => (IList<Company>)null);
+ var results = new ParallelShardAccessStrategy().Apply(new[] { session.Advanced.DatabaseCommands }, new ShardRequestData(), (x, i) => (IList<Company>)null);
Assert.Equal(1, results.Length);
Assert.Null(results[0]);
@@ -53,7 +53,7 @@ public void ExecutionExceptionsAreRethrown()
using (var session = shard1.OpenSession())
{
var parallelShardAccessStrategy = new ParallelShardAccessStrategy();
- Assert.Throws<ApplicationException>(() => parallelShardAccessStrategy.Apply<object>(new[] {session.Advanced.DatabaseCommands}, (x, i) => { throw new ApplicationException(); }));
+ Assert.Throws<ApplicationException>(() => parallelShardAccessStrategy.Apply<object>(new[] {session.Advanced.DatabaseCommands}, new ShardRequestData(), (x, i) => { throw new ApplicationException(); }));
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.