This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Merge master twitter only #131

Closed
13 changes: 13 additions & 0 deletions bin/create_schema.rb
@@ -46,5 +46,18 @@
# flow_event - stores events fired during pig job execution
create 'flow_event', {NAME => 'i', VERSIONS => 3, COMPRESSION => 'LZO', BLOOMFILTER => 'ROW'}

# job_history_agg_daily - stores daily aggregated job info
# the 's' column family has a TTL of 30 days; it is used as a scratch column family
# that stores the run ids seen for that day
# we assume that a flow will not run for more than 30 days, so it is fine to "expire" that data
create 'job_history_agg_daily', {NAME => 'i', COMPRESSION => 'LZO', BLOOMFILTER => 'ROWCOL'},
{NAME => 's', VERSIONS => 1, COMPRESSION => 'LZO', BLOCKCACHE => false, TTL => '2592000'}

# job_history_agg_weekly - stores weekly aggregated job info
# the 's' column family has a TTL of 30 days; it is used as a scratch column family
# that stores the run ids seen for that week
# we assume that a flow will not run for more than 30 days, so it is fine to "expire" that data
create 'job_history_agg_weekly', {NAME => 'i', COMPRESSION => 'LZO', BLOOMFILTER => 'ROWCOL'},
{NAME => 's', VERSIONS => 1, COMPRESSION => 'LZO', BLOCKCACHE => false, TTL => '2592000'}

exit
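Note: TTL is specified in seconds, and 2592000 = 30 days x 24 hours x 3600 seconds. As a sanity check, the scratch family's TTL can be read back via the HBase admin API. A minimal sketch, assuming the 0.94-era client is on the classpath (the class name and setup here are illustrative, not part of this change):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyAggSchema {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    // fetch the descriptor of the daily aggregation table created above
    HColumnDescriptor scratch = admin.getTableDescriptor(
        Bytes.toBytes("job_history_agg_daily")).getFamily(Bytes.toBytes("s"));
    System.out.println("scratch family TTL: " + scratch.getTimeToLive()); // 2592000
    admin.close();
  }
}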
133 changes: 133 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/AggregationConstants.java
@@ -0,0 +1,133 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.hraven;

import org.apache.hadoop.hbase.util.Bytes;

/**
* Defines the aggregation-related constants.
*/
public class AggregationConstants {

// name the table as job_history_agg_daily
public static final String AGG_DAILY_TABLE = Constants.HISTORY_TABLE + "_agg_daily";
public static final byte[] AGG_DAILY_TABLE_BYTES = Bytes.toBytes(AGG_DAILY_TABLE);

// name the table as job_history_agg_weekly
public static final String AGG_WEEKLY_TABLE = Constants.HISTORY_TABLE + "_agg_weekly";
public static final byte[] AGG_WEEKLY_TABLE_BYTES = Bytes.toBytes(AGG_WEEKLY_TABLE);

public static final String INFO_FAM = "i";
public static final byte[] INFO_FAM_BYTES = Bytes.toBytes(INFO_FAM);

/**
* The 's' column family has a TTL of 30 days.
* It is used as a scratch column family that stores the run ids seen for that day.
* We assume that a flow will not run for more than 30 days,
* hence it is fine to "expire" that data.
*/
public static final String SCRATCH_FAM = "s";
public static final byte[] SCRATCH_FAM_BYTES = Bytes.toBytes(SCRATCH_FAM);

/** parameter that specifies whether or not to aggregate */
public static final String AGGREGATION_FLAG_NAME = "aggregate";

/**
* name of the flag that determines whether or not to re-aggregate
* (overrides the aggregation status in the raw table for that job)
*/
public static final String RE_AGGREGATION_FLAG_NAME = "reaggregate";

/** column name for app id in aggregation table */
public static final String APP_ID_COL = "app_id";
public static final byte[] APP_ID_COL_BYTES = Bytes.toBytes(APP_ID_COL.toLowerCase());

/**
* the number of runs in an aggregation
*/
public static final String NUMBER_RUNS = "number_runs";

/** raw bytes representation of the number of runs parameter */
public static final byte[] NUMBER_RUNS_BYTES = Bytes.toBytes(NUMBER_RUNS.toLowerCase());

/**
* the user who ran this app
*/
public static final String USER = "user";
public static final byte[] USER_BYTES = Bytes.toBytes(USER.toLowerCase());

/**
* the number of jobs in an aggregation
*/
public static final String TOTAL_JOBS = "total_jobs";

/** raw bytes representation of the number of jobs parameter */
public static final byte[] TOTAL_JOBS_BYTES = Bytes.toBytes(TOTAL_JOBS.toLowerCase());

/**
* use this config setting to define a hadoop-version-independent property for the queue name
*/
public static final String HRAVEN_QUEUE = "queue";

/** raw bytes representation of the queue parameter */
public static final byte[] HRAVEN_QUEUE_BYTES = Bytes.toBytes(HRAVEN_QUEUE.toLowerCase());

/** total maps and reduces */
public static final String TOTAL_MAPS = "total_maps";
public static final byte[] TOTAL_MAPS_BYTES = Bytes.toBytes(TOTAL_MAPS.toLowerCase());
public static final String TOTAL_REDUCES = "total_reduces";
public static final byte[] TOTAL_REDUCES_BYTES = Bytes.toBytes(TOTAL_REDUCES.toLowerCase());

/** slot millis for maps and reduces */
public static final String SLOTS_MILLIS_MAPS = "slots_millis_maps";
public static final byte[] SLOTS_MILLIS_MAPS_BYTES = Bytes.toBytes(SLOTS_MILLIS_MAPS
.toLowerCase());
public static final String SLOTS_MILLIS_REDUCES = "slots_millis_reduces";
public static final byte[] SLOTS_MILLIS_REDUCES_BYTES = Bytes.toBytes(SLOTS_MILLIS_REDUCES
.toLowerCase());

/** used to indicate how expensive a job is in terms of memory and time taken */
public static final String MEGABYTEMILLIS = "megabytemillis";
public static final byte[] MEGABYTEMILLIS_BYTES = Bytes.toBytes(MEGABYTEMILLIS.toLowerCase());

/** used to indicate the cost of a job in terms of currency units */
public static final String JOBCOST = "jobcost";
public static final byte[] JOBCOST_BYTES = Bytes.toBytes(JOBCOST.toLowerCase());

public static final String JOB_DAILY_AGGREGATION_STATUS_COL = "daily_aggregation_status";
public static final byte[] JOB_DAILY_AGGREGATION_STATUS_COL_BYTES =
Bytes.toBytes(JOB_DAILY_AGGREGATION_STATUS_COL);

public static final String JOB_WEEKLY_AGGREGATION_STATUS_COL = "weekly_aggregation_status";
public static final byte[] JOB_WEEKLY_AGGREGATION_STATUS_COL_BYTES =
Bytes.toBytes(JOB_WEEKLY_AGGREGATION_STATUS_COL);

/**
* number of retries for checkAndPut operations
*/
public static final int RETRY_COUNT = 100;

/**
* type of aggregation: daily or weekly
*/
public enum AGGREGATION_TYPE {
DAILY,
WEEKLY;
}
}
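RETRY_COUNT bounds optimistic-concurrency retries when writing aggregates with HBase's checkAndPut. A minimal sketch of that pattern, assuming the 0.94-era client API; the row key and the choice of NUMBER_RUNS here are hypothetical and this is not the actual aggregation code:

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import com.twitter.hraven.AggregationConstants;

public class AggregationPutSketch {
  /** Bumps NUMBER_RUNS with checkAndPut, retrying up to RETRY_COUNT times on contention. */
  public static boolean incrementNumberRuns(byte[] row) throws IOException {
    HTable table = new HTable(HBaseConfiguration.create(),
        AggregationConstants.AGG_DAILY_TABLE_BYTES);
    try {
      for (int i = 0; i < AggregationConstants.RETRY_COUNT; i++) {
        byte[] current = table.get(new Get(row)).getValue(
            AggregationConstants.INFO_FAM_BYTES, AggregationConstants.NUMBER_RUNS_BYTES);
        long next = (current == null ? 0L : Bytes.toLong(current)) + 1;
        Put put = new Put(row);
        put.add(AggregationConstants.INFO_FAM_BYTES,
            AggregationConstants.NUMBER_RUNS_BYTES, Bytes.toBytes(next));
        // succeeds only if the cell still holds the value we just read
        if (table.checkAndPut(row, AggregationConstants.INFO_FAM_BYTES,
            AggregationConstants.NUMBER_RUNS_BYTES, current, put)) {
          return true;
        }
      }
      return false; // gave up after RETRY_COUNT contended attempts
    } finally {
      table.close();
    }
  }
}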
65 changes: 65 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/AppAggregationKey.java
@@ -0,0 +1,65 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.hraven;

/**
* Represents the row key that stores aggregations for an app
*/
public class AppAggregationKey extends AppKey {

/**
* timestamp stored as part of the row key in the aggregation table;
* this is usually the top-of-the-day or top-of-the-week timestamp.
* All apps that belong to that day (or that week, for weekly aggregations)
* have the same aggregation id.
*
* If a {@link Flow} (like a pig job or a scalding job) spans more than one day,
* the aggregationId is the day that the first job in that Flow started running,
* which is the submitTime or runId of that {@link Flow}.
*/
private long aggregationId;

public AppAggregationKey(String cluster, String userName, String appId, Long ts) {
super(cluster, userName, appId);
this.setAggregationId(ts);
}

public long getAggregationId() {
return aggregationId;
}

public void setAggregationId(long aggregationId) {
this.aggregationId = aggregationId;
}

/**
* Encodes the given timestamp so that more recent timestamps sort first
* in the byte representation of the row key
*/
public static long encodeAggregationId(long timestamp) {
return Long.MAX_VALUE - timestamp;
}

/**
* Inverted version of {@link AppAggregationKey#getAggregationId()} used in the byte
* representation for reverse chronological sorting.
* @return the inverted aggregation id
*/
public long getEncodedAggregationId() {
return encodeAggregationId(aggregationId);
}

}
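The Long.MAX_VALUE inversion in encodeAggregationId makes newer timestamps sort first under HBase's unsigned lexicographic byte comparison (both encoded values stay non-negative, so the sign bit never flips the order). A quick illustrative check with made-up timestamps:

import org.apache.hadoop.hbase.util.Bytes;
import com.twitter.hraven.AppAggregationKey;

public class EncodeOrderCheck {
  public static void main(String[] args) {
    long older = 1388534400000L; // hypothetical: 2014-01-01
    long newer = 1391212800000L; // hypothetical: 2014-02-01
    byte[] o = Bytes.toBytes(AppAggregationKey.encodeAggregationId(older));
    byte[] n = Bytes.toBytes(AppAggregationKey.encodeAggregationId(newer));
    // prints a negative number: the newer day's rows sort ahead of the older day's
    System.out.println(Bytes.compareTo(n, o));
  }
}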
28 changes: 28 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/AppSummary.java
@@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.annotate.JsonSerialize;

import com.twitter.hraven.datasource.ProcessingException;
@@ -105,6 +106,33 @@ public Set<String> getQueue() {
return queues;
}

/**
* @return the set of queue names as a single separator-delimited {@link String}
*/
public String getQueuesAsString() {
StringBuilder sb = new StringBuilder();
for (String q : queues) {
sb.append(q);
sb.append(Constants.SEP);
}
return sb.toString();
}

/**
* Adds to the {@link Set} of queue names from a
* separator-delimited {@link String} representation
* @param queueList the separator-delimited list of queue names
*/
public void setQueuesFromString(String queueList) {

if (StringUtils.isNotBlank(queueList)) {
String[] queues = queueList.split(Constants.SEP);
for (String q : queues) {
this.queues.add(q);
}
}
}

public void setQueue(Set<String> queue) {
this.queues = queue;
}
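Note that getQueuesAsString() leaves a trailing separator, which setQueuesFromString() tolerates because String.split() drops trailing empty strings, so the round trip is lossless. A standalone sketch of that string handling, assuming a "!" separator stands in for Constants.SEP (the actual separator is defined in com.twitter.hraven.Constants):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class QueueListRoundTrip {
  public static void main(String[] args) {
    String sep = "!"; // assumption standing in for Constants.SEP
    Set<String> queues = new HashSet<String>(Arrays.asList("default", "analytics"));
    // mirrors getQueuesAsString(): join with a trailing separator
    StringBuilder sb = new StringBuilder();
    for (String q : queues) {
      sb.append(q).append(sep);
    }
    String joined = sb.toString(); // e.g. "default!analytics!"
    // mirrors setQueuesFromString(): split() drops the trailing empty string
    Set<String> parsed = new HashSet<String>(Arrays.asList(joined.split(sep)));
    System.out.println(parsed.equals(queues)); // true
  }
}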
28 changes: 15 additions & 13 deletions hraven-core/src/main/java/com/twitter/hraven/HdfsStats.java
@@ -22,6 +22,8 @@
import org.apache.hadoop.hbase.client.Result;
import org.codehaus.jackson.map.annotate.JsonSerialize;

import com.twitter.hraven.util.ByteUtil;


/**
* An HdfsStats object represents information about a particular
@@ -251,21 +253,21 @@ public void populate(Result result) {
NavigableMap<byte[], byte[]> infoValues =
result.getFamilyMap(HdfsConstants.DISK_INFO_FAM_BYTES);

- this.fileCount += JobDetails.getValueAsLong(HdfsConstants.FILE_COUNT_COLUMN_BYTES, infoValues);
- this.dirCount += JobDetails.getValueAsLong(HdfsConstants.DIR_COUNT_COLUMN_BYTES, infoValues);
- this.spaceConsumed += JobDetails.getValueAsLong(HdfsConstants.SPACE_CONSUMED_COLUMN_BYTES,
+ this.fileCount += ByteUtil.getValueAsLong(HdfsConstants.FILE_COUNT_COLUMN_BYTES, infoValues);
+ this.dirCount += ByteUtil.getValueAsLong(HdfsConstants.DIR_COUNT_COLUMN_BYTES, infoValues);
+ this.spaceConsumed += ByteUtil.getValueAsLong(HdfsConstants.SPACE_CONSUMED_COLUMN_BYTES,
  infoValues);
- this.accessCountTotal += JobDetails.getValueAsLong(HdfsConstants.ACCESS_COUNT_TOTAL_COLUMN_BYTES,
+ this.accessCountTotal += ByteUtil.getValueAsLong(HdfsConstants.ACCESS_COUNT_TOTAL_COLUMN_BYTES,
  infoValues);
- this.owner = JobDetails.getValueAsString(HdfsConstants.OWNER_COLUMN_BYTES, infoValues);
- this.quota += JobDetails.getValueAsLong(HdfsConstants.QUOTA_COLUMN_BYTES, infoValues);
- this.spaceQuota += JobDetails.getValueAsLong(HdfsConstants.SPACE_QUOTA_COLUMN_BYTES, infoValues);
- this.tmpFileCount += JobDetails.getValueAsLong(HdfsConstants.TMP_FILE_COUNT_COLUMN_BYTES, infoValues);
- this.tmpSpaceConsumed += JobDetails.getValueAsLong(HdfsConstants.TMP_SPACE_CONSUMED_COLUMN_BYTES, infoValues);
- this.trashFileCount += JobDetails.getValueAsLong(HdfsConstants.TRASH_FILE_COUNT_COLUMN_BYTES, infoValues);
- this.trashSpaceConsumed += JobDetails.getValueAsLong(HdfsConstants.TRASH_SPACE_CONSUMED_COLUMN_BYTES, infoValues);
- this.accessCost += JobDetails.getValueAsDouble(HdfsConstants.ACCESS_COST_COLUMN_BYTES, infoValues);
- this.storageCost += JobDetails.getValueAsDouble(HdfsConstants.STORAGE_COST_COLUMN_BYTES, infoValues);
+ this.owner = ByteUtil.getValueAsString(HdfsConstants.OWNER_COLUMN_BYTES, infoValues);
+ this.quota += ByteUtil.getValueAsLong(HdfsConstants.QUOTA_COLUMN_BYTES, infoValues);
+ this.spaceQuota += ByteUtil.getValueAsLong(HdfsConstants.SPACE_QUOTA_COLUMN_BYTES, infoValues);
+ this.tmpFileCount += ByteUtil.getValueAsLong(HdfsConstants.TMP_FILE_COUNT_COLUMN_BYTES, infoValues);
+ this.tmpSpaceConsumed += ByteUtil.getValueAsLong(HdfsConstants.TMP_SPACE_CONSUMED_COLUMN_BYTES, infoValues);
+ this.trashFileCount += ByteUtil.getValueAsLong(HdfsConstants.TRASH_FILE_COUNT_COLUMN_BYTES, infoValues);
+ this.trashSpaceConsumed += ByteUtil.getValueAsLong(HdfsConstants.TRASH_SPACE_CONSUMED_COLUMN_BYTES, infoValues);
+ this.accessCost += ByteUtil.getValueAsDouble(HdfsConstants.ACCESS_COST_COLUMN_BYTES, infoValues);
+ this.storageCost += ByteUtil.getValueAsDouble(HdfsConstants.STORAGE_COST_COLUMN_BYTES, infoValues);
this.hdfsCost = calculateHDFSCost();

}
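This hunk moves the null-safe cell conversions from JobDetails to the new ByteUtil helper. The helper itself is outside this diff; a sketch of what such methods plausibly look like, assuming the family map uses HBase's byte-array comparator as Result.getFamilyMap() does (the real implementation lives in com.twitter.hraven.util.ByteUtil and may differ):

import java.util.Map;
import org.apache.hadoop.hbase.util.Bytes;

public class ByteUtilSketch {
  /** Null-safe read of a long-encoded cell; a missing column counts as 0. */
  public static long getValueAsLong(byte[] key, Map<byte[], byte[]> infoValues) {
    byte[] value = infoValues.get(key);
    return (value == null) ? 0L : Bytes.toLong(value);
  }

  /** Null-safe read of a string cell; a missing column becomes the empty string. */
  public static String getValueAsString(byte[] key, Map<byte[], byte[]> infoValues) {
    byte[] value = infoValues.get(key);
    return (value == null) ? "" : Bytes.toString(value);
  }
}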