diff --git a/Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs b/Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs
new file mode 100644
index 000000000000..29c957680ee7
--- /dev/null
+++ b/Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs
@@ -0,0 +1,39 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
+//
+// -----------------------------------------------------------------------
+using System;
+using System.Collections.Generic;
+using System.Configuration;
+using System.Linq;
+
+using Raven.Abstractions.Data;
+using Raven.Database.Indexing;
+
+namespace Raven.Database.Bundles.SqlReplication
+{
+ internal static class SqlReplicationClassifier
+ {
+ private static readonly Dictionary> Empty = new Dictionary>();
+
+ public static Dictionary> GroupConfigs(IList configs, Func getLastEtagFor)
+ {
+ if (configs.Count == 0)
+ return Empty;
+
+ var configsByEtag = configs
+ .Where(x => x.Disabled == false)
+ .Select(x => new SqlReplicationConfigWithLastReplicatedEtag(x, getLastEtagFor(x)))
+ .GroupBy(x => x.LastReplicatedEtag, DefaultIndexingClassifier.RoughEtagEqualityAndComparison.Instance)
+ .OrderByDescending(x => x.Key, DefaultIndexingClassifier.RoughEtagEqualityAndComparison.Instance)
+ .ToList();
+
+ if (configsByEtag.Count == 0)
+ return Empty;
+
+ return configsByEtag
+ .ToDictionary(x => x.Min(y => y.LastReplicatedEtag), x => x.Select(y => y).ToList());
+ }
+ }
+}
\ No newline at end of file
diff --git a/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs b/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs
index 4c1131720709..1876f1e8ab50 100644
--- a/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs
+++ b/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs
@@ -1,9 +1,35 @@
using System;
using System.Collections.Generic;
-using RTools_NTS.Util;
+
+using Raven.Abstractions.Data;
namespace Raven.Database.Bundles.SqlReplication
{
+ internal class SqlReplicationConfigWithLastReplicatedEtag : SqlReplicationConfig
+ {
+ public SqlReplicationConfigWithLastReplicatedEtag(SqlReplicationConfig config, Etag lastReplicatedEtag)
+ {
+ LastReplicatedEtag = lastReplicatedEtag;
+
+ ConnectionString = config.ConnectionString;
+ ConnectionStringName = config.ConnectionStringName;
+ ConnectionStringSettingName = config.ConnectionStringSettingName;
+ Disabled = config.Disabled;
+ FactoryName = config.FactoryName;
+ ForceSqlServerQueryRecompile = config.ForceSqlServerQueryRecompile;
+ Id = config.Id;
+ Name = config.Name;
+ ParameterizeDeletesDisabled = config.ParameterizeDeletesDisabled;
+ PredefinedConnectionStringSettingName = config.PredefinedConnectionStringSettingName;
+ QuoteTables = config.QuoteTables;
+ RavenEntityName = config.RavenEntityName;
+ Script = config.Script;
+ SqlReplicationTables = config.SqlReplicationTables;
+ }
+
+ public Etag LastReplicatedEtag { get; private set; }
+ }
+
public class SqlReplicationConfig
{
public SqlReplicationConfig()
diff --git a/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs b/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs
index 03e5d20a0ed9..b15e7e8c4162 100644
--- a/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs
+++ b/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs
@@ -22,16 +22,14 @@
using Raven.Abstractions.Logging;
using Raven.Abstractions.Util;
using Raven.Database.Extensions;
-using Raven.Database.Impl;
using Raven.Database.Indexing;
using Raven.Database.Json;
using Raven.Database.Plugins;
using Raven.Database.Prefetching;
using Raven.Database.Storage;
+using Raven.Database.Util;
using Raven.Json.Linq;
-using Task = System.Threading.Tasks.Task;
-
namespace Raven.Database.Bundles.SqlReplication
{
[InheritedExport(typeof(IStartupTask))]
@@ -53,8 +51,9 @@ private class ReplicatedDoc
public string Key;
}
- public const string RavenSqlReplicationStatus = "Raven/SqlReplication/Status";
- private readonly static ILog log = LogManager.GetCurrentClassLogger();
+ public const string RavenSqlReplicationStatus = "Raven/SqlReplication/Status";
+
+ private readonly static ILog Log = LogManager.GetCurrentClassLogger();
public event Action AfterReplicationCompleted = delegate { };
readonly Metrics sqlReplicationMetrics = new Metrics();
@@ -67,14 +66,19 @@ private class ReplicatedDoc
{
get { return statistics; }
}
- private PrefetchingBehavior prefetchingBehavior;
public readonly ConcurrentDictionary SqlReplicationMetricsCounters =
new ConcurrentDictionary();
+ private readonly ConcurrentSet prefetchingBehaviors = new ConcurrentSet();
+
+ private PrefetchingBehavior defaultPrefetchingBehavior;
+
public void Execute(DocumentDatabase database)
{
- prefetchingBehavior = database.Prefetcher.CreatePrefetchingBehavior(PrefetchingUser.SqlReplicator, null);
+ defaultPrefetchingBehavior = database.Prefetcher.CreatePrefetchingBehavior(PrefetchingUser.SqlReplicator, null);
+ defaultPrefetchingBehavior.ShouldHandleUnusedDocumentsAddedAfterCommit = true;
+ prefetchingBehaviors.TryAdd(defaultPrefetchingBehavior);
Database = database;
Database.Notifications.OnDocumentChange += (sender, notification, metadata) =>
@@ -95,7 +99,7 @@ public void Execute(DocumentDatabase database)
return;
replicationConfigs = null;
- log.Debug(() => "Sql Replication configuration was changed.");
+ Log.Debug(() => "Sql Replication configuration was changed.");
};
GetReplicationStatus();
@@ -110,7 +114,7 @@ public void Execute(DocumentDatabase database)
}
catch (Exception e)
{
- log.ErrorException("Fatal failure when replicating to SQL. All SQL Replication activity STOPPED", e);
+ Log.ErrorException("Fatal failure when replicating to SQL. All SQL Replication activity STOPPED", e);
}
}
}, TaskCreationOptions.LongRunning);
@@ -143,8 +147,8 @@ private void RecordDelete(string id, RavenJObject metadata)
if (hasChanges)
Database.WorkContext.ShouldNotifyAboutWork(() => "recorded a deleted document " + id);
});
- if (log.IsDebugEnabled)
- log.Debug(() => "recorded a deleted document " + id);
+ if (Log.IsDebugEnabled)
+ Log.Debug(() => "recorded a deleted document " + id);
}
private SqlReplicationStatus GetReplicationStatus()
@@ -191,208 +195,253 @@ private void BackgroundSqlReplication()
}) // have error or the timeout expired
.ToList();
- if (relevantConfigs.Count == 0)
+ var configGroups = SqlReplicationClassifier.GroupConfigs(relevantConfigs, c => GetLastEtagFor(localReplicationStatus, c));
+
+ if (configGroups.Count == 0)
{
Database.WorkContext.WaitForWork(TimeSpan.FromMinutes(10), ref workCounter, "Sql Replication");
continue;
}
- var leastReplicatedEtag = GetLeastReplicatedEtag(relevantConfigs, localReplicationStatus);
+ var usedPrefetchers = new ConcurrentSet();
- if (leastReplicatedEtag == null)
- {
- Database.WorkContext.WaitForWork(TimeSpan.FromMinutes(10), ref workCounter, "Sql Replication");
- continue;
- }
+ var groupedConfigs = configGroups
+ .Select(x =>
+ {
+ var result = new SqlConfigGroup
+ {
+ LastReplicatedEtag = x.Key,
+ ConfigsToWorkOn = x.Value,
+ PrefetchingBehavior = GetPrefetcherFor(x.Key, usedPrefetchers)
+ };
- List documents;
+ result.PrefetchingBehavior.AdditionalInfo = string.Format("Default prefetcher: {0}. For sql config group: [Configs: {1}, LastReplicatedEtag: {2}]", result.PrefetchingBehavior == defaultPrefetchingBehavior, string.Join(", ", result.ConfigsToWorkOn.Select(y => y.Name)), result.LastReplicatedEtag);
- using (prefetchingBehavior.DocumentBatchFrom(leastReplicatedEtag, out documents))
+ return result;
+ })
+ .ToList();
+
+ var successes = new ConcurrentQueue>();
+ try
{
- Etag latestEtag = null, lastBatchEtag = null;
- if (documents.Count != 0)
- lastBatchEtag = documents[documents.Count - 1].Etag;
+ BackgroundTaskExecuter.Instance.ExecuteAll(Database.WorkContext, groupedConfigs, (sqlConfigGroup, i) =>
+ {
+ Database.WorkContext.CancellationToken.ThrowIfCancellationRequested();
- var replicationDuration = Stopwatch.StartNew();
- documents.RemoveAll(x => x.Key.StartsWith("Raven/", StringComparison.InvariantCultureIgnoreCase)); // we ignore system documents here
+ var prefetchingBehavior = sqlConfigGroup.PrefetchingBehavior;
+ var configsToWorkOn = sqlConfigGroup.ConfigsToWorkOn;
- if (documents.Count != 0)
- latestEtag = documents[documents.Count - 1].Etag;
+ List documents;
+ using (prefetchingBehavior.DocumentBatchFrom(sqlConfigGroup.LastReplicatedEtag, out documents))
+ {
+ Etag latestEtag = null, lastBatchEtag = null;
+ if (documents.Count != 0)
+ lastBatchEtag = documents[documents.Count - 1].Etag;
- documents.RemoveAll(x => prefetchingBehavior.FilterDocuments(x) == false);
+ var replicationDuration = Stopwatch.StartNew();
+ documents.RemoveAll(x => x.Key.StartsWith("Raven/", StringComparison.InvariantCultureIgnoreCase)); // we ignore system documents here
- var deletedDocsByConfig = new Dictionary>();
+ if (documents.Count != 0)
+ latestEtag = documents[documents.Count - 1].Etag;
- foreach (var relevantConfig in relevantConfigs)
- {
- var cfg = relevantConfig;
- Database.TransactionalStorage.Batch(accessor =>
- {
- deletedDocsByConfig[cfg] = accessor.Lists.Read(GetSqlReplicationDeletionName(cfg),
- GetLastEtagFor(localReplicationStatus, cfg),
- latestEtag,
- MaxNumberOfDeletionsToReplicate + 1)
- .ToList();
- });
- }
+ documents.RemoveAll(x => prefetchingBehavior.FilterDocuments(x) == false);
- // No documents AND there aren't any deletes to replicate
- if (documents.Count == 0 && deletedDocsByConfig.Sum(x => x.Value.Count) == 0)
- {
- if (latestEtag != null)
- {
- // so we filtered some documents, let us update the etag about that.
- foreach (var lastReplicatedEtag in localReplicationStatus.LastReplicatedEtags)
+ var deletedDocsByConfig = new Dictionary>();
+
+ foreach (var configToWorkOn in configsToWorkOn)
{
- if (lastReplicatedEtag.LastDocEtag.CompareTo(latestEtag) <= 0)
- lastReplicatedEtag.LastDocEtag = latestEtag;
+ var cfg = configToWorkOn;
+ Database.TransactionalStorage.Batch(accessor =>
+ {
+ deletedDocsByConfig[cfg] = accessor.Lists.Read(GetSqlReplicationDeletionName(cfg),
+ cfg.LastReplicatedEtag,
+ latestEtag,
+ MaxNumberOfDeletionsToReplicate + 1)
+ .ToList();
+ });
}
- latestEtag = Etag.Max(latestEtag, lastBatchEtag);
- SaveNewReplicationStatus(localReplicationStatus);
- }
- else // no point in waiting if we just saved a new doc
- {
- Database.WorkContext.WaitForWork(TimeSpan.FromMinutes(10), ref workCounter, "Sql Replication");
- }
- continue;
- }
+ // No documents AND there aren't any deletes to replicate
+ if (documents.Count == 0 && deletedDocsByConfig.Sum(x => x.Value.Count) == 0)
+ {
+ // so we filtered some documents, let us update the etag about that.
+ if (latestEtag != null)
+ {
+ foreach (var configToWorkOn in configsToWorkOn)
+ successes.Enqueue(Tuple.Create(configToWorkOn, latestEtag));
+ }
- var successes = new ConcurrentQueue>();
- try
- {
- var itemsToReplicate = documents.Select(x =>
- {
- JsonDocument.EnsureIdInMetadata(x);
- var doc = x.ToJson();
- doc[Constants.DocumentIdFieldName] = x.Key;
+ return;
+ }
- return new ReplicatedDoc
+ var itemsToReplicate = documents.Select(x =>
{
- Document = doc,
- Etag = x.Etag,
- Key = x.Key,
- SerializedSizeOnDisk = x.SerializedSizeOnDisk
- };
- }).ToList();
-
- BackgroundTaskExecuter.Instance.ExecuteAllInterleaved(Database.WorkContext, relevantConfigs, replicationConfig =>
- {
+ JsonDocument.EnsureIdInMetadata(x);
+ var doc = x.ToJson();
+ doc[Constants.DocumentIdFieldName] = x.Key;
+
+ return new ReplicatedDoc
+ {
+ Document = doc,
+ Etag = x.Etag,
+ Key = x.Key,
+ SerializedSizeOnDisk = x.SerializedSizeOnDisk
+ };
+ }).ToList();
+
try
{
- var startTime = SystemTime.UtcNow;
- Stopwatch spRepTime = new Stopwatch();
- spRepTime.Start();
- var lastReplicatedEtag = GetLastEtagFor(localReplicationStatus, replicationConfig);
-
- var deletedDocs = deletedDocsByConfig[replicationConfig];
- var docsToReplicate = itemsToReplicate
- .Where(x => lastReplicatedEtag.CompareTo(x.Etag) < 0) // haven't replicate the etag yet
- .Where(document =>
+ BackgroundTaskExecuter.Instance.ExecuteAllInterleaved(Database.WorkContext, configsToWorkOn, replicationConfig =>
+ {
+ try
{
- var info = Database.Documents.GetRecentTouchesFor(document.Key);
- if (info != null)
+ var startTime = SystemTime.UtcNow;
+ var spRepTime = new Stopwatch();
+ spRepTime.Start();
+ var lastReplicatedEtag = replicationConfig.LastReplicatedEtag;
+
+ var deletedDocs = deletedDocsByConfig[replicationConfig];
+ var docsToReplicate = itemsToReplicate
+ .Where(x => lastReplicatedEtag.CompareTo(x.Etag) < 0) // haven't replicate the etag yet
+ .Where(document =>
+ {
+ var info = Database.Documents.GetRecentTouchesFor(document.Key);
+ if (info != null)
+ {
+ if (info.TouchedEtag.CompareTo(lastReplicatedEtag) > 0)
+ {
+ Log.Debug(
+ "Will not replicate document '{0}' to '{1}' because the updates after etag {2} are related document touches",
+ document.Key, replicationConfig.Name, info.TouchedEtag);
+ return false;
+ }
+ }
+ return true;
+ });
+
+ if (deletedDocs.Count >= MaxNumberOfDeletionsToReplicate + 1)
+ docsToReplicate = docsToReplicate.Where(x => EtagUtil.IsGreaterThan(x.Etag, deletedDocs[deletedDocs.Count - 1].Etag) == false);
+
+ var docsToReplicateAsList = docsToReplicate.ToList();
+
+ var currentLatestEtag = HandleDeletesAndChangesMerging(deletedDocs, docsToReplicateAsList);
+ if (currentLatestEtag == null && itemsToReplicate.Count > 0 && docsToReplicateAsList.Count == 0)
+ currentLatestEtag = lastBatchEtag;
+
+ int countOfReplicatedItems = 0;
+ if (ReplicateDeletionsToDestination(replicationConfig, deletedDocs) &&
+ ReplicateChangesToDestination(replicationConfig, docsToReplicateAsList, out countOfReplicatedItems))
{
- if (info.TouchedEtag.CompareTo(lastReplicatedEtag) > 0)
+ if (deletedDocs.Count > 0)
{
- log.Debug(
- "Will not replicate document '{0}' to '{1}' because the updates after etag {2} are related document touches",
- document.Key, replicationConfig.Name, info.TouchedEtag);
- return false;
+ Database.TransactionalStorage.Batch(accessor =>
+ accessor.Lists.RemoveAllBefore(GetSqlReplicationDeletionName(replicationConfig), deletedDocs[deletedDocs.Count - 1].Etag));
}
+ successes.Enqueue(Tuple.Create(replicationConfig, currentLatestEtag));
}
- return true;
- });
- if (deletedDocs.Count >= MaxNumberOfDeletionsToReplicate + 1)
- docsToReplicate = docsToReplicate.Where(x => EtagUtil.IsGreaterThan(x.Etag, deletedDocs[deletedDocs.Count - 1].Etag) == false);
+ spRepTime.Stop();
+ var elapsedMicroseconds = (long)(spRepTime.ElapsedTicks * SystemTime.MicroSecPerTick);
- var docsToReplicateAsList = docsToReplicate.ToList();
+ var sqlReplicationMetricsCounters = GetSqlReplicationMetricsManager(replicationConfig);
+ sqlReplicationMetricsCounters.SqlReplicationBatchSizeMeter.Mark(countOfReplicatedItems);
+ sqlReplicationMetricsCounters.SqlReplicationBatchSizeHistogram.Update(countOfReplicatedItems);
+ sqlReplicationMetricsCounters.SqlReplicationDurationHistogram.Update(elapsedMicroseconds);
- var currentLatestEtag = HandleDeletesAndChangesMerging(deletedDocs, docsToReplicateAsList);
- if (currentLatestEtag == null && itemsToReplicate.Count > 0 && docsToReplicateAsList.Count == 0)
- currentLatestEtag = lastBatchEtag;
-
- int countOfReplicatedItems = 0;
- if (ReplicateDeletionsToDestination(replicationConfig, deletedDocs) &&
- ReplicateChangesToDestination(replicationConfig, docsToReplicateAsList, out countOfReplicatedItems))
- {
- if (deletedDocs.Count > 0)
+ UpdateReplicationPerformance(replicationConfig, startTime, spRepTime.Elapsed, docsToReplicateAsList.Count);
+ }
+ catch (Exception e)
{
- Database.TransactionalStorage.Batch(accessor =>
- accessor.Lists.RemoveAllBefore(GetSqlReplicationDeletionName(replicationConfig), deletedDocs[deletedDocs.Count - 1].Etag));
+ Log.WarnException("Error while replication to SQL destination: " + replicationConfig.Name, e);
+ Database.AddAlert(new Alert
+ {
+ AlertLevel = AlertLevel.Error,
+ CreatedAt = SystemTime.UtcNow,
+ Exception = e.ToString(),
+ Title = "Sql Replication failure to replication",
+ Message = "Sql Replication could not replicate to " + replicationConfig.Name,
+ UniqueKey = "Sql Replication could not replicate to " + replicationConfig.Name
+ });
}
- successes.Enqueue(Tuple.Create(replicationConfig, currentLatestEtag));
- }
-
- spRepTime.Stop();
- var elapsedMicroseconds = (long)(spRepTime.ElapsedTicks * SystemTime.MicroSecPerTick);
-
- var sqlReplicationMetricsCounters = GetSqlReplicationMetricsManager(replicationConfig);
- sqlReplicationMetricsCounters.SqlReplicationBatchSizeMeter.Mark(countOfReplicatedItems);
- sqlReplicationMetricsCounters.SqlReplicationBatchSizeHistogram.Update(countOfReplicatedItems);
- sqlReplicationMetricsCounters.SqlReplicationDurationHistogram.Update(elapsedMicroseconds);
-
- UpdateReplicationPerformance(replicationConfig, startTime, spRepTime.Elapsed, docsToReplicateAsList.Count);
-
- }
- catch (Exception e)
- {
- log.WarnException("Error while replication to SQL destination: " + replicationConfig.Name, e);
- Database.AddAlert(new Alert
- {
- AlertLevel = AlertLevel.Error,
- CreatedAt = SystemTime.UtcNow,
- Exception = e.ToString(),
- Title = "Sql Replication failure to replication",
- Message = "Sql Replication could not replicate to " + replicationConfig.Name,
- UniqueKey = "Sql Replication could not replicate to " + replicationConfig.Name
});
}
- });
- if (successes.Count == 0)
- continue;
- foreach (var t in successes)
- {
- var cfg = t.Item1;
- var currentLatestEtag = t.Item2;
- var destEtag = localReplicationStatus.LastReplicatedEtags.FirstOrDefault(x => string.Equals(x.Name, cfg.Name, StringComparison.InvariantCultureIgnoreCase));
- if (destEtag == null)
+ finally
{
- localReplicationStatus.LastReplicatedEtags.Add(new LastReplicatedEtag
- {
- Name = cfg.Name,
- LastDocEtag = currentLatestEtag ?? Etag.Empty
- });
- }
- else
- {
- var lastDocEtag = destEtag.LastDocEtag;
- if (currentLatestEtag != null && EtagUtil.IsGreaterThan(currentLatestEtag, lastDocEtag))
- lastDocEtag = currentLatestEtag;
-
- destEtag.LastDocEtag = lastDocEtag;
+ prefetchingBehavior.CleanupDocuments(lastBatchEtag);
+ prefetchingBehavior.UpdateAutoThrottler(documents, replicationDuration.Elapsed);
}
}
+ });
- SaveNewReplicationStatus(localReplicationStatus);
- }
- finally
+ if (successes.Count == 0)
+ break;
+
+ foreach (var t in successes)
{
- AfterReplicationCompleted(successes.Count);
- var min = localReplicationStatus.LastReplicatedEtags.Min(x => new ComparableByteArray(x.LastDocEtag.ToByteArray()));
- if (min != null)
+ var cfg = t.Item1;
+ var currentLatestEtag = t.Item2;
+ var destEtag = localReplicationStatus.LastReplicatedEtags.FirstOrDefault(x => string.Equals(x.Name, cfg.Name, StringComparison.InvariantCultureIgnoreCase));
+ if (destEtag == null)
+ {
+ localReplicationStatus.LastReplicatedEtags.Add(new LastReplicatedEtag
+ {
+ Name = cfg.Name,
+ LastDocEtag = currentLatestEtag ?? Etag.Empty
+ });
+ }
+ else
{
- var lastMinReplicatedEtag = min.ToEtag();
- prefetchingBehavior.CleanupDocuments(lastMinReplicatedEtag);
- prefetchingBehavior.UpdateAutoThrottler(documents, replicationDuration.Elapsed);
+ var lastDocEtag = destEtag.LastDocEtag;
+ if (currentLatestEtag != null && EtagUtil.IsGreaterThan(currentLatestEtag, lastDocEtag))
+ lastDocEtag = currentLatestEtag;
+
+ destEtag.LastDocEtag = lastDocEtag;
}
}
+ SaveNewReplicationStatus(localReplicationStatus);
+ }
+ finally
+ {
+ AfterReplicationCompleted(successes.Count);
+ RemoveUnusedPrefetchers(usedPrefetchers);
}
}
}
+ private void RemoveUnusedPrefetchers(IEnumerable usedPrefetchingBehaviors)
+ {
+ var unused = prefetchingBehaviors.Except(usedPrefetchingBehaviors.Union(new[]
+ {
+ defaultPrefetchingBehavior
+ })).ToList();
+
+ if (unused.Count == 0)
+ return;
+
+ foreach (var unusedPrefetcher in unused)
+ {
+ prefetchingBehaviors.TryRemove(unusedPrefetcher);
+ Database.Prefetcher.RemovePrefetchingBehavior(unusedPrefetcher);
+ }
+ }
+
+ private PrefetchingBehavior GetPrefetcherFor(Etag fromEtag, ConcurrentSet usedPrefetchers)
+ {
+ foreach (var prefetchingBehavior in prefetchingBehaviors)
+ {
+ if (prefetchingBehavior.CanUsePrefetcherToLoadFrom(fromEtag) && usedPrefetchers.TryAdd(prefetchingBehavior))
+ return prefetchingBehavior;
+ }
+
+ var newPrefetcher = Database.Prefetcher.CreatePrefetchingBehavior(PrefetchingUser.SqlReplicator, null);
+
+ prefetchingBehaviors.Add(newPrefetcher);
+ usedPrefetchers.Add(newPrefetcher);
+
+ return newPrefetcher;
+ }
+
private void UpdateReplicationPerformance(SqlReplicationConfig replicationConfig, DateTime startTime, TimeSpan elapsed, int batchSize)
{
var performance = new SqlReplicationPerformanceStats
@@ -489,7 +538,7 @@ private bool ReplicateDeletionsToDestination(SqlReplicationConfig cfg, IEnumerab
writer.DeleteItems(sqlReplicationTable.TableName, sqlReplicationTable.DocumentKeyColumn, cfg.ParameterizeDeletesDisabled, identifiers);
}
writer.Commit();
- log.Debug("Replicated deletes of {0} for config {1}", string.Join(", ", identifiers), cfg.Name);
+ Log.Debug("Replicated deletes of {0} for config {1}", string.Join(", ", identifiers), cfg.Name);
}
return true;
@@ -500,29 +549,14 @@ private static string GetSqlReplicationDeletionName(SqlReplicationConfig replica
return "SqlReplication/Deletions/" + replicationConfig.Name;
}
- private Etag GetLeastReplicatedEtag(IEnumerable config, SqlReplicationStatus localReplicationStatus)
- {
- Etag leastReplicatedEtag = null;
- foreach (var sqlReplicationConfig in config)
- {
- var lastEtag = GetLastEtagFor(localReplicationStatus, sqlReplicationConfig);
- if (leastReplicatedEtag == null)
- leastReplicatedEtag = lastEtag;
- else if (lastEtag.CompareTo(leastReplicatedEtag) < 0)
- leastReplicatedEtag = lastEtag;
- }
-
- return leastReplicatedEtag;
- }
-
private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable docs, out int countOfReplicatedItems)
{
countOfReplicatedItems = 0;
- var replicationStats = statistics.GetOrAdd(cfg.Name, name => new SqlReplicationStatistics(name));
+ var replicationStats = statistics.GetOrAdd(cfg.Name, name => new SqlReplicationStatistics(name));
var scriptResult = ApplyConversionScript(cfg, docs, replicationStats);
if (scriptResult.Ids.Count == 0)
return true;
-
+
countOfReplicatedItems = scriptResult.Data.Sum(x => x.Value.Count);
try
{
@@ -530,12 +564,12 @@ private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable
{
if (writer.Execute(scriptResult))
{
- log.Debug("Replicated changes of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name);
+ Log.Debug("Replicated changes of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name);
replicationStats.CompleteSuccess(countOfReplicatedItems);
}
else
{
- log.Debug("Replicated changes (with some errors) of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name);
+ Log.Debug("Replicated changes (with some errors) of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name);
replicationStats.Success(countOfReplicatedItems);
}
}
@@ -543,7 +577,7 @@ private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable
}
catch (Exception e)
{
- log.WarnException("Failure to replicate changes to relational database for: " + cfg.Name, e);
+ Log.WarnException("Failure to replicate changes to relational database for: " + cfg.Name, e);
SqlReplicationStatistics replicationStatistics;
DateTime newTime;
if (statistics.TryGetValue(cfg.Name, out replicationStatistics) == false)
@@ -552,16 +586,16 @@ private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable
}
else
{
- if (replicationStatistics.LastErrorTime == DateTime.MinValue)
- {
- newTime = SystemTime.UtcNow.AddSeconds(5);
- }
- else
- {
- // double the fallback time (but don't cross 15 minutes)
- var totalSeconds = (SystemTime.UtcNow - replicationStatistics.LastErrorTime).TotalSeconds;
- newTime = SystemTime.UtcNow.AddSeconds(Math.Min(60*15, Math.Max(5, totalSeconds*2)));
- }
+ if (replicationStatistics.LastErrorTime == DateTime.MinValue)
+ {
+ newTime = SystemTime.UtcNow.AddSeconds(5);
+ }
+ else
+ {
+ // double the fallback time (but don't cross 15 minutes)
+ var totalSeconds = (SystemTime.UtcNow - replicationStatistics.LastErrorTime).TotalSeconds;
+ newTime = SystemTime.UtcNow.AddSeconds(Math.Min(60 * 15, Math.Max(5, totalSeconds * 2)));
+ }
}
replicationStats.RecordWriteError(e, Database, countOfReplicatedItems, newTime);
return false;
@@ -588,9 +622,9 @@ private ConversionScriptResult ApplyConversionScript(SqlReplicationConfig cfg, I
{
patcher.Apply(scope, replicatedDoc.Document, new ScriptedPatchRequest { Script = cfg.Script }, replicatedDoc.SerializedSizeOnDisk);
- if (log.IsDebugEnabled && patcher.Debug.Count > 0)
+ if (Log.IsDebugEnabled && patcher.Debug.Count > 0)
{
- log.Debug("Debug output for doc: {0} for script {1}:\r\n.{2}", replicatedDoc.Key, cfg.Name, string.Join("\r\n", patcher.Debug));
+ Log.Debug("Debug output for doc: {0} for script {1}:\r\n.{2}", replicatedDoc.Key, cfg.Name, string.Join("\r\n", patcher.Debug));
patcher.Debug.Clear();
}
@@ -601,14 +635,14 @@ private ConversionScriptResult ApplyConversionScript(SqlReplicationConfig cfg, I
{
replicationStats.MarkScriptAsInvalid(Database, cfg.Script);
- log.WarnException("Could not parse SQL Replication script for " + cfg.Name, e);
+ Log.WarnException("Could not parse SQL Replication script for " + cfg.Name, e);
return result;
}
catch (Exception diffExceptionName)
{
replicationStats.RecordScriptError(Database, diffExceptionName);
- log.WarnException("Could not process SQL Replication script for " + cfg.Name + ", skipping document: " + replicatedDoc.Key, diffExceptionName);
+ Log.WarnException("Could not process SQL Replication script for " + cfg.Name + ", skipping document: " + replicatedDoc.Key, diffExceptionName);
}
}
}
@@ -666,12 +700,12 @@ public RelationalDatabaseWriter.TableQuerySummary[] SimulateSqlReplicationSQLQue
else
{
var simulatedwriter = new RelationalDatabaseWriterSimulator(Database, sqlReplication, stats);
- resutls = new List()
+ resutls = new List
{
- new RelationalDatabaseWriter.TableQuerySummary()
+ new RelationalDatabaseWriter.TableQuerySummary
{
Commands = simulatedwriter.SimulateExecuteCommandText(scriptResult)
- .Select(x => new RelationalDatabaseWriter.TableQuerySummary.CommandData()
+ .Select(x => new RelationalDatabaseWriter.TableQuerySummary.CommandData
{
CommandText = x
}).ToArray()
@@ -686,7 +720,7 @@ public RelationalDatabaseWriter.TableQuerySummary[] SimulateSqlReplicationSQLQue
}
catch (Exception e)
{
- alert = new Alert()
+ alert = new Alert
{
AlertLevel = AlertLevel.Error,
CreatedAt = SystemTime.UtcNow,
@@ -730,8 +764,8 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep
if (validateSqlReplicationName && string.IsNullOrWhiteSpace(cfg.Name))
{
if (writeToLog)
- log.Warn("Could not find name for sql replication document {0}, ignoring", sqlReplicationConfigDocumentKey);
- replicationStats.LastAlert = new Alert()
+ Log.Warn("Could not find name for sql replication document {0}, ignoring", sqlReplicationConfigDocumentKey);
+ replicationStats.LastAlert = new Alert
{
AlertLevel = AlertLevel.Error,
CreatedAt = DateTime.UtcNow,
@@ -751,10 +785,10 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep
else
{
if (writeToLog)
- log.Warn("Could not find predefined connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.",
+ Log.Warn("Could not find predefined connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.",
cfg.PredefinedConnectionStringSettingName,
sqlReplicationConfigDocumentKey);
- replicationStats.LastAlert = new Alert()
+ replicationStats.LastAlert = new Alert
{
AlertLevel = AlertLevel.Error,
CreatedAt = DateTime.UtcNow,
@@ -772,10 +806,10 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep
if (connectionString == null)
{
if (writeToLog)
- log.Warn("Could not find connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.",
+ Log.Warn("Could not find connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.",
cfg.ConnectionStringName, sqlReplicationConfigDocumentKey);
- replicationStats.LastAlert = new Alert()
+ replicationStats.LastAlert = new Alert
{
AlertLevel = AlertLevel.Error,
CreatedAt = DateTime.UtcNow,
@@ -794,10 +828,10 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep
if (string.IsNullOrWhiteSpace(setting))
{
if (writeToLog)
- log.Warn("Could not find setting named '{0}' for sql replication config: {1}, ignoring sql replication setting.",
+ Log.Warn("Could not find setting named '{0}' for sql replication config: {1}, ignoring sql replication setting.",
cfg.ConnectionStringSettingName,
sqlReplicationConfigDocumentKey);
- replicationStats.LastAlert = new Alert()
+ replicationStats.LastAlert = new Alert
{
AlertLevel = AlertLevel.Error,
CreatedAt = DateTime.UtcNow,
@@ -814,8 +848,19 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep
public void Dispose()
{
- if (prefetchingBehavior != null)
+ foreach (var prefetchingBehavior in prefetchingBehaviors)
+ {
prefetchingBehavior.Dispose();
+ }
+ }
+
+ private class SqlConfigGroup
+ {
+ public Etag LastReplicatedEtag { get; set; }
+
+ public List ConfigsToWorkOn { get; set; }
+
+ public PrefetchingBehavior PrefetchingBehavior { get; set; }
}
}
}
\ No newline at end of file
diff --git a/Raven.Database/Indexing/DefaultIndexingClassifier.cs b/Raven.Database/Indexing/DefaultIndexingClassifier.cs
index 0487f6ff6cdd..20d0c7a5f30a 100644
--- a/Raven.Database/Indexing/DefaultIndexingClassifier.cs
+++ b/Raven.Database/Indexing/DefaultIndexingClassifier.cs
@@ -15,8 +15,6 @@ public class DefaultIndexingClassifier : IIndexingClassifier
{
private readonly Dictionary> empty = new Dictionary>();
- private RoughEtagEqualityAndComparison comparison = new RoughEtagEqualityAndComparison();
-
public Dictionary> GroupMapIndexes(IList indexes)
{
if (indexes.Count == 0)
@@ -24,8 +22,8 @@ public class DefaultIndexingClassifier : IIndexingClassifier
var indexesByIndexedEtag = indexes
.Where(x => x.Index.IsMapIndexingInProgress == false) // indexes with precomputed docs are processed separately
- .GroupBy(x => x.LastIndexedEtag, comparison)
- .OrderByDescending(x => x.Key, comparison)
+ .GroupBy(x => x.LastIndexedEtag, RoughEtagEqualityAndComparison.Instance)
+ .OrderByDescending(x => x.Key, RoughEtagEqualityAndComparison.Instance)
.ToList();
if (indexesByIndexedEtag.Count == 0)
@@ -39,6 +37,8 @@ public class DefaultIndexingClassifier : IIndexingClassifier
// at that point, it doesn't matter much, it would be gone within one or two indexing cycles
public class RoughEtagEqualityAndComparison : IEqualityComparer, IComparer
{
+ public static RoughEtagEqualityAndComparison Instance = new RoughEtagEqualityAndComparison();
+
public bool Equals(Etag x, Etag y)
{
return Compare(x, y) == 0;
diff --git a/Raven.Database/Raven.Database.csproj b/Raven.Database/Raven.Database.csproj
index a1406d14fe26..a151b103daed 100644
--- a/Raven.Database/Raven.Database.csproj
+++ b/Raven.Database/Raven.Database.csproj
@@ -236,6 +236,7 @@
+