# Spark + Hadoop + H2O Demo

Because of the complexity of the code involved, most of the examples below can't be run from the notebook. Where noted, copy the code shown into an appropriate script and execute it from the console.

## Hadoop

### Hadoop Installation

Follow the tutorial at https://www.edureka.co/blog/install-hadoop-single-node-hadoop-cluster . Some notes to consider:
* Download and install JDK 8u101 (from https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8-2177648.html , the version from the tutorial may not work for your processor)
* Download and install Hadoop
* Configure environment for use
   * This configuration consists of editing xml files so that Hadoop creates a single node environment. They're pretty easy to follow
   * Before starting the daemons, run the command `sudo apt-get install openssh-client openssh-server`. The start scripts connect to localhost over SSH (port 22), so an SSH server must be running
   * At the end of this tutorial, you will have a Hadoop instance running. Browse to http://localhost:50070/dfshealth.html to see it in action.
   * Shell script to configure the environment and launch Hadoop:

```
#Setup Java
export JAVA_HOME=$HOME/jdk1.8.0_101
export PATH=$JAVA_HOME/bin:$PATH

#Setup Hadoop
export HADOOP_HOME=$HOME/hadoop-2.7.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin

#Launch Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver
```

### Hadoop Demo

Now that the Hadoop services are running, you can insert data into HDFS:

`$ hadoop fs -mkdir /input`

`$ hadoop fs -put starcraft.csv /input/.`

starcraft.csv is a database with information about Starcraft players, including their rank and age. Let's perform a Map Reduce operation on this dataset, to obtain the average age at each rank.

To achieve this we have created a mapper.py script, which extracts the rank and age from each record of the dataset, and a reducer.py script, which averages the ages for each rank.

The key thing to note about these scripts is that they operate on the standard input/output facilities. This allows Hadoop to run several instances in parallel on different portions of the dataset.
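
The two scripts themselves are not reproduced in this notebook, so the following is only a minimal sketch of what such a pair might look like. The column positions (`RANK_COL`, `AGE_COL`), the function names, and the tab-separated intermediate format are assumptions for illustration, and the sample rows are made up.

```python
from itertools import groupby

# Hypothetical column positions; adjust them to the real CSV header.
RANK_COL = 1
AGE_COL = 2


def map_lines(lines):
    """Mapper step: yield 'rank<TAB>age' for rows with a numeric rank and age."""
    for line in lines:
        fields = line.strip().split(",")
        if len(fields) <= max(RANK_COL, AGE_COL):
            continue  # skip short or empty lines
        rank = fields[RANK_COL].strip('"')
        age = fields[AGE_COL].strip('"')
        # The header row and rows with missing values are not numeric.
        if rank.isdigit() and age.isdigit():
            yield "{}\t{}".format(rank, age)


def reduce_lines(lines):
    """Reducer step: yield 'rank,mean_age'. Input must be sorted by rank."""
    pairs = (line.strip().split("\t") for line in lines if line.strip())
    for rank, group in groupby(pairs, key=lambda pair: pair[0]):
        ages = [float(age) for _, age in group]
        yield "{},{:.6f}".format(rank, sum(ages) / len(ages))


# Made-up sample rows, only to show the data flow.
sample = ["GameID,LeagueIndex,Age", "1,5,27", "2,5,23", "3,4,?", "4,4,30"]
mapped = sorted(map_lines(sample))  # Hadoop sorts by key between the two steps
for out in reduce_lines(mapped):
    print(out)
```

In real streaming scripts, each function would be fed `sys.stdin` and its results printed to standard output. Hadoop sorts the mapper output by key before handing it to the reducer, which is why `reduce_lines` can rely on `groupby`.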

Now to execute our code (the current directory has to contain the mapper.py and reducer.py scripts):

`$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -file mapper.py -file reducer.py -mapper mapper.py -reducer reducer.py -input /input/starcraft.csv -output /output`

This submits the job to Hadoop, which runs the map and reduce tasks on the cluster (a single node in this setup). Once all the map and reduce tasks are done, a success message is printed:

```
19/07/16 15:42:19 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-unjar3585551381596376153/] [] /tmp/streamjob3165364863209447883.jar tmpDir=null
19/07/16 15:42:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/07/16 15:42:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/07/16 15:42:20 INFO mapred.FileInputFormat: Total input paths to process : 1
19/07/16 15:42:20 INFO mapreduce.JobSubmitter: number of splits:2
19/07/16 15:42:20 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1563301111758_0003
19/07/16 15:42:21 INFO impl.YarnClientImpl: Submitted application application_1563301111758_0003
19/07/16 15:42:21 INFO mapreduce.Job: The url to track the job: http://ubuntu:8088/proxy/application_1563301111758_0003/
19/07/16 15:42:21 INFO mapreduce.Job: Running job: job_1563301111758_0003
19/07/16 15:42:27 INFO mapreduce.Job: Job job_1563301111758_0003 running in uber mode : false
19/07/16 15:42:27 INFO mapreduce.Job:  map 0% reduce 0%
19/07/16 15:42:32 INFO mapreduce.Job:  map 100% reduce 0%
19/07/16 15:42:37 INFO mapreduce.Job:  map 100% reduce 100%
19/07/16 15:42:37 INFO mapreduce.Job: Job job_1563301111758_0003 completed successfully
19/07/16 15:42:37 INFO mapreduce.Job: Counters: 49
        File System Counters            
                FILE: Number of bytes read=26726          
                FILE: Number of bytes written=419739      
                FILE: Number of read operations=0           
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=548941
                HDFS: Number of bytes written=91
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters              
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=6275
                Total time spent by all reduces in occupied slots (ms)=2226
                Total time spent by all map tasks (ms)=6275
                Total time spent by all reduce tasks (ms)=2226           
                Total vcore-milliseconds taken by all map tasks=6275       
                Total vcore-milliseconds taken by all reduce tasks=2226
                Total megabyte-milliseconds taken by all map tasks=6425600 
                Total megabyte-milliseconds taken by all reduce tasks=2279424
        Map-Reduce Framework
                Map input records=3395
                Map output records=3340
                Map output bytes=20040
                Map output materialized bytes=26732
                Input split bytes=186
                Combine input records=0
                Combine output records=0
                Reduce input groups=145
                Reduce shuffle bytes=26732
                Reduce input records=3340
                Reduce output records=7
                Spilled Records=6680
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=189
                CPU time spent (ms)=2120
                Physical memory (bytes) snapshot=701128704
                Virtual memory (bytes) snapshot=5860065280
                Total committed heap usage (bytes)=533725184
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=548755
        File Output Format Counters
                Bytes Written=91
19/07/16 15:42:37 INFO streaming.StreamJob: Output directory: /output
```

The results are stored in the directory indicated by the previous command:

`$ hadoop fs -cat /output/part-00000`

```
1,22.724551                                  
2,22.155620              
3,22.050633               
4,21.981504                                                            
5,21.362283                     
6,20.677939                                     
7,21.171429   
```

Now you know enough to bring up a Hadoop environment, populate it with data and perform operations on it.

## Spark

### Spark Installation

Follow the tutorial at https://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm
* Java is already installed from the previous step
* Installation is simple: extract the Scala binaries and update PATH
* At this point tools are available, such as spark-shell to run commands interactively
* Shell script to set up environment:
```
#Spark
export SPARK_HOME=$HOME/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
```

### Spark Demo

Unfortunately, due to the configuration of this virtual machine, the Spark demo cannot be executed in place. We will show the code without actually running it.
A good resource in addition to this demo is the following tutorial: https://towardsdatascience.com/apache-spark-mllib-tutorial-ec6f1cb336a9

Now that Spark is installed, launch the interactive Python Spark environment:

`$ pyspark`

This command launches the interactive PySpark shell, which starts a Spark session. You should see a message along the lines of `Spark context Web UI available at http://192.168.110.128:4040`. You can browse to this address to monitor the session.

It is easier to work with Spark from a Python script using a couple of helper libraries:

```
$ conda install -c conda-forge pyspark findspark
```

Now we're ready to start working with some models. For this demo, Spark will work standalone, without Hadoop. First, let's create a Spark session in which to work:

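    import findspark
    # With no argument, findspark reads the SPARK_HOME environment variable;
    # the literal string '$SPARK_HOME' would not be expanded by Python.
    findspark.init()
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()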

Now we can load data:

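    # header=True keeps the column names (e.g. LeagueIndex) and
    # inferSchema=True parses numeric columns instead of leaving them as strings
    data = spark.read.csv('spark_hadoop/starcraft.csv', header=True, inferSchema=True)
    data.show()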

And extract some features. Note the VectorAssembler call, which combines the selected columns into a single vector column (`features`), the input format that Spark ML estimators expect:

    from pyspark.ml.feature import VectorAssembler
    feature_cols = data.columns[2:-1]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    features = assembler.transform(data)
    features.show()
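
To make the assembling step concrete, here is a plain-Python sketch of what happens to a single row. It is not Spark code and not how Spark implements it; the `assemble` helper and the sample values are hypothetical and only illustrate that the input columns are packed into one new column while the original columns are kept.

```python
def assemble(row, input_cols, output_col="features"):
    """Return a copy of `row` with the input columns packed into one list."""
    out = dict(row)
    out[output_col] = [float(row[col]) for col in input_cols]
    return out


# Made-up row; the real dataset has many more columns.
row = {"LeagueIndex": 5, "Age": 27, "APM": 143.7}
print(assemble(row, ["Age", "APM"]))
```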

With our features and labels, we can create a train/test split and run a linear regression:

    from pyspark.ml.regression import LinearRegression

    train, test = features.randomSplit([0.8, 0.2])
    lr = LinearRegression(featuresCol="features", labelCol="LeagueIndex")
    model = lr.fit(train)
    result = model.evaluate(test)
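
The object returned by `model.evaluate` is a regression summary that exposes metrics such as `rootMeanSquaredError`, `meanAbsoluteError` and `r2`. A small sketch of how they could be collected follows; the `summarize` helper is hypothetical, and the stand-in object with made-up numbers is there only so the snippet runs without a Spark session.

```python
from types import SimpleNamespace


def summarize(result):
    """Collect the main regression metrics from an evaluation summary."""
    return {
        "rmse": result.rootMeanSquaredError,
        "mae": result.meanAbsoluteError,
        "r2": result.r2,
    }


# Stand-in with made-up numbers; in the demo, pass the real `result` instead.
fake_result = SimpleNamespace(rootMeanSquaredError=1.0,
                              meanAbsoluteError=0.8,
                              r2=0.5)
print(summarize(fake_result))
```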

And there it is! We have created an ML model using Spark.

## H2O

### Installation

To install H2O, follow the instructions at http://docs.h2o.ai/h2o/latest-stable/h2o-docs/downloading.html (they also cover Hadoop and Anaconda Cloud).

#### Installation in Python

Install the dependencies (prepend `sudo` if needed):
* `pip install requests`
* `pip install tabulate`
* `pip install "colorama>=0.3.8"`
* `pip install future`

Install the module:

* `pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o`

Optionally, initialize H2O in Python and run a demo to see H2O at work:

* `python`
* `import h2o`
* `h2o.init()`
* `h2o.demo("glm")`

### H2O Demo

Follow the tutorial at http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/stacked-ensembles.html

```
# __future__ imports must come before any other statement in the file
from __future__ import print_function
import h2o
from h2o.estimators.random_forest import H2ORandomForestEstimator
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from h2o.estimators.stackedensemble import H2OStackedEnsembleEstimator
from h2o.grid.grid_search import H2OGridSearch
h2o.init()

# Import a sample binary outcome train/test set into H2O
train = h2o.import_file("https://s3.amazonaws.com/erin-data/higgs/higgs_train_10k.csv")
test = h2o.import_file("https://s3.amazonaws.com/erin-data/higgs/higgs_test_5k.csv")

# Identify predictors and response
x = train.columns
y = "response"
x.remove(y)

# For binary classification, response should be a factor
train[y] = train[y].asfactor()
test[y] = test[y].asfactor()

# Number of CV folds (to generate level-one data for stacking)
nfolds = 5

# There are a few ways to assemble a list of models to stack together:
# 1. Train individual models and put them in a list
# 2. Train a grid of models
# 3. Train several grids of models
# Note: All base models must have the same cross-validation folds and
# the cross-validated predicted values must be kept.


# 1. Generate a 2-model ensemble (GBM + RF)

# Train and cross-validate a GBM
my_gbm = H2OGradientBoostingEstimator(distribution="bernoulli",
                                      ntrees=10,
                                      max_depth=3,
                                      min_rows=2,
                                      learn_rate=0.2,
                                      nfolds=nfolds,
                                      fold_assignment="Modulo",
                                      keep_cross_validation_predictions=True,
                                      seed=1)
my_gbm.train(x=x, y=y, training_frame=train)


# Train and cross-validate a RF
my_rf = H2ORandomForestEstimator(ntrees=50,
                                 nfolds=nfolds,
                                 fold_assignment="Modulo",
                                 keep_cross_validation_predictions=True,
                                 seed=1)
my_rf.train(x=x, y=y, training_frame=train)


# Train a stacked ensemble using the GBM and RF above
ensemble = H2OStackedEnsembleEstimator(model_id="my_ensemble_binomial",
                                       base_models=[my_gbm, my_rf])
ensemble.train(x=x, y=y, training_frame=train)

# Eval ensemble performance on the test data
perf_stack_test = ensemble.model_performance(test)

# Compare to base learner performance on the test set
perf_gbm_test = my_gbm.model_performance(test)
perf_rf_test = my_rf.model_performance(test)
baselearner_best_auc_test = max(perf_gbm_test.auc(), perf_rf_test.auc())
stack_auc_test = perf_stack_test.auc()
print("Best Base-learner Test AUC:  {0}".format(baselearner_best_auc_test))
print("Ensemble Test AUC:  {0}".format(stack_auc_test))

# Generate predictions on a test set (if necessary)
pred = ensemble.predict(test)


# 2. Generate a random grid of models and stack them together

# Specify GBM hyperparameters for the grid
hyper_params = {"learn_rate": [0.01, 0.03],
                "max_depth": [3, 4, 5, 6, 9],
                "sample_rate": [0.7, 0.8, 0.9, 1.0],
                "col_sample_rate": [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]}
search_criteria = {"strategy": "RandomDiscrete", "max_models": 3, "seed": 1}

# Train the grid
grid = H2OGridSearch(model=H2OGradientBoostingEstimator(ntrees=10,
                                                        seed=1,
                                                        nfolds=nfolds,
                                                        fold_assignment="Modulo",
                                                        keep_cross_validation_predictions=True),
                     hyper_params=hyper_params,
                     search_criteria=search_criteria,
                     grid_id="gbm_grid_binomial")
grid.train(x=x, y=y, training_frame=train)

# Train a stacked ensemble using the GBM grid
ensemble = H2OStackedEnsembleEstimator(model_id="my_ensemble_gbm_grid_binomial",
                                       base_models=grid.model_ids)
ensemble.train(x=x, y=y, training_frame=train)

# Eval ensemble performance on the test data
perf_stack_test = ensemble.model_performance(test)

# Compare to base learner performance on the test set
baselearner_best_auc_test = max([h2o.get_model(model).model_performance(test_data=test).auc() for model in grid.model_ids])
stack_auc_test = perf_stack_test.auc()
print("Best Base-learner Test AUC:  {0}".format(baselearner_best_auc_test))
print("Ensemble Test AUC:  {0}".format(stack_auc_test))

# Generate predictions on a test set (if necessary)
pred = ensemble.predict(test)
```
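
The code above requires every base model to use the same cross-validation folds, which is why each one sets `fold_assignment="Modulo"`. As far as the H2O documentation describes it, this scheme assigns rows to folds deterministically by row position, so it does not depend on the seed. The following plain-Python sketch illustrates the idea; it is not H2O code, and the `modulo_folds` and `cv_splits` helpers are hypothetical.

```python
def modulo_folds(n_rows, nfolds):
    """Assign row i to fold i % nfolds."""
    return [i % nfolds for i in range(n_rows)]


def cv_splits(n_rows, nfolds):
    """Yield (train_rows, holdout_rows) index lists, one pair per fold."""
    folds = modulo_folds(n_rows, nfolds)
    for k in range(nfolds):
        train_rows = [i for i, f in enumerate(folds) if f != k]
        holdout_rows = [i for i, f in enumerate(folds) if f == k]
        yield train_rows, holdout_rows


print(modulo_folds(7, 3))
for train_rows, holdout_rows in cv_splits(7, 3):
    print(train_rows, holdout_rows)
```

Because each row is held out exactly once and the assignment is identical for every model, the cross-validated predictions of the base models line up row by row, which is what the ensemble's metalearner needs as its training data.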