![hadoop Logo](/files/logo/hadoop.png)
# **Lab 1 - Hadoop MapReduce and HDFS**
#### The following steps (Part 1 and Part 2) demonstrate how to install HDFS and how to create and run a "word count" application with Hadoop MapReduce. Then, in Part 3, you are asked to implement ... with Hadoop MapReduce.



### **Part 1: HDFS**

#### Install HDFS

1. Download the Hadoop platform from the following link:
[Hadoop](http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.6.4/hadoop-2.6.4.tar.gz)

2. Set the environment variables.
   ```bash
   export JAVA_HOME="<JAVA PATH>"
   export HADOOP_HOME="<HADOOP PATH>/hadoop-2.6.4"
   export HADOOP_CONFIG="$HADOOP_HOME/etc/hadoop"
   ```

3. Specify `JAVA_HOME` in `$HADOOP_CONFIG/hadoop-env.sh` as well.
   ```bash
   export JAVA_HOME="<JAVA PATH>"
   ```

4. Create two folders on the local file system, where the HDFS namenode and datanode store their data.
   ```bash
   mkdir -p $HADOOP_HOME/hdfs/namenode
  
   mkdir -p $HADOOP_HOME/hdfs/datanode
   ```

5. The main HDFS configuration file is located at `$HADOOP_CONFIG/hdfs-site.xml`. Specify the paths of the folders created in step 4.
   ```xml
   <configuration>
   <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:///<HADOOP HOME PATH>/hdfs/namenode</value>
      <description>Path on the local filesystem where the NameNode stores the namespace and transaction logs persistently.</description>
   </property>

   <property>
      <name>dfs.datanode.data.dir</name>
      <value>file:///<HADOOP HOME PATH>/hdfs/datanode</value>
      <description>Comma separated list of paths on the local filesystem of a DataNode where it should store its blocks.</description>
   </property>
   </configuration>
   ```

6. Specify the address of the namenode (master) in `$HADOOP_CONFIG/core-site.xml`.
   ```xml
   <configuration>
   <property>
      <name>fs.defaultFS</name>
      <value>hdfs://127.0.0.1:9000</value>
      <description>NameNode URI</description>
   </property>
   </configuration>
   ```

7. Format the namenode directory (DO THIS ONLY ONCE, THE FIRST TIME).
   ```bash
   $HADOOP_HOME/bin/hdfs namenode -format
   ```

8. Start the namenode and datanode daemons
   ```bash
   $HADOOP_HOME/sbin/hadoop-daemon.sh start namenode

   $HADOOP_HOME/sbin/hadoop-daemon.sh start datanode
   ```
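
Steps 5 and 6 can also be generated or double-checked from Python. The following is a minimal Python 3 sketch; `hdfs_site_xml`, `read_hadoop_conf` and `namenode_address` are hypothetical helper names, not part of Hadoop. `hdfs_site_xml` builds the step 5 file from an absolute `HADOOP_HOME`, so `file://` followed by the absolute path gives the `file:///...` form. `namenode_address` reads `fs.defaultFS`, the NameNode URI that HDFS clients connect to (port 9000 here, separate from the 50070 web UI).

```python
import os
import xml.etree.ElementTree as ET
from urllib.parse import urlparse


def hdfs_site_xml(hadoop_home):
    """Step 5: hdfs-site.xml content pointing at the step 4 folders."""
    home = os.path.abspath(hadoop_home)  # e.g. /opt/hadoop-2.6.4
    props = [
        ("dfs.namenode.name.dir", "file://" + home + "/hdfs/namenode"),
        ("dfs.datanode.data.dir", "file://" + home + "/hdfs/datanode"),
    ]
    root = ET.Element("configuration")
    for name, value in props:
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return '<?xml version="1.0"?>\n' + ET.tostring(root, encoding="unicode")


def read_hadoop_conf(xml_text):
    """Map each <name> to its <value> in a Hadoop *-site.xml document."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.findall("property")}


def namenode_address(core_site_text):
    """Step 6: split fs.defaultFS (e.g. hdfs://127.0.0.1:9000) into host and port."""
    uri = urlparse(read_hadoop_conf(core_site_text)["fs.defaultFS"])
    if uri.scheme != "hdfs":
        raise ValueError("fs.defaultFS is not an hdfs:// URI: " + uri.geturl())
    return uri.hostname, uri.port
```

Writing the string returned by `hdfs_site_xml(os.environ["HADOOP_HOME"])` to `$HADOOP_CONFIG/hdfs-site.xml` replaces the whole file, including any comments in it, so keep a backup of the original.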

#### Test HDFS
1. Print the running HDFS processes by running the `jps` command in a terminal.
2. Monitor the processes through their web interfaces:
   - Namenode: [http://127.0.0.1:50070](http://127.0.0.1:50070)
   - Datanode: [http://127.0.0.1:50075](http://127.0.0.1:50075)
3. Try some HDFS commands:
   ```bash
   # Create a new directory /sics on HDFS
   $HADOOP_HOME/bin/hdfs dfs -mkdir /sics

   # Create a file, call it big, on your local filesystem and upload it to HDFS under /sics.
   $HADOOP_HOME/bin/hdfs dfs -put big /sics

   # View the content of /sics directory
   $HADOOP_HOME/bin/hdfs dfs -ls /sics

   # Determine the size of big on HDFS
   $HADOOP_HOME/bin/hdfs dfs -du -h /sics/big

   # Print the first 5 lines to screen from big on HDFS
   $HADOOP_HOME/bin/hdfs dfs -cat /sics/big | head -n 5
   
   # Copy big to /sics/big_hdfscopy on HDFS
   $HADOOP_HOME/bin/hdfs dfs -cp /sics/big /sics/big_hdfscopy
   
   # Copy big back to the local filesystem and name it big_localcopy
   $HADOOP_HOME/bin/hdfs dfs -get /sics/big big_localcopy
   
   # Check the entire HDFS filesystem for inconsistencies/problems
   $HADOOP_HOME/bin/hdfs fsck /
   
   # Delete big from HDFS
   $HADOOP_HOME/bin/hdfs dfs -rm /sics/big
   
   # Delete /sics directory from HDFS
   $HADOOP_HOME/bin/hdfs dfs -rm -r /sics
   ```
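
The `jps` check and the local `big` file used above can also be scripted. A minimal Python 3 sketch; `missing_hdfs_daemons` and `make_big_file` are hypothetical helpers, and the content and size of `big` are arbitrary because the lab does not specify them. The parser assumes the default `jps` output of one `<pid> <MainClassName>` pair per line.

```python
import os


def missing_hdfs_daemons(jps_output):
    """Return the HDFS daemons (NameNode, DataNode) absent from `jps` output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) >= 2:  # "<pid> <MainClassName>"
            running.add(parts[1])
    return [d for d in ("NameNode", "DataNode") if d not in running]


def make_big_file(path, n_lines=100000):
    """Write a local test file of numbered lines and return its size in bytes."""
    with open(path, "w") as f:
        for i in range(n_lines):
            f.write("line %d of the big test file\n" % i)
    return os.path.getsize(path)
```

For example, `missing_hdfs_daemons(subprocess.check_output(["jps"]).decode())` should return an empty list once both daemons are up, and `make_big_file("big")` creates the file used by the `-put` command.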


# Stream Processing example with Flink, Kafka and Python

This repository contains the components for a simple streaming pipeline:
 * Generate data and write it to Apache Kafka
 * Process the generated data from Kafka using Apache Flink
 * Write the results back to Kafka for further processing
 * Analyze the results from Kafka using IPython Notebook

**Questions and feedback can be sent to [gyfora@apache.org](mailto:gyfora@apache.org)**

### Description

In this simple example we analyse temperature data generated for European cities in a streaming fashion.

Data is generated as simple Strings in the format of: `"City, Temperature"`
<br>`"Budapest, 30", "Stockholm, 20", "Budapest, 32" …` 

Our goal is to analyse the incoming data in a continuous fashion, updating our statistics as new data becomes available. 

We generate random temperatures using a [simple Scala program](https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/DataGenerator.scala), which writes them directly to Kafka, making them available for processing.
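
Records in the same `"City, Temperature"` format can also be produced from Python, for instance to test the pipeline without the Scala generator. This is a sketch, not the repository's generator: the city list, the temperature range and the function name `generate_records` are assumptions.

```python
import random

# Hypothetical city list; the Scala DataGenerator may use different cities.
CITIES = ["Budapest", "Stockholm", "Madrid", "Berlin"]


def generate_records(n, seed=None, low=-10, high=40):
    """Yield n records formatted as "City, Temperature" (integer degrees).

    The default range [low, high] is an assumption, not taken from the repo.
    """
    rng = random.Random(seed)  # seeded for reproducible test data
    for _ in range(n):
        yield "%s, %d" % (rng.choice(CITIES), rng.randint(low, high))
```

To feed the `input` topic, each record could be sent with kafka-python's `KafkaProducer` (`producer.send("input", record.encode("utf-8"))`, then `producer.flush()`), assuming a broker on Kafka's default `localhost:9092`.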

We then use a [Flink program](https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala) to do the following processing steps:

1. Parse the incoming String into the Scala case class `Temp(city: String, temp: Double)`:

   ```
   "Budapest, 30" -> ("Budapest", 30)
   "Stockholm, 20" -> ("Stockholm", 20)
   ```
   
   *Note: When implementing the parsing step, make sure it is robust to errors caused by incorrectly formatted input.*
2. We compute a historical average of the temperatures for each city:

   ```
   ("Budapest", 30) -> Avg: ("Budapest", 30)
   ("Budapest", 40) -> Avg: ("Budapest", 35)
   ("Stockholm", 20) -> Avg: ("Stockholm", 20)
   ("Budapest", 37) -> Avg: ("Budapest", 35.67)
   ("Stockholm", 22) -> Avg: ("Stockholm", 21)
   ```
   *Note: Computing a historical average has a distinctive property compared to the parsing operator: we need to keep some computational state for each city (sum and count) so that we can update the average when the next element arrives.*

    *You can read more about the interesting topic of operator states and stateful stream processing in [this wiki article](https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing).*
3. We compute the current global maximum temperature every 5 seconds:
   
   ```
   ("Budapest", 32)
   ("Madrid", 34)     -> Max: ("Madrid", 34)
   ("Stockholm", 20)
   -----------------------------------------
   ("Budapest", 36)
   ("Madrid", 33)     -> Max: ("Budapest", 36)
   ("Stockholm", 23)
   ```
   *Note: This is a typical example of a window computation, where the data stream is discretised into small batches (windows) and some aggregation or transformation is applied independently to each window.*
4. The computed statistics are written back to Kafka and can be further analysed. In this example we use [IPython notebook](https://github.com/gyfora/summer-school/blob/master/python/KafkaExample.ipynb) to provide basic visualisation.
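
The three processing steps can be simulated in plain Python to check the expected outputs listed above. This is a single-process Python 3 sketch, not the Flink program: `Temp` mirrors the Scala case class, `HistoricalAverage` and `window_max` are hypothetical names, and the window is a simplified tumbling window keyed on a timestamp supplied with each record rather than Flink's own window operator.

```python
from collections import defaultdict, namedtuple

# Mirrors the Scala case class Temp(city: String, temp: Double).
Temp = namedtuple("Temp", ["city", "temp"])


def parse(line):
    """Step 1: parse "City, Temperature"; return None for malformed input."""
    parts = line.split(",")
    if len(parts) != 2:
        return None
    city = parts[0].strip()
    try:
        temp = float(parts[1])  # float() ignores surrounding whitespace
    except ValueError:
        return None
    return Temp(city, temp) if city else None


class HistoricalAverage(object):
    """Step 2: per-city running average, keeping [sum, count] as state."""

    def __init__(self):
        self.state = defaultdict(lambda: [0.0, 0])

    def update(self, t):
        s = self.state[t.city]
        s[0] += t.temp
        s[1] += 1
        return Temp(t.city, s[0] / s[1])


def window_max(timed_temps, size_s=5):
    """Step 3: maximum temperature per tumbling window of size_s seconds.

    timed_temps is an iterable of (timestamp_in_seconds, Temp) pairs.
    """
    windows = {}
    for ts, t in timed_temps:
        key = int(ts // size_s)  # index of the window this record falls into
        if key not in windows or t.temp > windows[key].temp:
            windows[key] = t
    return [windows[k] for k in sorted(windows)]
```

Feeding the lines `"Budapest, 30"`, `"Budapest, 40"`, `"Stockholm, 20"`, `"Budapest, 37"`, `"Stockholm, 22"` through `parse` and `HistoricalAverage.update` reproduces the averages 30, 35, 20, 35.67 (rounded) and 21 from step 2.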

### Running the example:

**Prerequisites**: *JDK 7+, Maven 3.x, Scala 2.10, Python 2.7, IPython notebook*

 1. Download & install [Apache Kafka](https://kafka.apache.org/08/quickstart.html)
 2. Install the Python kafka client

    ```bash
    pip install kafka-python
    ```
 3. Clone the repository and build a jar

    ```bash
    git clone https://github.com/gyfora/summer-school.git
    cd summer-school/flink
    mvn assembly:assembly
    ```
 4. Start the Kafka server and create the topics

    ```bash
    cd <KAFKA-DIR>
    # Start the ZooKeeper server (runs in the foreground, so use a separate terminal)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # Start the Kafka server (also in its own terminal)
    bin/kafka-server-start.sh config/server.properties
    # Create input and output topics
    bin/kafka-topics.sh --create --topic input --partitions 1 --replication-factor 1 --zookeeper localhost:2181
    bin/kafka-topics.sh --create --topic output_avg --partitions 1 --replication-factor 1 --zookeeper localhost:2181
    bin/kafka-topics.sh --create --topic output_max --partitions 1 --replication-factor 1 --zookeeper localhost:2181
    ```
 5. Start the Flink streaming job

    ```bash
    # Running on a Flink mini cluster (equivalent to executing from the IDE) 
    cd <git>/summer-school/flink
    java -classpath target/FlinkExample.jar summerschool.FlinkKafkaExample
    ```

    To execute the job on an already running cluster ([cluster setup guide](https://ci.apache.org/projects/flink/flink-docs-master/setup/local_setup.html)), we can use either the Flink command-line client or the web client. With the command-line client:

    ```bash
    # Start a local Flink cluster
    cd <FLINK-DIR>
    bin/start-cluster-streaming.sh

    # Run the example job using the command-line client
    bin/flink run -c summerschool.FlinkKafkaExample <git>/summer-school/flink/target/FlinkExample.jar
    ```

 6. Start the data generator

    ```bash
    cd <git>/summer-school/flink
    java -classpath target/FlinkExample.jar summerschool.DataGenerator
    ```
 7. Start IPython Notebook to further analyse the results

    ```bash
    cd <git>/summer-school/python
    ipython notebook
    ```
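
The output topics can also be read outside the notebook with the `kafka-python` client installed in step 2. A minimal Python 3 sketch; `read_topic` and `decode` are hypothetical helpers, the broker address assumes Kafka's default `localhost:9092`, and it assumes a kafka-python version compatible with your broker. The exact record format in `output_avg` and `output_max` depends on the Flink job, so the sketch only decodes the values to text.

```python
def decode(value):
    """Decode a raw Kafka message value (bytes) to text."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def read_topic(topic, bootstrap_servers="localhost:9092", timeout_ms=10000):
    """Return the values currently readable from a Kafka topic as strings.

    Requires kafka-python and a running broker. Iteration stops once no new
    message arrives within timeout_ms milliseconds.
    """
    from kafka import KafkaConsumer  # imported here so decode() works without kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the oldest retained message
        consumer_timeout_ms=timeout_ms,
    )
    try:
        return [decode(msg.value) for msg in consumer]
    finally:
        consumer.close()
```

For example, `read_topic("output_max")` returns the maximum-temperature records written so far while the Flink job and the data generator are running.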