Skip to content

Commit

Permalink
fixes to support hbase bulk delete using a coprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
rfecher committed Jun 22, 2017
1 parent e2326b9 commit 4ee87d2
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 3,722 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mil.nga.giat.geowave.datastore.hbase.operations;
package mil.nga.giat.geowave.datastore.hbase.query;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -20,30 +20,24 @@
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

import mil.nga.giat.geowave.core.index.ByteArrayId;
import mil.nga.giat.geowave.core.store.adapter.DataAdapter;
import org.apache.hadoop.hbase.filter.FilterList;
import mil.nga.giat.geowave.datastore.hbase.operations.protobuf.HBaseBulkDelete.BulkDeleteRequest;
import mil.nga.giat.geowave.datastore.hbase.operations.protobuf.HBaseBulkDelete.BulkDeleteResponse;
import mil.nga.giat.geowave.datastore.hbase.operations.protobuf.HBaseBulkDelete.BulkDeleteResponse.Builder;
import mil.nga.giat.geowave.datastore.hbase.operations.protobuf.HBaseBulkDelete.BulkDeleteService;
import mil.nga.giat.geowave.datastore.hbase.operations.protobuf.HBaseBulkDelete.BulkDeleteRequest.BulkDeleteType;
import mil.nga.giat.geowave.datastore.hbase.query.HBaseDistributableFilter;
import mil.nga.giat.geowave.datastore.hbase.query.HBaseNumericIndexStrategyFilter;

import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import mil.nga.giat.geowave.datastore.hbase.query.protobuf.HBaseBulkDeleteProtos.BulkDeleteRequest;
import mil.nga.giat.geowave.datastore.hbase.query.protobuf.HBaseBulkDeleteProtos.BulkDeleteRequest.BulkDeleteType;
import mil.nga.giat.geowave.datastore.hbase.query.protobuf.HBaseBulkDeleteProtos.BulkDeleteResponse;
import mil.nga.giat.geowave.datastore.hbase.query.protobuf.HBaseBulkDeleteProtos.BulkDeleteResponse.Builder;
import mil.nga.giat.geowave.datastore.hbase.query.protobuf.HBaseBulkDeleteProtos.BulkDeleteService;

