# MapReduce Word Count Example
Various code snippets were collected from different sources, including Alexandria; they appear in the cells further below.  The first cell contains my final code, which I assembled from those snippets.

In [None]:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.GenericOptionsParser
import scala.collection.JavaConversions._

// This class performs the map operation, translating raw input into the key-value
// pairs we will feed into our reduce operation.
/** Map step of the word count: emits a `(token, 1)` pair for every
  * whitespace-separated token found in the input value.
  */
class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
  // The same two Writable instances are handed to context.write for every token.
  val one = new IntWritable(1)
  val word = new Text

  /** Splits `value` on whitespace and writes `(token, one)` for each non-empty token.
    *
    * @param key     input key; not read by this method
    * @param value   text to tokenize
    * @param context receives one `(word, one)` pair per token
    */
  override def map(key: Object, value: Text,
                   context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
    // "\\s+" collapses runs of whitespace; the filter drops the empty token that
    // split still yields for an empty line or a line with leading whitespace, so
    // the empty string is never counted as a word.
    value.toString.split("\\s+").iterator.filter(_.nonEmpty).foreach { token =>
      word.set(token)
      context.write(word, one)
    }
  }
}

// This class performs the reduce operation, iterating over the key-value pairs
// produced by our map operation to produce a result. In this case we just
// calculate a simple total for each word seen.
/** Reduce step of the word count: adds up all counts received for a key and
  * writes a single `(key, total)` pair.
  */
class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

  /** Sums `values` and writes the total for `key`.
    *
    * @param key     the word being totalled
    * @param values  the counts received for `key`
    * @param context receives the `(key, total)` pair
    */
  override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
    // Walk the Java iterator directly instead of calling foldLeft on the Java
    // Iterable, which only compiles through the deprecated implicit
    // scala.collection.JavaConversions wrappers.
    val counts = values.iterator()
    var sum = 0
    while (counts.hasNext) {
      sum += counts.next().get
    }
    context.write(key, new IntWritable(sum))
  }
}

/** Configures and runs the word-count job built from [[TokenizerMapper]] and
  * [[IntSumReducer]].
  */
object WordCount {

  /** Parses the command line and runs the job.
    *
    * NOTE(review): a JVM entry point is normally declared as returning `Unit`;
    * the `Int` return type is kept here to preserve the existing signature --
    * confirm how this object is launched.
    *
    * @param args command-line arguments: optional generic Hadoop options followed
    *             by the input path and the output path
    * @return 0 if the job completed successfully, 1 if it failed, 2 on a usage error
    */
  def main(args: Array[String]): Int = {
    val conf = new Configuration()
    val otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs
    otherArgs match {
      // The paths are taken from the remaining arguments rather than from the raw
      // `args`, so generic options given before them do not shift the paths.
      case Array(in, out) => runJob(conf, in, out)
      case _ =>
        println("Usage: wordcount <in> <out>")
        2
    }
  }

  /** Builds the job for the given paths, runs it and maps its outcome to an exit code. */
  private def runJob(conf: Configuration, in: String, out: String): Int = {
    val job = new Job(conf, "word count")
    job.setJarByClass(classOf[TokenizerMapper])
    job.setMapperClass(classOf[TokenizerMapper])
    // The reducer class is also registered as the combiner.
    job.setCombinerClass(classOf[IntSumReducer])
    job.setReducerClass(classOf[IntSumReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(in))
    FileOutputFormat.setOutputPath(job, new Path(out))
    if (job.waitForCompletion(true)) 0 else 1
  }

}

In [None]:
// There appear to be two APIs for Apache MapReduce: the older hadoop.mapred and
// the newer hadoop.mapreduce.  The Alexandria documentation seems to use the older one,
// though this is not explicitly stated.  The referenced example, however, uses the newer
// one, and that is the one my code uses.

//package wordcount

import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapred.{MapReduceBase, Mapper, OutputCollector, Reporter}
import java.util.StringTokenizer

// see https://github.com/milesegan/scala-hadoop-example/blob/master/WordCount.scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.GenericOptionsParser
import scala.collection.JavaConversions._

// see https://www.programcreek.com/scala/org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf

In [None]:
// Adapted from code provided in Alexandria. This uses the older hadoop.mapred library.
//
// Mapper is written fully qualified because this notebook also imports
// org.apache.hadoop.mapreduce.Mapper, so the bare name does not reliably refer to
// the old-API interface that MapReduceBase is meant to be combined with.
class WordCountScala extends MapReduceBase
    with org.apache.hadoop.mapred.Mapper[LongWritable, Text, Text, IntWritable] {
    // The same two Writable instances are passed to output.collect for every token.
    val one = new IntWritable(1)
    val word = new Text()

    /** Emits `(token, one)` for every non-empty, space-separated token of `value`.
      *
      * @param key      input key; not read by this method
      * @param value    text to tokenize
      * @param output   collector receiving one `(word, one)` pair per token
      * @param reporter not used by this method
      */
    def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable],
            reporter: Reporter): Unit = {
        val line = value.toString
        // Consecutive or leading spaces make split(" ") return empty strings;
        // skip them so the empty string is not counted as a word.
        line.split(" ").iterator.filter(_.nonEmpty).foreach { token =>
            word.set(token)
            output.collect(word, one)
        }
    }
}

