# Cleaning Data with PySpark

Working with data is tricky - working with millions or even billions of rows is worse. Did you receive some data processing code written on a laptop with fairly pristine data? Chances are you’ve probably been put in charge of moving a basic data process from prototype to production. You may have worked with real world datasets, with missing fields, bizarre formatting, and orders of magnitude more data. Even if this is all new to you, this course helps you learn what’s needed to prepare data processes using Python with Apache Spark. You’ll learn terminology, methods, and some best practices to create a performant, maintainable, and understandable data processing platform.

## Table of Contents

- [Introduction](#intro)
- [Manipulating DataFrames in the real world](#man)
- [Improving Performance](#perf)
- [Introduction to data pipelines](#pipe)

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

path = "data/dc33/"

In [3]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")
print(sc)

<SparkContext master=local appName=First App>


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('First App').getOrCreate()

---
<a id='intro'></a>

## Intro to data cleaning with Apache Spark

<img src="images/spark3_001.png" alt="" style="width: 800px;"/>

<img src="images/spark3_002.png" alt="" style="width: 800px;"/>

<img src="images/spark3_003.png" alt="" style="width: 800px;"/>

<img src="images/spark3_004.png" alt="" style="width: 800px;"/>

<img src="images/spark3_005.png" alt="" style="width: 800px;"/>

## Defining a schema

Creating a defined schema helps with data quality and import performance. As mentioned during the lesson, we'll create a simple schema to read in the following columns:

- Name
- Age
- City

The Name and City columns are `StringType()` and the Age column is an `IntegerType()`.

- Import * from the pyspark.sql.types library.
- Define a new schema using the StructType method.
- Define a StructField for name, age, and city. Each field should correspond to the correct datatype and not be nullable.

In [2]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])
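
To make the role of the datatypes and the `False` (not nullable) flags more concrete, here is a minimal plain-Python sketch of the kind of check a schema describes. It does not use Spark and is not how Spark enforces a schema; `PEOPLE_FIELDS`, `schema_violations`, and the sample rows are hypothetical names and data invented for illustration.

```python
# Plain-Python illustration only; this is not how Spark enforces a schema.
# Each entry mirrors a StructField: (column name, Python type, nullable).
PEOPLE_FIELDS = [
    ("name", str, False),
    ("age", int, False),
    ("city", str, False),
]


def schema_violations(row, fields=PEOPLE_FIELDS):
    """Return a list of human-readable problems found in one row (a dict)."""
    problems = []
    for column, expected_type, nullable in fields:
        value = row.get(column)
        if value is None:
            if not nullable:
                problems.append(f"{column}: missing value in non-nullable field")
        elif not isinstance(value, expected_type):
            problems.append(
                f"{column}: expected {expected_type.__name__}, "
                f"got {type(value).__name__}"
            )
    return problems


good_row = {"name": "Ada", "age": 36, "city": "Dallas"}   # made-up data
bad_row = {"name": "Bob", "age": "forty", "city": None}   # made-up data

print(schema_violations(good_row))  # no problems reported
print(schema_violations(bad_row))   # wrong type for age, missing city
```

Writing the expectations down once, in one place, is what lets bad rows be caught at import time instead of surfacing later in a report.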

## Immutability and lazy processing

<img src="images/spark3_006.png" alt="" style="width: 800px;"/>

<img src="images/spark3_007.png" alt="" style="width: 800px;"/>

<img src="images/spark3_008.png" alt="" style="width: 800px;"/>

<img src="images/spark3_009.png" alt="" style="width: 800px;"/>

Spark takes advantage of data immutability to efficiently share / create new data representations throughout the cluster.

## Using lazy processing

`Lazy processing operations` will usually return in about the same amount of time regardless of the actual quantity of data. Remember that this is due to `Spark not performing any transformations until an action is requested`.

For this exercise, we'll define a DataFrame (aa_dfw_df) and add a couple of transformations. Note the amount of time required for the transformations to complete when they are defined versus when the data is actually queried. These differences may be small, but they will be noticeable. When working with a full Spark cluster and larger quantities of data, the difference will be more apparent.

- Load the Data Frame.
- Add the transformation for F.lower() to the Destination Airport column.
- Drop the Destination Airport column from the Data Frame aa_dfw_df. Note the time for these operations to complete.
- Show the Data Frame, noting the time difference for this action to complete.

In [10]:
from pyspark.sql import functions as F

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load(path+'AA_DFW_2017_Departures_Short.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|         0005|                          537|    hnl|
|       01/01/2017|         0007|                          498|    ogg|
|       01/01/2017|         0037|                          241|    sfo|
|       01/01/2017|         0043|                          134|    dtw|
|       01/01/2017|         0051|                           88|    stl|
|       01/01/2017|         0060|                          149|    mia|
|       01/01/2017|         0071|                          203|    lax|
|       01/01/2017|         0074|                           76|    mem|
|       01/01/2017|         0081|                          123|    den|
|       01/01/2017|         0089|                          161|    slc|
|       01/01/2017|         0096|                           84| 

You've just seen how lazy processing works in action. Remember when working with Spark that no transformations take effect until you apply an action. This can be confusing at times, but is one of the underpinnings of Spark's power.
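
The transformation/action split can be mimicked in a few lines of plain Python. The sketch below is a toy model, not Spark's implementation: `LazyRows`, `with_transformation`, and `collect` are hypothetical names, and the airport codes are borrowed from the sample output above.

```python
# A toy model of lazy evaluation; Spark's planner is far more sophisticated.
class LazyRows:
    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)   # recorded transformations, not yet run

    def with_transformation(self, func):
        # Transformation: returns a NEW object and does no work on the data.
        return LazyRows(self._rows, self._steps + (func,))

    def collect(self):
        # Action: only now are the recorded steps applied to every row.
        result = []
        for row in self._rows:
            for step in self._steps:
                row = step(row)
            result.append(row)
        return result


calls = []


def lower_airport(row):
    calls.append(row)                # track when work actually happens
    return row.lower()


airports = LazyRows(["HNL", "OGG", "SFO"])
lowered = airports.with_transformation(lower_airport)
print(len(calls))         # 0: nothing has run yet
print(lowered.collect())  # ['hnl', 'ogg', 'sfo']
print(len(calls))         # 3: the action triggered the work
```

Defining `lowered` costs the same no matter how many rows exist, which is why the transformation steps in the exercise return so quickly. The original `airports` object is also left untouched, mirroring DataFrame immutability.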

## Understanding Parquet

<img src="images/spark3_010.png" alt="" style="width: 800px;"/>

<img src="images/spark3_011.png" alt="" style="width: 800px;"/>

<img src="images/spark3_012.png" alt="" style="width: 800px;"/>

<img src="images/spark3_013.png" alt="" style="width: 800px;"/>

<img src="images/spark3_014.png" alt="" style="width: 800px;"/>

## Saving a DataFrame in Parquet format

When working with Spark, you'll often start with CSV, JSON, or other data sources. This provides a lot of flexibility for the types of data to load, but these are not optimal formats for Spark. The `Parquet format` is a columnar data store, allowing Spark to use `predicate pushdown`. This means Spark will only process the data necessary to complete the operations you define rather than reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.
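
As a rough intuition for why stored statistics let a reader skip data, consider the toy model below. It is a sketch under simplifying assumptions, not the Parquet reader: `make_row_group` and `durations_over` are hypothetical helpers, and the durations are taken from the sample flight output earlier in these notes.

```python
# Toy model: each "row group" stores a column's values plus min/max statistics.
# Real Parquet readers are much more involved; this only shows the idea.
def make_row_group(durations):
    return {"min": min(durations), "max": max(durations), "values": durations}


row_groups = [
    make_row_group([76, 84, 88]),
    make_row_group([123, 134, 149]),
    make_row_group([498, 537]),
]


def durations_over(groups, threshold):
    """Return matching values and how many row groups had to be scanned."""
    matches, scanned = [], 0
    for group in groups:
        if group["max"] <= threshold:
            continue                  # statistics prove no row can match: skip
        scanned += 1
        matches.extend(v for v in group["values"] if v > threshold)
    return matches, scanned


matches, scanned = durations_over(row_groups, 400)
print(matches)   # [498, 537]
print(scanned)   # 1 (only one of the three row groups was read)
```

With a CSV file there is no such metadata, so every row has to be parsed before the filter can be applied.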

In this exercise, we're going to practice creating a new Parquet file and then process some data from it.

The spark object and the df1 and df2 DataFrames have been set up for you.

- View the row count of df1 and df2.
- Combine df1 and df2 in a new DataFrame named df3 with the union method.
- Save df3 to a parquet file named AA_DFW_ALL.parquet.
- Read the AA_DFW_ALL.parquet file and show the count.

In [None]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

```
<script.py> output:
    df1 Count: 139359
    df2 Count: 119911
    259270
```

## SQL and Parquet

`Parquet files are perfect as a backing data store for SQL queries in Spark`. While it is possible to run the same queries directly via Spark's Python functions, sometimes it's easier to run SQL queries alongside the Python options.

For this example, we're going to read in the Parquet file we created in the last exercise and register it as a SQL table. Once registered, we'll run a quick query against the table (aka, the Parquet file).

The spark object and the AA_DFW_ALL.parquet file are available for you automatically.

- Import the AA_DFW_ALL.parquet file into flights_df.
- Use the createOrReplaceTempView method to alias the flights table.
- Run a Spark SQL query against the flights table.

In [None]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)

```
<script.py> output:
    The average flight time is: 151
```
You've just run a SQL query against a Parquet data source. When building production Spark code, you'll often port SQL operations directly.

---
<a id='man'></a>

## Manipulating DataFrames in the real world

## DataFrame column operations

<img src="images/spark3_015.png" alt="" style="width: 800px;"/>

<img src="images/spark3_016.png" alt="" style="width: 800px;"/>

<img src="images/spark3_017.png" alt="" style="width: 800px;"/>

<img src="images/spark3_018.png" alt="" style="width: 800px;"/>

<img src="images/spark3_019.png" alt="" style="width: 800px;"/>

## Filtering column content with Python

You've looked at using various operations on DataFrame columns - now you can modify a real dataset. The DataFrame `voter_df` contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter. Your manager has asked you to clean this data so it can later be integrated into some desired reports. The primary task is to remove any null entries or odd characters and return a specific set of voters where you can validate their information.

This is often one of the first steps in data cleaning - removing anything that is obviously outside the format. For this dataset, make sure to look at the original data and see what looks out of place for the VOTER_NAME column.

The `pyspark.sql.functions` library is already imported under the alias `F`.

- Show the distinct VOTER_NAME entries.
- Filter voter_df where the VOTER_NAME is 1-20 characters in length.
- Filter out voter_df where the VOTER_NAME contains an _.
- Show the distinct VOTER_NAME entries again.

In [None]:
# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) <= 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
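
The two filters are easy to reason about on a handful of values before running them on the full DataFrame. The following is a plain-Python sketch of the same rules (1-20 characters, no underscore); `looks_like_name` is a hypothetical helper and the junk entries are made up for illustration.

```python
# Plain-Python version of the two filters, applied to made-up sample values.
sample_names = [
    "Lee Kleinman",
    "",                                             # empty entry
    "011018__42",                                   # hypothetical junk value
    "the final 2018 assessment plan of the city",   # hypothetical long text
    "Monica R. Alonzo",
]


def looks_like_name(value, max_len=20):
    """Keep non-null values of 1..max_len characters without an underscore."""
    if value is None:
        return False
    return 0 < len(value) <= max_len and "_" not in value


cleaned = [name for name in sample_names if looks_like_name(name)]
print(cleaned)   # ['Lee Kleinman', 'Monica R. Alonzo']
```

Checking the boundary cases this way (an empty string, exactly 20 characters, 21 characters) is a quick way to confirm that the comparison operators in the Spark filter match the stated requirement.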

## Modifying DataFrame columns

Previously, you filtered out any rows that didn't conform to something generally resembling a name. Now based on your earlier work, your manager has asked you to create two new columns - first_name and last_name. She asks you to split the VOTER_NAME column into words on any space character. You'll treat the last word as the last_name, and all other words as the first_name. You'll be using some new functions in this exercise including `.split()`, `.size()`, and `.getItem()`.

Please note that these operations are always somewhat specific to the use case. Having your data conform to a format often matters more than the specific details of the format. Rarely is a data cleaning task meant just for one person - matching a defined format allows for easier sharing of the data later (i.e., Paul doesn't need to worry about names - Mary already cleaned the dataset).

The filtered voter DataFrame from your previous exercise is available as voter_df. The pyspark.sql.functions library is available under the alias F.

- Add a new column called splits holding the list of possible names.
- Use the getItem() method and create a new column called first_name.
- Get the last entry of the splits list and create a column called last_name.
- Drop the splits column and show the new voter_df.

In [None]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

This exercise requires some creative thought to determine how best to handle the VOTER_NAME column to get the desired result. You may be wondering about the middle initial. We've left it out of this exercise for clarity, but consider the various ways you could add it to a given column. The string processing functions in Spark allow a wide range of operations to suit most requirements. Always refer to the Spark documentation when you need to modify a string column: you may be surprised at the available options!
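
One way to think about the middle initial is to look at what sits between the first and last items of the split. The sketch below uses plain Python's `re.split` rather than Spark, so its edge-case behavior is not guaranteed to match `F.split`; `split_name` is a hypothetical helper, and the `strip()` call is an extra step that the exercise does not perform.

```python
import re


# Plain-Python sketch of the split / first item / last item steps.
def split_name(voter_name):
    parts = re.split(r"\s+", voter_name.strip())
    first_name = parts[0]
    last_name = parts[-1]            # same idea as getItem(size - 1)
    middle = parts[1:-1]             # whatever sits between first and last
    return first_name, middle, last_name


print(split_name("Rickey D.  Callahan"))   # ('Rickey', ['D.'], 'Callahan')
print(split_name("Erik Wilson"))           # ('Erik', [], 'Wilson')
```

Note that a single-word value would end up as both the first and the last name, which is one more reason to inspect the distinct values before deciding on a rule.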

## Conditional DataFrame column operations

<img src="images/spark3_020.png" alt="" style="width: 800px;"/>

<img src="images/spark3_021.png" alt="" style="width: 800px;"/>

<img src="images/spark3_022.png" alt="" style="width: 800px;"/>

<img src="images/spark3_023.png" alt="" style="width: 800px;"/>

## when() example

The `when()` clause lets you conditionally modify a Data Frame based on its content. You'll want to modify our voter_df DataFrame to add a random number to any voting member that is defined as a "Councilmember".

The voter_df DataFrame is defined and available to you. The pyspark.sql.functions library is available as F. You can use F.rand() to generate the random value.

- Add a column to voter_df named random_val with the results of the `F.rand()` method for any voter with the title Councilmember.
- Show some of the DataFrame rows, noting whether the `.when()` clause worked.

In [None]:
# Add a column to voter_df for any voter with the title Councilmember
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

## When / Otherwise

This requirement is similar to the last, but now you want to add multiple values based on the voter's position. Modify your voter_df DataFrame to add a random number to any voting member that is defined as a Councilmember. Use 2 for the Mayor and 0 for any other position.

The voter_df Data Frame is defined and available to you. The pyspark.sql.functions library is available as F. You can use F.rand() to generate the random value.

- Add a column to voter_df named random_val with the results of the F.rand() method for any voter with the title Councilmember. Set random_val to 2 for the Mayor. Set any other title to the value 0.
- Show some of the Data Frame rows, noting whether the clauses worked.
- Use the .filter clause to find 0 in random_val.

In [None]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

You've successfully used multiple when clauses and the otherwise clause to modify a Data Frame. When clauses can be useful for changing errant data in your Data Frames without extensive work. Make sure to consider using when / otherwise if you ever need to perform conditional steps on your data cleaning processes.
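
A chained when / otherwise reads much like an if / elif / else ladder evaluated per row. The plain-Python sketch below mirrors that logic; `random_val_for` is a hypothetical helper, "Mayor Pro Tem" is just an example of a title that matches neither condition, and `use_otherwise=False` stands in for leaving off `.otherwise()`, in which case unmatched rows receive null.

```python
import random


# Plain-Python analogue of when(...).when(...).otherwise(...).
def random_val_for(title, use_otherwise=True):
    if title == "Councilmember":
        return random.random()        # float in [0.0, 1.0), like F.rand()
    if title == "Mayor":
        return 2
    # Without an otherwise() clause, unmatched rows get null (None here).
    return 0 if use_otherwise else None


for title in ["Councilmember", "Mayor", "Mayor Pro Tem"]:
    print(title, random_val_for(title))
```

The conditions are checked in order, so the first matching clause wins; this matters once conditions can overlap.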

## User defined functions

<img src="images/spark3_024.png" alt="" style="width: 800px;"/>

<img src="images/spark3_025.png" alt="" style="width: 800px;"/>

<img src="images/spark3_026.png" alt="" style="width: 800px;"/>

## Using user defined functions in Spark

You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data without creating a rat's nest of function calls. Here's one place where you can use User Defined Functions to manipulate our DataFrames.

For this exercise, we'll use our voter_df DataFrame, but you're going to replace the first_name column with the first and middle names.

The pyspark.sql.functions library is available under the alias F. The classes from pyspark.sql.types are already imported.

- Edit the getFirstAndMiddle() function to return a space separated string of names, except the last entry in the names list.
- Define the function as a user-defined function. It should return a string type.
- Create a new column on voter_df called first_and_middle_name using your UDF.
- Drop the "first_name" and "splits" columns (on separate lines), then show the Data Frame.

In [None]:
def getFirstAndMiddle(names):
  # Return a space separated string of names, excluding the last entry
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Drop the unnecessary columns then show the DataFrame
voter_df = voter_df.drop('first_name')
voter_df = voter_df.drop('splits')
voter_df.show()
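
Because the body of a UDF is ordinary Python, it can be checked on plain lists before it is registered with Spark. The sketch below does exactly that; `get_first_and_middle` is a hypothetical snake_case copy of the function, not part of the exercise.

```python
# The function is ordinary Python, so it can be checked without Spark.
def get_first_and_middle(names):
    # Join every entry except the last one with single spaces
    return " ".join(names[:-1])


print(get_first_and_middle(["Rickey", "D.", "Callahan"]))   # Rickey D.
print(get_first_and_middle(["Erik", "Wilson"]))             # Erik

# A common slip: names[-1] is the last entry (a string), so join() would
# insert spaces between its characters instead.
print(" ".join(["Erik", "Wilson"][-1]))                     # W i l s o n
```

The difference between `names[:-1]` (everything but the last entry) and `names[-1]` (only the last entry) is a single character, so a quick check like this is worth the few seconds it takes.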

## Partitioning and lazy processing

<img src="images/spark3_027.png" alt="" style="width: 800px;"/>

<img src="images/spark3_028.png" alt="" style="width: 800px;"/>

<img src="images/spark3_029.png" alt="" style="width: 800px;"/>

<img src="images/spark3_030.png" alt="" style="width: 800px;"/>

<img src="images/spark3_031.png" alt="" style="width: 800px;"/>

## Adding an ID Field

When working with data, you sometimes only want to access certain fields and perform various operations. In this case, find all the unique voter names from the DataFrame and add a unique ID number. Remember that `Spark IDs are assigned based on the DataFrame partition - as such the ID values may be much greater than the actual number of rows in the DataFrame`.

With Spark's lazy processing, `the IDs are not actually generated until an action is performed and can be somewhat random depending on the size of the dataset`.

The spark session and a Spark DataFrame df containing the DallasCouncilVotes.csv.gz file are available in your workspace. The pyspark.sql.functions library is available under the alias F.

- Select the unique entries from the column VOTER NAME and create a new DataFrame called voter_df.
- Count the rows in the voter_df DataFrame.
- Add a ROW_ID column using Spark's id function.
- Show the rows with the 10 highest ROW_IDs.

In [None]:
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)

## IDs with different partitions

You've just completed adding an ID field to a DataFrame. Now, take a look at what happens when you do the same thing on DataFrames containing a different number of partitions.

To check the number of partitions, use the method `.rdd.getNumPartitions()` on a DataFrame.

The spark session and two DataFrames, voter_df and voter_df_single, are available in your workspace. The instructions will help you discover the difference between the DataFrames. The pyspark.sql.functions library is available under the alias F.

- Print the number of partitions on each DataFrame.
- Add a ROW_ID field to each DataFrame.
- Show the top 10 IDs in each DataFrame.

In [None]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame 
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)

```
<script.py> output:
    
    There are 200 partitions in the voter_df DataFrame.
    
    
    There are 1 partitions in the voter_df_single DataFrame.
    
    +--------------------+-------------+
    |          VOTER NAME|       ROW_ID|
    +--------------------+-------------+
    |        Lee Kleinman|1709396983808|
    |  the  final  201...|1700807049217|
    |         Erik Wilson|1700807049216|
    |  the  final   20...|1683627180032|
    | Carolyn King Arnold|1632087572480|
    | Rickey D.  Callahan|1597727834112|
    |   the   final  2...|1443109011456|
    |    Monica R. Alonzo|1382979469312|
    |     Lee M. Kleinman|1228360646656|
    |   Jennifer S. Gates|1194000908288|
    +--------------------+-------------+
    only showing top 10 rows
    
    +--------------------+------+
    |          VOTER NAME|ROW_ID|
    +--------------------+------+
    |        Lee Kleinman|    35|
    |  the  final  201...|    34|
    |         Erik Wilson|    33|
    |  the  final   20...|    32|
    | Carolyn King Arnold|    31|
    | Rickey D.  Callahan|    30|
    |   the   final  2...|    29|
    |    Monica R. Alonzo|    28|
    |     Lee M. Kleinman|    27|
    |   Jennifer S. Gates|    26|
    +--------------------+------+
    only showing top 10 rows
```
Notice the drastic difference in the 'ROW_ID' values between the two DataFrames. Understanding how lazy processing and partitioning behave is integral to mastering Spark. Make sure to always test your assumptions when creating a Spark workflow to avoid nasty surprises in production.
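
The large values are less mysterious once they are split into their parts. Spark's documentation for `monotonically_increasing_id()` describes the current implementation as placing the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. Assuming that layout (it is an implementation detail and could change), the sketch below decodes a few IDs from the output above; `decode_row_id` is a hypothetical helper.

```python
# Decode an ID under the layout described in Spark's documentation for
# monotonically_increasing_id(): upper 31 bits = partition ID, lower 33 bits
# = record number within that partition. This is an implementation detail.
RECORD_BITS = 33


def decode_row_id(row_id):
    partition = row_id >> RECORD_BITS
    record = row_id & ((1 << RECORD_BITS) - 1)
    return partition, record


for row_id in (1709396983808, 1700807049217, 1700807049216, 35):
    print(row_id, decode_row_id(row_id))
```

Under this layout the largest ID in the 200-partition DataFrame decodes to partition 199, record 0, while `1700807049217` and `1700807049216` are records 1 and 0 of partition 198. In the single-partition DataFrame every ID stays in partition 0, so the values simply count up from 0.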

## More ID tricks

Once you define a Spark process, you'll likely want to use it many times. Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to how IDs would behave in a relational database. You have been given the task to make sure that the IDs output from a monthly Spark task start at the highest value from the previous month.

The spark session and two DataFrames, voter_df_march and voter_df_april, are available in your workspace. The pyspark.sql.functions library is available under the alias F.

- Determine the highest ROW_ID in voter_df_march and save it in the variable previous_max_ID. The statement .rdd.max()[0] will get the maximum ID.
- Add a ROW_ID column to voter_df_april starting at the value of previous_max_ID.
- Show the ROW_IDs from both DataFrames and compare.

In [None]:
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting at the desired value
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

```
<script.py> output:
    +-------------+
    |       ROW_ID|
    +-------------+
    |   8589934592|
    |  25769803776|
    |  34359738368|
    |  42949672960|
    |  51539607552|
    | 103079215104|
    | 111669149696|
    | 231928233984|
    | 240518168576|
    | 360777252864|
    | 395136991232|
    | 601295421440|
    | 635655159808|
    | 670014898176|
    | 807453851648|
    | 850403524608|
    | 944892805120|
    | 962072674304|
    |1005022347264|
    |1047972020224|
    +-------------+
    only showing top 20 rows
    
    +-------------+
    |       ROW_ID|
    +-------------+
    |1717986918400|
    |1735166787584|
    |1743756722176|
    |1752346656768|
    |1760936591360|
    |1812476198912|
    |1821066133504|
    |1941325217792|
    |1949915152384|
    |2070174236672|
    |2104533975040|
    |2310692405248|
    |2345052143616|
    |2379411881984|
    |2516850835456|
    |2559800508416|
    |2654289788928|
    |2671469658112|
    |2714419331072|
    |2757369004032|
    +-------------+
    only showing top 20 rows
```
It's easy to forget that the output of a Spark method can often be modified before being assigned. This provides a lot of power and flexibility, especially when trying to migrate tasks from various technologies. Consider how you could use everything we've learned in this chapter to create a combination ID containing a name, a new ID, and perhaps a conditional value. When you are able to view your tasks as compositions of available functions, you can clean and modify your data in any way you see fit.
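
One detail worth checking when offsetting IDs this way: generated IDs can start at 0, so adding only the previous maximum may reuse that maximum for the first new row. Whether that matters depends on whether the IDs must be strictly unique across months. The arithmetic below uses small made-up ID lists (not the exercise data) to show the difference between offsetting by the maximum and by the maximum plus one.

```python
# Made-up small IDs to show the arithmetic; these are not the exercise data.
march_ids = [0, 1, 8589934592, 8589934593]
april_raw_ids = [0, 1, 8589934592]      # what a fresh ID column might hold

previous_max_id = max(march_ids)

# Offsetting by the maximum alone: the first April ID equals the March maximum.
april_same_start = [raw + previous_max_id for raw in april_raw_ids]
print(set(march_ids) & set(april_same_start))   # {8589934593}

# Offsetting by maximum + 1 keeps the two months disjoint.
april_shifted = [raw + previous_max_id + 1 for raw in april_raw_ids]
print(set(march_ids) & set(april_shifted))      # set()
```

If strict uniqueness is required, the same `+ 1` can be added to the offset expression in the Spark version.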

---
<a id='perf'></a>

## Improving Performance

<img src="images/spark3_032.png" alt="" style="width: 800px;"/>

<img src="images/spark3_033.png" alt="" style="width: 800px;"/>

<img src="images/spark3_034.png" alt="" style="width: 800px;"/>

<img src="images/spark3_035.png" alt="" style="width: 800px;"/>

<img src="images/spark3_036.png" alt="" style="width: 800px;"/>

## Caching a DataFrame

You've been assigned a task that requires running several analysis operations on a DataFrame. You've learned that caching can improve performance when reusing DataFrames and would like to implement it.

You'll be working with a new dataset consisting of airline departure information. It may have repetitive data and will need to be de-duplicated.

The DataFrame departures_df is defined, but no actions have been performed.

- Cache the unique rows in the departures_df DataFrame.
- Perform a count query on departures_df, noting how long the operation takes.
- Count the rows again, noting the variance in time of a cached DataFrame.

In [None]:
import time

start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

```
<script.py> output:
    Counting 139358 rows took 1.848280 seconds
    Counting 139358 rows again took 0.987845 seconds
```
You've successfully implemented caching on a DataFrame. Consider why the first run takes longer even though you've told it to `cache()` the DataFrame. Remember that even though you've applied the caching transformation, it doesn't take effect until an action is run. The action instantiates the caching after the `distinct()` function completes. The second time, the cached rows are reused rather than recalculated, so the count returns noticeably faster (in roughly half the time in the output above).
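
The "mark now, materialize on first action" behavior can be modeled in a few lines of plain Python. This is a toy sketch, not Spark's cache manager: `CachedResult` is a hypothetical class and the flight codes are made up.

```python
# Toy model of cache(): mark for caching now, materialize on the first action.
class CachedResult:
    def __init__(self, compute):
        self._compute = compute      # the expensive work (e.g. distinct())
        self._value = None
        self.is_materialized = False
        self.compute_calls = 0

    def count(self):
        if not self.is_materialized:
            self.compute_calls += 1
            self._value = self._compute()
            self.is_materialized = True
        return len(self._value)      # later calls reuse the stored rows


rows = ["AA5", "AA7", "AA5", "AA37"]            # made-up flight codes
unique_rows = CachedResult(lambda: sorted(set(rows)))

print(unique_rows.compute_calls)   # 0: nothing computed yet
print(unique_rows.count())         # 3: first action does the work
print(unique_rows.count())         # 3: second action reuses the result
print(unique_rows.compute_calls)   # 1
```

The expensive step runs exactly once, no matter how many times the result is counted afterwards, which is the effect the two timings in the exercise are measuring.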

## Removing a DataFrame from cache

You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.

The DataFrame departures_df is defined and has already been cached for you.

- Check the caching status on the departures_df DataFrame.
- Remove the departures_df DataFrame from the cache.
- Validate the caching status again.

In [None]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

```
<script.py> output:
    Is departures_df cached?: True
    Removing departures_df from cache
    Is departures_df cached?: False
```
You've removed the DataFrame from cache using `.unpersist()`. This is a fairly simple operation but can be very useful on long running or complex Spark jobs.

## Improve import performance

<img src="images/spark3_037.png" alt="" style="width: 800px;"/>

<img src="images/spark3_038.png" alt="" style="width: 800px;"/>

<img src="images/spark3_039.png" alt="" style="width: 800px;"/>

<img src="images/spark3_040.png" alt="" style="width: 800px;"/>

## File import performance

You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

You have two types of files available: departures_full.txt.gz and departures_xxx.txt.gz where xxx is 000 - 013. The same number of rows is split between each file.

- Import the departures_full.txt.gz file and the departures_xxx.txt.gz files into separate DataFrames.
- Run a count on each DataFrame and compare the run times.

In [None]:
import time

# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_0*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

```
<script.py> output:
    Total rows in full DataFrame:	139359
    Time to run: 0.755548
    Total rows in split DataFrame:	139359
    Time to run: 0.269601
```
The results should illustrate that using split files runs more quickly than using one large file for import. Note that in certain circumstances the results may be reversed. This is a side effect of running as a single node cluster. Depending on the tasks required and resources available, it may occasionally take longer than expected. If you perform multiple runs of the tasks, you should see the full file import as generally slower than the split file import.
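
Part of the reason split files help is that gzip is not a splittable format, so a single `.gz` file is generally read by one task, whereas several files can be read in parallel. If you need to produce split files yourself, the sketch below shows one possible approach using only the standard library. The row contents, the temporary directory, and the round-robin split are assumptions made for illustration; only the file naming pattern comes from the exercise.

```python
import glob
import gzip
import os
import tempfile

# Write made-up rows into 14 gzip files named like the exercise's split files.
rows = [f"01/01/2017,{n:04d},{60 + n % 500}\n" for n in range(1000)]
num_files = 14
workdir = tempfile.mkdtemp()

for i in range(num_files):
    chunk = rows[i::num_files]                  # every 14th row, offset by i
    file_path = os.path.join(workdir, f"departures_{i:03d}.txt.gz")
    with gzip.open(file_path, "wt") as handle:
        handle.writelines(chunk)

# The same wildcard as the exercise picks up departures_000 .. departures_013.
split_files = sorted(glob.glob(os.path.join(workdir, "departures_0*.txt.gz")))

total = 0
for name in split_files:
    with gzip.open(name, "rt") as handle:
        total += sum(1 for _ in handle)

print(len(split_files), total)   # 14 1000
```

Comparing the total row count of the split files against the original, as done here, is a cheap sanity check before switching a pipeline over to the split version.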

## Cluster configurations

<img src="images/spark3_041.png" alt="" style="width: 800px;"/>

<img src="images/spark3_042.png" alt="" style="width: 800px;"/>

<img src="images/spark3_043.png" alt="" style="width: 800px;"/>

<img src="images/spark3_044.png" alt="" style="width: 800px;"/>

## Reading Spark configurations

You've recently configured a cluster via a cloud provider. Your only access is via the command shell or your Python code. You'd like to verify some Spark settings to validate the configuration of the cluster.

The spark object is available for use.

- Check the name of the Spark application instance ('spark.app.name').
- Determine the TCP port the driver runs on ('spark.driver.port').
- Determine how many partitions are configured for joins.
- Show the results.

In [None]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

```
<script.py> output:
    Name: pyspark-shell
    Driver TCP port: 44415
    Number of partitions: 200
```
Using the `spark.conf` object allows you to validate the settings of a cluster without having configured it initially. This can help you decide which settings to tune for your needs.

## Writing Spark configurations

Now that you've reviewed some of the Spark configurations on your cluster, you want to modify some of the settings to tune Spark to your needs. You'll import some data to review that your changes have affected the cluster.

The Spark configuration is initially set to the default value of 200 shuffle partitions.

The spark object is available for use. A file named departures.txt.gz is available for import. An initial DataFrame containing the distinct rows from departures.txt.gz is available as departures_df.

- Store the number of partitions in departures_df in the variable before.
- Change the spark.sql.shuffle.partitions configuration to 500 partitions.
- Recreate the departures_df DataFrame reading the distinct rows from the departures file.
- Print the number of partitions from before and after the configuration change.

In [None]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

```
<script.py> output:
    Partition count before change: 200
    Partition count after change: 500
```
It's important to remember that modifying Spark settings may or may not affect objects that already exist: some changes apply immediately, while others only take effect once a new DataFrame is created. Always test the changes you make to Spark configurations to verify that they do exactly what you expect.

## Performance improvements

<img src="images/spark3_045.png" alt="" style="width: 800px;"/>

<img src="images/spark3_046.png" alt="" style="width: 800px;"/>

<img src="images/spark3_047.png" alt="" style="width: 800px;"/>

<img src="images/spark3_048.png" alt="" style="width: 800px;"/>

## Normal joins

You've been given two DataFrames to combine into a single useful DataFrame. Your first task is to combine the DataFrames normally and view the execution plan.

The DataFrames flights_df and airports_df are available to you.

- Create a new DataFrame normal_df by joining flights_df with airports_df.
- Determine which type of join is used in the query plan.

In [None]:
# Join the flights_df and airports_df DataFrames
normal_df = flights_df.join(airports_df,
    flights_df["Destination Airport"] == airports_df["IATA"])

# Show the query plan
normal_df.explain()

```
<script.py> output:
    == Physical Plan ==
    *(5) SortMergeJoin [Destination Airport#192], [IATA#209], Inner
    :- *(2) Sort [Destination Airport#192 ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(Destination Airport#192, 500)
    :     +- *(1) Project [Date (MM/DD/YYYY)#190, Flight Number#191, Destination Airport#192, Actual elapsed time (Minutes)#193]
    :        +- *(1) Filter isnotnull(Destination Airport#192)
    :           +- *(1) FileScan csv [Date (MM/DD/YYYY)#190,Flight Number#191,Destination Airport#192,Actual elapsed time (Minutes)#193] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/tmprhigmoha/AA_DFW_2018_Departures_Short.csv.gz], PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
    +- *(4) Sort [IATA#209 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(IATA#209, 500)
          +- *(3) Project [AIRPORTNAME#208, IATA#209]
             +- *(3) Filter isnotnull(IATA#209)
                +- *(3) FileScan csv [AIRPORTNAME#208,IATA#209] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/tmprhigmoha/airportnames.txt.gz], PartitionFilters: [], PushedFilters: [IsNotNull(IATA)], ReadSchema: struct<AIRPORTNAME:string,IATA:string>
```
You've implemented a basic join and examined the query plan. Learning to parse a query plan will help you understand what Spark is doing and when.
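
Reading a plan by eye works for one query, but the check can also be automated. The following is a minimal sketch, assuming the plan text has already been captured as a string. `find_join_operators` is a hypothetical helper, and the list of operator names is an assumption that is not exhaustive.

```python
import re

# Join operators that can appear in a Spark physical plan.
# This list is an assumption and is not exhaustive.
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
    "BroadcastNestedLoopJoin",
    "CartesianProduct",
)


def find_join_operators(plan_text):
    """Return the join operator names found in a plan, in order of appearance."""
    pattern = r"\b(" + "|".join(JOIN_OPERATORS) + r")\b"
    return re.findall(pattern, plan_text)


# First lines of the plan shown above, shortened for the example.
plan = """== Physical Plan ==
*(5) SortMergeJoin [Destination Airport#192], [IATA#209], Inner
:- *(2) Sort [Destination Airport#192 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(Destination Airport#192, 500)
"""

print(find_join_operators(plan))
```

A check like this could sit in a test that fails when an expected join strategy silently changes.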

## Using broadcasting on Spark joins

Remember that table joins in Spark are split between the cluster workers. If the data is not local, various shuffle operations are required and can have a negative impact on performance. Instead, we're going to use Spark's broadcast operations to give each node a copy of the specified data.

A couple of tips:

- Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer it to the worker nodes.
- On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own.
- If you look at the query execution plan, a BroadcastHashJoin indicates you've successfully configured broadcasting.

The DataFrames flights_df and airports_df are available to you.

- Import the broadcast() method from pyspark.sql.functions.
- Create a new DataFrame broadcast_df by joining flights_df with airports_df, using broadcasting.
- Show the query plan and consider differences from the original.

In [None]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df),
    flights_df["Destination Airport"] == airports_df["IATA"])

# Show the query plan and compare against the original
broadcast_df.explain()

```
<script.py> output:
    == Physical Plan ==
    *(2) BroadcastHashJoin [Destination Airport#242], [IATA#259], Inner, BuildRight
    :- *(2) Project [Date (MM/DD/YYYY)#240, Flight Number#241, Destination Airport#242, Actual elapsed time (Minutes)#243]
    :  +- *(2) Filter isnotnull(Destination Airport#242)
    :     +- *(2) FileScan csv [Date (MM/DD/YYYY)#240,Flight Number#241,Destination Airport#242,Actual elapsed time (Minutes)#243] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/tmprhigmoha/AA_DFW_2018_Departures_Short.csv.gz], PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
       +- *(1) Project [AIRPORTNAME#258, IATA#259]
          +- *(1) Filter isnotnull(IATA#259)
             +- *(1) FileScan csv [AIRPORTNAME#258,IATA#259] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/tmprhigmoha/airportnames.txt.gz], PartitionFilters: [], PushedFilters: [IsNotNull(IATA)], ReadSchema: struct<AIRPORTNAME:string,IATA:string>
```
You've used Spark broadcasting to improve the performance of your data operations. You should see that the query plan now uses a BroadcastHashJoin fed by a BroadcastExchange, in place of the SortMergeJoin and the shuffle Exchange steps of the normal join. You'll likely use broadcasting often with production datasets; checking the query plan will help validate your configuration without actually running the tasks.
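
To build intuition for what a broadcast hash join does, here is a plain-Python analogy, not Spark code. The small table is turned into a dictionary that every worker could hold in full, and the large table is streamed past it without being shuffled. The function name and the sample rows are made up for illustration.

```python
def broadcast_style_join(large_rows, small_rows, large_key, small_key):
    """Inner-join two lists of dicts by building a lookup from the small side."""
    # "Build" phase: index the small table by its join key.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[small_key], []).append(row)

    # "Probe" phase: stream the large table and look up matches locally.
    joined = []
    for row in large_rows:
        for match in lookup.get(row[large_key], []):
            joined.append({**row, **match})
    return joined


# Illustrative rows only; the column names follow the DataFrames above.
flights = [
    {"Flight Number": "0005", "Destination Airport": "HNL"},
    {"Flight Number": "0007", "Destination Airport": "OGG"},
    {"Flight Number": "0099", "Destination Airport": "XXX"},  # no match
]
airports = [
    {"IATA": "HNL", "AIRPORTNAME": "Honolulu"},
    {"IATA": "OGG", "AIRPORTNAME": "Kahului"},
]

result = broadcast_style_join(flights, airports, "Destination Airport", "IATA")
print(len(result))
```

The lookup is built once from the small side, which is why the first tip recommends broadcasting the smaller DataFrame.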

## Comparing broadcast vs normal joins

You've created two types of joins, normal and broadcasted. Now your manager would like to know what the performance improvement is by using Spark optimizations. If the results are promising, you'll be given more opportunity to tweak the Spark setup as needed.

Your DataFrames normal_df and broadcast_df are available for your use.

- Execute .count() on the normal DataFrame.
- Execute .count() on the broadcasted DataFrame.
- Print the count and duration of the DataFrames, noting any differences.

In [None]:
import time

start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

```
<script.py> output:
    Normal count:		119910	duration: 2.495135
    Broadcast count:	119910	duration: 0.381412
```
While the absolute difference in time is minuscule for our example, the ratio between the durations is significant: the broadcast join ran about 6.5 times faster in this run. Depending on the makeup of the data being joined, you can notably cut the run time for Spark operations.
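
A single measurement can be noisy, so it may be worth repeating each action a few times before drawing conclusions. The helper below is a sketch with a hypothetical name, `time_action`. It times a stand-in workload so that it runs without Spark.

```python
import statistics
import time


def time_action(action, runs=3):
    """Run a zero-argument callable several times; return (last result, durations)."""
    durations = []
    result = None
    for _ in range(runs):
        start = time.perf_counter()
        result = action()
        durations.append(time.perf_counter() - start)
    return result, durations


# Stand-in workload so the sketch runs without Spark; with a live session
# you might pass normal_df.count or broadcast_df.count instead.
count, durations = time_action(lambda: sum(range(100000)), runs=5)
print("result: %d\tmedian duration: %f" % (count, statistics.median(durations)))
```

Reporting the median rather than a single run reduces the influence of one-off delays, such as a first run that includes start-up costs.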

---
<a id='pipe'></a>

## Introduction to data pipelines

<img src="images/spark3_049.png" alt="" style="width: 800px;"/>

<img src="images/spark3_050.png" alt="" style="width: 800px;"/>

<img src="images/spark3_051.png" alt="" style="width: 800px;"/>

## Quick pipeline

Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

The spark context is defined, and the pyspark.sql.functions library is aliased as F, as is customary.

- Import the file 2015-departures.csv.gz to a DataFrame. Note the header is already defined.
- Filter the DataFrame to contain only flights with a duration over 0 minutes. Use the index of the column, not the column name (remember to use .printSchema() to see the column names / order).
- Add an ID column.
- Write the file out as a JSON document named output.json.

In [None]:
# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json')

This is a very simple example, but this does represent a multi-step data pipeline in Spark. The same procedures are used even when the content requires much more processing to parse.

## Data handling techniques

<img src="images/spark3_052.png" alt="" style="width: 800px;"/>

<img src="images/spark3_053.png" alt="" style="width: 800px;"/>

<img src="images/spark3_054.png" alt="" style="width: 800px;"/>

<img src="images/spark3_055.png" alt="" style="width: 800px;"/>

## Removing commented lines

Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

The spark context, and the base CSV file (annotations.csv.gz) are available for you to work with. The col function is also available for use.

- Import the annotations.csv.gz file to a DataFrame and perform a row count. Specify a separator character of |.
- Query the data for the number of rows beginning with #.
- Import the file again to a new DataFrame, but specify the comment character in the options to remove any commented rows.
- Count the new DataFrame and verify the difference is as expected.

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

Handling commented rows is easy in Spark and allows you to quickly remove any row beginning with a defined character. Consider what would happen if you had multiple comments to filter out and how you might accomplish this.
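
One way to reason about the multiple-comment case is to prototype the rule in plain Python first. The sketch below is not Spark code; `strip_comments` and the sample lines are hypothetical. In Spark, a similar effect could probably be achieved by combining several negated `col('_c0').startswith(...)` conditions in a filter, since the `comment` option accepts a single character.

```python
def strip_comments(lines, prefixes=("#", "//")):
    """Keep only the lines that do not start with any of the comment prefixes."""
    # str.startswith accepts a tuple and matches if any prefix fits.
    return [line for line in lines if not line.startswith(tuple(prefixes))]


# Made-up sample lines for illustration.
raw_lines = [
    "# generated by a hypothetical export tool",
    "02110627\tn02110627_12938\t200\t300",
    "// a second comment style",
    "02104029\tn02104029_63\t500\t375",
]

clean_lines = strip_comments(raw_lines)
print("Full count: %d\nRemaining count: %d" % (len(raw_lines), len(clean_lines)))
```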

## Removing invalid rows

Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at minimum 5 tab separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (\t) characters.

The DataFrame annotations_df is already available, with the commented rows removed. The pyspark.sql.functions library is available under the alias F. The initial number of rows available in the DataFrame is stored in the variable initial_count.

- Create a new variable tmp_fields using the annotations_df DataFrame column '_c0' splitting it on the tab character.
- Create a new column in annotations_df named 'colcount' representing the number of fields defined in the previous step.
- Filter out any rows from annotations_df containing fewer than 5 fields.
- Count the number of rows in the DataFrame and compare to the initial_count.

In [None]:
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df["colcount"] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
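
The split-and-count rule used above can be checked on a handful of rows in plain Python before running it on the full DataFrame. This is a sketch only: `field_count`, `keep_valid`, and the sample rows are made up for illustration.

```python
def field_count(line, sep="\t"):
    """Number of separator-delimited fields in a line."""
    return len(line.split(sep))


def keep_valid(lines, min_fields=5):
    """Drop lines with fewer than min_fields tab-separated fields."""
    return [line for line in lines if field_count(line) >= min_fields]


# Made-up rows: the second one has only three fields and should be dropped.
rows = [
    "02110627\tn02110627_12938\t200\t300\taffenpinscher,0,9,173,298",
    "02110627\tn02110627_0001\t200",
    "02104029\tn02104029_63\t500\t375\tkuvasz,0,0,499,327",
]

valid_rows = keep_valid(rows)
print("Initial count: %d\nFinal count: %d" % (len(rows), len(valid_rows)))
```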

## Splitting into columns

You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

You have the spark context and the latest version of the annotations_df DataFrame. pyspark.sql.functions is available under the alias F.

- Split the content of the '_c0' column on the tab character and store in a variable called split_cols.
- Add the following columns based on the first four entries in the variable above: folder, filename, width, height on a DataFrame named split_df.
- Add the split_cols variable as a column.

In [None]:
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df["_c0"], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

We're getting close to the end of the course and things are getting more complex. You may be wondering why we're not using a schema instead to define the content layout. Spark's CSV parser can't handle advanced types (Arrays or Maps), so a schema containing them wouldn't be processed correctly. In our example, we sidestep the problem by importing each row as a single string column and splitting it ourselves.

## Further parsing

You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

The spark context is available and pyspark.sql.functions is aliased as F. The types from pyspark.sql.types are already imported. The split_df DataFrame is as you last left it. Remember, you can use .printSchema() on a DataFrame in the console area to view the column names and types.

- Create a new function called retriever that takes two arguments, the split columns (cols) and the total number of columns (colcount). This function should return a list of the entries that have not been defined as columns yet (i.e., everything after the first four items in the list).
- Define the function as a Spark UDF, returning an Array of strings.
- Create the new column dog_list using the UDF and the available columns in the DataFrame.
- Remove the columns _c0, colcount, and split_cols.

In [None]:
def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')
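
Because a UDF wraps an ordinary Python function, its logic can be tried on a plain list before it is attached to a DataFrame. The sketch below repeats the `retriever` function from the exercise and applies it to a made-up row; the folder, filename, width, and height values are assumptions.

```python
def retriever(cols, colcount):
    # Same logic as above: everything from index 4 up to colcount.
    return cols[4:colcount]


# Made-up row: folder, filename, width, height, then two dog entries.
sample_cols = [
    "02110627", "n02110627_12938", "600", "450",
    "EntleBucher,307,94,515,448", "EntleBucher,101,33,330,448",
]

dog_list = retriever(sample_cols, len(sample_cols))
print(dog_list)
```

A row with exactly four fields yields an empty list, which is a useful edge case to keep in mind for later steps.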

## Data validation

<img src="images/spark3_056.png" alt="" style="width: 800px;"/>

<img src="images/spark3_057.png" alt="" style="width: 800px;"/>

<img src="images/spark3_058.png" alt="" style="width: 800px;"/>

## Validate rows via join

Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named valid_folders_df. The DataFrame split_df is as you last left it with a group of split columns.

The spark object is available, and pyspark.sql.functions is imported as F.

- Rename the _c0 column to folder on the valid_folders_df DataFrame.
- Count the number of rows in split_df.
- Join the two DataFrames on the folder name, and call the resulting DataFrame joined_df. Make sure to broadcast the smaller DataFrame.
- Check the number of rows remaining in the DataFrame and compare.

In [None]:
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

Nicely done - using joins in this fashion drastically simplifies a validation task if your data permits it. The validation data doesn't necessarily need to be loaded from a file - it could be calculated on the fly, or based on a previous dataset. Optimizing these tasks will improve your overall data cleaning process.

## Examining invalid rows

You've successfully filtered out the rows using a join, but sometimes you'd like to examine the data that is invalid. This data can be stored for later processing or for troubleshooting your data sources.

You want to find the difference between two DataFrames and store the invalid rows.

The spark object is defined and pyspark.sql.functions is imported as F. The original DataFrame split_df and the joined DataFrame joined_df are available as they were in their previous states.

- Determine the row counts for each DataFrame.
- Create a DataFrame containing only the invalid rows.
- Validate the count of the new DataFrame is as expected.
- Determine the number of distinct folders removed.

In [None]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folders removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

```
<script.py> output:
     split_df:	20580
     joined_df:	19956
     invalid_df: 	624
    1 distinct invalid folders found
```
Using different types of joins can produce useful results for the various stages of data cleaning. While there are often multiple ways to accomplish a task, using the various join methods will often finish more quickly and cleanly.
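
The relationship between the inner join and the left anti join can be pictured with a plain-Python analogy. This is not Spark code: `split_by_validity` and the folder names are hypothetical, and the sketch assumes the list of valid folders contains no duplicates.

```python
def split_by_validity(rows, valid_folders, key="folder"):
    """Return (matched, unmatched) rows, mirroring an inner and a left anti join."""
    valid = set(valid_folders)
    matched = [row for row in rows if row[key] in valid]
    unmatched = [row for row in rows if row[key] not in valid]
    return matched, unmatched


# Made-up folder names for illustration.
rows = [
    {"folder": "02110627", "filename": "a"},
    {"folder": "02110627", "filename": "b"},
    {"folder": "99999999", "filename": "c"},
]
matched, unmatched = split_by_validity(rows, ["02110627", "02104029"])

# Every input row lands in exactly one of the two results.
print(len(rows), len(matched), len(unmatched))
```

The same bookkeeping holds for the counts reported above: 19956 + 624 = 20580.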

## Final analysis and delivery

<img src="images/spark3_059.png" alt="" style="width: 800px;"/>

<img src="images/spark3_060.png" alt="" style="width: 800px;"/>

## Dog parsing

You've done a considerable amount of cleanup on the initial dataset, but now need to analyze the data a bit deeper. There are several questions that have now come up about the type of dogs seen in an image and some details regarding the images. You realize that to answer these questions, you need to process the data into a specific type. Before you can use it, you'll need to create a schema / type to represent the dog details.

The joined_df DataFrame is as you last defined it, and the pyspark.sql.types have all been imported.

- Select the column representing the dog details from the DataFrame and show the first 10 un-truncated rows.
- Create a new schema as you've done before, using breed, start_x, start_y, end_x, and end_y as the names. Make sure to specify the proper data types for each field in the schema (any number value is an integer).

In [None]:
# Select the dog details and show 10 untruncated rows
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

```
<script.py> output:
    +----------------------------------+
    |dog_list                          |
    +----------------------------------+
    |[affenpinscher,0,9,173,298]       |
    |[Border_terrier,73,127,341,335]   |
    |[kuvasz,0,0,499,327]              |
    |[Great_Pyrenees,124,225,403,374]  |
    |[schipperke,146,29,416,309]       |
    |[groenendael,168,0,469,374]       |
    |[Bedlington_terrier,10,12,462,332]|
    |[Lhasa,39,1,499,373]              |
    |[Kerry_blue_terrier,17,16,300,482]|
    |[vizsla,112,93,276,236]           |
    +----------------------------------+
    only showing top 10 rows
```
Nicely done - you'll use this schema soon to determine some details about the dogs in the data. As you've just seen, schemas can be used for importing data, but they can also be used to simplify accessing information within pre-parsed data. If you're wondering why we didn't just define a full schema for the import, the Spark CSV parser is not capable of handling complex schema types such as lists.

## Per image count

Your next task in building a data pipeline for this dataset is to create a few analysis oriented columns. You've been asked to calculate the number of dogs found in each image based on your dog_list column created earlier. You have also created the DogType which will allow better parsing of the data within some of the data columns.

The joined_df is available as you last defined it, and the DogType structtype is defined. pyspark.sql.functions is available under the F alias.

- Create a Python function to split each entry in dog_list to its appropriate parts. Make sure to convert any strings into the appropriate types or the DogType will not parse correctly.
- Create a UDF using the above function.
- Use the UDF to create a new column called dogs. Drop the previous column in the same command.
- Show the number of dogs in the new column for the first 10 rows.

In [None]:
# Create a function to return the dogs in an image as a list of typed tuples
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to create the list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

```
<script.py> output:
    +----------+
    |size(dogs)|
    +----------+
    |         1|
    |         1|
    |         1|
    |         1|
    |         1|
    |         1|
    |         1|
    |         1|
    |         1|
    |         1|
    +----------+
    only showing top 10 rows
```
It can be tricky handling nested data, but using UDFs and normal Python functions will often help you make better sense of the data.
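
The parsing logic can be exercised outside Spark on one of the entries from the dog_list output shown earlier. The sketch below mirrors the exercise function under the hypothetical name `dog_parse`. The guard against a missing list is an addition that is not part of the original code.

```python
def dog_parse(doglist):
    """Turn 'breed,start_x,start_y,end_x,end_y' strings into typed tuples."""
    dogs = []
    # Guard against a missing list (an addition, not in the exercise code).
    for dog in doglist or []:
        breed, start_x, start_y, end_x, end_y = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs


# Entry taken from the dog_list output shown earlier.
parsed = dog_parse(["EntleBucher,307,94,515,448", "EntleBucher,101,33,330,448"])
print(len(parsed), parsed[0])
```

Converting the coordinates with `int()` matters because the DogType fields are declared as integers, as the exercise instructions point out.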

## Percentage dog pixels

The final task for parsing the dog annotation data is to determine the percentage of pixels in each image that represents a dog (or dogs). You'll need to use the various techniques you've learned in this course to help calculate this information and add it as columns for later analysis.

To calculate the percentage of pixels, first calculate the total number of pixels representing each dog, then sum them for the image. You can calculate the area of each bounding box with the formula:

(Xend - Xstart) * (Yend - Ystart)

NOTE: You can ignore the possibility of overlapping bounding boxes in this instance.

For the percentage, calculate the total number of "dog" pixels divided by the total size of the image, multiplied by 100.

The joined_df DataFrame is as you last used it. pyspark.sql.functions is aliased to F.

- Define a Python function to take a list of tuples (the dog objects) and calculate the total number of "dog" pixels per image.
- Create a UDF of the function and use it to create a new column called 'dog_pixels' on the DataFrame.
- Create another column, 'dog_percent', representing the percentage of 'dog_pixels' in the image. Make sure this is between 0-100%.
- Show the first 10 rows with more than 60% 'dog_pixels' in the image.

In [None]:
# Define a function to determine the number of dog pixels per image
def dogPixelCount(doglist):
  totalpixels = 0
  for dog in doglist:
    totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
  return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)
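
The arithmetic behind the two new columns can be worked through by hand in plain Python. The sketch below uses hypothetical function names, takes one bounding box from the dog_list output shown earlier, and assumes an image size of 500 x 375 pixels. Capping the result at 100 is also an assumption, included because summed boxes could exceed the image area when they overlap.

```python
def dog_pixel_count(dogs):
    """Sum the bounding-box areas of (breed, start_x, start_y, end_x, end_y) tuples."""
    return sum((d[3] - d[1]) * (d[4] - d[2]) for d in dogs)


def dog_percent(dog_pixels, width, height):
    """Share of the image covered by dog boxes, capped at 100 (the cap is an assumption)."""
    return min(dog_pixels / (width * height) * 100, 100.0)


# Box taken from the dog_list output earlier; the 500 x 375 image size is assumed.
one_dog = [("affenpinscher", 0, 9, 173, 298)]
pixels = dog_pixel_count(one_dog)  # (173 - 0) * (298 - 9) = 49997
print(pixels, dog_percent(pixels, 500, 375))
```

Under these assumptions the box covers 49997 of 187500 pixels, a little under 27% of the image.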