Skip to content

Commit

Permalink
Revert "Fixes as issue with applying index updates to unique indexes"
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Feb 14, 2018
1 parent b1b231e commit 32f76c4
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 198 deletions.

This file was deleted.

Expand Up @@ -64,7 +64,6 @@

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MINUTES;

import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.collection.Iterables.asList;
import static org.neo4j.kernel.api.index.InternalIndexState.FAILED;
Expand Down Expand Up @@ -425,7 +424,10 @@ private void apply( Iterable<IndexEntryUpdate<LabelSchemaDescriptor>> updates, I
{
try ( IndexUpdaterMap updaterMap = indexMapRef.createIndexUpdaterMap( updateMode ) )
{
IndexUpdateProcessor.applyIndexUpdatesByMode( updates, update -> processUpdate( updaterMap, update ) );
for ( IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate : updates )
{
processUpdate( updaterMap, indexUpdate );
}
}
}

Expand Down
Expand Up @@ -33,7 +33,6 @@
import java.util.stream.IntStream;

import org.neo4j.function.ThrowingConsumer;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.exceptions.index.FlipFailedKernelException;
Expand All @@ -55,10 +54,8 @@
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static java.lang.String.format;

import static org.neo4j.collection.primitive.PrimitiveIntCollections.contains;
import static org.neo4j.kernel.impl.api.index.IndexPopulationFailure.failure;
import static org.neo4j.kernel.impl.api.index.IndexUpdateProcessor.applyIndexUpdatesByMode;

/**
* {@link IndexPopulator} that allow population of multiple indexes during one iteration.
Expand Down Expand Up @@ -96,7 +93,7 @@ public class MultipleIndexPopulator implements IndexPopulator

// Concurrency queue since multiple concurrent threads may enqueue updates into it. It is important for this queue
// to have fast #size() method since it might be drained in batches
protected final Queue queue = new LinkedBlockingQueue<>();
protected final Queue<IndexEntryUpdate<?>> queue = new LinkedBlockingQueue<>();

// Populators are added into this list. The same thread adding populators will later call #indexAllNodes.
// Multiple concurrent threads might fail individual populations.
Expand Down Expand Up @@ -378,40 +375,17 @@ private void populateFromQueueIfAvailable( long currentlyIndexedNodeId )
{
try ( MultipleIndexUpdater updater = newPopulatingUpdater( storeView ) )
{
// TODO the generics of IndexEntryUpdate in combination with that of IndexUpdater make the IndexUpdateProcessor
// generics hard to make fit for all scenarios. For that reason the queue holding updates cannot quite have the generics
// it would have liked to have. Also this specific scenario catches the exceptions seen below inside acceptUpdate method,
// but they are caught here to satisfy the compiler, even though knowing that they will never be thrown.
// Generifying IndexUpdateProcessor with exception type doesn't work due to multiple exception types thrown.
try
{
applyIndexUpdatesByMode( extractUpdatesFromQueue(),
update -> storeScan.acceptUpdate( updater, update, currentlyIndexedNodeId ) );
}
catch ( IOException | IndexEntryConflictException e )
do
{
throw Exceptions.launderedException( e );
// no need to check for null as nobody else is emptying this queue
IndexEntryUpdate<?> update = queue.poll();
storeScan.acceptUpdate( updater, update, currentlyIndexedNodeId );
}
while ( !queue.isEmpty() );
}
}
}

/**
* Extracts {@link IndexEntryUpdate} from {@link #queue} into a {@link List}. This is done since updates can be added to this
* queue while we're doing this and the way we apply them requires a stable set of updates.
* @return a {@link List} of updates from {@link #queue}.
*/
private List extractUpdatesFromQueue()
{
List updatesSnapshot = new ArrayList( queue.size() * 2 );
do
{
updatesSnapshot.add( queue.poll() );
}
while ( !queue.isEmpty() );
return updatesSnapshot;
}

private void forEachPopulation( ThrowingConsumer<IndexPopulation,Exception> action )
{
for ( IndexPopulation population : populations )
Expand Down
Expand Up @@ -491,9 +491,6 @@ public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntry
case CHANGED:
added.add( Pair.of( update.getEntityId(), update.values()[0].asObjectCopy() ) );
break;
case REMOVED:
// ignore removals
break;
default:
throw new IllegalArgumentException( update.updateMode().name() );
}
Expand Down

This file was deleted.

0 comments on commit 32f76c4

Please sign in to comment.