initial import

commit 1d95b95b38831ff2b58f03e8b5045985598b9240 0 parents
Keith Turner authored
59 README
@@ -0,0 +1,59 @@
+Accumulo has a simple test suite that verifies that data is not lost at scale.
+This test suite is called continuous ingest. This test runs many ingest
+clients that continually create linked lists containing 25 million nodes. At
+some point the clients are stopped and a MapReduce job is run to ensure no
+linked list has a hole. A hole indicates data was lost.
+
+This project is a version of the test suite written using Gora. Theoretically
+it could run against other column stores. Currently I have only tested it at
+scale using Accumulo.
+
+Below is a rough sketch of how data is written, followed by a condensed code
+sketch of the same loop. For specific details, look at the Generator code.
+
+ 1 Write out 1 million nodes
+ 2 Flush
+ 3 Write out 1 million nodes that reference the previous million
+ 4 If this is the 25th set of 1 million nodes, then update the 1st set of a
+   million to point to the last
+ 5 goto 1
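+
+Here is a minimal, scaled-down sketch of that loop in plain Java, using a
+HashMap in place of the Gora data store; WIDTH is 1 million and WRAP is 25 in
+the real Generator, and the class and variable names below are illustrative.
+
+    import java.util.HashMap;
+    import java.util.Map;
+    import java.util.Random;
+
+    public class WritePatternSketch {
+      static final int WIDTH = 1000; // 1,000,000 in the real test
+      static final int WRAP = 25;    // batches per circular list
+
+      public static void main(String[] args) {
+        Map<Long,Long> store = new HashMap<Long,Long>(); // node -> prev
+        Random rand = new Random();
+        long[] first = null, prev = null;
+        for (int batch = 0; batch < WRAP; batch++) {
+          long[] current = new long[WIDTH];
+          for (int i = 0; i < WIDTH; i++)
+            current[i] = Math.abs(rand.nextLong());
+          // each new node references a node from the previous, already
+          // flushed batch, or -1 for the very first batch
+          for (int i = 0; i < WIDTH; i++)
+            store.put(current[i], prev == null ? -1L : prev[i]);
+          // a real data store is flushed here, so a node only ever
+          // references flushed nodes
+          if (first == null)
+            first = current;
+          prev = current;
+        }
+        // after the WRAPth batch, point the first batch back at the last
+        // (the real Generator also rotates the first batch so the WIDTH
+        // short loops join into one giant circular list)
+        for (int i = 0; i < WIDTH; i++)
+          store.put(first[i], prev[i]);
+        System.out.println("wrote " + store.size() + " linked nodes");
+      }
+    }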
+
+The key is that nodes only reference flushed nodes. Therefore a node should
+never reference a missing node, even if the ingest client is killed at any
+point in time.
+
+When running this test suite with Accumulo, we also run a script called the
+Agitator that randomly and continuously kills server processes. We found many
+data loss bugs in Accumulo by doing this. This test suite can also help find
+bugs that impact uptime and stability when run for days or weeks.
+
+This test suite consists of a few Java programs, a little helper script to run
+the Java programs, and a Maven script to build it. To build the code, you may
+need to edit the Maven script to point to the Gora data store that you want to
+use. Or just use the Maven script to build this Java code, and copy whatever
+dependencies you need into lib. To compile, do "mvn compile package". The
+current Maven build script depends on an unreleased version of Accumulo and an
+unreleased version of gora-accumulo. Both of these can be downloaded and
+installed in your local Maven repo using mvn install.
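+
+For example, once those snapshot dependencies are installed locally:
+
+ $ mvn compile package
+
+The build copies the goraci jar and its dependencies into the lib directory.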
+
+Below is a description of the Java programs:
+
+ * goraci.Generator - A map-only job that generates data.
+ * goraci.Verify    - A MapReduce job that looks for holes. Look at the
+                      counts after running. REFERENCED and UNREFERENCED are
+                      ok; any UNDEFINED counts are bad. Do not run it at the
+                      same time as the Generator.
+ * goraci.Walker    - A standalone program that starts following a linked list
+                      and emits timing info.
+ * goraci.Print     - A standalone program that prints nodes in the linked list.
+
+goraci.sh is a helper script that you can use to run the above programs. It
+assumes all needed jars are in the lib dir. It does not need the package name;
+you can just run "./goraci.sh Generator". Below is an example.
+
+ $ ./goraci.sh Generator
+ Usage : Generator <num mappers> <num nodes>
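+
+The other programs print their usage in the same way when run with the wrong
+arguments, for example:
+
+ $ ./goraci.sh Verify
+ Usage : Verify <output dir> <num reducers>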
+
+This test suite does not do everything that the Accumulo test suite does;
+mainly, it does not collect statistics or generate reports.
+
10 cinode.json
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "CINode",
+ "namespace": "goraci.generated",
+ "fields" : [
+ {"name": "prev", "type": "long"},
+ {"name": "client", "type": "string"},
+ {"name": "count", "type": "long"}
+ ]
+}
14 goraci.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+# bash (not plain sh) is required for the array syntax used below
+
+GORACI_HOME=`dirname "$0"`
+export HADOOP_CLASSPATH=$(JARS=("$GORACI_HOME/lib"/*.jar); IFS=:; echo "${JARS[*]}")
+LIBJARS=`echo $HADOOP_CLASSPATH | tr : ,`
+
+
+PACKAGE="goraci"
+
+CMD=$1
+shift
+
+hadoop jar "$GORACI_HOME/lib/goraci-0.0.1-SNAPSHOT.jar" "$PACKAGE.$CMD" -libjars $LIBJARS "$@"
+
111 pom.xml
@@ -0,0 +1,111 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>goraci</groupId>
+ <artifactId>goraci</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ <version>0.2-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.3.3</version>
+ </dependency>
+
+ <!-- begin dependencies for accumulo.... all needed runtime deps are specified
+ because enabling transitive deps brings in too much junk. Comment out if not
+ using accumulo -->
+ <!-- see https://issues.apache.org/jira/browse/GORA-65 to obtain source
+ for gora-accumulo -->
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-accumulo</artifactId>
+ <version>0.2-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>1.4.0-incubating-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>cloudtrace</artifactId>
+ <version>1.4.0-incubating-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.6.1</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.3.1</version>
+ <scope>runtime</scope>
+ </dependency>
+ <!-- end accumulo deps -->
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>lib</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <outputDirectory>lib</outputDirectory>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>lib</directory>
+ <includes>
+ <include>**/*.jar</include>
+ </includes>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
36 src/main/java/goraci/Clear.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package goraci;
+
+import goraci.generated.CINode;
+
+import java.io.IOException;
+
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Deletes all data by truncating the schema of the configured data store.
+ */
+public class Clear {
+ public static void main(String[] args) throws IOException {
+ DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
+ store.truncateSchema();
+ store.close();
+ }
+}
274 src/main/java/goraci/Generator.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package goraci;
+
+import goraci.generated.CINode;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A map-only job that generates linked-list data. Each mapper writes the
+ * requested number of nodes, flushing after every WIDTH nodes and closing a
+ * circular list after every WRAP nodes.
+ */
+public class Generator extends Configured implements Tool {
+
+ private static final int WIDTH = 1000000;
+ private static final int WRAP = WIDTH * 25;
+
+ static class GeneratorInputFormat extends InputFormat<LongWritable,NullWritable> {
+
+ static class GeneratorInputSplit extends InputSplit implements Writable {
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 1;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[0];
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ // nothing to serialize: the split carries no state
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ // nothing to serialize: the split carries no state
+ }
+
+ }
+
+ static class GeneratorRecordReader extends RecordReader<LongWritable,NullWritable> {
+
+ private long numNodes;
+ private boolean hasNext = true;
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return new LongWritable(numNodes);
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void initialize(InputSplit arg0, TaskAttemptContext context) throws IOException, InterruptedException {
+ numNodes = context.getConfiguration().getLong("goraci.generator.nodes", 1000000);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ boolean hasnext = this.hasNext;
+ this.hasNext = false;
+ return hasnext;
+ }
+
+ }
+
+ @Override
+ public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ GeneratorRecordReader rr = new GeneratorRecordReader();
+ rr.initialize(split, context);
+ return rr;
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+ int numMappers = job.getConfiguration().getInt("goraci.generator.mappers", 1);
+
+ ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);
+
+ for (int i = 0; i < numMappers; i++) {
+ splits.add(new GeneratorInputSplit());
+ }
+
+ return splits;
+ }
+
+ }
+
+ static class GeneratorMapper extends Mapper<LongWritable,NullWritable,NullWritable,NullWritable> {
+
+ @Override
+ protected void map(LongWritable key, NullWritable value, Context output) throws IOException {
+ long num = key.get();
+ System.out.println("num" + num);
+
+ Utf8 id = new Utf8(UUID.randomUUID().toString());
+
+ DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
+
+ store.createSchema();
+
+ Random rand = new Random();
+
+ long[] first = null;
+ long[] prev = null;
+ long[] current = new long[WIDTH];
+
+ long count = 0;
+ while (count < num) {
+ for (int i = 0; i < current.length; i++)
+ current[i] = Math.abs(rand.nextLong());
+
+ persist(store, count, prev, current, id);
+
+ if (first == null)
+ first = current;
+ prev = current;
+ current = new long[WIDTH];
+
+ count += current.length;
+ output.setStatus("Count " + count);
+
+ if (count % WRAP == 0) {
+ // this block of code turns the 1 million linked lists of length 25 into one giant circular linked list of 25 million nodes
+
+ circularLeftShift(first);
+
+ updatePrev(store, first, prev);
+
+ first = null;
+ prev = null;
+ }
+
+ }
+
+ store.close();
+
+ }
+
+ private static void circularLeftShift(long[] first) {
+ long ez = first[0];
+ for (int i = 0; i < first.length - 1; i++)
+ first[i] = first[i + 1];
+ first[first.length - 1] = ez;
+ }
+
+ private static void persist(DataStore<Long,CINode> store, long count, long[] prev, long[] current, Utf8 id) throws IOException {
+ for (int i = 0; i < current.length; i++) {
+ CINode node = store.newPersistent();
+ node.setCount(count + i);
+ if (prev != null)
+ node.setPrev(prev[i]);
+ else
+ node.setPrev(-1);
+ node.setClient(id);
+
+ store.put(current[i], node);
+ }
+
+ store.flush();
+ }
+
+ private static void updatePrev(DataStore<Long,CINode> store, long[] first, long[] current) throws IOException {
+ for (int i = 0; i < current.length; i++) {
+ CINode node = store.newPersistent();
+ node.setPrev(current[i]);
+ store.put(first[i], node);
+ System.out.printf("Set prev %016x %016x\n", first[i], current[i]);
+ }
+
+ store.flush();
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ System.out.println("Usage : " + Generator.class.getSimpleName() + " <num mappers> <num nodes>");
+ return 0;
+ }
+
+ int numMappers = Integer.parseInt(args[0]);
+ long numNodes = Long.parseLong(args[1]);
+
+ Job job = new Job(getConf());
+
+ job.setJobName("Link Generator");
+ job.setNumReduceTasks(0);
+ job.setJarByClass(getClass());
+
+ job.setInputFormatClass(GeneratorInputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ job.getConfiguration().setInt("goraci.generator.mappers", numMappers);
+ job.getConfiguration().setLong("goraci.generator.nodes", numNodes);
+
+ job.setMapperClass(GeneratorMapper.class);
+
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
+
+ boolean success = job.waitForCompletion(true);
+
+ return success ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new Generator(), args);
+ System.exit(ret);
+ }
+
+
+}
95 src/main/java/goraci/Print.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package goraci;
+
+import goraci.generated.CINode;
+
+import java.math.BigInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A standalone program that prints nodes in the linked list, optionally
+ * bounded by start/end keys and a limit.
+ */
+public class Print extends Configured implements Tool {
+
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("s", "start", true, "start key");
+ options.addOption("e", "end", true, "end key");
+ options.addOption("l", "limit", true, "number to print");
+
+ GnuParser parser = new GnuParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse(options, args);
+ if (cmd.getArgs().length != 0) {
+ throw new ParseException("Command takes no arguments");
+ }
+ } catch (ParseException e) {
+ System.err.println("Failed to parse command line " + e.getMessage());
+ System.err.println();
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(getClass().getSimpleName(), options);
+ System.exit(-1);
+ }
+
+ DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
+
+ Query<Long,CINode> query = store.newQuery();
+
+ if (cmd.hasOption("s"))
+ query.setStartKey(new BigInteger(cmd.getOptionValue("s"), 16).longValue());
+
+ if (cmd.hasOption("e"))
+ query.setEndKey(new BigInteger(cmd.getOptionValue("e"), 16).longValue());
+
+ if (cmd.hasOption("l"))
+ query.setLimit(Integer.parseInt(cmd.getOptionValue("l")));
+ else
+ query.setLimit(100);
+
+ Result<Long,CINode> rs = store.execute(query);
+
+ while (rs.next()) {
+ CINode node = rs.get();
+ System.out.printf("%016x:%016x:%012d:%s\n", rs.getKey(), node.getPrev(), node.getCount(), node.getClient());
+
+ }
+
+ store.close();
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new Print(), args);
+ System.exit(ret);
+ }
+}
31 src/main/java/goraci/Test.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package goraci;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * Scratch program that checks hex parsing and formatting of 64-bit values.
+ */
+public class Test {
+ public static void main(String[] args) throws IOException {
+ BigInteger bi = new BigInteger("8000000000000000", 16);
+ System.out.println(bi.toString(16));
+ System.out.printf("%016x\n", bi.longValue());
+ }
+}
155 src/main/java/goraci/Verify.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package goraci;
+
+import goraci.generated.CINode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.gora.mapreduce.GoraMapper;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A MapReduce job that verifies the generated linked lists: every node that
+ * is referenced must also be defined.
+ */
+public class Verify extends Configured implements Tool {
+
+ private static final VLongWritable DEF = new VLongWritable(-1);
+
+ public static class VerifyMapper extends GoraMapper<Long,CINode,LongWritable,VLongWritable> {
+ private LongWritable row = new LongWritable();
+ private LongWritable ref = new LongWritable();
+ private VLongWritable vrow = new VLongWritable();
+
+ @Override
+ protected void map(Long key, CINode node, Context context) throws IOException, InterruptedException {
+ row.set(key);
+ context.write(row, DEF);
+
+ if (node.getPrev() >= 0) {
+ ref.set(node.getPrev());
+ vrow.set(key);
+ context.write(ref, vrow);
+ }
+ }
+ }
+
+ public static enum Counts {
+ UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
+ }
+
+ public static class VerifyReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
+ private ArrayList<Long> refs = new ArrayList<Long>();
+
+ public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
+
+ int defCount = 0;
+
+ refs.clear();
+ for (VLongWritable type : values) {
+ if (type.get() == -1) {
+ defCount++;
+ } else {
+ refs.add(type.get());
+ }
+ }
+
+ // TODO check for more than one def, should not happen
+
+ if (defCount == 0 && refs.size() > 0) {
+ // this is bad, found a node that is referenced but not defined. It must have been lost, emit some info about this node for debugging purposes.
+
+ StringBuilder sb = new StringBuilder();
+ String comma = "";
+ for (Long ref : refs) {
+ sb.append(comma);
+ comma = ",";
+ sb.append(String.format("%016x", ref));
+ }
+
+ context.write(new Text(String.format("%016x", key.get())), new Text(sb.toString()));
+ context.getCounter(Counts.UNDEFINED).increment(1);
+
+ } else if (defCount > 0 && refs.size() == 0) {
+ // node is defined but not referenced
+ context.getCounter(Counts.UNREFERENCED).increment(1);
+ } else {
+ // node is defined and referenced
+ context.getCounter(Counts.REFERENCED).increment(1);
+ }
+
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
+ return 0;
+ }
+
+ DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
+
+ String outputDir = args[0];
+ int numReducers = Integer.parseInt(args[1]);
+
+ Job job = new Job(getConf());
+
+ if (!job.getConfiguration().get("io.serializations").contains("org.apache.hadoop.io.serializer.JavaSerialization")) {
+ job.getConfiguration().set("io.serializations", job.getConfiguration().get("io.serializations") + ",org.apache.hadoop.io.serializer.JavaSerialization");
+ }
+
+ job.setJobName("Link Verifier");
+ job.setNumReduceTasks(numReducers);
+ job.setJarByClass(getClass());
+
+ Query<Long,CINode> query = store.newQuery();
+ query.setFields("prev");
+
+ GoraMapper.initMapperJob(job, query, store, LongWritable.class, VLongWritable.class, VerifyMapper.class, true);
+
+ job.setReducerClass(VerifyReducer.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, new Path(outputDir));
+
+ boolean success = job.waitForCompletion(true);
+
+ store.close();
+
+ return success ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new Verify(), args);
+ System.exit(ret);
+ }
+}
125 src/main/java/goraci/Walker.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package goraci;
+
+import goraci.generated.CINode;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A standalone program that picks a random node and follows its linked list
+ * backwards, emitting timing info for each read.
+ */
+public class Walker extends Configured implements Tool {
+
+ private static final String[] PREV_FIELD = new String[] {"prev"};
+
+ public int run(String[] args) throws IOException {
+ Options options = new Options();
+ options.addOption("n", "num", true, "number of queries");
+
+ GnuParser parser = new GnuParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse(options, args);
+ if (cmd.getArgs().length != 0) {
+ throw new ParseException("Command takes no arguments");
+ }
+ } catch (ParseException e) {
+ System.err.println("Failed to parse command line " + e.getMessage());
+ System.err.println();
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(getClass().getSimpleName(), options);
+ System.exit(-1);
+ }
+
+ long maxQueries = Long.MAX_VALUE;
+ if (cmd.hasOption('n')) {
+ maxQueries = Long.parseLong(cmd.getOptionValue("n"));
+ }
+
+ DataStore<Long,CINode> store = DataStoreFactory.getDataStore(Long.class, CINode.class, new Configuration());
+
+ Random rand = new Random();
+
+ long numQueries = 0;
+
+ while (numQueries < maxQueries) {
+ CINode node = findStartNode(rand, store);
+ numQueries++;
+ while (node != null && node.getPrev() >= 0 && numQueries < maxQueries) {
+ long prev = node.getPrev();
+
+ // time the first read of this node (CQ appears to mean cold query)
+ long t1 = System.currentTimeMillis();
+ node = store.get(prev, PREV_FIELD);
+ long t2 = System.currentTimeMillis();
+ System.out.printf("CQ %d %016x \n", t2 - t1, prev);
+ numQueries++;
+
+ // read the same node again; this should hit caches (HQ: hot query)
+ t1 = System.currentTimeMillis();
+ node = store.get(prev, PREV_FIELD);
+ t2 = System.currentTimeMillis();
+ System.out.printf("HQ %d %016x \n", t2 - t1, prev);
+ numQueries++;
+
+ }
+ }
+
+ store.close();
+ return 0;
+ }
+
+ private static CINode findStartNode(Random rand, DataStore<Long,CINode> store) throws IOException {
+ Query<Long,CINode> query = store.newQuery();
+ query.setStartKey(rand.nextLong());
+ query.setLimit(1);
+ query.setFields(PREV_FIELD);
+
+ long t1 = System.currentTimeMillis();
+ Result<Long,CINode> rs = store.execute(query);
+ long t2 = System.currentTimeMillis();
+
+ if (rs.next()) {
+ System.out.printf("FSR %d %016x\n", t2 - t1, rs.getKey());
+ return rs.get();
+ }
+
+ System.out.println("FSR " + (t2 - t1));
+
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new Walker(), args);
+ System.exit(ret);
+ }
+
+}
92 src/main/java/goraci/generated/CINode.java
@@ -0,0 +1,92 @@
+package goraci.generated;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.ListGenericArray;
+
+@SuppressWarnings("all")
+public class CINode extends PersistentBase {
+ public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"CINode\",\"namespace\":\"org.apache.gora.continuous.generated\",\"fields\":[{\"name\":\"prev\",\"type\":\"long\",\"default\":\"-1\"},{\"name\":\"client\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}");
+ public static enum Field {
+ PREV(0,"prev"),
+ CLIENT(1,"client"),
+ COUNT(2,"count"),
+ ;
+ private int index;
+ private String name;
+ Field(int index, String name) {this.index=index;this.name=name;}
+ public int getIndex() {return index;}
+ public String getName() {return name;}
+ public String toString() {return name;}
+ };
+ public static final String[] _ALL_FIELDS = {"prev","client","count",};
+ static {
+ PersistentBase.registerFields(CINode.class, _ALL_FIELDS);
+ }
+ private long prev;
+ private Utf8 client;
+ private long count;
+ public CINode() {
+ this(new StateManagerImpl());
+ }
+ public CINode(StateManager stateManager) {
+ super(stateManager);
+ }
+ public CINode newInstance(StateManager stateManager) {
+ return new CINode(stateManager);
+ }
+ public Schema getSchema() { return _SCHEMA; }
+ public Object get(int _field) {
+ switch (_field) {
+ case 0: return prev;
+ case 1: return client;
+ case 2: return count;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ @SuppressWarnings(value="unchecked")
+ public void put(int _field, Object _value) {
+ if(isFieldEqual(_field, _value)) return;
+ getStateManager().setDirty(this, _field);
+ switch (_field) {
+ case 0:prev = (Long)_value; break;
+ case 1:client = (Utf8)_value; break;
+ case 2:count = (Long)_value; break;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ public long getPrev() {
+ return (Long) get(0);
+ }
+ public void setPrev(long value) {
+ put(0, value);
+ }
+ public Utf8 getClient() {
+ return (Utf8) get(1);
+ }
+ public void setClient(Utf8 value) {
+ put(1, value);
+ }
+ public long getCount() {
+ return (Long) get(2);
+ }
+ public void setCount(long value) {
+ put(2, value);
+ }
+}
7 src/main/resources/gora-accumulo-mapping.xml
@@ -0,0 +1,7 @@
+<gora-orm>
+ <class name="goraci.generated.CINode" keyClass="java.lang.Long" table="ci" encoder="org.apache.gora.accumulo.encoders.HexEncoder">
+ <field name="prev" family="meta" qualifier="prev"/>
+ <field name="client" family="meta" qualifier="client"/>
+ <field name="count" family="meta" qualifier="count" />
+ </class>
+</gora-orm>
5 src/main/resources/gora.properties
@@ -0,0 +1,5 @@
+gora.datastore.default=org.apache.gora.accumulo.store.AccumuloStore
+gora.datastore.accumulo.instance=a14
+gora.datastore.accumulo.zookeepers=localhost
+gora.datastore.accumulo.user=root
+gora.datastore.accumulo.password=secret