Skip to content

Commit

Permalink
Issue #5458 unit tests were added.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Dec 16, 2015
1 parent 872c29b commit f14ba8b
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 185 deletions.

Large diffs are not rendered by default.

185 changes: 142 additions & 43 deletions ...entechnologies/orient/core/storage/impl/local/statistic/OStoragePerformanceStatistic.java 100644 → 100755
Expand Up @@ -3,13 +3,16 @@
import com.orientechnologies.common.concur.lock.ODistributedCounter;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.orient.core.exception.OReadCacheException;
import com.orientechnologies.orient.core.storage.OIdentifiableStorage;
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;

import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -24,12 +27,12 @@ public class OStoragePerformanceStatistic implements OStoragePerformanceStatisti
/**
* Timer which is used to update exposed statistic values (so called snapshots).
*/
private final Timer updateTimer;
private final ScheduledExecutorService updateTimer;

/**
* Task which is used to update exposed statistic values (aka snapshots).
*/
private volatile TimerTask updateTask;
private volatile ScheduledFuture<?> updateTask;

/**
* Amount of bytes in megabyte.
Expand Down Expand Up @@ -60,7 +63,7 @@ protected Deque<Long> initialValue() {
/**
* Flag which indicates whether measurement of storage performance statistic is enabled.
*/
private volatile boolean measurementIsEnabled = false;
private volatile boolean measurementEnabled = false;

/**
* Amount of times when cache was accessed during the measurement session.
Expand Down Expand Up @@ -197,26 +200,51 @@ protected Deque<Long> initialValue() {
*/
private final AtomicBoolean mbeanIsRegistered = new AtomicBoolean();

/**
* Object which is used to get current PC nano time.
*/
private final NanoTimer nanoTimer;

