Skip to content

Commit

Permalink
core-edge: stat util changes
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski authored and jimwebber committed Jul 14, 2016
1 parent 3fda82c commit 194c4fd
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 103 deletions.
Expand Up @@ -22,41 +22,32 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.neo4j.logging.FormattedLogProvider.toOutputStream;

@SuppressWarnings( "unused" ) // for easy debugging, leave it
public class StatUtil
{
private static class StatPrinterService extends ScheduledThreadPoolExecutor
{
private static final StatPrinterService INSTANCE = new StatPrinterService();

private StatPrinterService()
{
super( 1, new NamedThreadFactory( "stat-printer" , Thread.NORM_PRIORITY, true ) );
super.setRemoveOnCancelPolicy( true );
}
}

public static class StatContext
{
private static final int N_BUCKETS = 10; // values >= Math.pow( 10, N_BUCKETS-1 ) all go into the last bucket

private String name;
private final String name;
private final Log log;
private final long printEvery;
private final boolean clearAfterPrint;
private BasicStats[] bucket = new BasicStats[N_BUCKETS];
private long totalCount;

private StatContext( String name, Log log )
private StatContext( String name, Log log, long printEvery, boolean clearAfterPrint )
{
this.name = name;
this.log = log;
this.printEvery = printEvery;
this.clearAfterPrint = clearAfterPrint;
clear();
}

Expand All @@ -77,6 +68,11 @@ public void collect( double value )
{
totalCount++;
bucket[bucketIndex].collect( value );

if ( totalCount % printEvery == 0 )
{
log.info( getText( clearAfterPrint ) );
}
}
}

Expand Down Expand Up @@ -134,48 +130,16 @@ public synchronized String toString()
}
}

public static StatContext create( String name )
{
return new StatContext( name, NullLogProvider.getInstance().getLog( name ) );
}

private static Map<String,ScheduledFuture> printingJobs = new HashMap<>();

public static synchronized StatContext create( String name, long printEveryMs )
public static synchronized StatContext create( String name, long printEvery, boolean clearAfterPrint )
{
return create( name, FormattedLogProvider.toOutputStream( System.out ).getLog( name ), printEveryMs, false, false );
return create( name, toOutputStream( System.out ).getLog( name ), printEvery, clearAfterPrint );
}

public static synchronized StatContext create( String name, Log log, long printEveryMs, boolean clearAfterPrint, boolean ensureUnique )
public static synchronized StatContext create( String name, Log log, long printEvery, boolean clearAfterPrint )
{
StatContext statContext = new StatContext( name, log );
ScheduledFuture job = null;

if ( ensureUnique )
{
job = printingJobs.remove( name );
}

if ( job != null )
{
job.cancel( true );
log.warn( "Replacing printer for: " + name );
}

job = StatPrinterService.INSTANCE.scheduleAtFixedRate( () -> {
if ( statContext.totalCount() > 0 )
{
String data = statContext.getText( clearAfterPrint );
log.info( "%s%s", name, data );
}
}, printEveryMs, printEveryMs, MILLISECONDS );

if ( ensureUnique )
{
printingJobs.put( name, job );
}

return statContext;
return new StatContext( name, log, printEvery, clearAfterPrint );
}

public static class TimingContext
Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.util.Map;
import java.util.SortedMap;

import org.neo4j.coreedge.helper.StatUtil.StatContext;
import org.neo4j.coreedge.raft.log.EntryRecord;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
Expand Down Expand Up @@ -54,25 +53,18 @@ class RecoveryProtocol
private final ChannelMarshal<ReplicatedContent> contentMarshal;
private final LogProvider logProvider;
private final Log log;
private final StatContext scanStats;
private long expectedVersion;
private ReaderPool readerPool;

RecoveryProtocol( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, StatContext scanStats )
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider )
{
this.fileSystem = fileSystem;
this.fileNames = fileNames;
this.readerPool = readerPool;
this.contentMarshal = contentMarshal;
this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
this.scanStats = scanStats;
}

RecoveryProtocol( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool, ChannelMarshal<ReplicatedContent> marshal, LogProvider logProvider )
{
this( fileSystem, fileNames, readerPool, marshal, logProvider, null );
}

State run() throws IOException, DamagedLogStorageException, DisposedException
Expand All @@ -82,7 +74,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException

if ( files.entrySet().isEmpty() )
{
state.segments = new Segments( fileSystem, fileNames, readerPool, emptyList(), contentMarshal, logProvider, -1, scanStats );
state.segments = new Segments( fileSystem, fileNames, readerPool, emptyList(), contentMarshal, logProvider, -1 );
state.segments.rotate( -1, -1, -1 );
return state;
}
Expand Down Expand Up @@ -114,7 +106,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException
writeHeader( fileSystem, file, header );
}

SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, fileNameVersion, contentMarshal, logProvider, header, scanStats );
SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, fileNameVersion, contentMarshal, logProvider, header );

checkVersionStrictlyMonotonic( fileNameVersion );
checkVersionMatches( segment.header().version(), fileNameVersion );
Expand All @@ -138,7 +130,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException

SegmentFile last = segmentFiles.get( segmentFiles.size() - 1 );

