Skip to content

Commit

Permalink
added accumulo 1.8.1 support
Browse files Browse the repository at this point in the history
  • Loading branch information
rfecher committed Aug 7, 2017
1 parent f4a5e6e commit e23fbe6
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 1,346 deletions.
6 changes: 0 additions & 6 deletions extensions/datastores/accumulo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.7.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*******************************************************************************
* Copyright (c) 2013-2017 Contributors to the Eclipse Foundation
*
*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
* All rights reserved. This program and the accompanying materials
Expand All @@ -19,22 +19,9 @@
import java.util.Map.Entry;
import java.util.TreeSet;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.NullToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,18 +43,30 @@
import mil.nga.giat.geowave.mapreduce.splits.IntermediateSplitInfo;
import mil.nga.giat.geowave.mapreduce.splits.RangeLocationPair;
import mil.nga.giat.geowave.mapreduce.splits.SplitsProvider;

//@formatter:off
/*if[accumulo.api=1.6]
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.data.KeyExtent;
else[accumulo.api=1.6]*/
/*if[accumulo.api=1.7]
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.data.impl.KeyExtent;
/*end[accumulo.api=1.6]*/
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.NullToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
else[accumulo.api=1.7]*/
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.admin.Locations;
/*end[accumulo.api=1.7]*/
//@formatter:on

public class AccumuloSplitsProvider extends
SplitsProvider
{
Expand Down Expand Up @@ -149,6 +148,8 @@ protected TreeSet<IntermediateSplitInfo> populateIntermediateSplits(
}
}
// get the metadata information for these ranges
// @formatter:off
/*if[accumulo.api=1.7]
final Map<String, Map<KeyExtent, List<Range>>> tserverBinnedRanges = getBinnedRangesStructure();
TabletLocator tl;
try {
Expand All @@ -171,32 +172,21 @@ protected TreeSet<IntermediateSplitInfo> populateIntermediateSplits(
new PasswordToken(
accumuloOperations.getPassword()));
}
final ClientContext clientContext = new ClientContext(
instance,credentials,
new ClientConfiguration());
tl = getTabletLocator(
clientContext,
tableId);
// @formatter:off
/*if[accumulo.api=1.6]
tl = getTabletLocator(
instance,
tableId);
Object clientContextOrCredentials = credentials;
else[accumulo.api=1.6]*/
final ClientContext clientContext = new ClientContext(
instance,credentials,
new ClientConfiguration());
tl = getTabletLocator(
clientContext,
tableId);

final Object clientContextOrCredentials = clientContext;
/*end[accumulo.api=1.6]*/
// @formatter:on
// it is possible that the cache could contain complete, but
// old information about the tablets of a table... so clear it
tl.invalidateCache();
final List<Range> rangeList = new ArrayList<Range>(
ranges);
while (!binRanges(
rangeList,
clientContextOrCredentials,
clientContext,
tserverBinnedRanges,
tl)) {
if (!(instance instanceof MockInstance)) {
Expand Down Expand Up @@ -285,6 +275,79 @@ protected TreeSet<IntermediateSplitInfo> populateIntermediateSplits(
}
}
else[accumulo.api=1.7]*/
// @formatter:on
final HashMap<String, String> hostNameCache = getHostNameCache();
Locations locations;
try {
final Connector conn = accumuloOperations.getConnector();
locations = conn.tableOperations().locate(
tableName,
ranges);
}
catch (final Exception e) {
throw new IOException(
e);
}
for (final Entry<TabletId, List<Range>> tabletIdRanges : locations.groupByTablet().entrySet()) {
final TabletId tabletId = tabletIdRanges.getKey();
final String tabletServer = locations.getTabletLocation(tabletId);
final String ipAddress = tabletServer.split(
":",
2)[0];

String location = hostNameCache.get(ipAddress);
// HP Fortify "Often Misused: Authentication"
// These methods are not being used for
// authentication
if (location == null) {
final InetAddress inetAddress = InetAddress.getByName(ipAddress);
location = inetAddress.getHostName();
hostNameCache.put(
ipAddress,
location);
}

final Range tabletRange = tabletId.toRange();
final Map<PrimaryIndex, List<RangeLocationPair>> splitInfo = new HashMap<PrimaryIndex, List<RangeLocationPair>>();
final List<RangeLocationPair> rangeList = new ArrayList<RangeLocationPair>();

for (final Range range : tabletIdRanges.getValue()) {
final Range clippedRange = tabletRange.clip(range);
if (!(fullrange.beforeStartKey(clippedRange.getEndKey()) || fullrange.afterEndKey(clippedRange
.getStartKey()))) {
final double cardinality = getCardinality(
getHistStats(
index,
adapters,
adapterStore,
statsStore,
statsCache,
authorizations),
wrapRange(clippedRange));
rangeList.add(new AccumuloRangeLocationPair(
wrapRange(clippedRange),
location,
cardinality < 1 ? 1.0 : cardinality));
}
else {
LOGGER.info("Query split outside of range");
}
if (LOGGER.isTraceEnabled()) {
LOGGER.warn("Clipped range: " + rangeList.get(
rangeList.size() - 1).getRange());
}
}
if (!rangeList.isEmpty()) {
splitInfo.put(
index,
rangeList);
splits.add(new IntermediateSplitInfo(
splitInfo,
this));
}
}
/* end[accumulo.api=1.7] */
return splits;
}

