In [1]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
plt.style.use('dark_background')

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.sql import functions as F
from pyspark.sql.types import *

23/05/02 19:08:38 WARN Utils: Your hostname, rig resolves to a loopback address: 127.0.1.1; using 192.168.0.102 instead (on interface enp6s0)
23/05/02 19:08:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 19:08:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import *
voter_schema = StructType([
  # Define a StructField for each field
  StructField('DATE', DateType(), False),
  StructField('TITLE', StringType(), False),
  StructField('VOTER_NAME', StringType(), False)
])
voter_df = spark.read.format('csv').options(Header=True).option("dateFormat", "mm/dd/yyyy")\
    .load('DallasCouncilVoters.csv.gz', schema=voter_schema)

# Caching

## What is caching?
Caching in Spark:
* Stores DataFrames in memory or on disk
* Improves speed on later transformations / actions
* Reduces resource usage

## Disadvantages of caching
* Very large data sets may not fit in memory
* Local disk based caching may not be a performance improvement
* Cached objects may not be available

## Caching tips
* When developing Spark tasks:
* Cache only if you need it
* Try caching DataFrames at various points and determine if your performance improves
* Cache in memory and fast SSD / NVMe storage
* Cache to slow local disk if needed
* Use intermediate files (e.g. parquet)!
* Stop caching objects when finished

## Implementing caching
Call .cache() on the DataFrame before Action

In [3]:
from pyspark.sql.functions import *

In [4]:
voter_df.cache().count()

44625

In [5]:
voter_df = voter_df.withColumn('ID', monotonically_increasing_id())
voter_df = voter_df.cache()
voter_df.show()

+----------+-------------+-------------------+---+
|      DATE|        TITLE|         VOTER_NAME| ID|
+----------+-------------+-------------------+---+
|2017-01-08|Councilmember|  Jennifer S. Gates|  0|
|2017-01-08|Councilmember| Philip T. Kingston|  1|
|2017-01-08|        Mayor|Michael S. Rawlings|  2|
|2017-01-08|Councilmember|       Adam Medrano|  3|
|2017-01-08|Councilmember|       Casey Thomas|  4|
|2017-01-08|Councilmember|Carolyn King Arnold|  5|
|2017-01-08|Councilmember|       Scott Griggs|  6|
|2017-01-08|Councilmember|   B. Adam  McGough|  7|
|2017-01-08|Councilmember|       Lee Kleinman|  8|
|2017-01-08|Councilmember|      Sandy Greyson|  9|
|2017-01-08|Councilmember|  Jennifer S. Gates| 10|
|2017-01-08|Councilmember| Philip T. Kingston| 11|
|2017-01-08|        Mayor|Michael S. Rawlings| 12|
|2017-01-08|Councilmember|       Adam Medrano| 13|
|2017-01-08|Councilmember|       Casey Thomas| 14|
|2017-01-08|Councilmember|Carolyn King Arnold| 15|
|2017-01-08|Councilmember| Rick

## More cache operations
Check .is_cached to determine cache status

In [6]:
print(voter_df.is_cached)

True


Call .unpersist() when finished with DataFrame

In [7]:
voter_df.unpersist()
print(voter_df.is_cached)

False


## Exercises

### Caching a DataFrame

In [8]:
departures_df = spark.read.format('csv').options(Header=True).load('AA_DFW_????_Departures_Short.csv.gz')

In [9]:
import time
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

Counting 583718 rows took 1.282351 seconds
Counting 583718 rows again took 0.213948 seconds


### Removing a DataFrame from cache

In [10]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


# Improve import performance

## Spark clusters
Spark Clusters are made of two types of processes
* Driver process
* Worker processes

## Import performance
Important parameters:
* Number of objects (Files, Network locations, etc)
    * More objects better han larger ones
    * Can import via wildcard  
        `airport_df = spark.read.csv('airports-*.txt.gz')`
    * General size of objects
        * Spark performs better if objects are of similar size