public class HBaseBulkDeleteEndpoint extends
BulkDeleteService implements
CoprocessorService,
Expand All @@ -61,20 +55,19 @@ public Service getService() {

@Override
public void delete(
RpcController controller,
BulkDeleteRequest request,
RpcCallback<BulkDeleteResponse> done ) {
final RpcController controller,
final BulkDeleteRequest request,
final RpcCallback<BulkDeleteResponse> done ) {
long totalRowsDeleted = 0L;
long totalVersionsDeleted = 0L;
FilterList filterList = null;
// final DataAdapter dataAdapter = null;
ByteArrayId adapterId = null;
final List<byte[]> adapterIds = new ArrayList<>();

Long timestamp = null;
if (request.hasTimestamp()) {
timestamp = request.getTimestamp();
}
BulkDeleteType deleteType = request.getDeleteType();
final BulkDeleteType deleteType = request.getDeleteType();

/**
* Extract the filter from the bulkDeleteRequest
Expand Down Expand Up @@ -157,20 +150,14 @@ public void delete(
e);
}

if (request.hasAdapter()) {
final byte[] adapterBytes = request.getAdapter().toByteArray();
final ByteBuffer buf = ByteBuffer.wrap(adapterBytes);
buf.get();
final int length = buf.getInt();
final byte[] adapterIdBytes = new byte[length];
buf.get(adapterIdBytes);
adapterId = new ByteArrayId(
adapterIdBytes);
}
else if (request.hasAdapterId()) {
final byte[] adapterIdBytes = request.getAdapterId().toByteArray();
adapterId = new ByteArrayId(
adapterIdBytes);
if (request.hasAdapterIds()) {
final ByteBuffer buf = ByteBuffer.wrap(request.getAdapterIds().toByteArray());
final int numAdapters = buf.getInt();
for (int i = 0; i < numAdapters; i++) {
final byte[] adapterIdBytes = new byte[buf.getInt()];
buf.get(adapterIdBytes);
adapterIds.add(adapterIdBytes);
}
}

/**
Expand All @@ -179,23 +166,25 @@ else if (request.hasAdapterId()) {
RegionScanner scanner = null;
try {
scanner = null;
Scan scan = new Scan();
final Scan scan = new Scan();
scan.setFilter(filterList);

if (adapterId != null) {
scan.addFamily(adapterId.getBytes());
if (!adapterIds.isEmpty()) {
for (final byte[] adapterId : adapterIds) {
scan.addFamily(adapterId);
}
}

Region region = env.getRegion();
final Region region = env.getRegion();
scanner = region.getScanner(scan);

boolean hasMore = true;
int rowBatchSize = request.getRowBatchSize();
final int rowBatchSize = request.getRowBatchSize();
while (hasMore) {
List<List<Cell>> deleteRows = new ArrayList<>(
final List<List<Cell>> deleteRows = new ArrayList<>(
rowBatchSize);
for (int i = 0; i < rowBatchSize; i++) {
List<Cell> results = new ArrayList<>();
final List<Cell> results = new ArrayList<>();
hasMore = scanner.next(results);
if (results.size() > 0) {
deleteRows.add(results);
Expand All @@ -206,15 +195,15 @@ else if (request.hasAdapterId()) {
}
}
if (deleteRows.size() > 0) {
Mutation[] deleteArr = new Mutation[deleteRows.size()];
final Mutation[] deleteArr = new Mutation[deleteRows.size()];
int i = 0;
for (List<Cell> deleteRow : deleteRows) {
for (final List<Cell> deleteRow : deleteRows) {
deleteArr[i++] = createDeleteMutation(
deleteRow,
deleteType,
timestamp);
}
OperationStatus[] opStatus = region.batchMutate(
final OperationStatus[] opStatus = region.batchMutate(
deleteArr,
HConstants.NO_NONCE,
HConstants.NO_NONCE);
Expand All @@ -224,7 +213,7 @@ else if (request.hasAdapterId()) {
}
totalRowsDeleted++;
if (deleteType == BulkDeleteType.VERSION) {
byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE);
final byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE);
if (versionsDeleted != null) {
totalVersionsDeleted += Bytes.toInt(versionsDeleted);
}
Expand All @@ -233,16 +222,17 @@ else if (request.hasAdapterId()) {
}
}
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
catch (final IOException e) {
LOGGER.error(
"Unable to delete rows",
e);
}
finally {
if (scanner != null) {
try {
scanner.close();
}
catch (IOException ioe) {
catch (final IOException ioe) {
LOGGER.error(
"Error during bulk delete in HBase.",
ioe);
Expand All @@ -251,21 +241,21 @@ else if (request.hasAdapterId()) {
}
}

Builder responseBuilder = BulkDeleteResponse.newBuilder();
final Builder responseBuilder = BulkDeleteResponse.newBuilder();
responseBuilder.setRowsDeleted(totalRowsDeleted);
if (deleteType == BulkDeleteType.VERSION) {
responseBuilder.setVersionsDeleted(totalVersionsDeleted);
}

// Send the response back
BulkDeleteResponse response = responseBuilder.build();
final BulkDeleteResponse response = responseBuilder.build();
done.run(response);
}

private Delete createDeleteMutation(
List<Cell> deleteRow,
BulkDeleteType deleteType,
Long timestamp ) {
final List<Cell> deleteRow,
final BulkDeleteType deleteType,
final Long timestamp ) {
long ts;
if (timestamp == null) {
ts = HConstants.LATEST_TIMESTAMP;
Expand All @@ -274,14 +264,14 @@ private Delete createDeleteMutation(
ts = timestamp;
}
// We just need the rowkey. Get it from 1st KV.
byte[] row = CellUtil.cloneRow(deleteRow.get(0));
Delete delete = new Delete(
final byte[] row = CellUtil.cloneRow(deleteRow.get(0));
final Delete delete = new Delete(
row,
ts);
if (deleteType == BulkDeleteType.FAMILY) {
Set<byte[]> families = new TreeSet<>(
final Set<byte[]> families = new TreeSet<>(
Bytes.BYTES_COMPARATOR);
for (Cell kv : deleteRow) {
for (final Cell kv : deleteRow) {
if (families.add(CellUtil.cloneFamily(kv))) {
delete.addFamily(
CellUtil.cloneFamily(kv),
Expand All @@ -290,9 +280,9 @@ private Delete createDeleteMutation(
}
}
else if (deleteType == BulkDeleteType.COLUMN) {
Set<Column> columns = new HashSet<>();
for (Cell kv : deleteRow) {
Column column = new Column(
final Set<Column> columns = new HashSet<>();
for (final Cell kv : deleteRow) {
final Column column = new Column(
CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv));
if (columns.add(column)) {
Expand All @@ -318,7 +308,7 @@ else if (deleteType == BulkDeleteType.VERSION) {
// the scan fetched will get deleted.
int noOfVersionsToDelete = 0;
if (timestamp == null) {
for (Cell kv : deleteRow) {
for (final Cell kv : deleteRow) {
delete.addColumn(
CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv),
Expand All @@ -327,9 +317,9 @@ else if (deleteType == BulkDeleteType.VERSION) {
}
}
else {
Set<Column> columns = new HashSet<>();
for (Cell kv : deleteRow) {
Column column = new Column(
final Set<Column> columns = new HashSet<>();
for (final Cell kv : deleteRow) {
final Column column = new Column(
CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv));
// Only one version of particular column getting deleted.
Expand All @@ -351,42 +341,42 @@ else if (deleteType == BulkDeleteType.VERSION) {

private static class Column
{
private byte[] family;
private byte[] qualifier;
private final byte[] family;
private final byte[] qualifier;

public Column(
byte[] family,
byte[] qualifier ) {
final byte[] family,
final byte[] qualifier ) {
this.family = family;
this.qualifier = qualifier;
}

@Override
public boolean equals(
Object other ) {
final Object other ) {
if (!(other instanceof Column)) {
return false;
}
Column column = (Column) other;
final Column column = (Column) other;
return Bytes.equals(
this.family,
family,
column.family) && Bytes.equals(
this.qualifier,
qualifier,
column.qualifier);
}

@Override
public int hashCode() {
int h = 31;
h = h + 13 * Bytes.hashCode(this.family);
h = h + 13 * Bytes.hashCode(this.qualifier);
h = h + (13 * Bytes.hashCode(family));
h = h + (13 * Bytes.hashCode(qualifier));
return h;
}
}

@Override
public void start(
CoprocessorEnvironment env )
final CoprocessorEnvironment env )
throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) env;
Expand All @@ -399,7 +389,7 @@ public void start(

@Override
public void stop(
CoprocessorEnvironment env )
final CoprocessorEnvironment env )
throws IOException {
// nothing to do
}
Expand Down

0 comments on commit 4ee87d2

Please sign in to comment.