# TUTORIALS: SPARK
## PART 2: SPARK IN STANDALONE MODE


### by Tran Nguyen


## Table of Contents
- [1. Introduction](#intro)
- [2. Set up Spark Clusters with AWS](#setup)
- [3. Submit script on Spark Clusters](#script)
- [4. Some AWS CLI basic commands](#awscli)
- [5. Use notebook on the Spark clusters](#notebook)
- [6. Clean up the resources](#clean_up)


<a id='intro'></a>
## 1. INTRODUCTION

**There are 4 different Spark modes:**
1. Local mode: Spark runs on a single machine, such as a laptop. There is no real distributed computing in this mode; it is used to learn Spark syntax and to prototype a project.

The other 3 modes are cluster manager modes:

2. Standalone: Spark's own built-in cluster manager runs the cluster.
3. YARN: Hadoop's cluster manager.
4. Mesos

YARN and Mesos are useful when sharing a cluster with a team.

**In this tutorial, we will learn about the standalone mode on AWS, which includes:**
- Setting up Spark clusters using the AWS console and the AWS CLI
- Using the built-in notebooks on the Spark clusters
- Submitting Spark scripts to run on the clusters from the command line

#### Reference:
Some of the materials are from the Data Engineer nanodegree program on Udacity (https://www.udacity.com/course/data-engineer-nanodegree--nd027)

<a id='setup'></a>
## 2. SET UP SPARK CLUSTERS WITH AWS 

- A Spark cluster includes multiple machines. To run Spark code on each machine, Spark and its dependencies would have to be downloaded and installed manually on every one of them. With Amazon EMR (Elastic MapReduce), the machines come with Spark ready to use, so instead of setting up plain EC2 instances ourselves, we use the EMR service to set up Spark clusters.
- This tutorial covers 2 approaches to setting up the cluster: the AWS console and the AWS CLI. Both approaches need an EC2 key pair (`Key Pairs` under `Network & Security` in the EC2 console).
- Checklist when the cluster cannot be reached over SSH, e.g. the connection to port 22 times out (Reference: https://stackoverflow.com/questions/17698876/aws-ssh-access-port-22-operation-timed-out-issue/60134134#60134134):
    + On the AWS console: is the instance up and healthy?
    + Is it in a public subnet?
    + Does it have a public IP?
    + Does the VPC have an Internet Gateway?
    + Does the route table have a route to the Internet Gateway (and is it attached to the subnet)?
    + Are the Network ACL rules the defaults?
    + Does the security group allow ping? If yes, does the ping work?
    + Does the security group allow inbound SSH?
    + If there is still no clue, launch a new instance (from a base AMI) in the same VPC and connect to it via SSH. If that works, try to SSH from that instance into the cluster.
    

### Set up credentials in EC2

- From the AWS console, click on `Services`, type 'EC2' to go to the EC2 console => Choose `Key Pairs` under **Network & Security** on the left panel => Choose `Create key pair` => Name "spark-cluster", File format: pem => the .pem file is downloaded automatically. Move this .pem file to the desired directory for the project (this tutorial uses the hidden `.aws` folder in the home directory, e.g. `.aws/spark-cluster.pem`).
- When the file is used for SSH, there will be an error if the permissions of the .pem file are too open:
    + Example: `Permissions 0644 for '.aws/spark-cluster.pem' are too open. It is required that your private key files are NOT accessible by others. This private key will be ignored.`
    + Restrict the permissions so that only the owner can read and write the file:
    `sudo chmod 600 .aws/spark-cluster.pem`
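SSH rejects a private key whenever the group or other users have any permission on it, which is what the `0644` message above complains about. A small Python sketch of the same check and fix, assuming the key lives at `.aws/spark-cluster.pem`; the helper names `key_is_too_open` and `restrict_key` are hypothetical:

```python
import os
import stat


def key_is_too_open(path: str) -> bool:
    """Return True if group or other users have any permission on the key file.

    OpenSSH refuses private keys whose mode has any of the 0o077 bits set.
    """
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & 0o077) != 0


def restrict_key(path: str) -> None:
    """Same effect as `chmod 600 <path>`: read/write for the owner only."""
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
```

For example, `restrict_key(os.path.expanduser("~/.aws/spark-cluster.pem"))` has the same effect as the `chmod 600` command above.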

### 2.1. SET UP USING AWS CONSOLE
- Tutorial for setting up a Spark cluster on AWS EMR: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs.html


- From the AWS console, go to the EMR service => Choose `Create cluster` => Name "spark-udacity" => Select the `emr-5.20.0` release and the Spark application bundle (e.g. `Spark: Spark 2.4.5 on Hadoop 2.8.5 YARN and Zeppelin 0.8.2`; the exact versions listed depend on the selected release), instance type `m5.xlarge`, number of instances: `4`, EC2 key pair `spark-cluster` => `Create cluster`.

### 2.2. SET UP USING AWS CLI

- CLI: Command Line Interface
- Need Python 3.6 or above
- Tutorial: https://dziganto.github.io/aws/aws%20cli/emr/big%20data/hadoop/jupyterhub/spark/Setup-an-EMR-Cluster-via-AWS-CLI/

#### 2.2.1. Step 1: Set up credentials using AWS IAM
- From the AWS console, click on `Services`, type IAM and go to IAM => Choose `Users` => `Add user` => Enter a user name such as 'user_awscli' or 'admin', choose **Programmatic access** as the `Access type`, and then `Next: Permissions`. Choose `Attach existing policies directly`, select the `AdministratorAccess` policy and then choose `Next: Tags`. Skip the tag page and choose `Next: Review` => Choose `Create user` => Save the user name, access key ID and secret access key (the secret access key is only shown once).
- Set up the key pair on EC2 and save the .pem file, if not done yet (see *Set up credentials in EC2* above).
- Create an S3 bucket where you want to store the log files; otherwise, depending on how the cluster is created, a default log bucket may be created automatically:
    + From the AWS console, click on `Services`, type 'S3' and go to the S3 console => Choose `Create bucket` => Enter a name for the bucket such as 's3-for-emr-cluster'. Keep all the other settings at their defaults to create the bucket.

#### 2.2.2. Step 2: Install `awscli`
- Install using `pip install awscli`.
- Type `aws help` to check if the installation is correct.

#### 2.2.3. Set up the `awscli` environment (create the credentials and config files)
##### Approach 1:

- Create the `credentials` file in the terminal as follows (you can use `nano` or any other text editor of your choice):

    + Go to your home directory (`cd ~`), where the AWS CLI looks for the `.aws` folder by default
    + `$mkdir .aws` (the leading period denotes a hidden directory)
    + `$cd .aws`
    + `$nano credentials`
    ```
    [default]
    aws_access_key_id=EXAMPLE_ID
    aws_secret_access_key=EXAMPLE_KEY
    ```
    Use `Ctrl + X`, and then `Y` to save the file and exit nano.
- Create the `config` file in the same way:
    + `$nano config`
    ```
    [default]
    region=us-west-2
    output=json
    ```
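Both files use a simple INI format, so they can also be generated from Python. A minimal sketch using the standard `configparser` module, assuming only the default profile is needed; the function name `write_aws_files` is hypothetical and the key values are placeholders:

```python
import configparser
from pathlib import Path


def write_aws_files(aws_dir: Path, access_key_id: str, secret_access_key: str,
                    region: str = "us-west-2", output: str = "json") -> None:
    """Write minimal `credentials` and `config` files for the [default] profile."""
    aws_dir.mkdir(parents=True, exist_ok=True)

    # credentials: the secret keys, one section per profile
    credentials = configparser.ConfigParser()
    credentials["default"] = {
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
    }
    with open(aws_dir / "credentials", "w") as f:
        credentials.write(f)
    # Keep the secrets readable by the owner only
    (aws_dir / "credentials").chmod(0o600)

    # config: non-secret settings such as region and output format
    config = configparser.ConfigParser()
    config["default"] = {"region": region, "output": output}
    with open(aws_dir / "config", "w") as f:
        config.write(f)
```

Pointing it at `Path.home() / ".aws"` overwrites any existing `credentials` and `config` files, so it only suits a fresh setup. `configparser` writes `key = value` with spaces around `=`, which the AWS CLI also accepts.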

##### Approach 2:
- Run `aws configure` in a terminal and enter the keys of the user created in section 2.2.1:
```
$aws configure
AWS Access Key ID [None]: Enter the access key ID of the user created in section 2.2.1
AWS Secret Access Key [None]: Enter the secret access key of that user
Default region name [None]: us-west-2
Default output format [None]: 
```

- The 2 files are created automatically in the hidden folder `.aws` in your home directory:

    `$cd ~/.aws`

    `$ls`

    => `config  credentials`
- Check the information in the 2 files:

    `$cat credentials`

    `$cat config`

#### 2.2.4. Create EMR cluster

**The command to create an EMR cluster - Example 1:**

```bash
aws emr create-cluster --release-label emr-5.20.0 \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge InstanceGroupType=CORE,InstanceCount=3,InstanceType=m5.xlarge \
  --use-default-roles --ec2-attributes KeyName=YOUR_KEY \
  --applications Name=JupyterHub Name=Spark Name=Hadoop --name=spark-udacity \
  --log-uri s3://spark-test-emr
```

**Note:** 
- `--release-label emr-5.20.0`: build a cluster with EMR version 5.20.0

- `--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge InstanceGroupType=CORE,InstanceCount=3,InstanceType=m5.xlarge`: build 1 master node of type m5.xlarge and 3 core nodes, also of type m5.xlarge

- `--use-default-roles`: use the default service role (EMR_DefaultRole) and instance profile (EMR_EC2_DefaultRole) for permissions to access other AWS services

- `--ec2-attributes KeyName=YOUR_KEY`: configures the Amazon EC2 instances; `KeyName` is the name of the EC2 key pair created in *Set up credentials in EC2* (the one whose .pem file we downloaded)

- `--applications Name=JupyterHub Name=Spark Name=Hadoop`: install JupyterHub, Spark, and Hadoop on this cluster

- `--name=spark-udacity`: the name of the cluster, for example `spark-udacity` or 'test-emr-cluster'

- `--log-uri s3://spark-test-emr`: the S3 bucket where the log files are stored

- `--auto-terminate` (not used in this example): EMR clusters are costly, so they can be terminated automatically when done. With auto-termination, the cluster starts, runs any bootstrap actions that you specify, and then executes the steps, which typically read input data, process it, and produce and save output. When the steps finish, Amazon EMR automatically terminates the cluster's Amazon EC2 instances. If the command defines no steps, leave this option out; otherwise the cluster may terminate before we can connect to it with SSH.
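Long shell commands like Example 1 are easy to break with a missing line continuation. One way around this is to build the arguments as a Python list and hand them to `subprocess`. A sketch under that assumption; `create_cluster_cmd` and `as_shell` are hypothetical helpers and only the options from Example 1 are used:

```python
import shlex
from typing import List


def create_cluster_cmd(name: str, key_name: str, log_uri: str,
                       release: str = "emr-5.20.0",
                       instance_type: str = "m5.xlarge",
                       core_count: int = 3) -> List[str]:
    """Build the argv for `aws emr create-cluster` with the options of Example 1."""
    return [
        "aws", "emr", "create-cluster",
        "--name", name,
        "--release-label", release,
        # One MASTER group and one CORE group; each group is a single argument
        "--instance-groups",
        f"InstanceGroupType=MASTER,InstanceCount=1,InstanceType={instance_type}",
        f"InstanceGroupType=CORE,InstanceCount={core_count},InstanceType={instance_type}",
        "--use-default-roles",
        "--ec2-attributes", f"KeyName={key_name}",
        "--applications", "Name=JupyterHub", "Name=Spark", "Name=Hadoop",
        "--log-uri", log_uri,
    ]


def as_shell(cmd: List[str]) -> str:
    """Render an argv list as one safely quoted shell command line."""
    return " ".join(shlex.quote(arg) for arg in cmd)
```

`subprocess.run(cmd, check=True)` would then run the command, and `print(as_shell(cmd))` shows the equivalent one-line shell command for pasting into a terminal. Passing a list avoids shell quoting issues altogether.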

**The command to create an EMR cluster - Example 2:**

```bash
aws emr create-cluster --name spark-udacity \
  --use-default-roles \
  --release-label emr-5.28.0 --instance-count 3 \
  --applications Name=Spark \
  --ec2-attributes KeyName=spark-cluster \
  --instance-type m5.xlarge \
  --log-uri s3://s3-for-emr-cluster/
```

**Note:**
- `--release-label emr-5.28.0`: build a cluster with EMR version 5.28.0

- `--instance-count 3` and `--instance-type m5.xlarge`: build 1 master node and 2 core nodes of type m5.xlarge

- `--use-default-roles`: use the default service role (EMR_DefaultRole) and instance profile (EMR_EC2_DefaultRole) for permissions to access other AWS services

- `--ec2-attributes KeyName=YOUR_KEY`: `KeyName` is the name of the EC2 key pair created in *Set up credentials in EC2* (the one whose .pem file we downloaded). In this case, the name is `spark-cluster`.

**The command to create an EMR cluster - Example 3:**

```bash
aws emr create-cluster --name test-emr-cluster \
  --use-default-roles --release-label emr-5.28.0 \
  --instance-count 3 --instance-type m5.xlarge \
  --applications Name=JupyterHub Name=Spark Name=Hadoop \
  --ec2-attributes KeyName=emr-cluster \
  --log-uri s3://s3-for-emr-cluster/
```

**EMR Script Components:** 
- `--name`: the name of the cluster, in this case 'test-emr-cluster'
- `--use-default-roles`: use the default service role (EMR_DefaultRole) and instance profile (EMR_EC2_DefaultRole) for permissions to access other AWS services
- `--release-label emr-5.28.0`: build a cluster with EMR version 5.28.0
- `--instance-count 3` and `--instance-type m5.xlarge`: build 1 master node and 2 core nodes of type m5.xlarge
- `--applications Name=JupyterHub Name=Spark Name=Hadoop`: install JupyterHub, Spark, and Hadoop on this cluster
- `--ec2-attributes KeyName=emr-cluster`: configures the Amazon EC2 instances; `KeyName` is the name of the EC2 key pair created in *Set up credentials in EC2* (the one whose .pem file we downloaded). In this case, the name is `emr-cluster`. A specific subnet can also be provided here, e.g. `KeyName=emr-cluster,SubnetId=subnet-xxxxxxxx`.
- `--log-uri s3://s3-for-emr-cluster/`: the S3 bucket where the log files are stored, in this case 's3-for-emr-cluster'. This is optional.
- Since EMR clusters are costly, we can add the option `--auto-terminate` to terminate the cluster automatically when all its steps are done. For this to be useful, the command should also define the work to run with `--steps`, optionally preceded by bootstrap actions such as `--bootstrap-actions Path="s3://<your-bucket>/bootstrap.sh"`. With auto-termination, the cluster starts, runs any bootstrap actions that you specify, and then executes the steps, which typically read input data, process it, and produce and save output. When the steps finish, Amazon EMR automatically terminates the cluster's Amazon EC2 instances. If the command defines no steps, leave this option out. More details on bootstrap actions are on the AWS site: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-bootstrap.html

**Expected output:**

```json
{
    "ClusterId": "j-xxxxxxxxxxxxx",
    "ClusterArn": "arn:aws:elasticmapreduce:us-west-2:xxxxxxxxxxxx:cluster/j-xxxxxxxxxxxxx"
}
```
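When the cluster is created from a script, the ID can be read from this JSON instead of copied by hand. A sketch assuming the output shape shown above (`parse_create_cluster_output` is a hypothetical name); the region is taken from the ARN, whose fields are separated by `:`:

```python
import json


def parse_create_cluster_output(raw: str) -> dict:
    """Extract the cluster id and region from `aws emr create-cluster` JSON output."""
    data = json.loads(raw)
    # ARN layout: arn:aws:elasticmapreduce:<region>:<account-id>:cluster/<cluster-id>
    region = data["ClusterArn"].split(":")[3]
    return {"cluster_id": data["ClusterId"], "region": region}
```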


**=> Check it on AWS EMR console or check using the command:**

`aws emr describe-cluster --cluster-id j-xxxxxxxxxxxxx`
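`describe-cluster` returns a larger JSON document: its `Cluster.Status.State` field tells whether the cluster is ready (for example `STARTING`, `WAITING` or `RUNNING`), and `Cluster.MasterPublicDnsName` is the host name used for SSH later. A sketch that reads both, assuming that output shape; the helper names `cluster_status` and `is_ready` are hypothetical:

```python
import json
from typing import Optional, Tuple

# States in which the cluster is up and can accept work or SSH connections
READY_STATES = {"WAITING", "RUNNING"}


def cluster_status(raw: str) -> Tuple[str, Optional[str]]:
    """Return (state, master public DNS) from `aws emr describe-cluster` JSON."""
    cluster = json.loads(raw)["Cluster"]
    state = cluster["Status"]["State"]
    # The DNS name may be missing while the master node is still being provisioned
    return state, cluster.get("MasterPublicDnsName")


def is_ready(raw: str) -> bool:
    state, _ = cluster_status(raw)
    return state in READY_STATES
```

The AWS CLI can also block until the cluster is up with `aws emr wait cluster-running --cluster-id j-xxxxxxxxxxxxx`.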

#### 2.2.5. Allow SSH Access
- Reference: https://docs.amazonaws.cn/en_us/emr/latest/ManagementGuide/emr-gs-ssh.html

- From the AWS console, click on `Services`, type EMR and go to the EMR console => Choose `Clusters` => Choose the name of the cluster in the list, in this case `test-emr-cluster` => On the `Summary` tab, scroll down to the `Security and access` section and choose the `Security groups for Master` link => Choose the `Security group ID` of `ElasticMapReduce-master` => Scroll down to `Inbound rules` and choose `Edit inbound rules` => Delete any existing SSH rule, then choose `Add Rule` => Choose Type: SSH, Protocol: TCP, Port Range: 22 => For Source, select My IP => `Save`. SSH connections to the master node are now allowed from your current IP address.
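The same inbound rule can also be added from the CLI with `aws ec2 authorize-security-group-ingress`. A sketch that builds that command, assuming you have looked up the ID of the `ElasticMapReduce-master` security group and your public IPv4 address (the `sg-...` ID and the IP in the usage are hypothetical); the console's "My IP" source corresponds to a `/32` CIDR, i.e. exactly one address:

```python
import ipaddress
from typing import List


def ssh_ingress_cmd(group_id: str, my_ip: str) -> List[str]:
    """Build the argv that opens TCP port 22 of a security group to one IPv4 address."""
    # IPv4Network raises a ValueError subclass if my_ip is not a valid IPv4 address
    cidr = ipaddress.IPv4Network(f"{my_ip}/32")
    return [
        "aws", "ec2", "authorize-security-group-ingress",
        "--group-id", group_id,
        "--protocol", "tcp",
        "--port", "22",
        "--cidr", str(cidr),
    ]
```

For example, `ssh_ingress_cmd("sg-0123456789abcdef0", "203.0.113.7")` ends with `--cidr 203.0.113.7/32`. This only adds a rule; deleting an old SSH rule is still done separately.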

#### 2.2.6. Create an SSH connection with the master node of the cluster

- Reference: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html


##### Approach 1:
- On the terminal, check the id of the cluster using the command

`aws emr list-clusters`
- Type on the terminal

`aws emr ssh --cluster-id j-xxxxxxxxxxxx --key-pair-file .aws/spark-cluster.pem`
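`aws emr list-clusters` returns the clusters of the account including terminated ones, so several entries can share the same name. A sketch that picks the ID of the active cluster with a given name, assuming the usual `Clusters` / `Status.State` output shape; `find_cluster_id` is a hypothetical helper:

```python
import json
from typing import Optional

# EMR states of a cluster that has not (started to) shut down
ACTIVE_STATES = {"STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"}


def find_cluster_id(raw: str, name: str) -> Optional[str]:
    """Return the id of the first active cluster called `name`, or None."""
    for cluster in json.loads(raw)["Clusters"]:
        if cluster["Name"] == name and cluster["Status"]["State"] in ACTIVE_STATES:
            return cluster["Id"]
    return None
```

`aws emr list-clusters --active` narrows the list on the CLI side in a similar way.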

##### Approach 2:
- From the AWS console, click on `Services`, type EMR and go to the EMR console => Choose `Clusters` => Choose the name of the cluster in the list, in this case `test-emr-cluster` => On the Summary tab, click the link **Connect to the Master Node Using SSH**:

<img src="img/ssh2.png" width="80%"/>

- Copy and paste on the terminal the command shown on the pop-up window:

<img src="img/ssh2_link.png" width="80%"/>

- Replace ~/emr-cluster.pem with the location and filename of the private key file (.pem) we have set up.

`ssh -i ~/.aws/emr-cluster.pem hadoop@ec2-xx-xxx-xx-xxx.us-west-2.compute.amazonaws.com`

<a id='script'></a>
## 3. SUBMIT SCRIPT ON THE SPARK CLUSTER

- After connecting to the spark cluster using ssh, we can submit the python script to the spark cluster.
- Note that when the EMR cluster is terminated, everything stored on its instances is lost. It is best practice to keep the initial, final, and intermediate data of the data pipeline in S3 for future retrieval: move your files from your local machine to AWS S3, then have the program read the data from S3.

### 3.1. WRITE A SCRIPT AND SUBMIT DIRECTLY ON THE SPARK CLUSTER

- Write a simple script on the master node of the cluster:

`nano test_emr.py`

- Find the location of `spark-submit`:

`which spark-submit`

- Submit the script:

`spark-submit --master yarn ./test_emr.py`

- Content of the file `test_emr.py`:

```python
"""Example of a script submitted to the Spark cluster with spark-submit."""
from pyspark.sql import SparkSession, functions as F

if __name__ == "__main__":
    spark = SparkSession.builder.appName("test_emr").getOrCreate()

    df = spark.createDataFrame(
        [('01/Jul/1995:00:00:01 -0400',), ('01/Jul/1995:00:00:11 -0400',),
         ('26/Jul/1995:17:07:52 -0400',), ('19/Jul/1995:01:56:43 -0400',),
         ('11/Jul/1995:12:50:18 -0400',)],
        ['TIME'])
    # Parse the log-style time string into a proper timestamp column
    df = df.withColumn("date", F.unix_timestamp(F.col('TIME'), 'dd/MMM/yyyy:HH:mm:ss Z').cast('timestamp'))
    # Show the DataFrame in the driver output
    df.show()

    # Stop the SparkSession to release the cluster resources when the job is done
    spark.stop()
```
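To check locally what the Spark pattern `dd/MMM/yyyy:HH:mm:ss Z` does, here is a plain-Python counterpart using `datetime.strptime`, with no Spark needed. It is an illustration only, assuming an English/C locale for the month abbreviation; the helper names are hypothetical:

```python
from datetime import datetime, timezone

# strptime counterpart of the Spark pattern 'dd/MMM/yyyy:HH:mm:ss Z'
LOG_TIME_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def parse_log_time(value: str) -> datetime:
    """Parse a timestamp such as '01/Jul/1995:00:00:01 -0400' into an aware datetime."""
    return datetime.strptime(value, LOG_TIME_FORMAT)


def to_utc(value: str) -> datetime:
    """Parse the log timestamp and convert it to UTC."""
    return parse_log_time(value).astimezone(timezone.utc)
```

Note that `df.show()` in Spark prints the parsed timestamps in the Spark session time zone, so the values displayed on the cluster depend on its time zone setting.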


### 3.2. COPY A SCRIPT FILE TO THE SPARK CLUSTER

- Create a folder in HDFS on the Spark cluster:

    [hadoop@ip-xxx-xx-xx-xxx ~] hdfs dfs -mkdir /user/my_folder_name
    
- Copy the file to the `my_folder_name` folder on the Spark cluster:
    + From the local machine: in a terminal on the local machine, copy the script to the master node with `scp`. The `-i` option pointing to the .pem key goes before the file names:

        `scp -i .aws/spark-cluster.pem test_emr.py hadoop@ec2-xx-xxx-xx-xxx.us-west-2.compute.amazonaws.com:~/`

        If the key is already configured for SSH (e.g. loaded in `ssh-agent`), `-i` can be omitted:

        `scp test_emr.py hadoop@<YOUR IP>:/home/hadoop/`

        `scp test_emr.py hadoop@ec2-xx-xxx-xx-xxx.us-west-2.compute.amazonaws.com:~/`
    
    + On the Spark cluster: copy the file from the master node's local disk into the HDFS folder:
    
        [hadoop@ip-xxx-xx-xx-xxx ~] hdfs dfs -copyFromLocal test_emr.py /user/my_folder_name/
        
- Then find the location of `spark-submit` and submit the script as in section 3.1.
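Options such as `-i` are only reliably recognised before the file operands, because some `scp` builds (e.g. on macOS) stop parsing options at the first non-option argument. A sketch that assembles the `scp` command in that order, assuming the key path and master DNS placeholders used above; `scp_to_master` is a hypothetical helper:

```python
import os
from typing import List


def scp_to_master(local_file: str, master_dns: str, key_file: str,
                  remote_dir: str = "/home/hadoop/") -> List[str]:
    """Build an scp argv that copies a local file to the EMR master node."""
    return [
        "scp",
        "-i", os.path.expanduser(key_file),  # options first
        local_file,                          # then the source
        f"hadoop@{master_dns}:{remote_dir}", # then the destination
    ]
```

The resulting list can be passed to `subprocess.run(..., check=True)`.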

<a id='awscli'></a>
## 4. SOME OTHER AWS CLI BASIC COMMANDS

- Type `aws help` to check for specific commands.
- Type `aws s3 help` to check for specific commands in s3.
- Some AWS CLI commands:

    + List all IAM users: `aws iam list-users` 
    + List all buckets in S3: `aws s3 ls`
- More S3 examples can be found here: https://www.thegeekstuff.com/2019/04/aws-s3-cli-examples/

### 4.1. Copy file from local to a s3 bucket
    
- `aws s3 cp <your current file location>/<filename> s3://<bucket_name>`

### 4.2. Copy file to HDFS on the Spark cluster

- Create the folder on the spark cluster:

`[hadoop@ip-xxx-xx-xx-xxx ~] hdfs dfs -mkdir /user/sparkify_data`

or, equivalently, with the generic Hadoop filesystem shell:

`[hadoop@ip-xxx-xx-xx-xxx ~] hadoop fs -mkdir /user/sparkify_data`

- Copy a file from the master node's local disk to HDFS:
    
`[hadoop@ip-xxx-xx-xx-xxx ~] hdfs dfs -copyFromLocal sparkify_log_small_2.json /user/sparkify_data/`

- Verify by listing the folder:

`[hadoop@ip-xxx-xx-xx-xxx ~] hdfs dfs -ls /user/sparkify_data`

- **Look for the HDFS files on the AWS console:**
On AWS EMR console, choose the cluster. On the Summary tab, click on **(View All)** on **Connections:** => Click on the URL of the **HDFS Name Node** => Choose the tab **Utilities** => **Browse the file system**

<a id='notebook'></a>
## 5. USE NOTEBOOK ON THE SPARK CLUSTER

- On the AWS EMR console, a notebook can be created and opened on a cluster once the cluster is in the Waiting or Running status. Alternatively, a notebook can be created first, and a cluster is created for that notebook during the creation process.
- In EMR, the cluster and the notebook are decoupled: a notebook can be reattached to a different cluster.
- There are 2 types of notebooks: Jupyter notebooks and Zeppelin notebooks.
- Zeppelin notebooks are available on EMR 5.x releases and, like a local `spark-shell`, give direct access to the Spark context: for example, `sc` is already defined as the SparkContext in Zeppelin notebooks.

### 5.1. Simple Spark job

```python
# Content for the notebook (`sc` is the predefined SparkContext)
log_of_songs = [
        "Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "despacito",
        "All the stars"
]

distributed_song_log = sc.parallelize(log_of_songs)
# Lowercase every title and bring the results back to the driver as a list
distributed_song_log.map(lambda x: x.lower()).collect()
```
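Outside a notebook, the same transformation can be checked with plain Python: `collect()` returns the mapped values to the driver as an ordinary list in the original order. The sketch below also counts plays per song to show why lowercasing matters (`Despacito` and `despacito` become the same song); in Spark, that counting part would typically be done with `reduceByKey`, which is not part of the original example:

```python
from collections import Counter

log_of_songs = [
    "Despacito",
    "Nice for what",
    "No tears left to cry",
    "Despacito",
    "Havana",
    "In my feelings",
    "Nice for what",
    "despacito",
    "All the stars",
]

# Same values as distributed_song_log.map(lambda x: x.lower()).collect()
lowered = [song.lower() for song in log_of_songs]

# Plays per song once the case is normalised
play_counts = Counter(lowered)
```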


### 5.2. Read & write data to S3

- An S3 path has 2 parts: the bucket and the key of the object. For example, in the path `s3://my_bucket/path_to_file/file.csv`:
    + `s3://my_bucket` is the bucket
    + `path_to_file/file.csv` is the key
- Read a file in Spark (`spark.read.load` uses the default data source, Parquet, unless a format is given):

        df = spark.read.load("s3://my_bucket/path_to_file/file.csv", format="csv", header=True)
- If all the files under `my_bucket` have the same schema, Spark can generate one DataFrame for all of them.
- If the schemas of the files conflict, the DataFrame will not be generated correctly.
- For example, the S3 bucket `my_bucket` contains files with the same schema, laid out like this:

```
my_bucket
  |---test.csv
  path/to/
     |--test2.csv
     file/
       |--test3.csv
       |--file.csv
```

we can try to read all the files using: `df = spark.read.load("s3://my_bucket/", format="csv", header=True)`. Depending on the Spark version, files in nested sub-folders may not be picked up automatically; Spark 3.0+ provides the `recursiveFileLookup` option for this case.
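The bucket/key split described above can be sketched with the standard `urllib.parse` module. The helper name `split_s3_path` is hypothetical, and also accepting the `s3a` / `s3n` schemes is an assumption made for Hadoop-style paths:

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_path(path: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(path)
    if parsed.scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"not an S3 path: {path}")
    # netloc holds the bucket; the key is the path without its leading '/'
    return parsed.netloc, parsed.path.lstrip("/")
```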

```python
# Content for the notebook: first copy some data to S3, as in section 4.1
sparkify_log_data = "s3n://sparkify/sparkify_log_small.json"

df = spark.read.json(sparkify_log_data)
# Keep the DataFrame in memory for the following queries
df.persist()

df.head(5)
```

### 5.3. Read & write to HDFS

```python
# Content for the notebook: first copy some data to HDFS, as in section 4.2
sparkify_log2_path = "hdfs:///user/sparkify_data/sparkify_log_small_2.json"

df2 = spark.read.json(sparkify_log2_path)
df2.persist()
df2.head(5)
```

<a id='clean_up'></a>
## 6. CLEAN UP THE RESOURCES

DO NOT RUN THIS UNLESS YOU ARE SURE YOU WANT TO DELETE YOUR CLUSTER

- Type `$logout` to close the SSH session on the master node of the cluster. 

- Then terminate the cluster using this command:

    `$aws emr terminate-clusters --cluster-id j-EXAMPLECLUSTERID`

Reference for the `"EMR_DefaultRole is invalid"` or `"EMR_EC2_DefaultRole is invalid"` error when creating an Amazon EMR cluster: https://aws.amazon.com/premiumsupport/knowledge-center/emr-default-role-invalid/ (if the default roles do not exist yet, `aws emr create-default-roles` creates them).