Expand Down Expand Up @@ -368,39 +431,18 @@ public GeoWaveInputSplit constructInputSplit(
splitInfo,
locations);
}

/**
* Initializes an Accumulo {@link TabletLocator} based on the configuration.
*
* @param instance
* the accumulo instance
* @param tableName
* the accumulo table name
* @return an Accumulo tablet locator
* @throws TableNotFoundException
* if the table name set on the configuration doesn't exist
*
*/
// @formatter:off
/*if[accumulo.api=1.7]
protected TabletLocator getTabletLocator(
final Object clientContextOrInstance,
final String tableId )
throws TableNotFoundException {
TabletLocator tabletLocator = null;
// @formatter:off
/*if[accumulo.api=1.6]
tabletLocator = TabletLocator.getLocator(
(Instance) clientContextOrInstance,
new Text(
tableId));
else[accumulo.api=1.6]*/
tabletLocator = TabletLocator.getLocator(
(ClientContext) clientContextOrInstance,
new Text(
tableId));

/*end[accumulo.api=1.6]*/
// @formatter:on
return tabletLocator;
}
Expand All @@ -413,21 +455,11 @@ protected static boolean binRanges(
AccumuloSecurityException,
TableNotFoundException,
IOException {
// @formatter:off
/*if[accumulo.api=1.6]
return tabletLocator.binRanges(
(Credentials) clientContextOrCredentials,
rangeList,
tserverBinnedRanges).isEmpty();
else[accumulo.api=1.6]*/

return tabletLocator.binRanges(
(ClientContext) clientContextOrCredentials,
rangeList,
tserverBinnedRanges).isEmpty();

/*end[accumulo.api=1.6]*/
// @formatter:on
}

end[accumulo.api=1.7]*/
// @formatter:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.format.Formatter;
// @formatter:off
/*if[accumulo.api=1.7]
else[accumulo.api=1.7]*/
import org.apache.accumulo.core.util.format.FormatterConfig;
/*end[accumulo.api=1.7]*/
//@formatter:on
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +46,8 @@ public PersistentDataFormatter() {
}

private Iterator<Entry<Key, Value>> si;
// @formatter:off
/*if[accumulo.api=1.7]
private boolean doTimestamps;
private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
@Override
Expand Down Expand Up @@ -71,14 +79,34 @@ public Date parse(
}
};
else[accumulo.api=1.7]*/
// @formatter:on
private FormatterConfig config;

/* end[accumulo.api=1.7] */

@Override
public void initialize(
Iterable<Entry<Key, Value>> scanner,
boolean printTimestamps ) {

// @formatter:off
/*if[accumulo.api=1.7]
boolean printTimestamps
else[accumulo.api=1.7]*/
// @formatter:on
FormatterConfig config
/* end[accumulo.api=1.7] */
) {
checkState(false);
si = scanner.iterator();

// @formatter:off
/*if[accumulo.api=1.7]
doTimestamps = printTimestamps;
else[accumulo.api=1.7]*/
// @formatter:on
this.config = config;
/* end[accumulo.api=1.7] */
}

public boolean hasNext() {
Expand All @@ -88,9 +116,15 @@ public boolean hasNext() {

public String next() {
DateFormat timestampFormat = null;

// @formatter:off
/*if[accumulo.api=1.7]
if (doTimestamps) {
timestampFormat = formatter.get();
else[accumulo.api=1.7]*/
// @formatter:on
if (config != null && config.willPrintTimestamps()) {
timestampFormat = config.getDateFormatSupplier().get();
/* end[accumulo.api=1.7] */
}

return next(timestampFormat);
Expand All @@ -117,21 +151,6 @@ protected void checkState(
"Already initialized");
}

// this should be replaced with something like Record.toString();
public String formatEntry(
Entry<Key, Value> entry,
boolean showTimestamps ) {
DateFormat timestampFormat = null;

if (showTimestamps) {
timestampFormat = formatter.get();
}

return formatEntry(
entry,
timestampFormat);
}

/*
* so a new date object doesn't get created for every record in the scan
* result
Expand Down Expand Up @@ -291,8 +310,4 @@ public Iterator<Entry<Key, Value>> getScannerIterator() {
return si;
}

protected boolean isDoTimestamps() {
return doTimestamps;
}

}

0 comments on commit e23fbe6

Please sign in to comment.