## Schemas
A well-defined schema will drastically improve import performance
* Avoids reading the data multiple times
* Provides validation on import

## How to split objects
* Use OS utilities / scripts (split, cut, awk)  
        `split -l 10000 -d largefile chunk-`
* Use custom scripts
* Write out to Parquet

        df_csv = spark.read.csv('singlelargefile.csv')
        df_csv.write.parquet('data.parquet')
        df = spark.read.parquet('data.parquet')

## Exercises

### Saving multiples csv in a single csv

In [11]:
departures_df = spark.read.format('csv').options(Header=True).load('AA_DFW_????_Departures_Short.csv.gz')
departures_df.count()

583718

In [12]:
departures_df.coalesce(1).write.format("csv").option("header", "true").option("compression", "gzip").mode("overwrite").save("departures_full.txt.gz")

                                                                                

### Spliting dataframe in multiple files

In [13]:
departures_df.write.format("csv").option("header", "true").option("compression", "gzip").mode("overwrite").partitionBy().save("departures_split")

### File import performance

In [14]:
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz', header=True)
split_df = spark.read.csv('departures_split/*.csv.gz', header=True)

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

Total rows in full DataFrame:	583718
Time to run: 0.142767
Total rows in split DataFrame:	583718
Time to run: 0.080472


# Cluster configurations

## Configuration options
* Spark contains many configuration settings
* These can be modified to macht needs
* Reading configurion settings:

        spark.conf.get(<configuration name>)

* Writing configuration settins:

        spark.conf.set(<configuration name>)

## Cluster Types
Spark deployment options:
* Single node
* Standalone
* Managed
    * YARN
    * Mesos
    * Kubernetes 

## Driver
* Task assignment
* Result consolidation
* Shared data access

Tips:
* Driver node should have double the memory of the worker
* Fast local storage helpful

## Worker
* Runs actual tasks
* Ideally has all code, data, and resources for a given task

Recommendations:
* Depending on the task, more worker nodes is ofter better than larger workers
* Test to find the balance
* Fast local storage extremely useful

## Exercise

### Reading Spark configurations

In [15]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')
# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')
# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: pyspark-shell
Driver TCP port: 46629
Number of partitions: 200


### Writing Spark configurations

In [16]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()
# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)
# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures_split/', header=True).distinct()
# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

Partition count before change: 4
Partition count after change: 11


In [17]:
departures_df.count()

583718

# Performance improvements

## Explaining the Spark execution plan

In [18]:
voter_schema = StructType([
  # Define a StructField for each field
  StructField('DATE', DateType(), False),
  StructField('TITLE', StringType(), False),
  StructField('VOTER_NAME', StringType(), False)
])
voter_df = spark.read.format('csv').options(Header=True).option("dateFormat", "mm/dd/yyyy")\
    .load('DallasCouncilVoters.csv.gz', schema=voter_schema)

