Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

- Changed the LuceneIndexRecoveryIT to expose the problem where recovery

would insert data so that it got duplicated in a lucene index.
- Fixed so that IndexAccessor gets a separate call when doing recovery
  instead of update()
  • Loading branch information...
commit 900e8cf5f5469a89c7e5c357fded061a91557600 1 parent 8288187
@tinwelint authored
Showing with 214 additions and 38 deletions.
  1. +13 −0 community/kernel/src/main/java/org/neo4j/kernel/api/index/IndexAccessor.java
  2. +6 −0 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/AbstractDelegatingIndexProxy.java
  3. +7 −0 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/AbstractSwallowingIndexProxy.java
  4. +15 −0 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxy.java
  5. +7 −0 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexProxy.java
  6. +22 −5 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java
  7. +6 −0 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxy.java
  8. +7 −0 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/PopulatingIndexProxy.java
  9. +7 −0 community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/InMemoryIndexProvider.java
  10. +57 −24 community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneIndexAccessor.java
  11. +2 −2 community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneIndexPopulator.java
  12. +1 −1  community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneSchemaIndexProvider.java
  13. +64 −6 community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/LuceneIndexRecoveryIT.java
View
13 community/kernel/src/main/java/org/neo4j/kernel/api/index/IndexAccessor.java
@@ -41,6 +41,14 @@
void updateAndCommit( Iterable<NodePropertyUpdate> updates ) throws IOException;
/**
+ * Apply a set of changes to this index. This method will be called instead of
+ * {@link #updateAndCommit(Iterable)} during recovery of the database when starting up after
+ * a crash or similar. Updates given here may have already been applied to this index, so
+ * additional checks must be in place so that data doesn't get duplicated, but is idempotent.
+ */
+ void recover( Iterable<NodePropertyUpdate> updates ) throws IOException;
+
+ /**
* Forces this index to disk. Called at certain points from within Neo4j for example when
* rotating the logical log. After completion of this call there cannot be any essential state that
* hasn't been forced to disk.
@@ -74,6 +82,11 @@ public void drop()
public void updateAndCommit( Iterable<NodePropertyUpdate> updates )
{
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ }
@Override
public void force()
View
6 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/AbstractDelegatingIndexProxy.java
@@ -42,6 +42,12 @@ public void update( Iterable<NodePropertyUpdate> updates ) throws IOException
{
getDelegate().update( updates );
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ getDelegate().recover( updates );
+ }
@Override
public Future<Void> drop() throws IOException
View
7 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/AbstractSwallowingIndexProxy.java
@@ -21,6 +21,7 @@
import static org.neo4j.helpers.FutureAdapter.VOID;
+import java.io.IOException;
import java.util.concurrent.Future;
import org.neo4j.kernel.api.index.IndexReader;
@@ -49,6 +50,12 @@ public void update( Iterable<NodePropertyUpdate> updates )
{
// intentionally swallow updates, we're failed and nothing but re-population or dropIndex will solve this
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ // intentionally swallow updates, we're failed and nothing but re-population or dropIndex will solve this
+ }
@Override
public void force()
View
15 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxy.java
@@ -96,6 +96,21 @@ public void update( Iterable<NodePropertyUpdate> updates ) throws IOException
}
@Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ // TODO Shouldn't need the lock
+ lock.readLock().lock();
+ try
+ {
+ delegate.recover( updates );
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
public Future<Void> drop() throws IOException
{
lock.readLock().lock();
View
7 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexProxy.java
@@ -54,6 +54,8 @@
void update( Iterable<NodePropertyUpdate> updates ) throws IOException;
+ void recover( Iterable<NodePropertyUpdate> updates ) throws IOException;
+
/**
* Initiates dropping this index context. The returned {@link Future} can be used to await
* its completion.
@@ -93,6 +95,11 @@ public void start()
public void update( Iterable<NodePropertyUpdate> updates )
{
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ }
@Override
public Future<Void> drop()
View
27 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java
@@ -252,15 +252,32 @@ else if ( index == null )
public void updateIndexes( Iterable<NodePropertyUpdate> updates )
{
- for ( IndexProxy index : indexes.values() )
+ if ( serviceRunning )
{
- try
+ for ( IndexProxy index : indexes.values() )
{
- index.update( updates );
+ try
+ {
+ index.update( updates );
+ }
+ catch ( IOException e )
+ {
+ throw new UnderlyingStorageException( "Unable to update " + index, e );
+ }
}
- catch ( IOException e )
+ }
+ else
+ {
+ for ( IndexProxy index : indexes.values() )
{
- throw new UnderlyingStorageException( "Unable to update " + index, e );
+ try
+ {
+ index.recover( updates );
+ }
+ catch ( IOException e )
+ {
+ throw new UnderlyingStorageException( "Unable to update " + index, e );
+ }
}
}
}
View
6 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/OnlineIndexProxy.java
@@ -50,6 +50,12 @@ public void update( Iterable<NodePropertyUpdate> updates ) throws IOException
{
accessor.updateAndCommit( updates );
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ accessor.recover( updates );
+ }
@Override
public Future<Void> drop() throws IOException
View
7 community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/PopulatingIndexProxy.java
@@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.api.index;
+import java.io.IOException;
import java.util.concurrent.Future;
import org.neo4j.kernel.api.index.IndexNotFoundKernelException;
@@ -55,6 +56,12 @@ public void update( Iterable<NodePropertyUpdate> updates )
{
job.update( updates );
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ throw new UnsupportedOperationException( "Recovered updates shouldn't reach this place" );
+ }
@Override
public Future<Void> drop()
View
7 community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/InMemoryIndexProvider.java
@@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.api.index;
+import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -111,6 +112,12 @@ public void updateAndCommit( Iterable<NodePropertyUpdate> updates )
{
update( updates );
}
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ update( updates );
+ }
@Override
public void force()
View
81 community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneIndexAccessor.java
@@ -25,8 +25,11 @@
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.neo4j.kernel.api.impl.index.LuceneSchemaIndexProvider.DocumentLogic;
import org.neo4j.kernel.api.impl.index.LuceneSchemaIndexProvider.WriterLogic;
@@ -60,34 +63,64 @@ public void drop() throws IOException
}
@Override
- public void updateAndCommit( Iterable<NodePropertyUpdate> updates )
+ public void updateAndCommit( Iterable<NodePropertyUpdate> updates ) throws IOException
{
- try
+ for ( NodePropertyUpdate update : updates )
{
- for ( NodePropertyUpdate update : updates )
+ switch ( update.getUpdateMode() )
{
- switch ( update.getUpdateMode() )
- {
- case ADDED:
- add( update.getNodeId(), update.getValueAfter() );
- break;
- case CHANGED:
- change( update.getNodeId(), update.getValueBefore(), update.getValueAfter() );
- break;
- case REMOVED:
- remove( update.getNodeId(), update.getValueBefore() );
- break;
- default:
- throw new UnsupportedOperationException();
- }
+ case ADDED:
+ add( update.getNodeId(), update.getValueAfter() );
+ break;
+ case CHANGED:
+ change( update.getNodeId(), update.getValueBefore(), update.getValueAfter() );
+ break;
+ case REMOVED:
+ remove( update.getNodeId(), update.getValueBefore() );
+ break;
+ default:
+ throw new UnsupportedOperationException();
}
-
- // Call refresh here since we are guaranteed to be the only thread writing concurrently.
- searcherManager.maybeRefresh();
}
- catch ( IOException e )
+
+ // Call refresh here since we are guaranteed to be the only thread writing concurrently.
+ searcherManager.maybeRefresh();
+ }
+
+ @Override
+ public void recover( Iterable<NodePropertyUpdate> updates ) throws IOException
+ {
+ for ( NodePropertyUpdate update : updates )
+ {
+ switch ( update.getUpdateMode() )
+ {
+ case ADDED:
+ addRecovered( update.getNodeId(), update.getValueAfter() );
+ break;
+ case CHANGED:
+ change( update.getNodeId(), update.getValueBefore(), update.getValueAfter() );
+ break;
+ case REMOVED:
+ remove( update.getNodeId(), update.getValueBefore() );
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+ searcherManager.maybeRefresh();
+ }
+
+ private void addRecovered( long nodeId, Object value ) throws IOException
+ {
+ IndexSearcher searcher = searcherManager.acquire();
+ TopDocs hits = searcher.search( new TermQuery( documentLogic.newQueryForChangeOrRemove( nodeId ) ), 1 );
+ if ( hits.totalHits > 0 )
+ {
+ writer.updateDocument( documentLogic.newQueryForChangeOrRemove( nodeId ), documentLogic.newDocument( nodeId, value ) );
+ }
+ else
{
- throw new RuntimeException( e );
+ add( nodeId, value );
}
}
@@ -98,13 +131,13 @@ private void add( long nodeId, Object value ) throws IOException
private void change( long nodeId, Object valueBefore, Object valueAfter ) throws IOException
{
- writer.updateDocument( documentLogic.newQueryForChangeOrRemove( nodeId, valueBefore ),
+ writer.updateDocument( documentLogic.newQueryForChangeOrRemove( nodeId ),
documentLogic.newDocument( nodeId, valueAfter ) );
}
private void remove( long nodeId, Object value ) throws IOException
{
- writer.deleteDocuments( documentLogic.newQueryForChangeOrRemove( nodeId, value ) );
+ writer.deleteDocuments( documentLogic.newQueryForChangeOrRemove( nodeId ) );
}
@Override
View
4 community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneIndexPopulator.java
@@ -128,11 +128,11 @@ private void applyQueuedUpdates() throws IOException
writer.addDocument( documentLogic.newDocument( nodeId, update.getValueAfter() ) );
break;
case CHANGED:
- writer.updateDocument( documentLogic.newQueryForChangeOrRemove( nodeId, update.getValueBefore() ),
+ writer.updateDocument( documentLogic.newQueryForChangeOrRemove( nodeId ),
documentLogic.newDocument( nodeId, update.getValueAfter() ) );
break;
case REMOVED:
- writer.deleteDocuments( documentLogic.newQueryForChangeOrRemove( nodeId, update.getValueBefore() ) );
+ writer.deleteDocuments( documentLogic.newQueryForChangeOrRemove( nodeId ) );
break;
}
}
View
2  community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneSchemaIndexProvider.java
@@ -137,7 +137,7 @@ else if ( value instanceof Number )
throw new UnsupportedOperationException( value.toString() + ", " + value.getClass() );
}
- public Term newQueryForChangeOrRemove( long nodeId, Object value )
+ public Term newQueryForChangeOrRemove( long nodeId )
{
return new Term( NODE_ID_KEY, "" + nodeId );
}
View
70 community/lucene-index/src/test/java/org/neo4j/kernel/api/impl/index/LuceneIndexRecoveryIT.java
@@ -19,9 +19,9 @@
*/
package org.neo4j.kernel.api.impl.index;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import static org.neo4j.graphdb.DynamicLabel.label;
-import static org.neo4j.helpers.collection.IteratorUtil.asSet;
+import static org.neo4j.helpers.collection.IteratorUtil.asUniqueSet;
import java.io.File;
import java.io.IOException;
@@ -53,9 +53,8 @@
public class LuceneIndexRecoveryIT
{
-
@Test
- public void shouldNotAddTwiceDuringRecovery() throws Exception
+ public void addShouldBeIdempotentWhenDoingRecovery() throws Exception
{
// Given
startDb(createLuceneIndexFactory());
@@ -77,6 +76,65 @@ public void shouldNotAddTwiceDuringRecovery() throws Exception
}
@Test
+ public void changeShouldBeIdempotentWhenDoingRecovery() throws Exception
+ {
+ // Given
+ startDb(createLuceneIndexFactory());
+ Label myLabel = label( "MyLabel" );
+ createIndex( myLabel, true );
+ long node = createNode( myLabel, 12 );
+ rotateLogs();
+
+ updateNode( node, 13 );
+
+ // And Given
+ killDb();
+
+ // When
+ startDb( createLuceneIndexFactory() );
+
+ // Then
+ assertEquals( 0, doIndexLookup( myLabel, 12 ).size() );
+ assertEquals( 1, doIndexLookup( myLabel, 13 ).size() );
+ }
+
+ @Test
+ public void removeShouldBeIdempotentWhenDoingRecovery() throws Exception
+ {
+ // Given
+ startDb(createLuceneIndexFactory());
+ Label myLabel = label( "MyLabel" );
+ createIndex( myLabel, true );
+ long node = createNode( myLabel, 12 );
+ rotateLogs();
+
+ deleteNode( node );
+
+ // And Given
+ killDb();
+
+ // When
+ startDb( createLuceneIndexFactory() );
+
+ // Then
+ assertEquals( 0, doIndexLookup( myLabel, 12 ).size() );
+ }
+
+ private void deleteNode( long node )
+ {
+ Transaction tx = db.beginTx();
+ try
+ {
+ db.getNodeById( node ).delete();
+ tx.success();
+ }
+ finally
+ {
+ tx.finish();
+ }
+ }
+
+ @Test
public void shouldNotAddTwiceDuringRecoveryIfCrashedDuringPopulation() throws Exception
{
// Given
@@ -126,7 +184,7 @@ public void shouldNotUpdateTwiceDuringRecovery() throws Exception
private GraphDatabaseAPI db;
private DirectoryFactory directoryFactory;
- private DirectoryFactory ignoreCloseDirectoryFactory = new DirectoryFactory()
+ private final DirectoryFactory ignoreCloseDirectoryFactory = new DirectoryFactory()
{
@Override
public Directory open( File dir ) throws IOException
@@ -206,7 +264,7 @@ private void createIndex( Label label, boolean wait )
{
Transaction tx = db.beginTx();
Iterable<Node> iter = db.findNodesByLabelAndProperty( myLabel, NUM_BANANAS_KEY, value );
- Set<Node> nodes = asSet( iter );
+ Set<Node> nodes = asUniqueSet( iter );
tx.success();
tx.finish();
return nodes;
Please sign in to comment.
Something went wrong with that request. Please try again.