# DataFrame details

## What is Data Cleaning?
Preparing raw data for use in data processing pipelines.

Possible tasks in data cleaning: 
* Reformatting or replacing text 
* Performing calculations
* Removing garbage or incomplete data


## Defining a schema
Creating a defined schema helps with data quality and import performance. As mentioned during the lesson, we'll create a simple schema to read in the following columns:
* Name
* Age
* City

The `Name` and `City` columns are `StringType()` and the `Age` column is an `IntegerType()`.

* Import `*` from the `pyspark.sql.types` library.
* Define a new schema using the `StructType` method.
* Define a `StructField` for `name`, `age`, and `city`. Each field should correspond to the correct datatype and not be nullable.

In [None]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

## Immutability
Unlike many Python objects, such as lists and dictionaries, Spark DataFrames are immutable. While not strictly required, immutability is often a component of functional programming. We won't go into everything that implies here, but understand that Spark is designed to use immutable objects. Practically, this means Spark DataFrames are defined once and cannot be modified after initialization; a transformation returns a new DataFrame instead. If the variable name is reused, the original data is removed (assuming it's not in use elsewhere) and the variable name is reassigned to the new data. While this seems inefficient, it is what allows Spark to share data between all cluster components without worrying about concurrent modification of that data.
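Spark's own DataFrame objects need a running Spark session, so here is a plain-Python analogy instead. The hypothetical `Frame` class below is not part of Spark; its `drop()` returns a new object and leaves the original untouched, which is the same contract Spark's `drop()` and `withColumn()` follow.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)  # frozen: assigning to a field raises FrozenInstanceError
class Frame:
    """Hypothetical stand-in for an immutable DataFrame (not Spark's API)."""
    columns: Tuple[str, ...]
    rows: Tuple[tuple, ...]

    def drop(self, name: str) -> "Frame":
        # Build and return a NEW Frame; self is never modified.
        idx = self.columns.index(name)
        cols = self.columns[:idx] + self.columns[idx + 1:]
        rows = tuple(r[:idx] + r[idx + 1:] for r in self.rows)
        return Frame(cols, rows)


original = Frame(("name", "age"), (("Ann", 30), ("Bob", 25)))
dropped = original.drop("age")
print(original.columns)  # ('name', 'age'), unchanged
print(dropped.columns)   # ('name',)
```

Keeping the result means reassigning a name to it, which is exactly what `aa_dfw_df = aa_dfw_df.drop(...)` does later in this notebook.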

## Lazy Processing
You may be wondering how Spark does this so quickly, especially on large data sets. Spark can do this because of something called lazy processing: very little actually happens until an action is performed. In our previous example, we read a CSV file, added a new column, and deleted another. The trick is that no data was actually read, added, or modified; we only updated the instructions (aka, transformations) for what we wanted Spark to do. This lets Spark choose the most efficient set of operations to get the desired result. Adding a `count()` call to the same code changes this: `count()` is an action, so Spark then processes all of the queued transformation operations.
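The sketch below is a toy model of lazy processing in plain Python and assumes nothing about Spark's internals. The hypothetical `LazyFrame` class only records transformations (`map`, `filter`), and nothing is read until the `count()` action runs. Spark's real planner does much more (it can also reorder and combine steps), but the transformation/action split is the same idea.

```python
class LazyFrame:
    """Hypothetical toy lazy pipeline (not Spark's API)."""

    def __init__(self, source, steps=()):
        self._source = source       # zero-argument callable that yields rows
        self._steps = tuple(steps)  # recorded transformations, not yet run

    def map(self, fn):
        # Transformation: record the step and return a new LazyFrame.
        return LazyFrame(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: record the step and return a new LazyFrame.
        return LazyFrame(self._source, self._steps + (("filter", pred),))

    def count(self):
        # Action: only now is the source read and every step applied.
        n = 0
        for row in self._source():
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    row = fn(row)
                elif not fn(row):  # a "filter" step rejected the row
                    keep = False
                    break
            if keep:
                n += 1
        return n


reads = []

def source():
    reads.append("read")  # record each time the data is actually read
    return iter(["HNL", "OGG", "SFO", "DTW"])


lf = LazyFrame(source).map(str.lower).filter(lambda a: a != "sfo")
print(len(reads))  # 0: defining transformations read nothing
print(lf.count())  # 3
print(len(reads))  # 1: the action triggered the read
```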

## Using lazy processing
Lazy processing operations will usually return in about the same amount of time regardless of the actual quantity of data. Remember that this is due to Spark not performing any transformations until an action is requested.

For this exercise, we'll define a DataFrame (`aa_dfw_df`) and add a couple of transformations. Note the amount of time required for the transformations to complete when they are defined versus when the data is actually queried. These differences may be small, but they will be noticeable. When working with a full Spark cluster and larger quantities of data, the difference will be more apparent.

* Load the Data Frame.
* Add an `airport` column that applies `F.lower()` to the `Destination Airport` column.
* Drop the `Destination Airport` column from the Data Frame `aa_dfw_df`. Note the time for these operations to complete.
* Show the Data Frame, noting the time difference for this action to complete.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [7]:
import pyspark.sql.functions as F

In [8]:
import pandas as pd
aa_dfw_df = pd.read_csv("AA_DFW_2017_Departures_Short.csv")
aa_dfw_df = spark.createDataFrame(aa_dfw_df)

In [None]:
# Load the CSV file
# aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2017_Departures_Short.csv.gz')

In [9]:
aa_dfw_df.show()

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|            5|                HNL|                          537|
|       01/01/2017|            7|                OGG|                          498|
|       01/01/2017|           37|                SFO|                          241|
|       01/01/2017|           43|                DTW|                          134|
|       01/01/2017|           51|                STL|                           88|
|       01/01/2017|           60|                MIA|                          149|
|       01/01/2017|           71|                LAX|                          203|
|       01/01/2017|           74|                MEM|                           76|
|       01/01/2017|           81|                DEN|                       

In [10]:
# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|            5|                          537|    hnl|
|       01/01/2017|            7|                          498|    ogg|
|       01/01/2017|           37|                          241|    sfo|
|       01/01/2017|           43|                          134|    dtw|
|       01/01/2017|           51|                           88|    stl|
|       01/01/2017|           60|                          149|    mia|
|       01/01/2017|           71|                          203|    lax|
|       01/01/2017|           74|                           76|    mem|
|       01/01/2017|           81|                          123|    den|
|       01/01/2017|           89|                          161|    slc|
|       01/01/2017|           96|                           84| 

## 1. Understanding Parquet
Welcome back! As we've seen, Spark can read in text and CSV files. While this gives us access to many data sources, it's not always the most convenient format to work with. Let's take a look at a few problems with CSV files.

## 2. Difficulties with CSV files
Some common issues with CSV files include:
* The schema is not defined: there are no data types included, nor column names (beyond a header row).
* Content containing a comma (or another delimiter) requires escaping.
* Using the escape character within content requires even further escaping.
* The available encoding formats are limited depending on the language used.
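Python's standard `csv` module shows the first three problems concretely. This sketch writes one record containing a comma and a double quote, then reads it back; the sample values are made up for illustration.

```python
import csv
import io

rows = [["name", "age", "note"], ["Smith, Jane", 42, 'said "hi"']]

buf = io.StringIO()
csv.writer(buf).writerows(rows)  # default dialect: comma delimiter, minimal quoting
text = buf.getvalue()
print(repr(text))
# 'name,age,note\r\n"Smith, Jane",42,"said ""hi"""\r\n'

parsed = list(csv.reader(io.StringIO(text)))
print(parsed[1])  # ['Smith, Jane', '42', 'said "hi"']
```

The field with a comma had to be quoted, the embedded quotes had to be doubled, and the age came back as the string `'42'` because the file carries no type information.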

## 3. Spark and CSV files
In addition to the issues with CSV files in general, Spark has some specific problems processing CSV data. CSV files are quite slow to import and parse. The files cannot be shared between workers during the import process. If no schema is defined, all data must be read before a schema can be inferred. Spark has a feature known as predicate pushdown. Basically, this is the idea of ordering tasks to do the least amount of work. Filtering data prior to processing is one of the primary optimizations of predicate pushdown, and it drastically reduces the amount of information that must be processed in large data sets. Unfortunately, you cannot filter CSV data via predicate pushdown. Finally, Spark processes are often multi-step and may utilize an intermediate file representation. These representations allow data to be used later without regenerating the data from source. Using CSV for them would instead require a significant amount of extra work defining schemas, encoding formats, etc.

## 4. The Parquet Format
Parquet is a compressed columnar data format developed for use in any Hadoop-based system. This includes Spark, Hadoop, Apache Impala, and so forth. The Parquet format is structured with data accessible in chunks, allowing efficient read / write operations without processing the entire file. This structured format supports Spark's predicate pushdown functionality, providing significant performance improvement. Finally, Parquet files automatically include schema information and handle data encoding. This is perfect for intermediary or on-disk representation of processed data. Note that Parquet is a binary file format and can only be read or edited with the proper tools, in contrast to CSV files, which can be edited with any text editor.
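Here is a simplified model of how chunked storage enables predicate pushdown. Each hypothetical "row group" keeps min/max statistics, so a filter can skip any group whose statistics rule out a match. Real Parquet files keep per-column-chunk statistics in their metadata; this sketch only mirrors the skipping logic. The values are the twelve `flight_duration` values shown in the `flights_df` output later in this notebook, sorted so that similar values share a group, which is the case where skipping helps most.

```python
from typing import List, Tuple

RowGroup = Tuple[int, int, List[int]]  # (min, max, values)


def make_row_groups(values: List[int], size: int) -> List[RowGroup]:
    """Split values into fixed-size groups and record each group's min/max."""
    groups = []
    for i in range(0, len(values), size):
        chunk = values[i:i + size]
        groups.append((min(chunk), max(chunk), chunk))
    return groups


def filter_less_than(groups: List[RowGroup], limit: int):
    """Return matching values and how many groups were actually scanned."""
    scanned, out = 0, []
    for lo, hi, chunk in groups:
        if lo >= limit:
            continue  # the stats prove nothing here can match: skip the group
        scanned += 1
        out.extend(v for v in chunk if v < limit)
    return out, scanned


durations = [49, 52, 59, 96, 104, 109, 113, 163, 180, 196, 223, 241]
groups = make_row_groups(durations, size=4)
matches, scanned = filter_less_than(groups, 100)
print(matches, scanned)  # [49, 52, 59, 96] 1
```

A CSV reader has no such statistics, so it must parse all twelve values to answer the same filter.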

## 5. Working with Parquet
Interacting with Parquet files is very straightforward. To read a parquet file into a Data Frame, you have two options. The first is using the `spark.read.format` method we've seen previously. 

1) `df=spark.read.format('parquet').load('filename.parquet')`<br>
2) `df=spark.read.parquet('filename.parquet')` 

Typically, the shortcut version is the easiest to use but you can use them interchangeably. Writing parquet files is similar, using either:

1) `df.write.format('parquet').save('filename.parquet')` or <br>
2) `df.write.parquet('filename.parquet')` 

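The long-form versions make it easy to add extra option flags, such as `.mode('overwrite')` when replacing an existing parquet file; the shortcut versions accept a `mode` argument as well, as in `df.write.parquet('filename.parquet', mode='overwrite')`.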

## 6. Parquet and SQL
Parquet files have various uses within Spark. We've discussed using them as an intermediate data format, but they also are perfect for performing SQL operations. To perform a SQL query against a Parquet file, we first need to create a Data Frame via the spark.read.parquet method. Once we have the Data Frame, we can use the `createOrReplaceTempView()` method to add an alias of the Parquet data as a SQL table. Finally, we run our query using normal SQL syntax and the spark.sql method. In this case, we're looking for all flights with a duration under 100 minutes. Because we're using Parquet as the backing store, we get all the performance benefits we've discussed previously (primarily defined schemas and the available use of predicate pushdown).
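The same register-then-query flow can be sketched without a Spark cluster by using Python's built-in `sqlite3` module as a stand-in: `CREATE TABLE` plus inserts plays the role of `createOrReplaceTempView()`, and `execute()` plays the role of `spark.sql()`. This is only an analogy (SQLite gets none of Parquet's benefits); the rows are copied from the `flights_df` output shown later in this notebook.

```python
import sqlite3

# Sample rows copied from the flights_df output in this notebook.
rows = [
    ("09/30/2017", 2214, "ABQ", 109), ("09/30/2017", 2215, "DEN", 113),
    ("09/30/2017", 2222, "AUS", 49), ("09/30/2017", 2226, "DSM", 104),
    ("09/30/2017", 2228, "SMF", 241), ("09/30/2017", 2236, "TUL", 59),
    ("09/30/2017", 2248, "BWI", 163), ("09/30/2017", 2250, "LAX", 196),
    ("09/30/2017", 2252, "SFO", 223), ("09/30/2017", 2258, "IAH", 52),
    ("09/30/2017", 2259, "BNA", 96), ("09/30/2017", 2260, "PSP", 180),
]

con = sqlite3.connect(":memory:")
# Register the data under a table name (Spark: createOrReplaceTempView('flights'))
con.execute("CREATE TABLE flights (date TEXT, flight_number INTEGER, "
            "destination_airport TEXT, flight_duration INTEGER)")
con.executemany("INSERT INTO flights VALUES (?, ?, ?, ?)", rows)

# Query it with plain SQL (Spark: spark.sql(...)): flights under 100 minutes
short = con.execute(
    "SELECT flight_number, flight_duration FROM flights "
    "WHERE flight_duration < 100 ORDER BY flight_number"
).fetchall()
print(short)  # [(2222, 49), (2236, 59), (2258, 52), (2259, 96)]
```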

## Saving a DataFrame in Parquet format
When working with Spark, you'll often start with CSV, JSON, or other data sources. These provide a lot of flexibility for the types of data to load, but they are not optimal formats for Spark. The `Parquet` format is a columnar data store, allowing Spark to use predicate pushdown. This means Spark will only process the data necessary to complete the operations you define instead of reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.
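Predicate pushdown skips rows; a columnar layout also lets a reader skip columns it does not need (often called column pruning). The toy sketch below uses plain Python lists rather than any Parquet library. It stores the same three flights row-wise and column-wise and counts how many values each layout touches to average one column.

```python
# Hypothetical toy layouts of three flights from the AA_DFW data shown above.
rows = [
    ("01/01/2017", 5, "HNL", 537),
    ("01/01/2017", 7, "OGG", 498),
    ("01/01/2017", 37, "SFO", 241),
]
columns = {
    "date": [r[0] for r in rows],
    "flight_number": [r[1] for r in rows],
    "destination_airport": [r[2] for r in rows],
    "flight_duration": [r[3] for r in rows],
}

# Row layout: averaging one column still means reading every full record.
values_touched_rows = sum(len(r) for r in rows)
avg_rows = sum(r[3] for r in rows) / len(rows)

# Column layout: only the requested column is read.
durations = columns["flight_duration"]
values_touched_cols = len(durations)
avg_cols = sum(durations) / len(durations)

print(values_touched_rows, values_touched_cols)  # 12 3
print(avg_rows == avg_cols)                      # True
```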

In this exercise, we're going to practice creating a new Parquet file and then process some data from it.

In [11]:
# df1 = spark.read.format('csv').options(Header=True).load('AA_DFW_2017_Departures_Short.csv.gz')
df1 = pd.read_csv("AA_DFW_2017_Departures_Short.csv")
df1 = spark.createDataFrame(df1)

In [18]:
# df2 = spark.read.format('csv').options(Header=True).load('AA_DFW_2016_Departures_Short.csv.gz')
df2 = pd.read_csv("AA_DFW_2016_Departures_Short.csv")
df2 = spark.createDataFrame(df2)

In [46]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

df1 Count: 139358
df2 Count: 140604


In [16]:
df1.show()

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|            5|                HNL|                          537|
|       01/01/2017|            7|                OGG|                          498|
|       01/01/2017|           37|                SFO|                          241|
|       01/01/2017|           43|                DTW|                          134|
|       01/01/2017|           51|                STL|                           88|
|       01/01/2017|           60|                MIA|                          149|
|       01/01/2017|           71|                LAX|                          203|
|       01/01/2017|           74|                MEM|                           76|
|       01/01/2017|           81|                DEN|                       

In [20]:
df2.show()

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2016|            5|                HNL|                          529|
|       01/01/2016|            7|                OGG|                          512|
|       01/01/2016|           25|                PHL|                          161|
|       01/01/2016|           37|                SFO|                          259|
|       01/01/2016|           43|                DTW|                          157|
|       01/01/2016|           60|                MIA|                          144|
|       01/01/2016|           71|                LAS|                          165|
|       01/01/2016|           79|                SLC|                          153|
|       01/01/2016|           81|                TUS|                       

In [31]:
df3.show()

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|            5|                HNL|                          537|
|       01/01/2017|            7|                OGG|                          498|
|       01/01/2017|           37|                SFO|                          241|
|       01/01/2017|           43|                DTW|                          134|
|       01/01/2017|           51|                STL|                           88|
|       01/01/2017|           60|                MIA|                          149|
|       01/01/2017|           71|                LAX|                          203|
|       01/01/2017|           74|                MEM|                           76|
|       01/01/2017|           81|                DEN|                       

In [47]:
df3 = df3.withColumnRenamed("Date (MM/DD/YYYY)", "date")
df3 = df3.withColumnRenamed("Flight Number", "flight_number")
df3 = df3.withColumnRenamed("Destination Airport", "destination_airport")
df3 = df3.withColumnRenamed("Actual elapsed time (Minutes)", "flight_duration")

In [48]:
# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

279962


## SQL and Parquet
Parquet files are perfect as a backing data store for SQL queries in Spark. While it is possible to run the same queries directly via Spark's Python functions, sometimes it's easier to run SQL queries alongside the Python options.

For this example, we're going to read in the Parquet file we created in the last exercise and register it as a SQL table. Once registered, we'll run a quick query against the table (aka, the Parquet file).

In [49]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet("AA_DFW_ALL.parquet")

In [50]:
flights_df.show()

+----------+-------------+-------------------+---------------+
|      date|flight_number|destination_airport|flight_duration|
+----------+-------------+-------------------+---------------+
|09/30/2017|         2214|                ABQ|            109|
|09/30/2017|         2215|                DEN|            113|
|09/30/2017|         2222|                AUS|             49|
|09/30/2017|         2226|                DSM|            104|
|09/30/2017|         2228|                SMF|            241|
|09/30/2017|         2236|                TUL|             59|
|09/30/2017|         2248|                BWI|            163|
|09/30/2017|         2250|                LAX|            196|
|09/30/2017|         2252|                SFO|            223|
|09/30/2017|         2258|                IAH|             52|
|09/30/2017|         2259|                BNA|             96|
|09/30/2017|         2260|                PSP|            180|
|09/30/2017|         2262|                LAS|         

In [51]:
# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
# (collect() returns a list of Row objects; [0][0] extracts the single average value)
avg_duration = spark.sql('SELECT avg(flight_duration) FROM flights').collect()[0][0]
print('The average flight time is: %d' % avg_duration)

The average flight time is: 151


# Manipulating DataFrames in the real world


## Filtering

In [None]:
# Return rows where name starts with "M" 
voter_df.filter(voter_df.name.like('M%'))

# Return name and position 
onlyvoters = voter_df.select('name', 'position')

## Common DataFrame transformations

* Filter/where

In [None]:
voter_df.filter(voter_df.date > '2019-01-01') # or voter_df.where(...); assumes date is a DateType column

* Select

In [None]:
voter_df.select(voter_df.name)

* withColumn

In [None]:
voter_df.withColumn('year', F.year(voter_df.date))

* drop

In [None]:
voter_df.drop('unused_column')

## Filtering data

In [None]:
voter_df.filter(voter_df['name'].isNotNull())
voter_df.filter(F.year(voter_df.date) > 1800)
voter_df.where(voter_df['_c0'].contains('VOTE'))
voter_df.where(~ voter_df._c1.isNull())

## Column string transformations

* Contained in pyspark.sql.functions

In [None]:
import pyspark.sql.functions as F

* Applied per column as transformation

In [None]:
voter_df.withColumn('upper', F.upper('name'))

* Can create intermediary columns

In [None]:
voter_df.withColumn('splits', F.split('name', ' '))

* Can cast to other types

In [None]:
voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))

## ArrayType() column functions

`F.size(<column>)` - returns the length of an `ArrayType()` column

`<column>.getItem(<index>)` - retrieves the item at a specific index of an array column

## Filtering column content with Python
You've looked at using various operations on DataFrame columns - now you can modify a real dataset. The DataFrame `voter_df` contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter. Your manager has asked you to clean this data so it can later be integrated into some desired reports. The primary task is to remove any null entries or odd characters and return a specific set of voters where you can validate their information.

This is often one of the first steps in data cleaning - removing anything that is obviously outside the format. For this dataset, make sure to look at the original data and see what looks out of place for the `VOTER_NAME` column.

In [96]:
# voter_df = spark.read.format('csv').options(Header=True).load('DallasCouncilVoters.csv.gz')
voter_df = pd.read_csv("DallasCouncilVoters.csv")
voter_df = spark.createDataFrame(voter_df)

In [98]:
voter_df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

In [99]:
# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-19 characters in length (strictly fewer than 20)
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                    |
+------------------------------------------------------------------------------------------------------------------------------+
|  the  final   2018 Assessment Plan and the 2018 Assessment  Roll  (to  be  kept  on  file  with  the  City  Secretary)       |
|Mark  Clayton                                                                                                                 |
|Omar Narvaez                                                                                                                  |
|Dwaine R. Caraway                                                                                                             |
|011018__42                                                                                      
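The two Spark filters above amount to a simple rule per value. Here is that rule applied in plain Python to the distinct names visible in the output above; the empty string is a hypothetical extra entry added to exercise the length check.

```python
names = [
    "  the  final   2018 Assessment Plan and the 2018 Assessment  Roll  "
    "(to  be  kept  on  file  with  the  City  Secretary)",
    "Mark  Clayton",
    "Omar Narvaez",
    "Dwaine R. Caraway",
    "011018__42",
    "",  # hypothetical empty entry
]

# Same rules as the Spark filters: 0 < length < 20, and no underscore.
kept = [n for n in names if 0 < len(n) < 20 and "_" not in n]
print(kept)  # ['Mark  Clayton', 'Omar Narvaez', 'Dwaine R. Caraway']
```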

If we wanted to return only the Name and State fields for any ID greater than 3000, which code snippet meets these requirements?

`users_df.filter('ID > 3000').select("Name", "State")`

## Modifying DataFrame columns
Previously, you filtered out any rows that didn't conform to something generally resembling a name. Now based on your earlier work, your manager has asked you to create two new columns - `first_name` and `last_name`. She asks you to split the `VOTER_NAME` column into words on any space character. You'll treat the last word as the `last_name`, and all other words as the `first_name`. You'll be using some new functions in this exercise including `.split()`, `.size()`, and `.getItem()`. The `.getItem(index)` takes an integer value to return the appropriately numbered item in the column. The functions `.split()` and `.size()` are in the `pyspark.sql.functions` library.
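Plain Python shows why the split pattern matters. `F.split` treats its pattern as a regular expression, and `\s+` matches runs of whitespace, so a double space such as the one in `B. Adam  McGough` does not produce an empty word. The hypothetical `first_and_last` helper mirrors `getItem(0)` and `getItem(size - 1)`:

```python
import re


def first_and_last(name: str):
    """Mimic split on r'\\s+', then getItem(0) and getItem(size - 1)."""
    parts = re.split(r"\s+", name)
    return parts[0], parts[len(parts) - 1]


print("B. Adam  McGough".split(" "))         # ['B.', 'Adam', '', 'McGough']
print(re.split(r"\s+", "B. Adam  McGough"))  # ['B.', 'Adam', 'McGough']
print(first_and_last("Carolyn King Arnold"))  # ('Carolyn', 'Arnold')
```

A name with leading whitespace would still yield an empty first item; trimming first (for example with `F.trim` in Spark) avoids that.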

In [100]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn("splits", F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn("first_name", voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn("last_name", voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

+----------+-------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|
+----------+-------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|       Lee| Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|     Sandy|  Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|  

## Conditional DataFrame column operations


Conditional clauses:
* `.when()`
* `.otherwise()`

### Example

`.when(<if condition>, <then x>)`

`df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))`

![title](when_clause_example.png)

`df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult").when(df.Age < 18, "Minor"))`

`df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult").otherwise("Minor"))`

![title](when_clause_example_2.png)
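The evaluation rules for chained `when()` clauses can be modeled in a few lines of plain Python. The hypothetical `when_chain` function below tries each condition in order, returns the result of the first one that is true, and falls back to the `otherwise` value, or to `None` (Spark's null) when there is no `otherwise()`. That null fallback is why the Mayor row gets `null` in the `when()` example that follows.

```python
def when_chain(value, branches, otherwise=None):
    """Plain-Python model of F.when(...).when(...).otherwise(...).

    Branches are tried in order; the first true condition wins. Without an
    otherwise value the result is None, mirroring Spark's null.
    """
    for condition, result in branches:
        if condition(value):
            return result
    return otherwise


branches = [(lambda age: age >= 18, "Adult")]
print([when_chain(a, branches) for a in (12, 30)])           # [None, 'Adult']
print([when_chain(a, branches, "Minor") for a in (12, 30)])  # ['Minor', 'Adult']
```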

## when() example
The `when()` clause lets you conditionally modify a Data Frame based on its content. You'll want to modify our `voter_df` DataFrame to add a random number to any voting member that is defined as a "Councilmember".

In [101]:
# Add a column to voter_df for any voter with the title Councilmember
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE=='Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|   0.674547539110537|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.37137244547814596|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                null|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano| 0.06825943955148639|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|  0.6522310214396713|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|0.040625990148935864|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|  0.5519885719191457|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|  0.6052389865961758|
|02/08/2017|Councilme

## When / Otherwise
This requirement is similar to the last, but now you want to add multiple values based on the voter's position. Modify your `voter_df` DataFrame to add a random number to any voting member that is defined as a `Councilmember`. Use 2 for the `Mayor` and 0 for any other position.

* Add a column to voter_df named `random_val` with the results of the `F.rand()` method for any voter with the title Councilmember. Set `random_val` to 2 for the Mayor. Set any other title to the value 0.
* Show some of the Data Frame rows, noting whether the clauses worked.
* Use the `.filter` clause to find rows where `random_val` is 0.

In [105]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter('random_val == 0').show()

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.2800703955427465|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|  0.6567583884005201|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|0.056485536081120946|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas| 0.11924405661951543|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|  0.3647192865244855|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|  0.9766897561647625|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough| 0.48555629626510455|
|02/08/2017|Councilme

## User defined functions


1) Define a Python method

In [106]:
def reverseString(mystr):
    return mystr[::-1]

In [111]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

2) Wrap the function and store as a variable

In [112]:
udfReverseString = udf(reverseString, StringType())

3) Use with Spark

In [None]:
user_df = user_df.withColumn('ReverseName',
                             udfReverseString(user_df.Name))

In [None]:
import random

def sortingCap():
    return random.choice(['G', 'H', 'R', 'S'])

udfSortingCap = udf(sortingCap, StringType())
user_df = user_df.withColumn('Class', udfSortingCap())

## Using user defined functions in Spark
You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data without creating a rat's nest of function calls. Here's one place where you can use User Defined Functions to manipulate our DataFrames.

For this exercise, we'll use our `voter_df` DataFrame, but you're going to replace the `first_name` column with the first and middle names.

In [114]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn("splits", F.split(voter_df.VOTER_NAME, r'\s+'))

In [116]:
def getFirstAndMiddle(names):
  # Return a space separated string of names
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))
voter_df = voter_df.drop('splits')

# Show the DataFrame
voter_df.show()

+----------+-------------+-------------------+----------+---------+--------------------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|first_and_middle_name|
+----------+-------------+-------------------+----------+---------+--------------------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.2800703955427465|          Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|  0.6567583884005201|            Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|           Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|0.056485536081120946|                 Adam|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas| 0.11924405661951543|                Casey|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|  0.3647192865244855|         Carolyn King|
|
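Because a UDF wraps an ordinary Python function, the function can be checked without Spark before it is wrapped with `F.udf`. One case worth testing is null input: if `VOTER_NAME` were null, `F.split` would produce a null array and the UDF would receive `None`, which makes `names[:-1]` raise a `TypeError`. The hypothetical `get_first_and_middle_safe` variant below guards against that.

```python
from typing import List, Optional


def get_first_and_middle_safe(names: Optional[List[str]]) -> Optional[str]:
    # Same logic as getFirstAndMiddle, plus a guard: Spark passes None for a null array.
    if names is None:
        return None
    # Join every word except the last one with single spaces.
    return ' '.join(names[:-1])


print(get_first_and_middle_safe(['Jennifer', 'S.', 'Gates']))  # Jennifer S.
print(get_first_and_middle_safe(['Adam', 'Medrano']))          # Adam
print(get_first_and_middle_safe(None))                         # None
```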

## Partitioning and lazy processing


## Adding an ID Field
When working with data, you sometimes only want to access certain fields and perform various operations. In this case, find all the unique voter names from the DataFrame and add a unique ID number. Remember that Spark IDs are assigned based on the DataFrame partition - as such the ID values may be much greater than the actual number of rows in the DataFrame.

With Spark's lazy processing, the IDs are not actually generated until an action is performed and can be somewhat random depending on the size of the dataset.

The Spark session and the `voter_df` DataFrame used above (loaded from `DallasCouncilVoters.csv`) are available in this notebook. The `pyspark.sql.functions` library is available under the alias `F`.

In [118]:
# Select all the unique council voters
voter_df = voter_df.select(voter_df["VOTER_NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)


There are 27 rows in the voter_df DataFrame.

+-------------------+------------+
|         VOTER_NAME|      ROW_ID|
+-------------------+------------+
|       Lee Kleinman|249108103168|
|Rickey D.  Callahan|240518168576|
|  Jennifer S. Gates|223338299392|
|Philip T.  Kingston|214748364800|
| Jennifer S.  Gates|197568495616|
|      Sandy Greyson|188978561024|
|     Tennell Atkins|180388626432|
|        Erik Wilson|171798691840|
|    Lee M. Kleinman|163208757248|
| Rickey D. Callahan|154618822656|
+-------------------+------------+
only showing top 10 rows
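The large, evenly spaced IDs above come from how `monotonically_increasing_id()` builds each value. The PySpark documentation states that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below reproduces that layout without Spark; `encode_row_id` and `decode_row_id` are hypothetical helpers written for illustration, not Spark APIs. Decoding the output above shows, for example, that `249108103168` is the first row of partition 29.

```python
# Sketch of the ID layout used by monotonically_increasing_id():
# upper 31 bits = partition index, lower 33 bits = row offset in that partition.
# encode_row_id / decode_row_id are illustrative helpers, not part of PySpark.

ROW_BITS = 33
ROW_MASK = (1 << ROW_BITS) - 1


def encode_row_id(partition_index: int, row_offset: int) -> int:
    """Combine a partition index and a per-partition row offset into one ID."""
    return (partition_index << ROW_BITS) | row_offset


def decode_row_id(row_id: int) -> tuple[int, int]:
    """Split an ID back into (partition_index, row_offset)."""
    return row_id >> ROW_BITS, row_id & ROW_MASK


# Decode a few of the ROW_ID values from the output above
for row_id in (249108103168, 240518168576, 154618822656):
    print(row_id, "->", decode_row_id(row_id))
```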



## More ID tricks
Once you define a Spark process, you'll likely want to use it many times. Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to an auto-incrementing ID column in a relational database. You have been given the task of making sure that the IDs output from a monthly Spark task start just above the highest value from the previous month.

In [None]:
# Determine the highest ROW_ID in the March data and save it in previous_max_ID
previous_max_ID = voter_df_march.agg(F.max('ROW_ID')).collect()[0][0]

# Add a ROW_ID column to voter_df_april that starts just above previous_max_ID,
# so that no April ID collides with a March ID
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID + 1)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()
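Because `monotonically_increasing_id()` starts at 0 in the first partition, the offset you add determines where the new IDs begin. The small plain-Python sketch below uses made-up ID lists to show why offsetting by the previous maximum alone reuses that maximum for the new month's first row, while offsetting by the maximum plus one keeps the two months disjoint.

```python
# Illustrative only: the ID lists below are hypothetical.
march_ids = [0, 1, 2, 8589934592, 8589934593]  # March ROW_IDs (two partitions)
april_raw_ids = [0, 1, 8589934592]             # raw April IDs before any offset

previous_max_id = max(march_ids)

# Offsetting by the max alone: April's first row gets March's largest ID.
naive_april = [i + previous_max_id for i in april_raw_ids]

# Offsetting by max + 1: every April ID is strictly above every March ID.
safe_april = [i + previous_max_id + 1 for i in april_raw_ids]

print("overlap with naive offset:", set(march_ids) & set(naive_april))
print("overlap with +1 offset:   ", set(march_ids) & set(safe_april))
```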

# Improving Performance

## Caching
Caching in Spark refers to storing the results of a DataFrame in memory or on disk of the processing nodes in a cluster. Caching improves the speed of subsequent transformations or actions, as the data likely no longer needs to be retrieved from the original data source. Caching also reduces the resource utilization of the cluster: there is less need to access the storage, networking, and CPU of the Spark nodes, as the data is likely already present.
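To make the benefit concrete, here is a toy model in plain Python rather than Spark itself; `ToyFrame` and `CountingSource` are hypothetical classes written for illustration. Transformations are only recorded, each action replays them from the source, and a frame marked for caching keeps its materialized rows so later actions skip the source read.

```python
# Toy model of Spark-style lazy evaluation and caching, in plain Python.
# ToyFrame and CountingSource are hypothetical, not part of PySpark.


class CountingSource:
    """A pretend data source that records how many times it is read."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self.rows)


class ToyFrame:
    def __init__(self, source, steps=()):
        self.source = source
        self.steps = list(steps)  # recorded transformations, not yet run
        self.cache_requested = False
        self.cached_rows = None

    def filter(self, predicate):
        # Transformation: return a new frame with one more recorded step
        def step(rows):
            return [row for row in rows if predicate(row)]
        return ToyFrame(self.source, self.steps + [step])

    def cache(self):
        # Lazy, like Spark: rows are only stored when the next action runs
        self.cache_requested = True
        return self

    def count(self):
        # Action: reuse cached rows if present, otherwise replay every step
        if self.cached_rows is not None:
            return len(self.cached_rows)
        rows = self.source.read()
        for step in self.steps:
            rows = step(rows)
        if self.cache_requested:
            self.cached_rows = rows
        return len(rows)


uncached_src = CountingSource(range(10))
uncached = ToyFrame(uncached_src).filter(lambda x: x % 2 == 0)
uncached.count()
uncached.count()

cached_src = CountingSource(range(10))
cached = ToyFrame(cached_src).filter(lambda x: x % 2 == 0).cache()
cached.count()
cached.count()

print("source reads without cache:", uncached_src.reads)
print("source reads with cache:   ", cached_src.reads)
```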

## Disadvantages of caching
There are a few disadvantages of caching you should be aware of. Very large datasets may not fit in the memory reserved for cached DataFrames. Depending on the later transformations requested, the cache may not help performance at all. If a dataset does not stay cached in memory, it may be persisted to disk, and depending on the disk configuration of a Spark cluster, this may not be a large performance improvement. If you're reading from a local network resource and have slow local disk I/O, it may be better to avoid caching the objects. Finally, the lifetime of a cached object is not guaranteed: if cached data is evicted, Spark automatically recomputes it from the original transformations, but this can cause delays in processing.

## Caching a DataFrame
You've been assigned a task that requires running several analysis operations on a DataFrame. You've learned that caching can improve performance when reusing DataFrames and would like to implement it.

You'll be working with a new dataset consisting of airline departure information. It may have repetitive data and will need to be de-duplicated.

The DataFrame `departures_df` is defined, but no actions have been performed.

## Implementing caching

Call `.cache()` on the DataFrame before an action

In [None]:
voter_df = spark.read.csv('voter_data.txt.gz')
voter_df.cache().count()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

voter_df = voter_df.withColumn('ID', monotonically_increasing_id())
voter_df = voter_df.cache()
voter_df.show()

Check `.is_cached` to determine cache status

In [None]:
print(voter_df.is_cached)

Call `.unpersist()` when finished with DataFrame

In [None]:
voter_df.unpersist()

In [119]:
# departures_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2017_Departures_Short.csv.gz')
departures_df = pd.read_csv("AA_DFW_2017_Departures_Short.csv")
departures_df = spark.createDataFrame(departures_df)

In [121]:
import time

In [122]:
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

Counting 139358 rows took 13.094988 seconds
Counting 139358 rows again took 1.103701 seconds


## Removing a DataFrame from cache
You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.

In [123]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


## File import performance
You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

Important parameters:
* Number of objects (files, network locations, etc.)
    * More, smaller objects are better than fewer, larger ones
    * Multiple files can be imported with a wildcard: `airport_df = spark.read.csv('airports*.txt.gz')`
* General size of objects
    * Spark performs better if objects are of similar size


## How to split objects
* Use OS utilities or scripts (`split`, `cut`, `awk`), for example:

`split -l 10000 -d largefile chunk-`

* Write out to Parquet

In [None]:
df_csv = spark.read.csv('singlelargefile.csv')
df_csv.write.parquet('data.parquet')
df = spark.read.parquet('data.parquet')
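If a shell utility isn't convenient, the same line-based split can be scripted. The sketch below is a rough Python analogue of `split -l 10000 -d largefile chunk-`, assuming a plain-text file with one record per line and no header row; `split_by_lines` is a hypothetical helper, and gzip-compressed input would need `gzip.open` instead of `open`.

```python
import itertools
import os
import tempfile


def split_by_lines(path, lines_per_chunk, prefix):
    """Write consecutive groups of lines to prefix00, prefix01, ... and return their paths."""
    chunk_paths = []
    with open(path, "r", encoding="utf-8") as src:
        while True:
            lines = list(itertools.islice(src, lines_per_chunk))
            if not lines:
                break
            # Two-digit numeric suffixes, similar to `split -d`
            chunk_path = f"{prefix}{len(chunk_paths):02d}"
            with open(chunk_path, "w", encoding="utf-8") as out:
                out.writelines(lines)
            chunk_paths.append(chunk_path)
    return chunk_paths


# Demo on a throwaway 25-line file, split into chunks of 10 lines
workdir = tempfile.mkdtemp()
largefile = os.path.join(workdir, "largefile")
with open(largefile, "w", encoding="utf-8") as f:
    f.writelines(f"row {i}\n" for i in range(25))

paths = split_by_lines(largefile, 10, os.path.join(workdir, "chunk-"))
print([os.path.basename(p) for p in paths])
```

Note that the last chunk is smaller than the others, so for Spark's benefit you may want to pick a chunk size that divides the data fairly evenly.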

You have two types of files available: `departures_full.txt.gz` and `departures_xxx.txt.gz`, where xxx runs from 000 to 013. The split files contain the same rows as the full file, divided equally among them.

In [None]:
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

## Reading Spark configurations
You've recently configured a cluster via a cloud provider. Your only access is via the command shell or your python code. You'd like to verify some Spark settings to validate the configuration of the cluster.

In [124]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: pyspark-shell
Driver TCP port: 31095
Number of partitions: 200


## Performance improvements


## Normal joins
You've been given two DataFrames to combine into a single useful DataFrame. Your first task is to combine the DataFrames normally and view the execution plan.

In [None]:
# Join the flights_df and airports_df DataFrames
normal_df = flights_df.join(
    airports_df, flights_df["Destination Airport"] == airports_df["IATA"]
)

# Show the query plan
normal_df.explain()

## Using broadcasting on Spark joins
Remember that table joins in Spark are split between the cluster workers. If the data is not local, various shuffle operations are required and can have a negative impact on performance. Instead, we're going to use Spark's broadcast operations to give each node a copy of the specified data.

A couple tips:

* Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer it to the worker nodes.
* On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own (Spark already broadcasts tables smaller than `spark.sql.autoBroadcastJoinThreshold` automatically).
* If you look at the query execution plan, a `BroadcastHashJoin` indicates you've successfully configured broadcasting.
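Conceptually, a broadcast hash join ships the small table to every worker, builds a hash table from it there, and lets each partition of the large table probe that table locally, so the large side never needs to be shuffled. The plain-Python sketch below illustrates the idea with lists standing in for partitions; the column names follow the example, but the rows and the `broadcast_hash_join` helper are made up for illustration and are not how Spark is implemented internally.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner-join each large-side partition against a hash table built from the small side."""
    # Build once from the small ("broadcast") side: key -> list of matching rows
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[small_key]].append(row)

    # Each partition probes the lookup locally; the large side is never regrouped
    joined_partitions = []
    for partition in large_partitions:
        joined = [
            {**big_row, **small_row}
            for big_row in partition
            for small_row in lookup.get(big_row[large_key], [])
        ]
        joined_partitions.append(joined)
    return joined_partitions


# Hypothetical rows; two partitions of flights, one small airports table
flights_partitions = [
    [{"Flight": 1, "Destination Airport": "ORD"}, {"Flight": 2, "Destination Airport": "LAX"}],
    [{"Flight": 3, "Destination Airport": "ORD"}, {"Flight": 4, "Destination Airport": "XXX"}],
]
airports = [{"IATA": "ORD", "Name": "Chicago O'Hare"}, {"IATA": "LAX", "Name": "Los Angeles"}]

result = broadcast_hash_join(flights_partitions, airports, "Destination Airport", "IATA")
print([len(p) for p in result])
```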

In [None]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(
    broadcast(airports_df), flights_df["Destination Airport"] == airports_df["IATA"]
)

# Show the query plan and compare against the original
broadcast_df.explain()

## Comparing broadcast vs normal joins
You've created two types of joins, normal and broadcasted. Now your manager would like to know what the performance improvement is by using Spark optimizations. If the results are promising, you'll be given more opportunity to tweak the Spark setup as needed.

In [None]:
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

# Complex processing and data pipelines

## What is a data pipeline?
A data pipeline is simply the set of steps needed to take data from one or more input sources and convert it to the desired output. A data pipeline can consist of any number of steps or components and can span many systems. For our purposes, we'll set up a data pipeline within Spark, but realize that a full production data pipeline will likely communicate with many systems.
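One way to picture a pipeline is as an ordered list of steps, each consuming the previous step's output. The framework-free sketch below is illustrative only: `run_pipeline` and the step functions are hypothetical, and the steps loosely mirror the ingest, filter, add-ID, and write-out stages used in the next exercise. The final step emits one JSON object per line, the same layout Spark's `write.json` produces.

```python
import csv
import io
import json


def ingest(text):
    # Parse CSV text (with a header row) into a list of dicts
    return list(csv.DictReader(io.StringIO(text)))


def drop_zero_duration(rows):
    return [r for r in rows if int(r["duration"]) > 0]


def add_id(rows):
    return [{**r, "id": i} for i, r in enumerate(rows)]


def to_json_lines(rows):
    return "\n".join(json.dumps(r) for r in rows)


def run_pipeline(data, steps):
    # Feed each step's output into the next step
    for step in steps:
        data = step(data)
    return data


raw = "flight,duration\nAA1,120\nAA2,0\nAA3,95\n"  # made-up input
output = run_pipeline(raw, [ingest, drop_zero_duration, add_id, to_json_lines])
print(output)
```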

## Quick pipeline
Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

The `spark` session is defined, and the `pyspark.sql.functions` library is aliased as `F`, as is customary.

In [133]:
# Import the data to a DataFrame
# departures_df = spark.read.csv('AA_DFW_2015_Departures_Short.csv', header=True)
departures_df = pd.read_csv("AA_DFW_2017_Departures_Short.csv")
departures_df = spark.createDataFrame(departures_df)

In [135]:
# Remove any duration of 0
departures_df = departures_df.filter(departures_df['Actual elapsed time (Minutes)'] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
# departures_df.write.json('output.json', mode='overwrite')

## Data handling techniques

## Removing commented lines
Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, focused specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown, so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

The `spark` session and the base CSV file (`annotations.csv.gz`) are available for you to work with. The `col` function is also available for use.

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.filter(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))
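The `comment='#'` option tells the CSV reader to skip lines that begin with `#`, so the three counts printed above should normally satisfy `full_count - comment_count == no_comments_count`. The plain-Python check below applies the same logic to a few made-up lines, assuming (as in the exercise) that comments only appear as whole lines starting with `#`.

```python
# Hypothetical annotation lines; '#' marks a comment line
lines = [
    "# annotation export header",
    "folder_a\timg_001.jpg\t500\t375\tChihuahua,10,20,210,320",
    "# another comment",
    "folder_b\timg_002.jpg\t400\t300\tPug,0,0,100,100",
]

full_count = len(lines)
comment_count = sum(1 for line in lines if line.startswith("#"))
no_comments = [line for line in lines if not line.startswith("#")]

print("Full count: %d\nComment count: %d\nRemaining count: %d"
      % (full_count, comment_count, len(no_comments)))
```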

## Removing invalid rows
Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at least 5 tab-separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (`\t`) characters.

The DataFrame `annotations_df` is already available, with the commented rows removed. The `pyspark.sql.functions` library is available under the alias `F`. The initial number of rows available in the DataFrame is stored in the variable `initial_count`.

In [None]:
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], "\t")

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(annotations_df['colcount'] >= 5)

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
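The filter keeps only rows whose tab-split field count is at least 5: the four image fields plus at least one dog entry. The same rule is sketched below in plain Python on made-up rows; `field_count` is a hypothetical helper standing in for `F.size(F.split(...))`.

```python
# Hypothetical rows: the second one is truncated and should be dropped
rows = [
    "folder_a\timg_001.jpg\t500\t375\tChihuahua,10,20,210,320",
    "folder_a\timg_002.jpg\t500",
    "folder_b\timg_003.jpg\t400\t300\tPug,0,0,50,50\tPug,60,60,120,120",
]


def field_count(row):
    # Number of tab-separated fields, like F.size(F.split(col, '\t'))
    return len(row.split("\t"))


valid_rows = [row for row in rows if field_count(row) >= 5]
print("Initial count: %d\nFinal count: %d" % (len(rows), len(valid_rows)))
```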

## Splitting into columns
You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

In [None]:
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df_filtered['_c0'], '\t')

# Add the columns folder, filename, width, and height (width and height as integers)
split_df = annotations_df_filtered.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2).cast('int'))
split_df = split_df.withColumn('height', split_cols.getItem(3).cast('int'))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

## Further parsing
You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

In [None]:
def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('colcount').drop('split_cols')
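Setting Spark aside, the column layout built over the last two steps can be checked on a single line: indexes 0 to 3 hold the image metadata, and everything from index 4 up to `colcount` is the dog list that `retriever` returns. The sketch below assumes that layout; `parse_annotation` and the sample line are hypothetical.

```python
def parse_annotation(line):
    """Map one tab-separated annotation line to the columns built above."""
    cols = line.split("\t")
    colcount = len(cols)
    return {
        "folder": cols[0],
        "filename": cols[1],
        "width": int(cols[2]),
        "height": int(cols[3]),
        "dog_list": cols[4:colcount],  # same slice retriever() takes
    }


# Made-up line with two dog entries
record = parse_annotation(
    "folder_a\timg_001.jpg\t500\t375\tChihuahua,10,20,210,320\tPug,300,50,450,200"
)
print(record)
```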

## Data validation

## Validate rows via join
Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named `valid_folders_df`. The DataFrame `split_df` is as you last left it with a group of split columns.

In [None]:
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(valid_folders_df, "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

## Examining invalid rows
You've successfully filtered out the rows using a join, but sometimes you'd like to examine the data that is invalid. This data can be stored for later processing or for troubleshooting your data sources.

You want to find the difference between two DataFrames and store the invalid rows.

In [None]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(joined_df, 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)
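A `left_anti` join keeps the left-side rows that have no match on the join key. As long as each folder appears at most once in `valid_folders_df` (duplicates there would multiply rows in the inner join), `split_count` should equal `joined_count + invalid_count`. The plain-Python sketch below mimics both joins on made-up rows.

```python
# Hypothetical rows and valid folder names
split_rows = [
    {"folder": "f1", "filename": "a.jpg"},
    {"folder": "f2", "filename": "b.jpg"},
    {"folder": "bad", "filename": "c.jpg"},
    {"folder": "bad", "filename": "d.jpg"},
]
valid_folders = {"f1", "f2"}

# Inner join on folder: keep rows whose folder exists on the right side
joined = [r for r in split_rows if r["folder"] in valid_folders]

# Left anti join against joined: keep rows whose folder never matched
joined_folders = {r["folder"] for r in joined}
invalid = [r for r in split_rows if r["folder"] not in joined_folders]

print(" split:\t%d\n joined:\t%d\n invalid:\t%d" % (len(split_rows), len(joined), len(invalid)))
print("%d distinct invalid folders found" % len({r["folder"] for r in invalid}))
```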

## Dog parsing
You've done a considerable amount of cleanup on the initial dataset, but now need to analyze the data a bit deeper. There are several questions that have now come up about the type of dogs seen in an image and some details regarding the images. You realize that to answer these questions, you need to process the data into a specific type. Before you can use it, you'll need to create a schema / type to represent the dog details.

In [None]:
# Select the dog details and show 10 untruncated rows
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

## Per image count
Your next task in building a data pipeline for this dataset is to create a few analysis-oriented columns. You've been asked to calculate the number of dogs found in each image based on the `dog_list` column created earlier. You have also created the `DogType`, which allows better parsing of the data within some of the columns.

In [None]:
# Create a function that parses each dog entry into a (breed, start_x, start_y, end_x, end_y) tuple and returns the list
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to create the list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)
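Because `dogParse` is ordinary Python, it can be exercised locally before being wrapped in a UDF, which makes parsing errors much easier to spot than inside a Spark job. The sketch below repeats the function so it runs on its own; the sample strings are made up.

```python
def dogParse(doglist):
    # Convert "breed,start_x,start_y,end_x,end_y" strings into typed tuples
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs


sample = ["Chihuahua,10,20,210,320", "Pug,0,0,50,40"]
parsed = dogParse(sample)
print(parsed)
print(len(parsed))  # the per-row value F.size('dogs') reports
```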

## Percentage dog pixels
The final task for parsing the dog annotation data is to determine the percentage of pixels in each image that represents a dog (or dogs). You'll need to use the various techniques you've learned in this course to help calculate this information and add it as columns for later analysis.

To calculate the percentage of pixels, first calculate the number of pixels in each dog's bounding box, then sum them for the image. You can calculate the bounding box area with the formula:

`(Xend - Xstart) * (Yend - Ystart)`

NOTE: You can ignore the possibility of overlapping bounding boxes in this instance.

For the percentage, calculate the total number of "dog" pixels divided by the total size of the image, multiplied by 100.
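As a worked example of the formula with made-up numbers: a 500 x 375 image has 187,500 pixels, and a single 200 x 300 bounding box covers 60,000 of them, or about 32%. The plain-Python sketch below computes this; `dog_percent` is a hypothetical helper that, like the exercise, ignores overlapping boxes.

```python
def dog_percent(dogs, width, height):
    """Percentage of image pixels covered by dog bounding boxes (overlaps ignored)."""
    dog_pixels = sum(
        (end_x - start_x) * (end_y - start_y)
        for _breed, start_x, start_y, end_x, end_y in dogs
    )
    return dog_pixels / (width * height) * 100


# Hypothetical 500x375 image with one 200x300 box: 60000 of 187500 pixels
dogs = [("Chihuahua", 10, 20, 210, 320)]
print(dog_percent(dogs, 500, 375))
```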

In [None]:
# Define a function to determine the number of dog pixels per image
def dogPixelCount(doglist):
    totalpixels = 0
    for dog in doglist:
        totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
    return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)