+ *
+ * Required Properties
+ *
+ * - voldemort.cluster.file
+ * - voldemort.store.file
+ * - voldemort.store.name
+ * - input.path
+ * - output.path
+ * - input.data.check.percent (optional, defaults to 0)
+ * - force.output.overwrite (optional, defaults to false)
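+ *
+ * An illustrative job configuration (paths and store name below are
+ * hypothetical, not defaults):
+ *
+ * <pre>
+ * voldemort.cluster.file=config/cluster.xml
+ * voldemort.store.file=config/stores.xml
+ * voldemort.store.name=test-store
+ * input.path=/data/input
+ * output.path=/data/output
+ * input.data.check.percent=5
+ * </pre>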
+ *
+ *
+ * @author bbansal
+ *
+ * @deprecated Use {@link VoldemortStoreBuilderJob} instead.
+ *
+ */
+@Deprecated
+public class VoldemortBatchIndexJob extends AbstractHadoopJob {
+
+ private Cluster _cluster = null;
+
+ private static Logger logger = Logger.getLogger(VoldemortBatchIndexJob.class);
+
+ public VoldemortBatchIndexJob(String name, Props props) throws FileNotFoundException {
+ super(name, props);
+ }
+
+ /**
+ * @deprecated use
+ * {@link VoldemortBatchIndexJob#execute(String, String, String, String, int)}
+ * instead. The voldemort store version parameter is no longer
+ * used; the version is read from the store definition.
+ *
+ * @param voldemortClusterLocalFile
+ * @param storeName
+ * @param inputPath
+ * @param outputPath
+ * @param voldemortStoreVersion
+ * @param voldemortCheckDataPercent
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ @Deprecated
+ public void execute(String voldemortClusterLocalFile,
+ String storeName,
+ String inputPath,
+ String outputPath,
+ int voldemortStoreVersion,
+ int voldemortCheckDataPercent) throws IOException, URISyntaxException {
+ execute(voldemortClusterLocalFile,
+ storeName,
+ inputPath,
+ outputPath,
+ voldemortCheckDataPercent);
+ }
+
+ /**
+ * Allows this job to be invoked directly from another job.
+ *
+ * @param voldemortClusterLocalFile local path to the cluster.xml definition
+ * @param storeName name of the store whose index/data files are built
+ * @param inputPath path of the input sequence files
+ * @param outputPath path for the generated index/data files
+ * @param voldemortCheckDataPercent approximate percentage of records to
+ * validate against the store schema
+ */
+ public void execute(String voldemortClusterLocalFile,
+ String storeName,
+ String inputPath,
+ String outputPath,
+ int voldemortCheckDataPercent) throws IOException, URISyntaxException {
+ JobConf conf = createJobConf(VoldemortBatchIndexMapper.class,
+ VoldemortBatchIndexReducer.class);
+
+ try {
+ // Read the voldemort cluster definition. We must use the local
+ // cluster.xml here since it has not yet been localized by the
+ // TaskRunner.
+ _cluster = HadoopUtils.readCluster(voldemortClusterLocalFile, conf);
+ } catch(Exception e) {
+ logger.error("Failed to read Voldemort cluster details", e);
+ throw new RuntimeException("Failed to read Voldemort cluster file: "
+ + voldemortClusterLocalFile, e);
+ }
+
+ // set the partitioner
+ conf.setPartitionerClass(VoldemortBatchIndexPartitoner.class);
+ conf.setNumReduceTasks(_cluster.getNumberOfNodes());
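+ // One reduce task per cluster node, so each reducer produces the
+ // index/data file pair for exactly one node.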
+
+ // Blow away the output path if force.output.overwrite is set
+
+ FileInputFormat.setInputPaths(conf, inputPath);
+
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+ if(getProps().getBoolean("force.output.overwrite", false)) {
+ FileSystem fs = FileOutputFormat.getOutputPath(conf).getFileSystem(conf);
+ fs.delete(FileOutputFormat.getOutputPath(conf), true);
+ }
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setMapOutputKeyClass(BytesWritable.class);
+ conf.setMapOutputValueClass(BytesWritable.class);
+ conf.setOutputKeyClass(BytesWritable.class);
+ conf.setOutputValueClass(BytesWritable.class);
+
+ // pass the store-specific settings to the mapper/reducer via the job conf
+
+ conf.setStrings("voldemort.index.filename", storeName + ".index");
+ conf.setStrings("voldemort.data.filename", storeName + ".data");
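+ // Approximate percentage of records the mapper will sample and
+ // validate against the store's JSON schema (0 disables checking).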
+ conf.setInt("input.data.check.percent", voldemortCheckDataPercent);
+ conf.setStrings("voldemort.store.name", storeName);
+
+ JobClient.runJob(conf);
+
+ }
+
+ @Override
+ public void run() throws Exception {
+
+ execute(getProps().get("voldemort.cluster.file"),
+ getProps().get("voldemort.store.name"),
+ getProps().get("input.path"),
+ getProps().get("output.path"),
+ getProps().getInt("input.data.check.percent", 0));
+
+ }
+
+ /**
+ * TODO HIGH: Does not validate against the Voldemort schema; the value
+ * should be validated against the store schema before writing.
+ *
+ * @author bbansal
+ *
+ */
+ public static class VoldemortBatchIndexMapper extends JsonConfigurable implements
+ Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
+
+ private static Logger logger = Logger.getLogger(VoldemortBatchIndexMapper.class);
+ private Cluster _cluster = null;
+ private StoreDefinition _storeDef = null;
+ private ConsistentRoutingStrategy _routingStrategy = null;
+ private Serializer _keySerializer;
+ private Serializer _valueSerializer;
+ private int _checkPercent;
+ private int _version;
+
+ public void map(BytesWritable key,
+ BytesWritable value,
+ OutputCollector<BytesWritable, BytesWritable> output,
+ Reporter reporter) throws IOException {
+ byte[] keyBytes = ByteUtils.copy(key.get(), 0, key.getSize());
+ byte[] valBytes = ByteUtils.copy(value.get(), 0, value.getSize());
+
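+ // Record layout (as constructed below): the output key is the MD5 of
+ // [version byte][raw key]; the output value is [node id int][version
+ // byte][raw value], emitted once per replica node.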
+ ByteArrayOutputStream versionedKey = new ByteArrayOutputStream();
+ DataOutputStream keyOut = new DataOutputStream(versionedKey);
+ keyOut.write(_version);
+ keyOut.write(keyBytes);
+ keyOut.close();
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("Original key: size:" + versionedKey.toByteArray().length + " val:"
+ + ByteUtils.toHexString(versionedKey.toByteArray()));
+ logger.debug("MD5 val: size:" + ByteUtils.md5(versionedKey.toByteArray()).length
+ + " val:"
+ + ByteUtils.toHexString(ByteUtils.md5(versionedKey.toByteArray())));
+ logger.debug(" value bytes:" + value.getSize() + " ["
+ + ByteUtils.toHexString(valBytes) + "]");
+ }
+
+ List<Node> nodes = _routingStrategy.routeRequest(keyBytes);
+ for(Node node: nodes) {
+ ByteArrayOutputStream versionedValue = new ByteArrayOutputStream();
+ DataOutputStream valueOut = new DataOutputStream(versionedValue);
+ valueOut.writeInt(node.getId());
+ valueOut.write(_version);
+ valueOut.write(valBytes);
+ valueOut.close();
+
+ // randomly sample ~_checkPercent% of records and verify that they
+ // deserialize cleanly against the store schema
+ if(Math.ceil(Math.random() * 100.0) < _checkPercent) {
+ checkJsonType(versionedKey.toByteArray(),
+ ByteUtils.copy(versionedValue.toByteArray(),
+ 4,
+ versionedValue.size()));
+ }
+
+ BytesWritable outputKey = new BytesWritable(ByteUtils.md5(versionedKey.toByteArray()));
+ BytesWritable outputVal = new BytesWritable(versionedValue.toByteArray());
+
+ output.collect(outputKey, outputVal);
+ }
+ }
+
+ public void checkJsonType(byte[] key, byte[] value) {
+ try {
+ _keySerializer.toObject(key);
+ _valueSerializer.toObject(value);
+ } catch(Exception e) {
+ throw new RuntimeException("Failed to deserialize key/value; check that the data matches the configured store schema.",
+ e);
+ }
+ }
+
+ public void configure(JobConf conf) {
+ Props props = HadoopUtils.getPropsFromJob(conf);
+
+ // get the voldemort cluster.xml and store.xml files.
+ try {
+ _cluster = HadoopUtils.readCluster(props.get("voldemort.cluster.file"), conf);
+ _storeDef = HadoopUtils.readStoreDef(props.get("voldemort.store.file"),
+ props.get("voldemort.store.name"),
+ conf);
+
+ _checkPercent = conf.getInt("input.data.check.percent", 0);
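+ // The consistent-hash routing strategy maps each key to its
+ // replication-factor owner nodes, mirroring how the live cluster
+ // routes requests.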
+ _routingStrategy = new ConsistentRoutingStrategy(_cluster.getNodes(),
+ _storeDef.getReplicationFactor());
+ _keySerializer = (Serializer