<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Sections
* [Description](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Lab](#2)
  * [2.1 Check Files](#2.1)
  * [2.2 Install python dependencies](#2.2)
* [3. Dataset Documentation](#3)
* [4. DataFrame Creation](#4)
* [5. DataFrame Transformations](#5)
  * [5.1 Column Manipulation](#5.1)
  * [5.2 DataFrame Aggregations](#5.2)
  * [5.3 DataFrame Joins](#5.3)
* [6. Tear Down](#6)
  * [6.1 Stop Hadoop](#6.1)

<a id='0'></a>
## Description
<p>
<div>The goals for this lab are:</div>
<ul>    
    <li>Get familiar with Spark DataFrames API</li>
    <li>Apply some transformations using Spark DataFrames API</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

Since we are going to process data stored in HDFS, let's start the service first.

<a id='1.1'></a>
### 1.1 Start Hadoop

To start Hadoop, open a terminal and execute:
```sh
hadoop-start.sh
```

<a id='1.2'></a>
### 1.2 Search for Spark Installation 
This step is required just because we are working in the course environment.

In [None]:
import findspark
findspark.init()

I'm setting the pandas `display.max_colwidth` option to `None` so that long column values are displayed in full instead of being truncated.

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.3'></a>
### 1.3 Create SparkSession
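The `PYSPARK_SUBMIT_ARGS` environment variable lets us pass extra options, such as additional libraries, to Spark. This lab needs no extra libraries, so the value only contains `pyspark-shell`.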

In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = ' pyspark-shell'
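
As a sketch of how extra libraries could be included, the snippet below builds the value of `PYSPARK_SUBMIT_ARGS` with the `--packages` option of `spark-submit`. The helper name `build_submit_args` and the spark-avro coordinate are assumptions chosen for illustration; this lab does not need them. The variable has to be set before the SparkSession is created.

```python
import os


def build_submit_args(packages):
    """Build a PYSPARK_SUBMIT_ARGS value (hypothetical helper, not part of PySpark)."""
    args = []
    if packages:
        # --packages takes a comma-separated list of Maven coordinates
        args += ["--packages", ",".join(packages)]
    # pyspark-shell must always be the last argument
    args.append("pyspark-shell")
    return " ".join(args)


# Illustrative coordinate only; use the version matching your Spark/Scala build.
submit_args = build_submit_args(["org.apache.spark:spark-avro_2.12:3.3.0"])
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
print(submit_args)
```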

The first step is always to create the SparkSession.

In [None]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
.appName("Pokemon - DataFrames - Lab 2.ipynb")
.config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
.getOrCreate())

<a id='2'></a>
## 2. Lab

<a id='2.1'></a>
### 2.1 Check Files

In order to complete this lab you need to have previously completed **'Pokemons - RAW to STD - DataFrames'**.<br/>
Check that the data is ready in HDFS:

http://localhost:50070/explorer.html#/datalake/std/pokemon/

You also need to upload an additional pokemon dataset to HDFS.<br/>
Unzip pokemon-sightings.zip and upload the file to the folder /datalake/raw/pokemon/pokemon-sightings:

http://localhost:50070/explorer.html#/datalake/raw/pokemon/pokemon-sightings

<a id='2.2'></a>
### 2.2 Install python dependencies
To visualize the data I'm going to use a python library called folium.
Open a terminal and execute the following commands:

pip3 install requests

pip3 install branca

pip3 install jinja2

pip3 install folium

<a id='3'></a>
## 3. Dataset Documentation

### Metadata

id: ID for each pokemon <br/>
name: Name of each pokemon<br/>
type_1: Each pokemon has a type, this determines weakness/resistance to attacks<br/>
type_2: Some pokemon are dual type and have a second type<br/>
total: sum of all stats that come after this, a general guide to how strong a pokemon is<br/>
hp: hit points, or health, defines how much damage a pokemon can withstand before fainting<br/>
attack: the base modifier for normal attacks (e.g. Scratch, Punch)<br/>
defense: the base damage resistance against normal attacks<br/>
sp_atk: special attack, the base modifier for special attacks (e.g. fire blast, bubble beam)<br/>
sp_def: the base damage resistance against special attacks<br/>
speed: determines which pokemon attacks first each round<br/>
generation: pokemon generation<br/>
legendary: determines if the pokemon is legendary or not<br/>

<a id='4'></a>
## 4. DataFrame Creation

The first step is to create the DataFrames

In [None]:
pokemons = spark.read.parquet("hdfs://localhost:9000/datalake/std/pokemon/pokemon-data/").cache()
pokemons.limit(2).toPandas()

In [None]:
sightings = spark.read.parquet("hdfs://localhost:9000/datalake/raw/pokemon/pokemon-sightings/").cache()
sightings.limit(2).toPandas()

In [None]:
images = spark.read.parquet("hdfs://localhost:9000/datalake/std/pokemon/pokemon-images/").cache()
images.limit(2).toPandas()

<a id='5'></a>
## 5. DataFrame Transformations

<a id='5.1'></a>
### 5.1 Column Manipulation

#### cast: Column function that changes the column type

In [None]:
import pyspark.sql.functions as F
pokemons.withColumn("speed",F.col("speed").cast("double")).printSchema()

#### isin: Column function that returns true if the value is in the collection
#### SQL equivalent: IN operator
Find all legendary pokemons from generations 1 and 2

In [None]:
pokemons.where((F.col("generation").isin(1,2)) & (pokemons.legendary)).orderBy(pokemons.total.desc()).toPandas()

#### lit: Returns a Column from a literal

#### when: Returns a column expression when the condition matches
#### otherwise: Returns a column expression when no other condition matches
#### SQL equivalent:  CASE clause

Create a new column with emojis based on type_1

In [None]:
pokemons.withColumn("emoji",
                       F.when(pokemons.type_1=="Water",F.lit("💧"))\
                      .when(pokemons.type_1=="Fire",F.lit("🔥"))\
                      .when(pokemons.type_1=="Electric",F.lit("⚡️"))\
                      .when(pokemons.type_1=="Ice",F.lit("❄️"))\
                      .when(pokemons.type_1=="Grass",F.lit("🌿"))\
                      .when(pokemons.type_1=="Dragon",F.lit("🐉"))\
                      .when(pokemons.type_1=="Psychic",F.lit("🧠"))\
                      .when(pokemons.type_1=="Ghost",F.lit("👻"))\
                      .when(pokemons.type_1=="Bug",F.lit("🐛"))\
                      .when(pokemons.type_1=="Poison",F.lit("☠️"))\
                      .otherwise(F.lit(None))).toPandas()

Practicing some math functions

In [None]:
pokemons.select(F.monotonically_increasing_id(),
                F.abs("hp"),
                F.sqrt("hp"),
                F.pow("hp",2),
                F.cos("hp"),
                F.cosh("hp"),
                F.acos("hp"),
                F.sin("hp"),
                F.sinh("hp"),
                F.asin("hp"),
                F.tan("hp"),
                F.tanh("hp"),
                F.atan("hp"),
                F.round(F.sqrt("hp"),2),
                F.bround(F.sqrt("hp"),3)).toPandas()

#### udf: Returns a user defined function (UDF) from a python function
We can achieve the same result as the `when`/`otherwise` chain by writing our own custom function.

In [None]:
@F.udf("string")
def emoji(type_1):
    d = {
        "Water": "💧",
        "Fire":"🔥",
        "Electric":"⚡️",
        "Ice":"❄️",
        "Grass":"🌱",
        "Dragon":"🐲",
        "Psychic":"🧠",
        "Ghost":"👻",
        "Bug":"🪲",
        "Poison":"☠️",
        "Fairy":"🧚",
        "Dark":"⚫️",
        "Fighting":"⚔️",
        "Flying":"🦅",
        "Rock":"🪨",
        "Ground":"🕳",
        "Steel":"⛓"
    }
    return d.get(type_1,"")


pokemons.withColumn("emoji",emoji(pokemons.type_1)).toPandas()
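
Because a UDF wraps an ordinary Python function, its logic can be checked without Spark before it is applied to a DataFrame. The sketch below uses a shortened copy of the mapping and a hypothetical function name `emoji_for`; wrapping it with `F.udf(emoji_for, "string")` would give a UDF equivalent to the one in the cell above.

```python
# Shortened mapping for illustration; the full dictionary is in the cell above.
TYPE_EMOJI = {"Water": "💧", "Fire": "🔥", "Grass": "🌱"}


def emoji_for(type_1):
    """Return the emoji for a pokemon type, or "" for unknown or missing types."""
    return TYPE_EMOJI.get(type_1, "")


print(emoji_for("Fire"))
print(repr(emoji_for("Unknown")))
print(repr(emoji_for(None)))
```

Keep in mind that Python UDFs are usually slower than built-in column functions, since every row has to be passed between the JVM and a Python worker.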

#### current_date: Returns a Column with current day
#### current_timestamp: Returns a Column with current day and time
#### date_add: Returns a Column adding days to another date column
Add three columns: today, tomorrow and now

In [None]:
pokemons.withColumn("today",F.current_date())\
            .withColumn("tomorrow",F.date_add(F.col("today"),1))\
            .withColumn("now",F.current_timestamp())\
            .limit(5).toPandas()

#### year: Returns a Column with the year value
#### month: Returns a Column with the month value
#### dayofmonth: Returns a Column with the day of the month value
#### dayofweek: Returns a Column with the day of the week value
#### dayofyear: Returns a Column with the day of year value
#### hour: Returns a Column with the hour value
#### minute: Returns a Column with the minute value
#### second: Returns a Column with the second value

Practicing more date & time functions

In [None]:
pokemons.withColumn("now",F.current_timestamp())\
         .select(F.col("now"),\
                 F.year(F.col("now")),\
                 F.month(F.col("now")),\
                 F.dayofmonth(F.col("now")),\
                 F.dayofweek(F.col("now")),\
                 F.dayofyear(F.col("now")),\
                 F.hour(F.col("now")),\
                 F.minute(F.col("now")),\
                 F.second(F.col("now")))\
                .limit(5).toPandas()

<a id='5.2'></a>
### 5.2 DataFrame Aggregations

#### groupBy: Returns a GroupedData object on which aggregations can be run
#### SQL equivalent: GROUP BY clause

Calculate basic stats per pokemon type

In [None]:
pokemons.groupBy("type_1")\
        .agg(
            F.count("*").alias("count"),
            F.max("hp").alias("max_hp"),
            F.round(F.avg("hp")).alias("avg_hp"),
            F.min("hp").alias("min_hp"))\
        .sort(F.col("avg_hp").desc())\
        .toPandas()

Let's find pokemons in Spain.<br/>
I'm going to filter all pokemon sightings to keep only the ones in Spain.<br/>
This url contains the bounding boxes for every country:<br/>
https://gist.github.com/graydon/11198540

In [None]:
@F.udf("boolean")
def in_spain(longitude, latitude):
    # Sightings without coordinates cannot be placed inside the box
    if longitude is None or latitude is None:
        return False
    # Spain bounding box (aka bbox)
    return (-18.3936845 < longitude < 4.5918885) and (27.4335426 < latitude < 43.9933088)

sightings_in_spain = (sightings
                .select("pokemonId",
                        F.col("location.coordinates").getItem(0).alias("longitude"),
                        F.col("location.coordinates").getItem(1).alias("latitude"))
                .where(in_spain(F.col("longitude"),F.col("latitude")))
                .cache())
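
A bounding box is only a rectangle around the country, so it also accepts points from neighbouring territories. The sketch below repeats the same comparison as a plain Python function to make that visible. The name `in_spain_bbox` and the approximate coordinates for Lisbon and Paris are assumptions added for illustration; the Madrid coordinates are the ones used for the map at the end of this lab.

```python
# Spain bounding box, same limits as the UDF above.
MIN_LON, MAX_LON = -18.3936845, 4.5918885
MIN_LAT, MAX_LAT = 27.4335426, 43.9933088


def in_spain_bbox(longitude, latitude):
    """True when the point lies strictly inside the bounding box."""
    if longitude is None or latitude is None:
        return False
    return MIN_LON < longitude < MAX_LON and MIN_LAT < latitude < MAX_LAT


print(in_spain_bbox(-3.703790, 40.416775))  # Madrid
print(in_spain_bbox(-9.1393, 38.7223))      # Lisbon (approximate)
print(in_spain_bbox(2.3522, 48.8566))       # Paris (approximate)
```

Lisbon passes the check because the rectangle also covers Portugal, so the filtered sightings may include some that are close to Spain rather than inside it.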

In [None]:
sightings_in_spain.count()

Let's see how many different pokemon classes there are

In [None]:
pokemon_classes=sightings_in_spain.select("pokemonId").distinct()
pokemon_classes.count()

In [None]:
pokemon_classes.toPandas()

<a id='5.3'></a>
### 5.3 DataFrame Joins

#### join: Returns a new DataFrame joining two DataFrames
#### SQL equivalent: JOIN clause

Join pokemon_classes with pokemons and images DataFrames

In [None]:
pokemons_in_spain = (pokemon_classes.alias("c")
                                   .join(pokemons.alias("p"),F.col("c.pokemonId")==F.col("p.id"),"inner")
                                   .join(images.alias("i"),F.col("p.name")==F.col("i.name"),"inner")
                                   .select("p.id","p.name",F.col("i.content").alias("image"))  
                                   .cache())

In [None]:
pokemons_in_spain.limit(2).toPandas()

In [None]:
pokemons_in_spain.count()

It seems that 5 pokemons are missing (146 - 141).<br/>
Let's check what they are.

In [None]:
pokemon_classes.exceptAll(pokemons_in_spain.select("id")).toPandas()

It seems there are sightings without a pokemon id

In [None]:
sightings_in_spain.where(F.col("pokemonId").isNull()).count()

Let's remove these invalid sightings

In [None]:
sightings_in_spain = sightings_in_spain.where(F.col("pokemonId").isNotNull())

Now let's check why those four pokemons didn't join with the images

In [None]:
pokemons.where(F.col("id").isin(29,32,83,122)).toPandas()

In [None]:
images.where(F.col("name").like("Nidoran%") |
             F.col("name").like("Farfet%") |
             F.col("name").like("Mr%")).toPandas()

The names are slightly different. Let's fix it:

In [None]:
images = images.withColumn("name",F.when(F.col("name")=='Nidoran M',F.lit("Nidoran♂"))
                         .when(F.col("name")=='Nidoran F',F.lit("Nidoran♀"))
                         .when(F.col("name")=='Mr Mime',F.lit("Mr. Mime"))
                         .when(F.col("name")=='Farfetchd',F.lit("Farfetch'd"))
                         .otherwise(F.col("name")))         

Let's recreate pokemons_in_spain DataFrame

In [None]:
pokemons_in_spain = (pokemon_classes.alias("c")
                                   .join(pokemons.alias("p"),F.col("c.pokemonId")==F.col("p.id"),"inner")
                                   .join(images.alias("i"),F.col("p.name")==F.col("i.name"),"inner")
                                   .select("p.id","p.name",F.col("i.content").alias("image"))  
                                   .cache())

In [None]:
pokemons_in_spain.count()

I'm going to create a python dictionary with the pokemons data:

In [None]:
ps = pokemons_in_spain.select("id","name","image").collect()
# Map each pokemon id to its [name, image bytes]
pokemon_dict = {p.id: [p.name, p.image] for p in ps}

I'm going to plot some sightings (5000)

In [None]:
rows = sightings_in_spain.select(F.col("pokemonId").alias("id"),"longitude","latitude").take(5000)

To visualize the data I'm going to use a Python library called folium.

In [None]:
import base64
import folium
from folium import IFrame

m = folium.Map(location=[40.416775, -3.703790], zoom_start=14, tiles="Stamen Terrain")
for row in rows:
    encoded = base64.b64encode(pokemon_dict[row.id][1]).decode('UTF-8')
    html=f'<img src="data:image/png;base64,{encoded}">'
    icon=folium.DivIcon(html)
    popup = pokemon_dict[row.id][0]
    location=[row.latitude, row.longitude]
    folium.Marker(location=location,icon=icon,popup=popup).add_to(m)
m
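
The marker icons rely on embedding the image bytes directly in the HTML as a base64 data URI. A minimal sketch of that step in isolation, using a hypothetical helper `img_tag` and placeholder bytes instead of a real image from the dataset:

```python
import base64


def img_tag(image_bytes, mime="image/png"):
    """Build an <img> tag whose source is the image encoded as a data URI."""
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f'<img src="data:{mime};base64,{encoded}">'


# Placeholder bytes (the PNG file signature only), not a complete image.
sample = b"\x89PNG\r\n\x1a\n"
tag = img_tag(sample)
print(tag)
```

Since every marker carries its own copy of the encoded image, the size of the rendered map grows with the number of sightings plotted.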

<a id='6'></a>
## 6. Tear Down

Once we complete the lab we can stop all the services.

<a id='6.1'></a>
### 6.1 Stop Hadoop

To stop Hadoop, open a terminal and execute:
```sh
hadoop-stop.sh
```