From 1c2b8d2d172e80a1397b2895a31dc10148eae7dd Mon Sep 17 00:00:00 2001 From: Arkadiusz Palinski Date: Mon, 11 Feb 2013 15:31:14 +0100 Subject: [PATCH] Attempt to restore index (map-only right now) from previously stored segment file. --- Raven.Database/Indexing/Index.cs | 55 +++++++- Raven.Database/Indexing/IndexCommitPoint.cs | 18 +++ Raven.Database/Indexing/IndexSegmentsInfo.cs | 60 +++++++++ Raven.Database/Indexing/IndexStorage.cs | 122 +++++++++++++++++- Raven.Database/Indexing/IndexingBatch.cs | 4 +- Raven.Database/Indexing/IndexingExecuter.cs | 6 +- Raven.Database/Indexing/MapReduceIndex.cs | 4 +- Raven.Database/Indexing/SimpleIndex.cs | 19 ++- .../Indexing/WritingDocumentsInfo.cs | 28 ++++ Raven.Database/Raven.Database.csproj | 3 + 10 files changed, 303 insertions(+), 16 deletions(-) create mode 100644 Raven.Database/Indexing/IndexCommitPoint.cs create mode 100644 Raven.Database/Indexing/IndexSegmentsInfo.cs create mode 100644 Raven.Database/Indexing/WritingDocumentsInfo.cs diff --git a/Raven.Database/Indexing/Index.cs b/Raven.Database/Indexing/Index.cs index 10ccf4922549..7e070607b4b8 100644 --- a/Raven.Database/Indexing/Index.cs +++ b/Raven.Database/Indexing/Index.cs @@ -242,7 +242,6 @@ public void MergeSegments() public abstract void IndexDocuments(AbstractViewGenerator viewGenerator, IndexingBatch batch, IStorageActionsAccessor actions, DateTime minimumTimestamp); - protected virtual IndexQueryResult RetrieveDocument(Document document, FieldsToFetch fieldsToFetch, ScoreDoc score) { return new IndexQueryResult @@ -304,7 +303,7 @@ public static RavenJObject CreateDocumentFromFields(Document document, FieldsToF return new KeyValuePair(fld.Name, stringValue); } - protected void Write(Func action) + protected void Write(Func action) { if (disposed) throw new ObjectDisposedException("Index " + name + " has been disposed"); @@ -314,6 +313,7 @@ protected void Write(Func action) bool shouldRecreateSearcher; var toDispose = new List(); Analyzer searchAnalyzer = null; + WritingDocumentsInfo info; try { waitReason = "Write"; @@ -340,12 +340,12 @@ protected void Write(Func action) logIndexing.Warn("Could not obtain the 'writing-to-index' lock of '{0}' index", name); } - int changedDocs; + var stats = new IndexingWorkStats(); try { - changedDocs = action(indexWriter, searchAnalyzer, stats); - shouldRecreateSearcher = changedDocs > 0; + info = action(indexWriter, searchAnalyzer, stats); + shouldRecreateSearcher = info.ChangedDocs > 0; foreach (var indexExtension in indexExtensions.Values) { indexExtension.OnDocumentsIndexed(currentlyIndexDocuments, searchAnalyzer); @@ -357,7 +357,7 @@ protected void Write(Func action) throw; } - if (changedDocs > 0) + if (info.ChangedDocs > 0) { UpdateIndexingStats(context, stats); WriteTempIndexToDiskIfNeeded(context); @@ -382,11 +382,52 @@ protected void Write(Func action) waitReason = null; LastIndexTime = SystemTime.UtcNow; } + + if (info.ShouldStoreCommitPoint && info.HighestETag != null) + { + StoreCommitPoint(info.HighestETag.Value); + } + if (shouldRecreateSearcher) RecreateSearcher(); } } + private void StoreCommitPoint(Guid latestIndexedETag) + { + var indexCommit = new IndexCommitPoint + { + LastIndexedETag = latestIndexedETag, + TimeStamp = SystemTime.UtcNow, + SegmentsInfo = GetCurrentSegmentsInfo() + }; + + context.IndexStorage.StoreCommitPoint(name, indexCommit); + } + + private IndexSegmentsInfo GetCurrentSegmentsInfo() + { + var segmentInfos = new SegmentInfos(); + var result = new IndexSegmentsInfo(); + + try + { + segmentInfos.Read(directory); + + result.Generation = segmentInfos.Generation; + result.CurrentSegmentFileName = segmentInfos.GetCurrentSegmentFileName(); + result.ReferencedFiles = segmentInfos.Files(directory, false); + } + catch (CorruptIndexException ex) + { + logIndexing.WarnException(string.Format("Could not read segment information for index '{0}'", name), ex); + + result.IsIndexCorrupted = true; + } + + return result; + } + protected void UpdateIndexingStats(WorkContext context, IndexingWorkStats stats) { context.TransactionalStorage.Batch(accessor => @@ -1290,7 +1331,7 @@ public void Backup(string backupDirectory, string path, string incrementalTag) File.Copy(fullPath, Path.Combine(saveToFolder, fileName)); allFilesWriter.WriteLine(fileName); } - return 0; + return WritingDocumentsInfo.WithoutCommitPoint(0); }); var commit = snapshotter.Snapshot(); diff --git a/Raven.Database/Indexing/IndexCommitPoint.cs b/Raven.Database/Indexing/IndexCommitPoint.cs new file mode 100644 index 000000000000..51fc00e0d9c0 --- /dev/null +++ b/Raven.Database/Indexing/IndexCommitPoint.cs @@ -0,0 +1,18 @@ +// ----------------------------------------------------------------------- +// +// Copyright (c) Hibernating Rhinos LTD. All rights reserved. +// +// ----------------------------------------------------------------------- +using System; + +namespace Raven.Database.Indexing +{ + public class IndexCommitPoint + { + public Guid LastIndexedETag { get; set; } + + public IndexSegmentsInfo SegmentsInfo { get; set; } + + public DateTime TimeStamp { get; set; } + } +} \ No newline at end of file diff --git a/Raven.Database/Indexing/IndexSegmentsInfo.cs b/Raven.Database/Indexing/IndexSegmentsInfo.cs new file mode 100644 index 000000000000..2307b66ae528 --- /dev/null +++ b/Raven.Database/Indexing/IndexSegmentsInfo.cs @@ -0,0 +1,60 @@ +// ----------------------------------------------------------------------- +// +// Copyright (c) Hibernating Rhinos LTD. All rights reserved. +// +// ----------------------------------------------------------------------- +using System.Collections.Generic; +using System.Linq; + +namespace Raven.Database.Indexing +{ + public class IndexSegmentsInfo + { + public long Generation { get; set; } + + public string CurrentSegmentFileName { get; set; } + + public ICollection ReferencedFiles { get; set; } + + public bool IsIndexCorrupted { get; set; } + + protected bool Equals(IndexSegmentsInfo other) + { + var theSameNumberOfItems = ReferencedFiles.Count == other.ReferencedFiles.Count; + var theSameReferencedFiles = true; + + if (theSameNumberOfItems) + { + if (ReferencedFiles.Any(file => other.ReferencedFiles.Contains(file) == false)) + theSameReferencedFiles = false; + } + else + theSameReferencedFiles = false; + + return theSameReferencedFiles && + Generation == other.Generation && + string.Equals(CurrentSegmentFileName, other.CurrentSegmentFileName) && + IsIndexCorrupted.Equals(other.IsIndexCorrupted); + } + + public override int GetHashCode() + { + unchecked + { + var hashCode = Generation.GetHashCode(); + hashCode = (hashCode * 397) ^ (CurrentSegmentFileName != null ? CurrentSegmentFileName.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ (ReferencedFiles != null ? ReferencedFiles.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ IsIndexCorrupted.GetHashCode(); + return hashCode; + } + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((IndexSegmentsInfo) obj); + } + } +} \ No newline at end of file diff --git a/Raven.Database/Indexing/IndexStorage.cs b/Raven.Database/Indexing/IndexStorage.cs index 1eb9f311c775..d682995b76c4 100644 --- a/Raven.Database/Indexing/IndexStorage.cs +++ b/Raven.Database/Indexing/IndexStorage.cs @@ -32,6 +32,7 @@ using Raven.Database.Plugins; using Raven.Database.Queries; using Raven.Database.Storage; +using Raven.Imports.Newtonsoft.Json; using Raven.Json.Linq; using Directory = System.IO.Directory; using System.ComponentModel.Composition; @@ -45,6 +46,8 @@ public class IndexStorage : CriticalFinalizerObject, IDisposable { private readonly DocumentDatabase documentDatabase; private const string IndexVersion = "2.0.0.1"; + private const string LastCommitPointDirectory = "CommitPoint"; + private const string LastCommitPointFile = "index.commitPoint"; private readonly IndexDefinitionStorage indexDefinitionStorage; private readonly InMemoryRavenConfiguration configuration; @@ -230,7 +233,24 @@ private void LoadExistingSuggestionsExtentions(string indexName, Index indexImpl if (configuration.ResetIndexOnUncleanShutdown) throw new InvalidOperationException("Rude shutdown detected on: " + indexDirectory); - CheckIndexAndRecover(directory, indexDirectory); + IndexCommitPoint lastCommitPoint; + + if (TryReuseLastCommitPointToRecoverIndex(indexDefinition, indexFullPath, out lastCommitPoint) == false) + { + CheckIndexAndRecover(directory, indexDirectory); + } + else + { + if (lastCommitPoint == null) + throw new InvalidOperationException("Index '" + indexDefinition.Name + "' reused previous segment files after in order to recover, but NULL commit point was returned."); + + documentDatabase.TransactionalStorage.Batch( + accessor => + accessor.Indexing.UpdateLastIndexed(indexDefinition.Name, lastCommitPoint.LastIndexedETag, + lastCommitPoint.TimeStamp)); + + } + directory.DeleteFile("writing-to-index.lock"); } } @@ -295,6 +315,106 @@ private static void CheckIndexAndRecover(Lucene.Net.Store.Directory directory, s startupLog.Warn("Fixed index {0} in {1}", indexDirectory, sp.Elapsed); } + public static void StoreCommitPoint(IndexCommitPoint indexCommit, string indexDirectory) + { + var commitPointDirectoryFullPath = Path.Combine(indexDirectory, LastCommitPointDirectory); + + if (Directory.Exists(commitPointDirectoryFullPath)) + { + Directory.Delete(commitPointDirectoryFullPath); + } + + Directory.CreateDirectory(commitPointDirectoryFullPath); + + using (var commitPointFile = File.Create(Path.Combine(commitPointDirectoryFullPath, LastCommitPointFile))) + { + var jsonSerializer = new JsonSerializer(); + var textWriter = new JsonTextWriter(new StreamWriter(commitPointFile)); + + jsonSerializer.Serialize(textWriter, indexCommit); + } + + File.Copy(Path.Combine(indexDirectory, indexCommit.SegmentsInfo.CurrentSegmentFileName), Path.Combine(commitPointDirectoryFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName)); + } + + public void StoreCommitPoint(string indexName, IndexCommitPoint indexCommit) + { + var indexFullPath = Path.Combine(path, MonoHttpUtility.UrlEncode(indexName)); + + if (indexCommit.SegmentsInfo != null && indexCommit.SegmentsInfo.IsIndexCorrupted == false) + { + var commitPointDirectoryFullPath = Path.Combine(indexFullPath, LastCommitPointDirectory); + + if (Directory.Exists(commitPointDirectoryFullPath)) + { + IOExtensions.DeleteDirectory(commitPointDirectoryFullPath); + } + + Directory.CreateDirectory(commitPointDirectoryFullPath); + + using (var commitPointFile = File.Create(Path.Combine(commitPointDirectoryFullPath, LastCommitPointFile))) + { + using (var sw = new StreamWriter(commitPointFile)) + { + var jsonSerializer = new JsonSerializer(); + var textWriter = new JsonTextWriter(sw); + + jsonSerializer.Serialize(textWriter, indexCommit); + + sw.Flush(); + } + } + + File.Copy(Path.Combine(indexFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName), Path.Combine(commitPointDirectoryFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName)); + } + } + + private static bool TryReuseLastCommitPointToRecoverIndex(IndexDefinition indexDefinition, string fullIndexPath, out IndexCommitPoint indexCommit) + { + indexCommit = null; + + try + { + if (indexDefinition.IsMapReduce == false) + return false; + + var commitPointDirectoryFullPath = Path.Combine(fullIndexPath, LastCommitPointDirectory); + + if (Directory.Exists(commitPointDirectoryFullPath) == false) + return false; + + using (var commitPointFile = File.Open(Path.Combine(commitPointDirectoryFullPath, LastCommitPointFile), FileMode.Open)) + { + var jsonSerializer = new JsonSerializer(); + var textReader = new JsonTextReader(new StreamReader(commitPointFile)); + + indexCommit = jsonSerializer.Deserialize(textReader); + } + + var filesInIndexDirectory = Directory.GetFiles(fullIndexPath); + + var missingFile = + indexCommit.SegmentsInfo.ReferencedFiles.Any( + referencedFile => filesInIndexDirectory.Contains(referencedFile) == false); + + if (missingFile) + { + return false; + } + + File.Copy(Path.Combine(commitPointDirectoryFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName), Path.Combine(fullIndexPath, indexCommit.SegmentsInfo.CurrentSegmentFileName), true); + + return true; + } + catch (Exception ex) + { + log.ErrorException("Error occured during an attempt to recover an index named '" + indexDefinition.Name + + "'from last segment files", ex); + + return false; + } + } + internal Lucene.Net.Store.Directory MakeRAMDirectoryPhysical(RAMDirectory ramDir, string indexName) { var newDir = new LuceneCodecDirectory(Path.Combine(path, MonoHttpUtility.UrlEncode(IndexDefinitionStorage.FixupIndexName(indexName, path))), documentDatabase.IndexCodecs.OfType()); diff --git a/Raven.Database/Indexing/IndexingBatch.cs b/Raven.Database/Indexing/IndexingBatch.cs index 35c837ecccfb..13e9d68e1a9e 100644 --- a/Raven.Database/Indexing/IndexingBatch.cs +++ b/Raven.Database/Indexing/IndexingBatch.cs @@ -6,8 +6,9 @@ namespace Raven.Database.Indexing { public class IndexingBatch { - public IndexingBatch() + public IndexingBatch(Guid highestEtagInBatch) { + HighestEtagInBatch = highestEtagInBatch; Ids = new List(); Docs = new List(); SkipDeleteFromIndex = new List(); @@ -17,6 +18,7 @@ public IndexingBatch() public readonly List Docs; public readonly List SkipDeleteFromIndex; public DateTime? DateTime; + public readonly Guid HighestEtagInBatch; public void Add(JsonDocument doc, object asJson, bool skipDeleteFromIndex) { diff --git a/Raven.Database/Indexing/IndexingExecuter.cs b/Raven.Database/Indexing/IndexingExecuter.cs index 9ccf0d9d0560..7c7c5e8c924d 100644 --- a/Raven.Database/Indexing/IndexingExecuter.cs +++ b/Raven.Database/Indexing/IndexingExecuter.cs @@ -111,7 +111,7 @@ private Guid DoActualIndexing(IList indexesToWorkOn, List HandleIndexingFor(index, lastEtag, lastModified)); @@ -278,7 +278,7 @@ public class IndexingBatchForIndex public IndexingBatch Batch { get; set; } } - private IEnumerable FilterIndexes(IList indexesToWorkOn, List jsonDocs) + private IEnumerable FilterIndexes(IList indexesToWorkOn, List jsonDocs, Guid highestETagInBatch) { var last = jsonDocs.Last(); @@ -323,7 +323,7 @@ private IEnumerable FilterIndexes(IList in if (viewGenerator == null) return; // probably deleted - var batch = new IndexingBatch(); + var batch = new IndexingBatch(highestETagInBatch); foreach (var item in filteredDocs) { diff --git a/Raven.Database/Indexing/MapReduceIndex.cs b/Raven.Database/Indexing/MapReduceIndex.cs index fca8f8869504..1b7cd1615bf2 100644 --- a/Raven.Database/Indexing/MapReduceIndex.cs +++ b/Raven.Database/Indexing/MapReduceIndex.cs @@ -222,7 +222,7 @@ public override void Remove(string[] keys, WorkContext context) stats.Operation = IndexingWorkStats.Status.Ignore; logIndexing.Debug(() => string.Format("Deleting ({0}) from {1}", string.Join(", ", keys), name)); writer.DeleteDocuments(keys.Select(k => new Term(Constants.ReduceKeyFieldName, k.ToLowerInvariant())).ToArray()); - return keys.Length; + return WritingDocumentsInfo.WithoutCommitPoint(keys.Length); }); } @@ -411,7 +411,7 @@ public void ExecuteReduction() x => x.Dispose()); } } - return count + ReduceKeys.Count; + return WritingDocumentsInfo.WithoutCommitPoint(count + ReduceKeys.Count); }); parent.AddindexingPerformanceStat(new IndexingPerformanceStats { diff --git a/Raven.Database/Indexing/SimpleIndex.cs b/Raven.Database/Indexing/SimpleIndex.cs index 08ef2cd0c2d9..6eb869120582 100644 --- a/Raven.Database/Indexing/SimpleIndex.cs +++ b/Raven.Database/Indexing/SimpleIndex.cs @@ -162,8 +162,14 @@ public override void IndexDocuments(AbstractViewGenerator viewGenerator, Indexin }, x => x.Dispose()); } - return sourceCount; + return new WritingDocumentsInfo + { + ChangedDocs = sourceCount, + HighestETag = batch.HighestEtagInBatch, + ShouldStoreCommitPoint = true + }; }); + AddindexingPerformanceStat(new IndexingPerformanceStats { OutputCount = count, @@ -269,7 +275,16 @@ public override void Remove(string[] keys, WorkContext context) context.AddError(name, null, e.Message); }, batcher => batcher.Dispose()); - return keys.Length; + + IndexStats currentIndexStats = null; + context.TransactionalStorage.Batch(accessor => currentIndexStats = accessor.Indexing.GetIndexStats(name)); + + return new WritingDocumentsInfo + { + ChangedDocs = keys.Length, + HighestETag = currentIndexStats.LastIndexedEtag, + ShouldStoreCommitPoint = true + }; }); } } diff --git a/Raven.Database/Indexing/WritingDocumentsInfo.cs b/Raven.Database/Indexing/WritingDocumentsInfo.cs new file mode 100644 index 000000000000..88dc92eefe1a --- /dev/null +++ b/Raven.Database/Indexing/WritingDocumentsInfo.cs @@ -0,0 +1,28 @@ +// ----------------------------------------------------------------------- +// +// Copyright (c) Hibernating Rhinos LTD. All rights reserved. +// +// ----------------------------------------------------------------------- +using System; + +namespace Raven.Database.Indexing +{ + public class WritingDocumentsInfo + { + public int ChangedDocs { get; set; } + + public Guid? HighestETag { get; set; } + + public bool ShouldStoreCommitPoint { get; set; } + + public static WritingDocumentsInfo WithoutCommitPoint(int changedDocs) + { + return new WritingDocumentsInfo + { + ChangedDocs = changedDocs, + HighestETag = null, + ShouldStoreCommitPoint = false + }; + } + } +} \ No newline at end of file diff --git a/Raven.Database/Raven.Database.csproj b/Raven.Database/Raven.Database.csproj index 4e671682cc8a..599cb182f0ef 100644 --- a/Raven.Database/Raven.Database.csproj +++ b/Raven.Database/Raven.Database.csproj @@ -220,10 +220,13 @@ + + +