Practice for Data Camp Course [Cleaning Data with Apache Spark in Python](https://www.datacamp.com/courses/cleaning-data-with-apache-spark-in-python)

### References

#### Cheatsheets

* [PySpark/SQL Cheatsheet by KDnuggets](https://www.kdnuggets.com/2017/11/pyspark-sql-cheat-sheet-big-data-python.html)
* [PySpark/RDD Cheatsheet by Datacamp](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf)
* [PySpark Cheatsheet by Gong](https://github.com/wgong/pyspark-cheatsheet)
* [PySpark Cheatsheet by Feng](http://web.utk.edu/~wfeng1/doc/cheatSheet_pyspark.pdf)
* [PySpark Cheatsheet by Agarwal](https://www.linkedin.com/pulse/pyspark-cheat-sheet-netik-agarwal/)


#### APIs

* [DataFrame/Dataset methods](http://bit.ly/2rKkALY)
* [DataFrame functions](http://bit.ly/2DPAycx)
* [DataFrameStatFunctions](http://bit.ly/2DPYhJC)
* [DataFrameNaFunctions](http://bit.ly/2DPAqd3)
* [Column methods](http://bit.ly/2FloFbr)
* [SparkSQL builtin functions](https://spark.apache.org/docs/latest/api/sql/index.html#count)

#### Blogs

* [Spark Knowledge Base at Databrick](https://kb.databricks.com/index.html)

#### Common DataFrame transformations

* Select

```
voter_df.select(voter_df.name)

df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))

df.select(df.Name, df.Age,
    .when(df.Age >= 18, "Adult")
    .when(df.Age < 18, "Minor"))
    
df.select(df.Name, df.Age,
    .when(df.Age >= 18, "Adult")
    .when((df.Age >= 13) & (df.Age < 18), "Teenager")
    .otherwise("Kids"))    

```

* Filter/where

```
voter_df.filter(voter_df.date > '1/1/2019') # or voter_df.where(...)
voter_df.filter(voter_df.date.year > 1800)
voter_df.filter(voter_df['name'].isNotNull())
voter_df.where(voter_df['_c0'].contains('VOTE'))
voter_df.where(~ voter_df._c1.isNull())

users_df.filter('ID > 3000').select("Name", "State")
```


* withColumn

```
voter_df.withColumn('year', voter_df.date.year)
voter_df.withColumn('upper', F.upper('name'))
voter_df.withColumn('splits', F.split('name', ' '))
voter df.withColumn('year', voter_df[' c4'].cast(IntegerType()))

departures_df = departures_df.withColumn('Duration', departures_df['Duration'].cast(IntegerType()))
```

* drop

`voter_df.drop('unused_column')`

* join

```
parsed_df = spark.read.parquet('parsed_data.parquet')
company_df = spark.read.parquet('companies.parquet')
verified_df = parsed_df.join(company_df, parsed_df.company == company_df.company)
```

* groupBy
```
flights.filter("origin = 'PDX").groupBy("distince").min().show()
```


In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
spark = SparkSession\
    .builder\
    .appName("clean-data")\
    .getOrCreate()

In [3]:
spark

In [4]:
sc = spark.sparkContext

In [5]:
sc

### Defining a schema

In [7]:
# Import the pyspark.sql.types library
# import pyspark.sql.types


# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

### Using lazy processing

Lazy processing operations will usually return in about the same amount of time regardless of the actual quantity of data. Remember that this is due to Spark not performing any transformations until an action is requested.

For this exercise, we'll be defining a Data Frame (aa_dfw_df) and add a couple transformations. Note the amount of time required for the transformations to complete when defined vs when the data is actually queried. These differences may be short, but they will be noticeable. When working with a full Spark cluster with larger quantities of data the difference will be more apparent.

In [8]:
!pwd

/home/gong/projects/py4kids/lesson-17-pyspark/datacamp/02_data-cleaning


In [9]:
!ls data

AA_DFW_2014_Departures_Short.csv.gz  airports.csv.gz
AA_DFW_2015_Departures_Short.csv.gz  DallasCouncilVoters.csv.gz
AA_DFW_2016_Departures_Short.csv.gz  DallasCouncilVotes.csv.gz
AA_DFW_2017_Departures_Short.csv.gz  flights_small.csv.gz
AA_DFW_sample1.parquet


#### Read CSV file

In [10]:
file_path = "./data/AA_DFW_2017_Departures_Short.csv.gz"
# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load(file_path)

# or simply
# aa_dfw_df = spark.read.csv(file_path,header=True)

In [11]:
aa_dfw_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|         0005|                HNL|                          537|
|       01/01/2017|         0007|                OGG|                          498|
|       01/01/2017|         0037|                SFO|                          241|
|       01/01/2017|         0043|                DTW|                          134|
|       01/01/2017|         0051|                STL|                           88|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



#### Schema and Describe

In [12]:
aa_dfw_df.printSchema()

root
 |-- Date (MM/DD/YYYY): string (nullable = true)
 |-- Flight Number: string (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): string (nullable = true)



In [13]:
aa_dfw_df.columns

['Date (MM/DD/YYYY)',
 'Flight Number',
 'Destination Airport',
 'Actual elapsed time (Minutes)']

In [14]:
aa_dfw_df.schema.names

['Date (MM/DD/YYYY)',
 'Flight Number',
 'Destination Airport',
 'Actual elapsed time (Minutes)']

Unlike Pandas, pySpark Dataframe has no attribute .shape, but easy to create a function

In [15]:
def shape(df):
    return df.count(), len(df.columns), df.columns

In [16]:
shape(aa_dfw_df)

(139358,
 4,
 ['Date (MM/DD/YYYY)',
  'Flight Number',
  'Destination Airport',
  'Actual elapsed time (Minutes)'])

In [17]:
# describe one column
aa_dfw_df.describe('Destination Airport').show()

+-------+-------------------+
|summary|Destination Airport|
+-------+-------------------+
|  count|             139358|
|   mean|               null|
| stddev|               null|
|    min|                ABQ|
|    max|                XNA|
+-------+-------------------+



In [18]:
# describe multi-columns
aa_dfw_df.describe(['Date (MM/DD/YYYY)','Flight Number']).show()

+-------+-----------------+------------------+
|summary|Date (MM/DD/YYYY)|     Flight Number|
+-------+-----------------+------------------+
|  count|           139358|            139358|
|   mean|             null|1580.4799867965958|
| stddev|             null|  817.419749529198|
|    min|       01/01/2017|              0005|
|    max|       12/31/2017|              2779|
+-------+-----------------+------------------+



In [19]:
# describe all columns
aa_dfw_df.describe().show()

+-------+-----------------+------------------+-------------------+-----------------------------+
|summary|Date (MM/DD/YYYY)|     Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-------+-----------------+------------------+-------------------+-----------------------------+
|  count|           139358|            139358|             139358|                       139358|
|   mean|             null|1580.4799867965958|               null|           151.99931112673832|
| stddev|             null|  817.419749529198|               null|            64.25904268384629|
|    min|       01/01/2017|              0005|                ABQ|                            0|
|    max|       12/31/2017|              2779|                XNA|                           99|
+-------+-----------------+------------------+-------------------+-----------------------------+



#### renaming columns

https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark

In [20]:
# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

In [21]:
# Drop the Destination Airport column
## aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])
aa_dfw_df = aa_dfw_df.drop(F.col('Destination Airport'))

In [22]:
# Show the DataFrame
aa_dfw_df.show(5)

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|         0005|                          537|    hnl|
|       01/01/2017|         0007|                          498|    ogg|
|       01/01/2017|         0037|                          241|    sfo|
|       01/01/2017|         0043|                          134|    dtw|
|       01/01/2017|         0051|                           88|    stl|
+-----------------+-------------+-----------------------------+-------+
only showing top 5 rows



In [23]:
aa_dfw_df = aa_dfw_df\
            .withColumnRenamed("Date (MM/DD/YYYY)", "Date")\
            .withColumnRenamed("Actual elapsed time (Minutes)", "FlightTime")

aa_dfw_df.show(5)

+----------+-------------+----------+-------+
|      Date|Flight Number|FlightTime|airport|
+----------+-------------+----------+-------+
|01/01/2017|         0005|       537|    hnl|
|01/01/2017|         0007|       498|    ogg|
|01/01/2017|         0037|       241|    sfo|
|01/01/2017|         0043|       134|    dtw|
|01/01/2017|         0051|        88|    stl|
+----------+-------------+----------+-------+
only showing top 5 rows



In [24]:
# alias column
aa_dfw_df = aa_dfw_df.select(F.col("Date").alias("date"), 
                 F.col("Flight Number").alias("flight_no"),
                 F.col("FlightTime").alias("flight_time"),
                 F.col("airport")
                )

aa_dfw_df.show(5)

+----------+---------+-----------+-------+
|      date|flight_no|flight_time|airport|
+----------+---------+-----------+-------+
|01/01/2017|     0005|        537|    hnl|
|01/01/2017|     0007|        498|    ogg|
|01/01/2017|     0037|        241|    sfo|
|01/01/2017|     0043|        134|    dtw|
|01/01/2017|     0051|         88|    stl|
+----------+---------+-----------+-------+
only showing top 5 rows



#### selectExpr is flexible & powerful 

In [25]:
aa_dfw_df1 = aa_dfw_df.selectExpr("date as Date", "flight_no as FlightNum", "flight_time as FlightTime", "airport as Airport")

aa_dfw_df1.show(5)

+----------+---------+----------+-------+
|      Date|FlightNum|FlightTime|Airport|
+----------+---------+----------+-------+
|01/01/2017|     0005|       537|    hnl|
|01/01/2017|     0007|       498|    ogg|
|01/01/2017|     0037|       241|    sfo|
|01/01/2017|     0043|       134|    dtw|
|01/01/2017|     0051|        88|    stl|
+----------+---------+----------+-------+
only showing top 5 rows



In [26]:
aa_dfw_df1.selectExpr("round(avg(FlightTime),3) as avg_flight_time", "count(distinct(Airport)) as airports").show()

+---------------+--------+
|avg_flight_time|airports|
+---------------+--------+
|        151.999|      90|
+---------------+--------+



#### Sample Rows

In [27]:
aa_dfw_df.sample?

In [28]:
df1 = aa_dfw_df.sample(False, 0.01, 10)

In [29]:
df1.show(5)

+----------+---------+-----------+-------+
|      date|flight_no|flight_time|airport|
+----------+---------+-----------+-------+
|01/01/2017|     0475|        185|    san|
|01/01/2017|     0618|        176|    las|
|01/01/2017|     1292|         55|    tul|
|01/01/2017|     2179|        139|    guc|
|01/01/2017|     2386|         51|    okc|
+----------+---------+-----------+-------+
only showing top 5 rows



In [30]:
shape(df1)

(1492, 4, ['date', 'flight_no', 'flight_time', 'airport'])

In [31]:
df2 = aa_dfw_df.sample(False, 0.01, 20)

In [32]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

df1 Count: 1492
df2 Count: 1423


In [33]:
# Combine 2 DataFrames into one
df3 = df1.union(df2)

In [34]:
shape(df3)

(2915, 4, ['date', 'flight_no', 'flight_time', 'airport'])

#### Saving a DataFrame in Parquet format

When working with Spark, you'll often start with CSV, JSON, or other data sources. This provides a lot of flexibility for the types of data to load, but it is not an optimal format for Spark. The Parquet format is a columnar data store, allowing Spark to use predicate pushdown. This means Spark will only process the data necessary to complete the operations you define versus reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.

__Parquet is the preferred format__

In this exercise, we're going to practice creating a new Parquet file and then process some data from it.

In [35]:
fileout_path = './data/AA_DFW_sample1.parquet'
# Save the df3 DataFrame in Parquet format
df3.write.parquet(fileout_path, mode='overwrite')

In [36]:
df3_1 = spark.read.parquet(fileout_path)
# Read the Parquet file into a new DataFrame and run a count
print(df3_1.count())

2915


In [37]:
df3_1.show(5)

+----------+---------+-----------+-------+
|      date|flight_no|flight_time|airport|
+----------+---------+-----------+-------+
|01/01/2017|     0475|        185|    san|
|01/01/2017|     0618|        176|    las|
|01/01/2017|     1292|         55|    tul|
|01/01/2017|     2179|        139|    guc|
|01/01/2017|     2386|         51|    okc|
+----------+---------+-----------+-------+
only showing top 5 rows



#### Spark-SQL 

Parquet files are perfect as a backing data store for SQL queries in Spark. While it is possible to run the same queries directly via Spark's Python functions, sometimes it's easier to run SQL queries alongside the Python options.

For this example, we're going to read in the Parquet file we created in the last exercise and register it as a SQL table. Once registered, we'll run a quick query against the table (aka, the Parquet file).

In [38]:
# create table from dataframe
df3_1.createOrReplaceTempView("flights")

In [39]:
# use SQL to rename columns
flights = spark.sql("""SELECT Date, 
                        flight_no AS FlightNo, 
                        flight_time AS FlightTime, 
                        airport AS Airport 
                        from flights
                        """)

In [40]:
flights.show(5)

+----------+--------+----------+-------+
|      Date|FlightNo|FlightTime|Airport|
+----------+--------+----------+-------+
|01/01/2017|    0475|       185|    san|
|01/01/2017|    0618|       176|    las|
|01/01/2017|    1292|        55|    tul|
|01/01/2017|    2179|       139|    guc|
|01/01/2017|    2386|        51|    okc|
+----------+--------+----------+-------+
only showing top 5 rows



In [41]:
# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_time) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)

The average flight time is: 153


In [42]:
flights_2 = spark.sql("select * from flights")

In [43]:
flights_2.show(5)

+----------+---------+-----------+-------+
|      date|flight_no|flight_time|airport|
+----------+---------+-----------+-------+
|01/01/2017|     0475|        185|    san|
|01/01/2017|     0618|        176|    las|
|01/01/2017|     1292|         55|    tul|
|01/01/2017|     2179|        139|    guc|
|01/01/2017|     2386|         51|    okc|
+----------+---------+-----------+-------+
only showing top 5 rows



In [50]:
spark.sql("""CREATE TABLE if not exists flights_t USING parquet 
    as SELECT * from flights
""")

DataFrame[]

In [51]:
t2 = spark.sql("""SELECT * from flights_t limit 100""")

In [52]:
t2.show(5)

+----------+--------+----------+-------+
|      Date|FlightNo|FlightTime|Airport|
+----------+--------+----------+-------+
|01/01/2017|    0475|       185|    san|
|01/01/2017|    0618|       176|    las|
|01/01/2017|    1292|        55|    tul|
|01/01/2017|    2179|       139|    guc|
|01/01/2017|    2386|        51|    okc|
+----------+--------+----------+-------+
only showing top 5 rows



### Filtering column content with Python

You've looked at using various operations on DataFrame columns - now you can modify a real dataset. The DataFrame voter_df contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter. Your manager has asked you to clean this data so it can later be integrated into some desired reports. The primary task is to remove any null entries or odd characters and return a specific set of voters where you can validate their information.

This is often one of the first steps in data cleaning - removing anything that is obviously outside the format. For this dataset, make sure to look at the original data and see what looks out of place for the VOTER_NAME column.

In [53]:
file_path = "./data/DallasCouncilVoters.csv.gz"
voter_df = spark.read.format('csv').options(Header=True).load(file_path)

In [54]:
voter_df.show(5)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
+----------+-------------+-------------------+
only showing top 5 rows



In [55]:
# Show the distinct VOTER_NAME entries
# voter_df.select(voter_df['VOTER_NAME']).distinct().show(5, truncate=False)
voter_df.select('VOTER_NAME').distinct().show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                                                                                                                                                                                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [56]:
# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
# voter_df = voter_df.where('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

In [57]:
voter_df.show(5)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
+----------+-------------+-------------------+
only showing top 5 rows



In [58]:
# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains("_"))

In [59]:
voter_df.show(5)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
+----------+-------------+-------------------+
only showing top 5 rows



### Modifying DataFrame columns

Previously, you filtered out any rows that didn't conform to something generally resembling a name. Now based on your earlier work, your manager has asked you to create two new columns - first_name and last_name. She asks you to split the VOTER_NAME column into words on any space character. You'll treat the last word as the last_name, and all other words as the first_name. You'll be using some new functions in this exercise including .split(), .size(), and .getItem().

Please note that these operations are always somewhat specific to the use case. Having your data conform to a format often matters more than the specific details of the format. Rarely is a data cleaning task meant just for one person - matching a defined format allows for easier sharing of the data later (ie, Paul doesn't need to worry about names - Mary already cleaned the dataset).

In [60]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn("splits", F.split(voter_df.VOTER_NAME, '\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn("first_name", voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn("last_name", voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show(5)

+----------+-------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|
+----------+-------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|
+----------+-------------+-------------------+----------+---------+
only showing top 5 rows



### when() example

The when() clause lets you conditionally modify a Data Frame based on its content. You'll want to modify our voter_df DataFrame to add a random number to any voting member that is defined as a "Councilmember".

The voter_df DataFrame is defined and available to you. The pyspark.sql.functions library is available as F. You can use F.rand() to generate the random value.

In [61]:
# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df['TITLE'].contains("Councilmember"), F.round(F.rand(),3)))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show(5)

+----------+-------------+-------------------+----------+---------+----------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|random_val|
+----------+-------------+-------------------+----------+---------+----------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|     0.848|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|     0.853|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|      null|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|     0.718|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|     0.594|
+----------+-------------+-------------------+----------+---------+----------+
only showing top 5 rows



### When / Otherwise

This requirement is similar to the last, but now you want to add multiple values based on the voter's position. Modify your voter_df DataFrame to add a random number to any voting member that is defined as a Councilmember. Use 2 for the Mayor and 0 for anything other position.

The voter_df Data Frame is defined and available to you. The pyspark.sql.functions library is available as F. You can use F.rand() to generate the random value.

In [62]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
            F.when(voter_df.TITLE == 'Councilmember', F.round(F.rand(),3) )
            .when(voter_df.TITLE == 'Mayor', 2)
            .otherwise(0))

# Show some of the DataFrame rows
voter_df.show(5)

+----------+-------------+-------------------+----------+---------+----------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|random_val|
+----------+-------------+-------------------+----------+---------+----------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|     0.235|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|     0.429|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|       2.0|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|     0.694|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|     0.996|
+----------+-------------+-------------------+----------+---------+----------+
only showing top 5 rows



In [63]:
# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show(5)

+----------+--------------------+-----------------+----------+---------+----------+
|      DATE|               TITLE|       VOTER_NAME|first_name|last_name|random_val|
+----------+--------------------+-----------------+----------+---------+----------+
|04/25/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|04/25/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
+----------+--------------------+-----------------+----------+---------+----------+
only showing top 5 rows



#### working with Date

In [64]:
dateDF = spark.range(3)\
    .withColumn("today", F.current_date())\
    .withColumn("now", F.current_timestamp())

In [65]:
dateDF.show()

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2019-09-15|2019-09-15 10:53:...|
|  1|2019-09-15|2019-09-15 10:53:...|
|  2|2019-09-15|2019-09-15 10:53:...|
+---+----------+--------------------+



In [66]:
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [67]:
dateDF.select("today", F.date_add(F.col("today"),5).alias("5days_later"), F.date_sub(F.col("today"),5).alias("5days_ago")).show(1)

+----------+-----------+----------+
|     today|5days_later| 5days_ago|
+----------+-----------+----------+
|2019-09-15| 2019-09-20|2019-09-10|
+----------+-----------+----------+
only showing top 1 row



In [68]:
dateDF = dateDF.withColumn("week_ago", F.date_sub(F.col("today"),7))

dateDF.select(F.datediff(F.col("week_ago"), F.col("today"))).show(2)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
|                       -7|
+-------------------------+
only showing top 2 rows



In [69]:
dateDF.show(2)

+---+----------+--------------------+----------+
| id|     today|                 now|  week_ago|
+---+----------+--------------------+----------+
|  0|2019-09-15|2019-09-15 10:53:...|2019-09-08|
|  1|2019-09-15|2019-09-15 10:53:...|2019-09-08|
+---+----------+--------------------+----------+
only showing top 2 rows



#### to_date(), to_timestamp()

In [70]:
dateDF.select(
    F.to_date(F.lit("2016-01-01")).alias("start"),
    F.to_date(F.lit("2017-01-01")).alias("end") )\
    .select(F.months_between(F.col("end"), F.col("start")).alias("month_diff")).show(1)

+----------+
|month_diff|
+----------+
|      12.0|
+----------+
only showing top 1 row



In [71]:
dateFormat = "yyyy-MM-dd"
cleanDateDF = spark.range(2).select(
    F.to_date(F.lit("2019-01-02"), dateFormat).alias("dt1"),
    F.to_date(F.lit("2019-02-13"), dateFormat).alias("dt2"),
    F.to_date(F.lit("2019-13-02"), dateFormat).alias("dt3")
    )
cleanDateDF.show()

+----------+----------+----+
|       dt1|       dt2| dt3|
+----------+----------+----+
|2019-01-02|2019-02-13|null|
|2019-01-02|2019-02-13|null|
+----------+----------+----+



In [72]:
cleanDateDF.select(F.to_timestamp(F.col("dt1"), dateFormat)).show()

+---------------------------------+
|to_timestamp(`dt1`, 'yyyy-MM-dd')|
+---------------------------------+
|              2019-01-02 00:00:00|
|              2019-01-02 00:00:00|
+---------------------------------+



In [73]:
cleanDateDF.filter(F.col("dt1") > F.lit("2018-12-30")).show()

+----------+----------+----+
|       dt1|       dt2| dt3|
+----------+----------+----+
|2019-01-02|2019-02-13|null|
|2019-01-02|2019-02-13|null|
+----------+----------+----+



In [74]:
cleanDateDF2 = spark.range(2).select(
    F.to_date(F.lit("2018-01-02"), dateFormat).alias("dt1"),
    F.to_date(F.lit("2019-12-13"), dateFormat).alias("dt2"),
    F.to_date(F.lit("2008-03-03"), dateFormat).alias("dt3")
    )
cleanDateDF2.show()

+----------+----------+----------+
|       dt1|       dt2|       dt3|
+----------+----------+----------+
|2018-01-02|2019-12-13|2008-03-03|
|2018-01-02|2019-12-13|2008-03-03|
+----------+----------+----------+



In [75]:
cleanDateDF = cleanDateDF.union(cleanDateDF2)
cleanDateDF.show()

+----------+----------+----------+
|       dt1|       dt2|       dt3|
+----------+----------+----------+
|2019-01-02|2019-02-13|      null|
|2019-01-02|2019-02-13|      null|
|2018-01-02|2019-12-13|2008-03-03|
|2018-01-02|2019-12-13|2008-03-03|
+----------+----------+----------+



In [76]:
cleanDateDF.na.drop("any").show()

+----------+----------+----------+
|       dt1|       dt2|       dt3|
+----------+----------+----------+
|2018-01-02|2019-12-13|2008-03-03|
|2018-01-02|2019-12-13|2008-03-03|
+----------+----------+----------+



In [77]:
cleanDateDF.na.fill('2019-09-14', subset=["dt3"]).show()

+----------+----------+----------+
|       dt1|       dt2|       dt3|
+----------+----------+----------+
|2019-01-02|2019-02-13|      null|
|2019-01-02|2019-02-13|      null|
|2018-01-02|2019-12-13|2008-03-03|
|2018-01-02|2019-12-13|2008-03-03|
+----------+----------+----------+



### User defined functions or UDFs
* Python method
* Wrapped via the pyspark.sql.functions.udf method
* Stored as a variable
* Called like a normal Spark function

argument-less udf
```
def sortingCap():
    return random.choice(['G', 'H', 'R', 'S'])
udfSortingCap = udf(sortingCap, StringType())
user_df = user_df.withColumn('Class', udfSortingCap())
```

In [78]:
def reverseString(mystr):
    return mystr[::-1]

In [79]:
reverseString("annabella")

'allebanna'

In [80]:
def getFirstAndMiddle(names):
    # Return a space separated string of names
    return ' '.join(names[:-1])

In [81]:
getFirstAndMiddle(['First', "Mid", "Last"])

'First Mid'

In [82]:
file_path = "./data/DallasCouncilVoters.csv.gz"
voter_df = spark.read.format('csv').options(Header=True).load(file_path)

In [83]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn("splits", F.split(voter_df.VOTER_NAME, '\s+'))

In [84]:
voter_df = voter_df.withColumn("first_name", voter_df.splits.getItem(0))
# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn("last_name", voter_df.splits.getItem(F.size('splits') - 1))

In [85]:
voter_df.show(5)

+----------+-------------+-------------------+--------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|
+----------+-------------+-------------------+--------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|
+----------+-------------+-------------------+--------------------+----------+---------+
only showing top 5 rows



In [86]:
def getFirstAndMiddle(names):
    ret_val = ''
    if names:
        # Return a space separated string of names
        ret_val = ' '.join(names[:-1])
    return ret_val

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, F.StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(F.col('splits')))

# Drop the unecessary columns then show the DataFrame
voter_df = voter_df.drop('first_name')
voter_df = voter_df.drop('splits')

voter_df.show(5)

+----------+-------------+-------------------+---------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|last_name|first_and_middle_name|
+----------+-------------+-------------------+---------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|    Gates|          Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston| Kingston|            Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings| Rawlings|           Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|  Medrano|                 Adam|
|02/08/2017|Councilmember|       Casey Thomas|   Thomas|                Casey|
+----------+-------------+-------------------+---------+---------------------+
only showing top 5 rows



### Adding an ID Field

When working with data, you sometimes only want to access certain fields and perform various operations. In this case, find all the unique voter names from the DataFrame and add a unique ID number. Remember that Spark IDs are assigned based on the DataFrame partition - as such the ID values may be much greater than the actual number of rows in the DataFrame.

With Spark's lazy processing, the IDs are not actually generated until an action is performed and can be somewhat random depending on the size of the dataset.

The spark session and a Spark DataFrame df containing the DallasCouncilVotes.csv.gz file are available in your workspace. The pyspark.sql.functions library is available under the alias F.

In [87]:
file_path = "./data/DallasCouncilVotes.csv.gz"
df = spark.read.format('csv').options(Header=True).load(file_path)

In [88]:
df.show(5)

+----------+------------------+---------+--------+-------------+-------------------+---------+------------------+-----------------------+------------------+--------------------+
|      DATE|AGENDA_ITEM_NUMBER|ITEM_TYPE|DISTRICT|        TITLE|         VOTER NAME|VOTE CAST|FINAL ACTION TAKEN|AGENDA ITEM DESCRIPTION|         AGENDA_ID|             VOTE_ID|
+----------+------------------+---------+--------+-------------+-------------------+---------+------------------+-----------------------+------------------+--------------------+
|02/08/2017|                 1|   AGENDA|      13|Councilmember|  Jennifer S. Gates|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|   AGENDA|      14|Councilmember| Philip T. Kingston|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|   AGENDA|      15|        Mayor|Michael S. Rawlings|      N/A|  NO ACTION NEED

In [89]:
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()

In [90]:
shape(voter_df)

(36, 1, ['VOTER NAME'])

In [91]:
# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())


There are 36 rows in the voter_df DataFrame.



#### Add a ROW_ID

* F.monotonically_increasing_id()

In [92]:

voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

In [93]:
voter_df.show(5)

+--------------------+-----------+
|          VOTER NAME|     ROW_ID|
+--------------------+-----------+
|      Tennell Atkins| 8589934592|
|  the  final   20...|25769803776|
|        Scott Griggs|34359738368|
|       Scott  Griggs|42949672960|
|       Sandy Greyson|51539607552|
+--------------------+-----------+
only showing top 5 rows



In [94]:
# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)

+--------------------+-------------+
|          VOTER NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the   final  2...|1443109011456|
|    Monica R. Alonzo|1382979469312|
|     Lee M. Kleinman|1228360646656|
|   Jennifer S. Gates|1194000908288|
+--------------------+-------------+
only showing top 10 rows



#### Partition

In [98]:
voter_df.rdd.getNumPartitions()

200

In [99]:
voter_df_single = voter_df.repartition(1)

In [100]:
voter_df_single.rdd.getNumPartitions()

1

In [101]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame 
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)


There are 200 partitions in the voter_df DataFrame.


There are 1 partitions in the voter_df_single DataFrame.

+--------------------+-------------+
|          VOTER NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the   final  2...|1443109011456|
|    Monica R. Alonzo|1382979469312|
|     Lee M. Kleinman|1228360646656|
|   Jennifer S. Gates|1194000908288|
+--------------------+-------------+
only showing top 10 rows

+--------------------+------+
|          VOTER NAME|ROW_ID|
+--------------------+------+
|        Lee Kleinman|    35|
|  the  final  201...|    34|
|         Erik Wilson|    33|
|  the  final   20...|    32|
| Carolyn King Arnold|    31|
| Rickey D.  Callahan|    30|
|   the   final  2...|    29|
|    Monica R. Alonzo|    28|
|     Lee M. 

### More ID tricks

Once you define a Spark process, you'll likely want to use it many times. Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to how IDs would behave in a relational database. You have been given the task to make sure that the IDs output from a monthly Spark task start at the highest value from the previous month.

The spark session and two DataFrames, voter_df_march and voter_df_april, are available in your workspace. The pyspark.sql.functions library is available under the alias F.

In [114]:
file_path = "./data/DallasCouncilVotes.csv.gz"
df = spark.read.format('csv').options(Header=True).load(file_path)

In [115]:
df.show(2)

+----------+------------------+---------+--------+-------------+------------------+---------+------------------+-----------------------+------------------+--------------------+
|      DATE|AGENDA_ITEM_NUMBER|ITEM_TYPE|DISTRICT|        TITLE|        VOTER NAME|VOTE CAST|FINAL ACTION TAKEN|AGENDA ITEM DESCRIPTION|         AGENDA_ID|             VOTE_ID|
+----------+------------------+---------+--------+-------------+------------------+---------+------------------+-----------------------+------------------+--------------------+
|02/08/2017|                 1|   AGENDA|      13|Councilmember| Jennifer S. Gates|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|   AGENDA|      14|Councilmember|Philip T. Kingston|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
+----------+------------------+---------+--------+-------------+------------------+---------+------------------+---

In [116]:
df = df.withColumn('ROW_ID', F.monotonically_increasing_id())

In [117]:
voter_df_march = df.where(F.col("DATE").like("03/%"))

voter_df_april = df.where(F.col("DATE").like("04/%"))

In [119]:
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting at the desired value
voter_df_april = voter_df_april.withColumn('ROW_ID', previous_max_ID + F.monotonically_increasing_id())

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show(5)
voter_df_april.select('ROW_ID').show(5)

+------+
|ROW_ID|
+------+
|  4658|
|  4663|
|  4664|
|  4665|
|  4670|
+------+
only showing top 5 rows

+------+
|ROW_ID|
+------+
| 22124|
| 22125|
| 22126|
| 22127|
| 22128|
+------+
only showing top 5 rows



### Caching a DataFrame

You've been assigned a task that requires running several analysis operations on a DataFrame. You've learned that caching can improve performance when reusing DataFrames and would like to implement it.

You'll be working with a new dataset consisting of airline departure information. It may have repetitive data and will need to be de-duplicated.

The DataFrame departures_df is defined, but no actions have been performed.

In [123]:
df.is_cached

False

In [124]:
%%time
df.count()

CPU times: user 2.08 ms, sys: 0 ns, total: 2.08 ms
Wall time: 216 ms


44625

In [125]:
df = df.cache()
df.is_cached

True

In [126]:
%%time
df.count()

CPU times: user 1.64 ms, sys: 363 µs, total: 2 ms
Wall time: 1.55 s


44625

In [127]:
%%time
df.count()

CPU times: user 1.03 ms, sys: 0 ns, total: 1.03 ms
Wall time: 62.9 ms


44625

In [128]:
df = voter_df.unpersist()
df.is_cached

False

In [130]:
%%time
df.count()

CPU times: user 1.52 ms, sys: 340 µs, total: 1.86 ms
Wall time: 885 ms


36

In [153]:
file_path = "./data/AA_DFW_2014_Departures_Short.csv.gz"
departures_df = spark.read.format('csv').options(Header=True).load(file_path)

In [155]:
import time
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

Counting 157198 rows took 2.452549 seconds
Counting 157198 rows again took 0.439769 seconds


```
Counting 157198 rows took 2.452549 seconds
Counting 157198 rows again took 0.439769 seconds
```


### Removing a DataFrame from cache

You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.


In [156]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


### File import performance

You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

You have two types of files available: departures_full.txt.gz and departures_xxx.txt.gz where xxx is 000 - 013. The same number of rows is split between each file.

```
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))


Total rows in full DataFrame:	139359
Time to run: 0.212767
Total rows in split DataFrame:	278718
Time to run: 0.349123
```

### Reading Spark configurations

You've recently configured a cluster via a cloud provider. Your only access is via the command shell or your python code. You'd like to verify some Spark settings to validate the configuration of the cluster.

The spark object is available for use.

In [157]:
# Name of the Spark application instance
app_name = spark.conf.get("spark.app.name")

# Driver TCP port
driver_tcp_port = spark.conf.get("spark.driver.port")

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: clean-data
Driver TCP port: 33563
Number of partitions: 200


### Writing Spark configurations

Now that you've reviewed some of the Spark configurations on your cluster, you want to modify some of the settings to tune Spark to your needs. You'll import some data to review that your changes have affected the cluster.

The spark configuration is initially set to the default value of 200 partitions.

The spark object is available for use. A file named departures.txt.gz is available for import. An initial DataFrame containing the distinct rows from departures.txt.gz is available as departures_df.

In [158]:
file_path = "./data/AA_DFW_2014_Departures_Short.csv.gz"
departures_df = spark.read.format('csv').options(Header=True).load(file_path)

In [167]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv(file_path).distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

Partition count before change: 1
Partition count after change: 500


In [168]:
departures_df.show(5)

+----------+----+---+---+
|       _c0| _c1|_c2|_c3|
+----------+----+---+---+
|01/02/2014|1049|RDU|138|
|01/02/2014|1685|STL|113|
|01/02/2014|2321|ORD|191|
|01/03/2014|1129|IND|114|
|01/03/2014|1407|STL| 98|
+----------+----+---+---+
only showing top 5 rows



### Explaining the Spark execution plan

In [170]:
departures_df = departures_df.select(departures_df['_c2']).distinct()
departures_df.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[_c2#2112], functions=[])
+- Exchange hashpartitioning(_c2#2112, 500)
   +- *(2) HashAggregate(keys=[_c2#2112], functions=[])
      +- *(2) HashAggregate(keys=[_c0#2110, _c1#2111, _c2#2112, _c3#2113], functions=[])
         +- Exchange hashpartitioning(_c0#2110, _c1#2111, _c2#2112, _c3#2113, 500)
            +- *(1) HashAggregate(keys=[_c0#2110, _c1#2111, _c2#2112, _c3#2113], functions=[])
               +- *(1) FileScan csv [_c0#2110,_c1#2111,_c2#2112,_c3#2113] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/gong/projects/py4kids/lesson-17-pyspark/datacamp/02_data-cleaning/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string>


### Normal joins

You've been given two DataFrames to combine into a single useful DataFrame. Your first task is to combine the DataFrames normally and view the execution plan.

The DataFrames flights_df and airports_df are available to you.

In [171]:
file_path = "./data/airports.csv.gz"
airports_df = spark.read.format('csv').options(Header=True).load(file_path)

file_path = "./data/flights_small.csv.gz"
flights_df = spark.read.format('csv').options(Header=True).load(file_path)

In [175]:
airports_df.show(5)

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [176]:
airports_df.where(airports_df['name'].contains("Kennedy")).select("faa").show(5)

+---+
|faa|
+---+
|JFK|
+---+



In [173]:
flights_df.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

In [177]:
# Join the flights_df and aiports_df DataFrames
normal_df = flights_df.join(airports_df, \
    flights_df["dest"] == airports_df["faa"] )

# Show the query plan
normal_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [dest#2185], [faa#2150], Inner, BuildRight
:- *(2) Project [year#2174, month#2175, day#2176, dep_time#2177, dep_delay#2178, arr_time#2179, arr_delay#2180, carrier#2181, tailnum#2182, flight#2183, origin#2184, dest#2185, air_time#2186, distance#2187, hour#2188, minute#2189]
:  +- *(2) Filter isnotnull(dest#2185)
:     +- *(2) FileScan csv [year#2174,month#2175,day#2176,dep_time#2177,dep_delay#2178,arr_time#2179,arr_delay#2180,carrier#2181,tailnum#2182,flight#2183,origin#2184,dest#2185,air_time#2186,distance#2187,hour#2188,minute#2189] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/gong/projects/py4kids/lesson-17-pyspark/datacamp/02_data-cleaning/da..., PartitionFilters: [], PushedFilters: [IsNotNull(dest)], ReadSchema: struct<year:string,month:string,day:string,dep_time:string,dep_delay:string,arr_time:string,arr_d...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(1) Project [faa#2150

### Using broadcasting on Spark joins

Remember that table joins in Spark are split between the cluster workers. If the data is not local, various shuffle operations are required and can have a negative impact on performance. Instead, we're going to use Spark's broadcast operations to give each node a copy of the specified data.

A couple tips:

*    Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer to the worker nodes.
*    On small DataFrames, it may be better skip broadcasting and let Spark figure out any optimization on its own.
*    If you look at the query execution plan, a broadcastHashJoin indicates you've successfully configured broadcasting.


In [178]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and aiports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["dest"] == airports_df["faa"] )

# Show the query plan and compare against the original
broadcast_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [dest#2185], [faa#2150], Inner, BuildRight
:- *(2) Project [year#2174, month#2175, day#2176, dep_time#2177, dep_delay#2178, arr_time#2179, arr_delay#2180, carrier#2181, tailnum#2182, flight#2183, origin#2184, dest#2185, air_time#2186, distance#2187, hour#2188, minute#2189]
:  +- *(2) Filter isnotnull(dest#2185)
:     +- *(2) FileScan csv [year#2174,month#2175,day#2176,dep_time#2177,dep_delay#2178,arr_time#2179,arr_delay#2180,carrier#2181,tailnum#2182,flight#2183,origin#2184,dest#2185,air_time#2186,distance#2187,hour#2188,minute#2189] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/gong/projects/py4kids/lesson-17-pyspark/datacamp/02_data-cleaning/da..., PartitionFilters: [], PushedFilters: [IsNotNull(dest)], ReadSchema: struct<year:string,month:string,day:string,dep_time:string,dep_delay:string,arr_time:string,arr_d...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(1) Project [faa#2150

### Comparing broadcast vs normal joins

You've created two types of joins, normal and broadcasted. Now your manager would like to know what the performance improvement is by using Spark optimizations. If the results are promising, you'll be given more opportunity to tweak the Spark setup as needed.

Your DataFrames normal_df and broadcast_df are available for your use.

```
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))


Normal count:		119910	duration: 1.571761
Broadcast count:	119910	duration: 0.340586
```

### Quick pipeline

Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

The spark context is defined, along with the pyspark.sql.functions library being aliased as F as is customary.

In [180]:
file_path = "./data/AA_DFW_2015_Departures_Short.csv.gz"
departures_df = spark.read.format('csv').options(Header=True).load(file_path)

In [184]:
departures_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2015|         0005|                HNL|                          526|
|       01/01/2015|         0007|                OGG|                          517|
|       01/01/2015|         0023|                SFO|                          233|
|       01/01/2015|         0027|                LAS|                          165|
|       01/01/2015|         0035|                HDN|                          178|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



In [185]:
# Remove any duration of 0
departures_df = departures_df.filter(departures_df['Actual elapsed time (Minutes)'] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json("output.json")

### Removing commented lines

Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

The spark context, and the base CSV file (annotations.csv.gz) are available for you to work with. The col function is also available for use.

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.filter(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = annotations_df.filter(~col('_c0').startswith('#')).count()

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, (full_count-comment_count)))

### Removing invalid rows

Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at minimum 5 tab separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (\t) characters.

The DataFrame annotations_df is already available, with the commented rows removed. The spark.sql.functions library is available under the alias F. The initial number of rows available in the DataFrame is stored in the variable initial_count.

```
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df['colcount'] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
```

### Splitting into columns

You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

You may be wondering why we're not using a schema instead to define the content layout. Spark's CSV parser can't handle advanced types (Arrays or Maps) so it wouldn't process correctly. In our example, we bypass using the types

```
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

split_df.head(5)

split_df.printSchema()

```

### Further parsing

You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

```
def retriever(cols, colcount):
    # Return a list of dog data
    return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(col('split_cols'), col('colcount')))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')



In [1]: split_df.show(2)
+--------+---------------+-----+------+--------------------+
|  folder|       filename|width|height|            dog_list|
+--------+---------------+-----+------+--------------------+
|02110627|n02110627_12938|  200|   300|[affenpinscher,0,...|
|02093754| n02093754_1148|  500|   378|[Border_terrier,7...|
+--------+---------------+-----+------+--------------------+
only showing top 2 rows

```

### Validate rows via join

Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named valid_folders_df. The DataFrame split_df is as you last left it with a group of split columns.

```
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed("_c0","folder")

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), split_df.folder == valid_folders_df.folder)

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))



Before: 20580
After: 19956

In [5]: joined_df.show(2)
+--------+---------------+-----+------+--------------------+--------+
|  folder|       filename|width|height|            dog_list|  folder|
+--------+---------------+-----+------+--------------------+--------+
|02110627|n02110627_12938|  200|   300|[affenpinscher,0,...|02110627|
|02093754| n02093754_1148|  500|   378|[Border_terrier,7...|02093754|
+--------+---------------+-----+------+--------------------+--------+
```

### Dog parsing

You've done a considerable amount of cleanup on the initial dataset, but now need to analyze the data a bit deeper. There are several questions that have now come up about the type of dogs seen in an image and some details regarding the images. You realize that to answer these questions, you need to process the data into a specific type. Before you can use it, you'll need to create a schema / type to represent the dog details.

```
# Select the dog details and show 10 untruncated rows
print(joined_df.select('dog_list').show(truncate=False))

# Define a schema type for the details in the dog list
DogType = StructType([
	StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

```

joined_df.select('dog_list', F.size(F.col('dog_list'))).show(5)

### Per image count

Your next task in building a data pipeline for this dataset is to create a few analysis oriented columns. You've been asked to calculate the number of dogs found in each image based on your dog_list column created earlier. You have also created the DogType which will allow better parsing of the data within some of the data columns.

The joined_df is available as you last defined it, and the DogType structtype is defined. pyspark.sql.functions is available under the F alias.

```
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)
```