BnP improvement: #245

Merged: 1 commit, merged on Mar 30, 2015
@@ -29,6 +29,7 @@

import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.mr.AbstractHadoopStoreBuilderMapper;
import voldemort.store.readonly.mr.HadoopStoreBuilder;
import voldemort.utils.ByteUtils;
@@ -68,15 +69,22 @@ public int run(String[] args) throws Exception {

Configuration config = this.getConf();
config.set("mapred.job.name", "test-store-builder");
HadoopStoreBuilder builder = new HadoopStoreBuilder(config,
BuildTestStoreMapper.class,
SequenceFileInputFormat.class,
cluster,
def,
(long) (1.5 * 1024 * 1024 * 1024),
new Path(tempDir),
new Path(outputDir),
new Path(inputDir));
HadoopStoreBuilder builder = new HadoopStoreBuilder(
config,
BuildTestStoreMapper.class,
SequenceFileInputFormat.class,
cluster,
def,
new Path(tempDir),
new Path(outputDir),
new Path(inputDir),
CheckSum.CheckSumType.NONE,
false,
false,
(long) (1.5 * 1024 * 1024 * 1024),
-1,
false,
null);
builder.build();
return 0;
}
@@ -45,6 +45,7 @@
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.Task;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
@@ -88,194 +89,33 @@ public class HadoopStoreBuilder {
private boolean saveKeys = false;
private boolean reducerPerBucket = false;
private int numChunks = -1;

private boolean isAvro;

/**
* Kept for backwards compatibility. We do not use replicationFactor any
* more since it is derived from the store definition
*
* @param conf A base configuration to start with
* @param mapperClass The class to use as the mapper
* @param inputFormatClass The input format to use for reading values
* @param cluster The voldemort cluster for which the stores are being built
* @param storeDef The store definition of the store
* @param replicationFactor NOT USED
* @param chunkSizeBytes The size of the chunks used by the read-only store
* @param tempDir The temporary directory to use in hadoop for intermediate
* reducer output
* @param outputDir The directory in which to place the built stores
* @param inputPath The path from which to read input data
*/
@SuppressWarnings("unchecked")
@Deprecated
public HadoopStoreBuilder(Configuration conf,
Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
int replicationFactor,
long chunkSizeBytes,
Path tempDir,
Path outputDir,
Path inputPath) {
this(conf,
mapperClass,
inputFormatClass,
cluster,
storeDef,
chunkSizeBytes,
tempDir,
outputDir,
inputPath);
}

/**
* Create the store builder
*
* @param conf A base configuration to start with
* @param mapperClass The class to use as the mapper
* @param inputFormatClass The input format to use for reading values
* @param cluster The voldemort cluster for which the stores are being built
* @param storeDef The store definition of the store
* @param chunkSizeBytes The size of the chunks used by the read-only store
* @param tempDir The temporary directory to use in hadoop for intermediate
* reducer output
* @param outputDir The directory in which to place the built stores
* @param inputPath The path from which to read input data
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
long chunkSizeBytes,
Path tempDir,
Path outputDir,
Path inputPath) {
super();
this.config = conf;
this.mapperClass = Utils.notNull(mapperClass);
this.inputFormatClass = Utils.notNull(inputFormatClass);
this.inputPath = inputPath;
this.cluster = Utils.notNull(cluster);
this.storeDef = Utils.notNull(storeDef);
this.chunkSizeBytes = chunkSizeBytes;
this.tempDir = tempDir;
this.outputDir = Utils.notNull(outputDir);
isAvro = false;
if(chunkSizeBytes > MAX_CHUNK_SIZE || chunkSizeBytes < MIN_CHUNK_SIZE)
throw new VoldemortException("Invalid chunk size, chunk size must be in the range "
+ MIN_CHUNK_SIZE + "..." + MAX_CHUNK_SIZE);
}

/**
* Create the store builder
*
* @param conf A base configuration to start with
* @param mapperClass The class to use as the mapper
* @param inputFormatClass The input format to use for reading values
* @param cluster The voldemort cluster for which the stores are being built
* @param storeDef The store definition of the store
* @param chunkSizeBytes The size of the chunks used by the read-only store
* @param tempDir The temporary directory to use in hadoop for intermediate
* reducer output
* @param outputDir The directory in which to place the built stores
* @param inputPath The path from which to read input data
* @param checkSumType The checksum algorithm to use
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
long chunkSizeBytes,
Path tempDir,
Path outputDir,
Path inputPath,
CheckSumType checkSumType) {
this(conf,
mapperClass,
inputFormatClass,
cluster,
storeDef,
chunkSizeBytes,
tempDir,
outputDir,
inputPath);
this.checkSumType = checkSumType;

}
private Long minNumberOfRecords;

/**
* Create the store builder
*
*
* @param conf A base configuration to start with
* @param mapperClass The class to use as the mapper
* @param inputFormatClass The input format to use for reading values
* @param cluster The voldemort cluster for which the stores are being built
* @param storeDef The store definition of the store
* @param chunkSizeBytes The size of the chunks used by the read-only store
* @param tempDir The temporary directory to use in hadoop for intermediate
* reducer output
* @param outputDir The directory in which to place the built stores
* @param inputPath The path from which to read input data
* @param checkSumType The checksum algorithm to use
* @param saveKeys Boolean to signify if we want to save the key as well
* @param reducerPerBucket Boolean to signify whether we want to have a
* single reducer for a bucket ( thereby resulting in all chunk files
* for a bucket being generated in a single reducer )
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
long chunkSizeBytes,
Path tempDir,
Path outputDir,
Path inputPath,
CheckSumType checkSumType,
boolean saveKeys,
boolean reducerPerBucket) {
this(conf,
mapperClass,
inputFormatClass,
cluster,
storeDef,
chunkSizeBytes,
tempDir,
outputDir,
inputPath,
checkSumType);
this.saveKeys = saveKeys;
this.reducerPerBucket = reducerPerBucket;
}

/**
* Create the store builder
*
* @param conf A base configuration to start with
* @param mapperClass The class to use as the mapper
* @param inputFormatClass The input format to use for reading values
* @param cluster The voldemort cluster for which the stores are being built
* @param storeDef The store definition of the store
* @param tempDir The temporary directory to use in hadoop for intermediate
* reducer output
* @param outputDir The directory in which to place the built stores
* @param inputPath The path from which to read input data
* @param checkSumType The checksum algorithm to use
* @param saveKeys Boolean to signify if we want to save the key as well
* @param reducerPerBucket Boolean to signify whether we want to have a
* single reducer for a bucket ( thereby resulting in all chunk files
* for a bucket being generated in a single reducer )
* @param chunkSizeBytes Size of each chunk (ignored if numChunks is > 0)
* @param numChunks Number of chunks per bucket ( partition or partition
* replica )
* @param isAvro whether the data format is avro
* @param minNumberOfRecords
*
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
Class mapperClass,
Class<? extends InputFormat> inputFormatClass,
@@ -287,26 +127,38 @@ public HadoopStoreBuilder(Configuration conf,
CheckSumType checkSumType,
boolean saveKeys,
boolean reducerPerBucket,
int numChunks) {
super();
long chunkSizeBytes,
int numChunks,
boolean isAvro,
Long minNumberOfRecords) {
this.config = conf;
this.mapperClass = Utils.notNull(mapperClass);
this.inputFormatClass = Utils.notNull(inputFormatClass);
this.inputPath = inputPath;
this.cluster = Utils.notNull(cluster);
this.storeDef = Utils.notNull(storeDef);
this.chunkSizeBytes = -1;
this.tempDir = tempDir;
this.outputDir = Utils.notNull(outputDir);
this.checkSumType = checkSumType;
this.saveKeys = saveKeys;
this.reducerPerBucket = reducerPerBucket;
this.chunkSizeBytes = chunkSizeBytes;
this.numChunks = numChunks;
isAvro = false;
if(numChunks <= 0)
throw new VoldemortException("Number of chunks should be greater than zero");
this.isAvro = isAvro;
this.minNumberOfRecords = minNumberOfRecords == null ? 1 : minNumberOfRecords;

if(numChunks <= 0) {
logger.info("HadoopStoreBuilder constructed with numChunks <= 0, thus relying chunk size.");
if(chunkSizeBytes > MAX_CHUNK_SIZE || chunkSizeBytes < MIN_CHUNK_SIZE) {
throw new VoldemortException("Invalid chunk size, chunk size must be in the range "
+ MIN_CHUNK_SIZE + "..." + MAX_CHUNK_SIZE);
}
} else {
logger.info("HadoopStoreBuilder constructed with numChunks > 0, thus ignoring chunk size.");
}
}
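
For orientation, a minimal usage sketch of the consolidated constructor follows. It is not part of this PR: the mapper class, the cluster and store-definition objects, and all paths are placeholder assumptions, and the argument order simply mirrors the signature above.

// Illustrative sketch only: non-Avro input, checksums disabled, a fixed number
// of chunks (so chunkSizeBytes is ignored), and no minimum-record check
// (passing null lets minNumberOfRecords default to 1, per the constructor above).
HadoopStoreBuilder builder = new HadoopStoreBuilder(
        new Configuration(),
        MyStoreBuilderMapper.class,          // hypothetical AbstractHadoopStoreBuilderMapper subclass
        SequenceFileInputFormat.class,
        cluster,                             // voldemort.cluster.Cluster, loaded elsewhere
        storeDef,                            // voldemort.store.StoreDefinition, loaded elsewhere
        new Path("/tmp/bnp-temp"),           // tempDir
        new Path("/stores/my-store/output"), // outputDir
        new Path("/stores/my-store/input"),  // inputPath
        CheckSum.CheckSumType.NONE,          // checkSumType
        false,                               // saveKeys
        false,                               // reducerPerBucket
        1024L * 1024 * 1024,                 // chunkSizeBytes (ignored since numChunks > 0)
        4,                                   // numChunks
        false,                               // isAvro
        null);                               // minNumberOfRecords
builder.build();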


/**
* Run the job
*/
@@ -433,6 +285,13 @@ public void build() {
// Once the job has completed log the counter
Counters counters = job.getCounters();

long numberOfRecords = counters.getCounter(Task.Counter.REDUCE_INPUT_GROUPS);

if (numberOfRecords < minNumberOfRecords) {
throw new VoldemortException("The number of records in the data set (" + numberOfRecords +
") is lower than the minimum required (" + minNumberOfRecords + "). Aborting.");
}

if(saveKeys) {
logger.info("Number of collisions in the job - "
+ counters.getCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS));
@@ -554,17 +413,6 @@ public boolean accept(Path arg0) {

}

/**
* Run the job
*/
public void buildAvro() {

isAvro = true;
build();
return;

}

/**
* A comparator that sorts index files last. This is required to maintain
* the order while calculating checksum
@@ -92,6 +92,8 @@ private static OptionParser configureParser() {
parser.accepts("force-overwrite", "deletes final output directory if present.");
parser.accepts("save-keys", "save the keys in the data file");
parser.accepts("reducer-per-bucket", "run single reducer per bucket");
parser.accepts("num-chunks", "number of chunks (if set, takes precedence over chunksize)");
parser.accepts("is-avro", "is the data format avro?");
parser.accepts("help", "print usage information");
return parser;
}
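
As a hedged illustration of how the two new flags fit into a run of the job runner: only num-chunks, is-avro, save-keys, reducer-per-bucket, force-overwrite and help are visible in configureParser() above, so the remaining flag names, the exact option syntax, and the availability of a no-argument constructor are assumptions made for the example; run(String[]) is the entry point shown in the next hunk.

// Hypothetical driver (illustrative only): the input, output and temp-dir
// flag names below are assumed; --num-chunks and --is-avro are the options
// added in this PR.
String[] args = {
    "--input", "/data/my-store/input",       // assumed flag name
    "--output", "/data/my-store/output",     // assumed flag name
    "--tmpdir", "/tmp/bnp",                  // assumed flag name
    "--num-chunks", "4",                     // new in this PR: takes precedence over chunk size
    "--is-avro",                             // new in this PR: treat the input as Avro
    "--save-keys",
    "--reducer-per-bucket"
};
// run(String[]) is declared to throw Exception, so a real caller would
// propagate or handle it.
int exitCode = new HadoopStoreJobRunner().run(args);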
@@ -183,20 +185,26 @@ public int run(String[] args) throws Exception {
Class[] deps = new Class[] { ImmutableCollection.class, JDOMException.class,
VoldemortConfig.class, HadoopStoreJobRunner.class, mapperClass };

int numChunks = CmdUtils.valueOf(options, "num-chunks", -1);
boolean isAvro = CmdUtils.valueOf(options, "is-avro", false);

addDepJars(conf, deps, addJars);

HadoopStoreBuilder builder = new HadoopStoreBuilder(conf,
mapperClass,
inputFormatClass,
cluster,
storeDef,
chunkSizeBytes,
tempDir,
outputDir,
inputPath,
checkSumType,
saveKeys,
reducerPerBucket);
reducerPerBucket,
chunkSizeBytes,
numChunks,
isAvro,
null);

builder.build();
return 0;