import hipi.image.FloatImage;
import hipi.image.ImageHeader;
import hipi.imagebundle.mapreduce.ImageBundleInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.opencv.core.*;
import org.opencv.objdetect.CascadeClassifier;
import java.io.IOException;
import java.net.URI;

public class FaceCount extends Configured implements Tool {

    public static class FaceCountMapper
            extends Mapper<ImageHeader, FloatImage, IntWritable, IntWritable> {

        private CascadeClassifier faceDetector;

        // Convert a HIPI FloatImage (interleaved float samples in [0, 1]) into
        // an 8-bit, 3-channel OpenCV Mat.
        public Mat convertFloatImageToOpenCVMat(FloatImage floatImage) {
            int w = floatImage.getWidth();
            int h = floatImage.getHeight();
            float[] valData = floatImage.getData();
            double[] rgb = {0.0, 0.0, 0.0};
            Mat mat = new Mat(h, w, CvType.CV_8UC3);
            for (int j = 0; j < h; j++) {
                for (int i = 0; i < w; i++) {
                    rgb[0] = (double) valData[(j * w + i) * 3 + 0] * 255.0; // R
                    rgb[1] = (double) valData[(j * w + i) * 3 + 1] * 255.0; // G
                    rgb[2] = (double) valData[(j * w + i) * 3 + 2] * 255.0; // B
                    mat.put(j, i, rgb);
                }
            }
            return mat;
        }

        // Count faces in an image.
        public int countFaces(Mat image) {
            // MatOfRect is a container class for Rect; detectMultiScale fills
            // it with one rectangle per detected face.
            MatOfRect faceDetections = new MatOfRect();
            faceDetector.detectMultiScale(image, faceDetections);
            return faceDetections.toArray().length;
        }

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // Load the OpenCV native library before using any OpenCV classes.
            try {
                System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
            } catch (UnsatisfiedLinkError e) {
                System.err.println("Native code library failed to load.\n" + e + "\n"
                        + Core.NATIVE_LIBRARY_NAME);
                System.exit(1);
            }

            // The LBP cascade file is shipped to each task via the distributed
            // cache and symlinked into the task's working directory.
            if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
                URI mappingFileUri = context.getCacheFiles()[0];
                if (mappingFileUri != null) {
                    faceDetector = new CascadeClassifier();
                    if (!faceDetector.load("./lbpcascade_frontalface.xml")) {
                        System.out.println("Error loading cascade XML file");
                    } else {
                        System.out.println("Successfully loaded cascade XML file");
                    }
                } else {
                    System.out.println(">>>>>> NO MAPPING FILE");
                }
            } else {
                System.out.println(">>>>>> NO CACHE FILES AT ALL");
            }
            super.setup(context);
        }

        @Override
        public void map(ImageHeader key, FloatImage value, Context context)
                throws IOException, InterruptedException {
            // Skip null or degenerate images; only process 3-band (RGB) images.
            if (value != null && value.getWidth() > 1 && value.getHeight() > 1
                    && value.getBands() == 3) {
                Mat cvImage = this.convertFloatImageToOpenCVMat(value);
                int faces = this.countFaces(cvImage);
                System.out.println(">>>>>> Detected Faces: " + Integer.toString(faces));
                // Emit a single constant key so one reducer aggregates all counts.
                context.write(new IntWritable(1), new IntWritable(faces));
            }
        }
    }

    public static class FaceCountReducer
            extends Reducer<IntWritable, IntWritable, IntWritable, Text> {

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            int images = 0;
            for (IntWritable val : values) {
                total += val.get();
                images++;
            }
            String result = String.format("Total faces detected: %d", total);
            // Emit output of the job, which will be written to HDFS
            context.write(new IntWritable(images), new Text(result));
        }
    }

    public int run(String[] args) throws Exception {
        // Check input arguments
        if (args.length != 2) {
            System.out.println("Usage: FaceCount <input HIB> <output directory>");
            System.exit(0);
        }

        // Initialize and configure the MapReduce job
        Job job = Job.getInstance(getConf());

        // Set input format class which parses the input HIB and spawns map tasks
        job.setInputFormatClass(ImageBundleInputFormat.class);

        // Set the driver, mapper, and reducer classes which express the computation
        job.setJarByClass(FaceCount.class);
        job.setMapperClass(FaceCountMapper.class);
        job.setReducerClass(FaceCountReducer.class);

        // Set the types for the key/value pairs passed to/from map and reduce layers
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Set the input and output paths on HDFS
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Ship the cascade file to every task via the distributed cache; the
        // fragment after '#' becomes the symlink name in the task's working directory.
        job.addCacheFile(new URI(
                "hdfs://127.0.0.1:9000/user/abi/lbpcascade_frontalface.xml"
                        + "#lbpcascade_frontalface.xml"));

        URI[] cacheFiles = job.getCacheFiles();
        if (cacheFiles != null) {
            for (URI cacheFile : cacheFiles) {
                System.out.println("Cache file -> " + cacheFile);
            }
        }

        boolean success = job.waitForCompletion(true);

        // Return success or failure
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new FaceCount(), args);
        System.exit(exitCode);
    }
}
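
Once packaged, the job takes the two arguments checked in run(): the input HIB and the output directory. A hypothetical invocation (the jar name, HIB file, and output path below are placeholders; the HIB must already exist on HDFS) would look like:

hadoop jar FaceCount.jar FaceCount sampleimages.hib faceCountOutput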