In [19]:
voter_df.select(voter_df['VOTER_NAME']).distinct().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[VOTER_NAME#725], functions=[])
   +- Exchange hashpartitioning(VOTER_NAME#725, 500), ENSURE_REQUIREMENTS, [plan_id=493]
      +- HashAggregate(keys=[VOTER_NAME#725], functions=[])
         +- InMemoryTableScan [VOTER_NAME#725]
               +- InMemoryRelation [DATE#723, TITLE#724, VOTER_NAME#725], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- FileScan csv [DATE#0,TITLE#1,VOTER_NAME#2] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/mauricio/code/big-data-with-pyspark/3-cleaning-data-with-py..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DATE:date,TITLE:string,VOTER_NAME:string>




## What is shuffling?
Shuffling referes to moving data around to various workers to complete a task.

Spark distributes data amongst the various nodes in the cluster. A side effect of this is what is known as shuffling. Shuffling is the moving of data fragments to various workers as required to complete certain tasks.

* Hides complexity from the user  
    The user doesn't have to know which nodes have what data)(the user doesn't have to know which nodes have what data
* Can be slow to complete the necessary transfers
    Especially if a few nodes require all the data
* Lowers overall throughput  
    As the workers must spend time waiting for the data to transfer. This limits the amount of available workers for the remaining tasks in the system.
* Is often necessary, but try to minimize

## How to limit shuffling?
* Limit use of .repartition(num_partitions)
    * Use `.coalesce(num_partitions)` instead
* Use care when calling `.join()`  
    Calling .join() indiscriminately can often cause shuffle operations, leading to increased cluster load & slower processing times. To avoid some of the shuffle operations when joining Spark DataFrames you can use the .broadcast().
* Use `.broadcast()`
* May not need to limit it

## Broadcasting
Broadcasting:
* Provides a copy of an object to each worker
* Prevents undue / excess communication between nodes
* Can drastically speed up .join() operations  
Use the `.broadcast(<DataFrame>)` method

```
from pyspark.sql.functions import broadcast
combined_df = df_1.join(broadcast(df_2))
```
> Note broadcasting can slow operations when using very small DataFrames or if you broadcast the larger DataFrame in a join. Spark will often optimize this for you, but as usual, run tests in your environment for best performance.

## Exercises

### Normal joins

In [20]:
flights_df = spark.read.format('csv').options(Header=True).load('AA_DFW_????_Departures_Short.csv.gz')
flights_df.show(3)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|         0005|                HNL|                          519|
|       01/01/2014|         0007|                OGG|                          505|
|       01/01/2014|         0035|                SLC|                          174|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 3 rows



In [21]:
airports_df = spark.read.format('csv').options(Header=True).load('airports.csv')
airports_df.show(3)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 3 rows



In [22]:
# Join the flights_df and aiports_df DataFrames
normal_df = flights_df.join(airports_df, \
    flights_df["Destination Airport"] == airports_df["faa"] )

# Show the query plan
normal_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Destination Airport#795], [faa#840], Inner, BuildRight, false
   :- Filter isnotnull(Destination Airport#795)
   :  +- FileScan csv [Date (MM/DD/YYYY)#793,Flight Number#794,Destination Airport#795,Actual elapsed time (Minutes)#796] Batched: false, DataFilters: [isnotnull(Destination Airport#795)], Format: CSV, Location: InMemoryFileIndex(4 paths)[file:/home/mauricio/code/big-data-with-pyspark/3-cleaning-data-with-py..., PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=566]
      +- Filter isnotnull(faa#840)
         +- FileScan csv [faa#840,name#841,lat#842,lon#843,alt#844,tz#845,dst#846] Batched: false, DataFilters: [isnotnull(faa#840)], Format: CSV, Location: InMemoryFileIndex(

### Using broadcasting on Spark joins

A couple tips:

* Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer to the worker nodes.
* On small DataFrames, it may be better skip broadcasting and let Spark figure out any optimization on its own.
* If you look at the query execution plan, a broadcastHashJoin indicates you've successfully configured broadcasting.

In [23]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["Destination Airport"] == airports_df["faa"] )

# Show the query plan and compare against the original
broadcast_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Destination Airport#795], [faa#840], Inner, BuildRight, false
   :- Filter isnotnull(Destination Airport#795)
   :  +- FileScan csv [Date (MM/DD/YYYY)#793,Flight Number#794,Destination Airport#795,Actual elapsed time (Minutes)#796] Batched: false, DataFilters: [isnotnull(Destination Airport#795)], Format: CSV, Location: InMemoryFileIndex(4 paths)[file:/home/mauricio/code/big-data-with-pyspark/3-cleaning-data-with-py..., PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=589]
      +- Filter isnotnull(faa#840)
         +- FileScan csv [faa#840,name#841,lat#842,lon#843,alt#844,tz#845,dst#846] Batched: false, DataFilters: [isnotnull(faa#840)], Format: CSV, Location: InMemoryFileIndex(

### Comparing broadcast vs normal joins

In [24]:
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

Normal count:		581170	duration: 0.274576
Broadcast count:	581170	duration: 0.202525
