Commit 1f359ea
[BZ 1126410] Update data migrator to use the latest storage node schema.
(cherry picked from commit 942bc49)
Signed-off-by: Jay Shaughnessy <jshaughn@redhat.com>
Stefan Negrea authored and jshaughn committed Sep 18, 2014
1 parent 579ee3f commit 1f359ea
Showing 15 changed files with 216 additions and 135 deletions.
DataMigrator.java
@@ -27,10 +27,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.rhq.server.metrics.domain.MetricsTable;
 import org.rhq.server.metrics.migrator.workers.AggregateDataMigrator;
 import org.rhq.server.metrics.migrator.workers.CallableMigrationWorker;
 import org.rhq.server.metrics.migrator.workers.DeleteAllData;
+import org.rhq.server.metrics.migrator.workers.MigrationTable;
 import org.rhq.server.metrics.migrator.workers.RawDataMigrator;
 
 
@@ -115,15 +115,15 @@ public long estimate() throws Exception {
         }
 
         if (config.isRun1HAggregateDataMigration()) {
-            retryOnFailure(new AggregateDataMigrator(MetricsTable.ONE_HOUR, config), Task.Estimate);
+            retryOnFailure(new AggregateDataMigrator(MigrationTable.ONE_HOUR, config), Task.Estimate);
         }
 
         if (config.isRun6HAggregateDataMigration()) {
-            retryOnFailure(new AggregateDataMigrator(MetricsTable.SIX_HOUR, config), Task.Estimate);
+            retryOnFailure(new AggregateDataMigrator(MigrationTable.SIX_HOUR, config), Task.Estimate);
         }
 
         if (config.isRun1DAggregateDataMigration()) {
-            retryOnFailure(new AggregateDataMigrator(MetricsTable.TWENTY_FOUR_HOUR, config), Task.Estimate);
+            retryOnFailure(new AggregateDataMigrator(MigrationTable.TWENTY_FOUR_HOUR, config), Task.Estimate);
         }
 
         if (config.isDeleteAllDataAtEndOfMigration()) {
@@ -141,15 +141,15 @@ public void migrateData() throws Exception {
         }
 
         if (config.isRun1HAggregateDataMigration()) {
-            retryOnFailure(new AggregateDataMigrator(MetricsTable.ONE_HOUR, config), Task.Migrate);
+            retryOnFailure(new AggregateDataMigrator(MigrationTable.ONE_HOUR, config), Task.Migrate);
         }
 
         if (config.isRun6HAggregateDataMigration()) {
-            retryOnFailure(new AggregateDataMigrator(MetricsTable.SIX_HOUR, config), Task.Migrate);
+            retryOnFailure(new AggregateDataMigrator(MigrationTable.SIX_HOUR, config), Task.Migrate);
         }
 
         if (config.isRun1DAggregateDataMigration()) {
-            retryOnFailure(new AggregateDataMigrator(MetricsTable.TWENTY_FOUR_HOUR, config), Task.Migrate);
+            retryOnFailure(new AggregateDataMigrator(MigrationTable.TWENTY_FOUR_HOUR, config), Task.Migrate);
         }
 
         if (config.isDeleteAllDataAtEndOfMigration()) {
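
Note: these call sites now pass a migrator-local MigrationTable instead of the storage-schema enum MetricsTable. The enum itself is not part of this diff; the following minimal sketch shows the shape the call sites above (and the workers below) assume. The constant names and accessors appear in this commit, while the literal table names, bucket labels, and TTL values are invented for illustration.

    package org.rhq.server.metrics.migrator.workers;

    // Sketch only: constants and accessors are taken from this commit's
    // call sites; the literal values are assumptions, not the shipped ones.
    public enum MigrationTable {
        RAW("raw_metrics", "raw_metrics", 7L * 24 * 3600 * 1000L),
        ONE_HOUR("one_hour_metrics", "one_hour", 14L * 24 * 3600 * 1000L),
        SIX_HOUR("six_hour_metrics", "six_hour", 31L * 24 * 3600 * 1000L),
        TWENTY_FOUR_HOUR("twenty_four_hour_metrics", "twenty_four_hour", 365L * 24 * 3600 * 1000L);

        private final String tableName;
        private final String bucket;
        private final long ttlMillis;

        MigrationTable(String tableName, String bucket, long ttlMillis) {
            this.tableName = tableName;
            this.bucket = bucket;
            this.ttlMillis = ttlMillis;
        }

        public String getTableName() { return tableName; }

        // Bucket written with each migrated aggregate row; the diff calls
        // toString() on the result, so the real accessor may return an enum.
        public String getMigrationBucket() { return bucket; }

        // Bucket written into the metrics index; treated here as identical
        // to the migration bucket, which is an assumption.
        public String getAggregationBucket() { return bucket; }

        public long getTTLinMilliseconds() { return ttlMillis; }
    }
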
DataMigratorRunner.java
@@ -58,7 +58,7 @@
 import org.rhq.server.metrics.migrator.DataMigrator.DatabaseType;
 import org.rhq.server.metrics.migrator.workers.AggregateDataMigrator;
 import org.rhq.server.metrics.migrator.workers.DeleteAllData;
-import org.rhq.server.metrics.migrator.workers.MetricsIndexUpdateAccumulator;
+import org.rhq.server.metrics.migrator.workers.MetricsIndexMigrator;
 import org.rhq.server.metrics.migrator.workers.RawDataMigrator;
 
 
@@ -210,7 +210,7 @@ private static void setLogLevel(Level level) {
 
         //force change some of the logger levels
         Class[] clazzes = new Class[] { DataMigratorRunner.class, DataMigrator.class, RawDataMigrator.class,
-            DeleteAllData.class, AggregateDataMigrator.class, MetricsIndexUpdateAccumulator.class };
+            DeleteAllData.class, AggregateDataMigrator.class, MetricsIndexMigrator.class };
         for (Class clazz : clazzes) {
             migratorLogging = root.getLogger(clazz);
             if (Level.DEBUG.equals(level)) {
(license header added to a file in org.rhq.server.metrics.migrator.datasources; file name not captured in this extract)
@@ -1,3 +1,22 @@
+/*
+ * RHQ Management Platform
+ * Copyright (C) 2005-2014 Red Hat, Inc.
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
 package org.rhq.server.metrics.migrator.datasources;
 
 import java.io.BufferedWriter;
AggregateDataMigrator.java
@@ -53,47 +53,47 @@ public class AggregateDataMigrator extends AbstractMigrationWorker implements CallableMigrationWorker {
     private final String selectQuery;
     private final String deleteQuery;
     private final String countQuery;
-    private final MetricsTable metricsTable;
-    private final MetricsIndexUpdateAccumulator metricsIndexAccumulator;
+    private final MigrationTable migrationTable;
+    private final MetricsIndexMigrator metricsIndexAccumulator;
 
     /**
      * @param query
-     * @param metricsTable
+     * @param migrationTable
      */
-    public AggregateDataMigrator(MetricsTable metricsTable, DataMigratorConfiguration config)
+    public AggregateDataMigrator(MigrationTable migrationTable, DataMigratorConfiguration config)
         throws Exception {
 
-        this.metricsTable = metricsTable;
+        this.migrationTable = migrationTable;
        this.config = config;
 
-        if (MetricsTable.ONE_HOUR.equals(this.metricsTable)) {
+        if (MigrationTable.ONE_HOUR.equals(this.migrationTable)) {
            this.selectQuery = MigrationQuery.SELECT_1H_DATA.toString();
            this.deleteQuery = MigrationQuery.DELETE_1H_DATA.toString();
            this.countQuery = MigrationQuery.COUNT_1H_DATA.toString();
-        } else if (MetricsTable.SIX_HOUR.equals(this.metricsTable)) {
+        } else if (MigrationTable.SIX_HOUR.equals(this.migrationTable)) {
            this.selectQuery = MigrationQuery.SELECT_6H_DATA.toString();
            this.deleteQuery = MigrationQuery.DELETE_6H_DATA.toString();
            this.countQuery = MigrationQuery.COUNT_6H_DATA.toString();
-        } else if (MetricsTable.TWENTY_FOUR_HOUR.equals(this.metricsTable)) {
+        } else if (MigrationTable.TWENTY_FOUR_HOUR.equals(this.migrationTable)) {
            this.selectQuery = MigrationQuery.SELECT_1D_DATA.toString();
            this.deleteQuery = MigrationQuery.DELETE_1D_DATA.toString();
            this.countQuery = MigrationQuery.COUNT_1D_DATA.toString();
        } else {
-            throw new Exception("MetricsTable " + metricsTable.toString() + " not supported by this migrator.");
+            throw new Exception("MigrationTable " + migrationTable.toString() + " not supported by this migrator.");
        }
 
-        metricsIndexAccumulator = new MetricsIndexUpdateAccumulator(metricsTable, config);
+        metricsIndexAccumulator = new MetricsIndexMigrator(migrationTable, config);
     }
 
     @Override
     public long estimate() throws Exception {
         long recordCount = this.getRowCount(this.countQuery);
-        log.debug("Retrieved record count for table " + metricsTable.toString() + " -- " + recordCount);
+        log.debug("Retrieved record count for table " + migrationTable.toString() + " -- " + recordCount);
 
         Telemetry telemetry = this.performMigration(Task.Estimate);
         long estimatedTimeToMigrate = telemetry.getMigrationTime();
 
-        long estimation = (recordCount / (long) MAX_RECORDS_TO_LOAD_FROM_SQL / (long) NUMBER_OF_BATCHES_FOR_ESTIMATION)
+        long estimation = (recordCount / MAX_RECORDS_TO_LOAD_FROM_SQL / NUMBER_OF_BATCHES_FOR_ESTIMATION)
             * estimatedTimeToMigrate;
 
         estimation += telemetry.getNonMigrationTime();
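
Note: estimate() extrapolates from a small sample migration. telemetry.getMigrationTime() is the time spent on NUMBER_OF_BATCHES_FOR_ESTIMATION batches of at most MAX_RECORDS_TO_LOAD_FROM_SQL rows each, and the dropped (long) casts were redundant because recordCount is already a long, so the division is carried out in long arithmetic either way. A worked sketch with assumed values for the two constants:

    public final class EstimationSketch {
        public static void main(String[] args) {
            // All values below are assumptions for illustration, not RHQ's constants.
            long recordCount = 3_000_000L;        // rows counted by the COUNT query
            int maxRecordsToLoadFromSql = 30_000; // stands in for MAX_RECORDS_TO_LOAD_FROM_SQL
            int batchesForEstimation = 2;         // stands in for NUMBER_OF_BATCHES_FOR_ESTIMATION
            long sampledMigrationTime = 10_000L;  // ms the telemetry measured for the sample

            // sample-sized groups in the full table, times the sampled time:
            // (3_000_000 / 30_000 / 2) * 10_000 = 50 * 10_000 = 500_000 ms
            long estimation = (recordCount / maxRecordsToLoadFromSql / batchesForEstimation)
                * sampledMigrationTime;
            System.out.println("estimated migration time: " + estimation + " ms");
        }
    }
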
@@ -131,9 +131,9 @@ private void deleteTableData() throws Exception {
             nativeQuery.executeUpdate();
             session.getTransaction().commit();
             closeSQLSession(session);
-            log.info("- " + metricsTable.toString() + " - Cleaned -");
+            log.info("- " + migrationTable.toString() + " - Cleaned -");
         } catch (Exception e) {
-            log.error("Failed to delete " + metricsTable.toString()
+            log.error("Failed to delete " + migrationTable.toString()
                 + " data. Attempting to delete data one more time...");
 
             failureCount++;
@@ -173,7 +173,7 @@ private Telemetry performMigration(Task task) throws Exception {
                     insertDataToCassandra(existingData);
                     break;
                 } catch (Exception e) {
-                    log.error("Failed to insert " + metricsTable.toString()
+                    log.error("Failed to insert " + migrationTable.toString()
                         + " data. Attempting to insert the current batch of data one more time");
                     log.error(e);
 
@@ -184,7 +184,7 @@
                 }
             }
 
-            log.info("- " + metricsTable + " - " + lastMigratedRecord + " -");
+            log.info("- " + migrationTable + " - " + lastMigratedRecord + " -");
 
             numberOfBatchesMigrated++;
             if (Task.Estimate.equals(task) && numberOfBatchesMigrated >= NUMBER_OF_BATCHES_FOR_ESTIMATION) {
@@ -213,7 +213,7 @@ private void insertDataToCassandra(List<Object[]> existingData) throws Exception {
         long creationTimeMillis;
         long itemTTLSeconds;
         long currentTimeMillis = System.currentTimeMillis();
-        long expectedTTLMillis = metricsTable.getTTLinMilliseconds();
+        long expectedTTLMillis = migrationTable.getTTLinMilliseconds();
 
         for (Object[] rawMeasurement : existingData) {
             creationTimeMillis = Long.parseLong(rawMeasurement[MigrationQuery.TIMESTAMP_INDEX].toString());
@@ -223,22 +223,16 @@
             int scheduleId = Integer.parseInt(rawMeasurement[MigrationQuery.SCHEDULE_INDEX].toString());
             Date time = new Date(creationTimeMillis);
 
-            batch.add(QueryBuilder.insertInto(metricsTable.toString()).value("schedule_id", scheduleId)
-                .value("time", time).value("type", AggregateType.AVG.ordinal())
-                .value("value", Double.parseDouble(rawMeasurement[MigrationQuery.VALUE_INDEX].toString()))
+            batch.add(QueryBuilder.insertInto(MetricsTable.AGGREGATE.toString())
+                .value("schedule_id", scheduleId)
+                .value("bucket", migrationTable.getMigrationBucket().toString())
+                .value("time", time)
+                .value("avg", Double.parseDouble(rawMeasurement[MigrationQuery.VALUE_INDEX].toString()))
+                .value("min", Double.parseDouble(rawMeasurement[MigrationQuery.MIN_VALUE_INDEX].toString()))
+                .value("max", Double.parseDouble(rawMeasurement[MigrationQuery.MAX_VALUE_INDEX].toString()))
                 .using(ttl((int) itemTTLSeconds)));
 
-            batch.add(QueryBuilder.insertInto(metricsTable.toString()).value("schedule_id", scheduleId)
-                .value("time", time).value("type", AggregateType.MIN.ordinal())
-                .value("value", Double.parseDouble(rawMeasurement[MigrationQuery.MIN_VALUE_INDEX].toString()))
-                .using(ttl((int) itemTTLSeconds)));
-
-            batch.add(QueryBuilder.insertInto(metricsTable.toString()).value("schedule_id", scheduleId)
-                .value("time", time).value("type", AggregateType.MAX.ordinal())
-                .value("value", Double.parseDouble(rawMeasurement[MigrationQuery.MAX_VALUE_INDEX].toString()))
-                .using(ttl((int) itemTTLSeconds)));
-
-            batchSize += 3;
+            batchSize++;
 
             metricsIndexAccumulator.add(scheduleId, creationTimeMillis);
         }
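
Note: this last hunk is the heart of the schema change. The old schema stored each aggregate statistic as its own row, discriminated by a type ordinal (AggregateType.AVG, MIN, MAX), so every measurement cost three batched inserts; the new schema stores avg, min, and max as columns of a single row in the shared aggregate table, keyed by a bucket column, which is why batchSize += 3 becomes batchSize++. A minimal sketch of the consolidated insert; the real table and bucket values live in MetricsTable.AGGREGATE and MigrationTable outside this diff, so they are parameters here:

    import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
    import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;

    import java.util.Date;

    import com.datastax.driver.core.RegularStatement;

    // Before (sketch): three rows per (schedule_id, time), discriminated by a
    // type column holding AggregateType.{AVG,MIN,MAX}.ordinal():
    //   INSERT INTO <per-duration table> (schedule_id, time, type, value) ...
    // After (sketch): a single row carrying all three statistics:
    //   INSERT INTO <aggregate table> (schedule_id, bucket, time, avg, min, max) ...
    public final class AggregateRowSketch {
        public static RegularStatement migratedRow(String aggregateTable, String bucket,
            int scheduleId, Date time, double avg, double min, double max, int ttlSeconds) {
            return (RegularStatement) insertInto(aggregateTable)
                .value("schedule_id", scheduleId)
                .value("bucket", bucket)
                .value("time", time)
                .value("avg", avg)
                .value("min", min)
                .value("max", max)
                .using(ttl(ttlSeconds));
        }
    }
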
MetricsIndexMigrator.java (class renamed from MetricsIndexUpdateAccumulator)
@@ -48,17 +48,17 @@
  * @author Stefan Negrea
  *
  */
-public class MetricsIndexUpdateAccumulator extends AbstractMigrationWorker {
+public class MetricsIndexMigrator extends AbstractMigrationWorker {
     private static final int MAX_SIZE = 3000;
 
-    private final Log log = LogFactory.getLog(MetricsIndexUpdateAccumulator.class);
+    private final Log log = LogFactory.getLog(MetricsIndexMigrator.class);
 
     private final DateTimeService dateTimeService = new DateTimeService();
-    private final MetricsConfiguration configuration = new MetricsConfiguration();
+    private final MetricsConfiguration metricsConfiguration = new MetricsConfiguration();
     private final Map<Integer, Set<Long>> accumulator = new HashMap<Integer, Set<Long>>();
 
-    private final MetricsTable table;
-    private final DataMigratorConfiguration config;
+    private final MigrationTable table;
+    private final DataMigratorConfiguration migratorConfiguration;
 
     private final long timeLimit;
     private final PreparedStatement updateMetricsIndex;
@@ -67,16 +67,18 @@ public class MetricsIndexUpdateAccumulator extends AbstractMigrationWorker {
 
     private int currentCount = 0;
 
-    public MetricsIndexUpdateAccumulator(MetricsTable table, DataMigratorConfiguration config) {
+    public MetricsIndexMigrator(MigrationTable table, DataMigratorConfiguration config) {
         this.table = table;
-        this.config = config;
+        this.migratorConfiguration = config;
 
-        if (MetricsTable.RAW.equals(table) || MetricsTable.ONE_HOUR.equals(table)
-            || MetricsTable.SIX_HOUR.equals(table)) {
-            this.sliceDuration = configuration.getTimeSliceDuration(table);
+        if (MigrationTable.RAW.equals(table) || MigrationTable.ONE_HOUR.equals(table)
+            || MigrationTable.SIX_HOUR.equals(table)) {
+            this.sliceDuration = this.getTimeSliceDuration(table);
             this.timeLimit = this.getLastAggregationTime(table) - this.sliceDuration.getMillis();
             this.updateMetricsIndex = config.getSession().prepare(
-                "INSERT INTO metrics_index (bucket, time, schedule_id) VALUES (?, ?, ?)");
+                "INSERT INTO " + MetricsTable.INDEX + " " +
+                "(bucket, partition, time, schedule_id) " +
+                "VALUES (?, ?, ?, ?) ");
             this.validAccumulatorTable = true;
         } else {
             this.timeLimit = Integer.MAX_VALUE;

for (Map.Entry<Integer, Set<Long>> entry : accumulator.entrySet()) {
for (Long timestamp : entry.getValue()) {
BoundStatement statement = updateMetricsIndex.bind(this.table.getTableName(), new Date(timestamp),
entry.getKey());
resultSetFutures.add(config.getSession().executeAsync(statement));
Integer scheduleId = entry.getKey();

BoundStatement statement = updateMetricsIndex.bind(table.getAggregationBucket().toString(),
(scheduleId % metricsConfiguration.getIndexPartitions()), new Date(timestamp), scheduleId);

resultSetFutures.add(migratorConfiguration.getSession().executeAsync(statement));
}
}

@@ -134,21 +139,21 @@
         currentCount = 0;
     }
 
-    private long getLastAggregationTime(MetricsTable migratedTable) {
-        StatelessSession session = getSQLSession(config);
+    private long getLastAggregationTime(MigrationTable migratedTable) {
+        StatelessSession session = getSQLSession(migratorConfiguration);
 
         long aggregationSlice = Integer.MAX_VALUE;
         Duration duration = null;
         String queryString = null;
 
-        if (MetricsTable.RAW.equals(migratedTable)) {
-            duration = configuration.getRawTimeSliceDuration();
+        if (MigrationTable.RAW.equals(migratedTable)) {
+            duration = metricsConfiguration.getRawTimeSliceDuration();
             queryString = MigrationQuery.MAX_TIMESTAMP_1H_DATA.toString();
-        } else if (MetricsTable.ONE_HOUR.equals(migratedTable)) {
-            duration = configuration.getOneHourTimeSliceDuration();
+        } else if (MigrationTable.ONE_HOUR.equals(migratedTable)) {
+            duration = metricsConfiguration.getOneHourTimeSliceDuration();
             queryString = MigrationQuery.MAX_TIMESTAMP_6H_DATA.toString();
-        } else if (MetricsTable.SIX_HOUR.equals(migratedTable)) {
-            duration = configuration.getSixHourTimeSliceDuration();
+        } else if (MigrationTable.SIX_HOUR.equals(migratedTable)) {
+            duration = metricsConfiguration.getSixHourTimeSliceDuration();
             queryString = MigrationQuery.MAX_TIMESTAMP_1D_DATA.toString();
         }
 
@@ -168,4 +173,17 @@ private long getLastAggregationTime(MetricsTable migratedTable) {
 
         return aggregationSlice;
     }
+
+    public Duration getTimeSliceDuration(MigrationTable table) {
+        if (MigrationTable.RAW.equals(table)) {
+            return metricsConfiguration.getRawTimeSliceDuration();
+        } else if (MigrationTable.ONE_HOUR.equals(table)) {
+            return metricsConfiguration.getOneHourTimeSliceDuration();
+        } else if (MigrationTable.SIX_HOUR.equals(table)) {
+            return metricsConfiguration.getSixHourTimeSliceDuration();
+        }
+
+        throw new IllegalArgumentException("Time slice duration for " + table.getTableName()
+            + " table is not supported");
+    }
 }
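
Note: two details of the rewritten index worker stand out. The prepared insert now targets MetricsTable.INDEX and carries a partition column, bound to scheduleId % metricsConfiguration.getIndexPartitions(), so index entries are spread deterministically over a fixed number of partitions rather than accumulating under a single bucket key. And because MetricsConfiguration is no longer asked for a time-slice duration keyed by the migrator's table enum, that lookup is reimplemented locally as getTimeSliceDuration(MigrationTable). A small sketch of the partition arithmetic; the partition count 4 is an assumption, the real value comes from MetricsConfiguration.getIndexPartitions():

    // Deterministic spread of schedule ids over index partitions, as in the
    // bind() call above. Partition count 4 is an illustrative assumption.
    public final class IndexPartitionSketch {
        public static void main(String[] args) {
            int indexPartitions = 4;
            for (int scheduleId : new int[] { 10001, 10002, 10003, 10004, 10005 }) {
                int partition = scheduleId % indexPartitions; // always in 0..indexPartitions-1
                System.out.println("schedule " + scheduleId + " -> index partition " + partition);
            }
        }
    }
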