Skip to content
This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Commit

Permalink
Merge pull request #155 from gsteelman/piling_the_dogs_kv_cell
Browse files Browse the repository at this point in the history
Remove KeyValue use in hraven-core
  • Loading branch information
vrushalivc committed Oct 17, 2016
2 parents 916310b + 6733103 commit 4f5c229
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
Expand Down Expand Up @@ -472,11 +473,11 @@ private boolean updateCost(AppAggregationKey appAggKey, Table aggTable,
Result r = aggTable.get(g);
double existingCost = 0.0;
byte[] existingCostBytes = null;
KeyValue columnLatest = r.getColumnLatest(AggregationConstants.INFO_FAM_BYTES,
Cell columnLatest = r.getColumnLatestCell(AggregationConstants.INFO_FAM_BYTES,
AggregationConstants.JOBCOST_BYTES);

if (columnLatest != null) {
existingCost = Bytes.toDouble(columnLatest.getValue());
existingCost = Bytes.toDouble(CellUtil.cloneValue(columnLatest));
existingCostBytes = Bytes.toBytes(existingCost);
}

Expand Down Expand Up @@ -505,13 +506,13 @@ boolean updateQueue(AppAggregationKey appAggKey, Table aggTable, JobDetails jobD
AggregationConstants.HRAVEN_QUEUE_BYTES);
Result r = aggTable.get(g);

KeyValue existingQueuesKV =
r.getColumnLatest(AggregationConstants.INFO_FAM_BYTES,
Cell existingQueuesCell =
r.getColumnLatestCell(AggregationConstants.INFO_FAM_BYTES,
AggregationConstants.HRAVEN_QUEUE_BYTES);
String existingQueues = null;
byte[] existingQueuesBytes = null;
if (existingQueuesKV != null) {
existingQueues = Bytes.toString(existingQueuesKV.getValue());
if (existingQueuesCell != null) {
existingQueues = Bytes.toString(CellUtil.cloneValue(existingQueuesCell));
existingQueuesBytes = Bytes.toBytes(existingQueues);
}

Expand Down Expand Up @@ -586,7 +587,7 @@ private boolean updateNumberRuns(AppAggregationKey appAggKey, Table aggTable,
// it is possible that some other map task inserted
// the first run id for a job in this flow
// hence we check and put the number of runs in info col family
return incrNumberRuns(r.getColumn
return incrNumberRuns(r.getColumnCells
(AggregationConstants.INFO_FAM_BYTES,
AggregationConstants.NUMBER_RUNS_BYTES),
aggTable, appAggKey);
Expand All @@ -603,7 +604,7 @@ private boolean updateNumberRuns(AppAggregationKey appAggKey, Table aggTable,
 * map task may have updated it in the meantime
* @throws IOException
*/
boolean incrNumberRuns(List<KeyValue> column, Table aggTable, AppAggregationKey appAggKey)
boolean incrNumberRuns(List<Cell> column, Table aggTable, AppAggregationKey appAggKey)
throws IOException {

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
Expand Down Expand Up @@ -116,9 +117,11 @@ public String getLatestVersion(String cluster, String user, String appId)
List<VersionInfo> versions = Lists.newArrayList();
Result r = this.versionsTable.get(get);
if (r != null && !r.isEmpty()) {
for (KeyValue kv : r.list()) {
for (Cell c : r.listCells()) {
versions.add(
new VersionInfo(Bytes.toString(kv.getQualifier()), Bytes.toLong(kv.getValue())) );
new VersionInfo(
Bytes.toString(CellUtil.cloneQualifier(c)),
Bytes.toLong(CellUtil.cloneValue(c))));
}
}

Expand Down Expand Up @@ -148,12 +151,12 @@ public List<VersionInfo> getDistinctVersions(String cluster, String user, String
Long ts = 0L;
Result r = this.versionsTable.get(get);
if (r != null && !r.isEmpty()) {
for (KeyValue kv : r.list()) {
for (Cell c : r.listCells()) {
ts = 0L;
try {
ts = Bytes.toLong(kv.getValue());
ts = Bytes.toLong(CellUtil.cloneValue(c));
versions.add(
new VersionInfo(Bytes.toString(kv.getQualifier()), ts) );
new VersionInfo(Bytes.toString(CellUtil.cloneQualifier(c)), ts) );
}
catch (IllegalArgumentException e1 ) {
// Bytes.toLong may throw IllegalArgumentException, although unlikely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void updateFlow(FlowQueueKey key, Flow flow) throws IOException {
}

/**
* Moves a flow_queue record from one row key to another. All KeyValues in the existing row
* Moves a flow_queue record from one row key to another. All Cells in the existing row
* will be written to the new row. This would primarily be used for transitioning a flow's
* data from one status to another.
*
Expand All @@ -136,8 +137,8 @@ public void moveFlow(FlowQueueKey oldKey, FlowQueueKey newKey)
}
// copy the existing row to the new key
Put p = new Put(queueKeyConverter.toBytes(newKey));
for (KeyValue kv : result.raw()) {
p.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
for (Cell c : result.rawCells()) {
p.add(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c), CellUtil.cloneValue(c));
}
flowQueueTable.put(p);
// delete the old row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
Expand Down Expand Up @@ -403,15 +404,15 @@ public Configuration createConfigurationFromResult(Result result)
throw new IllegalArgumentException("Cannot create InputStream from null");
}

KeyValue keyValue = result.getColumnLatest(Constants.RAW_FAM_BYTES,
Cell cell = result.getColumnLatestCell(Constants.RAW_FAM_BYTES,
Constants.JOBCONF_COL_BYTES);

// Create a jobConf from the raw input
Configuration jobConf = new Configuration(false);

byte[] jobConfRawBytes = null;
if (keyValue != null) {
jobConfRawBytes = keyValue.getValue();
if (cell != null) {
jobConfRawBytes = CellUtil.cloneValue(cell);
}
if (jobConfRawBytes == null || jobConfRawBytes.length == 0) {
throw new MissingColumnInResultException(Constants.RAW_FAM_BYTES,
Expand Down Expand Up @@ -515,15 +516,15 @@ public long getApproxSubmitTime(Result value)
"Cannot get last modification time from " + "a null hbase result");
}

KeyValue keyValue = value.getColumnLatest(Constants.INFO_FAM_BYTES,
Constants.JOBHISTORY_LAST_MODIFIED_COL_BYTES);
Cell cell = value.getColumnLatestCell(Constants.INFO_FAM_BYTES,
Constants.JOBHISTORY_LAST_MODIFIED_COL_BYTES);

if (keyValue == null) {
if (cell == null) {
throw new MissingColumnInResultException(Constants.INFO_FAM_BYTES,
Constants.JOBHISTORY_LAST_MODIFIED_COL_BYTES);
}

byte[] lastModTimeBytes = keyValue.getValue();
byte[] lastModTimeBytes = CellUtil.cloneValue(cell);
// we try to approximately set the job submit time based on when the job
// history file
// was last modified and an average job duration
Expand Down Expand Up @@ -572,16 +573,16 @@ public byte[] getJobHistoryRawFromResult(Result value)
throw new IllegalArgumentException("Cannot create InputStream from null");
}

KeyValue keyValue = value.getColumnLatest(Constants.RAW_FAM_BYTES,
Constants.JOBHISTORY_COL_BYTES);
Cell cell =
value.getColumnLatestCell(Constants.RAW_FAM_BYTES, Constants.JOBHISTORY_COL_BYTES);

// Could be that there is no conf file (only a history file).
if (keyValue == null) {
if (cell == null) {
throw new MissingColumnInResultException(Constants.RAW_FAM_BYTES,
Constants.JOBHISTORY_COL_BYTES);
}

byte[] jobHistoryRaw = keyValue.getValue();
byte[] jobHistoryRaw = CellUtil.cloneValue(cell);
return jobHistoryRaw;
}

Expand Down Expand Up @@ -612,11 +613,11 @@ public boolean getStatusAgg(byte[] row, byte[] col) throws IOException {
Get g = new Get(row);
g.addColumn(Constants.INFO_FAM_BYTES, col);
Result r = rawTable.get(g);
KeyValue kv = r.getColumnLatest(Constants.INFO_FAM_BYTES, col);
Cell cell = r.getColumnLatestCell(Constants.INFO_FAM_BYTES, col);
boolean status = false;
try {
if (kv != null) {
status = Bytes.toBoolean(kv.getValue());
if (cell != null) {
status = Bytes.toBoolean(CellUtil.cloneValue(cell));
}
} catch (IllegalArgumentException iae) {
LOG.error("Caught " + iae);
Expand All @@ -625,5 +626,4 @@ public boolean getStatusAgg(byte[] row, byte[] col) throws IOException {
+ status);
return status;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.twitter.hraven.datasource;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -24,7 +23,7 @@
* Indicates that the {@link Result} from a {@link Scan} is missing an expected
* column.
* <p>
 * Specifically, this exception indicates that the {@link Cell} returned by
 * {@link Result#getColumnLatestCell(byte[], byte[])} is <code>null</code> or the
 * list returned by {@link Result#getColumnCells(byte[], byte[])} is empty.
* list returned by {@link Result#getColumn(byte[], byte[]) is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
Expand Down Expand Up @@ -220,18 +223,18 @@ public void testCreateQueueListValue() throws IOException {
JobDetails jd = new JobDetails(null);
jd.setQueue("queue1");
byte[] qb = Bytes.toBytes("queue2!queue3!");
KeyValue existingQueuesKV =
new KeyValue(Bytes.toBytes("rowkey"), Constants.INFO_FAM_BYTES,
Constants.HRAVEN_QUEUE_BYTES, qb);
Cell existingQueuesCell =
CellUtil.createCell(Bytes.toBytes("rowkey"), Constants.INFO_FAM_BYTES,
Constants.HRAVEN_QUEUE_BYTES, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put.getCode(), qb);
AppSummaryService as = new AppSummaryService(null);
try {
String qlist = as.createQueueListValue(jd, Bytes.toString(existingQueuesKV.getValue()));
String qlist = as.createQueueListValue(jd, Bytes.toString(CellUtil.cloneValue(existingQueuesCell)));
assertNotNull(qlist);
String expQlist = "queue2!queue3!queue1!";
assertEquals(expQlist, qlist);

jd.setQueue("queue3");
qlist = as.createQueueListValue(jd, Bytes.toString(existingQueuesKV.getValue()));
qlist = as.createQueueListValue(jd, Bytes.toString(CellUtil.cloneValue(existingQueuesCell)));
assertNotNull(qlist);
expQlist = "queue2!queue3!";
assertEquals(expQlist, qlist);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ public void testAddVersion() throws Exception {
Result r = versionTable.get(new Get(appRow));
assertNotNull(r);
// should have 1 version
assertEquals(r.list().size(), 1);
assertEquals(r.listCells().size(), 1);
assertArrayEquals(
r.getValue(Constants.INFO_FAM_BYTES, Bytes.toBytes("v1")),
Bytes.toBytes(1L));

service.addVersion(cluster, user, appId, "v2", 10);
r = versionTable.get(new Get(appRow));
assertNotNull(r);
assertEquals(r.list().size(), 2);
assertEquals(r.listCells().size(), 2);
assertArrayEquals(
r.getValue(Constants.INFO_FAM_BYTES, Bytes.toBytes("v1")),
Bytes.toBytes(1L));
Expand All @@ -101,7 +101,7 @@ public void testAddVersion() throws Exception {
service.addVersion(cluster, user, appId, "v2", 5);
r = versionTable.get(new Get(appRow));
assertNotNull(r);
assertEquals(r.list().size(), 2);
assertEquals(r.listCells().size(), 2);
assertArrayEquals(
r.getValue(Constants.INFO_FAM_BYTES, Bytes.toBytes("v2")),
Bytes.toBytes(5L));
Expand All @@ -110,7 +110,7 @@ public void testAddVersion() throws Exception {
service.addVersion(cluster, user, appId, "v1", 11);
r = versionTable.get(new Get(appRow));
assertNotNull(r);
assertEquals(r.list().size(), 2);
assertEquals(r.listCells().size(), 2);
assertArrayEquals(
r.getValue(Constants.INFO_FAM_BYTES, Bytes.toBytes("v1")),
Bytes.toBytes(1L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -172,12 +175,12 @@ public void testGetApproxSubmitTime() throws IOException,
JobHistoryRawService rawService = null;
try {
rawService = new JobHistoryRawService(UTIL.getConfiguration());
KeyValue[] kvs = new KeyValue[1];
Cell[] cells = new Cell[1];
long modts = 1396550668000L;
kvs[0] = new KeyValue(Bytes.toBytes("someRowKey"),
Constants.INFO_FAM_BYTES, Constants.JOBHISTORY_LAST_MODIFIED_COL_BYTES,
Bytes.toBytes(modts));
Result result = new Result(kvs);
cells[0] = CellUtil.createCell(Bytes.toBytes("someRowKey"), Constants.INFO_FAM_BYTES,
Constants.JOBHISTORY_LAST_MODIFIED_COL_BYTES, HConstants.LATEST_TIMESTAMP,
KeyValue.Type.Put.getCode(), Bytes.toBytes(modts));
Result result = Result.create(cells);
long st = rawService.getApproxSubmitTime(result);
long expts = modts - Constants.AVERGAE_JOB_DURATION;
assertEquals(expts, st);
Expand Down
Loading

0 comments on commit 4f5c229

Please sign in to comment.