Skip to content

Commit

Permalink
Attempt to restore index (map-only right now) from previously stored …
Browse files Browse the repository at this point in the history
…segment file.
  • Loading branch information
Arkadiusz Palinski committed Feb 11, 2013
1 parent 4ce686c commit 1c2b8d2
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 16 deletions.
55 changes: 48 additions & 7 deletions Raven.Database/Indexing/Index.cs
Expand Up @@ -242,7 +242,6 @@ public void MergeSegments()

public abstract void IndexDocuments(AbstractViewGenerator viewGenerator, IndexingBatch batch, IStorageActionsAccessor actions, DateTime minimumTimestamp);


protected virtual IndexQueryResult RetrieveDocument(Document document, FieldsToFetch fieldsToFetch, ScoreDoc score)
{
return new IndexQueryResult
Expand Down Expand Up @@ -304,7 +303,7 @@ public static RavenJObject CreateDocumentFromFields(Document document, FieldsToF
return new KeyValuePair<string, RavenJToken>(fld.Name, stringValue);
}

protected void Write(Func<IndexWriter, Analyzer, IndexingWorkStats, int> action)
protected void Write(Func<IndexWriter, Analyzer, IndexingWorkStats, WritingDocumentsInfo> action)
{
if (disposed)
throw new ObjectDisposedException("Index " + name + " has been disposed");
Expand All @@ -314,6 +313,7 @@ protected void Write(Func<IndexWriter, Analyzer, IndexingWorkStats, int> action)
bool shouldRecreateSearcher;
var toDispose = new List<Action>();
Analyzer searchAnalyzer = null;
WritingDocumentsInfo info;
try
{
waitReason = "Write";
Expand All @@ -340,12 +340,12 @@ protected void Write(Func<IndexWriter, Analyzer, IndexingWorkStats, int> action)
logIndexing.Warn("Could not obtain the 'writing-to-index' lock of '{0}' index", name);
}

int changedDocs;

var stats = new IndexingWorkStats();
try
{
changedDocs = action(indexWriter, searchAnalyzer, stats);
shouldRecreateSearcher = changedDocs > 0;
info = action(indexWriter, searchAnalyzer, stats);
shouldRecreateSearcher = info.ChangedDocs > 0;
foreach (var indexExtension in indexExtensions.Values)
{
indexExtension.OnDocumentsIndexed(currentlyIndexDocuments, searchAnalyzer);
Expand All @@ -357,7 +357,7 @@ protected void Write(Func<IndexWriter, Analyzer, IndexingWorkStats, int> action)
throw;
}

if (changedDocs > 0)
if (info.ChangedDocs > 0)
{
UpdateIndexingStats(context, stats);
WriteTempIndexToDiskIfNeeded(context);
Expand All @@ -382,11 +382,52 @@ protected void Write(Func<IndexWriter, Analyzer, IndexingWorkStats, int> action)
waitReason = null;
LastIndexTime = SystemTime.UtcNow;
}

if (info.ShouldStoreCommitPoint && info.HighestETag != null)
{
StoreCommitPoint(info.HighestETag.Value);
}

if (shouldRecreateSearcher)
RecreateSearcher();
}
}

private void StoreCommitPoint(Guid latestIndexedETag)
{
var indexCommit = new IndexCommitPoint
{
LastIndexedETag = latestIndexedETag,
TimeStamp = SystemTime.UtcNow,
SegmentsInfo = GetCurrentSegmentsInfo()
};

context.IndexStorage.StoreCommitPoint(name, indexCommit);
}

private IndexSegmentsInfo GetCurrentSegmentsInfo()
{
var segmentInfos = new SegmentInfos();
var result = new IndexSegmentsInfo();

try
{
segmentInfos.Read(directory);

result.Generation = segmentInfos.Generation;
result.CurrentSegmentFileName = segmentInfos.GetCurrentSegmentFileName();
result.ReferencedFiles = segmentInfos.Files(directory, false);
}
catch (CorruptIndexException ex)
{
logIndexing.WarnException(string.Format("Could not read segment information for index '{0}'", name), ex);

result.IsIndexCorrupted = true;
}

return result;
}

protected void UpdateIndexingStats(WorkContext context, IndexingWorkStats stats)
{
context.TransactionalStorage.Batch(accessor =>
Expand Down Expand Up @@ -1290,7 +1331,7 @@ public void Backup(string backupDirectory, string path, string incrementalTag)
File.Copy(fullPath, Path.Combine(saveToFolder, fileName));
allFilesWriter.WriteLine(fileName);
}
return 0;
return WritingDocumentsInfo.WithoutCommitPoint(0);
});

var commit = snapshotter.Snapshot();
Expand Down
18 changes: 18 additions & 0 deletions Raven.Database/Indexing/IndexCommitPoint.cs
@@ -0,0 +1,18 @@
// -----------------------------------------------------------------------
// <copyright file="IndexCommitPoint.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System;

