Skip to content

Commit

Permalink
ZEN-16442 - Add summary_event.closed_status and improve related queri…
Browse files Browse the repository at this point in the history
…es and composite indexes.
  • Loading branch information
kindkid committed Feb 24, 2015
1 parent bc2fe94 commit b0339c7
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 22 deletions.
Expand Up @@ -51,6 +51,7 @@ public class EventConstants {

// Summary / Archive fields
public static final String COLUMN_STATUS_ID = "status_id";
public static final String COLUMN_CLOSED_STATUS = "closed_status";
public static final String COLUMN_UPDATE_TIME = "update_time";
public static final String COLUMN_FIRST_SEEN = "first_seen";
public static final String COLUMN_STATUS_CHANGE = "status_change";
Expand Down
45 changes: 23 additions & 22 deletions core/src/main/java/org/zenoss/zep/dao/impl/EventSummaryDaoImpl.java
Expand Up @@ -184,6 +184,7 @@ public List<String> call() throws Exception {
}

fields.put(COLUMN_STATUS_ID, event.getStatus().getNumber());
fields.put(COLUMN_CLOSED_STATUS, ZepConstants.CLOSED_STATUSES.contains(event.getStatus()));
fields.put(COLUMN_UPDATE_TIME, timestampConverter.toDatabaseType(updateTime));
fields.put(COLUMN_FIRST_SEEN, timestampConverter.toDatabaseType(firstSeen));
fields.put(COLUMN_STATUS_CHANGE, timestampConverter.toDatabaseType(created));
Expand Down Expand Up @@ -320,6 +321,7 @@ private Map<String,Object> getUpdateFields(EventSummaryOrBuilder oldSummary, Eve
}
if (updateStatus && oldStatus != newStatus) {
updateFields.put(COLUMN_STATUS_ID, insertFields.get(COLUMN_STATUS_ID));
updateFields.put(COLUMN_CLOSED_STATUS, insertFields.get(COLUMN_CLOSED_STATUS));
updateFields.put(COLUMN_STATUS_CHANGE, timestampConverter.toDatabaseType(occurrence.getCreatedTime()));
}

Expand Down Expand Up @@ -364,21 +366,20 @@ private List<String> clearEvents(Event event, EventPreCreateContext context)
Map<String,Object> fields = new HashMap<String,Object>(2);
fields.put("_clear_created_time", timestampConverter.toDatabaseType(lastSeen));
fields.put("_clear_hashes", clearHashes);
fields.put("_closed_status_ids", CLOSED_STATUS_IDS);

long updateTime = System.currentTimeMillis();
String indexSql = "INSERT INTO event_summary_index_queue (uuid, update_time) "
+ "SELECT uuid, " + String.valueOf(updateTime) + " FROM event_summary " +
"WHERE last_seen <= :_clear_created_time " +
"AND clear_fingerprint_hash IN (:_clear_hashes) " +
"AND status_id NOT IN (:_closed_status_ids) ";
"AND closed_status = FALSE ";
this.template.update(indexSql, fields);

/* Find events that this clear event would clear. */
final String sql = "SELECT uuid FROM event_summary " +
"WHERE last_seen <= :_clear_created_time " +
"AND clear_fingerprint_hash IN (:_clear_hashes) " +
"AND status_id NOT IN (:_closed_status_ids) " +
"AND closed_status = FALSE " +
"FOR UPDATE";

final List<String> results = this.template.query(sql, new RowMapper<String>() {
Expand Down Expand Up @@ -601,11 +602,6 @@ public EventBatch listBatch(EventBatchParams batchParams, long maxUpdateTime, in
EventStatus.STATUS_NEW, EventStatus.STATUS_ACKNOWLEDGED, EventStatus.STATUS_CLOSED,
EventStatus.STATUS_CLEARED);

private static final List<Integer> CLOSED_STATUS_IDS = Arrays.asList(
EventStatus.STATUS_AGED.getNumber(),
EventStatus.STATUS_CLEARED.getNumber(),
EventStatus.STATUS_CLOSED.getNumber());

private static List<Integer> getSeverityIds(EventSeverity maxSeverity, boolean inclusiveSeverity) {
List<Integer> severityIds = EventDaoHelper.getSeverityIdsLessThan(maxSeverity);
if (inclusiveSeverity) {
Expand All @@ -623,7 +619,7 @@ public long getAgeEligibleEventCount(long duration, TimeUnit unit, EventSeverity
if (severityIds.isEmpty()) {
return 0;
}
String sql = "SELECT count(*) FROM event_summary WHERE status_id NOT IN (:_closed_status_ids) AND " +
String sql = "SELECT count(*) FROM event_summary WHERE closed_status = FALSE AND " +
"last_seen < :_last_seen AND severity_id IN (:_severity_ids)";
Map <String, Object> fields = createSharedFields(duration, unit);
fields.put("_severity_ids", severityIds);
Expand Down Expand Up @@ -653,11 +649,11 @@ public int ageEvents(long agingInterval, TimeUnit unit,

Map<String, Object> fields = new HashMap<String, Object>();
fields.put(COLUMN_STATUS_ID, EventStatus.STATUS_AGED.getNumber());
fields.put(COLUMN_CLOSED_STATUS, ZepConstants.CLOSED_STATUSES.contains(EventStatus.STATUS_AGED));
fields.put(COLUMN_STATUS_CHANGE, timestampConverter.toDatabaseType(now));
fields.put(COLUMN_UPDATE_TIME, timestampConverter.toDatabaseType(now));
fields.put(COLUMN_LAST_SEEN, timestampConverter.toDatabaseType(ageTs));
fields.put("_severity_ids", severityIds);
fields.put("_closed_status_ids", CLOSED_STATUS_IDS);
fields.put("_limit", limit);

final String updateSql;
Expand All @@ -667,30 +663,32 @@ public int ageEvents(long agingInterval, TimeUnit unit,
"FROM event_summary " +
" WHERE last_seen < :last_seen AND" +
" severity_id IN (:_severity_ids) AND" +
" status_id NOT IN (:_closed_status_ids) LIMIT :_limit";
" closed_status = FALSE LIMIT :_limit";
this.template.update(indexSql, fields);

// Use UPDATE ... LIMIT
updateSql = "UPDATE event_summary SET" +
" status_id=:status_id,status_change=:status_change,update_time=:update_time" +
",closed_status=:closed_status" +
" WHERE last_seen < :last_seen AND severity_id IN (:_severity_ids)" +
" AND status_id NOT IN (:_closed_status_ids) LIMIT :_limit";
" AND closed_status = FALSE LIMIT :_limit";
}
else if (databaseCompatibility.getDatabaseType() == DatabaseType.POSTGRESQL) {
String indexSql = "INSERT INTO event_summary_index_queue (uuid, update_time) " +
"SELECT uuid, " + String.valueOf(now) + " " +
"FROM event_summary " +
" WHERE uuid IN (SELECT uuid FROM event_summary WHERE" +
" last_seen < :last_seen AND severity_id IN (:_severity_ids)" +
" AND status_id NOT IN (:_closed_status_ids) LIMIT :_limit)";
" AND closed_status = FALSE LIMIT :_limit)";
this.template.update(indexSql, fields);

// Use UPDATE ... WHERE pk IN (SELECT ... LIMIT)
updateSql = "UPDATE event_summary SET" +
" status_id=:status_id,status_change=:status_change,update_time=:update_time" +
",closed_status=:closed_status" +
" WHERE uuid IN (SELECT uuid FROM event_summary WHERE" +
" last_seen < :last_seen AND severity_id IN (:_severity_ids)" +
" AND status_id NOT IN (:_closed_status_ids) LIMIT :_limit)";
" AND closed_status = FALSE LIMIT :_limit)";
}
else {
throw new IllegalStateException("Unsupported database type: " + databaseCompatibility.getDatabaseType());
Expand Down Expand Up @@ -794,6 +792,7 @@ private int update(final List<String> uuids, final EventStatus status, final Eve
final Map<String,Object> fields = updateFields.toMap(uuidConverter);
fields.put(COLUMN_STATUS_ID, status.getNumber());
fields.put(COLUMN_STATUS_CHANGE, timestampConverter.toDatabaseType(now));
fields.put(COLUMN_CLOSED_STATUS, ZepConstants.CLOSED_STATUSES.contains(status));
fields.put(COLUMN_UPDATE_TIME, timestampConverter.toDatabaseType(now));
fields.put("_uuids", TypeConverterUtils.batchToDatabaseType(uuidConverter, uuids));
// If we aren't acknowledging events, we need to clear out the current user name / UUID values
Expand Down Expand Up @@ -883,11 +882,13 @@ public Map<String, Object> mapRow(ResultSet rs, int rowNum) throws SQLException
final String newFingerprint;
// When closing an event, give it a unique fingerprint hash
if (ZepConstants.CLOSED_STATUSES.contains(status)) {
updateFields.put(COLUMN_CLOSED_STATUS, Boolean.TRUE);
newFingerprint = EventDaoUtils.join('|', fingerprint, Long.toString(now));
}
// When re-opening an event, give it the true fingerprint_hash. This is required to correctly
// de-duplicate events.
else {
updateFields.put(COLUMN_CLOSED_STATUS, Boolean.FALSE);
newFingerprint = fingerprint;
}

Expand All @@ -910,7 +911,7 @@ public Map<String, Object> mapRow(ResultSet rs, int rowNum) throws SQLException
}, fields);

final String updateSql = "UPDATE event_summary SET status_id=:status_id,status_change=:status_change," +
"update_time=:update_time,"+
"closed_status=:closed_status,update_time=:update_time,"+
(status != EventStatus.STATUS_CLOSED && status != EventStatus.STATUS_CLEARED ? "current_user_uuid=:current_user_uuid,current_user_name=:current_user_name,":"" ) +
"cleared_by_event_uuid=:cleared_by_event_uuid,fingerprint_hash=:fingerprint_hash," +
"audit_json=:audit_json WHERE uuid=:uuid";
Expand Down Expand Up @@ -955,14 +956,13 @@ private Map<String, Object> createSharedFields(long duration, TimeUnit unit) {
Object lastSeen = timestampConverter.toDatabaseType(delta);
Map<String, Object> fields = new HashMap<String, Object>();
fields.put("_last_seen", lastSeen);
fields.put("_closed_status_ids", CLOSED_STATUS_IDS);
return fields;
}

@Override
@Timed
public long getArchiveEligibleEventCount(long duration, TimeUnit unit) {
String sql = "SELECT COUNT(*) FROM event_summary WHERE status_id IN (:_closed_status_ids) AND last_seen < :_last_seen";
String sql = "SELECT COUNT(*) FROM event_summary WHERE closed_status = TRUE AND last_seen < :_last_seen";
Map<String, Object> fields = createSharedFields(duration, unit);
return template.queryForInt(sql, fields);
}
Expand All @@ -974,7 +974,7 @@ public int archive(long duration, TimeUnit unit, int limit) throws ZepException
Map<String, Object> fields = createSharedFields(duration, unit);
fields.put("_limit", limit);

final String sql = "SELECT uuid FROM event_summary WHERE status_id IN (:_closed_status_ids) AND "
final String sql = "SELECT uuid FROM event_summary WHERE closed_status = TRUE AND "
+ "last_seen < :_last_seen LIMIT :_limit FOR UPDATE";
final List<String> uuids = this.template.query(sql, new RowMapper<String>() {
@Override
Expand Down Expand Up @@ -1040,7 +1040,6 @@ public int archive(List<String> uuids) throws ZepException {
Map<String, Object> fields = new HashMap<String,Object>();
fields.put(COLUMN_UPDATE_TIME, timestampConverter.toDatabaseType(System.currentTimeMillis()));
fields.put("_uuids", TypeConverterUtils.batchToDatabaseType(uuidConverter, uuids));
fields.put("_closed_status_ids", CLOSED_STATUS_IDS);
StringBuilder selectColumns = new StringBuilder();

for (Iterator<String> it = this.archiveColumnNames.iterator(); it.hasNext();) {
Expand All @@ -1060,15 +1059,15 @@ public int archive(List<String> uuids) throws ZepException {
this.template.update("INSERT INTO event_summary_index_queue (uuid, update_time) "
+ "SELECT uuid, " + String.valueOf(updateTime) + " "
+ "FROM event_summary" +
" WHERE uuid IN (:_uuids) AND status_id IN (:_closed_status_ids)",
" WHERE uuid IN (:_uuids) AND closed_status = TRUE",
fields);

String insertSql = String.format("INSERT INTO event_archive (%s) SELECT %s FROM event_summary" +
" WHERE uuid IN (:_uuids) AND status_id IN (:_closed_status_ids) ON DUPLICATE KEY UPDATE summary=event_summary.summary",
" WHERE uuid IN (:_uuids) AND closed_status = TRUE ON DUPLICATE KEY UPDATE summary=event_summary.summary",
StringUtils.collectionToCommaDelimitedString(this.archiveColumnNames), selectColumns);

this.template.update(insertSql, fields);
final int updated = this.template.update("DELETE FROM event_summary WHERE uuid IN (:_uuids) AND status_id IN (:_closed_status_ids)",
final int updated = this.template.update("DELETE FROM event_summary WHERE uuid IN (:_uuids) AND closed_status = TRUE",
fields);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
Expand Down Expand Up @@ -1099,9 +1098,11 @@ public void importEvent(EventSummary eventSummary) throws ZepException {
if (ZepConstants.CLOSED_STATUSES.contains(eventSummary.getStatus())) {
String uniqueFingerprint = (String) fields.get(COLUMN_FINGERPRINT) + '|' + updateTime;
fields.put(COLUMN_FINGERPRINT_HASH, DaoUtils.sha1(uniqueFingerprint));
fields.put(COLUMN_CLOSED_STATUS, Boolean.TRUE);
}
else {
fields.put(COLUMN_FINGERPRINT_HASH, DaoUtils.sha1((String)fields.get(COLUMN_FINGERPRINT)));
fields.put(COLUMN_CLOSED_STATUS, Boolean.FALSE);
}

if (eventSummary.getOccurrence(0).getSeverity() != EventSeverity.SEVERITY_CLEAR) {
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/sql/mysql/007.sql
@@ -0,0 +1,46 @@
-- Copyright (C) 2015, Zenoss Inc. All Rights Reserved.

--
-- Adds closed_status column and improves related composite indexes.
--

ALTER TABLE event_summary ADD COLUMN closed_status TINYINT(1) NOT NULL DEFAULT 0;
UPDATE event_summary SET closed_status = 1 WHERE status_id IN (3,4,6);
ALTER TABLE event_summary ALTER COLUMN closed_status DROP DEFAULT;

DROP PROCEDURE IF EXISTS drop_index_if_exists;
DELIMITER $$
CREATE PROCEDURE drop_index_if_exists(IN tname VARCHAR(64), IN idx_name VARCHAR(64))
BEGIN
IF EXISTS(SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE table_schema = DATABASE()
AND UPPER(table_name) = tname AND UPPER(index_name) = idx_name)
THEN
SET @drop_sql = CONCAT('DROP INDEX ',idx_name,' ON ',tname,';');
PREPARE stmt FROM @drop_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
END IF;
END
$$
DELIMITER ;

-- These are no longer useful.
CALL drop_index_if_exists('event_summary','event_summary_age_idx');
CALL drop_index_if_exists('event_summary','event_summary_archive_idx');

-- A few installs had these indexes created manually.
CALL drop_index_if_exists('event_summary','event_summary_last_seen');
CALL drop_index_if_exists('event_summary','event_summary_last_seen_idx');

CALL drop_index_if_exists('event_summary','event_summary_closed_last_seen_idx');
CREATE INDEX event_summary_closed_last_seen_idx ON event_summary(closed_status, last_seen);

CALL drop_index_if_exists('event_summary','event_summary_closed_uuid_idx');
CREATE INDEX event_summary_closed_uuid_idx ON event_summary(closed_status, uuid);

CALL drop_index_if_exists('event_summary','event_summary_clear_idx');
CREATE INDEX event_summary_clear_idx ON event_summary(closed_status, clear_fingerprint_hash, last_seen);

DROP PROCEDURE IF EXISTS drop_index_if_exists;

INSERT INTO schema_version (version, installed_time) VALUES(7, NOW());
21 changes: 21 additions & 0 deletions core/src/main/sql/postgresql/003.sql
@@ -0,0 +1,21 @@
--
-- Adds closed_status column and improves related composite indexes.
--

DROP INDEX IF EXISTS event_summary_age_idx;
DROP INDEX IF EXISTS event_summary_archive_idx;

ALTER TABLE event_summary ADD COLUMN closed_status BOOLEAN NOT NULL DEFAULT FALSE;
UPDATE event_summary SET closed_status = TRUE WHERE status_id IN (3,4,6);
ALTER TABLE event_summary ALTER COLUMN closed_status DROP DEFAULT;

DROP INDEX IF EXISTS event_summary_closed_last_seen_idx;
CREATE INDEX event_summary_closed_last_seen_idx ON event_summary(closed_status, last_seen);

DROP INDEX IF EXISTS event_summary_closed_uuid_idx;
CREATE INDEX event_summary_closed_uuid_idx ON event_summary(closed_status, uuid);

DROP INDEX IF EXISTS event_summary_clear_idx;
CREATE INDEX event_summary_clear_idx ON event_summary(closed_status, clear_fingerprint_hash, last_seen);

INSERT INTO schema_version (version, installed_time) VALUES(3, NOW());

0 comments on commit b0339c7

Please sign in to comment.