From 4501e7230d98a99adc4bd5082a1cfe5ff87066a3 Mon Sep 17 00:00:00 2001 From: Pawel Pekrol Date: Wed, 15 Apr 2015 12:52:00 +0200 Subject: [PATCH] RavenDB-3272 started working on multiple replication groups in SQL Replication --- .../SqlReplicationClassifier.cs | 39 ++ .../SqlReplication/SqlReplicationConfig.cs | 28 +- .../SqlReplication/SqlReplicationTask.cs | 471 ++++++++++-------- .../Indexing/DefaultIndexingClassifier.cs | 8 +- Raven.Database/Raven.Database.csproj | 1 + 5 files changed, 329 insertions(+), 218 deletions(-) create mode 100644 Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs diff --git a/Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs b/Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs new file mode 100644 index 000000000000..29c957680ee7 --- /dev/null +++ b/Raven.Database/Bundles/SqlReplication/SqlReplicationClassifier.cs @@ -0,0 +1,39 @@ +// ----------------------------------------------------------------------- +// +// Copyright (c) Hibernating Rhinos LTD. All rights reserved. +// +// ----------------------------------------------------------------------- +using System; +using System.Collections.Generic; +using System.Configuration; +using System.Linq; + +using Raven.Abstractions.Data; +using Raven.Database.Indexing; + +namespace Raven.Database.Bundles.SqlReplication +{ + internal static class SqlReplicationClassifier + { + private static readonly Dictionary> Empty = new Dictionary>(); + + public static Dictionary> GroupConfigs(IList configs, Func getLastEtagFor) + { + if (configs.Count == 0) + return Empty; + + var configsByEtag = configs + .Where(x => x.Disabled == false) + .Select(x => new SqlReplicationConfigWithLastReplicatedEtag(x, getLastEtagFor(x))) + .GroupBy(x => x.LastReplicatedEtag, DefaultIndexingClassifier.RoughEtagEqualityAndComparison.Instance) + .OrderByDescending(x => x.Key, DefaultIndexingClassifier.RoughEtagEqualityAndComparison.Instance) + .ToList(); + + if (configsByEtag.Count == 0) + return Empty; + + return configsByEtag + .ToDictionary(x => x.Min(y => y.LastReplicatedEtag), x => x.Select(y => y).ToList()); + } + } +} \ No newline at end of file diff --git a/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs b/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs index 4c1131720709..1876f1e8ab50 100644 --- a/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs +++ b/Raven.Database/Bundles/SqlReplication/SqlReplicationConfig.cs @@ -1,9 +1,35 @@ using System; using System.Collections.Generic; -using RTools_NTS.Util; + +using Raven.Abstractions.Data; namespace Raven.Database.Bundles.SqlReplication { + internal class SqlReplicationConfigWithLastReplicatedEtag : SqlReplicationConfig + { + public SqlReplicationConfigWithLastReplicatedEtag(SqlReplicationConfig config, Etag lastReplicatedEtag) + { + LastReplicatedEtag = lastReplicatedEtag; + + ConnectionString = config.ConnectionString; + ConnectionStringName = config.ConnectionStringName; + ConnectionStringSettingName = config.ConnectionStringSettingName; + Disabled = config.Disabled; + FactoryName = config.FactoryName; + ForceSqlServerQueryRecompile = config.ForceSqlServerQueryRecompile; + Id = config.Id; + Name = config.Name; + ParameterizeDeletesDisabled = config.ParameterizeDeletesDisabled; + PredefinedConnectionStringSettingName = config.PredefinedConnectionStringSettingName; + QuoteTables = config.QuoteTables; + RavenEntityName = config.RavenEntityName; + Script = config.Script; + SqlReplicationTables = config.SqlReplicationTables; + } + + public Etag LastReplicatedEtag { get; private set; } + } + public class SqlReplicationConfig { public SqlReplicationConfig() diff --git a/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs b/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs index 03e5d20a0ed9..b15e7e8c4162 100644 --- a/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs +++ b/Raven.Database/Bundles/SqlReplication/SqlReplicationTask.cs @@ -22,16 +22,14 @@ using Raven.Abstractions.Logging; using Raven.Abstractions.Util; using Raven.Database.Extensions; -using Raven.Database.Impl; using Raven.Database.Indexing; using Raven.Database.Json; using Raven.Database.Plugins; using Raven.Database.Prefetching; using Raven.Database.Storage; +using Raven.Database.Util; using Raven.Json.Linq; -using Task = System.Threading.Tasks.Task; - namespace Raven.Database.Bundles.SqlReplication { [InheritedExport(typeof(IStartupTask))] @@ -53,8 +51,9 @@ private class ReplicatedDoc public string Key; } - public const string RavenSqlReplicationStatus = "Raven/SqlReplication/Status"; - private readonly static ILog log = LogManager.GetCurrentClassLogger(); + public const string RavenSqlReplicationStatus = "Raven/SqlReplication/Status"; + + private readonly static ILog Log = LogManager.GetCurrentClassLogger(); public event Action AfterReplicationCompleted = delegate { }; readonly Metrics sqlReplicationMetrics = new Metrics(); @@ -67,14 +66,19 @@ private class ReplicatedDoc { get { return statistics; } } - private PrefetchingBehavior prefetchingBehavior; public readonly ConcurrentDictionary SqlReplicationMetricsCounters = new ConcurrentDictionary(); + private readonly ConcurrentSet prefetchingBehaviors = new ConcurrentSet(); + + private PrefetchingBehavior defaultPrefetchingBehavior; + public void Execute(DocumentDatabase database) { - prefetchingBehavior = database.Prefetcher.CreatePrefetchingBehavior(PrefetchingUser.SqlReplicator, null); + defaultPrefetchingBehavior = database.Prefetcher.CreatePrefetchingBehavior(PrefetchingUser.SqlReplicator, null); + defaultPrefetchingBehavior.ShouldHandleUnusedDocumentsAddedAfterCommit = true; + prefetchingBehaviors.TryAdd(defaultPrefetchingBehavior); Database = database; Database.Notifications.OnDocumentChange += (sender, notification, metadata) => @@ -95,7 +99,7 @@ public void Execute(DocumentDatabase database) return; replicationConfigs = null; - log.Debug(() => "Sql Replication configuration was changed."); + Log.Debug(() => "Sql Replication configuration was changed."); }; GetReplicationStatus(); @@ -110,7 +114,7 @@ public void Execute(DocumentDatabase database) } catch (Exception e) { - log.ErrorException("Fatal failure when replicating to SQL. All SQL Replication activity STOPPED", e); + Log.ErrorException("Fatal failure when replicating to SQL. All SQL Replication activity STOPPED", e); } } }, TaskCreationOptions.LongRunning); @@ -143,8 +147,8 @@ private void RecordDelete(string id, RavenJObject metadata) if (hasChanges) Database.WorkContext.ShouldNotifyAboutWork(() => "recorded a deleted document " + id); }); - if (log.IsDebugEnabled) - log.Debug(() => "recorded a deleted document " + id); + if (Log.IsDebugEnabled) + Log.Debug(() => "recorded a deleted document " + id); } private SqlReplicationStatus GetReplicationStatus() @@ -191,208 +195,253 @@ private void BackgroundSqlReplication() }) // have error or the timeout expired .ToList(); - if (relevantConfigs.Count == 0) + var configGroups = SqlReplicationClassifier.GroupConfigs(relevantConfigs, c => GetLastEtagFor(localReplicationStatus, c)); + + if (configGroups.Count == 0) { Database.WorkContext.WaitForWork(TimeSpan.FromMinutes(10), ref workCounter, "Sql Replication"); continue; } - var leastReplicatedEtag = GetLeastReplicatedEtag(relevantConfigs, localReplicationStatus); + var usedPrefetchers = new ConcurrentSet(); - if (leastReplicatedEtag == null) - { - Database.WorkContext.WaitForWork(TimeSpan.FromMinutes(10), ref workCounter, "Sql Replication"); - continue; - } + var groupedConfigs = configGroups + .Select(x => + { + var result = new SqlConfigGroup + { + LastReplicatedEtag = x.Key, + ConfigsToWorkOn = x.Value, + PrefetchingBehavior = GetPrefetcherFor(x.Key, usedPrefetchers) + }; - List documents; + result.PrefetchingBehavior.AdditionalInfo = string.Format("Default prefetcher: {0}. For sql config group: [Configs: {1}, LastReplicatedEtag: {2}]", result.PrefetchingBehavior == defaultPrefetchingBehavior, string.Join(", ", result.ConfigsToWorkOn.Select(y => y.Name)), result.LastReplicatedEtag); - using (prefetchingBehavior.DocumentBatchFrom(leastReplicatedEtag, out documents)) + return result; + }) + .ToList(); + + var successes = new ConcurrentQueue>(); + try { - Etag latestEtag = null, lastBatchEtag = null; - if (documents.Count != 0) - lastBatchEtag = documents[documents.Count - 1].Etag; + BackgroundTaskExecuter.Instance.ExecuteAll(Database.WorkContext, groupedConfigs, (sqlConfigGroup, i) => + { + Database.WorkContext.CancellationToken.ThrowIfCancellationRequested(); - var replicationDuration = Stopwatch.StartNew(); - documents.RemoveAll(x => x.Key.StartsWith("Raven/", StringComparison.InvariantCultureIgnoreCase)); // we ignore system documents here + var prefetchingBehavior = sqlConfigGroup.PrefetchingBehavior; + var configsToWorkOn = sqlConfigGroup.ConfigsToWorkOn; - if (documents.Count != 0) - latestEtag = documents[documents.Count - 1].Etag; + List documents; + using (prefetchingBehavior.DocumentBatchFrom(sqlConfigGroup.LastReplicatedEtag, out documents)) + { + Etag latestEtag = null, lastBatchEtag = null; + if (documents.Count != 0) + lastBatchEtag = documents[documents.Count - 1].Etag; - documents.RemoveAll(x => prefetchingBehavior.FilterDocuments(x) == false); + var replicationDuration = Stopwatch.StartNew(); + documents.RemoveAll(x => x.Key.StartsWith("Raven/", StringComparison.InvariantCultureIgnoreCase)); // we ignore system documents here - var deletedDocsByConfig = new Dictionary>(); + if (documents.Count != 0) + latestEtag = documents[documents.Count - 1].Etag; - foreach (var relevantConfig in relevantConfigs) - { - var cfg = relevantConfig; - Database.TransactionalStorage.Batch(accessor => - { - deletedDocsByConfig[cfg] = accessor.Lists.Read(GetSqlReplicationDeletionName(cfg), - GetLastEtagFor(localReplicationStatus, cfg), - latestEtag, - MaxNumberOfDeletionsToReplicate + 1) - .ToList(); - }); - } + documents.RemoveAll(x => prefetchingBehavior.FilterDocuments(x) == false); - // No documents AND there aren't any deletes to replicate - if (documents.Count == 0 && deletedDocsByConfig.Sum(x => x.Value.Count) == 0) - { - if (latestEtag != null) - { - // so we filtered some documents, let us update the etag about that. - foreach (var lastReplicatedEtag in localReplicationStatus.LastReplicatedEtags) + var deletedDocsByConfig = new Dictionary>(); + + foreach (var configToWorkOn in configsToWorkOn) { - if (lastReplicatedEtag.LastDocEtag.CompareTo(latestEtag) <= 0) - lastReplicatedEtag.LastDocEtag = latestEtag; + var cfg = configToWorkOn; + Database.TransactionalStorage.Batch(accessor => + { + deletedDocsByConfig[cfg] = accessor.Lists.Read(GetSqlReplicationDeletionName(cfg), + cfg.LastReplicatedEtag, + latestEtag, + MaxNumberOfDeletionsToReplicate + 1) + .ToList(); + }); } - latestEtag = Etag.Max(latestEtag, lastBatchEtag); - SaveNewReplicationStatus(localReplicationStatus); - } - else // no point in waiting if we just saved a new doc - { - Database.WorkContext.WaitForWork(TimeSpan.FromMinutes(10), ref workCounter, "Sql Replication"); - } - continue; - } + // No documents AND there aren't any deletes to replicate + if (documents.Count == 0 && deletedDocsByConfig.Sum(x => x.Value.Count) == 0) + { + // so we filtered some documents, let us update the etag about that. + if (latestEtag != null) + { + foreach (var configToWorkOn in configsToWorkOn) + successes.Enqueue(Tuple.Create(configToWorkOn, latestEtag)); + } - var successes = new ConcurrentQueue>(); - try - { - var itemsToReplicate = documents.Select(x => - { - JsonDocument.EnsureIdInMetadata(x); - var doc = x.ToJson(); - doc[Constants.DocumentIdFieldName] = x.Key; + return; + } - return new ReplicatedDoc + var itemsToReplicate = documents.Select(x => { - Document = doc, - Etag = x.Etag, - Key = x.Key, - SerializedSizeOnDisk = x.SerializedSizeOnDisk - }; - }).ToList(); - - BackgroundTaskExecuter.Instance.ExecuteAllInterleaved(Database.WorkContext, relevantConfigs, replicationConfig => - { + JsonDocument.EnsureIdInMetadata(x); + var doc = x.ToJson(); + doc[Constants.DocumentIdFieldName] = x.Key; + + return new ReplicatedDoc + { + Document = doc, + Etag = x.Etag, + Key = x.Key, + SerializedSizeOnDisk = x.SerializedSizeOnDisk + }; + }).ToList(); + try { - var startTime = SystemTime.UtcNow; - Stopwatch spRepTime = new Stopwatch(); - spRepTime.Start(); - var lastReplicatedEtag = GetLastEtagFor(localReplicationStatus, replicationConfig); - - var deletedDocs = deletedDocsByConfig[replicationConfig]; - var docsToReplicate = itemsToReplicate - .Where(x => lastReplicatedEtag.CompareTo(x.Etag) < 0) // haven't replicate the etag yet - .Where(document => + BackgroundTaskExecuter.Instance.ExecuteAllInterleaved(Database.WorkContext, configsToWorkOn, replicationConfig => + { + try { - var info = Database.Documents.GetRecentTouchesFor(document.Key); - if (info != null) + var startTime = SystemTime.UtcNow; + var spRepTime = new Stopwatch(); + spRepTime.Start(); + var lastReplicatedEtag = replicationConfig.LastReplicatedEtag; + + var deletedDocs = deletedDocsByConfig[replicationConfig]; + var docsToReplicate = itemsToReplicate + .Where(x => lastReplicatedEtag.CompareTo(x.Etag) < 0) // haven't replicate the etag yet + .Where(document => + { + var info = Database.Documents.GetRecentTouchesFor(document.Key); + if (info != null) + { + if (info.TouchedEtag.CompareTo(lastReplicatedEtag) > 0) + { + Log.Debug( + "Will not replicate document '{0}' to '{1}' because the updates after etag {2} are related document touches", + document.Key, replicationConfig.Name, info.TouchedEtag); + return false; + } + } + return true; + }); + + if (deletedDocs.Count >= MaxNumberOfDeletionsToReplicate + 1) + docsToReplicate = docsToReplicate.Where(x => EtagUtil.IsGreaterThan(x.Etag, deletedDocs[deletedDocs.Count - 1].Etag) == false); + + var docsToReplicateAsList = docsToReplicate.ToList(); + + var currentLatestEtag = HandleDeletesAndChangesMerging(deletedDocs, docsToReplicateAsList); + if (currentLatestEtag == null && itemsToReplicate.Count > 0 && docsToReplicateAsList.Count == 0) + currentLatestEtag = lastBatchEtag; + + int countOfReplicatedItems = 0; + if (ReplicateDeletionsToDestination(replicationConfig, deletedDocs) && + ReplicateChangesToDestination(replicationConfig, docsToReplicateAsList, out countOfReplicatedItems)) { - if (info.TouchedEtag.CompareTo(lastReplicatedEtag) > 0) + if (deletedDocs.Count > 0) { - log.Debug( - "Will not replicate document '{0}' to '{1}' because the updates after etag {2} are related document touches", - document.Key, replicationConfig.Name, info.TouchedEtag); - return false; + Database.TransactionalStorage.Batch(accessor => + accessor.Lists.RemoveAllBefore(GetSqlReplicationDeletionName(replicationConfig), deletedDocs[deletedDocs.Count - 1].Etag)); } + successes.Enqueue(Tuple.Create(replicationConfig, currentLatestEtag)); } - return true; - }); - if (deletedDocs.Count >= MaxNumberOfDeletionsToReplicate + 1) - docsToReplicate = docsToReplicate.Where(x => EtagUtil.IsGreaterThan(x.Etag, deletedDocs[deletedDocs.Count - 1].Etag) == false); + spRepTime.Stop(); + var elapsedMicroseconds = (long)(spRepTime.ElapsedTicks * SystemTime.MicroSecPerTick); - var docsToReplicateAsList = docsToReplicate.ToList(); + var sqlReplicationMetricsCounters = GetSqlReplicationMetricsManager(replicationConfig); + sqlReplicationMetricsCounters.SqlReplicationBatchSizeMeter.Mark(countOfReplicatedItems); + sqlReplicationMetricsCounters.SqlReplicationBatchSizeHistogram.Update(countOfReplicatedItems); + sqlReplicationMetricsCounters.SqlReplicationDurationHistogram.Update(elapsedMicroseconds); - var currentLatestEtag = HandleDeletesAndChangesMerging(deletedDocs, docsToReplicateAsList); - if (currentLatestEtag == null && itemsToReplicate.Count > 0 && docsToReplicateAsList.Count == 0) - currentLatestEtag = lastBatchEtag; - - int countOfReplicatedItems = 0; - if (ReplicateDeletionsToDestination(replicationConfig, deletedDocs) && - ReplicateChangesToDestination(replicationConfig, docsToReplicateAsList, out countOfReplicatedItems)) - { - if (deletedDocs.Count > 0) + UpdateReplicationPerformance(replicationConfig, startTime, spRepTime.Elapsed, docsToReplicateAsList.Count); + } + catch (Exception e) { - Database.TransactionalStorage.Batch(accessor => - accessor.Lists.RemoveAllBefore(GetSqlReplicationDeletionName(replicationConfig), deletedDocs[deletedDocs.Count - 1].Etag)); + Log.WarnException("Error while replication to SQL destination: " + replicationConfig.Name, e); + Database.AddAlert(new Alert + { + AlertLevel = AlertLevel.Error, + CreatedAt = SystemTime.UtcNow, + Exception = e.ToString(), + Title = "Sql Replication failure to replication", + Message = "Sql Replication could not replicate to " + replicationConfig.Name, + UniqueKey = "Sql Replication could not replicate to " + replicationConfig.Name + }); } - successes.Enqueue(Tuple.Create(replicationConfig, currentLatestEtag)); - } - - spRepTime.Stop(); - var elapsedMicroseconds = (long)(spRepTime.ElapsedTicks * SystemTime.MicroSecPerTick); - - var sqlReplicationMetricsCounters = GetSqlReplicationMetricsManager(replicationConfig); - sqlReplicationMetricsCounters.SqlReplicationBatchSizeMeter.Mark(countOfReplicatedItems); - sqlReplicationMetricsCounters.SqlReplicationBatchSizeHistogram.Update(countOfReplicatedItems); - sqlReplicationMetricsCounters.SqlReplicationDurationHistogram.Update(elapsedMicroseconds); - - UpdateReplicationPerformance(replicationConfig, startTime, spRepTime.Elapsed, docsToReplicateAsList.Count); - - } - catch (Exception e) - { - log.WarnException("Error while replication to SQL destination: " + replicationConfig.Name, e); - Database.AddAlert(new Alert - { - AlertLevel = AlertLevel.Error, - CreatedAt = SystemTime.UtcNow, - Exception = e.ToString(), - Title = "Sql Replication failure to replication", - Message = "Sql Replication could not replicate to " + replicationConfig.Name, - UniqueKey = "Sql Replication could not replicate to " + replicationConfig.Name }); } - }); - if (successes.Count == 0) - continue; - foreach (var t in successes) - { - var cfg = t.Item1; - var currentLatestEtag = t.Item2; - var destEtag = localReplicationStatus.LastReplicatedEtags.FirstOrDefault(x => string.Equals(x.Name, cfg.Name, StringComparison.InvariantCultureIgnoreCase)); - if (destEtag == null) + finally { - localReplicationStatus.LastReplicatedEtags.Add(new LastReplicatedEtag - { - Name = cfg.Name, - LastDocEtag = currentLatestEtag ?? Etag.Empty - }); - } - else - { - var lastDocEtag = destEtag.LastDocEtag; - if (currentLatestEtag != null && EtagUtil.IsGreaterThan(currentLatestEtag, lastDocEtag)) - lastDocEtag = currentLatestEtag; - - destEtag.LastDocEtag = lastDocEtag; + prefetchingBehavior.CleanupDocuments(lastBatchEtag); + prefetchingBehavior.UpdateAutoThrottler(documents, replicationDuration.Elapsed); } } + }); - SaveNewReplicationStatus(localReplicationStatus); - } - finally + if (successes.Count == 0) + break; + + foreach (var t in successes) { - AfterReplicationCompleted(successes.Count); - var min = localReplicationStatus.LastReplicatedEtags.Min(x => new ComparableByteArray(x.LastDocEtag.ToByteArray())); - if (min != null) + var cfg = t.Item1; + var currentLatestEtag = t.Item2; + var destEtag = localReplicationStatus.LastReplicatedEtags.FirstOrDefault(x => string.Equals(x.Name, cfg.Name, StringComparison.InvariantCultureIgnoreCase)); + if (destEtag == null) + { + localReplicationStatus.LastReplicatedEtags.Add(new LastReplicatedEtag + { + Name = cfg.Name, + LastDocEtag = currentLatestEtag ?? Etag.Empty + }); + } + else { - var lastMinReplicatedEtag = min.ToEtag(); - prefetchingBehavior.CleanupDocuments(lastMinReplicatedEtag); - prefetchingBehavior.UpdateAutoThrottler(documents, replicationDuration.Elapsed); + var lastDocEtag = destEtag.LastDocEtag; + if (currentLatestEtag != null && EtagUtil.IsGreaterThan(currentLatestEtag, lastDocEtag)) + lastDocEtag = currentLatestEtag; + + destEtag.LastDocEtag = lastDocEtag; } } + SaveNewReplicationStatus(localReplicationStatus); + } + finally + { + AfterReplicationCompleted(successes.Count); + RemoveUnusedPrefetchers(usedPrefetchers); } } } + private void RemoveUnusedPrefetchers(IEnumerable usedPrefetchingBehaviors) + { + var unused = prefetchingBehaviors.Except(usedPrefetchingBehaviors.Union(new[] + { + defaultPrefetchingBehavior + })).ToList(); + + if (unused.Count == 0) + return; + + foreach (var unusedPrefetcher in unused) + { + prefetchingBehaviors.TryRemove(unusedPrefetcher); + Database.Prefetcher.RemovePrefetchingBehavior(unusedPrefetcher); + } + } + + private PrefetchingBehavior GetPrefetcherFor(Etag fromEtag, ConcurrentSet usedPrefetchers) + { + foreach (var prefetchingBehavior in prefetchingBehaviors) + { + if (prefetchingBehavior.CanUsePrefetcherToLoadFrom(fromEtag) && usedPrefetchers.TryAdd(prefetchingBehavior)) + return prefetchingBehavior; + } + + var newPrefetcher = Database.Prefetcher.CreatePrefetchingBehavior(PrefetchingUser.SqlReplicator, null); + + prefetchingBehaviors.Add(newPrefetcher); + usedPrefetchers.Add(newPrefetcher); + + return newPrefetcher; + } + private void UpdateReplicationPerformance(SqlReplicationConfig replicationConfig, DateTime startTime, TimeSpan elapsed, int batchSize) { var performance = new SqlReplicationPerformanceStats @@ -489,7 +538,7 @@ private bool ReplicateDeletionsToDestination(SqlReplicationConfig cfg, IEnumerab writer.DeleteItems(sqlReplicationTable.TableName, sqlReplicationTable.DocumentKeyColumn, cfg.ParameterizeDeletesDisabled, identifiers); } writer.Commit(); - log.Debug("Replicated deletes of {0} for config {1}", string.Join(", ", identifiers), cfg.Name); + Log.Debug("Replicated deletes of {0} for config {1}", string.Join(", ", identifiers), cfg.Name); } return true; @@ -500,29 +549,14 @@ private static string GetSqlReplicationDeletionName(SqlReplicationConfig replica return "SqlReplication/Deletions/" + replicationConfig.Name; } - private Etag GetLeastReplicatedEtag(IEnumerable config, SqlReplicationStatus localReplicationStatus) - { - Etag leastReplicatedEtag = null; - foreach (var sqlReplicationConfig in config) - { - var lastEtag = GetLastEtagFor(localReplicationStatus, sqlReplicationConfig); - if (leastReplicatedEtag == null) - leastReplicatedEtag = lastEtag; - else if (lastEtag.CompareTo(leastReplicatedEtag) < 0) - leastReplicatedEtag = lastEtag; - } - - return leastReplicatedEtag; - } - private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable docs, out int countOfReplicatedItems) { countOfReplicatedItems = 0; - var replicationStats = statistics.GetOrAdd(cfg.Name, name => new SqlReplicationStatistics(name)); + var replicationStats = statistics.GetOrAdd(cfg.Name, name => new SqlReplicationStatistics(name)); var scriptResult = ApplyConversionScript(cfg, docs, replicationStats); if (scriptResult.Ids.Count == 0) return true; - + countOfReplicatedItems = scriptResult.Data.Sum(x => x.Value.Count); try { @@ -530,12 +564,12 @@ private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable { if (writer.Execute(scriptResult)) { - log.Debug("Replicated changes of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name); + Log.Debug("Replicated changes of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name); replicationStats.CompleteSuccess(countOfReplicatedItems); } else { - log.Debug("Replicated changes (with some errors) of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name); + Log.Debug("Replicated changes (with some errors) of {0} for replication {1}", string.Join(", ", docs.Select(d => d.Key)), cfg.Name); replicationStats.Success(countOfReplicatedItems); } } @@ -543,7 +577,7 @@ private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable } catch (Exception e) { - log.WarnException("Failure to replicate changes to relational database for: " + cfg.Name, e); + Log.WarnException("Failure to replicate changes to relational database for: " + cfg.Name, e); SqlReplicationStatistics replicationStatistics; DateTime newTime; if (statistics.TryGetValue(cfg.Name, out replicationStatistics) == false) @@ -552,16 +586,16 @@ private bool ReplicateChangesToDestination(SqlReplicationConfig cfg, IEnumerable } else { - if (replicationStatistics.LastErrorTime == DateTime.MinValue) - { - newTime = SystemTime.UtcNow.AddSeconds(5); - } - else - { - // double the fallback time (but don't cross 15 minutes) - var totalSeconds = (SystemTime.UtcNow - replicationStatistics.LastErrorTime).TotalSeconds; - newTime = SystemTime.UtcNow.AddSeconds(Math.Min(60*15, Math.Max(5, totalSeconds*2))); - } + if (replicationStatistics.LastErrorTime == DateTime.MinValue) + { + newTime = SystemTime.UtcNow.AddSeconds(5); + } + else + { + // double the fallback time (but don't cross 15 minutes) + var totalSeconds = (SystemTime.UtcNow - replicationStatistics.LastErrorTime).TotalSeconds; + newTime = SystemTime.UtcNow.AddSeconds(Math.Min(60 * 15, Math.Max(5, totalSeconds * 2))); + } } replicationStats.RecordWriteError(e, Database, countOfReplicatedItems, newTime); return false; @@ -588,9 +622,9 @@ private ConversionScriptResult ApplyConversionScript(SqlReplicationConfig cfg, I { patcher.Apply(scope, replicatedDoc.Document, new ScriptedPatchRequest { Script = cfg.Script }, replicatedDoc.SerializedSizeOnDisk); - if (log.IsDebugEnabled && patcher.Debug.Count > 0) + if (Log.IsDebugEnabled && patcher.Debug.Count > 0) { - log.Debug("Debug output for doc: {0} for script {1}:\r\n.{2}", replicatedDoc.Key, cfg.Name, string.Join("\r\n", patcher.Debug)); + Log.Debug("Debug output for doc: {0} for script {1}:\r\n.{2}", replicatedDoc.Key, cfg.Name, string.Join("\r\n", patcher.Debug)); patcher.Debug.Clear(); } @@ -601,14 +635,14 @@ private ConversionScriptResult ApplyConversionScript(SqlReplicationConfig cfg, I { replicationStats.MarkScriptAsInvalid(Database, cfg.Script); - log.WarnException("Could not parse SQL Replication script for " + cfg.Name, e); + Log.WarnException("Could not parse SQL Replication script for " + cfg.Name, e); return result; } catch (Exception diffExceptionName) { replicationStats.RecordScriptError(Database, diffExceptionName); - log.WarnException("Could not process SQL Replication script for " + cfg.Name + ", skipping document: " + replicatedDoc.Key, diffExceptionName); + Log.WarnException("Could not process SQL Replication script for " + cfg.Name + ", skipping document: " + replicatedDoc.Key, diffExceptionName); } } } @@ -666,12 +700,12 @@ public RelationalDatabaseWriter.TableQuerySummary[] SimulateSqlReplicationSQLQue else { var simulatedwriter = new RelationalDatabaseWriterSimulator(Database, sqlReplication, stats); - resutls = new List() + resutls = new List { - new RelationalDatabaseWriter.TableQuerySummary() + new RelationalDatabaseWriter.TableQuerySummary { Commands = simulatedwriter.SimulateExecuteCommandText(scriptResult) - .Select(x => new RelationalDatabaseWriter.TableQuerySummary.CommandData() + .Select(x => new RelationalDatabaseWriter.TableQuerySummary.CommandData { CommandText = x }).ToArray() @@ -686,7 +720,7 @@ public RelationalDatabaseWriter.TableQuerySummary[] SimulateSqlReplicationSQLQue } catch (Exception e) { - alert = new Alert() + alert = new Alert { AlertLevel = AlertLevel.Error, CreatedAt = SystemTime.UtcNow, @@ -730,8 +764,8 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep if (validateSqlReplicationName && string.IsNullOrWhiteSpace(cfg.Name)) { if (writeToLog) - log.Warn("Could not find name for sql replication document {0}, ignoring", sqlReplicationConfigDocumentKey); - replicationStats.LastAlert = new Alert() + Log.Warn("Could not find name for sql replication document {0}, ignoring", sqlReplicationConfigDocumentKey); + replicationStats.LastAlert = new Alert { AlertLevel = AlertLevel.Error, CreatedAt = DateTime.UtcNow, @@ -751,10 +785,10 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep else { if (writeToLog) - log.Warn("Could not find predefined connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.", + Log.Warn("Could not find predefined connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.", cfg.PredefinedConnectionStringSettingName, sqlReplicationConfigDocumentKey); - replicationStats.LastAlert = new Alert() + replicationStats.LastAlert = new Alert { AlertLevel = AlertLevel.Error, CreatedAt = DateTime.UtcNow, @@ -772,10 +806,10 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep if (connectionString == null) { if (writeToLog) - log.Warn("Could not find connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.", + Log.Warn("Could not find connection string named '{0}' for sql replication config: {1}, ignoring sql replication setting.", cfg.ConnectionStringName, sqlReplicationConfigDocumentKey); - replicationStats.LastAlert = new Alert() + replicationStats.LastAlert = new Alert { AlertLevel = AlertLevel.Error, CreatedAt = DateTime.UtcNow, @@ -794,10 +828,10 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep if (string.IsNullOrWhiteSpace(setting)) { if (writeToLog) - log.Warn("Could not find setting named '{0}' for sql replication config: {1}, ignoring sql replication setting.", + Log.Warn("Could not find setting named '{0}' for sql replication config: {1}, ignoring sql replication setting.", cfg.ConnectionStringSettingName, sqlReplicationConfigDocumentKey); - replicationStats.LastAlert = new Alert() + replicationStats.LastAlert = new Alert { AlertLevel = AlertLevel.Error, CreatedAt = DateTime.UtcNow, @@ -814,8 +848,19 @@ private bool PrepareSqlReplicationConfig(SqlReplicationConfig cfg, string sqlRep public void Dispose() { - if (prefetchingBehavior != null) + foreach (var prefetchingBehavior in prefetchingBehaviors) + { prefetchingBehavior.Dispose(); + } + } + + private class SqlConfigGroup + { + public Etag LastReplicatedEtag { get; set; } + + public List ConfigsToWorkOn { get; set; } + + public PrefetchingBehavior PrefetchingBehavior { get; set; } } } } \ No newline at end of file diff --git a/Raven.Database/Indexing/DefaultIndexingClassifier.cs b/Raven.Database/Indexing/DefaultIndexingClassifier.cs index 0487f6ff6cdd..20d0c7a5f30a 100644 --- a/Raven.Database/Indexing/DefaultIndexingClassifier.cs +++ b/Raven.Database/Indexing/DefaultIndexingClassifier.cs @@ -15,8 +15,6 @@ public class DefaultIndexingClassifier : IIndexingClassifier { private readonly Dictionary> empty = new Dictionary>(); - private RoughEtagEqualityAndComparison comparison = new RoughEtagEqualityAndComparison(); - public Dictionary> GroupMapIndexes(IList indexes) { if (indexes.Count == 0) @@ -24,8 +22,8 @@ public class DefaultIndexingClassifier : IIndexingClassifier var indexesByIndexedEtag = indexes .Where(x => x.Index.IsMapIndexingInProgress == false) // indexes with precomputed docs are processed separately - .GroupBy(x => x.LastIndexedEtag, comparison) - .OrderByDescending(x => x.Key, comparison) + .GroupBy(x => x.LastIndexedEtag, RoughEtagEqualityAndComparison.Instance) + .OrderByDescending(x => x.Key, RoughEtagEqualityAndComparison.Instance) .ToList(); if (indexesByIndexedEtag.Count == 0) @@ -39,6 +37,8 @@ public class DefaultIndexingClassifier : IIndexingClassifier // at that point, it doesn't matter much, it would be gone within one or two indexing cycles public class RoughEtagEqualityAndComparison : IEqualityComparer, IComparer { + public static RoughEtagEqualityAndComparison Instance = new RoughEtagEqualityAndComparison(); + public bool Equals(Etag x, Etag y) { return Compare(x, y) == 0; diff --git a/Raven.Database/Raven.Database.csproj b/Raven.Database/Raven.Database.csproj index a1406d14fe26..a151b103daed 100644 --- a/Raven.Database/Raven.Database.csproj +++ b/Raven.Database/Raven.Database.csproj @@ -236,6 +236,7 @@ +