Separate mapreduce code from core (apache#425)
* Move mapreduce client code to separate module
* Ensure "fresh start" by cleaning out old (deprecated) APIs
ctubbsii committed Apr 13, 2018
1 parent df3c87c commit 67a1c69
Showing 59 changed files with 273 additions and 614 deletions.
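The "fresh start" cleanup removes several long-deprecated setters from the mapred/mapreduce input formats (visible in the file diff further down). As a rough migration sketch, hedged on placeholder instance and ZooKeeper values, a job that used the removed setZooKeeperInstance(JobConf, String, String) can switch to the ClientConfiguration overload this commit keeps:

import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class ZkInstanceMigrationSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Removed in this commit (deprecated since 1.6.0):
    // AccumuloInputFormat.setZooKeeperInstance(job, "myInstance", "zk1:2181,zk2:2181");

    // Retained overload; the removed method was a thin wrapper that built exactly
    // this ClientConfiguration ("myInstance" and the zk hosts are placeholders).
    AccumuloInputFormat.setZooKeeperInstance(job,
        ClientConfiguration.create().withInstance("myInstance").withZkHosts("zk1:2181,zk2:2181"));
  }
}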
5 changes: 5 additions & 0 deletions assemble/pom.xml
@@ -110,6 +110,11 @@
<artifactId>log4j</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-client-mapreduce</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
28 changes: 28 additions & 0 deletions client/mapreduce/.gitignore
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Maven ignores
/target/

# IDE ignores
/.settings/
/.project
/.classpath
/.pydevproject
/.idea
/*.iml
/nbproject/
/nbactions.xml
/nb-configuration.xml
161 changes: 161 additions & 0 deletions client/mapreduce/pom.xml
@@ -0,0 +1,161 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>accumulo-client-mapreduce</artifactId>
<name>Apache Accumulo MapReduce APIs</name>
<description>Apache Accumulo MapReduce client libraries.</description>
<dependencies>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-fate</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.revelc.code</groupId>
<artifactId>apilyzer-maven-plugin</artifactId>
<executions>
<execution>
<id>apilyzer</id>
<goals>
<goal>analyze</goal>
</goals>
<configuration>
<includes>
<include>org[.]apache[.]accumulo[.]core[.]client[.]mapred(?:uce)?[.].*</include>
</includes>
<excludes>
<exclude>.*[.]impl[.].*</exclude>
</excludes>
<allows>
<allow>org[.]apache[.]accumulo[.]core[.](?:client|data|security)[.](?!.*(impl|thrift|crypto).*).*</allow>
<!-- Not public API, but hard to get rid of. Pair is just so useful. -->
<allow>org[.]apache[.]accumulo[.]core[.]util[.]Pair</allow>
<!-- Types from Hadoop used in the API. If adding a new type from
Hadoop to the Accumulo API, ensure it's annotated as stable. -->
<allow>org[.]apache[.]hadoop[.]conf[.]Configuration</allow>
<allow>org[.]apache[.]hadoop[.]fs[.](FileSystem|Path)</allow>
<allow>org[.]apache[.]hadoop[.]io[.](Text|Writable|WritableComparable|WritableComparator)</allow>
<allow>org[.]apache[.]hadoop[.]mapred[.](JobConf|RecordReader|InputSplit|RecordWriter|Reporter)</allow>
<allow>org[.]apache[.]hadoop[.]mapred[.]FileOutputFormat[$]Counter</allow>
<allow>org[.]apache[.]hadoop[.]mapreduce[.](Job|JobContext|RecordReader|InputSplit|TaskAttemptContext|RecordWriter|OutputCommitter|TaskInputOutputContext)</allow>
<allow>org[.]apache[.]hadoop[.]mapreduce[.]lib[.]output[.]FileOutputFormat[$]Counter</allow>
<allow>org[.]apache[.]hadoop[.]util[.]Progressable</allow>
<allow>org[.]apache[.]hadoop[.]mapred[.](FileAlreadyExistsException|InvalidJobConfException)</allow>
<!--ugghhh-->
<allow>org[.]apache[.]log4j[.](Level|Logger)</allow>
</allows>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop-default</id>
<activation>
<property>
<name>!hadoop.profile</name>
</property>
</activation>
<properties>
<hadoop.profile>2</hadoop.profile>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop2</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>2</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop3</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>3</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
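The apilyzer execution above fences off the module's public API with regular expressions: everything under the mapred and mapreduce client packages is analyzed, impl packages are excluded, and only the whitelisted Accumulo, Hadoop, and log4j types may appear in public signatures. A small self-contained sketch (patterns taken from the pom; the matched class names are merely illustrative) of how those patterns behave:

import java.util.regex.Pattern;

public class ApilyzerPatternDemo {
  public static void main(String[] args) {
    // Include pattern from the pom: "(?:uce)?" covers both the old mapred
    // and the new mapreduce package trees.
    Pattern include = Pattern
        .compile("org[.]apache[.]accumulo[.]core[.]client[.]mapred(?:uce)?[.].*");
    System.out.println(include
        .matcher("org.apache.accumulo.core.client.mapred.AccumuloInputFormat").matches()); // true
    System.out.println(include
        .matcher("org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat").matches()); // true

    // Exclude pattern from the pom: impl packages are filtered back out.
    Pattern exclude = Pattern.compile(".*[.]impl[.].*");
    System.out.println(exclude
        .matcher("org.apache.accumulo.core.client.mapred.impl.BatchInputSplit").matches()); // true
  }
}

The three profiles at the end select the Hadoop client artifacts: hadoop-default activates when no hadoop.profile property is given and defaults it to 2, while building with -Dhadoop.profile=3 swaps in the hadoop-client-api/hadoop-client-runtime pair instead.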
29 changes: 29 additions & 0 deletions client/mapreduce/src/main/findbugs/exclude-filter.xml
@@ -0,0 +1,29 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
<Match>
<!-- ignore intentional name shadowing -->
<Or>
<Class name="org.apache.accumulo.core.client.mapred.RangeInputSplit" />
<Class name="org.apache.accumulo.core.client.mapred.impl.BatchInputSplit" />
</Or>
<Or>
<Bug code="NM" pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
<Bug code="NM" pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Or>
</Match>
</FindBugsFilter>
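For context, the "intentional name shadowing" being suppressed comes from the old-API (org.apache.hadoop.mapred-facing) split classes reusing the simple names of the new-API classes they extend. A minimal illustration of the relationship that trips NM_SAME_SIMPLE_NAME_AS_SUPERCLASS (real class bodies elided):

package org.apache.accumulo.core.client.mapred;

// Deliberately shares its simple name with the mapreduce (new API) superclass,
// which FindBugs would otherwise flag.
public class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit {
  // mapred-specific constructors and InputSplit plumbing elided
}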
@@ -73,7 +73,6 @@
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.DeprecationUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
@@ -249,24 +248,6 @@ protected static AuthenticationToken getAuthenticationToken(JobConf job) {
return ConfiguratorBase.unwrapAuthenticationToken(job, token);
}

/**
* Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
*
* @param job
* the Hadoop job instance to be configured
* @param instanceName
* the Accumulo instance name
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
* @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead.
*/
@Deprecated
public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
setZooKeeperInstance(job,
ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers));
}

/**
* Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
*
@@ -282,21 +263,6 @@ public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) {
InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
}

/**
* Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job.
*
* @param job
* the Hadoop job instance to be configured
* @param instanceName
* the Accumulo instance name
* @since 1.5.0
* @deprecated since 1.8.0; use MiniAccumuloCluster or a standard mock framework
*/
@Deprecated
public static void setMockInstance(JobConf job, String instanceName) {
InputConfigurator.setMockInstance(CLASS, job, instanceName);
}

/**
* Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the
* configuration.
@@ -491,24 +457,6 @@ private void setupIterators(JobConf job, ScannerBase scanner, String tableName,
scanner.addScanIterator(iterator);
}

/**
* Configures the iterators on a scanner for the given table name.
*
* @param job
* the Hadoop job configuration
* @param scanner
* the scanner for which to configure the iterators
* @param tableName
* the table name for which the scanner is configured
* @since 1.6.0
* @deprecated since 1.7.0; Use {@link #jobIterators} instead.
*/
@Deprecated
protected void setupIterators(JobConf job, Scanner scanner, String tableName,
RangeInputSplit split) {
setupIterators(job, (ScannerBase) scanner, tableName, split);
}

/**
* Initialize a scanner over the given input split using this task attempt configuration.
*/
@@ -591,9 +539,6 @@ public void initialize(InputSplit inSplit, JobConf job) throws IOException {
if (isOffline) {
scanner = new OfflineScanner(instance, new Credentials(principal, token),
Table.ID.of(baseSplit.getTableId()), authorizations);
} else if (DeprecationUtil.isMockInstance(instance)) {
scanner = instance.getConnector(principal, token)
.createScanner(baseSplit.getTableName(), authorizations);
} else {
ClientConfiguration clientConf = getClientConfiguration(job);
ClientContext context = new ClientContext(instance, new Credentials(principal, token),
@@ -707,14 +652,10 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
Instance instance = getInstance(job);
Table.ID tableId;
// resolve table name to id once, and use id from this point forward
if (DeprecationUtil.isMockInstance(instance)) {
tableId = Table.ID.of("");
} else {
try {
tableId = Tables.getTableId(instance, tableName);
} catch (TableNotFoundException e) {
throw new IOException(e);
}
try {
tableId = Tables.getTableId(instance, tableName);
} catch (TableNotFoundException e) {
throw new IOException(e);
}

Authorizations auths = getScanAuthorizations(job);
@@ -762,13 +703,11 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
new Credentials(getPrincipal(job), getAuthenticationToken(job)),
getClientConfiguration(job));
while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
if (!DeprecationUtil.isMockInstance(instance)) {
String tableIdStr = tableId.canonicalID();
if (!Tables.exists(instance, tableId))
throw new TableDeletedException(tableIdStr);
if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
throw new TableOfflineException(instance, tableIdStr);
}
String tableIdStr = tableId.canonicalID();
if (!Tables.exists(instance, tableId))
throw new TableDeletedException(tableIdStr);
if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
throw new TableOfflineException(instance, tableIdStr);
binnedRanges.clear();
log.warn("Unable to locate bins for specified ranges. Retrying.");
// sleep randomly between 100 and 200 ms
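The removed setMockInstance(JobConf, String) above pointed users at MiniAccumuloCluster in its deprecation notice. A hedged sketch of that replacement for tests, assuming the accumulo-minicluster artifact is on the classpath (the directory, root password, and class name are placeholders):

import java.io.File;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.hadoop.mapred.JobConf;

public class MiniClusterJobSetup {
  public static void main(String[] args) throws Exception {
    File dir = new File(System.getProperty("java.io.tmpdir"), "mini-accumulo");
    dir.mkdirs(); // MiniAccumuloCluster expects an existing, empty directory
    MiniAccumuloCluster mini = new MiniAccumuloCluster(new MiniAccumuloConfig(dir, "secret"));
    mini.start();
    try {
      // Point the job at the mini cluster's real instance rather than a mock one.
      JobConf job = new JobConf();
      AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.create()
          .withInstance(mini.getInstanceName()).withZkHosts(mini.getZooKeepers()));
    } finally {
      mini.stop();
    }
  }
}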
