# Prerrequisites

Installing Spark

---



In [2]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip -q install findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [4]:
import findspark
findspark.init()

Starting Spark Session and print the version


---


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# create the session
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.2.0'

Creating tunnel</br>
**To Check the Spark UI, open the URL printed by running the above command : https://######/jobs/, /SQL/**


In [6]:
 from google.colab.output import eval_js
 print(eval_js("google.colab.kernel.proxyPort(4040)") + "jobs/")

https://2q74vdhtpqt-496ff2e9c6d22116-4040-colab.googleusercontent.com/jobs/


# Download Datasets

In [7]:
!mkdir -p /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/bank.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/vehicles.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/characters.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/netflix_titles.csv -P /dataset
!ls /dataset

bank.csv  characters.csv  netflix_titles.csv  vehicles.csv


# Reading Data with Spark SQL

---



## Example 1

In [8]:
!head /dataset/bank.csv

"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"
30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"
33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"no"
35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"no"
30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"no"
59;"blue-collar";"married";"secondary";"no";0;"yes";"no";"unknown";5;"may";226;1;-1;0;"unknown";"no"
35;"management";"single";"tertiary";"no";747;"no";"no";"cellular";23;"feb";141;2;176;3;"failure";"no"
36;"self-employed";"married";"tertiary";"no";307;"yes";"no";"cellular";14;"may";341;1;330;2;"other";"no"
39;"technician";"married";"secondary";"no";147;"yes";"no";"cellular";6;"may";151;2;-1;0;"unknown";"no"
41;"entrepreneur";"marrie

Converting a RDD to a DataFrame

In [9]:
from pyspark.sql.types import Row
from pyspark.sql.functions import *

bankText = spark.sparkContext.textFile("/dataset/bank.csv")

bank = bankText.map(lambda lineaCsv: lineaCsv.split(";"))\
.filter(lambda s: s[0] != "\"age\"") \
.map(lambda row: Row(int(row[0]), row[1].replace("\"", ""), row[2].replace("\"", ""), row[3].replace("\"", ""), row[5].replace("\"", ""))) \
.toDF(["age", "job", "marital", "education", "balance"]) \
.withColumn("age", col("age").cast("int"))

In [10]:
bank.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- balance: string (nullable = true)



In [11]:
bank.createOrReplaceTempView("bank")

Loading a **Google Colab extension** to show a table with filters

In [12]:
%load_ext google.colab.data_table

In [13]:

from pyspark.sql.functions import *

bank_grouped = bank\
.groupBy(bank.marital) \
.agg({"balance": "avg"}) \
.select("marital", col("avg(balance)").alias("balance_avg")) \
.orderBy(col("balance_avg").desc())\

bank_grouped.show()


+--------+------------------+
| marital|       balance_avg|
+--------+------------------+
| married| 1463.195566678584|
|  single|1460.4147157190635|
|divorced|1122.3901515151515|
+--------+------------------+



In [14]:
bank_grouped.toPandas()

Unnamed: 0,marital,balance_avg
0,married,1463.195567
1,single,1460.414716
2,divorced,1122.390152


In [15]:
spark.sql("SELECT marital, avg(balance) as balance_avg FROM bank group by marital").show()

+--------+------------------+
| marital|       balance_avg|
+--------+------------------+
|divorced|1122.3901515151515|
| married| 1463.195566678584|
|  single|1460.4147157190635|
+--------+------------------+



In [16]:
import plotly.express as px

fig = px.pie(bank_grouped.toPandas(), values='balance_avg', names='marital', title='By Marital')
fig.show()

## Example 2

Loading a CSV file as RDD, converting into a DataFrame, applying a specific schema using the method `createDataFrame`

In [17]:
from pyspark.sql.types import *

bankSchema = StructType([
    StructField("age", IntegerType(), False), 
    StructField("job", StringType(), False),
    StructField("marital", StringType(), False),
    StructField("education", StringType(), False),
    StructField("balance", IntegerType(), False)])

bankText = spark.sparkContext.textFile("/dataset/bank.csv")

