# Create Spark session

Open a Spark session, available in our notebook as the `spark` object. In the background, YARN allocates CPU and RAM resources to create a driver and one or more executor JVMs.

In [1]:
import findspark
findspark.init()

# Create a spark-session (akin to what pyspark provides when it is started)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

Check the YARN UI: [http://slalomdsvm:8088/ui2/](http://slalomdsvm:8088/ui2/)

In [2]:
! yarn application -list

20/08/07 15:52:28 INFO client.RMProxy: Connecting to ResourceManager at slalomdsvm/10.0.2.15:8050
20/08/07 15:52:28 INFO client.AHSProxy: Connecting to Application History server at slalomdsvm/10.0.2.15:10200
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL
application_1596825752622_0004	       pyspark-shell	               SPARK	   vagrant	   default	           RUNNING	         UNDEFINED	            10%	             http://slalomdsvm:4040


# Checking Spark session options

In [3]:
sc = spark.sparkContext
sc.getConf().getAll()

[('spark.history.kerberos.keytab', 'none'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.driver.appUIAddress', 'http://slalomdsvm:4040'),
 ('spark.history.ui.port', '18081'),
 ('spark.driver.memory', '512M'),
 ('spark.driver.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.history.fs.cleaner.interval', '7d'),
 ('spark.shuffle.io.serverThreads', '128'),
 ('spark.yarn.historyServer.address', 'slalomdsvm:18081'),
 ('spark.sql.streaming.streamingQueryListeners', ''),
 ('spark.executor.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.sql.statistics.fallBackToHdfs', 'true'),
 ('spark.executorEnv.PYTHONPATH',
  '{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.app.id', 'application_1596825752622_0004'),
 ('spark.shuffle.file.buffer', '1m'),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistory

# Imports

In [4]:
import pandas as pd

%matplotlib inline

# Create and see dataframes

In [5]:
col_names = ['id', 'first_name', 'last_name']
rows = [
    (1, 'John', 'Doe'),
    (1, 'John', 'Doe'), 
    (1, 'John', None), 
    (2, 'Jane', 'Doe'),
    (3, 'Herbie', 'Hancock'),
    (4, 'Erin', 'brockovich'),        
]

df1 = spark.createDataFrame(rows, col_names)

In [6]:
col_names = ['id', 'number_sox']
rows = [
    (1, 24),
    (2, 30),
    (3, 29),
    (4, 40),        
]

df2 = spark.createDataFrame(rows, col_names)

# See the dataframes

In [7]:
df1.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



In [8]:
df2.printSchema()

root
 |-- id: long (nullable = true)
 |-- number_sox: long (nullable = true)



Show the dataframe:

  * Evaluating `df1` on its own only shows a reference to the object and its schema. This is because of Spark's lazy evaluation: no computation runs on the cluster until an action is called.
  * The dataframe's contents are only computed when we apply an action to it, such as `show()`.
  * Note the difference in duration between the two cells.
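To make lazy evaluation concrete, here is a toy model in plain Python. The `LazyFrame` class is hypothetical and is not how Spark is implemented: transformations only record a step in a plan, and nothing is computed until the `collect()` action executes the recorded plan.

```python
class LazyFrame:
    """Toy model of lazy evaluation (hypothetical, not Spark's API)."""

    def __init__(self, rows, plan=None):
        self._rows = rows          # source data
        self._plan = plan or []    # recorded transformations, not yet executed
        self.executed_steps = 0    # how many steps have actually run

    def filter(self, predicate):
        # Transformation: return a new LazyFrame with one more recorded step
        return LazyFrame(self._rows, self._plan + [("filter", predicate)])

    def map(self, func):
        # Transformation: also only recorded
        return LazyFrame(self._rows, self._plan + [("map", func)])

    def collect(self):
        # Action: only now is the recorded plan executed, step by step
        result = list(self._rows)
        for kind, func in self._plan:
            if kind == "filter":
                result = [r for r in result if func(r)]
            else:
                result = [func(r) for r in result]
            self.executed_steps += 1
        return result


numbers = LazyFrame([24, 30, 29, 40])
evens_halved = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x // 2)
print(evens_halved.executed_steps)  # 0: nothing has run yet
print(evens_halved.collect())       # [12, 15, 20]
print(evens_halved.executed_steps)  # 2
```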

In [9]:
df1

DataFrame[id: bigint, first_name: string, last_name: string]

In [10]:
df1.show()

+---+----------+----------+
| id|first_name| last_name|
+---+----------+----------+
|  1|      John|       Doe|
|  1|      John|       Doe|
|  1|      John|      null|
|  2|      Jane|       Doe|
|  3|    Herbie|   Hancock|
|  4|      Erin|brockovich|
+---+----------+----------+



In [11]:
df2

DataFrame[id: bigint, number_sox: bigint]

In [12]:
df2.show()

+---+----------+
| id|number_sox|
+---+----------+
|  1|        24|
|  2|        30|
|  3|        29|
|  4|        40|
+---+----------+



# Joining dataframes

In [13]:
df = df1.join(df2, 'id', how = 'inner')

df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  1|      John|       Doe|        24|
|  1|      John|      null|        24|
|  3|    Herbie|   Hancock|        29|
|  2|      Jane|       Doe|        30|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+
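An inner join keeps every pair of rows whose keys match and drops rows without a match, which is why the three `id = 1` rows of `df1` each pair with the single `id = 1` row of `df2`. A minimal plain-Python sketch of these semantics, written as a hash join on the same sample data (this is not necessarily the join strategy Spark picks):

```python
from collections import defaultdict

df1_rows = [
    (1, 'John', 'Doe'),
    (1, 'John', 'Doe'),
    (1, 'John', None),
    (2, 'Jane', 'Doe'),
    (3, 'Herbie', 'Hancock'),
    (4, 'Erin', 'brockovich'),
]
df2_rows = [(1, 24), (2, 30), (3, 29), (4, 40)]


def inner_join(left, right):
    """Join on the first field (id); emit one output row per matching pair."""
    # Build phase: index the right side by key
    index = defaultdict(list)
    for key, *rest in right:
        index[key].append(rest)
    # Probe phase: look up each left row; rows without a match are dropped
    joined = []
    for key, *left_rest in left:
        for right_rest in index.get(key, []):
            joined.append((key, *left_rest, *right_rest))
    return joined


for row in inner_join(df1_rows, df2_rows):
    print(row)
```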



# Keeping distinct values

In [14]:
df = df.distinct() \

df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  1|      John|      null|        24|
|  3|    Herbie|   Hancock|        29|
|  2|      Jane|       Doe|        30|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+



# Removing rows containing `null` values

In [15]:
df = df.dropna()

df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  3|    Herbie|   Hancock|        29|
|  2|      Jane|       Doe|        30|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+



# Selecting columns

Get a column object with:

In [16]:
df.id

Column<b'id'>

In [17]:
selected_df = df.select('id', 'first_name')

selected_df.show()

+---+----------+
| id|first_name|
+---+----------+
|  1|      John|
|  3|    Herbie|
|  2|      Jane|
|  4|      Erin|
+---+----------+



# Filtering rows

In [18]:
filtered_df = df.filter((df.number_sox <= 25) | (df.number_sox >= 35))

filtered_df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+



# Randomizing and ordering rows

In [19]:
from pyspark.sql.functions import rand

randomized_df = df.orderBy(rand())
randomized_df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|brockovich|        40|
|  2|      Jane|       Doe|        30|
|  1|      John|       Doe|        24|
+---+----------+----------+----------+



In [20]:
randomized_df \
.orderBy('id') \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  2|      Jane|       Doe|        30|
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+



In [21]:
randomized_df \
.orderBy('last_name', 'number_sox') \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  2|      Jane|       Doe|        30|
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+
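As the output shows, `brockovich` sorts after `Hancock`: string ordering is case-sensitive, and uppercase letters come before lowercase ones. Python's default string comparison behaves the same way for these ASCII values; the sketch below compares it with a case-insensitive sort key (the `str.lower` key is our choice for illustration).

```python
last_names = ['Doe', 'Hancock', 'brockovich', 'Doe']

# Default ordering compares character codes: 'H' (72) < 'b' (98)
print(sorted(last_names))                 # ['Doe', 'Doe', 'Hancock', 'brockovich']

# Case-insensitive ordering: compare lowercased copies instead
print(sorted(last_names, key=str.lower))  # ['brockovich', 'Doe', 'Doe', 'Hancock']
```

In Spark, a case-insensitive ordering could be obtained in the same spirit with `orderBy(lower('last_name'))`, using `pyspark.sql.functions.lower`.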



In [22]:
from pyspark.sql.functions import desc

randomized_df \
.orderBy('last_name', desc('number_sox')) \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  2|      Jane|       Doe|        30|
|  1|      John|       Doe|        24|
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|brockovich|        40|
+---+----------+----------+----------+



# User Defined Functions (UDFs) and row-wise operations ("Lambda" functions)

UDFs let you define custom Python logic once and reuse it in row-wise operations on a dataframe, similar to applying a lambda function to a column in pandas.

In [23]:
# Define the UDFs

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


# Option 1: define the UDF from a "classic" named function
def sox_pairs(x):
    # In Python 3, round() returns an int and rounds halves to even (14.5 -> 14)
    return round(float(x) / 2.0)

# The function can be passed directly; there is no need to wrap it in a lambda
sox_pairs_udf = udf(sox_pairs, IntegerType())

# OR
# Option 2: define the UDF inline with a lambda function
sox_pairs_udf_l = udf(lambda x: round(float(x) / 2.0), IntegerType())

In [24]:
# Apply the UDFs to the DF

df \
.withColumn("number_sox_pair", sox_pairs_udf("number_sox")) \
.show()

+---+----------+----------+----------+---------------+
| id|first_name| last_name|number_sox|number_sox_pair|
+---+----------+----------+----------+---------------+
|  1|      John|       Doe|        24|             12|
|  3|    Herbie|   Hancock|        29|             14|
|  2|      Jane|       Doe|        30|             15|
|  4|      Erin|brockovich|        40|             20|
+---+----------+----------+----------+---------------+



In [25]:
df_with_pairs = df \
.withColumn("number_sox_pair", sox_pairs_udf_l("number_sox"))

df_with_pairs.show()

+---+----------+----------+----------+---------------+
| id|first_name| last_name|number_sox|number_sox_pair|
+---+----------+----------+----------+---------------+
|  1|      John|       Doe|        24|             12|
|  3|    Herbie|   Hancock|        29|             14|
|  2|      Jane|       Doe|        30|             15|
|  4|      Erin|brockovich|        40|             20|
+---+----------+----------+----------+---------------+
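Note that 29 socks give 14 pairs rather than 15: Python 3's `round()` rounds halves to the nearest even integer (14.5 becomes 14, 15.5 becomes 16). If the intent is to count complete pairs, floor division states it more explicitly. A plain-Python comparison; the `complete_pairs` helper is hypothetical and not part of the notebook:

```python
def sox_pairs(x):
    # Same logic as the UDF above: round() uses round-half-to-even
    return round(float(x) / 2.0)


def complete_pairs(x):
    # Hypothetical alternative: number of full pairs, any odd sock left over
    return int(x) // 2


for n in [24, 29, 30, 31, 40]:
    print(n, sox_pairs(n), complete_pairs(n))
# 31 is where they differ: round(15.5) == 16, but 31 // 2 == 15
```

Either function could be wrapped with `udf(..., IntegerType())` in the same way as above.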



# Aggregation

Note: the concept of UDF can be taken one step further and applied to aggregations; these functions are called UDAFs (User Defined Aggregate Functions). For example, to apply an advanced algorithm to each group of a dataframe, you would need a UDAF that implements the algorithm in a way that works group by group (accumulating and merging partial results). This is an advanced topic and is not covered in this training.

In [26]:
# Import Spark's sum under an alias so it doesn't shadow Python's built-in sum()
from pyspark.sql.functions import sum as spark_sum

counts_df = df \
.groupBy('last_name') \
.agg(spark_sum('number_sox').alias('number_sox_per_family'))

counts_df.show()

+----------+---------------------+
| last_name|number_sox_per_family|
+----------+---------------------+
|   Hancock|                   29|
|       Doe|                   54|
|brockovich|                   40|
+----------+---------------------+
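Conceptually, `groupBy('last_name').agg(sum(...))` buckets rows by key and reduces each bucket to one value. A plain-Python sketch of the same computation on the four rows of `df` (Spark additionally pre-aggregates within each partition before shuffling, which this sketch does not model):

```python
from collections import defaultdict

# (last_name, number_sox) pairs from df after distinct() and dropna()
rows = [('Doe', 24), ('Hancock', 29), ('Doe', 30), ('brockovich', 40)]

totals = defaultdict(int)
for last_name, number_sox in rows:
    totals[last_name] += number_sox   # running sum per group

print(dict(totals))  # {'Doe': 54, 'Hancock': 29, 'brockovich': 40}
```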



# Exploding a "list" column into rows

If a column contains a list, it can be expanded into multiple rows, one row per item in the list.
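In plain Python, the same expansion is a nested loop that emits one `(id, item)` pair per list element. The sketch below mirrors `explode` on the sample data used in the next cell, plus a hypothetical row 5 with an empty list: `explode` drops rows whose list is empty or null, while `explode_outer` would keep them with a null item.

```python
# Sample data from the cell below, plus a hypothetical row 5 with an empty list
rows = [(1, ['A', 'B']), (2, ['C']), (3, ['D', 'D']), (4, ['E', 'F']), (5, [])]

# One output row per list element; an empty (or None) list yields no rows,
# which mirrors explode() dropping such rows
exploded = [(row_id, item) for row_id, items in rows for item in (items or [])]
print(exploded)
# [(1, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (3, 'D'), (4, 'E'), (4, 'F')]
```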

In [27]:
col_names = ['id', 'list']
rows = [
    (1, ['A', 'B']),
    (2, ['C']),
    (3, ['D', 'D']),
    (4, ['E', 'F']),        
]

list_df = spark.createDataFrame(rows, col_names)
list_df.show()

+---+------+
| id|  list|
+---+------+
|  1|[A, B]|
|  2|   [C]|
|  3|[D, D]|
|  4|[E, F]|
+---+------+



In [28]:
from pyspark.sql.functions import explode

exploded_df = list_df.select('id', explode('list').alias('item'))
exploded_df.show()

+---+----+
| id|item|
+---+----+
|  1|   A|
|  1|   B|
|  2|   C|
|  3|   D|
|  3|   D|
|  4|   E|
|  4|   F|
+---+----+



# Collapsing multiple rows into a "list" column

Conversely, multiple rows can be collapsed into a list or a set, with one row per group. `collect_list` keeps duplicates and follows the order in which rows arrive, although that order is not guaranteed after a shuffle. `collect_set` removes duplicates and has no concept of order, so the original ordering of the rows is lost.
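A plain-Python sketch of the two aggregations, grouping the exploded `(id, item)` pairs back by `id`: the list keeps duplicates and arrival order, while the set keeps only distinct values with no meaningful order.

```python
from collections import defaultdict

exploded = [(1, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (3, 'D'), (4, 'E'), (4, 'F')]

as_lists = defaultdict(list)   # like collect_list: duplicates kept
as_sets = defaultdict(set)     # like collect_set: duplicates removed
for row_id, item in exploded:
    as_lists[row_id].append(item)
    as_sets[row_id].add(item)

print(dict(as_lists))  # {1: ['A', 'B'], 2: ['C'], 3: ['D', 'D'], 4: ['E', 'F']}
print(as_sets[3])      # {'D'}
```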

In [29]:
from pyspark.sql.functions import collect_list

exploded_df \
.groupBy('id') \
.agg(collect_list('item') \
.alias('list')) \
.show()

+---+------+
| id|  list|
+---+------+
|  1|[A, B]|
|  3|[D, D]|
|  2|   [C]|
|  4|[E, F]|
+---+------+



In [30]:
from pyspark.sql.functions import collect_set

exploded_df \
.groupBy('id') \
.agg(collect_set('item') \
.alias('set')) \
.show()

+---+------+
| id|   set|
+---+------+
|  1|[B, A]|
|  3|   [D]|
|  2|   [C]|
|  4|[F, E]|
+---+------+



# Loading data from a CSV file on HDFS into a Spark dataframe

In [31]:
csv_path = "/user/vagrant/data/earth-surface-temperature/csv/GlobalLandTemperaturesByMajorCity.csv"

temperature_df = spark \
.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("mode", "DROPMALFORMED") \
.csv(csv_path)

temperature_df.show()

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1849-01-01 00:00:00|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-02-01 00:00:00|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-03-01 00:00:00|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-04-01 00:00:00|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-05-01 00:00:00|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-06-01 00:00:00|            24.844|                        1.402|Abidjan|Côte D'Ivoire|   5.63N|   

# Sampling rows

In [32]:
temperature_df.count()

239177

In [33]:
sampled_df = temperature_df.sample(fraction = 0.1, seed = 1234)
sampled_df.count()

23828
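The sample contains 23,828 rows, close to but not exactly 10% of 239,177 (about 23,918). Without replacement, `sample()` keeps each row independently with probability `fraction`, so the result size is only approximately `fraction` times the row count. A plain-Python sketch of this Bernoulli sampling; the random generator and seeding differ from Spark's, so the exact counts will not match:

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # seeded generator for reproducibility
    return [row for row in rows if rng.random() < fraction]


rows = range(239_177)              # stand-in for the temperature rows
sample = bernoulli_sample(rows, fraction=0.1, seed=1234)
print(len(sample))                 # close to, but generally not exactly, 23,918
```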

# Writing a Spark dataframe to a parquet file on HDFS

Parquet is a columnar format that works well for persisting tabular data. It compresses especially well for dataframes whose columns have the same value repeated on contiguous rows, since runs of repeated values can be stored compactly.
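To see why repeated values on contiguous rows help, here is a plain-Python sketch of run-length encoding, one of the encodings Parquet can apply to column data. It only illustrates the idea: Parquet's actual encoding (a hybrid of run-length encoding and bit-packing, usually combined with dictionary encoding) is more involved, and the country values below are made up.

```python
from itertools import groupby


def run_length_encode(values):
    """Collapse runs of equal consecutive values into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(values)]


# A column sorted by country: long runs, few pairs
sorted_column = ['Angola'] * 3 + ['Brazil'] * 4
print(run_length_encode(sorted_column))         # [('Angola', 3), ('Brazil', 4)]

# The same values interleaved: runs of length 1, almost no savings
shuffled_column = ['Angola', 'Brazil'] * 3 + ['Brazil']
print(len(run_length_encode(shuffled_column)))  # 6
```

This is one reason to sort rows (for example with `orderBy`) before writing them to Parquet.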

In [34]:
parquet_path = "/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity"

In [35]:
! hdfs dfs -rm -R $parquet_path

20/08/07 15:53:29 INFO fs.TrashPolicyDefault: Moved: 'hdfs://slalomdsvm:8020/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity' to trash at: hdfs://slalomdsvm:8020/user/vagrant/.Trash/Current/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity1596840809338


In [36]:
# Default write: Spark writes one file per dataframe partition, so the output is usually split across several files
temperature_df \
.write \
.parquet(parquet_path)

In [37]:
! hdfs dfs -ls -R -h $parquet_path

-rw-r--r--   1 vagrant hdfs          0 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/_SUCCESS
-rw-r--r--   1 vagrant hdfs    854.8 K 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/part-00000-14911455-fc0f-4379-8b5a-a208d3121de6-c000.snappy.parquet
-rw-r--r--   1 vagrant hdfs    567.4 K 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/part-00001-14911455-fc0f-4379-8b5a-a208d3121de6-c000.snappy.parquet


To control the number of output files:

In [38]:
n_files = 1

temperature_df \
.coalesce(n_files) \
.write \
.mode('overwrite') \
.parquet(parquet_path)

In [39]:
! hdfs dfs -ls -R -h $parquet_path

-rw-r--r--   1 vagrant hdfs          0 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/_SUCCESS
-rw-r--r--   1 vagrant hdfs      1.2 M 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/part-00000-2a38e5fa-86ee-41ff-be6a-eec3e97e6d7a-c000.snappy.parquet


Spark's Parquet writer also provides convenient partitioning: with `partitionBy()`, the data for each distinct value of the partition column is written under its own directory on HDFS.
In-file indexing is coming up, but it requires installing additional libraries and is not widely available yet.
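The directories follow the `column=value` naming visible in the listing of the output directory below (e.g. `Country=Afghanistan`). A plain-Python sketch of how rows map to such directories; the `partition_dir` helper and the sample rows are hypothetical, and Spark additionally escapes some special characters in values, which this sketch skips.

```python
import posixpath
from collections import defaultdict


def partition_dir(base_path, column, value):
    """Hypothetical helper: Hive-style directory for one partition value."""
    return posixpath.join(base_path, f"{column}={value}")


base = "/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity"
rows = [
    {"Country": "Afghanistan", "City": "Kabul"},
    {"Country": "Angola", "City": "Luanda"},
    {"Country": "Afghanistan", "City": "Kabul"},
]

# Group rows by target directory; the partition column itself is not stored
# inside the files, since its value is already encoded in the path
files = defaultdict(list)
for row in rows:
    payload = {k: v for k, v in row.items() if k != "Country"}
    files[partition_dir(base, "Country", row["Country"])].append(payload)

for directory, payloads in files.items():
    print(directory, len(payloads))
```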

In [40]:
temperature_df \
.orderBy('Country', 'City', 'dt') \
.write \
.partitionBy('Country') \
.mode('overwrite') \
.parquet(parquet_path)

In [41]:
! hdfs dfs -ls -R -h $parquet_path | head -20

drwxr-xr-x   - vagrant hdfs          0 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Afghanistan
-rw-r--r--   1 vagrant hdfs     18.4 K 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Afghanistan/part-00000-8ddc7980-029a-4092-887c-7630122c97e7.c000.snappy.parquet
-rw-r--r--   1 vagrant hdfs     17.8 K 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Afghanistan/part-00001-8ddc7980-029a-4092-887c-7630122c97e7.c000.snappy.parquet
drwxr-xr-x   - vagrant hdfs          0 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Angola
-rw-r--r--   1 vagrant hdfs      2.4 K 2020-08-07 15:53 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Angola/part-00001-8ddc7980-029a-4092-887c-7630122c97e7.c000.sn

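Note: in general, aim for files on the order of the HDFS block size (128 MB by default) or a few times larger. We want a few large files (YES: ≈100 MB to ≈1 GB), not many small files (NO: ≈1 KB to ≈10 MB): every file is tracked in the NameNode's memory and adds per-file overhead when reading, so many small files slow down both storage and queries.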
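A back-of-the-envelope way to pick `n_files` for `coalesce()` is to divide the expected output size by a target file size. A sketch, where the 256 MB target (two HDFS blocks) is our assumption rather than a fixed rule:

```python
import math

HDFS_BLOCK_SIZE = 128 * 1024 ** 2          # 128 MB, the HDFS default
TARGET_FILE_SIZE = 2 * HDFS_BLOCK_SIZE     # assumption: aim for ~2 blocks per file


def suggested_n_files(total_bytes, target=TARGET_FILE_SIZE):
    """Smallest number of files that keeps each file at or under `target`."""
    return max(1, math.ceil(total_bytes / target))


print(suggested_n_files(int(1.2 * 1024 ** 2)))   # 1.2 MB (this dataset) -> 1
print(suggested_n_files(10 * 1024 ** 3))         # 10 GB -> 40
```

Note that `coalesce()` can only reduce the number of partitions; spreading data over more files than there are partitions would need `repartition()` instead.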

# Collecting a Spark dataframe into a "regular" pandas dataframe

In [42]:
# Collect as a list of pyspark.sql.Rows
collected = df.collect()
collected

[Row(id=1, first_name='John', last_name='Doe', number_sox=24),
 Row(id=3, first_name='Herbie', last_name='Hancock', number_sox=29),
 Row(id=2, first_name='Jane', last_name='Doe', number_sox=30),
 Row(id=4, first_name='Erin', last_name='brockovich', number_sox=40)]

In [43]:
# Collect into a "classic" pandas dataframe
pandas_df = df.toPandas()
pandas_df

   id first_name   last_name  number_sox
0   1       John         Doe          24
1   3     Herbie     Hancock          29
2   2       Jane         Doe          30
3   4       Erin  brockovich          40


# Closing the Spark session

Stop the `spark` session to release the cluster's CPU/RAM resources (this completes the YARN application, shuts down the executors and stops the driver).

In [44]:
spark.stop()

Check the YARN UI: [http://slalomdsvm:8088/ui2/](http://slalomdsvm:8088/ui2/)

In [46]:
! yarn application -list

20/08/07 15:59:54 INFO client.RMProxy: Connecting to ResourceManager at slalomdsvm/10.0.2.15:8050
20/08/07 15:59:55 INFO client.AHSProxy: Connecting to Application History server at slalomdsvm/10.0.2.15:10200
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):0
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL
