Skip to content

Commit

Permalink
Fix after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhen Li authored and lutovich committed Jul 26, 2018
1 parent 6f9f151 commit 2530efd
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 113 deletions.
Expand Up @@ -80,21 +80,21 @@ else if ( protocolVersion == BoltProtocolV3.VERSION )

private BoltStateMachine newStateMachineV1( BoltChannel boltChannel )
{
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis();
Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout );
TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV1SPI( db, availabilityGuard, txAwaitDuration, clock );

TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV1SPI( db, availabilityGuard, getAwaitDuration(), clock );
BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI );
return new BoltStateMachineV1( boltSPI, boltChannel, clock );
}

private BoltStateMachine newStateMachineV3( BoltChannel boltChannel )
{
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis();
Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout );
TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV3SPI( db, availabilityGuard, txAwaitDuration, clock );

TransactionStateMachineSPI transactionSPI = new TransactionStateMachineV3SPI( db, availabilityGuard, getAwaitDuration(), clock );
BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI );
return new BoltStateMachineV3( boltSPI, boltChannel, clock );
}

private Duration getAwaitDuration()
{
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis();
return Duration.ofMillis( bookmarkReadyTimeout );
}
}
Expand Up @@ -44,6 +44,8 @@
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
import org.neo4j.values.virtual.MapValue;

import static org.neo4j.util.Preconditions.checkState;

public class TransactionStateMachine implements StatementProcessor
{
final TransactionStateMachineSPI spi;
Expand Down Expand Up @@ -367,6 +369,9 @@ State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String s
Duration ignored1, Map<String,Object> ignored2 )
throws KernelException
{
checkState( ignored1 == null, "Explicit Transaction should not run with tx_timeout" );
checkState( ignored2 == null, "Explicit Transaction should not run with tx_metadata" );

if ( statement.isEmpty() )
{
statement = ctx.lastStatement;
Expand Down
Expand Up @@ -20,33 +20,23 @@
package org.neo4j.bolt.v3.messaging.request;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.neo4j.bolt.messaging.BoltIOException;
import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.v1.runtime.bookmarking.Bookmark;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.spatial.Point;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.util.BaseToObjectValueWriter;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.CoordinateReferenceSystem;
import org.neo4j.values.storable.LongValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;
import org.neo4j.values.virtual.VirtualValues;

import static java.util.Objects.requireNonNull;
import static org.neo4j.bolt.v3.messaging.request.MessageMetadataParser.parseBookmark;
import static org.neo4j.bolt.v3.messaging.request.MessageMetadataParser.parseTransactionMetadata;
import static org.neo4j.bolt.v3.messaging.request.MessageMetadataParser.parseTransactionTimeout;

public class BeginMessage implements RequestMessage
{
public static final byte SIGNATURE = 0x11;
private static final String TX_TIMEOUT_KEY = "tx_timeout";
private static final String TX_META_DATA_KEY = "tx_metadata";

private final MapValue meta;
private final Bookmark bookmark;
Expand All @@ -66,56 +56,6 @@ public BeginMessage( MapValue meta ) throws BoltIOException
this.txMetadata = parseTransactionMetadata( meta );
}

static Bookmark parseBookmark( MapValue meta ) throws BoltIOException
{
try
{
return Bookmark.fromParamsOrNull( meta );
}
catch ( KernelException e )
{
throw new BoltIOException( Status.Request.InvalidFormat, e.getMessage(), e );
}
}

static Duration parseTransactionTimeout( MapValue meta ) throws BoltIOException
{
AnyValue anyValue = meta.get( TX_TIMEOUT_KEY );
if ( anyValue == Values.NO_VALUE )
{
return null;
}
else if ( anyValue instanceof LongValue )
{
return Duration.ofMillis( ((LongValue) anyValue).longValue() );
}
else
{
throw new BoltIOException( Status.Request.InvalidFormat, "Expecting transaction timeout value to be a Long value, but got: " + anyValue );
}
}

static Map<String,Object> parseTransactionMetadata( MapValue meta ) throws BoltIOException
{
AnyValue anyValue = meta.get( TX_META_DATA_KEY );
if ( anyValue == Values.NO_VALUE )
{
return null;
}
else if ( anyValue instanceof MapValue )
{
MapValue mapValue = (MapValue) anyValue;
TransactionMetadataWriter writer = new TransactionMetadataWriter();
Map<String,Object> txMeta = new HashMap<>( mapValue.size() );
mapValue.foreach( ( key, value ) -> txMeta.put( key, writer.valueAsObject( value ) ) );
return txMeta;
}
else
{
throw new BoltIOException( Status.Request.InvalidFormat, "Expecting transaction metadata value to be a Map value, but got: " + anyValue );
}
}

public Bookmark bookmark()
{
return this.bookmark;
Expand Down Expand Up @@ -168,31 +108,4 @@ public Map<String,Object> transactionMetadata()
{
return txMetadata;
}

private static class TransactionMetadataWriter extends BaseToObjectValueWriter<RuntimeException>
{
@Override
protected Node newNodeProxyById( long id )
{
throw new UnsupportedOperationException( "Transaction metadata should not contain nodes" );
}

@Override
protected Relationship newRelationshipProxyById( long id )
{
throw new UnsupportedOperationException( "Transaction metadata should not contain relationships" );
}

@Override
protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate )
{
return Values.pointValue( crs, coordinate );
}

Object valueAsObject( AnyValue value )
{
value.writeTo( this );
return value();
}
}
}
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v3.messaging.request;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.neo4j.bolt.messaging.BoltIOException;
import org.neo4j.bolt.v1.runtime.bookmarking.Bookmark;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.spatial.Point;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.util.BaseToObjectValueWriter;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.CoordinateReferenceSystem;
import org.neo4j.values.storable.LongValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;

/**
* The parsing methods in this class returns null if the specified key is not found in the input message metadata map.
*/
class MessageMetadataParser
{
private static final String TX_TIMEOUT_KEY = "tx_timeout";
private static final String TX_META_DATA_KEY = "tx_metadata";

static Bookmark parseBookmark( MapValue meta ) throws BoltIOException
{
try
{
return Bookmark.fromParamsOrNull( meta );
}
catch ( KernelException e )
{
throw new BoltIOException( Status.Request.InvalidFormat, e.getMessage(), e );
}
}

static Duration parseTransactionTimeout( MapValue meta ) throws BoltIOException
{
AnyValue anyValue = meta.get( TX_TIMEOUT_KEY );
if ( anyValue == Values.NO_VALUE )
{
return null;
}
else if ( anyValue instanceof LongValue )
{
return Duration.ofMillis( ((LongValue) anyValue).longValue() );
}
else
{
throw new BoltIOException( Status.Request.InvalidFormat, "Expecting transaction timeout value to be a Long value, but got: " + anyValue );
}
}

static Map<String,Object> parseTransactionMetadata( MapValue meta ) throws BoltIOException
{
AnyValue anyValue = meta.get( TX_META_DATA_KEY );
if ( anyValue == Values.NO_VALUE )
{
return null;
}
else if ( anyValue instanceof MapValue )
{
MapValue mapValue = (MapValue) anyValue;
TransactionMetadataWriter writer = new TransactionMetadataWriter();
Map<String,Object> txMeta = new HashMap<>( mapValue.size() );
mapValue.foreach( ( key, value ) -> txMeta.put( key, writer.valueAsObject( value ) ) );
return txMeta;
}
else
{
throw new BoltIOException( Status.Request.InvalidFormat, "Expecting transaction metadata value to be a Map value, but got: " + anyValue );
}
}

private static class TransactionMetadataWriter extends BaseToObjectValueWriter<RuntimeException>
{
@Override
protected Node newNodeProxyById( long id )
{
throw new UnsupportedOperationException( "Transaction metadata should not contain nodes" );
}

@Override
protected Relationship newRelationshipProxyById( long id )
{
throw new UnsupportedOperationException( "Transaction metadata should not contain relationships" );
}

@Override
protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate )
{
return Values.pointValue( crs, coordinate );
}

Object valueAsObject( AnyValue value )
{
value.writeTo( this );
return value();
}
}
}
Expand Up @@ -30,9 +30,9 @@
import org.neo4j.values.virtual.VirtualValues;