state.segments = new Segments( fileSystem, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, files.lastKey(), scanStats );
state.segments = new Segments( fileSystem, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, files.lastKey() );
state.appendIndex = last.header().prevIndex();
state.currentTerm = last.header().prevTerm();

Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.File;
import java.io.IOException;

import org.neo4j.coreedge.helper.StatUtil.StatContext;
import org.neo4j.coreedge.raft.log.EntryRecord;
import org.neo4j.coreedge.raft.log.LogPosition;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
Expand Down Expand Up @@ -59,24 +58,16 @@ class SegmentFile implements AutoCloseable
private final long version;

private PhysicalFlushableChannel bufferedWriter;
private final StatContext scanStats;

SegmentFile( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header )
{
this( fileSystem, file, readerPool, version, contentMarshal, logProvider, header, null );
}

SegmentFile( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header, StatContext scanStats )
{
this.fileSystem = fileSystem;
this.file = file;
this.readerPool = readerPool;
this.contentMarshal = contentMarshal;
this.header = header;
this.version = version;
this.scanStats = scanStats;

this.positionCache = new PositionCache();
this.refCount = new ReferenceCounter();
Expand All @@ -85,21 +76,15 @@ class SegmentFile implements AutoCloseable
}

static SegmentFile create( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header ) throws IOException
{
return create( fileSystem, file, readerPool, version, contentMarshal, logProvider, header, null );
}

static SegmentFile create( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header, StatContext scanStats )
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header )
throws IOException
{
if ( fileSystem.fileExists( file ) )
{
throw new IllegalStateException( "File was not expected to exist" );
}

SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, version, contentMarshal, logProvider, header, scanStats );
SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, version, contentMarshal, logProvider, header );
headerMarshal.marshal( header, segment.getOrCreateWriter() );
segment.flush();

Expand Down Expand Up @@ -127,10 +112,6 @@ IOCursor<EntryRecord> getCursor( long logIndex ) throws IOException, DisposedExc
try
{
long currentIndex = position.logIndex;
if ( scanStats != null )
{
scanStats.collect( offsetIndex - currentIndex );
}
return new EntryRecordCursor( reader, contentMarshal, currentIndex, offsetIndex, this );
}
catch ( EndOfStreamException e )
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.io.IOException;
import java.time.Clock;

import org.neo4j.coreedge.helper.StatUtil;
import org.neo4j.coreedge.raft.log.EntryRecord;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCursor;
Expand Down Expand Up @@ -73,8 +72,6 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog
private State state;
private final ReaderPool readerPool;

private final StatUtil.StatContext scanStats; // this is temporary, for debugging purposes

public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider,
String pruningConfig, int readerPoolSize, Clock clock )
Expand All @@ -85,7 +82,6 @@ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long
this.contentMarshal = contentMarshal;
this.logProvider = logProvider;

this.scanStats = null; // StatUtil.create( "linear scans", logProvider.getLog( getClass() ), 10_000, true );
this.fileNames = new FileNames( directory );
this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock );
this.pruner = new SegmentedRaftLogPruner( pruningConfig, logProvider );
Expand All @@ -100,7 +96,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException,
throw new IOException( "Could not create: " + directory );
}

RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider, scanStats );
RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider );
state = recoveryProtocol.run();
log.info( "log started with recovered state %s", state );
}
Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.ListIterator;

import org.neo4j.coreedge.helper.StatUtil;
import org.neo4j.coreedge.raft.log.segmented.OpenEndRangeMap.ValueRange;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
Expand All @@ -52,10 +51,9 @@ class Segments implements AutoCloseable
private final LogProvider logProvider;
private long currentVersion;
private final ReaderPool readerPool;
private StatUtil.StatContext scanStats;

Segments( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool, List<SegmentFile> allSegments,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, long currentVersion, StatUtil.StatContext scanStats )
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, long currentVersion )
{
this.fileSystem = fileSystem;
this.fileNames = fileNames;
Expand All @@ -65,17 +63,10 @@ class Segments implements AutoCloseable
this.log = logProvider.getLog( getClass() );
this.currentVersion = currentVersion;
this.readerPool = readerPool;
this.scanStats = scanStats;

populateRangeMap();
}

public Segments( FileSystemAbstraction fsa, FileNames fileNames, ReaderPool readerPool, List<SegmentFile> segmentFiles,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, long currentVersion )
{
this( fsa, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, currentVersion, null );
}

private void populateRangeMap()
{
for ( SegmentFile segment : allSegments )
Expand Down Expand Up @@ -144,7 +135,7 @@ private synchronized SegmentFile createNext( long prevFileLastIndex, long prevIn
SegmentHeader header = new SegmentHeader( prevFileLastIndex, currentVersion, prevIndex, prevTerm );

File file = fileNames.getForVersion( currentVersion );
SegmentFile segment = SegmentFile.create( fileSystem, file, readerPool, currentVersion, contentMarshal, logProvider, header, scanStats );
SegmentFile segment = SegmentFile.create( fileSystem, file, readerPool, currentVersion, contentMarshal, logProvider, header );
// TODO: Force base directory... probably not possible using fsa.
segment.flush();

Expand Down

0 comments on commit 194c4fd

Please sign in to comment.