initial checkin

commit f6ba04d61ecf0836a9752ddfc0511a67b1481d14 0 parents
@keith-turner authored
Showing with 233 additions and 0 deletions.
  1. +60 −0 README
  2. +24 −0 pom.xml
  3. +149 −0 src/main/java/ParallelSplitter.java
60 README
@@ -0,0 +1,60 @@
+This project contains a Java program that works around the slow split issue
+identified in [ACCUMULO-348][1]. It works around the issue by making the
+split calls in parallel: the middle split point is added first, and then each
+half is split recursively, so that concurrent calls operate on different
+tablets. To build the jar, run the following Maven command.
+
+ mvn package
+
+Then place the jar in <ACCUMULO_HOME>/lib/ext and run the following command.
+
+ $ ./bin/accumulo ParallelSplitter
+ Usage : ParallelSplitter <instance> <zoo keepers> <table> <user> <pass> <num threads> <file>
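+
+For example, assuming an instance named inst and a ZooKeeper host at
+localhost:2181, an invocation for the table foo might look like the following
+(the values here are illustrative).
+
+ $ ./bin/accumulo ParallelSplitter inst localhost:2181 foo root secret 16 splits.txt
+
+The splits file is plain text with one split point per line; each line is read
+verbatim as a row. A hypothetical splits file might contain:
+
+ row_0100
+ row_0200
+ row_0300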
+
+Some experiments were done varying the number of splits to create and the
+number of threads to use. These experiments were run on a 10 node cluster
+using Accumulo 1.4.0. The table being split was empty; splitting a table that
+already contained data would probably change the times. The times were
+obtained by timing the whole process, so they include JVM startup time. The
+results are below.
+
+ParallelSplitter times for 999 splits:
+
+ 4 threads : 5.4s
+ 8 threads : 3.0s
+ 16 threads : 3.7s
+
+This is the time the addsplits command took for 999 splits:
+
+ $ time ./bin/accumulo shell -u root -p secret -e "addsplits -t foo -sf splits.txt"
+ real 0m13.386s
+
+ParallelSplitter times for 4999 splits:
+
+ 4 threads : 53.6s
+ 8 threads : 15.0s
+ 16 threads : 7.4s
+ 32 threads : 20.2s
+
+This is the time the addsplits command took for 4999 splits:
+
+ $ time ./bin/accumulo shell -u root -p secret -e "addsplits -t foo -sf splits.txt"
+ real 1m37.254s
+
+ParallelSplitter times for 99,999 splits:
+
+ 8 threads : 408.3s
+ 16 threads : 227.1s
+ 32 threads : 117.7s
+ 64 threads : 92.3s
+ 128 threads : 119.5s
+
+This is the time the addsplits command took for 99,999 splits:
+
+ $ time ./bin/accumulo shell -u root -p secret -e "addsplits -t foo -sf splits.txt"
+ real 152m15.531s
+
+About halfway through the above command I discovered that flushing the
+metadata table would speed things up. Flushing it more frequently would have
+dramatically reduced the time above.
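+
+For reference, the flush can be issued from the Accumulo shell; assuming
+Accumulo 1.4.0, where the metadata table is named !METADATA, the command
+would look something like this.
+
+ $ ./bin/accumulo shell -u root -p secret -e "flush -t !METADATA -w"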
+
+[1]: https://issues.apache.org/jira/browse/ACCUMULO-348
+
24 pom.xml
@@ -0,0 +1,24 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>ParallelSplitter</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+ <name>ParallelSplitter</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>1.4.0</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
149 src/main/java/ParallelSplitter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Scanner;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This is a workaround for ACCUMULO-348 that parallelizes adding splits to a table.
+ */
+public class ParallelSplitter {
+
+ private static class SplitEnv {
+ private Connector conn;
+ private String tableName;
+ private ExecutorService executor;
+ private CountDownLatch latch;
+ private AtomicReference<Exception> exception;
+
+ SplitEnv(Connector conn, String tableName, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) {
+ this.conn = conn;
+ this.tableName = tableName;
+ this.executor = executor;
+ this.latch = latch;
+ this.exception = exception;
+ }
+ }
+
+ private static class SplitTask implements Runnable {
+
+ private List<Text> splits;
+ private SplitEnv env;
+
+ SplitTask(SplitEnv env, List<Text> splits) {
+ this.env = env;
+ this.splits = splits;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (env.exception.get() != null)
+ return;
+
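+ // base case: few enough splits left to add them directly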
+ if (splits.size() <= 2) {
+ env.conn.tableOperations().addSplits(env.tableName, new TreeSet<Text>(splits));
+ for (int i = 0; i < splits.size(); i++)
+ env.latch.countDown();
+ return;
+ }
+
+ int mid = splits.size() / 2;
+
+ // add the middle split point first to ensure that the child tasks split different tablets
+ // and can therefore run in parallel
+ env.conn.tableOperations().addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)));
+ env.latch.countDown();
+
+ env.executor.submit(new SplitTask(env, splits.subList(0, mid)));
+ env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+
+ } catch (Exception e) {
+ env.exception.compareAndSet(null, e);
+ }
+ }
+
+ }
+
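+ /**
+ * Adds the given split points to tableName using numThreads threads. The
+ * divide and conquer strategy in SplitTask ensures that concurrent calls
+ * add splits to different tablets.
+ */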
+ public static void addSplits(Connector conn, String tableName, SortedSet<Text> partitionKeys, int numThreads) throws Exception {
+ List<Text> splits = new ArrayList<Text>(partitionKeys);
+ // should be sorted because we copied from a sorted set, but that makes assumptions about
+ // how the copy was done, so re-sort to be sure.
+ Collections.sort(splits);
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch latch = new CountDownLatch(splits.size());
+ AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
+
+ executor.submit(new SplitTask(new SplitEnv(conn, tableName, executor, latch, exception), splits));
+
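+ // wait for all of the split points to be added, periodically checking whether a task failed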
+ while (!latch.await(100, TimeUnit.MILLISECONDS)) {
+ if (exception.get() != null) {
+ executor.shutdownNow();
+ throw exception.get();
+ }
+ }
+
+ executor.shutdown();
+ }
+
+ /**
+ * Expects the arguments: instance, zoo keepers, table, user, password,
+ * number of threads, and a file of split points.
+ */
+ public static void main(String[] args) throws Exception {
+ if (args.length != 7) {
+ System.err.println("Usage : " + ParallelSplitter.class.getName() + " <instance> <zoo keepers> <table> <user> <pass> <num threads> <file>");
+ System.exit(-1);
+ }
+
+ String instance = args[0];
+ String zooKeepers = args[1];
+ String table = args[2];
+ String user = args[3];
+ String pass = args[4];
+ int numThreads = Integer.parseInt(args[5]);
+ String file = args[6];
+
+ TreeSet<Text> splits = new TreeSet<Text>();
+
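+ // read one split point per line from the file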
+ Scanner scanner = new Scanner(new File(file));
+ while (scanner.hasNextLine()) {
+ splits.add(new Text(scanner.nextLine()));
+ }
+ scanner.close();
+
+ ZooKeeperInstance zki = new ZooKeeperInstance(instance, zooKeepers);
+ Connector conn = zki.getConnector(user, pass);
+
+ addSplits(conn, table, splits, numThreads);
+
+ }
+
+}