# Hadoop Programming in Java

In this notebook we will detail the steps necessary to:

0. [Install](#maven) Maven to manage the build process.
1. [Develop](#write) a Hadoop program on your local machine.
2. [Deploy](#deploy) and run a Hadoop program on your Hadoop cluster.

Before starting, run the following Jupyter cell to configure some scripts.

In [1]:
%reload_ext autoreload
%autoreload 1
%aimport commands

Moreover, on your local machine you must use the same Java version that is used on the Hadoop cluster. Since our Hadoop cluster was installed using Java 8, please make sure that Java 8 is installed on your local machine and that the environment variable `JAVA_HOME` is correctly configured.

In [2]:
!echo $JAVA_HOME

/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home
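
The same check can also be done from inside the JVM. The following is a minimal sketch, assuming that Java 8 reports its version as `1.8.x`; the class name `JavaVersionCheck` is made up for this example. Note that on a JDK 8 installation `java.home` typically points to the `jre` subdirectory of `JAVA_HOME`, so the two paths may not be identical.

```java
// Minimal sketch: report the running JVM so it can be compared with the cluster's Java 8.
class JavaVersionCheck {

  // Returns true when the version string denotes Java 8, which is reported as "1.8.x".
  static boolean isJava8(String version) {
    return version.equals("1.8") || version.startsWith("1.8.");
  }

  public static void main(String[] args) {
    String version = System.getProperty("java.version");
    System.out.println("java.version = " + version);
    System.out.println("java.home    = " + System.getProperty("java.home"));
    if (!isJava8(version)) {
      System.err.println("Warning: this JVM is not Java 8");
    }
  }
}
```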



## 0. Hadoop and Maven <a name="maven"/>

To run a Hadoop job written in Java, we need to create a jar file with the compiled classes and also include other dependencies of our code, e.g., third-party libraries. This can be very time consuming if we do not automate the tasks.

[Apache Maven](http://maven.apache.org/) is a **project management tool** which includes a project object model (POM), a set of standards, a project lifecycle, a dependency management system, and the logic for executing plugin goals at defined phases in a lifecycle. It builds a project from its POM and a set of plugins that are shared by all projects using Maven, which provides a uniform build system.

When you use Maven, you describe your project using a well-defined **project object model** (the `pom.xml` file). Maven can then apply cross-cutting logic from a set of shared plugins.
As a project management tool, Maven preprocesses, compiles, packages and tests your projects.

If you want to install Maven on your virtual machines, please use the following command:

```bash
sudo apt-get install maven
```

If you want to install Maven on your laptop, use the same command in a Linux shell. On macOS, install the [Homebrew](https://brew.sh) package manager, then use the following command:

```bash
brew install maven
```

Now we will see how to configure an Apache Maven `pom.xml` file to obtain a single jar including our code plus the required dependencies ready to be executed on our Hadoop cluster.

### Start with a Maven project archetype

Archetype is a Maven project templating toolkit. We use Maven archetype to structure our Hadoop source code, to allow a quick and standardized development template.

By using Maven to generate an archetype template, we get a project directory structure, which we will populate with our Java code, and a `pom.xml` file, which we will use to configure the build process.

Before running the Maven archetype generation process, we need to make three decisions:
1. The **local directory** where we will develop our code. We just need to open a shell in that directory, where we will execute the following commands.
2. The **artifact id** (`artifactId`) indicates the unique base name of the primary artifact being generated by this project. The primary artifact for a project is typically a JAR file. In the following, we will use the `wordcount` artifact id.
3. The **group id** (`groupId`) indicates the unique identifier of the organization or group that creates the project. In the following, we will use the `it.unipi.hadoop` group id.

We now can use Maven to generate our project archetype in our home directory with the following commands.

In [3]:
%cd

/Users/khast


In [4]:
!mvn archetype:generate -DgroupId=it.unipi.hadoop -DartifactId=wordcount \
                       -DarchetypeArtifactId=maven-archetype-quickstart \
                       -DinteractiveMode=false

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------

Maven will create a directory named after the provided `artifactId` (in our case, `wordcount`), including a minimal `pom.xml` file and a directory structure like the following:

```bash
wordcount/src
├── main
│   └── java
│       └── it
│           └── unipi
│               └── hadoop
│                   └── App.java
└── test
    └── java
        └── it
            └── unipi
                └── hadoop
                    └── AppTest.java
```
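
The package directories in this tree follow from the group id. As a rough sketch (not Maven's actual code), and assuming that the quickstart archetype uses the group id as the Java package when no other package is requested, the mapping can be written as follows. The class and method names are made up for this example.

```java
// Sketch: how a group id maps to the source directory of the generated project.
class ArchetypeLayout {

  // Dots in the group id become directory separators under src/main/java.
  static String sourceDir(String artifactId, String groupId) {
    return artifactId + "/src/main/java/" + groupId.replace('.', '/');
  }

  public static void main(String[] args) {
    // Values used in this notebook.
    System.out.println(sourceDir("wordcount", "it.unipi.hadoop"));
  }
}
```

With the values used in this notebook, the program prints `wordcount/src/main/java/it/unipi/hadoop`, which is the directory that contains `App.java`.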

For the time being, we can safely delete/ignore the `test` folder, as well as the `App.java` file. We will write our own Java file.

In [5]:
%cd wordcount

/Users/khast/wordcount


In [6]:
%rm -rf src/test

In [7]:
!rm -rf src/main/java/it/unipi/hadoop/App.java

### Update the POM file

A **Project Object Model** or **POM** is an XML file that contains information about the project and configuration details used by Maven to build the project. 

It contains default values for most projects. Examples are the build directory, which is `target`; the source directory, which is `src/main/java`; the test source directory, which is `src/test/java`; and so on.

Our freshly generated `pom.xml` file is in the `wordcount` directory, and contains the following minimal information.

In [8]:
%cat pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>it.unipi.hadoop</groupId>
  <artifactId>wordcount</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>wordcount</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>


To enrich the POM with the elements we need to compile and package a Hadoop program, we need to include some *properties*, *plugins* and *dependencies*. Maven properties are variables that can be reused in a `pom.xml` file. Plugins are used to create jar files, compile code, run unit tests, create project documentation, and so on. Dependencies are pieces of code (other projects, Java libraries, etc.) that your code requires to compile and run. Maven automatically downloads and links the dependencies at compile time, as well as all the dependencies of those dependencies (transitive dependencies), so your list can focus solely on the dependencies your project directly requires.

Your `pom.xml` file must include some properties: the Java compiler version (Java 8), the project source encoding (UTF-8) and the version of our Hadoop libraries (3.1.3).

```xml
<project>
  [...]
  <properties>
    <java.version>1.8</java.version>
    <hadoop.version>3.1.3</hadoop.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  [...]
</project>
```
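
To make the idea of reusable properties concrete, here is a small sketch of `${name}` substitution. It only illustrates the concept and is not how Maven implements interpolation; the class `PropertyInterpolation` and its method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: replace ${name} placeholders with values from a property map.
class PropertyInterpolation {

  private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

  // Unknown property names are left untouched.
  static String interpolate(String text, Map<String, String> properties) {
    Matcher m = PLACEHOLDER.matcher(text);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String value = properties.get(m.group(1));
      String replacement = (value != null) ? value : m.group(0);
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("java.version", "1.8");
    props.put("hadoop.version", "3.1.3");
    System.out.println(interpolate("<source>${java.version}</source>", props));
    System.out.println(interpolate("<version>${hadoop.version}</version>", props));
  }
}
```

Defining the version once as a property means that a change of Hadoop version requires editing a single line of the POM.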

Your `pom.xml` file must configure the plugins we will use in the build process.

```xml
<project>
  [...]
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.2.0</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

  [...]
</project>
```
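
The `addClasspath` option asks the jar plugin to write a `Class-Path` entry in the jar manifest, listing the jar files of the dependencies. The sketch below builds such a manifest in memory with the standard `java.util.jar` API, only to show what the entry looks like. The class name `ManifestClasspath` is made up, and the jar names in `main` are illustrative values rather than the exact list Maven would produce.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Sketch: a manifest carrying a Class-Path entry, similar to what addClasspath generates.
class ManifestClasspath {

  static Manifest withClassPath(String classPath) {
    Manifest manifest = new Manifest();
    Attributes main = manifest.getMainAttributes();
    // Manifest-Version must be present, otherwise the manifest is written out empty.
    main.put(Attributes.Name.MANIFEST_VERSION, "1.0");
    main.put(Attributes.Name.CLASS_PATH, classPath);
    return manifest;
  }

  public static void main(String[] args) throws IOException {
    // Illustrative jar names only.
    Manifest m = withClassPath("hadoop-common-3.1.3.jar hadoop-hdfs-client-3.1.3.jar");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    m.write(out);
    System.out.print(out.toString("UTF-8"));
  }
}
```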

Your `pom.xml` file must include the Hadoop dependencies. These dependencies must match the Hadoop version installed in your cluster.

```xml
<project>
  [...]
  <dependencies>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-app</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    [...]
  </dependencies>
  [...]
</project>
```

The following command will populate the `pom.xml` file accordingly.

In [13]:
!printf "%s\n" {commands.get_pom()} > pom.xml

In [14]:
%cat pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>it.unipi.hadoop</groupId>
  <artifactId>wordcount</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>wordcount</name>
  <url>http://maven.apache.org</url>
  
  <properties>
    <java.version>1.8</java.version>
    <hadoop.version>3.1.3</hadoop.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>

      

The following command must execute with no errors.

In [15]:
!mvn compile

[INFO] Scanning for projects...
[INFO]
[INFO] ---------------------< it.unipi.hadoop:wordcount >----------------------
[INFO] Building wordcount 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ wordcount ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/khast/wordcount/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.2:compile (default-compile) @ wordcount ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /Users/khast/wordcount/target/classes
[INFO] --------------------------------------------------------------

## 1. Writing code <a name="write"/>

You can write the source code of your application with any text editor. Here we will use the GNU [`nano`](https://www.nano-editor.org) editor.
```bash
cd wordcount
nano src/main/java/it/unipi/hadoop/WordCount.java
```
Fill the Java file with the following content, then save and close the file (Ctrl+O followed by Ctrl+X).

```java
package it.unipi.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  // Mapper: emits the pair (word, 1) for every whitespace-separated token of each input line.
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    // Reused across calls to avoid allocating new Writable objects for every token.
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer: sums all the counts received for the same word.
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Strip generic Hadoop options (e.g. -D key=value); what remains are our own arguments.
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The reducer doubles as combiner, since summing is associative and commutative.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Every argument but the last is an input path; the last one is the output directory.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // Block until the job finishes and report success (0) or failure (1) as exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
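
Before running the job on a cluster, it can help to see what it computes on a tiny input. The following sketch mimics the map and reduce steps in memory with plain Java collections, without any Hadoop class. It is a simplified model that assumes everything fits in memory and ignores partitioning; the class `LocalWordCount` is made up for this example.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Sketch: word count in memory, using the same tokenization as TokenizerMapper.
class LocalWordCount {

  static Map<String, Integer> count(Iterable<String> lines) {
    // A sorted map, so that words are listed in order as in the job output.
    Map<String, Integer> counts = new TreeMap<>();
    for (String line : lines) {
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        // "Map" emits (word, 1); "reduce" sums the ones of each word.
        counts.merge(itr.nextToken(), 1, Integer::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = count(Arrays.asList("to be or not to be", "to do"));
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      System.out.println(e.getKey() + "\t" + e.getValue());
    }
  }
}
```

Since tokens are split on whitespace only, punctuation and letter case are preserved: `be` and `be,` count as two different words, both here and in the Hadoop job.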

In the folder containing your `pom.xml` file, run the following command.

In [16]:
!mvn clean package

[INFO] Scanning for projects...
[INFO]
[INFO] ---------------------< it.unipi.hadoop:wordcount >----------------------
[INFO] Building wordcount 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wordcount ---
[INFO] Deleting /Users/khast/wordcount/target
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ wordcount ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/khast/wordcount/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.2:compile (default-compile) @ wordcount ---
[INFO] Changes detected

If compilation and packaging run smoothly, we will get a new `target` folder containing the `wordcount-1.0-SNAPSHOT.jar` file, which we will use to run our application on any Hadoop cluster.

In [19]:
! ls -ltrh target

total 16
drwxr-xr-x  3 khast  staff    96B Apr 20 12:36 maven-status
drwxr-xr-x  3 khast  staff    96B Apr 20 12:36 generated-sources
drwxr-xr-x  3 khast  staff    96B Apr 20 12:36 classes
drwxr-xr-x  3 khast  staff    96B Apr 20 12:36 maven-archiver
-rw-r--r--  1 khast  staff   6.5K Apr 20 12:36 wordcount-1.0-SNAPSHOT.jar
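
The name of the jar file comes from the POM: by default Maven names the artifact after the `artifactId` and the `version`, with the `packaging` as extension. A minimal sketch of this naming rule, with a made-up class name, is the following.

```java
// Sketch: Maven's default artifact file name, <artifactId>-<version>.<packaging>.
class ArtifactName {

  static String fileName(String artifactId, String version, String packaging) {
    return artifactId + "-" + version + "." + packaging;
  }

  public static void main(String[] args) {
    // Values taken from the pom.xml of this notebook.
    System.out.println(fileName("wordcount", "1.0-SNAPSHOT", "jar"));
  }
}
```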


This jar file must be copied to the `namenode-hadoop` virtual machine in the Hadoop cluster.
```bash
scp target/wordcount-1.0-SNAPSHOT.jar hadoop@<namenode ip address>:
```

## 2. Running code <a name="deploy"/>

To test the Hadoop program we just wrote, we will use a small input data set called [`pg100.txt`](../exercises/data/pg100.txt) that we have already transferred to the `namenode-hadoop` virtual machine using `scp`.

Open a terminal and run the following commands (delete the `output` directory on HDFS if it already exists):
```bash
hadoop fs -put pg100.txt
hadoop jar wordcount-1.0-SNAPSHOT.jar it.unipi.hadoop.WordCount pg100.txt output
```

Then list the content of the `output` directory on HDFS:
```bash
hadoop fs -ls output
```

You should see an output file for each reducer. Since there was only one reducer for this job, you should only see one `part-r-00000` file. Note that sometimes the files will be called `part-00000`, and sometimes they'll be called `part-r-00000`.
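
The relation between reducers and output files can be sketched as follows. The partition arithmetic is the one used by Hadoop's default hash partitioner, but here it is applied to `String.hashCode()`, while Hadoop hashes the `Text` key; with more than one reducer the actual assignment of a word to a file may therefore differ from what this sketch computes. The class `PartFileName` is made up for this example.

```java
import java.util.Locale;

// Sketch: which reducer a key goes to, and how the reducer's output file is named.
class PartFileName {

  // Non-negative hash modulo the number of reducers.
  static int partition(String key, int numReducers) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
  }

  // Output files are numbered with five digits after the "part-r-" prefix.
  static String fileName(int partition) {
    return String.format(Locale.ROOT, "part-r-%05d", partition);
  }

  public static void main(String[] args) {
    // With a single reducer every key ends up in partition 0.
    System.out.println(fileName(partition("hamlet", 1)));
  }
}
```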

Finally, print the first lines of the output file:

```bash
hadoop fs -cat output/part-r-00000 | head
```

You should see the first ten lines of the output, each containing a word and its count.
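
If you want to post-process the result in Java, each output line can be split into its word and count. The sketch below assumes the default text output, where key and value are separated by a tab character; the class `OutputLine` and the sample line in `main` are made up for this example.

```java
import java.util.AbstractMap;
import java.util.Map;

// Sketch: parse one "word<TAB>count" line of the job output.
class OutputLine {

  static Map.Entry<String, Integer> parse(String line) {
    int tab = line.lastIndexOf('\t');
    if (tab < 0) {
      throw new IllegalArgumentException("No tab separator in line: " + line);
    }
    String word = line.substring(0, tab);
    int count = Integer.parseInt(line.substring(tab + 1).trim());
    return new AbstractMap.SimpleEntry<>(word, count);
  }

  public static void main(String[] args) {
    // Made-up sample line, not taken from the real output.
    Map.Entry<String, Integer> e = parse("example\t42");
    System.out.println(e.getKey() + " occurs " + e.getValue() + " times");
  }
}
```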

## A. How to run Hadoop programs from your laptop

The following notes describe how to set up your laptop to run your Hadoop programs directly, without copying data and JAR files to the `namenode-hadoop` machine.

> **I will not cover all the details, and there can be errors. Use the notes at your own risk.**

Your laptop must be configured to access the Hadoop cluster and to generate JAR files compatible with the Hadoop cluster.

1. Check that the Java VM in your laptop (and `JAVA_HOME`) corresponds **exactly** to the Java VM in your cluster (and `JAVA_HOME`).

2. Download on your laptop the Hadoop binary that you installed on your virtual machines. Check that the Hadoop version corresponds **exactly**.

3. Extract the Hadoop tarball wherever you wish, then update your `HADOOP_HOME` and `PATH` environment variables accordingly.

    ```bash
    cd <directory where you unzipped Hadoop>
    export HADOOP_HOME=`pwd`
    export PATH=$PATH:$HADOOP_HOME/bin
    hadoop version
    ```

    These exports are valid in the current shell only.

4. Update the `core-site.xml` file located at `$HADOOP_HOME/etc/hadoop/` to define the name node URI on your laptop.
    The file must contain the following lines, with the `<namenode ip address>` updated according to your Hadoop cluster.
    ```
    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://<namenode ip address>:9820/</value>
      </property>
    </configuration>
    ```

5. You will interact with Hadoop using your **local user**. This means that you need to create suitable directories on HDFS first, such as `/user/foobar/`.
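
Two details of steps 4 and 5 can be checked with a few lines of plain Java. This is only a sketch under two assumptions: that the cluster uses the default HDFS layout, where the home directory of a user is `/user/<name>`, and that Hadoop picks up the operating system user name. The address `192.0.2.10` is a placeholder from the range reserved for documentation, not a real name node, and the class `ClientSetup` is made up for this example.

```java
import java.net.URI;

// Sketch: inspect the name node URI and derive the HDFS home directory of the local user.
class ClientSetup {

  static String host(String defaultFs) {
    return URI.create(defaultFs).getHost();
  }

  static int port(String defaultFs) {
    return URI.create(defaultFs).getPort();
  }

  // Assumes the default HDFS home directory prefix, /user.
  static String homeDir(String user) {
    return "/user/" + user;
  }

  public static void main(String[] args) {
    String defaultFs = "hdfs://192.0.2.10:9820/"; // placeholder address
    System.out.println("name node host: " + host(defaultFs));
    System.out.println("name node port: " + port(defaultFs));
    System.out.println("HDFS home dir : " + homeDir(System.getProperty("user.name")));
  }
}
```

The printed home directory is the one to create on HDFS before submitting jobs from the laptop.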

Now you are able to interact with HDFS and submit MapReduce jobs from your laptop. Make sure that the Hadoop cluster is up and running with no errors! :-)