# Caching



First, import the SparkSession class:

In [None]:
from pyspark.sql import SparkSession

Then start the SparkSession:

In [None]:
# May take a little while on a local computer
spark = SparkSession.builder.appName("Basics").getOrCreate()


Next, load the data from a file. (You could also connect to a large distributed file system such as HDFS; we'll cover that later, once we move to larger datasets on AWS EC2.)

In [None]:
import pandas as pd
df = spark.createDataFrame(pd.read_csv("https://storage.googleapis.com/neurals/data/people.csv",header='infer'))
df.show()

#### Showing the data

In [None]:
# Note how data is missing!
df.show()

In [None]:
df.printSchema()

## Performance


You've been assigned a task that requires running several analysis operations on a DataFrame. You've learned that caching can improve performance when reusing DataFrames and would like to implement it.

You'll be working with a new dataset consisting of airline departure information. It may have repetitive data and will need to be de-duplicated.

The DataFrame departures_df is defined, but no actions have been performed.

### Instructions

- Cache the unique rows in the departures_df DataFrame.
- Perform a count query on departures_df, noting how long the operation takes.
- Count the rows again, noting the difference in run time now that the DataFrame is cached.
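
The steps above could be sketched as follows. This is a minimal sketch rather than an official solution: `timed_count` and `cache_and_compare` are hypothetical helper names, and `departures_df` is assumed to be a Spark DataFrame already defined in the session. The helpers rely only on the DataFrame methods `distinct()`, `cache()`, and `count()`.

```python
import time


def timed_count(df):
    """Run the count() action on df; return (row_count, elapsed_seconds)."""
    start = time.perf_counter()
    rows = df.count()
    return rows, time.perf_counter() - start


def cache_and_compare(df):
    """Cache the unique rows of df, then time two consecutive counts.

    cache() is lazy: the first count computes the rows and fills the cache,
    and the second count can then read from the cache.
    """
    unique_df = df.distinct().cache()
    first = timed_count(unique_df)
    second = timed_count(unique_df)
    return unique_df, first, second
```

In the notebook this might be called as `departures_df, first, second = cache_and_compare(departures_df)`, printing `first[1]` and `second[1]` to compare the timings. The second count is usually faster, although on a small dataset the difference may be within noise.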

## Removing a DataFrame from cache

You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.

The DataFrame departures_df is defined and has already been cached for you.

### Instructions
- Check the caching status on the departures_df DataFrame.
- Remove the departures_df DataFrame from the cache.
- Validate the caching status again.

In [None]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

## File import performance

You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

You have two types of files available: departures_full.txt.gz and departures_xxx.txt.gz, where xxx ranges from 000 to 013. The split files together contain the same rows as the full file, divided among them.

### Instructions
- Import the departures_full.txt.gz file and the departures_xxx.txt.gz files into separate DataFrames.
- Run a count on each DataFrame and compare the run times.


In [None]:
import time

import pandas as pd

# Load the departures data into a Spark DataFrame via pandas
full_df = spark.createDataFrame(pd.read_csv("https://storage.googleapis.com/neurals/data/AA_DFW_2017_Departures_Short.csv.gz",header='infer'))
full_df.show()


# Print the count and run time for the full DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))
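
The cell above times only a single-file DataFrame. For the split files, one option is to pass a wildcard path to `spark.read.csv`, which reads every matching file into one DataFrame. The sketch below assumes the files departures_full.txt.gz and departures_xxx.txt.gz named in the exercise are present in the working directory (they are not shipped with this notebook); `load_and_time` and `compare_imports` are hypothetical helper names.

```python
import time


def load_and_time(spark, path):
    """Read the CSV file(s) at path and time a count(); return (rows, seconds).

    path may contain a wildcard such as 'departures_0*.txt.gz'.
    """
    df = spark.read.csv(path)
    start = time.perf_counter()
    rows = df.count()
    return rows, time.perf_counter() - start


def compare_imports(spark, full_path, split_path):
    """Return timing results for the single file and for the split files."""
    return {
        "full": load_and_time(spark, full_path),
        "split": load_and_time(spark, split_path),
    }
```

A call such as `compare_imports(spark, 'departures_full.txt.gz', 'departures_0*.txt.gz')` would return both row counts and run times. Because gzip files are not splittable, a single .gz file is read by one task, while several smaller files can be read in parallel, so the split import is often faster on a multi-core machine or cluster.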



## Reading Spark configurations

You've recently configured a cluster via a cloud provider. Your only access is via the command shell or your Python code. You'd like to verify some Spark settings to validate the configuration of the cluster.

The spark object is available for use.

### Instructions
- Check the name of the Spark application instance ('spark.app.name').
- Determine the TCP port the driver runs on ('spark.driver.port').
- Determine how many partitions are configured for joins.
- Show the results.

In [None]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)
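
When several settings need checking at once, the lookups above can be collected into a dictionary. The following is a minimal sketch under stated assumptions: `read_settings` and `SETTINGS` are hypothetical names, and the placeholder string `"<not set>"` is an arbitrary choice. It uses the two-argument form `spark.conf.get(key, default)`, which returns the default for a key that is not set instead of raising an error.

```python
# Keys checked in the cell above
SETTINGS = [
    "spark.app.name",
    "spark.driver.port",
    "spark.sql.shuffle.partitions",
]


def read_settings(spark, keys, default="<not set>"):
    """Return {key: value} for each config key, using default when unset."""
    return {key: spark.conf.get(key, default) for key in keys}
```

For example, `for key, value in read_settings(spark, SETTINGS).items(): print("%s: %s" % (key, value))` prints one line per setting.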

## Writing Spark configurations

Now that you've reviewed some of the Spark configurations on your cluster, you want to modify some of the settings to tune Spark to your needs. You'll import some data to verify that your changes have taken effect on the cluster.

The spark configuration is initially set to the default value of 200 partitions.

The spark object is available for use. A file named departures.txt.gz is available for import. An initial DataFrame containing the distinct rows from departures.txt.gz is available as departures_df.

### Instructions
- Store the number of partitions in departures_df in the variable before.
- Change the spark.sql.shuffle.partitions configuration to 500 partitions.
- Recreate the departures_df DataFrame reading the distinct rows from the departures file.
- Print the number of partitions from before and after the configuration change.

In [None]:
# full_df stands in for the departures file here; keep only its distinct rows
departures_df = full_df.distinct()

# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame from the departures data so the new setting applies
departures_df = full_df.distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())
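
If the changed setting is needed for only one step, it can be restored afterwards. The sketch below is one way to do that, not part of the exercise: `temporary_conf` is a hypothetical helper built on the runtime config calls `spark.conf.get`, `spark.conf.set`, and `spark.conf.unset`.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(spark, key, value):
    """Set a runtime config inside a with-block, then restore the old value."""
    # Remember the current value; None means the key was not set at all
    previous = spark.conf.get(key, None)
    spark.conf.set(key, str(value))
    try:
        yield
    finally:
        if previous is None:
            spark.conf.unset(key)
        else:
            spark.conf.set(key, previous)
```

A possible use is `with temporary_conf(spark, 'spark.sql.shuffle.partitions', 500): after = full_df.distinct().rdd.getNumPartitions()`. Note that on recent Spark versions with adaptive query execution enabled, shuffle partitions may be coalesced, so the observed count can be lower than the configured value.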

## Caching a DataFrame




In [None]:
departures_df = full_df
departures_df.persist()

In [None]:
# Cache departures_df, then determine if it is in the cache
departures_df = full_df
departures_df.persist()
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)