bank = bankText\
.map(lambda s: s.split(";")).filter(lambda s: s[0] != "\"age\"")\
.map(lambda s:(int(s[0]), str(s[1]).replace("\"", ""), str(s[2]).replace("\"", ""), str(s[3]).replace("\"", ""), int(s[5]) ))

bankdf = spark.createDataFrame(bank, bankSchema)
bankdf.createOrReplaceTempView("bank2")

In [18]:
spark.sql("select * from bank2 limit 10").show()

+---+-------------+-------+---------+-------+
|age|          job|marital|education|balance|
+---+-------------+-------+---------+-------+
| 30|   unemployed|married|  primary|   1787|
| 33|     services|married|secondary|   4789|
| 35|   management| single| tertiary|   1350|
| 30|   management|married| tertiary|   1476|
| 59|  blue-collar|married|secondary|      0|
| 35|   management| single| tertiary|    747|
| 36|self-employed|married| tertiary|    307|
| 39|   technician|married|secondary|    147|
| 41| entrepreneur|married| tertiary|    221|
| 43|     services|married|  primary|    -88|
+---+-------------+-------+---------+-------+



## Exercise 1
Load file `vehicles.csv` to a DataFrame, showing the content and printing the schema.

Use this [documentation](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html) to read data in a DataFrame

---



In [19]:
df = spark.read.load("/dataset/vehicles.csv", format="csv", sep=",", inferSchema="True", header="True")
df.show()

+--------------------+--------------------+--------------------+---------------+------+----------------------+----+----------+--------------+---------------+--------------------+
|                name|               model|        manufacturer|cost_in_credits|length|max_atmosphering_speed|crew|passengers|cargo_capacity|    consumables|       vehicle_class|
+--------------------+--------------------+--------------------+---------------+------+----------------------+----+----------+--------------+---------------+--------------------+
|        Sand Crawler|      Digger Crawler|Corellia Mining C...|         150000|  36.8|                    30|  46|        30|         50000|       2 months|             wheeled|
|      T-16 skyhopper|      T-16 skyhopper|   Incom Corporation|          14500|  10.4|                  1200|   1|         1|            50|              0|       repulsorcraft|
|    X-34 landspeeder|    X-34 landspeeder|SoroSuub Corporation|          10550|   3.4|                  

Filter out the previous DafaFrame to get vehicles where the capicity is greater than 70

---



In [20]:
df.groupby("vehicle_class").agg({'passengers':'avg'}).show()


+--------------------+------------------+
|       vehicle_class|   avg(passengers)|
+--------------------+------------------+
|          droid tank|               4.0|
|space/planetary b...|               0.0|
|   droid starfighter|               0.0|
|      wheeled walker|             150.5|
|          sail barge|             500.0|
|       landing craft|             284.0|
|             speeder|0.3333333333333333|
|      assault walker|              40.0|
|         starfighter|               0.0|
|         air speeder|               0.0|
|           transport|               6.0|
|repulsorcraft car...|              16.0|
|             wheeled|              30.0|
|          airspeeder|               0.8|
|           submarine|               2.0|
|             gunship|              15.0|
|       repulsorcraft|17.285714285714285|
|fire suppression ...|              null|
|              walker|              16.5|
+--------------------+------------------+



# Spark SQL. Aggregation Functions

Useful Links:

http://spark.apache.org/docs/latest/api/python/


## Exercise 2

Using the DataFrame with all vehicles loaded in Exercise 1, get the number of passengers by vehicle class


---




In [21]:
#df.groupby("vehicle_class").agg({'passengers':'avg'}).show()

vehicleDF_all
  withcol

IndentationError: ignored

## Exercise 3

Load the file `characters.csv` getting the most common eye color among all characters

---

In [None]:
charactersDF = spark.read.load("/dataset/characters.csv", format="csv", sep=",", inferSchema="True", header="True")

(charactersDF
  .groupBy("eye_color")
  .agg({"eye_color":"count"})
  .orderBy(desc('count(eye_color)'))
  .limit(1)
  .show())

## Exercise 4

1. Load characters DataFrame into a temporary table
2. Using SQL, get the number of characters by gender


---



In [None]:
charactersDF.createOrReplaceTempView("characters_t")

sqlDF = spark.sql(
    "SELECT gender, count(distinct name) as number_of_characters "\
    "FROM characters_t group by gender order by number_of_characters desc")
