Skip to content

Commit

Permalink
Makes sure no half transactions gets included when copying over unfin…
Browse files Browse the repository at this point in the history
…ished transactions to the new log
  • Loading branch information
tinwelint committed Jan 27, 2012
1 parent 32cb67a commit 76cc3cb
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -1437,32 +1439,37 @@ public synchronized long rotate() throws IOException
// Set<Integer> startEntriesWritten = new HashSet<Integer>();
LogBuffer newLogBuffer = instantiateCorrectWriteBuffer( newLog );
boolean foundFirstStrayTx = false;
Set<Integer> identsWithStartEntry = new HashSet<Integer>();
while ((entry = LogIoUtils.readEntry( sharedBuffer, fileChannel, cf )) != null )
{
if ( !foundFirstStrayTx && xidIdentMap.get( entry.getIdentifier() ) != null )
if ( !foundFirstStrayTx && xidIdentMap.get( entry.getIdentifier() ) != null )
{
foundFirstStrayTx = true;
}
if ( foundFirstStrayTx )
{
if ( entry instanceof LogEntry.Start )
{
identsWithStartEntry.add( entry.getIdentifier() );
LogEntry.Start startEntry = (LogEntry.Start) entry;
startEntry.setStartPosition( newLogBuffer.getFileChannelPosition() ); // newLog.position() );
// overwrite old start entry with new that has updated position
xidIdentMap.put( startEntry.getIdentifier(), startEntry );
// startEntriesWritten.add( entry.getIdentifier() );
}
else if ( entry instanceof LogEntry.Commit )
if ( identsWithStartEntry.contains( entry.getIdentifier() ) )
{
LogEntry.Start startEntry = xidIdentMap.get( entry.getIdentifier() );
LogEntry.Commit commitEntry = (LogEntry.Commit) entry;
TxPosition oldPos = positionCache.getStartPosition( commitEntry.getTxId() );
TxPosition newPos = cacheTxStartPosition( commitEntry.getTxId(), startEntry.getMasterId(), startEntry, logVersion+1 );
msgLog.logMessage( "Updated tx " + ((LogEntry.Commit) entry ).getTxId() +
" from " + oldPos + " to " + newPos );
if ( entry instanceof LogEntry.Commit )
{
LogEntry.Start startEntry = xidIdentMap.get( entry.getIdentifier() );
LogEntry.Commit commitEntry = (LogEntry.Commit) entry;
TxPosition oldPos = positionCache.getStartPosition( commitEntry.getTxId() );
TxPosition newPos = cacheTxStartPosition( commitEntry.getTxId(), startEntry.getMasterId(), startEntry, logVersion+1 );
msgLog.logMessage( "Updated tx " + ((LogEntry.Commit) entry ).getTxId() +
" from " + oldPos + " to " + newPos );
}
LogIoUtils.writeLogEntry( entry, newLogBuffer );
}
LogIoUtils.writeLogEntry( entry, newLogBuffer );
}
}
newLogBuffer.force();
Expand Down

0 comments on commit 76cc3cb

Please sign in to comment.