import static java.util.Objects.requireNonNull;
import static org.neo4j.bolt.v3.messaging.request.BeginMessage.parseBookmark;
import static org.neo4j.bolt.v3.messaging.request.BeginMessage.parseTransactionMetadata;
import static org.neo4j.bolt.v3.messaging.request.BeginMessage.parseTransactionTimeout;
import static org.neo4j.bolt.v3.messaging.request.MessageMetadataParser.parseBookmark;
import static org.neo4j.bolt.v3.messaging.request.MessageMetadataParser.parseTransactionMetadata;
import static org.neo4j.bolt.v3.messaging.request.MessageMetadataParser.parseTransactionTimeout;

public class RunMessage implements RequestMessage
{
Expand Down
Expand Up @@ -101,16 +101,11 @@ private BoltStateMachineState processRunMessage( RunMessage message, StateMachin
}

private BoltStateMachineState processCommitMessage( StateMachineContext context ) throws Exception
{
appendBookmarkInResponse( context );
return readyState;
}

static void appendBookmarkInResponse( StateMachineContext context ) throws KernelException
{
StatementProcessor statementProcessor = context.connectionState().getStatementProcessor();
Bookmark bookmark = statementProcessor.commitTransaction();
bookmark.attachTo( context.connectionState() );
return readyState;
}

private BoltStateMachineState processRollbackMessage( StateMachineContext context ) throws Exception
Expand Down
Expand Up @@ -171,8 +171,8 @@ interface CloseListener
PropertyCursor ambientPropertyCursor();

/**
* Attaches a map of data to the transaction.
* The daga will be printed when listing queries and inserted in to th query log.
* Attaches a map of data to this transaction.
* The data will be printed when listing queries and inserted in to the query log.
* @param metaData The data to add.
*/
void setMetaData( Map<String, Object> metaData );
Expand Down
Expand Up @@ -22,8 +22,6 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -105,6 +103,7 @@
import org.neo4j.storageengine.api.txstate.TxStateVisitor;

import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.neo4j.storageengine.api.TransactionApplicationMode.INTERNAL;

Expand Down Expand Up @@ -215,7 +214,7 @@ public KernelTransactionImplementation( StatementOperationParts statementOperati
versionContextSupplier );
this.accessCapability = accessCapability;
this.statistics = new Statistics( this, cpuClockRef, heapAllocationRef );
this.userMetaData = new HashMap<>();
this.userMetaData = emptyMap();
this.constraintSemantics = constraintSemantics;
DefaultCursors cursors = new DefaultCursors( storageReader );
AllStoreHolder allStoreHolder =
Expand Down Expand Up @@ -944,7 +943,7 @@ private void release()
hooksState = null;
closeListeners.clear();
reuseCount++;
userMetaData = Collections.emptyMap();
userMetaData = emptyMap();
userTransactionId = 0;
statistics.reset();
operations.release();
Expand Down

0 comments on commit 2530efd

Please sign in to comment.