sqlDF.show()

In [None]:
( charactersDF
  .withColumn("gender"
  when(col("gender") == "none", lit ("NA")) 
  .otherwise(charactersdf.gender))
  .registerTempTable("characters") )

## Exercise 5

Load `netflix_titles.csv` file in a DataFrame, printing the schema

---



In [22]:
!head /dataset/netflix_titles.csv

show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
81145628,Movie,Norm of the North: King Sized Adventure,"Richard Finn, Tim Maltby","Alan Marriott, Andrew Toth, Brian Dobson, Cole Howard, Jennifer Cameron, Jonathan Holmes, Lee Tockar, Lisa Durupt, Maya Kay, Michael Dobson","United States, India, South Korea, China","September 9, 2019",2019,TV-PG,90 min,"Children & Family Movies, Comedies","Before planning an awesome wedding for his grandfather, a polar bear king must take back a stolen artifact from an evil archaeologist first."
80117401,Movie,Jandino: Whatever it Takes,,Jandino Asporaat,United Kingdom,"September 9, 2016",2016,TV-MA,94 min,Stand-Up Comedy,"Jandino Asporaat riffs on the challenges of raising kids and serenades the audience with a rousing rendition of ""Sex on Fire"" in his comedy show."
70234439,TV Show,Transformers Prime,,"Peter Cullen, Sumalee Montano, Frank Welker, Jeffrey Combs, Kevin Michael Richardson, Tania Gun

In [36]:
netflix = spark.read.format("csv")\
.option("sep", ",")\
.option("inferSchema", "true")\
.option("header", "true")\
.load("/dataset/netflix_titles.csv")\
.filter("date_added is not null")\
.filter ("type == 'Movie'")

netflix.show()
netflix.createOrReplaceTempView("netflix_titles")


+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+--------------------+
| show_id| type|               title|            director|                cast|             country|       date_added|release_year|rating|duration|           listed_in|         description|
+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+--------------------+
|81145628|Movie|Norm of the North...|Richard Finn, Tim...|Alan Marriott, An...|United States, In...|September 9, 2019|        2019| TV-PG|  90 min|Children & Family...|Before planning a...|
|80117401|Movie|Jandino: Whatever...|                null|    Jandino Asporaat|      United Kingdom|September 9, 2016|        2016| TV-MA|  94 min|     Stand-Up Comedy|"Jandino Asporaat...|
|80125979|Movie|        #realityhigh|    Fernando 

+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+--------------------+
| show_id| type|               title|            director|                cast|             country|       date_added|release_year|rating|duration|           listed_in|         description|
+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+--------------------+
|81145628|Movie|Norm of the North...|Richard Finn, Tim...|Alan Marriott, An...|United States, In...|September 9, 2019|        2019| TV-PG|  90 min|Children & Family...|Before planning a...|
|80117401|Movie|Jandino: Whatever...|                null|    Jandino Asporaat|      United Kingdom|September 9, 2016|        2016| TV-MA|  94 min|     Stand-Up Comedy|"Jandino Asporaat...|
|80125979|Movie|        #realityhigh|    Fernando 

## Exercise 6

Get the year in which most films were added(No TV Shows). Use a UDF to get the year

---



In [37]:
def get_year(x):
    s = x[-4: ]
    return s

get_year=udf(get_year)

In [38]:
(netflix
.filter(netflix["type"]=="Movie")
.filter("date_added is not null")
.select(get_year(netflix["date_added"]))
.show())

+--------------------+
|get_year(date_added)|
+--------------------+
|                2019|
|                2016|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
|                2017|
+--------------------+
only showing top 20 rows



In [40]:
# Dataframe method

def getYear(x): return x[-4:]
getYear = udf(getYear, StringType())

netflix\
.withColumn("date_added", getYear(netflix.date_added))\
.where(netflix.type != "TV Show")\
.filter(netflix.date_added.isNotNull())\
.groupBy('date_added')\
.agg({'title': 'count'})\
.orderBy(desc("count(title)"))\
.limit(1)\
.show()

+----------+------------+
|date_added|count(title)|
+----------+------------+
|      2019|        1545|
+----------+------------+

