diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/helper/StatUtil.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/helper/StatUtil.java index f0ee2a9c70b5..242b002b8dce 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/helper/StatUtil.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/helper/StatUtil.java @@ -22,41 +22,32 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.neo4j.helpers.NamedThreadFactory; -import org.neo4j.logging.FormattedLogProvider; import org.neo4j.logging.Log; -import org.neo4j.logging.NullLogProvider; import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.neo4j.logging.FormattedLogProvider.toOutputStream; @SuppressWarnings( "unused" ) // for easy debugging, leave it public class StatUtil { - private static class StatPrinterService extends ScheduledThreadPoolExecutor - { - private static final StatPrinterService INSTANCE = new StatPrinterService(); - - private StatPrinterService() - { - super( 1, new NamedThreadFactory( "stat-printer" , Thread.NORM_PRIORITY, true ) ); - super.setRemoveOnCancelPolicy( true ); - } - } - public static class StatContext { private static final int N_BUCKETS = 10; // values >= Math.pow( 10, N_BUCKETS-1 ) all go into the last bucket - private String name; + private final String name; + private final Log log; + private final long printEvery; + private final boolean clearAfterPrint; private BasicStats[] bucket = new BasicStats[N_BUCKETS]; private long totalCount; - private StatContext( String name, Log log ) + private StatContext( String name, Log log, long printEvery, boolean clearAfterPrint ) { this.name = name; + this.log = log; + this.printEvery = printEvery; + this.clearAfterPrint = clearAfterPrint; clear(); } @@ -77,6 +68,11 @@ public void collect( double value ) { totalCount++; bucket[bucketIndex].collect( value ); + + if ( totalCount % printEvery == 0 ) + { + log.info( getText( clearAfterPrint ) ); + } } } @@ -134,48 +130,16 @@ public synchronized String toString() } } - public static StatContext create( String name ) - { - return new StatContext( name, NullLogProvider.getInstance().getLog( name ) ); - } - private static Map printingJobs = new HashMap<>(); - public static synchronized StatContext create( String name, long printEveryMs ) + public static synchronized StatContext create( String name, long printEvery, boolean clearAfterPrint ) { - return create( name, FormattedLogProvider.toOutputStream( System.out ).getLog( name ), printEveryMs, false, false ); + return create( name, toOutputStream( System.out ).getLog( name ), printEvery, clearAfterPrint ); } - public static synchronized StatContext create( String name, Log log, long printEveryMs, boolean clearAfterPrint, boolean ensureUnique ) + public static synchronized StatContext create( String name, Log log, long printEvery, boolean clearAfterPrint ) { - StatContext statContext = new StatContext( name, log ); - ScheduledFuture job = null; - - if ( ensureUnique ) - { - job = printingJobs.remove( name ); - } - - if ( job != null ) - { - job.cancel( true ); - log.warn( "Replacing printer for: " + name ); - } - - job = StatPrinterService.INSTANCE.scheduleAtFixedRate( () -> { - if ( statContext.totalCount() > 0 ) - { - String data = statContext.getText( clearAfterPrint ); - log.info( "%s%s", name, data ); - } - }, printEveryMs, printEveryMs, MILLISECONDS ); - - if ( ensureUnique ) - { - printingJobs.put( name, job ); - } - - return statContext; + return new StatContext( name, log, printEvery, clearAfterPrint ); } public static class TimingContext diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/RecoveryProtocol.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/RecoveryProtocol.java index d19c803b328b..229d1eee0acf 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/RecoveryProtocol.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/RecoveryProtocol.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.SortedMap; -import org.neo4j.coreedge.helper.StatUtil.StatContext; import org.neo4j.coreedge.raft.log.EntryRecord; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ChannelMarshal; @@ -54,12 +53,11 @@ class RecoveryProtocol private final ChannelMarshal contentMarshal; private final LogProvider logProvider; private final Log log; - private final StatContext scanStats; private long expectedVersion; private ReaderPool readerPool; RecoveryProtocol( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool, - ChannelMarshal contentMarshal, LogProvider logProvider, StatContext scanStats ) + ChannelMarshal contentMarshal, LogProvider logProvider ) { this.fileSystem = fileSystem; this.fileNames = fileNames; @@ -67,12 +65,6 @@ class RecoveryProtocol this.contentMarshal = contentMarshal; this.logProvider = logProvider; this.log = logProvider.getLog( getClass() ); - this.scanStats = scanStats; - } - - RecoveryProtocol( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool, ChannelMarshal marshal, LogProvider logProvider ) - { - this( fileSystem, fileNames, readerPool, marshal, logProvider, null ); } State run() throws IOException, DamagedLogStorageException, DisposedException @@ -82,7 +74,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException if ( files.entrySet().isEmpty() ) { - state.segments = new Segments( fileSystem, fileNames, readerPool, emptyList(), contentMarshal, logProvider, -1, scanStats ); + state.segments = new Segments( fileSystem, fileNames, readerPool, emptyList(), contentMarshal, logProvider, -1 ); state.segments.rotate( -1, -1, -1 ); return state; } @@ -114,7 +106,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException writeHeader( fileSystem, file, header ); } - SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, fileNameVersion, contentMarshal, logProvider, header, scanStats ); + SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, fileNameVersion, contentMarshal, logProvider, header ); checkVersionStrictlyMonotonic( fileNameVersion ); checkVersionMatches( segment.header().version(), fileNameVersion ); @@ -138,7 +130,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException SegmentFile last = segmentFiles.get( segmentFiles.size() - 1 ); - state.segments = new Segments( fileSystem, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, files.lastKey(), scanStats ); + state.segments = new Segments( fileSystem, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, files.lastKey() ); state.appendIndex = last.header().prevIndex(); state.currentTerm = last.header().prevTerm(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java index fa26840e464c..3a396fb59991 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.IOException; -import org.neo4j.coreedge.helper.StatUtil.StatContext; import org.neo4j.coreedge.raft.log.EntryRecord; import org.neo4j.coreedge.raft.log.LogPosition; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -59,16 +58,9 @@ class SegmentFile implements AutoCloseable private final long version; private PhysicalFlushableChannel bufferedWriter; - private final StatContext scanStats; SegmentFile( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version, ChannelMarshal contentMarshal, LogProvider logProvider, SegmentHeader header ) - { - this( fileSystem, file, readerPool, version, contentMarshal, logProvider, header, null ); - } - - SegmentFile( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version, - ChannelMarshal contentMarshal, LogProvider logProvider, SegmentHeader header, StatContext scanStats ) { this.fileSystem = fileSystem; this.file = file; @@ -76,7 +68,6 @@ class SegmentFile implements AutoCloseable this.contentMarshal = contentMarshal; this.header = header; this.version = version; - this.scanStats = scanStats; this.positionCache = new PositionCache(); this.refCount = new ReferenceCounter(); @@ -85,13 +76,7 @@ class SegmentFile implements AutoCloseable } static SegmentFile create( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version, - ChannelMarshal contentMarshal, LogProvider logProvider, SegmentHeader header ) throws IOException - { - return create( fileSystem, file, readerPool, version, contentMarshal, logProvider, header, null ); - } - - static SegmentFile create( FileSystemAbstraction fileSystem, File file, ReaderPool readerPool, long version, - ChannelMarshal contentMarshal, LogProvider logProvider, SegmentHeader header, StatContext scanStats ) + ChannelMarshal contentMarshal, LogProvider logProvider, SegmentHeader header ) throws IOException { if ( fileSystem.fileExists( file ) ) @@ -99,7 +84,7 @@ static SegmentFile create( FileSystemAbstraction fileSystem, File file, ReaderPo throw new IllegalStateException( "File was not expected to exist" ); } - SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, version, contentMarshal, logProvider, header, scanStats ); + SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, version, contentMarshal, logProvider, header ); headerMarshal.marshal( header, segment.getOrCreateWriter() ); segment.flush(); @@ -127,10 +112,6 @@ IOCursor getCursor( long logIndex ) throws IOException, DisposedExc try { long currentIndex = position.logIndex; - if ( scanStats != null ) - { - scanStats.collect( offsetIndex - currentIndex ); - } return new EntryRecordCursor( reader, contentMarshal, currentIndex, offsetIndex, this ); } catch ( EndOfStreamException e ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java index b681a48171e0..1137e8413d8b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.time.Clock; -import org.neo4j.coreedge.helper.StatUtil; import org.neo4j.coreedge.raft.log.EntryRecord; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCursor; @@ -73,8 +72,6 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog private State state; private final ReaderPool readerPool; - private final StatUtil.StatContext scanStats; // this is temporary, for debugging purposes - public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize, ChannelMarshal contentMarshal, LogProvider logProvider, String pruningConfig, int readerPoolSize, Clock clock ) @@ -85,7 +82,6 @@ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long this.contentMarshal = contentMarshal; this.logProvider = logProvider; - this.scanStats = null; // StatUtil.create( "linear scans", logProvider.getLog( getClass() ), 10_000, true ); this.fileNames = new FileNames( directory ); this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock ); this.pruner = new SegmentedRaftLogPruner( pruningConfig, logProvider ); @@ -100,7 +96,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException, throw new IOException( "Could not create: " + directory ); } - RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider, scanStats ); + RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider ); state = recoveryProtocol.run(); log.info( "log started with recovered state %s", state ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java index dae994009568..d6b3b3b7d331 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.ListIterator; -import org.neo4j.coreedge.helper.StatUtil; import org.neo4j.coreedge.raft.log.segmented.OpenEndRangeMap.ValueRange; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ChannelMarshal; @@ -52,10 +51,9 @@ class Segments implements AutoCloseable private final LogProvider logProvider; private long currentVersion; private final ReaderPool readerPool; - private StatUtil.StatContext scanStats; Segments( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool, List allSegments, - ChannelMarshal contentMarshal, LogProvider logProvider, long currentVersion, StatUtil.StatContext scanStats ) + ChannelMarshal contentMarshal, LogProvider logProvider, long currentVersion ) { this.fileSystem = fileSystem; this.fileNames = fileNames; @@ -65,17 +63,10 @@ class Segments implements AutoCloseable this.log = logProvider.getLog( getClass() ); this.currentVersion = currentVersion; this.readerPool = readerPool; - this.scanStats = scanStats; populateRangeMap(); } - public Segments( FileSystemAbstraction fsa, FileNames fileNames, ReaderPool readerPool, List segmentFiles, - ChannelMarshal contentMarshal, LogProvider logProvider, long currentVersion ) - { - this( fsa, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, currentVersion, null ); - } - private void populateRangeMap() { for ( SegmentFile segment : allSegments ) @@ -144,7 +135,7 @@ private synchronized SegmentFile createNext( long prevFileLastIndex, long prevIn SegmentHeader header = new SegmentHeader( prevFileLastIndex, currentVersion, prevIndex, prevTerm ); File file = fileNames.getForVersion( currentVersion ); - SegmentFile segment = SegmentFile.create( fileSystem, file, readerPool, currentVersion, contentMarshal, logProvider, header, scanStats ); + SegmentFile segment = SegmentFile.create( fileSystem, file, readerPool, currentVersion, contentMarshal, logProvider, header ); // TODO: Force base directory... probably not possible using fsa. segment.flush();