# Training an MLlib model on an EMR cluster

## Creating an AWS key-pair

To access the instances on AWS, we first need a key-pair. If you don’t have one, create one in https://us-east-2.console.aws.amazon.com/ec2/home?region=us-east-2#KeyPairs and save it to your computer.
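
If you prefer scripting to the console, a key-pair can also be created with boto3. This is a minimal sketch, assuming boto3 is installed and AWS credentials are configured; the key name `emr-tutorial` and the output path are placeholders, not values from this tutorial.

```python
import os
import stat


def save_private_key(path, key_material):
    """Write the PEM text to `path` and make it readable only by the owner."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(key_material)
    os.chmod(path, stat.S_IRUSR)  # same effect as `chmod 400`
    return path


def create_key_pair(name, path, region="us-east-2"):
    """Create an EC2 key-pair and store its private key locally."""
    import boto3  # imported lazily so the helper above works without boto3

    ec2 = boto3.client("ec2", region_name=region)
    response = ec2.create_key_pair(KeyName=name)
    return save_private_key(path, response["KeyMaterial"])


# Example (requires AWS credentials); the key name is a placeholder:
# create_key_pair("emr-tutorial", os.path.expanduser("~/emr-tutorial.pem"))
```

The private key is returned only once, at creation time, so the sketch writes it to disk immediately and restricts its permissions, which `ssh` expects for key files.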


# Setting up Spark cluster

As a first step, we’ll create a Spark cluster on AWS EMR (Elastic MapReduce).


1. Go to https://us-east-2.console.aws.amazon.com/elasticmapreduce and click on **Create cluster** and then **Go to Advanced options**

2. Select the latest EMR release and check Hadoop, Spark and Livy
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587027516546_image.png)

3. Remove the **Task** node type and change the Core instance type to **Spot** (you can also use Spot for Master, but you risk that it’ll be terminated). You may select smaller instances if you wish; this type of cluster (32 GB memory) costs about $0.45/hour
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587027664960_image.png)

4. Name your cluster
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587027700424_image.png)

5. Use your key-pair
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587029054377_image.png)

6. After the cluster starts, you should see this
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587029513107_image.png)


7. By default, AWS blocks all requests to the cluster from outside, so to be able to SSH into it we need to open port 22 in the EMR security group.
    1. Go to https://us-east-2.console.aws.amazon.com/ec2/home?region=us-east-2#SecurityGroups
    2. Find security group **ElasticMapReduce-master**
    3. Click on **Inbound rules** below → **Edit Inbound rules**
    4. Add SSH and allow it from Anywhere
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587030515578_image.png)

8. Now we have to establish a connection through SSH forwarding. Click on **Enable Web Connection** and copy the SSH command (you need to change the path to your key-pair). It should look like this
    ```
    ssh -i ~/mojmir.pem -ND 8157 hadoop@ec2-3-15-234-116.us-east-2.compute.amazonaws.com
    ```

    If you did everything correctly, you can open the following URL (with the address of your own master node) in a browser that is configured to use the SOCKS proxy on port 8157

    ```
    ec2-3-19-69-20.us-east-2.compute.amazonaws.com:8998
    ```

    and see Livy UI
    
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587045193456_image.png)

9. The last step is to run local port forwarding so that we can access the cluster from sparkmagic. The following forwards only port 8998, where Livy listens

```
ssh -i ~/mojmir.pem -N -L 8998:localhost:8998 hadoop@ec2-3-19-69-20.us-east-2.compute.amazonaws.com
```

10. If you are on Windows, either use PuTTY (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/putty.html) or change the Livy address in the sparkmagic config

    1. Download the config file https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json and rename it to `config.json`

    2. Replace all occurrences of http://localhost:8998 with the Livy address (e.g. http://ec2-52-14-86-104.us-east-2.compute.amazonaws.com:8998)

    3. Copy `config.json` to `[home directory]/.sparkmagic/config.json` (there should be an empty file already, so replace it)
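
The three config steps for Windows can also be scripted. The sketch below uses only the standard library; the function names are illustrative, and it assumes the example config still contains the literal string `http://localhost:8998` wherever the Livy URL is set.

```python
import json
import os
import urllib.request

EXAMPLE_CONFIG_URL = (
    "https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/"
    "master/sparkmagic/example_config.json"
)


def point_config_at(config_text, livy_url, default_url="http://localhost:8998"):
    """Replace every occurrence of the default Livy URL and check the result parses."""
    updated = config_text.replace(default_url, livy_url)
    json.loads(updated)  # raises ValueError if the text is not valid JSON
    return updated


def install_config(livy_url):
    """Download the example config, rewrite it, and save it under ~/.sparkmagic."""
    with urllib.request.urlopen(EXAMPLE_CONFIG_URL) as response:
        text = response.read().decode("utf-8")
    target_dir = os.path.join(os.path.expanduser("~"), ".sparkmagic")
    os.makedirs(target_dir, exist_ok=True)
    target = os.path.join(target_dir, "config.json")
    with open(target, "w") as f:
        f.write(point_config_at(text, livy_url))
    return target


# Example with the sample address from the steps above (use your own master node):
# install_config("http://ec2-52-14-86-104.us-east-2.compute.amazonaws.com:8998")
```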

# Connecting to Spark cluster

Now that we have established a connection to our cluster, we’ll use [sparkmagic](https://github.com/jupyter-incubator/sparkmagic) to connect to it. To install sparkmagic:

1. `pip install sparkmagic`
2. `jupyter nbextension enable --py --sys-prefix widgetsnbextension` 
3. (Only if you use JupyterLab) `jupyter labextension install @jupyter-widgets/jupyterlab-manager`
4. Check the location of sparkmagic with `pip show sparkmagic`, then `cd` into that directory
    ```
    cd /usr/local/lib/python3.7/site-packages
    ```
    and install kernel
    ```
    jupyter-kernelspec install sparkmagic/kernels/pysparkkernel
    ```

5. Open Jupyter notebook / Lab and create PySpark notebook
    
![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587045389624_image.png)

6. Run `sc` in the cell. After a while you should see your new Spark Context!

![](https://paper-attachments.dropbox.com/s_7E296B3828F47DAA4B2AC26953FEE16C616F9AA7984483B4ED1335DCB156EBC3_1587045471061_image.png)
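
If the Spark context does not appear, it can help to ask Livy directly which sessions it knows about. This is a minimal sketch using only the standard library, assuming the port forwarding from the previous section is running so that Livy answers on `localhost:8998`; the helper names are illustrative.

```python
import json
import urllib.request


def summarize_sessions(payload):
    """Reduce the JSON returned by Livy's GET /sessions to (id, kind, state) tuples."""
    return [
        (s["id"], s.get("kind"), s.get("state"))
        for s in payload.get("sessions", [])
    ]


def list_livy_sessions(base_url="http://localhost:8998"):
    """Query the Livy REST API and return a summary of its sessions."""
    with urllib.request.urlopen(base_url + "/sessions", timeout=10) as response:
        return summarize_sessions(json.load(response))


# Example (requires a reachable Livy server):
# print(list_livy_sessions())
```

A connection error here points at the SSH tunnel rather than at sparkmagic, which narrows down where to look.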

In [0]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "/usr/bin/python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

In [0]:
sc

In [0]:
# list available packages in pyspark
sc.list_packages()

# Data Preparation

1. Create a new bucket `mlcollege` (use your own name, as bucket names must be globally unique) in S3 (https://s3.console.aws.amazon.com/s3/home?region=eu-central-1)

2. To simulate a real-world situation, first upload the adult data from https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data to that bucket
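
Both steps can be done from Python as well. The sketch below assumes boto3 is installed, AWS credentials are configured, and the bucket does not exist yet; the default region `us-east-2` is an assumption, so pass whichever region you actually use.

```python
import urllib.request

ADULT_DATA_URL = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)


def bucket_kwargs(bucket, region):
    """Build arguments for create_bucket; us-east-1 takes no LocationConstraint."""
    kwargs = {"Bucket": bucket}
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


def upload_adult_data(bucket, region="us-east-2", key="adult.data"):
    """Create the bucket, download the dataset, and upload it to S3."""
    import boto3  # lazy import: only needed when actually talking to AWS

    s3 = boto3.client("s3", region_name=region)
    s3.create_bucket(**bucket_kwargs(bucket, region))
    with urllib.request.urlopen(ADULT_DATA_URL) as response:
        s3.put_object(Bucket=bucket, Key=key, Body=response.read())
    return f"s3://{bucket}/{key}"


# Example (requires AWS credentials); replace the bucket name with your own:
# print(upload_adult_data("mlcollege"))
```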

In [0]:
columns = [
    'age',
    'workclass', 
    'fnlwgt',
    'education',
    'education_num',
    'marital_status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
    'native_country',
    'income',
]

# read data from S3
df = spark.read.csv("s3://mlcollege/adult.data", header=False, inferSchema=True, mode='FAILFAST')
df = df.toDF(*columns)
df.show()

In [0]:
# repartition it and save to parquet format
df.repartition(5).write.parquet("s3://mlcollege/adult.parquet")

# Load training data

In [0]:
df = spark.read.parquet("s3://mlcollege/adult.parquet")

In [0]:
df.count()

In [0]:
from pyspark.sql import functions

# create label (string values in the raw file carry a leading space, hence ltrim)
df = df.withColumn('label', (functions.ltrim(df.income) == '>50K').cast('int'))

# persist dataframe in memory to avoid loading on every command
df.persist()
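
The `ltrim` matters because fields in `adult.data` are separated by a comma followed by a space, so every string value keeps a leading space after parsing. The plain-Python sketch below mirrors the Spark expression on a few hand-written values; the sample strings are illustrative and not read from the dataset.

```python
def make_label(income):
    """Mirror of (ltrim(income) == '>50K').cast('int') for a single value."""
    return int(income.lstrip(" ") == ">50K")


samples = [" >50K", " <=50K", ">50K"]
labels = [make_label(value) for value in samples]
print(labels)  # [1, 0, 1]

# Comparing the untrimmed value fails because of the leading space
print(int(" >50K" == ">50K"))  # 0
```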

# Model training

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# create a pipeline that accepts two features, `hours_per_week` and `education_num`

assembler = VectorAssembler(inputCols=['hours_per_week', 'education_num'], outputCol='features')
classifier = LogisticRegression(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[assembler, classifier])

In [0]:
# train model
model = pipeline.fit(df)

In [0]:
# test that it works
model.transform(df.select('hours_per_week', 'education_num')).show()
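
For intuition about what the fitted pipeline computes: the assembler packs the two columns into a vector, and logistic regression turns a weighted sum of that vector into a probability with the sigmoid function. The sketch below reproduces that arithmetic in plain Python. The weights and intercept are made-up placeholders, not values from the trained model; on the cluster, the fitted values should be available as `model.stages[-1].coefficients` and `model.stages[-1].intercept`.

```python
import math


def predict_probability(features, weights, intercept):
    """P(label = 1) under logistic regression: sigmoid(w . x + b)."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1.0 / (1.0 + math.exp(-margin))


# Hypothetical parameters for [hours_per_week, education_num]
weights = [0.04, 0.35]
intercept = -7.0

for row in ([40.0, 13.0], [100.0, 9.0]):
    p = predict_probability(row, weights, intercept)
    # predicted class is 1 when the probability exceeds 0.5
    print(row, round(p, 3), int(p > 0.5))
```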

# Save model with mlflow

In [0]:
sc.install_pypi_package("mlflow")
sc.install_pypi_package("boto3")

In [0]:
import mlflow.spark

version = 'v3'

# need to save to /tmp because we don't have permissions to write anywhere else
mlflow.spark.save_model(model, f"/tmp/models/spark-adult-model-{version}")

In [0]:
import tarfile
import os

def make_tarfile(output_filename, source_dir):
    """Compress directory into tar.gz"""
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=os.path.basename(source_dir))
        
make_tarfile(f'/tmp/models/spark-adult-model-{version}.tar.gz', f'/tmp/models/spark-adult-model-{version}')

In [0]:
import boto3

# upload to S3
s3 = boto3.resource('s3')
s3.meta.client.upload_file(f'/tmp/models/spark-adult-model-{version}.tar.gz', 'mlcollege', f'models/spark-adult-model-{version}.tar.gz')


# Model Serving
1. Download the model from S3 and uncompress it
2. Install mlflow with `pip install mlflow`
3. Serve the model with `PYSPARK_PYTHON=python3 mlflow models serve -m spark-adult-model-v3 --no-conda`
4. Start making predictions!
```
curl -d '{"columns":["hours_per_week", "education_num"], "data":[[100.0, 9.0], [40.0, 13.0]]}' -H 'Content-Type: application/json; format=pandas-split' -X POST localhost:5000/invocations
```
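
The download, extraction, and prediction steps can also be driven from Python. This is a sketch under a few assumptions: boto3 and AWS credentials are available for the download, the server started in step 3 listens on `localhost:5000`, and the request uses the same content type as the curl call above. The function names are illustrative.

```python
import json
import tarfile
import urllib.request


def download_model(bucket, key, archive_path):
    """Fetch the archive from S3 (requires boto3 and AWS credentials)."""
    import boto3  # lazy import: only needed for the download

    boto3.client("s3").download_file(bucket, key, archive_path)


def extract_model(archive_path, target_dir="."):
    """Unpack the .tar.gz produced during training into target_dir."""
    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(target_dir)


def build_payload(rows, columns=("hours_per_week", "education_num")):
    """Encode rows in the pandas-split JSON layout used by the curl call."""
    return json.dumps({"columns": list(columns), "data": rows}).encode("utf-8")


def predict(rows, url="http://localhost:5000/invocations"):
    """POST rows to the running model server and return its JSON reply."""
    request = urllib.request.Request(
        url,
        data=build_payload(rows),
        headers={"Content-Type": "application/json; format=pandas-split"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Example (requires AWS credentials and a running model server):
# download_model("mlcollege", "models/spark-adult-model-v3.tar.gz", "model.tar.gz")
# extract_model("model.tar.gz")
# print(predict([[100.0, 9.0], [40.0, 13.0]]))
```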

# Tasks

1. Try adding non-numeric features to the model, retrain it, and try making a prediction

2. Restart your cluster and try it with [Zeppelin notebook](https://zeppelin.apache.org/)
    1. Instead of adding **Livy** to software configuration, add **Zeppelin**
    2. Connect to it on port 8890
    3. Zeppelin uses Scala by default, but you can use `%pyspark` at the beginning of every block to run PySpark
    4. Retrain your model in the same way