Skip to content

Commit

Permalink
Completed time-series updates of metric state in chronicle file, and …
Browse files Browse the repository at this point in the history
…status change event firing
  • Loading branch information
nickman committed Feb 9, 2013
1 parent b5b0525 commit fd82393
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 40 deletions.
174 changes: 172 additions & 2 deletions spring/src/main/java/org/helios/apmrouter/catalog/EntryStatus.java
Expand Up @@ -24,8 +24,11 @@
*/
package org.helios.apmrouter.catalog;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

import org.helios.apmrouter.collections.ConcurrentLongSortedSet;
import org.helios.apmrouter.util.BitMaskedEnum;

/**
Expand All @@ -44,8 +47,8 @@ public enum EntryStatus {
/** The entry is offline and has not seen inserts within one time series tier */
OFFLINE;

/** A decode map for Byte -> EntryStatus */
public static final Map<Byte, EntryStatus> ORD2ENUM = BitMaskedEnum.Support.generateByteMap(EntryStatus.values());
/** A decode map for ByteMask -> EntryStatus */
public static final Map<Byte, EntryStatus> MASK2ENUM = BitMaskedEnum.Support.generateByteMap(EntryStatus.values());

/**
* Returns the byte ordinal for this EntryStatus
Expand All @@ -55,6 +58,12 @@ public byte byteOrdinal() {
return (byte)ordinal();
}

public static void main(String[] args) {
for(EntryStatus e: EntryStatus.values()) {
System.out.println(e.name() + ":" + e.byteOrdinal());
}
}


/**
* Returns the EntryStatus with the specified byte ordinal
Expand All @@ -70,4 +79,165 @@ public static EntryStatus forByte(byte b) {

}
}

/**
* Decodes the passed int to an EntryStatus name
* @param code the into to decode
* @return the status name
*/
public static String decode(int code) {
try {
return EntryStatus.forByte((byte)code).name();
} catch (Exception ex) {
return "INVALID CODE:" + code;
}
}

/**
* Renders a state change summary from a change map
* @param changeMap the change map to render
* @return a string
*/
public static String renderStatusCounts(Map<EntryStatus, EntryStatusChange> changeMap) {
StringBuilder b = new StringBuilder("State Change Summary:");
for(Map.Entry<EntryStatus, EntryStatusChange> entry: changeMap.entrySet()) {
b.append("\n\t").append(entry.getKey().name()).append(":").append(entry.getValue().getMetricIds().size());
}
return b.toString();

}


/**
* <p>Title: EntryChange</p>
* <p>Description: Represents an entry state being changed to, a timestamp and an array of metric ids that changed state.</p>
* <p>Company: Helios Development Group LLC</p>
* @author Whitehead (nwhitehead AT heliosdev DOT org)
* <p><code>org.helios.apmrouter.catalog.EntryStatusChange</code></p>
*/
public static class EntryStatusChange {
private final EntryStatus toStatus;
private final long timestamp;
private final ConcurrentLongSortedSet metricIds;


/**
* Creates a new map of EntryStatusChanges keyed by the entry status
* @param timestamp the timestamp that the changes to be added were effective as of
* @return an EntryStatusChange map
*/
public static Map<EntryStatus, EntryStatusChange> getChangeMap(long timestamp) {
return getChangeMap(timestamp, 2048);
}

/**
* Creates a new map of EntryStatusChanges keyed by the entry status
* @param timestamp the timestamp that the changes to be added were effective as of
* @param sizeEstimate the initial size of the metric id array
* @return an EntryStatusChange map
*/
public static Map<EntryStatus, EntryStatusChange> getChangeMap(long timestamp, int sizeEstimate) {
Map<EntryStatus, EntryStatusChange> map = new EnumMap<EntryStatus, EntryStatusChange>(EntryStatus.class);
for(EntryStatus es: EntryStatus.values()) {
map.put(es, new EntryStatusChange(es, timestamp, sizeEstimate));
}
return map;
}

/**
* Creates a change map for a single state change
* @param timestamp The timestamp of the change
* @param metricId The metric id
* @param status the new status
* @return a singleton change map
*/
public static Map<EntryStatus, EntryStatusChange> getChangeMap(long timestamp, long metricId, EntryStatus status) {
return new EnumMap<EntryStatus, EntryStatusChange>(Collections.singletonMap(status, new EntryStatusChange(status, timestamp, 1).appendMetricIds(metricId)));
}



/**
* Creates a new EntryStatusChange
* @param toStatus the status being transitioned to
* @param timestamp the timestamp of the transition in a UTC long.
*/
private EntryStatusChange(EntryStatus toStatus, long timestamp) {
this(toStatus, timestamp, 2048);
}

/**
* Creates a new EntryStatusChange
* @param toStatus the status being transitioned to
* @param timestamp the timestamp of the transition in a UTC long.
* @param sizeEstimate the initial size of the metric id array
*/
private EntryStatusChange(EntryStatus toStatus, long timestamp, int sizeEstimate) {
this.toStatus = toStatus;
this.timestamp = timestamp;
metricIds = new ConcurrentLongSortedSet(false, sizeEstimate);
}

/**
* Appends metric Ids to this state change
* @param metricIds The metric ids to append
* @return this state change
*/
public EntryStatusChange appendMetricIds(long...metricIds) {
this.metricIds.add(metricIds);
return this;
}


/**
* Returns the status being transitioned to
* @return the status being transitioned to
*/
public EntryStatus getToStatus() {
return toStatus;
}


/**
* Returns the time of the state change
* @return the time of the state change
*/
public long getTimestamp() {
return timestamp;
}

/**
* Returns the metricIds that changed state
* @return the metricIds that changed state
*/
public ConcurrentLongSortedSet getMetricIds() {
return metricIds;
}

/**
* Adds an array of metric ids to this state change
* @param metricIds an array of metric ids.
*/
public void addMetricIds(long...metricIds) {
this.metricIds.add(metricIds);
}


}
}
















