Skip to content

Commit

Permalink
Issue #4873 , initial version of profiler to identify YCSB and storag…
Browse files Browse the repository at this point in the history
…e engine in general issues, was created and dynamic JMX bean was provided.
  • Loading branch information
laa committed Apr 7, 2016
1 parent cc06dc4 commit 9a73a9a
Show file tree
Hide file tree
Showing 7 changed files with 450 additions and 95 deletions.
44 changes: 44 additions & 0 deletions core/src/main/java/com/orientechnologies/common/util/ORawPair.java
@@ -0,0 +1,44 @@
package com.orientechnologies.common.util;

/**
* Container for pair of non null objects.
*/
public class ORawPair<V1, V2> {
private final V1 first;
private final V2 second;

public ORawPair(V1 first, V2 second) {
this.first = first;
this.second = second;
}

public V1 getFirst() {
return first;
}

public V2 getSecond() {
return second;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;

ORawPair<?, ?> oRawPair = (ORawPair<?, ?>) o;

if (!first.equals(oRawPair.first))
return false;
return second.equals(oRawPair.second);

}

@Override
public int hashCode() {
int result = first.hashCode();
result = 31 * result + second.hashCode();
return result;
}
}
Expand Up @@ -84,6 +84,8 @@ public ODurableComponent(OAbstractPaginatedStorage storage, String name, String
this.writeCache = storage.getWriteCache();
this.performanceStatisticManager = storage.getPerformanceStatisticManager();
this.lockName = lockName;

performanceStatisticManager.registerComponent(fullName);
}

