Skip to content

Commit

Permalink
Merge pull request #11164 from MishaDemianenko/3.4-index-drop
Browse files Browse the repository at this point in the history
Reworked index drop during ongoing population.
  • Loading branch information
MishaDemianenko committed Mar 7, 2018
2 parents 5b18b07 + 159b749 commit 84df9f4
Show file tree
Hide file tree
Showing 21 changed files with 375 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Future;

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.internal.kernel.api.IndexCapability;
Expand Down Expand Up @@ -57,9 +56,9 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
}

@Override
public Future<Void> drop() throws IOException
public void drop() throws IOException
{
return getDelegate().drop();
getDelegate().drop();
}

@Override
Expand Down Expand Up @@ -105,9 +104,9 @@ public void refresh() throws IOException
}

@Override
public Future<Void> close() throws IOException
public void close() throws IOException
{
return getDelegate().close();
getDelegate().close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.kernel.impl.api.index;

import java.util.concurrent.Future;

import org.neo4j.internal.kernel.api.IndexCapability;
import org.neo4j.internal.kernel.api.schema.LabelSchemaDescriptor;
import org.neo4j.io.pagecache.IOLimiter;
Expand All @@ -31,8 +29,6 @@
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.PopulationProgress;

import static org.neo4j.helpers.FutureAdapter.VOID;

public abstract class AbstractSwallowingIndexProxy implements IndexProxy
{
private final IndexMeta indexMeta;
Expand Down Expand Up @@ -104,9 +100,8 @@ public SchemaIndexProvider.Descriptor getProviderDescriptor()
}

@Override
public Future<Void> close()
public void close()
{
return VOID;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.kernel.impl.api.index;

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -134,11 +133,12 @@ public void force( IOLimiter ioLimiter ) throws IOException
}

@Override
public Future<Void> drop() throws IOException
public void drop() throws IOException
{
if ( state.compareAndSet( State.INIT, State.CLOSED ) )
{
return super.drop();
super.drop();
return;
}

if ( State.STARTING == state.get() )
Expand All @@ -149,18 +149,20 @@ public Future<Void> drop() throws IOException
if ( state.compareAndSet( State.STARTED, State.CLOSED ) )
{
waitOpenCallsToClose();
return super.drop();
super.drop();
return;
}

throw new IllegalStateException( "IndexProxy already closed" );
}

@Override
public Future<Void> close() throws IOException
public void close() throws IOException
{
if ( state.compareAndSet( State.INIT, State.CLOSED ) )
{
return super.close();
super.close();
return;
}

if ( state.compareAndSet( State.STARTING, State.CLOSED ) )
Expand All @@ -171,7 +173,8 @@ public Future<Void> close() throws IOException
if ( state.compareAndSet( State.STARTED, State.CLOSED ) )
{
waitOpenCallsToClose();
return super.close();
super.close();
return;
}

throw new IllegalStateException( "IndexProxy already closed" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Future;

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.internal.kernel.api.InternalIndexState;
Expand All @@ -30,7 +29,6 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.helpers.FutureAdapter.VOID;
import static org.neo4j.helpers.collection.Iterators.emptyResourceIterator;

public class FailedIndexProxy extends AbstractSwallowingIndexProxy
Expand All @@ -55,14 +53,13 @@ public class FailedIndexProxy extends AbstractSwallowingIndexProxy
}

@Override
public Future<Void> drop() throws IOException
public void drop() throws IOException
{
indexCountsRemover.remove();
String message = "FailedIndexProxy#drop index on " + indexUserDescription + " dropped due to:\n" +
getPopulationFailure().asString();
log.info( message );
populator.drop();
return VOID;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -103,13 +102,13 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
}

@Override
public Future<Void> drop() throws IOException
public void drop() throws IOException
{
lock.readLock().lock();
try
{
closed = true;
return delegate.drop();
delegate.drop();
}
finally
{
Expand Down Expand Up @@ -285,13 +284,13 @@ public IndexCapability getIndexCapability()
}

@Override
public Future<Void> close() throws IOException
public void close() throws IOException
{
lock.readLock().lock();
try
{
closed = true;
return delegate.close();
delegate.close();
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,23 @@ public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingServic
/**
* Adds an {@link IndexPopulator} to be populated in this store scan. All participating populators must
* be added before calling {@link #run()}.
*
* @param populator {@link IndexPopulator} to participate.
* @param populator {@link IndexPopulator} to participate.
* @param indexId id of index.
* @param indexMeta {@link IndexMeta} meta information about index.
* @param indexUserDescription user description of this index.
* @param flipper {@link FlippableIndexProxy} to call after a successful population.
* @param failedIndexProxyFactory {@link FailedIndexProxyFactory} to use after an unsuccessful population.
*/
void addPopulator( IndexPopulator populator,
MultipleIndexPopulator.IndexPopulation addPopulator( IndexPopulator populator,
long indexId,
IndexMeta indexMeta,
String indexUserDescription,
FlippableIndexProxy flipper,
FailedIndexProxyFactory failedIndexProxyFactory )
{
assert storeScan == null : "Population have already started, too late to add populators at this point";
this.multiPopulator.addPopulator( populator, indexId, indexMeta, flipper, failedIndexProxyFactory, indexUserDescription );
return this.multiPopulator.addPopulator( populator, indexId, indexMeta, flipper, failedIndexProxyFactory,
indexUserDescription );
}

/**
Expand All @@ -85,18 +85,23 @@ void addPopulator( IndexPopulator populator,
@Override
public void run()
{
assert multiPopulator.hasPopulators() : "No index populators was added so there'd be no point in running this job";
assert storeScan == null : "Population have already started";

String oldThreadName = currentThread().getName();
currentThread().setName( "Index populator" );

try
{
if ( !multiPopulator.hasPopulators() )
{
return;
}
if ( storeScan != null )
{
throw new IllegalStateException( "Population already started." );
}

currentThread().setName( "Index populator" );
try
{
multiPopulator.create();
multiPopulator.replaceIndexCounts( 0, 0, 0 );
multiPopulator.resetIndexCounts();

monitor.indexPopulationScanStarting();
indexAllNodes();
Expand All @@ -107,7 +112,6 @@ public void run()
// We remain in POPULATING state
return;
}

multiPopulator.flipAfterPopulation();

schemaState.clear();
Expand Down Expand Up @@ -149,7 +153,12 @@ public Future<Void> cancel()
storeScan.stop();
}

return latchGuardedValue( Suppliers.<Void>singleton( null ), doneSignal, "Index population job cancel" );
return latchGuardedValue( Suppliers.singleton( null ), doneSignal, "Index population job cancel" );
}

void cancelPopulation( MultipleIndexPopulator.IndexPopulation population )
{
multiPopulator.cancelIndexPopulation( population );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Future;

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.internal.kernel.api.IndexCapability;
Expand Down Expand Up @@ -68,17 +67,15 @@ public interface IndexProxy extends LabelSchemaSupplier
IndexUpdater newUpdater( IndexUpdateMode mode );

/**
* Initiates dropping this index context. The returned {@link Future} can be used to await
* its completion.
* Drop index.
* Must close the context as well.
*/
Future<Void> drop() throws IOException;
void drop() throws IOException;

/**
* Initiates a closing of this index context. The returned {@link Future} can be used to await
* its completion.
* Close this index context.
*/
Future<Void> close() throws IOException;
void close() throws IOException;

IndexDescriptor getDescriptor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ IndexProxy createPopulatingIndexProxy( final long ruleId,
{
final FlippableIndexProxy flipper = new FlippableIndexProxy();

// TODO: This is here because there is a circular dependency from PopulatingIndexProxy to FlippableIndexProxy
final String indexUserDescription = indexUserDescription( descriptor, providerDescriptor );
IndexPopulator populator = populatorFromProvider( providerDescriptor, ruleId, descriptor, samplingConfig );
IndexMeta indexMeta = indexMetaFromProvider( providerDescriptor, descriptor );
Expand All @@ -77,9 +76,9 @@ IndexProxy createPopulatingIndexProxy( final long ruleId,
new IndexCountsRemover( storeView, ruleId ),
logProvider );

PopulatingIndexProxy populatingIndex = new PopulatingIndexProxy( indexMeta, populationJob );

populationJob.addPopulator( populator, ruleId, indexMeta, indexUserDescription, flipper, failureDelegateFactory );
MultipleIndexPopulator.IndexPopulation indexPopulation = populationJob
.addPopulator( populator, ruleId, indexMeta, indexUserDescription, flipper, failureDelegateFactory );
PopulatingIndexProxy populatingIndex = new PopulatingIndexProxy( indexMeta, populationJob, indexPopulation );

flipper.flipTo( populatingIndex );

Expand Down Expand Up @@ -116,7 +115,6 @@ IndexProxy createOnlineIndexProxy( long ruleId,
IndexDescriptor descriptor,
SchemaIndexProvider.Descriptor providerDescriptor )
{
// TODO Hook in version verification/migration calls to the SchemaIndexProvider here
try
{
IndexAccessor onlineAccessor =
Expand Down

0 comments on commit 84df9f4

Please sign in to comment.