namespace Raven.Database.Indexing
{
public class IndexCommitPoint
{
public Guid LastIndexedETag { get; set; }

public IndexSegmentsInfo SegmentsInfo { get; set; }

public DateTime TimeStamp { get; set; }
}
}
60 changes: 60 additions & 0 deletions Raven.Database/Indexing/IndexSegmentsInfo.cs
@@ -0,0 +1,60 @@
// -----------------------------------------------------------------------
// <copyright file="IndexSegmentsInfo.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
// -----------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;

namespace Raven.Database.Indexing
{
public class IndexSegmentsInfo
{
public long Generation { get; set; }

public string CurrentSegmentFileName { get; set; }

public ICollection<string> ReferencedFiles { get; set; }

public bool IsIndexCorrupted { get; set; }

protected bool Equals(IndexSegmentsInfo other)
{
var theSameNumberOfItems = ReferencedFiles.Count == other.ReferencedFiles.Count;
var theSameReferencedFiles = true;

if (theSameNumberOfItems)
{
if (ReferencedFiles.Any(file => other.ReferencedFiles.Contains(file) == false))
theSameReferencedFiles = false;
}
else
theSameReferencedFiles = false;

return theSameReferencedFiles &&
Generation == other.Generation &&
string.Equals(CurrentSegmentFileName, other.CurrentSegmentFileName) &&
IsIndexCorrupted.Equals(other.IsIndexCorrupted);
}

public override int GetHashCode()
{
unchecked
{
var hashCode = Generation.GetHashCode();
hashCode = (hashCode * 397) ^ (CurrentSegmentFileName != null ? CurrentSegmentFileName.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (ReferencedFiles != null ? ReferencedFiles.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ IsIndexCorrupted.GetHashCode();
return hashCode;
}
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((IndexSegmentsInfo) obj);
}
}
}
122 changes: 121 additions & 1 deletion Raven.Database/Indexing/IndexStorage.cs
Expand Up @@ -32,6 +32,7 @@
using Raven.Database.Plugins;
using Raven.Database.Queries;
using Raven.Database.Storage;
using Raven.Imports.Newtonsoft.Json;
using Raven.Json.Linq;
using Directory = System.IO.Directory;
using System.ComponentModel.Composition;
Expand All @@ -45,6 +46,8 @@ public class IndexStorage : CriticalFinalizerObject, IDisposable
{
private readonly DocumentDatabase documentDatabase;
private const string IndexVersion = "2.0.0.1";
private const string LastCommitPointDirectory = "CommitPoint";
private const string LastCommitPointFile = "index.commitPoint";

private readonly IndexDefinitionStorage indexDefinitionStorage;
private readonly InMemoryRavenConfiguration configuration;
Expand Down Expand Up @@ -230,7 +233,24 @@ private void LoadExistingSuggestionsExtentions(string indexName, Index indexImpl
if (configuration.ResetIndexOnUncleanShutdown)
throw new InvalidOperationException("Rude shutdown detected on: " + indexDirectory);

CheckIndexAndRecover(directory, indexDirectory);
IndexCommitPoint lastCommitPoint;

if (TryReuseLastCommitPointToRecoverIndex(indexDefinition, indexFullPath, out lastCommitPoint) == false)
{
CheckIndexAndRecover(directory, indexDirectory);
}
else
{
if (lastCommitPoint == null)
throw new InvalidOperationException("Index '" + indexDefinition.Name + "' reused previous segment files after in order to recover, but NULL commit point was returned.");

documentDatabase.TransactionalStorage.Batch(
accessor =>
accessor.Indexing.UpdateLastIndexed(indexDefinition.Name, lastCommitPoint.LastIndexedETag,
lastCommitPoint.TimeStamp));

}

directory.DeleteFile("writing-to-index.lock");
}
}
Expand Down Expand Up @@ -295,6 +315,106 @@ private static void CheckIndexAndRecover(Lucene.Net.Store.Directory directory, s
startupLog.Warn("Fixed index {0} in {1}", indexDirectory, sp.Elapsed);
}

public static void StoreCommitPoint(IndexCommitPoint indexCommit, string indexDirectory)
{
var commitPointDirectoryFullPath = Path.Combine(indexDirectory, LastCommitPointDirectory);

if (Directory.Exists(commitPointDirectoryFullPath))
{
Directory.Delete(commitPointDirectoryFullPath);
}

Directory.CreateDirectory(commitPointDirectoryFullPath);

using (var commitPointFile = File.Create(Path.Combine(commitPointDirectoryFullPath, LastCommitPointFile)))
{
var jsonSerializer = new JsonSerializer();
var textWriter = new JsonTextWriter(new StreamWriter(commitPointFile));

jsonSerializer.Serialize(textWriter, indexCommit);
}

File.Copy(Path.Combine(indexDirectory, indexCommit.SegmentsInfo.CurrentSegmentFileName), Path.Combine(commitPointDirectoryFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName));
}

public void StoreCommitPoint(string indexName, IndexCommitPoint indexCommit)
{
var indexFullPath = Path.Combine(path, MonoHttpUtility.UrlEncode(indexName));

if (indexCommit.SegmentsInfo != null && indexCommit.SegmentsInfo.IsIndexCorrupted == false)
{
var commitPointDirectoryFullPath = Path.Combine(indexFullPath, LastCommitPointDirectory);

if (Directory.Exists(commitPointDirectoryFullPath))
{
IOExtensions.DeleteDirectory(commitPointDirectoryFullPath);
}

Directory.CreateDirectory(commitPointDirectoryFullPath);

using (var commitPointFile = File.Create(Path.Combine(commitPointDirectoryFullPath, LastCommitPointFile)))
{
using (var sw = new StreamWriter(commitPointFile))
{
var jsonSerializer = new JsonSerializer();
var textWriter = new JsonTextWriter(sw);

jsonSerializer.Serialize(textWriter, indexCommit);

sw.Flush();
}
}

File.Copy(Path.Combine(indexFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName), Path.Combine(commitPointDirectoryFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName));
}
}

private static bool TryReuseLastCommitPointToRecoverIndex(IndexDefinition indexDefinition, string fullIndexPath, out IndexCommitPoint indexCommit)
{
indexCommit = null;

try
{
if (indexDefinition.IsMapReduce == false)
return false;

var commitPointDirectoryFullPath = Path.Combine(fullIndexPath, LastCommitPointDirectory);

if (Directory.Exists(commitPointDirectoryFullPath) == false)
return false;

using (var commitPointFile = File.Open(Path.Combine(commitPointDirectoryFullPath, LastCommitPointFile), FileMode.Open))
{
var jsonSerializer = new JsonSerializer();
var textReader = new JsonTextReader(new StreamReader(commitPointFile));

indexCommit = jsonSerializer.Deserialize<IndexCommitPoint>(textReader);
}

var filesInIndexDirectory = Directory.GetFiles(fullIndexPath);

var missingFile =
indexCommit.SegmentsInfo.ReferencedFiles.Any(
referencedFile => filesInIndexDirectory.Contains(referencedFile) == false);

if (missingFile)
{
return false;
}

File.Copy(Path.Combine(commitPointDirectoryFullPath, indexCommit.SegmentsInfo.CurrentSegmentFileName), Path.Combine(fullIndexPath, indexCommit.SegmentsInfo.CurrentSegmentFileName), true);

return true;
}
catch (Exception ex)
{
log.ErrorException("Error occured during an attempt to recover an index named '" + indexDefinition.Name +
"'from last segment files", ex);

return false;
}
}

internal Lucene.Net.Store.Directory MakeRAMDirectoryPhysical(RAMDirectory ramDir, string indexName)
{
var newDir = new LuceneCodecDirectory(Path.Combine(path, MonoHttpUtility.UrlEncode(IndexDefinitionStorage.FixupIndexName(indexName, path))), documentDatabase.IndexCodecs.OfType<AbstractIndexCodec>());
Expand Down
4 changes: 3 additions & 1 deletion Raven.Database/Indexing/IndexingBatch.cs
Expand Up @@ -6,8 +6,9 @@ namespace Raven.Database.Indexing
{
public class IndexingBatch
{
public IndexingBatch()
public IndexingBatch(Guid highestEtagInBatch)
{
HighestEtagInBatch = highestEtagInBatch;
Ids = new List<string>();
Docs = new List<dynamic>();
SkipDeleteFromIndex = new List<bool>();
Expand All @@ -17,6 +18,7 @@ public IndexingBatch()
public readonly List<dynamic> Docs;
public readonly List<bool> SkipDeleteFromIndex;
public DateTime? DateTime;
public readonly Guid HighestEtagInBatch;

public void Add(JsonDocument doc, object asJson, bool skipDeleteFromIndex)
{
Expand Down
6 changes: 3 additions & 3 deletions Raven.Database/Indexing/IndexingExecuter.cs
Expand Up @@ -111,7 +111,7 @@ private Guid DoActualIndexing(IList<IndexToWorkOn> indexesToWorkOn, List<JsonDoc
var lastEtag = lastByEtag.Etag.Value;

context.IndexedPerSecIncreaseBy(jsonDocs.Count);
var result = FilterIndexes(indexesToWorkOn, jsonDocs).ToList();
var result = FilterIndexes(indexesToWorkOn, jsonDocs, lastEtag).ToList();

ExecuteAllInterleaved(result, index => HandleIndexingFor(index, lastEtag, lastModified));

Expand Down Expand Up @@ -278,7 +278,7 @@ public class IndexingBatchForIndex
public IndexingBatch Batch { get; set; }
}

private IEnumerable<IndexingBatchForIndex> FilterIndexes(IList<IndexToWorkOn> indexesToWorkOn, List<JsonDocument> jsonDocs)
private IEnumerable<IndexingBatchForIndex> FilterIndexes(IList<IndexToWorkOn> indexesToWorkOn, List<JsonDocument> jsonDocs, Guid highestETagInBatch)
{
var last = jsonDocs.Last();

Expand Down Expand Up @@ -323,7 +323,7 @@ private IEnumerable<IndexingBatchForIndex> FilterIndexes(IList<IndexToWorkOn> in
if (viewGenerator == null)
return; // probably deleted
var batch = new IndexingBatch();
var batch = new IndexingBatch(highestETagInBatch);
foreach (var item in filteredDocs)
{
Expand Down

0 comments on commit 1c2b8d2

Please sign in to comment.