Skip to content

Commit

Permalink
Data snapshot query execution
Browse files Browse the repository at this point in the history
Introduce possibility to execute query based on data snapshot.
Any data modification will mark corresponding pages with a version that
will be equal to transaction id in which that particular change was introduced.
Also allow count store to track transactions in which changes where introduced.

To be able to guarantee that query result is based on a data that was
present on query execution start and was not modified while it was running,
the engine will verify that all paged that were accessed during query
execution have the version that is less or equal that was last closed
on a moment when query started.

Support for snapshot query execution in CC and HA

Snapshot query execution is enabled by 'unsupported.dbms.query.snapshot'
setting and number of retries that engine will do while trying to get
stable snapshot is controlled by 'unsupported.dbms.query.snapshot.retries'.
  • Loading branch information
MishaDemianenko committed Jan 23, 2018
1 parent 0f01607 commit 3795750
Show file tree
Hide file tree
Showing 140 changed files with 3,204 additions and 475 deletions.
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.tracing.PageCacheTracer; import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier;
import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier;
import org.neo4j.kernel.api.direct.DirectStoreAccess; import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.labelscan.LabelScanStore;
Expand Down Expand Up @@ -158,7 +159,7 @@ public Result runFullConsistencyCheck( File storeDir, Config config, ProgressMon
Log log = logProvider.getLog( getClass() ); Log log = logProvider.getLog( getClass() );
ConfiguringPageCacheFactory pageCacheFactory = new ConfiguringPageCacheFactory( ConfiguringPageCacheFactory pageCacheFactory = new ConfiguringPageCacheFactory(
fileSystem, config, PageCacheTracer.NULL, PageCursorTracerSupplier.NULL, fileSystem, config, PageCacheTracer.NULL, PageCursorTracerSupplier.NULL,
logProvider.getLog( PageCache.class ) ); logProvider.getLog( PageCache.class ), EmptyVersionContextSupplier.INSTANCE );
PageCache pageCache = pageCacheFactory.getOrCreatePageCache(); PageCache pageCache = pageCacheFactory.getOrCreatePageCache();


try try
Expand Down Expand Up @@ -218,7 +219,7 @@ public Result runFullConsistencyCheck( final File storeDir, Config config, Progr
GraphDatabaseSettings.read_only.name(), TRUE, GraphDatabaseSettings.read_only.name(), TRUE,
GraphDatabaseSettings.label_index.name(), LabelIndex.AUTO.name() ) ); GraphDatabaseSettings.label_index.name(), LabelIndex.AUTO.name() ) );
StoreFactory factory = new StoreFactory( storeDir, config, StoreFactory factory = new StoreFactory( storeDir, config,
new DefaultIdGeneratorFactory( fileSystem ), pageCache, fileSystem, logProvider ); new DefaultIdGeneratorFactory( fileSystem ), pageCache, fileSystem, logProvider, EmptyVersionContextSupplier.INSTANCE );


ConsistencySummaryStatistics summary; ConsistencySummaryStatistics summary;
final File reportFile = chooseReportPath( reportDir ); final File reportFile = chooseReportPath( reportDir );
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier;
import org.neo4j.kernel.api.ReadOperations; import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.direct.DirectStoreAccess; import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
Expand Down Expand Up @@ -164,7 +165,7 @@ public DirectStoreAccess directStoreAccess()
fileSystem = new DefaultFileSystemAbstraction(); fileSystem = new DefaultFileSystemAbstraction();
PageCache pageCache = getPageCache( fileSystem ); PageCache pageCache = getPageCache( fileSystem );
LogProvider logProvider = NullLogProvider.getInstance(); LogProvider logProvider = NullLogProvider.getInstance();
StoreFactory storeFactory = new StoreFactory( directory, pageCache, fileSystem, logProvider ); StoreFactory storeFactory = new StoreFactory( directory, pageCache, fileSystem, logProvider, EmptyVersionContextSupplier.INSTANCE );
neoStore = storeFactory.openAllNeoStores(); neoStore = storeFactory.openAllNeoStores();
StoreAccess nativeStores; StoreAccess nativeStores;
if ( keepStatistics ) if ( keepStatistics )
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.helpers.progress.ProgressMonitorFactory; import org.neo4j.helpers.progress.ProgressMonitorFactory;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier;
import org.neo4j.kernel.api.direct.DirectStoreAccess; import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.labelscan.LabelScanStore;
Expand All @@ -59,7 +60,6 @@
import org.neo4j.test.rule.fs.DefaultFileSystemRule; import org.neo4j.test.rule.fs.DefaultFileSystemRule;


