#### Improve data cleaning tasks by increasing performance or reducing resource requirements.

### What is caching?

Caching in Spark:
- Stores DataFrames in memory or on disk
- Improves speed on later transformations/actions
- Reduces resource usage

Disadvantages of caching:
- Very large datasets may not fit in memory
- Local disk-based caching may not be a performance improvement
- Cached objects may not be available (for example, after being evicted when memory runs low), in which case Spark recomputes them


In [3]:
import os
import sys

os.environ['SPARK_HOME'] = "C:/Spark"
sys.path.append("C:/Spark/spark-3.1.2-bin-hadoop3.2/python/")
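# Note: this aliases the SparkContext class itself, not a running context
from pyspark import SparkContext as sc
# Prints the class, because no SparkContext instance has been created yet
print(sc)

# Prints a property object rather than a version string, because sc is the class;
# once a session exists, spark.sparkContext.version returns the actual version
print(sc.version)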
print(os.environ.get("SPARK_HOME"))

print(os.path.join(os.environ.get("SPARK_HOME"), './bin/spark-submit'))

#gateway = JavaGateway()

os.environ['SPARK_HOME']="C:/Spark/spark-3.1.2-bin-hadoop3.2"
os.environ['JAVA_HOME']="C:/Program Files/Java/jdk1.8.0_144"
sys.path.append("C:/Spark/spark-3.1.2-bin-hadoop3.2/python")
os.environ['HADOOP_HOME']="C:/Hadoop"


from pyspark import SparkContext
from pyspark import SparkConf

import pyspark  # importable because the Spark python directory was added to sys.path above
from pyspark.sql.functions import to_date, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Print spark
print(spark)
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

<class 'pyspark.context.SparkContext'>
<property object at 0x000000DDA459B548>
C:/Spark
C:/Spark\./bin/spark-submit
<pyspark.sql.session.SparkSession object at 0x000000DDA58303C8>


In [4]:

processFile = '..\\PySpark'
fileName = os.listdir(processFile)
file_path_total = []

def file_path(fileName):
    # Collect the path of every CSV file (including .csv.gz) found in processFile
    for file in fileName:
        if ".csv" in file:
            # Use a distinct local name so the function name is not shadowed
            csv_path = "%s/%s" % (processFile, file)
            file_path_total.append(csv_path)
    return file_path_total

In [65]:
from pyspark.sql import functions as F
departures_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2017_Departures_Short.csv.gz')

departures_df.limit(5).toPandas()

Unnamed: 0,Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes)
0,01/01/2017,5,HNL,537
1,01/01/2017,7,OGG,498
2,01/01/2017,37,SFO,241
3,01/01/2017,43,DTW,134
4,01/01/2017,51,STL,88


### Caching can improve performance when reusing DataFrames

- When the caching transformation is applied, it doesn't take effect until an action is run.
- The first action materializes the cache as it completes, so that call still pays the full computation cost.
- The second time, nothing needs to be recalculated, so the action returns much faster (about 2.2 seconds versus 14.4 seconds in the run below).
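
The lazy behavior described above can be modeled in plain Python. The sketch below is a toy stand-in, not Spark's implementation: the `LazyDataset` class, its `compute_calls` counter, and the sample rows are all hypothetical names introduced for illustration. It only shows that marking a dataset as cached does no work, that the first action fills the cache, and that later actions reuse it.

```python
class LazyDataset:
    """Toy stand-in for a DataFrame: work is deferred until an action runs."""

    def __init__(self, compute):
        self._compute = compute        # zero-argument function producing the rows
        self._cache_requested = False
        self._cached_rows = None
        self.compute_calls = 0         # how many times the rows were recomputed

    def cache(self):
        # Behaves like a transformation: marks the dataset, computes nothing yet
        self._cache_requested = True
        return self

    def unpersist(self):
        # Drop the cached rows; later actions recompute from scratch
        self._cache_requested = False
        self._cached_rows = None
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.compute_calls += 1
        rows = self._compute()
        if self._cache_requested:
            self._cached_rows = rows   # materialized by the first action
        return rows

    def count(self):
        # An action: forces evaluation
        return len(self._rows())


def distinct_rows():
    # Hypothetical sample rows containing one duplicate
    rows = [("01/01/2017", "5", "HNL"),
            ("01/01/2017", "7", "OGG"),
            ("01/01/2017", "5", "HNL")]
    return list(set(rows))


ds = LazyDataset(distinct_rows).cache()
calls_after_cache = ds.compute_calls    # cache() alone triggers no computation
first = ds.count()                       # computes the rows and fills the cache
second = ds.count()                      # served from the cache
calls_after_two_counts = ds.compute_calls
print(calls_after_cache, first, second, calls_after_two_counts)
```

In this model the second `count()` skips the computation entirely, which mirrors why the second timing in the notebook cell is so much shorter than the first.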

In [8]:
import time
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))


Counting 139358 rows took 14.410116 seconds
Counting 139358 rows again took 2.197774 seconds


In [9]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


In [67]:
# Read multiple files at once with a wildcard, plus two individual years for comparison
split_df = spark.read.csv('AA_DFW_2*_Departures_Short.csv.gz', header=True)
df1 = spark.read.csv('AA_DFW_2014_Departures_Short.csv.gz', header=True)
df2 = spark.read.csv('AA_DFW_2015_Departures_Short.csv.gz', header=True)
split_df.show()
start_time_b = time.time()
# df1 and df2 hold the 2014 and 2015 files, so the label names both years
print("Total rows in DataFrames 2014-2015: \t %d" % (df1.count() + df2.count()))
print("Time to run: %f" % (time.time() - start_time_b))
start_time_b = time.time()
print("Total rows in all DataFrames (2014-2017):\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|         0005|                HNL|                          519|
|       01/01/2014|         0007|                OGG|                          505|
|       01/01/2014|         0035|                SLC|                          174|
|       01/01/2014|         0043|                DTW|                          153|
|       01/01/2014|         0052|                PIT|                          137|
|       01/01/2014|         0058|                SAN|                          174|
|       01/01/2014|         0060|                MIA|                          155|
|       01/01/2014|         0064|                JFK|                          185|
|       01/01/2014|         0090|                ORD|                       

### Cluster configurations

In [37]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: pyspark-shell
Driver TCP port: 56657
Number of partitions: 500


### Writing Spark configurations

In [38]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('AA_DFW_2014_Departures_Short.csv.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

Partition count before change: 200
Partition count after change: 500


### What is shuffling?
- Shuffling refers to moving data between workers to complete a task
- Hides complexity from the user
- Can be slow to complete
- Lowers overall throughput
- Is often necessary, but try to minimize it
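
To make the idea of moving rows between workers concrete, here is a minimal plain-Python sketch of hash partitioning, the strategy that appears as `Exchange hashpartitioning(...)` in the query plans. It is an illustration under stated assumptions: the helper names `partition_for` and `shuffle`, the CRC32 hash, and the sample rows are hypothetical, and Spark uses its own hash function and network transfer rather than anything shown here.

```python
from collections import defaultdict
import zlib


def partition_for(key, num_partitions):
    # Deterministic hash, so equal keys always land in the same partition
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def shuffle(worker_rows, key_index, num_partitions):
    """Redistribute rows from every worker into partitions chosen by key hash."""
    partitions = defaultdict(list)
    for rows in worker_rows:
        for row in rows:
            target = partition_for(row[key_index], num_partitions)
            partitions[target].append(row)   # in a cluster this is a network transfer
    return dict(partitions)


# Hypothetical rows (flight number, destination) spread across two workers
worker_rows = [
    [("0005", "HNL"), ("0007", "OGG"), ("0035", "SLC")],
    [("0043", "DTW"), ("0005", "HNL"), ("0090", "ORD")],
]
shuffled = shuffle(worker_rows, key_index=1, num_partitions=4)

# Duplicates are now co-located, so each partition can deduplicate independently
distinct = sorted(row for rows in shuffled.values() for row in set(rows))
print(distinct)
```

Every row may have to move before any partition can finish its work, which is why a shuffle tends to be the expensive step in operations such as `distinct()` and joins.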


In [39]:
departures_df.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[_c0#1014, _c1#1015, _c2#1016, _c3#1017], functions=[])
+- Exchange hashpartitioning(_c0#1014, _c1#1015, _c2#1016, _c3#1017, 500), ENSURE_REQUIREMENTS, [id=#752]
   +- *(1) HashAggregate(keys=[_c0#1014, _c1#1015, _c2#1016, _c3#1017], functions=[])
      +- FileScan csv [_c0#1014,_c1#1015,_c2#1016,_c3#1017] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/lienphuong/Documents/GitHub/PySpark/AA_DFW_2014_Departures_Short..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string>




In [43]:
departures_df_1=departures_df.select(departures_df['_c2']).distinct()
departures_df_1.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[_c2#1016], functions=[])
+- Exchange hashpartitioning(_c2#1016, 500), ENSURE_REQUIREMENTS, [id=#811]
   +- *(2) HashAggregate(keys=[_c2#1016], functions=[])
      +- *(2) HashAggregate(keys=[_c0#1014, _c1#1015, _c2#1016, _c3#1017], functions=[])
         +- Exchange hashpartitioning(_c0#1014, _c1#1015, _c2#1016, _c3#1017, 500), ENSURE_REQUIREMENTS, [id=#806]
            +- *(1) HashAggregate(keys=[_c0#1014, _c1#1015, _c2#1016, _c3#1017], functions=[])
               +- FileScan csv [_c0#1014,_c1#1015,_c2#1016,_c3#1017] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/lienphuong/Documents/GitHub/PySpark/AA_DFW_2014_Departures_Short..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string>




In [44]:
flights_df= spark.read.csv("flights_small.csv", header =True)
flights_df.show(5)
airports_df= spark.read.csv("airports.csv", header =True)
airports_df.show(5)



+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

In [45]:
airports_df.count()

1397

In [54]:
# Join the flights_df and airports_df DataFrames on the destination airport code
normal_df = flights_df.join(airports_df, flights_df["dest"] == airports_df["faa"])

# Preview the first rows of the joined result
normal_df.limit(5).toPandas()


Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,...,distance,hour,minute,faa,name,lat,lon,alt,tz,dst
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,...,954,6,58,LAX,Los Angeles Intl,33.942536,-118.408075,126,-8,A
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,...,2677,10,40,HNL,Honolulu Intl,21.318681,-157.922428,13,-10,N
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,...,679,14,43,SFO,San Francisco Intl,37.618972,-122.374889,13,-8,A
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,...,569,17,5,SJC,Norman Y Mineta San Jose Intl,37.3626,-121.929022,62,-8,A
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,...,937,7,54,BUR,Bob Hope,34.200667,-118.358667,778,-8,A


In [57]:
normal_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [dest#1077], [faa#1195], Inner, BuildRight, false
:- *(2) Filter isnotnull(dest#1077)
:  +- FileScan csv [year#1066,month#1067,day#1068,dep_time#1069,dep_delay#1070,arr_time#1071,arr_delay#1072,carrier#1073,tailnum#1074,flight#1075,origin#1076,dest#1077,air_time#1078,distance#1079,hour#1080,minute#1081] Batched: false, DataFilters: [isnotnull(dest#1077)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/lienphuong/Documents/GitHub/PySpark/flights_small.csv], PartitionFilters: [], PushedFilters: [IsNotNull(dest)], ReadSchema: struct<year:string,month:string,day:string,dep_time:string,dep_delay:string,arr_time:string,arr_d...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#1158]
   +- *(1) Filter isnotnull(faa#1195)
      +- FileScan csv [faa#1195,name#1196,lat#1197,lon#1198,alt#1199,tz#1200,dst#1201] Batched: false, DataFilters: [isnotnull(faa#1195)], Format: CSV, Location: InMemoryFileIndex[fi

In [62]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df),
    flights_df["dest"] == airports_df["faa"])

broadcast_df.limit(5).toPandas()


Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,...,distance,hour,minute,faa,name,lat,lon,alt,tz,dst
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,...,954,6,58,LAX,Los Angeles Intl,33.942536,-118.408075,126,-8,A
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,...,2677,10,40,HNL,Honolulu Intl,21.318681,-157.922428,13,-10,N
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,...,679,14,43,SFO,San Francisco Intl,37.618972,-122.374889,13,-8,A
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,...,569,17,5,SJC,Norman Y Mineta San Jose Intl,37.3626,-121.929022,62,-8,A
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,...,937,7,54,BUR,Bob Hope,34.200667,-118.358667,778,-8,A


In [63]:
# Show the query plan and compare against the original
broadcast_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [dest#1077], [faa#1195], Inner, BuildRight, false
:- *(2) Filter isnotnull(dest#1077)
:  +- FileScan csv [year#1066,month#1067,day#1068,dep_time#1069,dep_delay#1070,arr_time#1071,arr_delay#1072,carrier#1073,tailnum#1074,flight#1075,origin#1076,dest#1077,air_time#1078,distance#1079,hour#1080,minute#1081] Batched: false, DataFilters: [isnotnull(dest#1077)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/lienphuong/Documents/GitHub/PySpark/flights_small.csv], PartitionFilters: [], PushedFilters: [IsNotNull(dest)], ReadSchema: struct<year:string,month:string,day:string,dep_time:string,dep_delay:string,arr_time:string,arr_d...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#1325]
   +- *(1) Filter isnotnull(faa#1195)
      +- FileScan csv [faa#1195,name#1196,lat#1197,lon#1198,alt#1199,tz#1200,dst#1201] Batched: false, DataFilters: [isnotnull(faa#1195)], Format: CSV, Location: InMemoryFileIndex[fi
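
Both query plans report a `BroadcastHashJoin`, so it may help to see what that strategy does in miniature. The following is a plain-Python sketch, not Spark's implementation: the function name `broadcast_hash_join` and the tiny in-memory tables are hypothetical. It builds a hash table from the small side once, then probes it from each partition of the large side, so the large rows never have to be shuffled.

```python
def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner-join each partition of the large side against a full copy of the small side."""
    # Build phase: hash the small table once; this copy is what gets "broadcast"
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[small_key], []).append(row)

    # Probe phase: every partition is joined locally, so large rows never move
    joined = []
    for partition in large_partitions:
        for row in partition:
            for match in lookup.get(row[large_key], []):
                joined.append({**row, **match})
    return joined


# Hypothetical miniature versions of the flights and airports data
flights = [
    [{"flight": "1780", "dest": "LAX"}, {"flight": "851", "dest": "HNL"}],
    [{"flight": "755", "dest": "SFO"}, {"flight": "999", "dest": None}],
]
airports = [
    {"faa": "LAX", "name": "Los Angeles Intl"},
    {"faa": "HNL", "name": "Honolulu Intl"},
    {"faa": "SFO", "name": "San Francisco Intl"},
]

result = broadcast_hash_join(flights, airports, "dest", "faa")
for row in result:
    print(row["flight"], row["dest"], row["name"])
```

The row with a missing destination finds no match and is dropped, which corresponds to the `isnotnull` filters in the plans for this inner join.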

In [64]:
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

Normal count:		10000	duration: 1.359908
Broadcast count:	10000	duration: 0.620395
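
Single measurements like these are noisy, and since both plans show the same `BroadcastHashJoin`, part of the gap may come from warm-up rather than from the join strategy. A small helper that repeats each measurement gives a steadier picture. This is a sketch under stated assumptions: `time_action` is a hypothetical helper, `time.perf_counter` is swapped in for `time.time` because it is intended for interval timing, and the workload is a plain-Python stand-in so the block runs without Spark.

```python
import statistics
import time


def time_action(action, repeats=3):
    """Run a zero-argument callable several times; return (last_result, durations)."""
    durations = []
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = action()
        durations.append(time.perf_counter() - start)
    return result, durations


# Stand-in workload; with Spark the callable could be a bound method such as normal_df.count
result, durations = time_action(lambda: sum(range(100_000)), repeats=5)
print("result: %d" % result)
print("median duration: %f" % statistics.median(durations))
```

Comparing medians over several runs, rather than one run each, makes it easier to tell a real difference from start-up effects.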