// compare to code at https://www.programcreek.com/scala/org.apache.hadoop.mapred.JobConf...

// This class performs the map operation, translating raw input into the key-value
// pairs we will feed into our reduce operation.
/** Map step of the word count (newer hadoop.mapreduce API): emits a `(token, 1)`
  * pair for every whitespace-separated token found in the input value.
  */
class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
  // The same two Writable instances are handed to context.write for every token.
  val one = new IntWritable(1)
  val word = new Text

  /** Splits `value` on whitespace and writes `(token, one)` for each non-empty token.
    *
    * @param key     input key; not read by this method
    * @param value   text to tokenize
    * @param context receives one `(word, one)` pair per token
    */
  override def map(key: Object, value: Text,
                   context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
    // "\\s+" collapses runs of whitespace; the filter drops the empty token that
    // split still yields for an empty line or a line with leading whitespace, so
    // the empty string is never counted as a word.
    value.toString.split("\\s+").iterator.filter(_.nonEmpty).foreach { token =>
      word.set(token)
      context.write(word, one)
    }
  }
}

In [None]:
// Code provided in Alexandria. This is actually Java code.
public static class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Adds up every count received for {@code key} and collects a single
     * {@code (key, total)} pair.
     *
     * @param key      the word being totalled
     * @param values   iterator over the counts received for {@code key}
     * @param output   collector that receives the {@code (key, total)} pair
     * @param reporter not used by this method
     * @throws IOException declared by this method's signature
     */
    public void reduce(Text key,
                       Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        int total = 0;
        // Drain the iterator, accumulating each count as it is read.
        for (; values.hasNext(); ) {
            IntWritable count = values.next();
            total += count.get();
        }
        output.collect(key, new IntWritable(total));
    }
}

// compare to code at https://www.programcreek.com/scala/org.apache.hadoop.mapred.JobConf...

// This class performs the reduce operation, iterating over the key-value pairs
// produced by our map operation to produce a result. In this case we just
// calculate a simple total for each word seen.
/** Reduce step of the word count (newer hadoop.mapreduce API): adds up all counts
  * received for a key and writes a single `(key, total)` pair.
  */
class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

  /** Sums `values` and writes the total for `key`.
    *
    * @param key     the word being totalled
    * @param values  the counts received for `key`
    * @param context receives the `(key, total)` pair
    */
  override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
    // Walk the Java iterator directly instead of calling foldLeft on the Java
    // Iterable, which only compiles through the deprecated implicit
    // scala.collection.JavaConversions wrappers.
    val counts = values.iterator()
    var sum = 0
    while (counts.hasNext) {
      sum += counts.next().get
    }
    context.write(key, new IntWritable(sum))
  }
}

In [None]:
// This class configures and runs the job with the map and reduce classes we've
// specified above.
object WordCount {

  /** Parses the command line and runs the word-count job.
    *
    * NOTE(review): a JVM entry point is normally declared as returning `Unit`;
    * the `Int` return type is kept here to preserve the existing signature --
    * confirm how this object is launched.
    *
    * @param args command-line arguments: optional generic Hadoop options followed
    *             by the input path and the output path
    * @return 0 if the job completed successfully, 1 if it failed, 2 on a usage error
    */
  def main(args: Array[String]): Int = {
    val conf = new Configuration()
    val otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs
    otherArgs match {
      // The paths are taken from the remaining arguments rather than from the raw
      // `args`, so generic options given before them do not shift the paths.
      case Array(in, out) => runJob(conf, in, out)
      case _ =>
        println("Usage: wordcount <in> <out>")
        2
    }
  }

  /** Builds the job for the given paths, runs it and maps its outcome to an exit code. */
  private def runJob(conf: Configuration, in: String, out: String): Int = {
    val job = new Job(conf, "word count")
    job.setJarByClass(classOf[TokenizerMapper])
    job.setMapperClass(classOf[TokenizerMapper])
    // The reducer class is also registered as the combiner.
    job.setCombinerClass(classOf[IntSumReducer])
    job.setReducerClass(classOf[IntSumReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(in))
    FileOutputFormat.setOutputPath(job, new Path(out))
    if (job.waitForCompletion(true)) 0 else 1
  }

}