# Big Data with PySpark

Datasets that fit in the memory of a single machine (as a rule of thumb, up to about 8 GB) can be processed **locally**; when the data exceeds that limit, we need to start using **distributed systems**. In a distributed system, one machine (the main node) controls how the data is distributed across several machines (the worker nodes). This way we **scale** the processing power and become **fault-tolerant**: if a node/machine fails, the system keeps working on the remaining nodes.

### Hadoop: HDFS & MapReduce

Apache **Hadoop** is often used in such distributed systems. Hadoop is a collection of open-source software utilities for computer networks that process massive amounts of data.
- Hadoop uses **HDFS: Hadoop Distributed File System**, which allows distributing large datasets across several machines
- While HDFS is used to distribute data, **MapReduce** is used to distribute computation on HDFS data

More details on HDFS & MapReduce:
- HDFS distributes the data in blocks (of 128 MB by default) for parallelization, and replicates each block 3x; the replicas are stored on different nodes for fault-tolerance
- MapReduce distributes a computation task to a distributed set of files
    - It has one **job tracker** (in the main node) and **task trackers** on the distributed nodes
    - The job tracker sends the code to run to the task trackers
    - Task trackers allocate CPU + RAM and monitor the tasks on the worker nodes
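
The map, shuffle and reduce steps can be illustrated with a small word count. This is a minimal plain-Python sketch, not Hadoop code: the function names and the sample `blocks` are hypothetical, and each string stands in for one HDFS block that a task tracker would process on its own worker node.

```python
from collections import defaultdict


def map_phase(block):
    """Map step: emit a (word, 1) pair for every word in one block of text."""
    return [(word.lower(), 1) for word in block.split()]


def shuffle_phase(mapped_blocks):
    """Shuffle step: group all emitted values by key across blocks."""
    groups = defaultdict(list)
    for pairs in mapped_blocks:
        for key, value in pairs:
            groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce step: aggregate the values collected for each key."""
    return {key: sum(values) for key, values in groups.items()}


# Hypothetical data: each string stands for one block stored on a different worker
blocks = ["spark and hadoop", "hadoop stores data", "spark processes data"]

# On a cluster, the map step would run in parallel, one task per block
mapped = [map_phase(block) for block in blocks]
word_counts = reduce_phase(shuffle_phase(mapped))
print(word_counts)
```

In a real cluster the map tasks run where the blocks are stored, so the code travels to the data rather than the other way around.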
    
An alternative to MapReduce is **Spark**, a technology gaining popularity for Big Data.


### Spark

Apache Spark is an open-source, general-purpose cluster computing framework; it is an alternative to MapReduce, but it can still be used with Hadoop. It can read data from many storage systems:
- Cassandra
- AWS S3
- HDFS
- and more

Spark can be up to 100x faster than MapReduce because it keeps the data in RAM, whereas MapReduce writes the data to disk after each transformation.

Spark works with **Resilient Distributed Datasets (RDDs)**.
The conceptual functioning is similar to what was explained for Hadoop: a cluster manager controls the worker nodes, and a driver program with a SparkContext interfaces with all of them.
However, users usually interact with RDD objects.
RDDs have 4 main features:
- Distributed Collection of Data
- Fault-tolerant
- Parallel operations
- Ability to use many data sources

RDDs perform basically two types of operations:
- Transformations; common examples:
    - RDD.filter(): apply a function to each element and return elements that evaluate to true
    - RDD.map(): transform elements with a function and return same number of elements; like pd.apply()
        - Example: grab first letter from a list of names
    - RDD.flatMap(): transform each element (e.g., a tuple/array) into N elements; number of total elements is changed
        - Example: transform a corpus of text into a list of words
- Actions; common examples:
    - RDD.collect(): return all the elements of the RDD as an array
    - RDD.count(): return the number of elements in the RDD
    - RDD.first(): return the first element in the RDD
    - RDD.take(): return an array with the first n elements in the RDD
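
The semantics of these operations can be mimicked with plain Python lists. The sketch below is only an analogy that runs without a cluster; the sample data is hypothetical, and the trailing comments show the PySpark call each line corresponds to, assuming an RDD created with something like `sc.parallelize(names)`.

```python
# Hypothetical sample data
names = ["Alice", "Bob", "Anna", "Carl"]
sentences = ["big data with spark", "spark on aws"]

# Transformations (corresponding PySpark call in the trailing comment)
starts_with_a = [n for n in names if n.startswith("A")]  # rdd.filter(lambda n: n.startswith("A"))
first_letters = [n[0] for n in names]                    # rdd.map(lambda n: n[0])
words = [w for s in sentences for w in s.split()]        # rdd.flatMap(lambda s: s.split())

# Actions
all_words = list(words)   # rdd.collect()
n_words = len(words)      # rdd.count()
first_word = words[0]     # rdd.first()
first_three = words[:3]   # rdd.take(3)

print(starts_with_a, first_letters, n_words, first_three)
```

Note how `map` keeps the number of elements (4 names, 4 letters), while `flatMap` changes it (2 sentences, 7 words).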

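We also have Pair RDDs, which contain key-value pairs; in addition to the operations above, they support aggregations by key, similar to pd.groupby():
- Reduce: aggregate RDD elements and return a single element (available on any RDD)
- ReduceByKey: aggregate the values of each key and return one element per key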
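
The difference between the two aggregations can be sketched in plain Python. This is an analogy under stated assumptions, not Spark code: `reduce_by_key` is a hypothetical helper, the `sales` data is made up, and the comments name the PySpark calls being imitated.

```python
from functools import reduce
from operator import add

# Hypothetical key-value data, as it would be stored in a Pair RDD
sales = [("apples", 3), ("pears", 2), ("apples", 4), ("pears", 1)]

# Like rdd.map(lambda kv: kv[1]).reduce(add): one single result for the whole dataset
total = reduce(add, (amount for _, amount in sales))


def reduce_by_key(pairs, func):
    """Like rdd.reduceByKey(func): combine the values of each key separately."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())


per_key = reduce_by_key(sales, add)
print(total, per_key)
```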

The Spark ecosystem is expanding very fast; now, there are: Spark SQL, Spark DataFrames, MLlib, Spark Streaming, ...

**Important: We can use Spark on our local machine to learn (in these notes, only on Ubuntu), but it only makes sense to use it on a cloud service, like AWS. If we plan to work locally, it makes more sense to work with pandas or SQL.**


### Amazon Web Services (AWS)

Create a new free account on AWS:
https://aws.amazon.com/free/

**IMPORTANT: It's free as long as we don't exceed the usage limits**

We create an Amazon EC2 instance on AWS: that is a virtual machine to which we connect from our local computer via SSH.

#### Configuration Steps:
- Log in
- Select in AWS Services: Services: Compute: EC2
- Launch/Create Instance
- Choose Amazon Machine Image = OS we want
- Choose Free Tier, Ubuntu
- Choose an instance type (free tier eligible): t2.micro; we see instance types vary in RAM: larger ones have 100+ GB!
- Configure instance details
    - Make sure we have one unique instance
    - We can leave rest as default
- Add storage: default 8 GB, leave it like that
- Add a Tag key-value: myspark-mymachine
- Create new security group
    - Type: default is SSH, we change it to All traffic, which opens all ports; **IMPORTANT: This is for educational purposes only, do not open all ports in production!**
    - Source: anywhere
- Review instance launch
- Launch
- Key pair (private key):
    - Create a new key pair: newspark
    - Download key pair: save it securely and don't lose it, otherwise we need to go through all the configuration steps again; the file is `newspark.pem` -> `~/keys/aws/newspark.pem`
- Launch instance(s); then, click on instance link and we go to the Amazon EC2 Dashboard

#### Amazon EC2 Console:
- We see the instances here
- Most important button: Actions > Instance State > Stop or Terminate; a stopped instance can be started again, whereas a terminated instance is deleted permanently

#### Connection Steps (from local computer):
- For Windows: PuTTY -> google('ssh windows ec2')
    - Basically 2 PuTTY executables are downloaded: PuTTY and PuTTYgen; the latter is used to convert the downloaded private key into PuTTY's format
    - PuTTY is launched and we create a new session
        - SSH
        - username@dns (from Amazon EC2 dashboard)
        - Connection, SSH, Auth: we select our converted key
- For Linux/Mac: open the AWS EC2 console, get the DNS of the virtual machine, and run:
```bash
    # go to folder with key
    cd ~/keys/aws
    # restrict the key to owner read-only; ssh refuses private keys with broader permissions
    chmod 400 newspark.pem
    # connect to virtual EC2 instance
    ssh -i ~/keys/aws/newspark.pem ubuntu@DNS
```
- Once connected via SSH, we need to set up our virtual machine: install stuff, etc.
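
The `chmod 400` step matters because OpenSSH rejects a private key file that group or other users can access. As a minimal sketch, the check can be reproduced in Python before connecting; `key_is_private` is a hypothetical helper, and the demonstration uses a temporary stand-in file instead of the real `newspark.pem` (POSIX permissions are assumed, so this applies to Linux/Mac).

```python
import os
import stat
import tempfile


def key_is_private(path):
    """Return True if the file grants no permissions to group or others."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return mode & 0o077 == 0


# Demonstration on a temporary stand-in file (not a real key)
with tempfile.NamedTemporaryFile(delete=False) as handle:
    demo_key = handle.name

os.chmod(demo_key, 0o644)              # readable by everyone: ssh would refuse it
open_key_ok = key_is_private(demo_key)

os.chmod(demo_key, 0o400)              # same effect as `chmod 400 newspark.pem`
private_key_ok = key_is_private(demo_key)

os.remove(demo_key)
print(open_key_ok, private_key_ok)
```

For the real key, the call would be `key_is_private(os.path.expanduser("~/keys/aws/newspark.pem"))`.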

#### PySpark Setup

Connect to the virtual machine.

https://medium.com/@josemarcialportilla/getting-spark-python-and-jupyter-notebook-running-on-amazon-ec2-dec599e1c297

```bash

# Connect to virtual EC2 instance
ssh -i ~/keys/aws/newspark.pem ubuntu@DNS
    
# Install Anaconda: check first which is the latest version
wget http://repo.continuum.io/archive/Anaconda3-2020.11-Linux-x86_64.sh
bash Anaconda3-2020.11-Linux-x86_64.sh

# Set paths, initialize
eval "$(/home/ubuntu/anaconda3/bin/conda shell.bash hook)"
conda init

# Check we're using the Anaconda python
which python
which python3

# If not the Anaconda version:
source ~/.bashrc

# Configure Jupyter Notebook
jupyter notebook --generate-config

# Create a self-signed certificate
mkdir ~/certs
cd ~/certs
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout mycert.pem -out mycert.pem
# Give the file back to the ubuntu user (no root shell needed)
sudo chown ubuntu:ubuntu mycert.pem

# Edit Jupyter config file
cd ~/.jupyter/
vim jupyter_notebook_config.py

    # Add the following Python lines to jupyter_notebook_config.py:
    c = get_config()

    # Path to the certificate created above
    c.NotebookApp.certfile = u'/home/ubuntu/certs/mycert.pem'

    # Run on all IP addresses of your instance
    c.NotebookApp.ip = '*'

    # Don't open browser by default
    c.NotebookApp.open_browser = False

    # Fix port to 8888
    c.NotebookApp.port = 8888

# Start Jupyter Notebook
jupyter notebook

# Remove Anaconda installation file
cd ~
rm -fr Anaconda3-2020.11-Linux-x86_64.sh

## Test that Jupyter server is accessible from local computer

# Open on local machine (Firefox) the address/DNS of the virtual machine
# With port 8888 to access the Jupyter notebook instance
# If warning, Accept Risk and Continue
# Take the token from the Ubuntu terminal
https://ec2-xx-xx-xxx-xxx.us-west-2.compute.amazonaws.com:8888

## Back in the Ubuntu/AWS Terminal

# Install Java
sudo apt-get update
sudo apt-get install default-jre

# Check it worked
java -version

# Install Scala
sudo apt-get install scala

# Check it worked
scala -version

# Optional: For specific versions of Scala
# wget http://www.scala-lang.org/files/archive/scala-2.11.8.deb
# sudo dpkg -i scala-2.11.8.deb

# Install pip
export PATH=$PATH:$HOME/anaconda3/bin
# echo export PATH=\$PATH:\$HOME/anaconda3/bin >> ~/.bashrc
conda install pip
which pip

# Install py4j (python-java connection)
pip install py4j

# Install Spark and Hadoop (browse the version we'd like first)
cd ~
wget http://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
sudo tar -zxvf spark-2.4.7-bin-hadoop2.7.tgz

# Add these lines to ~/.bashrc
export SPARK_HOME='/home/ubuntu/spark-2.4.7-bin-hadoop2.7'
export PATH=$SPARK_HOME/bin:$PATH
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH

# Then reload the shell configuration (run this; do not add it to ~/.bashrc)
source ~/.bashrc

# Remove Spark installation files
rm -fr spark-2.4.7-bin-hadoop2.7.tgz

# I had a version conflict between Spark and Python (Spark 2.4.x does not support Python 3.8+) and had to downgrade Python
# Look for available versions
conda search python
# Downgrade
conda install python=3.6.2

# Test Jupyter: close all open instances and start a new one
jupyter notebook

# Open in Firefox
https://ec2-xx-xx-xxx-xxx.us-west-2.compute.amazonaws.com:8888

# Start a new notebook and type (Python, in a notebook cell)
from pyspark import SparkContext
sc = SparkContext()

```
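
If `SparkContext()` fails in the notebook, a quick look at the environment can help narrow down the cause. The following is a minimal sketch, assuming the `SPARK_HOME` and `PYTHONPATH` variables set above; `spark_environment_report` is a hypothetical helper and only checks that things can be found, not that Spark actually starts.

```python
import importlib.util
import os


def spark_environment_report(env=None):
    """Collect basic facts about the Spark-related environment."""
    env = os.environ if env is None else env
    spark_home = env.get("SPARK_HOME")
    return {
        # Was SPARK_HOME exported (e.g., in ~/.bashrc)?
        "spark_home_set": spark_home is not None,
        # Does it point to an existing folder?
        "spark_home_exists": bool(spark_home) and os.path.isdir(spark_home),
        # Can Python locate the packages (via PYTHONPATH or pip)?
        "pyspark_importable": importlib.util.find_spec("pyspark") is not None,
        "py4j_importable": importlib.util.find_spec("py4j") is not None,
    }


report = spark_environment_report()
for check, ok in report.items():
    print(f"{check}: {ok}")
```

If `spark_home_set` is `False` inside Jupyter but `True` in the terminal, the notebook server was probably started before `~/.bashrc` was reloaded.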