Expand Up @@ -24,6 +24,10 @@
*/
package org.helios.apmrouter.catalog;

import java.util.Map;

import org.helios.apmrouter.catalog.EntryStatus.EntryStatusChange;

/**
* <p>Title: EntryStatusChangeListener</p>
* <p>Description: Defines a class that listens on entry status changes for entries in the live tier.</p>
Expand All @@ -35,10 +39,7 @@
public interface EntryStatusChangeListener {
/**
* Fired on a change in state of a live tier entry
* @param entryId The id of the entry that changed state
* @param timestamp The timestamp of the change in ms.
* @param priorState The prior state
* @param newState The new state
* @param changeMap map of EntryStatusChanges keyed by the entry status
*/
public void onEntryStatusChange(long entryId, long timestamp, EntryStatus priorState, EntryStatus newState);
public void onEntryStatusChange(Map<EntryStatus, EntryStatusChange> changeMap);
}
Expand Up @@ -30,6 +30,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -43,7 +44,9 @@
import org.helios.apmrouter.catalog.DChannelEventType;
import org.helios.apmrouter.catalog.EntryStatus;
import org.helios.apmrouter.catalog.MetricCatalogService;
import org.helios.apmrouter.catalog.EntryStatus.EntryStatusChange;
import org.helios.apmrouter.collections.ConcurrentLongSlidingWindow;
import org.helios.apmrouter.collections.ConcurrentLongSortedSet;
import org.helios.apmrouter.collections.LongSlidingWindow;
import org.helios.apmrouter.destination.chronicletimeseries.ChronicleTSManager;
import org.helios.apmrouter.destination.chronicletimeseries.ChronicleTier;
Expand Down Expand Up @@ -125,10 +128,40 @@ public void doStart() throws Exception {

/**
* {@inheritDoc}
* @see org.helios.apmrouter.catalog.EntryStatusChangeListener#onEntryStatusChange(long, long, org.helios.apmrouter.catalog.EntryStatus, org.helios.apmrouter.catalog.EntryStatus)
* @see org.helios.apmrouter.catalog.EntryStatusChangeListener#onEntryStatusChange(java.util.Map)
*/
@Override
public void onEntryStatusChange(long entryId, long timestamp, EntryStatus priorState, EntryStatus newState) {
public void onEntryStatusChange(Map<EntryStatus, EntryStatusChange> changeMap) {
Connection conn = null;
PreparedStatement ps = null;
long start = System.currentTimeMillis();
try {
conn = ds.getConnection();
ps = conn.prepareStatement("UPDATE METRIC SET STATE=?, LAST_SEEN=? WHERE METRIC_ID=?");
for(Map.Entry<EntryStatus, EntryStatusChange> entry: changeMap.entrySet()) {
byte status = entry.getKey().byteOrdinal();
ConcurrentLongSortedSet metricIds = entry.getValue().getMetricIds();
Timestamp ts = new Timestamp(entry.getValue().getTimestamp());
for(int i = 0; i < metricIds.size(); i++) {
ps.setByte(1, status);
ps.setTimestamp(2, ts);
ps.setLong(3, metricIds.get(i));
ps.addBatch();
}
ps.executeBatch();
}
ps.close(); ps = null;
conn.close(); conn = null;
long elapsed = System.currentTimeMillis()-start;
if("TRACE".equals(getLevel())) {
trace(new StringBuilder(EntryStatus.renderStatusCounts(changeMap)).append("\n\tUpdate Elapsed:").append(elapsed).append(" ms."));
}
} catch (Exception e) {
throw new RuntimeException("Exception processing entry status updates", e);
} finally {
if(ps!=null) try { ps.close(); } catch (Exception e) {/* No Op */}
if(conn!=null) try { conn.close(); } catch (Exception e) {/* No Op */}
}

}

Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.regex.Pattern;

import org.h2.tools.SimpleResultSet;
import org.helios.apmrouter.catalog.EntryStatus;

/**
* <p>Title: H2StoredProcedure</p>
Expand All @@ -54,6 +55,20 @@ public class H2StoredProcedure {
/** Constant int array of 1, 2 and three */
private static final int[] ARR_ONE_TWO_THREE = {1,2,3};

/**
* Decodes the passed int to an EntryStatus name
* @param code the into to decode
* @return the status name
*/
public static String decode(int code) {
try {
return EntryStatus.forByte((byte)code).name();
} catch (Exception ex) {
return "INVALID CODE:" + code;
}
}


/**
* Called when an agent connects or disconnects (or times out)
* @param conn The H2 supplied connection
Expand Down

0 comments on commit fd82393

Please sign in to comment.