# Prerequisites

Installing Spark


---



In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip -q install findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# create the session
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.2.0'

Creating a tunnel to the Spark UI<br>
**To check the Spark UI, open the URL printed by the command below: https://######/jobs/ for the Jobs tab, or /SQL/ for the SQL tab**


In [5]:
from google.colab.output import eval_js
# 4040 is the default Spark UI port; Colab proxies it to a public URL
print(eval_js("google.colab.kernel.proxyPort(4040)") + "jobs/")

https://xwkkhjduta-496ff2e9c6d22116-4040-colab.googleusercontent.com/jobs/


# Download Datasets

In [6]:
!mkdir -p /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/bank.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/vehicles.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/characters.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/planets.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/species.csv -P /dataset
!ls /dataset

bank.csv  characters.csv  planets.csv  species.csv  vehicles.csv


# Window Partitioning

---



## Example 1

In [7]:
!head /dataset/bank.csv

"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"
30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"
33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"no"
35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"no"
30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"no"
59;"blue-collar";"married";"secondary";"no";0;"yes";"no";"unknown";5;"may";226;1;-1;0;"unknown";"no"
35;"management";"single";"tertiary";"no";747;"no";"no";"cellular";23;"feb";141;2;176;3;"failure";"no"
36;"self-employed";"married";"tertiary";"no";307;"yes";"no";"cellular";14;"may";341;1;330;2;"other";"no"
39;"technician";"married";"secondary";"no";147;"yes";"no";"cellular";6;"may";151;2;-1;0;"unknown";"no"
41;"entrepreneur";"marrie

Read the `bank.csv` file into a DataFrame

In [8]:
from pyspark.sql.functions import *

bank_df = spark.read.format("csv") \
  .option("sep", ";") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load("/dataset/bank.csv")

In [9]:
bank_df.show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| unknown| 

Get the balance of the two youngest people in each job


In [10]:
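from pyspark.sql.window import Window

# One window per job, with rows ordered from youngest to oldest
byJob = Window.partitionBy("job").orderBy("age")

# Number the rows inside each window and keep the first two of every job
bank_df \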
  .withColumn("new_column_job", row_number().over(byJob)) \
  .filter(col("new_column_job") <= 2) \
  .select("age", "job", "balance") \
  .orderBy("job", "age") \
  .show()

+---+-------------+-------+
|age|          job|balance|
+---+-------------+-------+
| 22|       admin.|   4111|
| 23|       admin.|      5|
| 23|  blue-collar|    817|
| 23|  blue-collar|   8627|
| 23| entrepreneur|      4|
| 25| entrepreneur|  16874|
| 26|    housemaid|    543|
| 26|    housemaid|   -759|
| 23|   management|    736|
| 24|   management|    172|
| 24|      retired|    366|
| 35|      retired|    285|
| 25|self-employed|    453|
| 26|self-employed|    211|
| 21|     services|    361|
| 21|     services|   1903|
| 19|      student|      0|
| 19|      student|    103|
| 22|   technician|    333|
| 23|   technician|    598|
+---+-------------+-------+
only showing top 20 rows



## Exercise 1

Using the DataFrame built from the `bank.csv` file, get the top 3 highest balances for each marital status.


---




## Exercise 2



Load the `vehicles.csv` file into a DataFrame

---

In [None]:
!head /dataset/vehicles.csv

For each vehicle, get the difference between its price (`cost_in_credits`) and the price of the cheapest vehicle in the same vehicle class


---



# Joins

## Exercise 3

1. Create DataFrames from the `characters.csv` and `planets.csv` files.
2. Get the planet gravity for each character, selecting only the character name, planet name and gravity.


---




In [12]:
# characters.csv is comma-separated
charactersDF = spark.read.format("csv") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load("/dataset/characters.csv")

charactersDF.printSchema()

# planets.csv is semicolon-separated
planetsDF = spark.read.format("csv") \
  .option("sep", ";") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load("/dataset/planets.csv")

planetsDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- height: string (nullable = true)
 |-- mass: string (nullable = true)
 |-- hair_color: string (nullable = true)
 |-- skin_color: string (nullable = true)
 |-- eye_color: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- homeworld: string (nullable = true)
 |-- species: string (nullable = true)

root
 |-- name: string (nullable = true)
 |-- rotation_period: string (nullable = true)
 |-- orbital_period: string (nullable = true)
 |-- diameter: string (nullable = true)
 |-- climate: string (nullable = true)
 |-- gravity: string (nullable = true)
 |-- terrain: string (nullable = true)
 |-- surface_water: string (nullable = true)
 |-- population: string (nullable = true)



In [21]:
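# Without renaming columns (both DataFrames have a "name" column)
# Note: no join condition is passed, so every character is paired with
# every planet (a cross join)
charactersDF \
  .join(broadcast(planetsDF)) \
  .select(charactersDF.name, planetsDF.name, planetsDF.gravity) \
  .show()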

+--------------+--------------+--------------------+
|          name|          name|             gravity|
+--------------+--------------+--------------------+
|Luke Skywalker|      Alderaan|          1 standard|
|Luke Skywalker|      Yavin IV|          1 standard|
|Luke Skywalker|          Hoth|        1.1 standard|
|Luke Skywalker|       Dagobah|                 N/A|
|Luke Skywalker|        Bespin|1.5 (surface), 1 ...|
|Luke Skywalker|         Endor|       0.85 standard|
|Luke Skywalker|         Naboo|          1 standard|
|Luke Skywalker|     Coruscant|          1 standard|
|Luke Skywalker|        Kamino|          1 standard|
|Luke Skywalker|      Geonosis|        0.9 standard|
|Luke Skywalker|        Utapau|          1 standard|
|Luke Skywalker|      Mustafar|          1 standard|
|Luke Skywalker|      Kashyyyk|          1 standard|
|Luke Skywalker|   Polis Massa|       0.56 standard|
|Luke Skywalker|       Mygeeto|          1 standard|
|Luke Skywalker|       Felucia|       0.75 sta

In [18]:
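# Renaming planetsDF.name to "planet" to avoid a duplicate column name
# Note: no join condition is passed, so this is a cross join
# (every character paired with every planet)

charactersDF \
  .join(broadcast(planetsDF.withColumnRenamed('name', 'planet'))) \
  .select(col("name"), col("planet"), col("gravity")) \
  .show()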

+--------------+--------------+--------------------+
|          name|        planet|             gravity|
+--------------+--------------+--------------------+
|Luke Skywalker|      Alderaan|          1 standard|
|Luke Skywalker|      Yavin IV|          1 standard|
|Luke Skywalker|          Hoth|        1.1 standard|
|Luke Skywalker|       Dagobah|                 N/A|
|Luke Skywalker|        Bespin|1.5 (surface), 1 ...|
|Luke Skywalker|         Endor|       0.85 standard|
|Luke Skywalker|         Naboo|          1 standard|
|Luke Skywalker|     Coruscant|          1 standard|
|Luke Skywalker|        Kamino|          1 standard|
|Luke Skywalker|      Geonosis|        0.9 standard|
|Luke Skywalker|        Utapau|          1 standard|
|Luke Skywalker|      Mustafar|          1 standard|
|Luke Skywalker|      Kashyyyk|          1 standard|
|Luke Skywalker|   Polis Massa|       0.56 standard|
|Luke Skywalker|       Mygeeto|          1 standard|
|Luke Skywalker|       Felucia|       0.75 sta

## Exercise 4

Check the execution plan of the exercise 3 query. Which join type is being used, and why?

---

After checking the execution plan, run the following statements:

---

In [22]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", '0')

In [23]:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

'0'

Run the exercise 3 query again

---

In [24]:
# Exercise 3 query

charactersDF \
  .join(broadcast(planetsDF.withColumnRenamed('name', 'planet'))) \
  .select(col("name"), col("planet"), col("gravity")) \
  .show()

+--------------+--------------+--------------------+
|          name|        planet|             gravity|
+--------------+--------------+--------------------+
|Luke Skywalker|      Alderaan|          1 standard|
|Luke Skywalker|      Yavin IV|          1 standard|
|Luke Skywalker|          Hoth|        1.1 standard|
|Luke Skywalker|       Dagobah|                 N/A|
|Luke Skywalker|        Bespin|1.5 (surface), 1 ...|
|Luke Skywalker|         Endor|       0.85 standard|
|Luke Skywalker|         Naboo|          1 standard|
|Luke Skywalker|     Coruscant|          1 standard|
|Luke Skywalker|        Kamino|          1 standard|
|Luke Skywalker|      Geonosis|        0.9 standard|
|Luke Skywalker|        Utapau|          1 standard|
|Luke Skywalker|      Mustafar|          1 standard|
|Luke Skywalker|      Kashyyyk|          1 standard|
|Luke Skywalker|   Polis Massa|       0.56 standard|
|Luke Skywalker|       Mygeeto|          1 standard|
|Luke Skywalker|       Felucia|       0.75 sta

## Exercise 5

1. Create a DataFrame from `species.csv`.
2. Repartition the previous DataFrame into 100 partitions.
3. Repartition the `characters` DataFrame into 100 partitions.

---



## Exercise 6

Get the species classification for each character, selecting only the character name and its classification.<br>
Use the DataFrames repartitioned in exercise 5.


---



## Exercise 7

1. Run the following statement on the DataFrame built in exercise 6.
2. Check the differences in row distribution across all partitions.

---



In [None]:
from pyspark.sql.functions import *

# Tag each row with its partition ID, then count rows per partition, largest first
classDF \
  .withColumn("partitionId", spark_partition_id()) \
  .groupBy("partitionId") \
  .count() \
  .orderBy(col("count").desc()) \
  .show()