Skip to content

Commit

Permalink
Multiple bookmarks
Browse files Browse the repository at this point in the history
Previously Bolt server could only accept and process a single bookmark
supplied with parameters for `RUN(BEGIN)` command. However there are
use-cases where clients want to supply multiple bookmarks produced by
separate workers. Ability to supply multiple bookmarks is needed for
such cases. Clients can't compare bookmarks because they are just
opaque strings.

This commit makes it possible to supply multiple bookmarks in a new
"bookmarks" metadata parameters field. Server will try to parse
multiple bookmarks and will fallback to parsing a single bookmark
denoted by "bookmark" key for backwards compatibility reasons. Latest
bookmark will be selected and server will wait for it.
  • Loading branch information
lutovich committed May 29, 2017
1 parent 735026d commit 10c2bc6
Show file tree
Hide file tree
Showing 4 changed files with 450 additions and 25 deletions.
Expand Up @@ -152,9 +152,9 @@ State run( MutableTransactionState ctx, SPI spi, String statement,
{
ctx.currentTransaction = spi.beginTransaction( ctx.securityContext );

if ( params.containsKey( "bookmark" ) )
Bookmark bookmark = Bookmark.fromParamsOrNull( params );
if ( bookmark != null )
{
final Bookmark bookmark = Bookmark.fromString( params.get( "bookmark" ).toString() );
spi.awaitUpToDate( bookmark.txId() );
ctx.currentResult = new BookmarkResult( bookmark );
}
Expand Down
Expand Up @@ -19,13 +19,18 @@
*/
package org.neo4j.bolt.v1.runtime.bookmarking;

import java.util.List;
import java.util.Map;

import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;

import static java.lang.String.format;

public class Bookmark
{
private static final String BOOKMARK_KEY = "bookmark";
private static final String BOOKMARKS_KEY = "bookmarks";
private static final String BOOKMARK_TX_PREFIX = "neo4j:bookmark:v1:tx";

private final long txId;
Expand All @@ -41,27 +46,85 @@ public String toString()
return format( BOOKMARK_TX_PREFIX + "%d", txId );
}

public static Bookmark fromString( String bookmarkString) throws BookmarkFormatException
public static Bookmark fromParamsOrNull( Map<String,Object> params ) throws BookmarkFormatException
{
if ( bookmarkString != null && bookmarkString.startsWith( BOOKMARK_TX_PREFIX ) )
// try to parse multiple bookmarks, if available
Bookmark bookmark = parseMultipleBookmarks( params );
if ( bookmark == null )
{
try
{
return new Bookmark( Long.parseLong( bookmarkString.substring( BOOKMARK_TX_PREFIX.length() ) ) );
}
catch ( NumberFormatException e )
{
throw new BookmarkFormatException( bookmarkString, e );
}
// fallback to parsing single bookmark, if available, for backwards compatibility reasons
// some older drivers can only send a single bookmark
return parseSingleBookmark( params );
}
throw new BookmarkFormatException( bookmarkString );
return bookmark;
}

public long txId()
{
return txId;
}

private static Bookmark parseMultipleBookmarks( Map<String,Object> params ) throws BookmarkFormatException
{
Object bookmarksObject = params.get( BOOKMARKS_KEY );

if ( bookmarksObject == null )
{
return null;
}
else if ( bookmarksObject instanceof List )
{
List<?> bookmarks = (List<?>) bookmarksObject;

long maxTxId = -1;
for ( Object bookmark : bookmarks )
{
if ( bookmark != null )
{
long txId = txIdFrom( bookmark.toString() );
if ( txId > maxTxId )
{
maxTxId = txId;
}
}
}
return maxTxId == -1 ? null : new Bookmark( maxTxId );
}
else
{
throw new BookmarkFormatException( bookmarksObject );
}
}

private static Bookmark parseSingleBookmark( Map<String,Object> params ) throws BookmarkFormatException
{
Object bookmarkObject = params.get( BOOKMARK_KEY );
if ( bookmarkObject == null )
{
return null;
}

String bookmarkString = bookmarkObject.toString();
return new Bookmark( txIdFrom( bookmarkString ) );
}

private static long txIdFrom( String bookmarkString ) throws BookmarkFormatException
{
if ( !bookmarkString.startsWith( BOOKMARK_TX_PREFIX ) )
{
throw new BookmarkFormatException( bookmarkString );
}

try
{
return Long.parseLong( bookmarkString.substring( BOOKMARK_TX_PREFIX.length() ) );
}
catch ( NumberFormatException e )
{
throw new BookmarkFormatException( bookmarkString, e );
}
}

static class BookmarkFormatException extends KernelException
{
BookmarkFormatException( String bookmarkString, NumberFormatException e )
Expand All @@ -70,10 +133,10 @@ static class BookmarkFormatException extends KernelException
"unable to parse transaction id", bookmarkString, BOOKMARK_TX_PREFIX );
}

BookmarkFormatException( String bookmarkString )
BookmarkFormatException( Object bookmarkObject )
{
super( Status.Transaction.InvalidBookmark, "Supplied bookmark [%s] does not conform to pattern %s",
bookmarkString, BOOKMARK_TX_PREFIX );
bookmarkObject, BOOKMARK_TX_PREFIX );
}
}
}
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime;

import org.junit.Test;

import java.util.Map;

import org.neo4j.time.FakeClock;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.neo4j.bolt.security.auth.AuthenticationResult.AUTH_DISABLED;
import static org.neo4j.helpers.collection.MapUtil.map;

public class TransactionStateMachineTest
{
@Test
public void shouldNotWaitWhenNoBookmarkSupplied() throws Exception
{
TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class );
TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI );

stateMachine.run( "BEGIN", emptyMap() );

verify( stateMachineSPI, never() ).awaitUpToDate( anyLong() );
}

@Test
public void shouldAwaitSingleBookmark() throws Exception
{
TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class );
TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI );

stateMachine.run( "BEGIN", map( "bookmark", "neo4j:bookmark:v1:tx15" ) );

verify( stateMachineSPI ).awaitUpToDate( 15 );
}

@Test
public void shouldAwaitMultipleBookmarks() throws Exception
{
TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class );
TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI );

Map<String,Object> params = map( "bookmarks", asList(
"neo4j:bookmark:v1:tx15", "neo4j:bookmark:v1:tx5", "neo4j:bookmark:v1:tx92", "neo4j:bookmark:v1:tx9" )
);
stateMachine.run( "BEGIN", params );

verify( stateMachineSPI ).awaitUpToDate( 92 );
}

@Test
public void shouldAwaitMultipleBookmarksWhenBothSingleAndMultipleSupplied() throws Exception
{
TransactionStateMachineSPI stateMachineSPI = mock( TransactionStateMachineSPI.class );
TransactionStateMachine stateMachine = newTransactionStateMachine( stateMachineSPI );

Map<String,Object> params = map(
"bookmark", "neo4j:bookmark:v1:tx42",
"bookmarks", asList( "neo4j:bookmark:v1:tx47", "neo4j:bookmark:v1:tx67", "neo4j:bookmark:v1:tx45" )
);
stateMachine.run( "BEGIN", params );

verify( stateMachineSPI ).awaitUpToDate( 67 );
}

private static TransactionStateMachine newTransactionStateMachine( TransactionStateMachineSPI stateMachineSPI )
{
return new TransactionStateMachine( stateMachineSPI, AUTH_DISABLED, new FakeClock() );
}
}

0 comments on commit 10c2bc6

Please sign in to comment.