# Apache Spark Fundamentals: Advanced Features

In this notebook we will learn some advanced features of Spark: performance optimizations (broadcast joins, caching, partitioning), SQL expressions, user-defined functions (UDFs), handling of missing values, and the Catalog API.

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

### Create the SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

### Create the DataFrame

In [4]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"]) 

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

# Save as HIVE tables.
df.write.saveAsTable("hive_empdf", mode = "overwrite")
deptdf.write.saveAsTable("hive_deptdf", mode = "overwrite")

### BroadCast Join

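By default, Spark automatically broadcasts a table in a join when its size is below 10 MB (`spark.sql.autoBroadcastJoinThreshold`). This threshold can be raised, up to 8 GB according to the official Spark 2.3 documentation.

* We can check the current broadcast threshold as follows (the output below comes from a session in which the threshold had already been raised to 50 MB):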

In [172]:
size = int(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) / (1024 * 1024)
print("Default size of broadcast table is {0} MB.".format(size))

Default size of broadcast table is 50.0 MB.


* We can set the broadcast threshold to, say, 50 MB as follows:

In [173]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

In [174]:
# Suppose we need to join two DataFrames.
# small_df: a small DataFrame that fits in memory and is smaller than the configured threshold.
# big_df: a large DataFrame that must be joined with the small DataFrame.

join_df = big_df.join(broadcast(small_df), big_df["id"] == small_df["id"])

### Caching

We can use the `cache()`/`persist()` functions to keep a DataFrame in memory. Caching data that is reused frequently can significantly improve the performance of a Spark application.

In [175]:
df.cache()
df.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


When we call `cache()`, the storage level used was `MEMORY_ONLY` up to Spark 2.0.2; since Spark 2.1.x it is `MEMORY_AND_DISK`.

If we need to choose a different storage level, we can use the `persist()` method. For example, to keep the data only in memory, we can use the following snippet.

In [176]:
from pyspark.storagelevel import StorageLevel

In [177]:
deptdf.persist(StorageLevel.MEMORY_ONLY)
deptdf.count()
# Inspect the storage level of the DataFrame we just persisted (deptdf, not df).
print("Memory Used : {0}".format(deptdf.storageLevel.useMemory))
print("Disk Used : {0}".format(deptdf.storageLevel.useDisk))

Memory Used : True
Disk Used : False


### Unpersist

It is also important to remove data from the cache when it is no longer needed.

In [178]:
df.unpersist()

DataFrame[id: bigint, name: string, dept: string, salary: bigint]

In [None]:
spark.catalog.clearCache()

### SQL expressions

We can also use SQL expressions for data manipulation. The **expr** function evaluates a SQL expression as a column, and **selectExpr** is a variant of the select method that accepts SQL expressions directly.

In [179]:
from pyspark.sql.functions import expr

# Let's categorize the salary as low, mid, or high using the ranges below.

# 0 - 2000: low_salary
# 2001 - 5000: mid_salary
# > 5000: high_salary

cond = """case when salary > 5000 then 'high_salary'
               when salary > 2000 then 'mid_salary'
               when salary > 0 then 'low_salary'
               else 'invalid_salary'
          end as salary_level"""

newdf = df.withColumn("salary_level", expr(cond))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Using the selectExpr function

In [180]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### User Defined Functions (UDFs)

We often need to write a function for a very specific requirement. This is where UDFs help: we write the function in a language such as Python, register it as a UDF, and then use it in DataFrame operations.

* Python function to find the salary_level for a given salary.

In [181]:
def detSalary_Level(sal):
    # Map a salary to its level; the thresholds match the SQL expression above.
    if sal > 5000:
        return 'high_salary'
    elif sal > 2000:
        return 'mid_salary'
    elif sal > 0:
        return 'low_salary'
    else:
        return 'invalid_salary'

* Then register the "detSalary_Level" function as a UDF.

In [182]:
sal_level = udf(detSalary_Level, StringType())

* Apply function to determine the salary_level for a given salary.

In [183]:
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+
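A Python UDF receives `None` for a SQL NULL, and comparing `None` with a number raises a `TypeError` in Python 3. The salary column here has no nulls, so the UDF above works, but a null-safe variant may be useful on other data. Because a UDF wraps an ordinary Python function, the logic can be checked without Spark. The function name `salary_level_or_invalid` is hypothetical, and mapping a null salary to `'invalid_salary'` is an assumption chosen to mirror the `else` branch of the SQL expression.

```python
def salary_level_or_invalid(sal):
    """Hypothetical null-safe variant of the salary classifier."""
    if sal is None:
        # Assumption: treat a missing salary like any other invalid salary.
        return "invalid_salary"
    if sal > 5000:
        return "high_salary"
    if sal > 2000:
        return "mid_salary"
    if sal > 0:
        return "low_salary"
    return "invalid_salary"


# Exercise the function directly, before registering it as a UDF.
checks = {sal: salary_level_or_invalid(sal) for sal in (None, 0, 1000, 3000, 8000)}
print(checks)
```

The function can then be registered in the same way as before, for example with `udf(salary_level_or_invalid, StringType())`.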



### Working with NULL values

NULL values are always difficult to handle, regardless of the framework or language we use. Spark provides a few specific functions to deal with them.

* **isNull()**

This function helps us find the null values in a given column. For example, to find the rows where the dept column is null:

In [184]:
newdf = df.filter(df["dept"].isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|null|null|null|  7500|
|   9| III|null|  4500|
+----+----+----+------+



* **isNotNull()**

This function works in the opposite way to isNull() and returns all rows where a given column is not null.

In [185]:
newdf = df.filter(df["dept"].isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



* **fillna()**

This function will help us replace null values.

In [186]:
# Replace null values in the dept column with "INVALID".
newdf = df.fillna("INVALID", ["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|null|null|INVALID|  7500|
|   9| III|INVALID|  4500|
|  10|null|  dept5|  2500|
+----+----+-------+------+



* **dropna()**

This function will help us remove rows with null values.

In [187]:
# Remove all rows that contain any null value.
newdf = df.dropna()
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



In [None]:
# Remove only the rows in which every value is null.
newdf = df.dropna(how = "all")
newdf.show()

# Note: the default value of the "how" parameter is "any".

In [189]:
# Remove all rows where the dept column is null.
newdf = df.dropna(subset = "dept")
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



### Partitioning

Partitioning is a very important aspect of controlling the parallelism of your Spark application.

* Check number of partitions.

In [190]:
df.rdd.getNumPartitions()

4

* Increase the number of partitions. For example, increase the partitions to 6.

In [191]:
newdf = df.repartition(6)
newdf.rdd.getNumPartitions()

6

**Note: This is an expensive operation as it requires data shuffling between workers.**

* Decrease the number of partitions. For example decrease the partitions to 2.

In [192]:
newdf = df.coalesce(2)
newdf.rdd.getNumPartitions()

2

* By default, Spark SQL uses 200 partitions when shuffling data (`spark.sql.shuffle.partitions`).
* We can also set this number at the Spark application level, for example to 500.

In [193]:
# Set the number of shuffle partitions for the Spark application.
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Check the number of partitions.
num_part = spark.conf.get("spark.sql.shuffle.partitions")
print("No of Partitions : {0}".format(num_part))

No of Partitions : 500


### Catalog API

Spark Catalog is a user-facing API, which you can access via SparkSession.catalog.

* **listDatabases()**

It will return all databases along with their location on the file system.

In [194]:
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/home/jovyan/work/spark-warehouse')]

* **listTables()**

It will return all the tables for a given database, along with information such as the table type (external/managed) and whether a particular table is temporary or permanent.
This includes all temporary views.

In [195]:
spark.catalog.listTables("default")

[Table(name='hive_deptdf', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='hive_empdf', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='deptdf', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='empdf', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

* **listColumns()**

It will return all columns of a particular table in a database. It also returns the data type and whether the column is used for partitioning or bucketing.

In [196]:
spark.catalog.listColumns("hive_empdf", "default")

[Column(name='id', description=None, dataType='bigint', nullable=True, isPartition=False, isBucket=False),
 Column(name='name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='dept', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='salary', description=None, dataType='bigint', nullable=True, isPartition=False, isBucket=False)]

* **listFunctions()**

It will return all the functions available in the SparkSession, along with whether each one is temporary or not.

In [197]:
spark.catalog.listFunctions()

[Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True),
 Function(name='%', description=None, className='org.apache.spark.sql.catalyst.expressions.Remainder', isTemporary=True),
 Function(name='&', description=None, className='org.apache.spark.sql.catalyst.expressions.BitwiseAnd', isTemporary=True),
 Function(name='*', description=None, className='org.apache.spark.sql.catalyst.expressions.Multiply', isTemporary=True),
 Function(name='+', description=None, className='org.apache.spark.sql.catalyst.expressions.Add', isTemporary=True),
 Function(name='-', description=None, className='org.apache.spark.sql.catalyst.expressions.Subtract', isTemporary=True),
 Function(name='/', description=None, className='org.apache.spark.sql.catalyst.expressions.Divide', isTemporary=True),
 Function(name='<', description=None, className='org.apache.spark.sql.catalyst.expressions.LessThan', isTemporary=True),
 Function(name='<=', description=None, cl

* **currentDatabase()**

Get the current database.

In [198]:
spark.catalog.currentDatabase()

'default'

* **setCurrentDatabase()**

Set the current database.

In [199]:
spark.catalog.setCurrentDatabase("default")  # replace "default" with your database name

* **cacheTable()**

Cache a particular table.

In [200]:
spark.catalog.cacheTable("default.hive_empdf")

* **isCached()**

Check whether the table is cached or not.

In [201]:
spark.catalog.isCached("default.hive_empdf")

True

* **uncacheTable()**

Uncache a particular table.

In [202]:
spark.catalog.uncacheTable("default.hive_empdf")

In [203]:
# Verify that the table is uncached: this now returns False.
spark.catalog.isCached("default.hive_empdf")

False

* **clearCache()**

Remove all cached tables from the in-memory cache of the Spark session.

In [204]:
spark.catalog.clearCache()