Skip to content

Commit

Permalink
Revert "Check validity of transactions before appending them to the log"
Browse files Browse the repository at this point in the history
This reverts commit 35558b0.

Reason is that this change was fundamentally wrong, as it assumed that the
previous transaction had already been applied at the time of validation,
whereas in fact it may have just been queued up to be applied later in a
batch.
  • Loading branch information
tinwelint committed Nov 2, 2015
1 parent 6d399c5 commit 0ff35b3
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 76 deletions.
Expand Up @@ -744,7 +744,7 @@ private IndexingModule buildIndexing( Config config, JobScheduler scheduler, Sch
final IntegrityValidator integrityValidator = new IntegrityValidator( neoStore, indexingService ); final IntegrityValidator integrityValidator = new IntegrityValidator( neoStore, indexingService );


final IndexUpdatesValidator indexUpdatesValidator = dependencies.satisfyDependency( final IndexUpdatesValidator indexUpdatesValidator = dependencies.satisfyDependency(
new OnlineIndexUpdatesValidator( neoStore, new PropertyLoader( neoStore ), new OnlineIndexUpdatesValidator( neoStore, kernelHealth, new PropertyLoader( neoStore ),
indexingService, IndexUpdateMode.ONLINE ) ); indexingService, IndexUpdateMode.ONLINE ) );


// TODO Move to constructor // TODO Move to constructor
Expand Down
Expand Up @@ -21,6 +21,7 @@


import java.io.IOException; import java.io.IOException;


import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.api.index.NodePropertyUpdate; import org.neo4j.kernel.api.index.NodePropertyUpdate;
import org.neo4j.kernel.impl.store.NeoStore; import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.NodeStore;
Expand All @@ -34,19 +35,21 @@
* {@link org.neo4j.kernel.impl.transaction.command.Command}s in transaction state. * {@link org.neo4j.kernel.impl.transaction.command.Command}s in transaction state.
* It is done by inferring {@link org.neo4j.kernel.api.index.NodePropertyUpdate}s from commands and asking * It is done by inferring {@link org.neo4j.kernel.api.index.NodePropertyUpdate}s from commands and asking
* {@link org.neo4j.kernel.impl.api.index.IndexingService} to check those via * {@link org.neo4j.kernel.impl.api.index.IndexingService} to check those via
* {@link org.neo4j.kernel.impl.api.index.IndexingService#validate(Iterable, IndexUpdateMode)}. * {@link org.neo4j.kernel.impl.api.index.IndexingService#validate(Iterable,IndexUpdateMode)}.
*/ */
public class OnlineIndexUpdatesValidator implements IndexUpdatesValidator public class OnlineIndexUpdatesValidator implements IndexUpdatesValidator
{ {
private final NodeStore nodeStore; private final NodeStore nodeStore;
private final PropertyStore propertyStore; private final PropertyStore propertyStore;
private final PropertyLoader propertyLoader; private final PropertyLoader propertyLoader;
private final IndexingService indexing; private final IndexingService indexing;
private final KernelHealth kernelHealth;
private final IndexUpdateMode updateMode; private final IndexUpdateMode updateMode;


public OnlineIndexUpdatesValidator( NeoStore neoStore, PropertyLoader propertyLoader, public OnlineIndexUpdatesValidator( NeoStore neoStore, KernelHealth kernelHealth, PropertyLoader propertyLoader,
IndexingService indexing, IndexUpdateMode updateMode ) IndexingService indexing, IndexUpdateMode updateMode )
{ {
this.kernelHealth = kernelHealth;
this.updateMode = updateMode; this.updateMode = updateMode;
this.nodeStore = neoStore.getNodeStore(); this.nodeStore = neoStore.getNodeStore();
this.propertyStore = neoStore.getPropertyStore(); this.propertyStore = neoStore.getPropertyStore();
Expand All @@ -58,7 +61,15 @@ public OnlineIndexUpdatesValidator( NeoStore neoStore, PropertyLoader propertyLo
public ValidatedIndexUpdates validate( TransactionRepresentation transaction ) throws IOException public ValidatedIndexUpdates validate( TransactionRepresentation transaction ) throws IOException
{ {
NodePropertyCommandsExtractor extractor = new NodePropertyCommandsExtractor(); NodePropertyCommandsExtractor extractor = new NodePropertyCommandsExtractor();
transaction.accept( extractor ); try
{
transaction.accept( extractor );
}
catch ( IOException cause )
{
kernelHealth.panic( cause );
throw cause;
}


if ( !extractor.containsAnyNodeOrPropertyUpdate() ) if ( !extractor.containsAnyNodeOrPropertyUpdate() )
{ {
Expand Down
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.atomic.AtomicReference;

/**
* Basically a very simple variant of {@link AtomicReference} with option to not have the of concurrency control,
* since it may be implemented for simple single threaded use.
*/
public interface Access<T>
{
T get();

void set( T value );
}
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom;


import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongObjectMap;
Expand All @@ -48,7 +49,6 @@
import org.neo4j.kernel.impl.transaction.state.LazyIndexUpdates; import org.neo4j.kernel.impl.transaction.state.LazyIndexUpdates;
import org.neo4j.kernel.impl.transaction.state.PropertyLoader; import org.neo4j.kernel.impl.transaction.state.PropertyLoader;


import static java.util.Arrays.asList;
import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
Expand All @@ -59,6 +59,9 @@
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;

import static java.util.Arrays.asList;

import static org.neo4j.kernel.impl.api.index.IndexUpdateMode.ONLINE; import static org.neo4j.kernel.impl.api.index.IndexUpdateMode.ONLINE;
import static org.neo4j.kernel.impl.store.record.Record.NO_LABELS_FIELD; import static org.neo4j.kernel.impl.store.record.Record.NO_LABELS_FIELD;
import static org.neo4j.kernel.impl.store.record.Record.NO_NEXT_PROPERTY; import static org.neo4j.kernel.impl.store.record.Record.NO_NEXT_PROPERTY;
Expand Down Expand Up @@ -237,7 +240,7 @@ private IndexUpdatesValidator newIndexUpdatesValidatorWithMockedDependencies()
{ {
when( neoStore.getNodeStore() ).thenReturn( nodeStore ); when( neoStore.getNodeStore() ).thenReturn( nodeStore );
when( neoStore.getPropertyStore() ).thenReturn( propertyStore ); when( neoStore.getPropertyStore() ).thenReturn( propertyStore );
return new OnlineIndexUpdatesValidator( neoStore, propertyLoader, indexingService, ONLINE ); return new OnlineIndexUpdatesValidator( neoStore, null, propertyLoader, indexingService, ONLINE );
} }


private static NodePropertyCommands createNodeWithLabelAndPropertyCommands( long nodeId, int label, int property ) private static NodePropertyCommands createNodeWithLabelAndPropertyCommands( long nodeId, int label, int property )
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.DefaultIdGeneratorFactory; import org.neo4j.kernel.DefaultIdGeneratorFactory;
import org.neo4j.kernel.IdType; import org.neo4j.kernel.IdType;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.api.TokenNameLookup; import org.neo4j.kernel.api.TokenNameLookup;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.index.NodePropertyUpdate; import org.neo4j.kernel.api.index.NodePropertyUpdate;
Expand Down Expand Up @@ -109,7 +110,6 @@
import org.neo4j.test.PageCacheRule; import org.neo4j.test.PageCacheRule;
import org.neo4j.unsafe.batchinsert.LabelScanWriter; import org.neo4j.unsafe.batchinsert.LabelScanWriter;


import static java.lang.Integer.parseInt;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
Expand All @@ -128,6 +128,9 @@
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;

import static java.lang.Integer.parseInt;

import static org.neo4j.graphdb.Direction.INCOMING; import static org.neo4j.graphdb.Direction.INCOMING;
import static org.neo4j.graphdb.Direction.OUTGOING; import static org.neo4j.graphdb.Direction.OUTGOING;
import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.helpers.collection.Iterables.count;
Expand Down Expand Up @@ -1455,7 +1458,9 @@ private TransactionRepresentationCommitProcess commitProcess() throws IOExceptio


private TransactionRepresentationCommitProcess commitProcess( IndexingService indexing ) throws IOException private TransactionRepresentationCommitProcess commitProcess( IndexingService indexing ) throws IOException
{ {
OnlineIndexUpdatesValidator indexUpdatesValidator = new OnlineIndexUpdatesValidator( neoStore,
KernelHealth kernelHealth = mock( KernelHealth.class );
OnlineIndexUpdatesValidator indexUpdatesValidator = new OnlineIndexUpdatesValidator( neoStore, kernelHealth,
new PropertyLoader( neoStore ), indexing, IndexUpdateMode.ONLINE ); new PropertyLoader( neoStore ), indexing, IndexUpdateMode.ONLINE );
return commitProcess( indexing, indexUpdatesValidator); return commitProcess( indexing, indexUpdatesValidator);
} }
Expand Down
Expand Up @@ -34,13 +34,15 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.GraphDatabaseAPI; import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.api.direct.DirectStoreAccess; import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier; import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode; import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService; import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.OnlineIndexUpdatesValidator; import org.neo4j.kernel.impl.api.index.OnlineIndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates; import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
Expand Down Expand Up @@ -224,8 +226,9 @@ private static class TransactionApplier implements AutoCloseable
this.storeApplier = resolver.resolveDependency( TransactionRepresentationStoreApplier.class ) this.storeApplier = resolver.resolveDependency( TransactionRepresentationStoreApplier.class )
.withLegacyIndexTransactionOrdering( IdOrderingQueue.BYPASS ); .withLegacyIndexTransactionOrdering( IdOrderingQueue.BYPASS );
this.indexingService = resolver.resolveDependency( IndexingService.class ); this.indexingService = resolver.resolveDependency( IndexingService.class );
KernelHealth kernelHealth = resolver.resolveDependency( KernelHealth.class );
this.indexUpdatesValidator = new OnlineIndexUpdatesValidator( this.indexUpdatesValidator = new OnlineIndexUpdatesValidator(
neoStore, new PropertyLoader( neoStore ), indexingService, IndexUpdateMode.BATCHED ); neoStore, kernelHealth, new PropertyLoader( neoStore ), indexingService, IndexUpdateMode.BATCHED );
} }


long applyTransactionsFrom( File sourceDir, long upToTxId ) throws IOException long applyTransactionsFrom( File sourceDir, long upToTxId ) throws IOException
Expand Down
Expand Up @@ -38,8 +38,9 @@ class DefaultIndexUpdatesValidatorCreator implements
public IndexUpdatesValidator apply( DependencyResolver resolver ) public IndexUpdatesValidator apply( DependencyResolver resolver )
{ {
NeoStore neoStore = resolver.resolveDependency( NeoStoreProvider.class ).evaluate(); NeoStore neoStore = resolver.resolveDependency( NeoStoreProvider.class ).evaluate();
KernelHealth kernelHealth = resolver.resolveDependency( KernelHealth.class );
IndexingService indexing = resolver.resolveDependency( IndexingService.class ); IndexingService indexing = resolver.resolveDependency( IndexingService.class );
PropertyLoader propertyLoader = new PropertyLoader( neoStore ); PropertyLoader propertyLoader = new PropertyLoader( neoStore );
return new OnlineIndexUpdatesValidator( neoStore, propertyLoader, indexing, BATCHED ); return new OnlineIndexUpdatesValidator( neoStore, kernelHealth, propertyLoader, indexing, BATCHED );
} }
} }
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent; import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.util.Access;
import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.logging.Logging; import org.neo4j.kernel.logging.Logging;
Expand All @@ -66,30 +67,27 @@ public class TransactionCommittingResponseUnpacker implements ResponseUnpacker,
{ {
@Override @Override
public void visit( CommittedTransactionRepresentation transaction, TxHandler handler, public void visit( CommittedTransactionRepresentation transaction, TxHandler handler,
WritablePair<Commitment,ValidatedIndexUpdates> pair ) throws IOException Access<Commitment> commitmentAccess ) throws IOException
{ {
// Tuck away the Commitment returned from the call to append. We'll use each Commitment right before // Tuck away the Commitment returned from the call to append. We'll use each Commitment right before
// applying each transaction. // applying each transaction.
TransactionRepresentation representation = transaction.getTransactionRepresentation(); Commitment commitment = appender.append( transaction.getTransactionRepresentation(),
ValidatedIndexUpdates indexUpdates = indexUpdatesValidator.validate( representation ); transaction.getCommitEntry().getTxId() );
Commitment commitment = appender.append( representation, transaction.getCommitEntry().getTxId() ); commitmentAccess.set( commitment );
pair.setFirst( commitment );
pair.setOther( indexUpdates );

} }
}; };
// Visits all queued, and recently appended, transactions, applying them to the store // Visits all queued, and recently appended, transactions, applying them to the store
private final TransactionVisitor batchApplier = new TransactionVisitor() private final TransactionVisitor batchApplier = new TransactionVisitor()
{ {
@Override @Override
public void visit( CommittedTransactionRepresentation transaction, TxHandler handler, public void visit( CommittedTransactionRepresentation transaction, TxHandler handler,
WritablePair<Commitment,ValidatedIndexUpdates> pair ) throws IOException Access<Commitment> commitmentAccess ) throws IOException
{ {
long transactionId = transaction.getCommitEntry().getTxId(); long transactionId = transaction.getCommitEntry().getTxId();
TransactionRepresentation representation = transaction.getTransactionRepresentation(); TransactionRepresentation representation = transaction.getTransactionRepresentation();
pair.getFirst().publishAsCommitted(); commitmentAccess.get().publishAsCommitted();
try ( LockGroup locks = new LockGroup(); try ( LockGroup locks = new LockGroup();
ValidatedIndexUpdates indexUpdates = pair.getOther() ) ValidatedIndexUpdates indexUpdates = indexUpdatesValidator.validate( representation ) )
{ {
storeApplier.apply( representation, indexUpdates, locks, transactionId, EXTERNAL ); storeApplier.apply( representation, indexUpdates, locks, transactionId, EXTERNAL );
handler.accept( transaction ); handler.accept( transaction );
Expand All @@ -100,9 +98,9 @@ public void visit( CommittedTransactionRepresentation transaction, TxHandler han
{ {
@Override @Override
public void visit( CommittedTransactionRepresentation transaction, TxHandler handler, public void visit( CommittedTransactionRepresentation transaction, TxHandler handler,
WritablePair<Commitment,ValidatedIndexUpdates> pair ) throws IOException Access<Commitment> commitmentAccess ) throws IOException
{ {
if ( pair.getFirst().markedAsCommitted() ) if ( commitmentAccess.get().markedAsCommitted() )
{ {
transactionIdStore.transactionClosed( transaction.getCommitEntry().getTxId() ); transactionIdStore.transactionClosed( transaction.getCommitEntry().getTxId() );
} }
Expand Down
Expand Up @@ -22,10 +22,10 @@
import java.io.IOException; import java.io.IOException;


import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.Commitment; import org.neo4j.kernel.impl.transaction.log.Commitment;
import org.neo4j.kernel.impl.util.Access;


/** /**
* Queues {@link TransactionRepresentation} for application at a later point. Queued transactions can be visited * Queues {@link TransactionRepresentation} for application at a later point. Queued transactions can be visited
Expand Down Expand Up @@ -68,13 +68,11 @@ public void clear()
queueIndex = 0; queueIndex = 0;
} }



private static class Transaction implements Access<Commitment>
private static class Transaction implements WritablePair<Commitment,ValidatedIndexUpdates>
{ {
private CommittedTransactionRepresentation transaction; private CommittedTransactionRepresentation transaction;
private TxHandler txHandler; private TxHandler txHandler;
private Commitment commitment; private Commitment commitment;
private ValidatedIndexUpdates validatedIndexUpdates;


void set( CommittedTransactionRepresentation transaction, TxHandler txHandler ) void set( CommittedTransactionRepresentation transaction, TxHandler txHandler )
{ {
Expand All @@ -84,27 +82,15 @@ void set( CommittedTransactionRepresentation transaction, TxHandler txHandler )
} }


@Override @Override
public Commitment getFirst() public Commitment get()
{ {
return commitment; return commitment;
} }


@Override @Override
public ValidatedIndexUpdates getOther() public void set( Commitment commitment )
{
return validatedIndexUpdates;
}

@Override
public void setFirst( Commitment commitment )
{ {
this.commitment = commitment; this.commitment = commitment;
} }

@Override
public void setOther( ValidatedIndexUpdates validatedIndexUpdates )
{
this.validatedIndexUpdates = validatedIndexUpdates;
}
} }
} }
Expand Up @@ -22,9 +22,9 @@
import java.io.IOException; import java.io.IOException;


import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.Commitment; import org.neo4j.kernel.impl.transaction.log.Commitment;
import org.neo4j.kernel.impl.util.Access;


/** /**
* Visits queued transactions. * Visits queued transactions.
Expand All @@ -33,6 +33,6 @@
*/ */
interface TransactionVisitor interface TransactionVisitor
{ {
void visit( CommittedTransactionRepresentation transaction, TxHandler handler, void visit( CommittedTransactionRepresentation transaction, TxHandler handler, Access<Commitment> commitment )
WritablePair<Commitment,ValidatedIndexUpdates> pair ) throws IOException; throws IOException;
} }

This file was deleted.

Expand Up @@ -474,7 +474,7 @@ private void addMockedNeoStore( DependencyResolver dependencyResolver )
} }


@Test @Test
public void shouldNotAppendOrApplyTransactionIfIndexUpdatesValidationFails() throws Throwable public void shouldNotApplyTransactionIfIndexUpdatesValidationFails() throws Throwable
{ {
// Given // Given
DependencyResolver resolver = mock( DependencyResolver.class ); DependencyResolver resolver = mock( DependencyResolver.class );
Expand Down Expand Up @@ -517,7 +517,7 @@ public void shouldNotAppendOrApplyTransactionIfIndexUpdatesValidationFails() thr
} }


// Then // Then
verifyZeroInteractions( appender, storeApplier ); verifyZeroInteractions( storeApplier );
} }


private Function<DependencyResolver,IndexUpdatesValidator> customValidator( final IndexUpdatesValidator validator ) private Function<DependencyResolver,IndexUpdatesValidator> customValidator( final IndexUpdatesValidator validator )
Expand Down

0 comments on commit 0ff35b3

Please sign in to comment.