import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;

import static org.neo4j.graphdb.Label.label; import static org.neo4j.graphdb.Label.label;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;


Expand Down Expand Up @@ -151,7 +151,8 @@ private StoreFactory newStoreFactory( PageCache pageCache )
{ {
FileSystemAbstraction fileSystem = fileSystemRule.get(); FileSystemAbstraction fileSystem = fileSystemRule.get();
return new StoreFactory( directory.directory(), getTuningConfiguration(), return new StoreFactory( directory.directory(), getTuningConfiguration(),
new DefaultIdGeneratorFactory( fileSystem ), pageCache, fileSystem, NullLogProvider.getInstance() ); new DefaultIdGeneratorFactory( fileSystem ), pageCache, fileSystem, NullLogProvider.getInstance(),
EmptyVersionContextSupplier.INSTANCE );
} }


private Config getTuningConfiguration() private Config getTuningConfiguration()
Expand Down
5 changes: 5 additions & 0 deletions community/cypher/cypher/pom.xml
Expand Up @@ -348,6 +348,11 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>


<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

</dependencies> </dependencies>


</project> </project>
Expand Up @@ -22,8 +22,10 @@
import org.neo4j.cypher.internal.CommunityCompatibilityFactory; import org.neo4j.cypher.internal.CommunityCompatibilityFactory;
import org.neo4j.cypher.javacompat.internal.GraphDatabaseCypherService; import org.neo4j.cypher.javacompat.internal.GraphDatabaseCypherService;
import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Service; import org.neo4j.helpers.Service;
import org.neo4j.kernel.api.KernelAPI; import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.QueryEngineProvider; import org.neo4j.kernel.impl.query.QueryEngineProvider;
import org.neo4j.kernel.impl.query.QueryExecutionEngine; import org.neo4j.kernel.impl.query.QueryExecutionEngine;
Expand Down Expand Up @@ -56,10 +58,31 @@ protected QueryExecutionEngine createEngine( Dependencies deps, GraphDatabaseAPI
LogService logService = resolver.resolveDependency( LogService.class ); LogService logService = resolver.resolveDependency( LogService.class );
KernelAPI kernelAPI = resolver.resolveDependency( KernelAPI.class ); KernelAPI kernelAPI = resolver.resolveDependency( KernelAPI.class );
Monitors monitors = resolver.resolveDependency( Monitors.class ); Monitors monitors = resolver.resolveDependency( Monitors.class );
Config config = resolver.resolveDependency( Config.class );
LogProvider logProvider = logService.getInternalLogProvider(); LogProvider logProvider = logService.getInternalLogProvider();
CommunityCompatibilityFactory compatibilityFactory = CommunityCompatibilityFactory compatibilityFactory =
new CommunityCompatibilityFactory( queryService, kernelAPI, monitors, logProvider ); new CommunityCompatibilityFactory( queryService, kernelAPI, monitors, logProvider );
deps.satisfyDependencies( compatibilityFactory ); deps.satisfyDependencies( compatibilityFactory );
return new ExecutionEngine( queryService, logProvider, compatibilityFactory); return createEngine( queryService, config, logProvider, compatibilityFactory );
}

private QueryExecutionEngine createEngine( GraphDatabaseCypherService queryService, Config config,
LogProvider logProvider, CommunityCompatibilityFactory compatibilityFactory )
{
return config.get( GraphDatabaseSettings.snapshot_query ) ?
snapshotEngine( queryService, config, logProvider, compatibilityFactory ) :
standardEngine( queryService, logProvider, compatibilityFactory );
}

