Skip to content

Commit

Permalink
db.awaitIndex() waits for the index to come online
Browse files Browse the repository at this point in the history
  • Loading branch information
benbc committed Aug 16, 2016
1 parent 24d30aa commit 30ddb0b
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 65 deletions.
50 changes: 29 additions & 21 deletions community/common/src/main/java/org/neo4j/function/Predicates.java
Expand Up @@ -152,37 +152,44 @@ public static <TYPE> void await( Supplier<TYPE> supplier, Predicate<TYPE> predic
public static void await( Supplier<Boolean> condition, long timeout, TimeUnit unit )
throws TimeoutException, InterruptedException
{
await( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS );
awaitEx( condition::get, timeout, unit );
}

public static void awaitEx( ThrowingSupplier<Boolean, Exception> condition, long timeout, TimeUnit unit )
throws TimeoutException, InterruptedException
public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boolean, EXCEPTION> condition,
long timeout, TimeUnit unit )
throws TimeoutException, InterruptedException, EXCEPTION
{
Supplier<Boolean> adapter = () -> {
try
{
return condition.get();
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
};
await( adapter, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS );
awaitEx( condition, timeout, unit, DEFAULT_POLL_INTERVAL, TimeUnit.MILLISECONDS );
}

public static void await( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit )
throws TimeoutException, InterruptedException
public static <EXCEPTION extends Exception> void awaitEx( ThrowingSupplier<Boolean, EXCEPTION> condition,
long timeout, TimeUnit unit, long pollInterval,
TimeUnit pollUnit )
throws TimeoutException, InterruptedException, EXCEPTION
{
if ( !tryAwait( condition, timeout, timeoutUnit, pollInterval, pollUnit ) )
if ( !tryAwaitEx( condition, timeout, unit, pollInterval, pollUnit ) )
{
throw new TimeoutException(
"Waited for " + timeout + " " + timeoutUnit + ", but " + condition + " was not accepted." );
"Waited for " + timeout + " " + unit + ", but " + condition + " was not accepted." );
}
}

public static boolean tryAwait( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit )
throws InterruptedException
public static void await( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval,
TimeUnit pollUnit ) throws TimeoutException, InterruptedException
{
awaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit );
}

public static boolean tryAwait( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval,
TimeUnit pollUnit ) throws InterruptedException
{
return tryAwaitEx( condition::get, timeout, timeoutUnit, pollInterval, pollUnit );
}

public static <EXCEPTION extends Exception> boolean tryAwaitEx( ThrowingSupplier<Boolean, EXCEPTION> condition,
long timeout, TimeUnit timeoutUnit,
long pollInterval, TimeUnit pollUnit )
throws InterruptedException, EXCEPTION
{
long deadlineMillis = System.currentTimeMillis() + timeoutUnit.toMillis( timeout );
long pollIntervalMillis = pollUnit.toMillis( pollInterval );
Expand All @@ -199,7 +206,8 @@ public static boolean tryAwait( Supplier<Boolean> condition, long timeout, TimeU
return false;
}

public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) throws InterruptedException
public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit )
throws InterruptedException
{
long sleep = unit.toMillis( checkInterval );
do
Expand Down
Expand Up @@ -375,8 +375,10 @@ enum Procedure implements Status
ProcedureCallFailed( ClientError,
"Failed to invoke a procedure. See the detailed error description for exact cause." ),
TypeError( ClientError,
"A procedure is using or receiving a value of an invalid type." )
;
"A procedure is using or receiving a value of an invalid type." ),
ProcedureTimedOut( ClientError,
"The procedure has not completed within the specified timeout. You may want to retry with a longer " +
"timeout." );

private final Code code;

Expand Down
Expand Up @@ -19,6 +19,10 @@
*/
package org.neo4j.kernel.builtinprocs;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.neo4j.function.Predicates;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.exceptions.ProcedureException;
Expand All @@ -28,61 +32,111 @@
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.InternalIndexState;

import static org.neo4j.kernel.api.index.InternalIndexState.ONLINE;
import static java.lang.String.format;