/**
* Creates object and initiates it with value of size of page in cache and set name of MBean exposed to JVM.
*
* @param pageSize Page size in cache.
* @param storageName Name of storage performance statistic of which is gathered.
* @param storageId Id of storage {@link OIdentifiableStorage#getId()}
*/
public OStoragePerformanceStatistic(int pageSize, String storageName) {
public OStoragePerformanceStatistic(int pageSize, String storageName, long storageId) {
this(pageSize, storageName, storageId, new NanoTimer() {
@Override
public long getNano() {
return System.nanoTime();
}
});
}

/**
* Creates object and initiates it with value of size of page in cache and set name of MBean exposed to JVM.
*
* @param pageSize Page size in cache.
* @param storageName Name of storage performance statistic of which is gathered.
* @param storageId Id of storage {@link OIdentifiableStorage#getId()}
* @param nanoTimer Object which is used to get current PC nano time.
*/
public OStoragePerformanceStatistic(int pageSize, String storageName, long storageId, NanoTimer nanoTimer) {
this.pageSize = pageSize;

updateTimer = new Timer("Updater for storage performance statistic of " + storageName, true);
updateTimer = Executors.newSingleThreadScheduledExecutor(new SnapshotTaskFactory(storageName));
mbeanName = "com.orientechnologies.orient.core.storage.impl.local.statistic:type=OStoragePerformanceStatisticMXBean,name="
+ storageName;
+ storageName + ",id=" + storageId;

this.nanoTimer = nanoTimer;
}

/**
* Starts gathering of performance statistic for storage.
*/
@Override
public void startMeasurement() {
measurementIsEnabled = true;
measurementEnabled = true;

cacheAccessCount.clear();
cacheHit.clear();
Expand All @@ -231,41 +259,31 @@ public void startMeasurement() {
commitCount.clear();
commitTime.clear();

updateTask = new TimerTask() {
@Override
public void run() {
cacheAccessCountSnapshot = cacheAccessCount.get();
cacheHitSnapshot = cacheHit.get();
pageReadFromFileTimeSnapshot = pageReadFromFileTime.get();
pageReadFromFileCountSnapshot = pageReadFromFileCount.get();
pageReadFromCacheTimeSnapshot = pageReadFromCacheTime.get();
pageReadFromCacheCountSnapshot = pageReadFromCacheCount.get();
pageWriteToCacheTimeSnapshot = pageWriteToCacheTime.get();
pageWriteToCacheCountSnapshot = pageWriteToCacheCount.get();
pageWriteToFileTimeSnapshot = pageWriteToFileTime.get();
pageWriteToFileCountSnapshot = pageWriteToFileCount.get();
commitCountSnapshot = commitCount.get();
commitTimeSnapshot = commitTime.get();
}
};

updateTimer.schedule(updateTask, MILLISECONDS_IN_SECOND, MILLISECONDS_IN_SECOND);
updateTask = updateTimer.scheduleWithFixedDelay(new SnapshotTask(), 1, 1, TimeUnit.SECONDS);
}

/**
* Stops gathering of performance statistic for storage, but does not clear snapshot values.
*/
@Override
public void stopMeasurement() {
final TimerTask ct = updateTask;
final ScheduledFuture<?> ct = updateTask;
if (ct != null) {
ct.cancel();
ct.cancel(false);
}

measurementIsEnabled = false;
measurementEnabled = false;
updateTask = null;
}

/**
* @return <code>true</code> if statistic is measured inside of storage.
*/
@Override
public boolean isMeasurementEnabled() {
return measurementEnabled;
}

/**
* Registers MBean in MBean server.
*/
Expand Down Expand Up @@ -318,7 +336,11 @@ public void unregisterMBean() {
*/
@Override
public long getReadSpeedFromCacheInMB() {
return (getReadSpeedFromCacheInPages() * pageSize) / MEGABYTE;
final long pagesSpeed = getReadSpeedFromCacheInPages();
if (pagesSpeed < 0)
return -1;

return (pagesSpeed * pageSize) / MEGABYTE;
}

/**
Expand Down Expand Up @@ -353,7 +375,11 @@ public long getReadSpeedFromFileInPages() {
*/
@Override
public long getReadSpeedFromFileInMB() {
return (getReadSpeedFromFileInPages() * pageSize) / MEGABYTE;
final long pageSpeed = getReadSpeedFromFileInPages();
if (pageSpeed < 0)
return -1;

return (pageSpeed * pageSize) / MEGABYTE;
}

/**
Expand All @@ -368,7 +394,7 @@ public long getAmountOfPagesReadFromCache() {
* @return Amount of pages are read from file.
*/
@Override
public long getAmountOfPagesReadFromFileSystem() {
public long getAmountOfPagesReadFromFile() {
return pageReadFromFileCountSnapshot;
}

Expand All @@ -391,7 +417,11 @@ public long getWriteSpeedInCacheInPages() {
*/
@Override
public long getWriteSpeedInCacheInMB() {
return (getWriteSpeedInCacheInPages() * pageSize) / MEGABYTE;
final long pageSpeed = getWriteSpeedInCacheInPages();
if (pageSpeed < 0)
return -1;

return (pageSpeed * pageSize) / MEGABYTE;
}

/**
Expand All @@ -413,7 +443,11 @@ public long getWriteSpeedInFileInPages() {
*/
@Override
public long getWriteSpeedInFileInMB() {
return (getWriteSpeedInFileInPages() * pageSize) / MEGABYTE;
final long pageSpeed = getWriteSpeedInFileInPages();
if (pageSpeed < 0)
return -1;

return (pageSpeed * pageSize) / MEGABYTE;
}

/**
Expand All @@ -424,6 +458,14 @@ public long getAmountOfPagesWrittenToCache() {
return pageWriteToCacheCountSnapshot;
}

/**
* @return Amount of pages written to file.
*/
@Override
public long getAmountOfPagesWrittenToFile() {
return pageWriteToFileCountSnapshot;
}

/**
* @return Average time of commit of atomic operation in nanoseconds
* or value which is less than 0, which means that value can not be calculated.
Expand Down Expand Up @@ -462,9 +504,9 @@ public void startPageReadFromFileTimer() {
* That is utility method which is used by all startXXXTimer methods.
*/
private void pushTimeStamp() {
if (measurementIsEnabled) {
if (measurementEnabled) {
final Deque<Long> stamps = tlTimeStumps.get();
stamps.push(System.nanoTime());
stamps.push(nanoTimer.getNano());
}
}

Expand All @@ -476,7 +518,7 @@ private void pushTimeStamp() {
public void stopPageReadFromFileTimer(int readPages) {
final Deque<Long> stamps = tlTimeStumps.get();
if (stamps.size() > 0) {
final long endTs = System.nanoTime();
final long endTs = nanoTimer.getNano();

pageReadFromFileTime.add(endTs - stamps.pop());
pageReadFromFileCount.add(readPages);
Expand All @@ -496,7 +538,7 @@ public void startPageReadFromCacheTimer() {
public void stopPageReadFromCacheTimer() {
final Deque<Long> stamps = tlTimeStumps.get();
if (stamps.size() > 0) {
final long endTs = System.nanoTime();
final long endTs = nanoTimer.getNano();

pageReadFromCacheTime.add(endTs - stamps.pop());
pageReadFromCacheCount.increment();
Expand All @@ -516,7 +558,7 @@ public void startPageWriteToCacheTimer() {
public void stopPageWriteToCacheTimer() {
final Deque<Long> stamps = tlTimeStumps.get();
if (stamps.size() > 0) {
final long endTs = System.nanoTime();
final long endTs = nanoTimer.getNano();

pageWriteToCacheTime.add(endTs - stamps.pop());
pageWriteToCacheCount.increment();
Expand All @@ -536,7 +578,7 @@ public void startPageWriteToFileTimer() {
public void stopPageWriteToFileTimer() {
final Deque<Long> stamps = tlTimeStumps.get();
if (stamps.size() > 0) {
final long endTs = System.nanoTime();
final long endTs = nanoTimer.getNano();

pageWriteToFileTime.add(endTs - stamps.pop());
pageWriteToFileCount.increment();
Expand All @@ -547,7 +589,7 @@ public void stopPageWriteToFileTimer() {
* Increments counter of page accesses from cache.
*/
public void incrementPageAccessOnCacheLevel() {
if (measurementIsEnabled) {
if (measurementEnabled) {
cacheAccessCount.increment();
}
}
Expand All @@ -556,7 +598,7 @@ public void incrementPageAccessOnCacheLevel() {
* Increments counter of cache hits
*/
public void incrementCacheHit() {
if (measurementIsEnabled) {
if (measurementEnabled) {
cacheHit.increment();
}
}
Expand All @@ -574,10 +616,67 @@ public void startCommitTimer() {
public void stopCommitTimer() {
final Deque<Long> stamps = tlTimeStumps.get();
if (stamps.size() > 0) {
final long endTs = System.nanoTime();
final long endTs = nanoTimer.getNano();

commitTime.add(endTs - stamps.pop());
commitCount.increment();
}
}

/**
* Interface which is used by this tool to get current PC nano time.
* Implementation which calls <code>System.nanoTime()</code> is used by default.
*/
public interface NanoTimer {
/**
* @return Current PC nano time.
*/
long getNano();
}

/**
* Task to move gathered values about performance statistic at the end of measurement session to snapshot holders.
*/
private final class SnapshotTask implements Runnable {

/**
* Moves counters values to snapshot values.
*/
@Override
public void run() {
cacheAccessCountSnapshot = cacheAccessCount.get();
cacheHitSnapshot = cacheHit.get();
pageReadFromFileTimeSnapshot = pageReadFromFileTime.get();
pageReadFromFileCountSnapshot = pageReadFromFileCount.get();
pageReadFromCacheTimeSnapshot = pageReadFromCacheTime.get();
pageReadFromCacheCountSnapshot = pageReadFromCacheCount.get();
pageWriteToCacheTimeSnapshot = pageWriteToCacheTime.get();
pageWriteToCacheCountSnapshot = pageWriteToCacheCount.get();
pageWriteToFileTimeSnapshot = pageWriteToFileTime.get();
pageWriteToFileCountSnapshot = pageWriteToFileCount.get();
commitCountSnapshot = commitCount.get();
commitTimeSnapshot = commitTime.get();
}
}

/**
* Thread factory for {@link SnapshotTask}
*/
private final static class SnapshotTaskFactory implements ThreadFactory {
private final String storageName;

public SnapshotTaskFactory(String storageName) {
this.storageName = storageName;
}

@Override
public Thread newThread(Runnable r) {
final Thread thread = new Thread(OAbstractPaginatedStorage.storageThreadGroup, r);
thread.setName("Updater for storage performance statistic of " + storageName);
thread.setDaemon(true);

return thread;
}
}

}

0 comments on commit f14ba8b

Please sign in to comment.