private SnapshotExecutionEngine snapshotEngine( GraphDatabaseCypherService queryService, Config config,
LogProvider logProvider, CommunityCompatibilityFactory compatibilityFactory )
{
return new SnapshotExecutionEngine( queryService, config, logProvider, compatibilityFactory);
}

private ExecutionEngine standardEngine( GraphDatabaseCypherService queryService, LogProvider logProvider,
CommunityCompatibilityFactory compatibilityFactory )
{
return new ExecutionEngine( queryService, logProvider, compatibilityFactory );
} }
} }
@@ -0,0 +1,194 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.javacompat;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Result;

import static java.lang.System.lineSeparator;

/**
* Result produced as result of eager query execution for cases when {@link SnapshotExecutionEngine} is used.
*/
class EagerResult implements Result
{
private static final String ITEM_SEPARATOR = ", ";
private final Result originalResult;
private final List<Map<String, Object>> queryResult = new ArrayList<>();
private int cursor;

EagerResult( Result result )
{
this.originalResult = result;
}

public void consume()
{
while ( originalResult.hasNext() )
{
queryResult.add( originalResult.next() );
}
}

@Override
public QueryExecutionType getQueryExecutionType()
{
return originalResult.getQueryExecutionType();
}

@Override
public List<String> columns()
{
return originalResult.columns();
}

@Override
public <T> ResourceIterator<T> columnAs( String name )
{
return new EagerResultResourceIterator<>( name );
}

@Override
public boolean hasNext()
{
return cursor < queryResult.size();
}

@Override
public Map<String,Object> next()
{
return queryResult.get( cursor++ );
}

@Override
public void close()
{
// nothing to close. Original result is already closed at this point
}

@Override
public QueryStatistics getQueryStatistics()
{
return originalResult.getQueryStatistics();
}

@Override
public ExecutionPlanDescription getExecutionPlanDescription()
{
return originalResult.getExecutionPlanDescription();
}

@Override
public String resultAsString()
{
List<String> columns = originalResult.columns();
StringBuilder builder = new StringBuilder();
builder.append( String.join( ITEM_SEPARATOR, columns ) );
if ( !queryResult.isEmpty() )
{
builder.append( lineSeparator() );
int numberOfColumns = columns.size();
for ( Map<String,Object> row : queryResult )
{
writeRow( columns, builder, numberOfColumns, row );
builder.append( lineSeparator() );
}
}
return builder.toString();
}

@Override
public void writeAsStringTo( PrintWriter writer )
{
writer.print( resultAsString() );
}

@Override
public void remove()
{
throw new UnsupportedOperationException( "Not supported" );
}

@Override
public Iterable<Notification> getNotifications()
{
return originalResult.getNotifications();
}

@Override
public <VisitationException extends Exception> void accept( ResultVisitor<VisitationException> visitor )
throws VisitationException
{
for ( Map<String,Object> map : queryResult )
{
visitor.visit( new MapRow( map ) );
}
}

private void writeRow( List<String> columns, StringBuilder builder, int numberOfColumns, Map<String,Object> row )
{
for ( int i = 0; i < numberOfColumns; i++ )
{
builder.append( row.get( columns.get( i ) ) );
if ( i != numberOfColumns - 1 )
{
builder.append( ITEM_SEPARATOR );
}
}
}

private class EagerResultResourceIterator<T> implements ResourceIterator<T>
{
private final String column;
int cursor;

EagerResultResourceIterator( String column )
{
this.column = column;
}

@Override
public boolean hasNext()
{
return cursor < queryResult.size();
}

@Override
public T next()
{
return (T) queryResult.get( cursor++ ).get( column );
}

@Override
public void close()
{
// Nothing to close.
}
}
}
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.server.rest.transactional; package org.neo4j.cypher.internal.javacompat;


import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
Expand Down

0 comments on commit 3795750

Please sign in to comment.