public class AwaitIndexProcedure
{
private ReadOperations operations;
private final ReadOperations operations;

public AwaitIndexProcedure( KernelTransaction tx )
{
operations = tx.acquireStatement().readOperations();
}

public void execute( String labelName, String propertyKeyName ) throws ProcedureException
public void execute( String labelName, String propertyKeyName, long timeout, TimeUnit timeoutUnits )
throws ProcedureException
{
int labelId = getLabelId( labelName );
int propertyKeyId = getPropertyKeyId( propertyKeyName );
String indexDescription = formatIndex( labelName, propertyKeyName );
IndexDescriptor index = getIndex( labelId, propertyKeyId, indexDescription );
waitUntilOnline( index, indexDescription, timeout, timeoutUnits );
}

if ( ONLINE != getState( labelName, propertyKeyName, labelId, propertyKeyId ) )
private int getLabelId( String labelName ) throws ProcedureException
{
int labelId = operations.labelGetForName( labelName );
if ( labelId == ReadOperations.NO_SUCH_LABEL )
{
throw new ProcedureException( Status.General.UnknownError, "Index not online" );
throw new ProcedureException( Status.Schema.LabelAccessFailed, "No such label %s", labelName );
}
return labelId;
}

private InternalIndexState getState( String labelName, String propertyKeyName, int labelId, int propertyKeyId )
private int getPropertyKeyId( String propertyKeyName ) throws ProcedureException
{
int propertyKeyId = operations.propertyKeyGetForName( propertyKeyName );
if ( propertyKeyId == ReadOperations.NO_SUCH_PROPERTY_KEY )
{
throw new ProcedureException( Status.Schema.PropertyKeyAccessFailed,
"No such property key %s", propertyKeyName );
}
return propertyKeyId;
}

private IndexDescriptor getIndex( int labelId, int propertyKeyId, String indexDescription ) throws
ProcedureException
{
try
{
return operations.indexGetForLabelAndPropertyKey( labelId, propertyKeyId );
}
catch ( SchemaRuleNotFoundException e )
{
throw new ProcedureException( Status.Schema.IndexNotFound, e, "No index on %s", indexDescription );
}
}

private void waitUntilOnline( IndexDescriptor index, String indexDescription, long timeout, TimeUnit timeoutUnits )
throws ProcedureException
{
try
{
IndexDescriptor index = operations.indexGetForLabelAndPropertyKey( labelId, propertyKeyId );
return operations.indexGetState( index );
Predicates.awaitEx( () -> isOnline( indexDescription, index ), timeout, timeoutUnits );
}
catch ( TimeoutException e )
{
throw new ProcedureException( Status.Procedure.ProcedureTimedOut,
"Index on %s did not come online within %s %s", indexDescription, timeout, timeoutUnits );
}
catch ( SchemaRuleNotFoundException | IndexNotFoundKernelException e )
catch ( InterruptedException e )
{
throw new ProcedureException( Status.Schema.IndexNotFound, e,
"No index on :%s(%s)", labelName, propertyKeyName );
throw new ProcedureException( Status.General.DatabaseUnavailable,
"Interrupted waiting for index on %s to come online", indexDescription );
}
}

private int getPropertyKeyId( String propertyKeyName ) throws ProcedureException
private boolean isOnline( String indexDescription, IndexDescriptor index ) throws ProcedureException
{
int propertyKeyId = operations.propertyKeyGetForName( propertyKeyName );
if ( propertyKeyId == ReadOperations.NO_SUCH_PROPERTY_KEY )
InternalIndexState state = getState( indexDescription, index );
switch ( state )
{
throw new ProcedureException( Status.Schema.PropertyKeyAccessFailed,
"No such property key %s", propertyKeyName );
case POPULATING:
return false;
case ONLINE:
return true;
case FAILED:
throw new ProcedureException( Status.Schema.IndexCreationFailed,
"Index on %s is in failed state", indexDescription );
default:
throw new IllegalStateException( "Unknown index state " + state );
}
return propertyKeyId;
}

private int getLabelId( String labelName ) throws ProcedureException
private InternalIndexState getState( String indexDescription, IndexDescriptor index ) throws ProcedureException
{
int labelId = operations.labelGetForName( labelName );
if ( labelId == ReadOperations.NO_SUCH_LABEL )
try
{
throw new ProcedureException( Status.Schema.LabelAccessFailed, "No such label %s", labelName );
return operations.indexGetState( index );
}
return labelId;
catch ( IndexNotFoundKernelException e )
{
throw new ProcedureException( Status.Schema.IndexNotFound, e, "No index on %s", indexDescription );
}
}

private String formatIndex( String labelName, String propertyKeyName )
{
return format( ":%s(%s)", labelName, propertyKeyName );
}
}
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.neo4j.graphdb.Label;
Expand Down Expand Up @@ -92,11 +93,11 @@ public Stream<IndexResult> listIndexes() throws ProcedureException
}

@Procedure(name = "db.awaitIndex", mode = READ)
public void awaitIndex( @Name("label") String labelName, @Name("property") String propertyKeyName )
throws ProcedureException

public void awaitIndex( @Name("label") String labelName,
@Name("property") String propertyKeyName,
@Name(value = "timeOutSeconds") long timeout ) throws ProcedureException
{
new AwaitIndexProcedure(tx).execute( labelName, propertyKeyName );
new AwaitIndexProcedure( tx ).execute( labelName, propertyKeyName, timeout, TimeUnit.SECONDS );
}

@Procedure(name = "db.constraints", mode = READ)
Expand Down

0 comments on commit 30ddb0b

Please sign in to comment.