public String getLockName() {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.orientechnologies.common.concur.lock.OReadersWriterSpinLock;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.common.util.ORawPair;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.storage.OIdentifiableStorage;
import com.orientechnologies.orient.core.storage.OStorage;
Expand All @@ -30,11 +31,13 @@
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.orientechnologies.orient.core.storage.impl.local.statistic.OSessionStoragePerformanceStatistic.PerformanceCountersHolder;
import static com.orientechnologies.orient.core.storage.impl.local.statistic.OSessionStoragePerformanceStatistic.PerformanceSnapshot;

/**
* Aggregator of performance statistic for whole storage.
Expand Down Expand Up @@ -65,8 +68,11 @@
* You may access performance data both after you stopped gathering statistic and during gathering of statistic.
* You may manipulate by manager directly from Java or from JMX from bean with name which consist of prefix {@link #MBEAN_PREFIX}
* and storage name.
* <p>
* If {@link com.orientechnologies.orient.core.storage.impl.local.paginated.base.ODurableComponent} participates in
* performance monitoring it has to register itself using method {@link #registerComponent(String)}
*/
public class OPerformanceStatisticManager implements OPerformanceStatisticManagerMXBean {
public class OPerformanceStatisticManager {
/**
* Prefix of name of JMX bean.
*/
Expand Down Expand Up @@ -143,6 +149,11 @@ protected Boolean initialValue() {
*/
private final AtomicBoolean mbeanIsRegistered = new AtomicBoolean();

/**
* List of all components which registered itself as participating in performance monitoring.
*/
private final List<String> componentNames = new CopyOnWriteArrayList<String>();

/**
* @param intervalBetweenSnapshots Interval between time series for each thread statistic.
* @see OSessionStoragePerformanceStatistic
Expand Down Expand Up @@ -188,7 +199,6 @@ public OSessionStoragePerformanceStatistic stopThreadMonitoring() {
* <p>
* After call of this method you can not start monitoring on thread level till call of {@link #stopMonitoring()} is performed.
*/
@Override
public void startMonitoring() {
switchLock.acquireWriteLock();
try {
Expand All @@ -207,7 +217,6 @@ public void startMonitoring() {
/**
* Stops monitoring of performance statistic for whole system.
*/
@Override
public void stopMonitoring() {
switchLock.acquireWriteLock();
try {
Expand Down Expand Up @@ -248,7 +257,7 @@ public void registerMBean(String storageName, int storageId) {
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
final ObjectName mbeanName = new ObjectName(getMBeanName(storageName, storageId));
if (!server.isRegistered(mbeanName)) {
server.registerMBean(this, mbeanName);
server.registerMBean(new OPerformanceStatisticManagerMBean(this), mbeanName);
} else {
mbeanIsRegistered.set(false);
OLogManager.instance().warn(this,
Expand Down Expand Up @@ -296,44 +305,26 @@ public void unregisterMBean(String storageName, int storageId) {
}
}

/**
* Registers component as such one which participates in performance monitoring.
*
* @param component Component which participates in performance monitoring.
*/
public void registerComponent(String component) {
componentNames.add(component);
}

/**
* @return Set of names of components for which performance statistic is gathered.
*/
@Override
public Set<String> getComponentNames() {
switchLock.acquireReadLock();
try {
if (enabled) {
final Set<String> result = new HashSet<String>();

final ImmutableStatistic ds = deadThreadsStatistic;
if (ds != null) {
result.addAll(deadThreadsStatistic.countersByComponents.keySet());
}

for (final OSessionStoragePerformanceStatistic statistic : statistics.values()) {
final Map<String, PerformanceCountersHolder> countersHolderMap = new ConcurrentHashMap<String, PerformanceCountersHolder>();
statistic.pushComponentCounters(countersHolderMap);
result.addAll(countersHolderMap.keySet());
}

return result;
}

if (postMeasurementStatistic == null)
return Collections.emptySet();

return Collections.unmodifiableSet(postMeasurementStatistic.countersByComponents.keySet());
} finally {
switchLock.releaseReadLock();
}
return new HashSet<String>(componentNames);
}

/**
* @return Instance of performance container which is used to gathering data about storage performance statistic or
* <code>null</code> if none of both methods {@link #startMonitoring()} or {@link #startThreadMonitoring()} are called.
*/
@Override
public OSessionStoragePerformanceStatistic getSessionPerformanceStatistic() {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -365,8 +356,6 @@ public OSessionStoragePerformanceStatistic getSessionPerformanceStatistic() {
* @return Average amount of pages which were read from cache for component with given name during single data operation or value
* which is less than 0, which means that value cannot be calculated.
*/

@Override
public long getAmountOfPagesPerOperation(String componentName) {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -394,7 +383,6 @@ public long getAmountOfPagesPerOperation(String componentName) {
/**
* @return Percent of cache hits or value which is less than 0, which means that value cannot be calculated.
*/
@Override
public int getCacheHits() {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -422,7 +410,6 @@ public int getCacheHits() {
* @param componentName Name of component data of which should be returned. Name is case sensitive.
* @return Percent of cache hits or value which is less than 0, which means that value cannot be calculated.
*/
@Override
public int getCacheHits(String componentName) {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -450,7 +437,6 @@ public int getCacheHits(String componentName) {
* @return Average time of commit of atomic operation in nanoseconds or value which is less than 0, which means that value cannot
* be calculated.
*/
@Override
public long getCommitTimeAvg() {
switchLock.acquireReadLock();
try {
Expand All @@ -474,7 +460,6 @@ public long getCommitTimeAvg() {
* @return Read speed of data in pages per second on cache level or value which is less than 0, which means that value cannot be
* calculated.
*/
@Override
public long getReadSpeedFromCacheInPages() {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -503,7 +488,6 @@ public long getReadSpeedFromCacheInPages() {
* @return Read speed of data in pages per second on cache level or value which is less than 0, which means that value cannot be
* calculated.
*/
@Override
public long getReadSpeedFromCacheInPages(String componentName) {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -532,7 +516,6 @@ public long getReadSpeedFromCacheInPages(String componentName) {
* @return Read speed of data from file system in pages per second or value which is less than 0, which means that value cannot be
* calculated.
*/
@Override
public long getReadSpeedFromFileInPages() {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -561,7 +544,6 @@ public long getReadSpeedFromFileInPages() {
* @return Read speed of data from file system in pages per second or value which is less than 0, which means that value cannot be
* calculated.
*/
@Override
public long getReadSpeedFromFileInPages(String componentName) {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -589,7 +571,6 @@ public long getReadSpeedFromFileInPages(String componentName) {
* @return Write speed of data in pages per second on cache level or value which is less than 0, which means that value cannot be
* calculated.
*/
@Override
public long getWriteSpeedInCacheInPages() {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -619,7 +600,6 @@ public long getWriteSpeedInCacheInPages() {
* @return Write speed of data in pages per second on cache level or value which is less than 0, which means that value cannot be
* calculated.
*/
@Override
public long getWriteSpeedInCacheInPages(String componentName) {
switchLock.acquireReadLock();
try {
Expand Down Expand Up @@ -654,15 +634,27 @@ private void fetchSystemCounters(PerformanceCountersHolder countersHolder) {
//all dead threads will be removed and statistics from them will be
//later accumulated in #deadThreadsStatistic field, then result statistic from this field
//will be aggregated to countersHolder
final ArrayList<Thread> threadsToRemove = new ArrayList<Thread>();

//To decrease inter thread communication delay we fetch snapshots first
//and only after that we aggregate data from immutable snapshots
final Collection<ORawPair<Thread, PerformanceSnapshot>> snapshots = new ArrayList<ORawPair<Thread, PerformanceSnapshot>>(
statistics.size());

final Collection<Thread> threadsToRemove = new ArrayList<Thread>();
for (Map.Entry<Thread, OSessionStoragePerformanceStatistic> entry : statistics.entrySet()) {
final Thread thread = entry.getKey();
final OSessionStoragePerformanceStatistic statistic = entry.getValue();
snapshots.add(new ORawPair<Thread, PerformanceSnapshot>(thread, statistic.getSnapshot()));
}

if (!thread.isAlive()) {
threadsToRemove.add(thread);
for (ORawPair<Thread, PerformanceSnapshot> pair : snapshots) {
final Thread thread = pair.getFirst();

if (thread.isAlive()) {
final PerformanceSnapshot snapshot = pair.getSecond();
snapshot.performanceCountersHolder.pushData(countersHolder);
} else {
final OSessionStoragePerformanceStatistic performanceStatistic = entry.getValue();
performanceStatistic.pushSystemCounters(countersHolder);
threadsToRemove.add(thread);
}
}

Expand Down Expand Up @@ -690,15 +682,27 @@ private void fetchComponentCounters(String componentName, PerformanceCountersHol
//later accumulated in #deadThreadsStatistic field, then result statistic from this field
//will be aggregated to componentCountersHolder

final ArrayList<Thread> threadsToRemove = new ArrayList<Thread>();
//To decrease inter thread communication delay we fetch snapshots first
//and only after that we aggregate data from immutable snapshots
final Collection<ORawPair<Thread, PerformanceSnapshot>> snapshots = new ArrayList<ORawPair<Thread, PerformanceSnapshot>>(
statistics.size());

final List<Thread> threadsToRemove = new ArrayList<Thread>();
for (Map.Entry<Thread, OSessionStoragePerformanceStatistic> entry : statistics.entrySet()) {
final Thread thread = entry.getKey();
final OSessionStoragePerformanceStatistic statistic = entry.getValue();
snapshots.add(new ORawPair<Thread, PerformanceSnapshot>(thread, statistic.getSnapshot()));
}

if (!thread.isAlive()) {
threadsToRemove.add(thread);
for (ORawPair<Thread, PerformanceSnapshot> pair : snapshots) {
final Thread thread = pair.getFirst();
if (thread.isAlive()) {
final PerformanceSnapshot snapshot = pair.getSecond();
final PerformanceCountersHolder holder = snapshot.countersByComponent.get(componentName);
if (holder != null)
holder.pushData(componentCountersHolder);
} else {
final OSessionStoragePerformanceStatistic performanceStatistic = entry.getValue();
performanceStatistic.pushComponentCounters(componentName, componentCountersHolder);
threadsToRemove.add(thread);
}
}

Expand All @@ -720,7 +724,7 @@ private void fetchComponentCounters(String componentName, PerformanceCountersHol
*
* @param threadsToRemove Dead threads statistic of which should be moved to {@link #deadThreadsStatistic} field.
*/
private void updateDeadThreadsStatistic(ArrayList<Thread> threadsToRemove) {
private void updateDeadThreadsStatistic(Collection<Thread> threadsToRemove) {
deadThreadsUpdateLock.lock();
try {
//we accumulate all statistic in intermediate fields and only then put
Expand Down

0 comments on commit 9a73a9a

Please sign in to comment.