In [None]:
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))


In [None]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

**Reading in multiple small files vs one large**

In [None]:
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_???.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

**Cluster Configs**
![rwconfig](images/rw-config.png)

In [3]:
import findspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession


def create_spark_session():
    findspark.init('/home/kbaafi/spark')
    spark = SparkSession \
        .builder \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [4]:
app_name = spark.conf.get("spark.app.name")
app_name

'pyspark-shell'

In [6]:
driver_tcp_port = spark.conf.get("spark.driver.port")
driver_tcp_port

'36943'

In [10]:
# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
num_partitions

'200'

In [None]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

# Joins

In [None]:
# Join the flights_df and aiports_df DataFrames
normal_df = flights_df.join(airports_df, 
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan
normal_df.explain()


In [None]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and aiports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan and compare against the original
broadcast_df.explain()