From 30ddb0b6abe72e11de3f109d418a243b52424e87 Mon Sep 17 00:00:00 2001 From: Ben Butler-Cole Date: Fri, 12 Aug 2016 13:27:42 +0100 Subject: [PATCH] db.awaitIndex() waits for the index to come online --- .../java/org/neo4j/function/Predicates.java | 50 ++++--- .../neo4j/kernel/api/exceptions/Status.java | 6 +- .../builtinprocs/AwaitIndexProcedure.java | 98 ++++++++++--- .../builtinprocs/BuiltInProcedures.java | 9 +- .../builtinprocs/AwaitIndexProcedureTest.java | 132 ++++++++++++++++-- .../builtinprocs/BuiltInProceduresTest.java | 2 +- .../integrationtest/BuiltInProceduresIT.java | 3 +- 7 files changed, 235 insertions(+), 65 deletions(-) diff --git a/community/common/src/main/java/org/neo4j/function/Predicates.java b/community/common/src/main/java/org/neo4j/function/Predicates.java index a447d57eda631..6104475c51925 100644 --- a/community/common/src/main/java/org/neo4j/function/Predicates.java +++ b/community/common/src/main/java/org/neo4j/function/Predicates.java @@ -152,37 +152,44 @@ public static void await( Supplier supplier, Predicate predic public static void await( Supplier condition, long timeout, TimeUnit unit ) throws TimeoutException, InterruptedException { - await( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS ); + awaitEx( condition::get, timeout, unit ); } - public static void awaitEx( ThrowingSupplier condition, long timeout, TimeUnit unit ) - throws TimeoutException, InterruptedException + public static void awaitEx( ThrowingSupplier condition, + long timeout, TimeUnit unit ) + throws TimeoutException, InterruptedException, EXCEPTION { - Supplier adapter = () -> { - try - { - return condition.get(); - } - catch ( Exception e ) - { - throw new RuntimeException( e ); - } - }; - await( adapter, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS ); + awaitEx( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS ); } - public static void await( Supplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) - throws TimeoutException, InterruptedException + public static void awaitEx( ThrowingSupplier condition, + long timeout, TimeUnit unit, long pollInterval, + TimeUnit pollUnit ) + throws TimeoutException, InterruptedException, EXCEPTION { - if ( !tryAwait( condition, timeout, timeoutUnit, pollInterval, pollUnit ) ) + if ( !tryAwaitEx( condition, timeout, unit, pollInterval, pollUnit ) ) { throw new TimeoutException( - "Waited for " + timeout + " " + timeoutUnit + ", but " + condition + " was not accepted." ); + "Waited for " + timeout + " " + unit + ", but " + condition + " was not accepted." ); } } - public static boolean tryAwait( Supplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit ) - throws InterruptedException + public static void await( Supplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, + TimeUnit pollUnit ) throws TimeoutException, InterruptedException + { + awaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit ); + } + + public static boolean tryAwait( Supplier condition, long timeout, TimeUnit timeoutUnit, long pollInterval, + TimeUnit pollUnit ) throws InterruptedException + { + return tryAwaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit ); + } + + public static boolean tryAwaitEx( ThrowingSupplier condition, + long timeout, TimeUnit timeoutUnit, + long pollInterval, TimeUnit pollUnit ) + throws InterruptedException, EXCEPTION { long deadlineMillis = System.currentTimeMillis() + timeoutUnit.toMillis( timeout ); long pollIntervalMillis = pollUnit.toMillis( pollInterval ); @@ -199,7 +206,8 @@ public static boolean tryAwait( Supplier condition, long timeout, TimeU return false; } - public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) throws InterruptedException + public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) + throws InterruptedException { long sleep = unit.toMillis( checkInterval ); do diff --git a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java index 90b960e5762cd..c71608a3d337d 100644 --- a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java +++ b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java @@ -375,8 +375,10 @@ enum Procedure implements Status ProcedureCallFailed( ClientError, "Failed to invoke a procedure. See the detailed error description for exact cause." ), TypeError( ClientError, - "A procedure is using or receiving a value of an invalid type." ) - ; + "A procedure is using or receiving a value of an invalid type." ), + ProcedureTimedOut( ClientError, + "The procedure has not completed within the specified timeout. You may want to retry with a longer " + + "timeout." ); private final Code code; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedure.java b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedure.java index 34fdcdfeee661..d845e67a31ac7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedure.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedure.java @@ -19,6 +19,10 @@ */ package org.neo4j.kernel.builtinprocs; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.neo4j.function.Predicates; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.ReadOperations; import org.neo4j.kernel.api.exceptions.ProcedureException; @@ -28,61 +32,111 @@ import org.neo4j.kernel.api.index.IndexDescriptor; import org.neo4j.kernel.api.index.InternalIndexState; -import static org.neo4j.kernel.api.index.InternalIndexState.ONLINE; +import static java.lang.String.format; public class AwaitIndexProcedure { - private ReadOperations operations; + private final ReadOperations operations; public AwaitIndexProcedure( KernelTransaction tx ) { operations = tx.acquireStatement().readOperations(); } - public void execute( String labelName, String propertyKeyName ) throws ProcedureException + public void execute( String labelName, String propertyKeyName, long timeout, TimeUnit timeoutUnits ) + throws ProcedureException { int labelId = getLabelId( labelName ); int propertyKeyId = getPropertyKeyId( propertyKeyName ); + String indexDescription = formatIndex( labelName, propertyKeyName ); + IndexDescriptor index = getIndex( labelId, propertyKeyId, indexDescription ); + waitUntilOnline( index, indexDescription, timeout, timeoutUnits ); + } - if ( ONLINE != getState( labelName, propertyKeyName, labelId, propertyKeyId ) ) + private int getLabelId( String labelName ) throws ProcedureException + { + int labelId = operations.labelGetForName( labelName ); + if ( labelId == ReadOperations.NO_SUCH_LABEL ) { - throw new ProcedureException( Status.General.UnknownError, "Index not online" ); + throw new ProcedureException( Status.Schema.LabelAccessFailed, "No such label %s", labelName ); } + return labelId; } - private InternalIndexState getState( String labelName, String propertyKeyName, int labelId, int propertyKeyId ) + private int getPropertyKeyId( String propertyKeyName ) throws ProcedureException + { + int propertyKeyId = operations.propertyKeyGetForName( propertyKeyName ); + if ( propertyKeyId == ReadOperations.NO_SUCH_PROPERTY_KEY ) + { + throw new ProcedureException( Status.Schema.PropertyKeyAccessFailed, + "No such property key %s", propertyKeyName ); + } + return propertyKeyId; + } + + private IndexDescriptor getIndex( int labelId, int propertyKeyId, String indexDescription ) throws + ProcedureException + { + try + { + return operations.indexGetForLabelAndPropertyKey( labelId, propertyKeyId ); + } + catch ( SchemaRuleNotFoundException e ) + { + throw new ProcedureException( Status.Schema.IndexNotFound, e, "No index on %s", indexDescription ); + } + } + + private void waitUntilOnline( IndexDescriptor index, String indexDescription, long timeout, TimeUnit timeoutUnits ) throws ProcedureException { try { - IndexDescriptor index = operations.indexGetForLabelAndPropertyKey( labelId, propertyKeyId ); - return operations.indexGetState( index ); + Predicates.awaitEx( () -> isOnline( indexDescription, index ), timeout, timeoutUnits ); + } + catch ( TimeoutException e ) + { + throw new ProcedureException( Status.Procedure.ProcedureTimedOut, + "Index on %s did not come online within %s %s", indexDescription, timeout, timeoutUnits ); } - catch ( SchemaRuleNotFoundException | IndexNotFoundKernelException e ) + catch ( InterruptedException e ) { - throw new ProcedureException( Status.Schema.IndexNotFound, e, - "No index on :%s(%s)", labelName, propertyKeyName ); + throw new ProcedureException( Status.General.DatabaseUnavailable, + "Interrupted waiting for index on %s to come online", indexDescription ); } } - private int getPropertyKeyId( String propertyKeyName ) throws ProcedureException + private boolean isOnline( String indexDescription, IndexDescriptor index ) throws ProcedureException { - int propertyKeyId = operations.propertyKeyGetForName( propertyKeyName ); - if ( propertyKeyId == ReadOperations.NO_SUCH_PROPERTY_KEY ) + InternalIndexState state = getState( indexDescription, index ); + switch ( state ) { - throw new ProcedureException( Status.Schema.PropertyKeyAccessFailed, - "No such property key %s", propertyKeyName ); + case POPULATING: + return false; + case ONLINE: + return true; + case FAILED: + throw new ProcedureException( Status.Schema.IndexCreationFailed, + "Index on %s is in failed state", indexDescription ); + default: + throw new IllegalStateException( "Unknown index state " + state ); } - return propertyKeyId; } - private int getLabelId( String labelName ) throws ProcedureException + private InternalIndexState getState( String indexDescription, IndexDescriptor index ) throws ProcedureException { - int labelId = operations.labelGetForName( labelName ); - if ( labelId == ReadOperations.NO_SUCH_LABEL ) + try { - throw new ProcedureException( Status.Schema.LabelAccessFailed, "No such label %s", labelName ); + return operations.indexGetState( index ); } - return labelId; + catch ( IndexNotFoundKernelException e ) + { + throw new ProcedureException( Status.Schema.IndexNotFound, e, "No index on %s", indexDescription ); + } + } + + private String formatIndex( String labelName, String propertyKeyName ) + { + return format( ":%s(%s)", labelName, propertyKeyName ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/BuiltInProcedures.java b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/BuiltInProcedures.java index 9ca23e1896ad7..843fbf33abf71 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/BuiltInProcedures.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/builtinprocs/BuiltInProcedures.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.neo4j.graphdb.Label; @@ -92,11 +93,11 @@ public Stream listIndexes() throws ProcedureException } @Procedure(name = "db.awaitIndex", mode = READ) - public void awaitIndex( @Name("label") String labelName, @Name("property") String propertyKeyName ) - throws ProcedureException - + public void awaitIndex( @Name("label") String labelName, + @Name("property") String propertyKeyName, + @Name(value = "timeOutSeconds") long timeout ) throws ProcedureException { - new AwaitIndexProcedure(tx).execute( labelName, propertyKeyName ); + new AwaitIndexProcedure( tx ).execute( labelName, propertyKeyName, timeout, TimeUnit.SECONDS ); } @Procedure(name = "db.constraints", mode = READ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedureTest.java b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedureTest.java index 639dd57d2bca3..a3e46b268f817 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedureTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/AwaitIndexProcedureTest.java @@ -19,7 +19,13 @@ */ package org.neo4j.kernel.builtinprocs; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.neo4j.kernel.api.DataWriteOperations; import org.neo4j.kernel.api.KernelTransaction; @@ -32,11 +38,15 @@ import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException; +import org.neo4j.kernel.api.exceptions.schema.IndexSchemaRuleNotFoundException; import org.neo4j.kernel.api.exceptions.schema.SchemaRuleNotFoundException; import org.neo4j.kernel.api.index.IndexDescriptor; +import org.neo4j.kernel.api.index.InternalIndexState; import org.neo4j.kernel.api.security.AccessMode; -import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -46,11 +56,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.kernel.api.index.InternalIndexState.FAILED; import static org.neo4j.kernel.api.index.InternalIndexState.ONLINE; import static org.neo4j.kernel.api.index.InternalIndexState.POPULATING; +import static org.neo4j.test.assertion.Assert.assertEventually; public class AwaitIndexProcedureTest { + private static final int timeout = 40; + private static final TimeUnit timeoutUnits = TimeUnit.MILLISECONDS; private final ReadOperations operations = mock( ReadOperations.class ); private final AwaitIndexProcedure procedure = new AwaitIndexProcedure( new StubKernelTransaction( operations ) ); @@ -61,12 +75,12 @@ public void shouldThrowAnExceptionIfTheLabelDoesntExist() throws ProcedureExcept try { - procedure.execute( "non-existent-label", null ); + procedure.execute( "non-existent-label", null, timeout, timeoutUnits ); fail( "Expected an exception" ); } catch ( ProcedureException e ) { - assertThat( e.getMessage(), containsString( "non-existent-label" ) ); + assertThat( e.status(), is( Status.Schema.LabelAccessFailed ) ); } } @@ -77,12 +91,12 @@ public void shouldThrowAnExceptionIfThePropertyKeyDoesntExist() throws Procedure try { - procedure.execute( null, "non-existent-property-key" ); + procedure.execute( null, "non-existent-property-key", timeout, timeoutUnits ); fail( "Expected an exception" ); } catch ( ProcedureException e ) { - assertThat( e.getMessage(), containsString( "non-existent-property-key" ) ); + assertThat( e.status(), is( Status.Schema.PropertyKeyAccessFailed ) ); } } @@ -93,34 +107,124 @@ public void shouldLookUpTheIndexByLabelIdAndPropertyKeyId() when( operations.labelGetForName( anyString() ) ).thenReturn( 123 ); when( operations.propertyKeyGetForName( anyString() ) ).thenReturn( 456 ); when( operations.indexGetForLabelAndPropertyKey( anyInt(), anyInt() ) ) - .thenReturn( new IndexDescriptor( -1, -1 ) ); + .thenReturn( new IndexDescriptor( 0, 0 ) ); when( operations.indexGetState( any( IndexDescriptor.class ) ) ).thenReturn( ONLINE ); - procedure.execute( null, null ); + procedure.execute( null, null, timeout, timeoutUnits ); verify( operations ).indexGetForLabelAndPropertyKey( 123, 456 ); } @Test - public void shouldThrowAnExceptionIfTheIndexIsNotOnline() + public void shouldThrowAnExceptionIfTheIndexHasFailed() throws SchemaRuleNotFoundException, IndexNotFoundKernelException { - when( operations.labelGetForName( anyString() ) ).thenReturn( 123 ); - when( operations.propertyKeyGetForName( anyString() ) ).thenReturn( 456 ); + when( operations.labelGetForName( anyString() ) ).thenReturn( 0 ); + when( operations.propertyKeyGetForName( anyString() ) ).thenReturn( 0 ); when( operations.indexGetForLabelAndPropertyKey( anyInt(), anyInt() ) ) - .thenReturn( new IndexDescriptor( -1, -1 ) ); - when( operations.indexGetState( any( IndexDescriptor.class ) ) ).thenReturn( POPULATING ); + .thenReturn( new IndexDescriptor( 0, 0 ) ); + when( operations.indexGetState( any( IndexDescriptor.class ) ) ).thenReturn( FAILED ); try { - procedure.execute( null, null ); + procedure.execute( null, null, timeout, timeoutUnits ); fail( "Expected an exception" ); } catch ( ProcedureException e ) { - assertThat( e.getMessage(), containsString( "not online" ) ); + assertThat( e.status(), is( Status.Schema.IndexCreationFailed ) ); + } + } + + @Test + public void shouldThrowAnExceptionIfTheIndexDoesNotExist() + throws SchemaRuleNotFoundException, IndexNotFoundKernelException + + { + when( operations.labelGetForName( anyString() ) ).thenReturn( 0 ); + when( operations.propertyKeyGetForName( anyString() ) ).thenReturn( 0 ); + //noinspection unchecked + when( operations.indexGetForLabelAndPropertyKey( anyInt(), anyInt() ) ) + .thenThrow( new IndexSchemaRuleNotFoundException( -1, -1 ) ); + + try + { + procedure.execute( null, null, timeout, timeoutUnits ); + fail( "Expected an exception" ); } + catch ( ProcedureException e ) + { + assertThat( e.status(), is( Status.Schema.IndexNotFound ) ); + } + } + + @Test + public void shouldBlockUntilTheIndexIsOnline() throws SchemaRuleNotFoundException, IndexNotFoundKernelException, + InterruptedException + { + when( operations.labelGetForName( anyString() ) ).thenReturn( 0 ); + when( operations.propertyKeyGetForName( anyString() ) ).thenReturn( 0 ); + when( operations.indexGetForLabelAndPropertyKey( anyInt(), anyInt() ) ) + .thenReturn( new IndexDescriptor( 0, 0 ) ); + + AtomicReference state = new AtomicReference<>( POPULATING ); + when( operations.indexGetState( any( IndexDescriptor.class ) ) ).then( new Answer() + { + @Override + public InternalIndexState answer( InvocationOnMock invocationOnMock ) throws Throwable + { + return state.get(); + } + } ); + + AtomicBoolean done = new AtomicBoolean( false ); + new Thread( () -> + { + try + { + procedure.execute( null, null, timeout, timeoutUnits ); + } + catch ( ProcedureException e ) + { + throw new RuntimeException( e ); + } + done.set( true ); + } ).start(); + + assertThat( done.get(), is( false ) ); + + state.set( ONLINE ); + assertEventually( "Procedure did not return after index was online", + done::get, is( true ), 5, TimeUnit.MILLISECONDS ); + } + + @Test + public void shouldTimeoutIfTheIndexTakesTooLongToComeOnline() + throws InterruptedException, SchemaRuleNotFoundException, IndexNotFoundKernelException + { + when( operations.labelGetForName( anyString() ) ).thenReturn( 0 ); + when( operations.propertyKeyGetForName( anyString() ) ).thenReturn( 0 ); + when( operations.indexGetForLabelAndPropertyKey( anyInt(), anyInt() ) ) + .thenReturn( new IndexDescriptor( 0, 0 ) ); + when( operations.indexGetState( any( IndexDescriptor.class ) ) ).thenReturn( POPULATING ); + + AtomicReference exception = new AtomicReference<>(); + new Thread( () -> + { + try + { + procedure.execute( null, null, timeout, timeoutUnits ); + } + catch ( ProcedureException e ) + { + exception.set( e ); + } + } ).start(); + + assertEventually( "Procedure did not time out", exception::get, not( nullValue() ), timeout * 2, timeoutUnits ); + //noinspection ThrowableResultOfMethodCallIgnored + assertThat( exception.get().status(), is( Status.Procedure.ProcedureTimedOut ) ); } private class StubKernelTransaction implements KernelTransaction diff --git a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java index f89e63cdbddf3..7a8525a4743ed 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/builtinprocs/BuiltInProceduresTest.java @@ -143,7 +143,7 @@ public void shouldListCorrectBuiltinProcedures() throws Throwable { // When/Then assertThat( call( "dbms.procedures" ), contains( - record( "db.awaitIndex", "db.awaitIndex(label :: STRING?, property :: STRING?) :: VOID" ), + record( "db.awaitIndex", "db.awaitIndex(label :: STRING?, property :: STRING?, timeOutSeconds :: INTEGER?) :: VOID" ), record( "db.constraints", "db.constraints() :: (description :: STRING?)" ), record( "db.indexes", "db.indexes() :: (description :: STRING?, state :: STRING?)" ), record( "db.labels", "db.labels() :: (label :: STRING?)" ), diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java index 7df184d3e3366..b6fccd1e59249 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/BuiltInProceduresIT.java @@ -104,7 +104,8 @@ public void listProcedures() throws Throwable assertThat( asList( stream ), containsInAnyOrder( equalTo( new Object[]{"db.constraints", "db.constraints() :: (description :: STRING?)"} ), equalTo( new Object[]{"db.indexes", "db.indexes() :: (description :: STRING?, state :: STRING?)"} ), - equalTo( new Object[]{"db.awaitIndex", "db.awaitIndex(label :: STRING?, property :: STRING?) :: VOID"} ), + equalTo( new Object[]{"db.awaitIndex", "db.awaitIndex(label :: STRING?, property :: STRING?, " + + "timeOutSeconds :: INTEGER?) :: VOID"} ), equalTo( new Object[]{"db.propertyKeys", "db.propertyKeys() :: (propertyKey :: STRING?)"} ), equalTo( new Object[]{"db.labels", "db.labels() :: (label :: STRING?)"} ), equalTo( new Object[]{"db.relationshipTypes", "db.relationshipTypes() :: (relationshipType :: " +