Skip to content

Commit

Permalink
Flatten LogEntryCommit hierarchy. Remove phase mentioning.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Oct 19, 2017
1 parent 57cab78 commit 37f33d7
Show file tree
Hide file tree
Showing 26 changed files with 84 additions and 119 deletions.
30 changes: 14 additions & 16 deletions community/kernel/src/main/java/org/neo4j/helpers/Format.java
Expand Up @@ -29,6 +29,20 @@

public class Format
{
/**
* Default time zone is UTC (+00:00) so that comparing timestamped logs from different
* sources is an easier task.
*/
public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone( "UTC" );
public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSZ";
public static final String TIME_FORMAT = "HH:mm:ss.SSS";

private static final String[] BYTE_SIZES = { "B", "kB", "MB", "GB" };

private static final ThreadLocalFormat DATE = new ThreadLocalFormat( DATE_FORMAT );
private static final ThreadLocalFormat TIME = new ThreadLocalFormat( TIME_FORMAT );
private static int KB = (int) ByteUnit.kibiBytes( 1 );

public static String date()
{
return date( DEFAULT_TIME_ZONE );
Expand Down Expand Up @@ -89,8 +103,6 @@ public static String time( Date date, TimeZone timeZone )
return TIME.format( date, timeZone );
}

private static int KB = (int) ByteUnit.kibiBytes( 1 );

public static String bytes( long bytes )
{
double size = bytes;
Expand Down Expand Up @@ -181,20 +193,6 @@ private Format()
// No instances
}

private static final String[] BYTE_SIZES = { "B", "kB", "MB", "GB" };

public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSZ";
public static final String TIME_FORMAT = "HH:mm:ss.SSS";

/**
* Default time zone is UTC (+00:00) so that comparing timestamped logs from different
* sources is an easier task.
*/
public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone( "UTC" );

private static final ThreadLocalFormat DATE = new ThreadLocalFormat( DATE_FORMAT );
private static final ThreadLocalFormat TIME = new ThreadLocalFormat( TIME_FORMAT );

private static class ThreadLocalFormat extends ThreadLocal<DateFormat>
{
private final String format;
Expand Down
Expand Up @@ -37,7 +37,7 @@
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_CHECKSUM;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_1P_COMMIT;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_COMMIT;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_START;

public class PhysicalLogicalTransactionStore implements LogicalTransactionStore
Expand Down Expand Up @@ -178,7 +178,7 @@ public boolean visit( LogPosition position, ReadableClosablePositionAwareChannel
case TX_START:
startEntry = logEntry.as();
break;
case TX_1P_COMMIT:
case TX_COMMIT:
LogEntryCommit commit = logEntry.as();
if ( commit.getTxId() == startTransactionId )
{
Expand Down
Expand Up @@ -29,6 +29,6 @@ private LogEntryByteCodes()
// Real entries
public static final byte TX_START = (byte) 1;
public static final byte COMMAND = (byte) 3;
public static final byte TX_1P_COMMIT = (byte) 5;
public static final byte TX_COMMIT = (byte) 5;
public static final byte CHECK_POINT = (byte) 7;
}
Expand Up @@ -23,18 +23,26 @@

import org.neo4j.helpers.Format;

public abstract class LogEntryCommit extends AbstractLogEntry
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_COMMIT;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion.CURRENT;

public class LogEntryCommit extends AbstractLogEntry
{
private final long txId;
private final long timeWritten;
protected final String name;

LogEntryCommit( LogEntryVersion version, byte type, long txId, long timeWritten, String name )
public LogEntryCommit( long txId, long timeWritten )
{
this( CURRENT, txId, timeWritten );
}

public LogEntryCommit( LogEntryVersion version, long txId, long timeWritten )
{
super( version, type );
super( version, TX_COMMIT );
this.txId = txId;
this.timeWritten = timeWritten;
this.name = name;
this.name = "Commit";
}

public long getTxId()
Expand All @@ -59,6 +67,13 @@ public String toString( TimeZone timeZone )
return name + "[txId=" + getTxId() + ", " + timestamp( getTimeWritten(), timeZone ) + "]";
}

@Override
@SuppressWarnings( "unchecked" )
public <T extends LogEntry> T as()
{
return (T) this;
}

@Override
public boolean equals( Object o )
{
Expand Down
Expand Up @@ -84,21 +84,21 @@ public boolean skip()
}
},

TX_1P_COMMIT
TX_COMMIT
{
@Override
public LogEntry parse( LogEntryVersion version, ReadableClosableChannel channel, LogPositionMarker marker,
CommandReaderFactory commandReader ) throws IOException
{
long txId = channel.getLong();
long timeWritten = channel.getLong();
return new OnePhaseCommit( version, txId, timeWritten );
return new LogEntryCommit( version, txId, timeWritten );
}

@Override
public byte byteCode()
{
return LogEntryByteCodes.TX_1P_COMMIT;
return LogEntryByteCodes.TX_COMMIT;
}

@Override
Expand Down
Expand Up @@ -30,7 +30,7 @@

import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.CHECK_POINT;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.COMMAND;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_1P_COMMIT;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_COMMIT;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes.TX_START;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion.CURRENT;

Expand Down Expand Up @@ -64,7 +64,7 @@ public void writeStartEntry( int masterId, int authorId, long timeWritten, long

public void writeCommitEntry( long transactionId, long timeWritten ) throws IOException
{
writeLogEntryHeader( TX_1P_COMMIT );
writeLogEntryHeader( TX_COMMIT );
channel.putLong( transactionId ).putLong( timeWritten );
}

Expand Down

This file was deleted.

Expand Up @@ -51,10 +51,10 @@
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.util.monitoring.SilentProgressReporter;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void shouldRecoverExistingData() throws Exception
writer.writeStartEntry( 0, 1, 2L, 3L, new byte[0] );
lastCommittedTxStartEntry = new LogEntryStart( 0, 1, 2L, 3L, new byte[0], lastCommittedTxPosition );
writer.writeCommitEntry( 4L, 5L );
lastCommittedTxCommitEntry = new OnePhaseCommit( 4L, 5L );
lastCommittedTxCommitEntry = new LogEntryCommit( 4L, 5L );

// check point pointing to the previously committed transaction
writer.writeCheckPointEntry( lastCommittedTxPosition );
Expand All @@ -135,7 +135,7 @@ public void shouldRecoverExistingData() throws Exception
expectedStartEntry = new LogEntryStart( 0, 1, 6L, 4L, new byte[0], marker.newPosition() );

writer.writeCommitEntry( 5L, 7L );
expectedCommitEntry = new OnePhaseCommit( 5L, 7L );
expectedCommitEntry = new LogEntryCommit( 5L, 7L );

return true;
} );
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;

import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
Expand All @@ -56,7 +55,7 @@ public class PhysicalTransactionCursorTest
private static final LogEntry NULL_ENTRY = null;
private static final CheckPoint A_CHECK_POINT_ENTRY = new CheckPoint( LogPosition.UNSPECIFIED );
private static final LogEntryStart A_START_ENTRY = new LogEntryStart( 0, 0, 0L, 0L, null, LogPosition.UNSPECIFIED );
private static final LogEntryCommit A_COMMIT_ENTRY = new OnePhaseCommit( 42, 0 );
private static final LogEntryCommit A_COMMIT_ENTRY = new LogEntryCommit( 42, 0 );
private static final LogEntryCommand A_COMMAND_ENTRY = new LogEntryCommand(
new Command.NodeCommand( new NodeRecord( 42 ), new NodeRecord( 42 ) ) );
private PhysicalTransactionCursor<ReadableLogChannel> cursor;
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogCheckPointEvent;
Expand Down Expand Up @@ -165,7 +164,7 @@ public void shouldAppendCommittedTransactions() throws Exception

LogEntryStart start = new LogEntryStart( 0, 0, 0L, latestCommittedTxWhenStarted, null,
LogPosition.UNSPECIFIED );
LogEntryCommit commit = new OnePhaseCommit( nextTxId, 0L );
LogEntryCommit commit = new LogEntryCommit( nextTxId, 0L );
CommittedTransactionRepresentation transaction =
new CommittedTransactionRepresentation( start, transactionRepresentation, commit );

Expand Down Expand Up @@ -213,7 +212,7 @@ public void shouldNotAppendCommittedTransactionsWhenTooFarAhead() throws Excepti

LogEntryStart start = new LogEntryStart( 0, 0, 0L, latestCommittedTxWhenStarted, null,
LogPosition.UNSPECIFIED );
LogEntryCommit commit = new OnePhaseCommit( latestCommittedTxWhenStarted + 2, 0L );
LogEntryCommit commit = new LogEntryCommit( latestCommittedTxWhenStarted + 2, 0L );
CommittedTransactionRepresentation transaction =
new CommittedTransactionRepresentation( start, transactionRepresentation, commit );

Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -52,7 +51,7 @@ public class TransactionPositionLocatorTest
private final LogEntryStart start = new LogEntryStart( 0, 0, 0, 0, null, startPosition );
private final LogEntryCommand command = new LogEntryCommand(
new Command.NodeCommand( new NodeRecord( 42 ), new NodeRecord( 42 ) ) );
private final LogEntryCommit commit = new OnePhaseCommit( txId, System.currentTimeMillis() );
private final LogEntryCommit commit = new LogEntryCommit( txId, System.currentTimeMillis() );

@Test
public void shouldFindTransactionLogPosition() throws IOException
Expand Down
Expand Up @@ -71,7 +71,7 @@ public void shouldParserStartEntry() throws IOException
public void shouldParserOnePhaseCommitEntry() throws IOException
{
// given
final LogEntryCommit commit = new OnePhaseCommit( version, 42, 21 );
final LogEntryCommit commit = new LogEntryCommit( version, 42, 21 );
final InMemoryClosableChannel channel = new InMemoryClosableChannel();

channel.putLong( commit.getTxId() );
Expand All @@ -80,7 +80,7 @@ public void shouldParserOnePhaseCommitEntry() throws IOException
channel.getCurrentPosition( marker );

// when
final LogEntryParser parser = version.entryParser( LogEntryByteCodes.TX_1P_COMMIT );
final LogEntryParser parser = version.entryParser( LogEntryByteCodes.TX_COMMIT );
final LogEntry logEntry = parser.parse( version, channel, marker, commandReader );

// then
Expand Down
Expand Up @@ -69,11 +69,11 @@ public void shouldReadACommitLogEntry() throws IOException
{
// given
LogEntryVersion version = LogEntryVersion.CURRENT;
final LogEntryCommit commit = new OnePhaseCommit( version, 42, 21 );
final LogEntryCommit commit = new LogEntryCommit( version, 42, 21 );
final InMemoryClosableChannel channel = new InMemoryClosableChannel();

channel.put( version.byteCode() );
channel.put( LogEntryByteCodes.TX_1P_COMMIT );
channel.put( LogEntryByteCodes.TX_COMMIT );
channel.putLong( commit.getTxId() );
channel.putLong( commit.getTimeWritten() );

Expand Down
Expand Up @@ -38,16 +38,14 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.log.stresstest.workload.Runner;
import org.neo4j.test.rule.TestDirectory;

import static org.junit.Assert.assertEquals;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.junit.Assert.assertEquals;
import static org.neo4j.function.Suppliers.untilTimeExpired;

public class TransactionAppenderStressTest
Expand Down Expand Up @@ -126,9 +124,9 @@ public long parseAllTxLogs() throws IOException
LogEntry logEntry = reader.readLogEntry( channel );
for ( ; logEntry != null; logEntry = reader.readLogEntry( channel ) )
{
if ( logEntry.getType() == LogEntryByteCodes.TX_1P_COMMIT )
if ( logEntry.getType() == LogEntryByteCodes.TX_COMMIT )
{
txId = logEntry.<OnePhaseCommit>as().getTxId();
txId = logEntry.<LogEntryCommit>as().getTxId();
}
}
}
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.util.monitoring.ProgressReporter;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -61,7 +61,7 @@ public void reportProgressOnRecovery() throws Throwable
when( transactionCursor.next() ).thenAnswer( new NextTransactionAnswer( transactionsToRecover ) );
when( reverseTransactionCursor.get() ).thenReturn( transactionRepresentation );
when( transactionCursor.get() ).thenReturn( transactionRepresentation );
when( transactionRepresentation.getCommitEntry() ).thenReturn( new OnePhaseCommit( lastCommittedTransactionId, 1L ) );
when( transactionRepresentation.getCommitEntry() ).thenReturn( new LogEntryCommit( lastCommittedTransactionId, 1L ) );

when( recoveryService.getRecoveryStartInformation() ).thenReturn( startInformation );
when( recoveryService.getTransactionsInReverseOrder( recoveryStartPosition ) ).thenReturn( reverseTransactionCursor );
Expand Down

0 comments on commit 37f33d7

Please sign in to comment.