# Complex processing and data pipelines

Learn how to process complex real-world data using Spark and the basics of pipelines.

## Preparing the environment

### Importing libraries

In [1]:
import pandas as pd
import random
import time

from typing import List

from pyspark.sql.types import (_parse_datatype_string, StructType, StructField, ArrayType,
                               DoubleType, IntegerType, StringType, FloatType)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
import warnings
# Suppress every warning for the rest of the session (no category or module is given,
# so the filter is global); the stated motivation is the noise from Arrow optimizations.
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

### Connect to Spark

In [3]:
# Reuse the running session if there is one, otherwise start a new one. Eager evaluation
# is switched on so a DataFrame left as the last expression of a cell is rendered.
spark = SparkSession.builder.config(
    "spark.sql.repl.eagerEval.enabled", True
).getOrCreate()

In [4]:
# Keep a handle to the SparkContext that belongs to the session.
sc = spark.sparkContext

### Loading data

In [5]:
# Load the 2014 departures: the first row provides the column names and the
# column types are inferred from the data.
flights_2014 = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/AA_DFW_2014_Departures_Short.csv.gz')
)

# Convert the date column to a date type using the MM/dd/yyyy pattern.
flights_2014 = flights_2014.withColumn(
    "Date (MM/DD/YYYY)",
    F.to_date(F.col("Date (MM/DD/YYYY)"), "MM/dd/yyyy"),
)

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
flights_2014.createOrReplaceTempView("flights_2014")
flights_2014.printSchema()
flights_2014.limit(2)

root
 |-- Date (MM/DD/YYYY): date (nullable = true)
 |-- Flight Number: integer (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): integer (nullable = true)



Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes)
2014-01-01,5,HNL,519
2014-01-01,7,OGG,505


In [6]:
# Load the 2015 departures: the first row provides the column names and the
# column types are inferred from the data.
flights_2015 = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/AA_DFW_2015_Departures_Short.csv.gz')
)

# Convert the date column to a date type using the MM/dd/yyyy pattern.
flights_2015 = flights_2015.withColumn(
    "Date (MM/DD/YYYY)",
    F.to_date(F.col("Date (MM/DD/YYYY)"), "MM/dd/yyyy"),
)

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
flights_2015.createOrReplaceTempView("flights_2015")
flights_2015.printSchema()
flights_2015.limit(2)

root
 |-- Date (MM/DD/YYYY): date (nullable = true)
 |-- Flight Number: integer (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): integer (nullable = true)



Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes)
2015-01-01,5,HNL,526
2015-01-01,7,OGG,517


In [7]:
# Load the 2017 departures: the first row provides the column names and the
# column types are inferred from the data.
flights_2017 = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/AA_DFW_2017_Departures_Short.csv.gz')
)

# Convert the date column to a date type using the MM/dd/yyyy pattern.
flights_2017 = flights_2017.withColumn(
    "Date (MM/DD/YYYY)",
    F.to_date(F.col("Date (MM/DD/YYYY)"), "MM/dd/yyyy"),
)

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
flights_2017.createOrReplaceTempView("flights_2017")
flights_2017.printSchema()
flights_2017.limit(2)

root
 |-- Date (MM/DD/YYYY): date (nullable = true)
 |-- Flight Number: integer (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): integer (nullable = true)



Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes)
2017-01-01,5,HNL,537
2017-01-01,7,OGG,498


In [8]:
# Load the 2018 departures: the first row provides the column names and the
# column types are inferred from the data.
flights_2018 = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/AA_DFW_2018_Departures_Short.csv.gz')
)

# Convert the date column to a date type using the MM/dd/yyyy pattern.
flights_2018 = flights_2018.withColumn(
    "Date (MM/DD/YYYY)",
    F.to_date(F.col("Date (MM/DD/YYYY)"), "MM/dd/yyyy"),
)

# Save a copy in CSV format: the data is repartitioned into 5 partitions before the
# write, and any output already at the target path is replaced.
(flights_2018.repartition(5)
             .write
             .mode('overwrite')
             .format('csv')
             .save('output-files/AA_DFW_2018_Departures_Short.csv'))

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
flights_2018.createOrReplaceTempView("flights_2018")
flights_2018.printSchema()
flights_2018.limit(2)

root
 |-- Date (MM/DD/YYYY): date (nullable = true)
 |-- Flight Number: integer (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): integer (nullable = true)



Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes)
2018-01-01,5,HNL,498
2018-01-01,7,OGG,501


In [9]:
# Load the council voters file: header row for names, types inferred from the data.
dallas_electors = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/DallasCouncilVoters.csv.gz')
)

# Convert DATE to a date type using the MM/dd/yyyy pattern.
dallas_electors = dallas_electors.withColumn(
    "DATE",
    F.to_date(F.col("DATE"), "MM/dd/yyyy"),
)

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
dallas_electors.createOrReplaceTempView("dallas_electors")
dallas_electors.printSchema()
dallas_electors.limit(2)

root
 |-- DATE: date (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER_NAME: string (nullable = true)



DATE,TITLE,VOTER_NAME
2017-02-08,Councilmember,Jennifer S. Gates
2017-02-08,Councilmember,Philip T. Kingston


In [10]:
# Load the council votes file: header row for names, types inferred from the data.
dallas_votes = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/DallasCouncilVotes.csv.gz')
)

# Convert DATE to a date type using the MM/dd/yyyy pattern.
dallas_votes = dallas_votes.withColumn(
    "DATE",
    F.to_date(F.col("DATE"), "MM/dd/yyyy"),
)

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
dallas_votes.createOrReplaceTempView("dallas_votes")
dallas_votes.printSchema()
dallas_votes.limit(2)

root
 |-- DATE: date (nullable = true)
 |-- AGENDA_ITEM_NUMBER: string (nullable = true)
 |-- ITEM_TYPE: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER NAME: string (nullable = true)
 |-- VOTE CAST: string (nullable = true)
 |-- FINAL ACTION TAKEN: string (nullable = true)
 |-- AGENDA ITEM DESCRIPTION: string (nullable = true)
 |-- AGENDA_ID: string (nullable = true)
 |-- VOTE_ID: string (nullable = true)



DATE,AGENDA_ITEM_NUMBER,ITEM_TYPE,DISTRICT,TITLE,VOTER NAME,VOTE CAST,FINAL ACTION TAKEN,AGENDA ITEM DESCRIPTION,AGENDA_ID,VOTE_ID
2017-02-08,1,AGENDA,13,Councilmember,Jennifer S. Gates,,NO ACTION NEEDED,Call to Order,020817__Special__1,020817__Special__...
2017-02-08,1,AGENDA,14,Councilmember,Philip T. Kingston,,NO ACTION NEEDED,Call to Order,020817__Special__1,020817__Special__...


In [11]:
# Load the people sample: header row for names, types inferred from the data.
people = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv('data-sources/people_data_sample.csv')
)

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
people.createOrReplaceTempView("people")
people.printSchema()
people.limit(2)

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



name,age,city
Amy Meyer,3,Kimberlyborough
Amy Jones,10,Davidburgh


In [12]:
# Load the flight-time dataset; Parquet files carry their own schema.
flight = spark.read.format('parquet').load('data-sources/flight-time.parquet')

# Register the DataFrame for Spark SQL, then show its schema and a two-row sample.
flight.createOrReplaceTempView("flight")
flight.printSchema()
flight.limit(2)

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)



FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,ORIGIN_CITY_NAME,DEST,DEST_CITY_NAME,CRS_DEP_TIME,DEP_TIME,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,CANCELLED,DISTANCE
2000-01-01,DL,1451,BOS,"Boston, MA",ATL,"Atlanta, GA",1115,1113,1343,5,1400,1348,0,946
2000-01-01,DL,1479,BOS,"Boston, MA",ATL,"Atlanta, GA",1315,1311,1536,7,1559,1543,0,946


### Tables catalogue

In [13]:
# List the tables and temporary views currently registered in the session catalog.
spark.catalog.listTables()

[Table(name='dallas_electors', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='dallas_votes', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flight', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flights_2014', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flights_2015', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flights_2017', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='flights_2018', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='people', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

## Introduction to data pipelines

### Pipeline details

- Not formally defined in Spark
- Typically all normal Spark code required for task

In [14]:
# Explicit schema for the people sample file; every column is read as a string.
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', StringType(), False),
    StructField('city', StringType(), False),
])

# The file starts with a header row (name, age, city). Supplying a schema does not
# skip it, so the header option is set explicitly; otherwise the header line would
# be ingested as a data row.
df = (spark.read.format('csv')
           .schema(schema)
           .option('header', True)
           .load('data-sources/people_data_sample.csv'))

# Add a unique, increasing (not necessarily consecutive) identifier, shifted to start at 1.
df = df.withColumn('id', F.monotonically_increasing_id() + 1)

# Both writes overwrite any previous output so the cell can be re-run; without an
# explicit mode a write fails when the target path already exists.
df.write.parquet('output-files/people_data_sample.parquet', mode='overwrite')
df.write.json('output-files/outdata.json', mode='overwrite')

## Ex. 1 - Quick pipeline

Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

**Instructions:**

1. Import the file `AA_DFW_2015_Departures_Short.csv.gz` to a DataFrame. Note the header is already defined.
2. Filter the DataFrame to contain only flights with a duration over 0 minutes. Use the index of the column, not the column name (remember to use `.printSchema()` to see the column names / order).
3. Add an `ID` column.
4. Write the file out as a JSON document named `AA_DFW_2015_output.json`.

### Pipeline - all together

In [15]:
 # Import the data to a DataFrame
departures_df = spark.read.csv('data-sources/AA_DFW_2015_Departures_Short.csv.gz', header=True)

# Keep only flights with a duration over 0 minutes. No schema is inferred on this read,
# so the column holds strings; it is cast explicitly so the comparison is numeric.
# `> 0` (rather than `!= 0`) also rejects negative durations, matching the task statement.
departures_df = departures_df.filter(
    departures_df['Actual elapsed time (Minutes)'].cast('int') > 0
)

# Add an ID column (unique and increasing, not necessarily consecutive)
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format, replacing any previous output
departures_df.write.json('output-files/AA_DFW_2015_output.json', mode='overwrite')

### Explore the code a little bit more

In [16]:
# Inspect the pipeline result: print the schema and show a two-row sample
departures_df.printSchema()
departures_df.limit(2)

root
 |-- Date (MM/DD/YYYY): string (nullable = true)
 |-- Flight Number: string (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): string (nullable = true)
 |-- id: long (nullable = false)



Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes),id
01/01/2015,5,HNL,526,0
01/01/2015,7,OGG,517,1


In [17]:
# Using the column index to select the data to show: `columns[3]` is the name of
# the fourth column, which is then passed to `select`.
departures_df.select(departures_df.columns[3]).limit(2)

Actual elapsed time (Minutes)
526
517


In [18]:
# Filter by column position: `departures_df[3]` is the fourth column. The column was
# read as a string, so the comparison relies on Spark's implicit cast.
departures_df.where(departures_df[3] > 600).limit(2)

Date (MM/DD/YYYY),Flight Number,Destination Airport,Actual elapsed time (Minutes),id
02/27/2015,7,OGG,679,22142
04/24/2015,7,OGG,631,43929


## Ex. 2 - Removing commented lines
Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

**Instructions:**

1. Import the `annotations.csv.gz` file to a DataFrame and perform a row count. Specify a separator character of `|`.
2. Query the data for the number of rows beginning with `#`.
3. Import the file again to a new DataFrame, but specify the comment character in the options to remove any commented rows.
4. Count the new DataFrame and verify the difference is as expected.

In [19]:
# Location of the annotations file used by the cleaning steps below
path_file = 'data-sources/annotations-sample-file.csv'

In [20]:
# Read the annotations file using '|' as the field separator
annotations_df = spark.read.option('sep', '|').csv(path_file)

# Row count before any cleaning
full_count = annotations_df.count()
print(f'Total rows: {full_count}')

annotations_df.limit(2)

Total rows: 32794


_c0,_c1
025865917\tn02352...,Temp
022684404\tn02938...,Temp


In [21]:
# Count the rows whose first column starts with the comment marker '#'
is_comment = annotations_df['_c0'].startswith('#')
comment_count = annotations_df.filter(is_comment).count()
comment_count

1416

In [22]:
# Read the file again, this time letting the reader skip lines that start with '#'
no_comments_df = spark.read.options(sep='|', comment='#').csv(path_file)

# Row count once the commented lines are gone
no_comments_count = no_comments_df.count()
print(f'Total rows: {no_comments_count}')

no_comments_df.limit(2)

Total rows: 31378


_c0,_c1
025865917\tn02352...,Temp
022684404\tn02938...,Temp


In [23]:
# Show the three counts side by side
print(f'''
Full count: {full_count}
Comment count: {comment_count}
Remaining count: {no_comments_count}
''')

# Verify the difference rather than only displaying it: the rows dropped by the
# reader's comment option must be exactly the rows that start with '#'.
assert full_count - comment_count == no_comments_count, (
    f'Expected {full_count - comment_count} rows after removing comments, '
    f'found {no_comments_count}'
)


Full count: 32794
Comment count: 1416
Remaining count: 31378



## Ex. 3 - Removing invalid rows

Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at minimum 5 tab separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (`\t`) characters.

**Instructions:**

1. Create a new variable `tmp_fields` using the `annotations_df` DataFrame column `'_c0'` splitting it on the tab character.
2. Create a new column in `annotations_df` named `'colcount'` representing the number of fields defined in the previous step.
3. Filter out any rows from `annotations_df` containing fewer than 5 fields.
4. Count the number of rows in the DataFrame and compare to the `initial_count`.

In [24]:
# Start this exercise from the comment-free DataFrame. Note that `annotations_df` is
# rebound here and no longer refers to the raw read from the previous exercise.
annotations_df = no_comments_df.select('*')
# Baseline row count for the comparison at the end of the exercise
initial_count = annotations_df.count()
print(f'Total rows: {initial_count}')
annotations_df.limit(2)

Total rows: 31378


_c0,_c1
025865917\tn02352...,Temp
022684404\tn02938...,Temp


In [25]:
# Column expression that splits _c0 on the tab character; it is not evaluated until
# it is attached to a DataFrame.
tmp_fields = F.split(F.col('_c0'), '\t')

# Preview the expression without modifying annotations_df
annotations_df.withColumn('tmp_fields', tmp_fields).limit(2)

_c0,_c1,tmp_fields
025865917\tn02352...,Temp,"[025865917, n0235..."
022684404\tn02938...,Temp,"[022684404, n0293..."


In [26]:
# Add colcount: the number of tab-separated fields found in each row
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))
print(f'Total rows: {annotations_df.count()}')

# Rows with the fewest fields come first
annotations_df.orderBy('colcount').limit(2)

Total rows: 31378


_c0,_c1,colcount
025865917\tn02352...,Temp,2
022684404\tn02938...,Temp,2


In [27]:
# Keep only the rows that have at least 5 fields
annotations_df_filtered = annotations_df.filter(annotations_df.colcount >= 5)

final_count = annotations_df_filtered.count()
print(f'Total rows: {final_count}')

annotations_df_filtered.limit(2)

Total rows: 20580


_c0,_c1,colcount
02110627\tn021106...,Temp,5
02093754\tn020937...,Temp,5


In [28]:
# Compare the row count before and after dropping rows with fewer than 5 fields
print(f'''
Initial count: {initial_count}
Final count:   {final_count}
''')


Initial count: 31378
Final count:   20580



## Ex. 4 - Splitting into columns

You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

**Instructions:**

1. Split the content of the `'_c0'` column on the tab character and store in a variable called `split_cols`.
2. Add the following columns based on the first four entries in the variable above: `folder`, `filename`, `width`, `height` on a DataFrame named `split_df`.
3. Add the `split_cols` variable as a column.

In [29]:
# Start this exercise from the filtered DataFrame (rows with at least 5 fields).
# `annotations_df` is rebound once more.
annotations_df = annotations_df_filtered.select('*')
print(f'Total rows: {annotations_df.count()}')
annotations_df.limit(2)

Total rows: 20580


_c0,_c1,colcount
02110627\tn021106...,Temp,5
02093754\tn020937...,Temp,5


In [30]:
# Column expression holding the tab-separated pieces of _c0
split_cols = F.split(annotations_df['_c0'], '\t')

# The first four pieces become named columns; the full array is kept as split_cols
# so the remaining pieces can be extracted later.
split_df = (
    annotations_df
    .withColumn('folder', split_cols.getItem(0))
    .withColumn('filename', split_cols.getItem(1))
    .withColumn('width', split_cols.getItem(2))
    .withColumn('height', split_cols.getItem(3))
    .withColumn('split_cols', split_cols)
)

print(f'Total rows: {split_df.count()}')
split_df.limit(5)

Total rows: 20580


_c0,_c1,colcount,folder,filename,width,height,split_cols
02110627\tn021106...,Temp,5,02110627,n02110627_12938,200,300,"[02110627, n02110..."
02093754\tn020937...,Temp,5,02093754,n02093754_1148,500,378,"[02093754, n02093..."
%s\t%s\t800\t600\...,Temp,5,%s,%s,800,600,"[%s, %s, 800, 600..."
02104029\tn021040...,Temp,5,02104029,n02104029_63,500,375,"[02104029, n02104..."
02111500\tn021115...,Temp,5,02111500,n02111500_5137,500,375,"[02111500, n02111..."


## Ex. 5 - Further parsing

You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

**Instructions:**

1. Create a new function called retriever that takes two arguments, the split columns (cols) and the total number of columns (colcount). This function should return a list of the entries that have not been defined as columns yet (i.e., everything after item 4 in the list).
2. Define the function as a Spark UDF, returning an Array of strings.
3. Create the new column `dog_list` using the UDF and the available columns in the DataFrame.
4. Remove the columns `_c0`, `colcount`, and `split_cols`.

In [31]:
# Review the starting point for this exercise: row count and a five-row sample
print(f'Total rows: {split_df.count()}')
split_df.limit(5)

Total rows: 20580


_c0,_c1,colcount,folder,filename,width,height,split_cols
02110627\tn021106...,Temp,5,02110627,n02110627_12938,200,300,"[02110627, n02110..."
02093754\tn020937...,Temp,5,02093754,n02093754_1148,500,378,"[02093754, n02093..."
%s\t%s\t800\t600\...,Temp,5,%s,%s,800,600,"[%s, %s, 800, 600..."
02104029\tn021040...,Temp,5,02104029,n02104029_63,500,375,"[02104029, n02104..."
02111500\tn021115...,Temp,5,02111500,n02111500_5137,500,375,"[02111500, n02111..."


In [32]:
# Define a Python method
def retriever(cols, colcount):
    '''Return the entries of `cols` from index 4 up to, but not including, `colcount`.

    The first four entries (indices 0-3) are skipped; the rest of the slice is
    returned as the dog data.
    '''
    dog_slice = slice(4, colcount)
    return cols[dog_slice]

# Register retriever as a Spark UDF that returns an array of strings
udfRetriever = F.udf(f=retriever, returnType=ArrayType(StringType()))

# Apply it to the split array and its length to build the dog_list column
dog_df = split_df.withColumn(
    'dog_list',
    udfRetriever(F.col('split_cols'), F.col('colcount')),
)
print(f'Total rows: {dog_df.count()}')
dog_df.limit(2)

Total rows: 20580


_c0,_c1,colcount,folder,filename,width,height,split_cols,dog_list
02110627\tn021106...,Temp,5,2110627,n02110627_12938,200,300,"[02110627, n02110...","[affenpinscher,0,..."
02093754\tn020937...,Temp,5,2093754,n02093754_1148,500,378,"[02093754, n02093...","[Border_terrier,7..."


In [33]:
# Drop the raw and intermediate columns in a single call
clean_dog_df = dog_df.drop('_c0', '_c1', 'split_cols', 'colcount')

print(f'Total rows: {clean_dog_df.count()}')
clean_dog_df.show(10, truncate=False)

Total rows: 20580
+--------+---------------+-----+------+----------------------------------+
|folder  |filename       |width|height|dog_list                          |
+--------+---------------+-----+------+----------------------------------+
|02110627|n02110627_12938|200  |300   |[affenpinscher,0,9,173,298]       |
|02093754|n02093754_1148 |500  |378   |[Border_terrier,73,127,341,335]   |
|%s      |%s             |800  |600   |[Shetland_sheepdog,124,87,576,514]|
|02104029|n02104029_63   |500  |375   |[kuvasz,0,0,499,327]              |
|02111500|n02111500_5137 |500  |375   |[Great_Pyrenees,124,225,403,374]  |
|02104365|n02104365_7518 |500  |333   |[schipperke,146,29,416,309]       |
|02105056|n02105056_2834 |500  |375   |[groenendael,168,0,469,374]       |
|02093647|n02093647_541  |500  |333   |[Bedlington_terrier,10,12,462,332]|
|02098413|n02098413_1355 |500  |375   |[Lhasa,39,1,499,373]              |
|02093859|n02093859_2309 |330  |500   |[Kerry_blue_terrier,17,16,300,482]|
+------

## Ex. 6 - Validate rows via join

Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named `valid_folders_df`. The DataFrame `clean_dog_df` is as you last left it with a group of split columns.

**Instructions:**

1. Rename the `_c0` column to folder on the `valid_folders_df` DataFrame.
2. Count the number of rows in `clean_dog_df`.
3. Join the two DataFrames on the folder name, and call the resulting DataFrame `joined_df`. Make sure to broadcast the smaller DataFrame.
4. Check the number of rows remaining in the DataFrame and compare.

In [34]:
# Review the starting point for this exercise: row count and a two-row sample
print(f'Total rows: {clean_dog_df.count()}')
clean_dog_df.limit(2)

Total rows: 20580


folder,filename,width,height,dog_list
2110627,n02110627_12938,200,300,"[affenpinscher,0,..."
2093754,n02093754_1148,500,378,"[Border_terrier,7..."


In [35]:
# Load the list of valid folder names; no header option is set, so the single
# column gets the default name _c0.
valid_folders_df = spark.read.format('csv').load('data-sources/annotations-validation.csv')

print(f'Total rows: {valid_folders_df.count()}')
valid_folders_df.limit(2)

Total rows: 1106


_c0
2085620
2085782


In [36]:
# Give the lookup column the same name as the join key in clean_dog_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Row count before validation
clean_dog_count = clean_dog_df.count()

# Inner join on folder keeps only rows whose folder appears in the lookup table;
# the lookup table is the smaller side, so it is the one broadcast.
joined_df = clean_dog_df.join(F.broadcast(valid_folders_df), on='folder', how='inner')

# Row count after validation
joined_count = joined_df.count()
print(f'''
Before: {clean_dog_count}
After: {joined_count}
''')


Before: 20580
After: 19956



## Ex. 7 - Examining invalid rows

You've successfully filtered out the rows using a join, but sometimes you'd like to examine the data that is invalid. This data can be stored for later processing or for troubleshooting your data sources. You want to find the difference between two DataFrames and store the invalid rows.

**Instructions:**

1. Determine the row counts for each DataFrame.
2. Create a DataFrame containing only the invalid rows.
3. Validate the count of the new DataFrame is as expected.
4. Determine the number of distinct folder rows removed.

In [37]:
# Determine the row counts for each DataFrame
clean_dog_count = clean_dog_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows: those whose folder has no match in
# the list of valid folders. The anti join runs against valid_folders_df directly
# instead of joined_df: the folders present in joined_df are exactly the valid folders
# that occur in clean_dog_df, so the result is the same, while the broadcast side is
# the small lookup table rather than almost the whole dataset.
invalid_df = clean_dog_df.join(F.broadcast(valid_folders_df), on='folder', how='left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()

# Print the results
print(f'''
Clean_dog_df: {clean_dog_count}
Joined_df   : {joined_count} 
Invalid_df  : {invalid_count}
--------------------------------------
Distinct invalid folders found: {invalid_folder_count}
''')


Clean_dog_df: 20580
Joined_df   : 19956 
Invalid_df  : 624
--------------------------------------
Distinct invalid folders found: 1



In [38]:
# Review the rows whose folder value was not found among the valid folders
invalid_df.limit(10)

folder,filename,width,height,dog_list
%s,%s,800,600,[Shetland_sheepdo...
%s,%s,333,500,"[French_bulldog,4..."
%s,%s,375,500,[Shetland_sheepdo...
%s,%s,500,355,"[French_bulldog,1..."
%s,%s,160,180,[Shetland_sheepdo...
%s,%s,191,284,"[French_bulldog,1..."
%s,%s,209,150,[English_foxhound...
%s,%s,210,173,[Shetland_sheepdo...
%s,%s,500,456,[English_foxhound...
%s,%s,300,271,[English_foxhound...


## Ex. 8 - Dog parsing

You've done a considerable amount of cleanup on the initial dataset, but now need to analyze the data a bit deeper. There are several questions that have now come up about the type of dogs seen in an image and some details regarding the images. You realize that to answer these questions, you need to process the data into a specific type. Before you can use it, you'll need to create a schema / type to represent the dog details.

**Instructions:**

1. Select the column representing the dog details from the DataFrame and show the first 10 un-truncated rows.
2. Create a new schema as you've done before, using `breed`, `start_x`, `start_y`, `end_x`, and `end_y` as the names. Make sure to specify the proper data types for each field in the schema (any number value is an integer).

In [39]:
# Select the dog details from the validated rows and show 10 untruncated rows
joined_df.select('dog_list').show(10, truncate=False)

+----------------------------------+
|dog_list                          |
+----------------------------------+
|[affenpinscher,0,9,173,298]       |
|[Border_terrier,73,127,341,335]   |
|[kuvasz,0,0,499,327]              |
|[Great_Pyrenees,124,225,403,374]  |
|[schipperke,146,29,416,309]       |
|[groenendael,168,0,469,374]       |
|[Bedlington_terrier,10,12,462,332]|
|[Lhasa,39,1,499,373]              |
|[Kerry_blue_terrier,17,16,300,482]|
|[vizsla,112,93,276,236]           |
+----------------------------------+
only showing top 10 rows



In [40]:
# Schema for one dog entry: the breed name followed by the four integer
# bounding-box coordinates. None of the fields is nullable.
DogType = StructType(
    [StructField("breed", StringType(), False)]
    + [
        StructField(coord, IntegerType(), False)
        for coord in ("start_x", "start_y", "end_x", "end_y")
    ]
)

## Ex. 9 - Per image count

Your next task in building a data pipeline for this dataset is to create a few analysis oriented columns. You've been asked to calculate the number of dogs found in each image based on your `dog_list` column created earlier. You have also created the `DogType` which will allow better parsing of the data within some of the data columns.

**Instructions:**

1. Create a Python function to split each entry in `dog_list` to its appropriate parts. Make sure to convert any strings into the appropriate types or the `DogType` will not parse correctly.
2. Create a UDF using the above function.
3. Use the UDF to create a new column called `dogs`. Drop the previous column in the same command.
4. Show the number of dogs in the new column for the first 10 rows.

In [41]:
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
    '''Return a list of (breed, start_x, start_y, end_x, end_y) tuples, one per dog.

    Each element of `doglist` is a comma-separated string with exactly five fields;
    the four coordinate fields are converted to int. A ValueError is raised when an
    entry does not have five fields or a coordinate is not an integer.
    '''
    split_entries = (entry.split(',') for entry in doglist)
    return [
        (breed, int(x0), int(y0), int(x1), int(y1))
        for breed, x0, y0, x1, y1 in split_entries
    ]

# Create a UDF that returns an array of DogType structs
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Parse the dog list into structs and drop the old column. The input is joined_df,
# i.e. the rows that passed the folder validation; starting from clean_dog_df would
# bring the rows with invalid folders back into the analysis.
clean_dog_df_parsed = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')
clean_dog_df_parsed.show(2, truncate=False)

+--------+---------------+-----+------+-------------------------------------+
|folder  |filename       |width|height|dogs                                 |
+--------+---------------+-----+------+-------------------------------------+
|02110627|n02110627_12938|200  |300   |[{affenpinscher, 0, 9, 173, 298}]    |
|02093754|n02093754_1148 |500  |378   |[{Border_terrier, 73, 127, 341, 335}]|
+--------+---------------+-----+------+-------------------------------------+
only showing top 2 rows



In [42]:
# Show the number of dogs (length of the `dogs` array) for the first 10 rows
clean_dog_df_parsed.select(F.size('dogs')).show(10)

+----------+
|size(dogs)|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 10 rows



## Ex. 10 - Percentage dog pixels
The final task for parsing the dog annotation data is to determine the percentage of pixels in each image that represents a dog (or dogs). You'll need to use the various techniques you've learned in this course to help calculate this information and add it as columns for later analysis.

To calculate the percentage of pixels, first calculate the total number of pixels representing each dog then sum them for the image. You can calculate the bounding box with the formula:

`(Xend - Xstart) * (Yend - Ystart)`

**NOTE**: You can ignore the possibility of overlapping bounding boxes in this instance.

For the percentage, calculate the total number of "dog" pixels divided by the total size of the image, multiplied by 100.

**Instructions:**

1. Define a Python function to take a list of tuples (the dog objects) and calculate the total number of "dog" pixels per image.
2. Create a UDF of the function and use it to create a new column called `'dog_pixels'` on the DataFrame.
3. Create another column, `'dog_percent'`, representing the percentage of `'dog_pixels'` in the image. Make sure this is between 0-100%. Use the string name of the column alone (ie, "columnname" rather than df.columnname).
4. Show the first 10 rows with more than 60% `'dog_pixels'` in the image. Use a SQL style string for this (ie, 'columnname > ____').

In [43]:
# Review the parsed data this exercise starts from
clean_dog_df_parsed.show(2, truncate=False)

+--------+---------------+-----+------+-------------------------------------+
|folder  |filename       |width|height|dogs                                 |
+--------+---------------+-----+------+-------------------------------------+
|02110627|n02110627_12938|200  |300   |[{affenpinscher, 0, 9, 173, 298}]    |
|02093754|n02093754_1148 |500  |378   |[{Border_terrier, 73, 127, 341, 335}]|
+--------+---------------+-----+------+-------------------------------------+
only showing top 2 rows



In [44]:
# Define a UDF to determine the number of pixels per image
def dogPixelCount(doglist):
    '''Return the summed bounding-box area of every dog in `doglist`.

    Each dog is indexed by position: [1] and [2] are the start x/y, [3] and [4] the
    end x/y. The area of one box is (end_x - start_x) * (end_y - start_y);
    overlapping boxes are not deduplicated.
    '''
    return sum(
        (dog[3] - dog[1]) * (dog[4] - dog[2])
        for dog in doglist
    )

# Register the pixel counter as a Spark UDF returning an integer
udfDogPixelCount = F.udf(f=dogPixelCount, returnType=IntegerType())

# Add the total number of dog pixels per image
dog_pixels_df = clean_dog_df_parsed.withColumn(
    'dog_pixels',
    udfDogPixelCount(F.col('dogs')),
)
dog_pixels_df.show(2, truncate=False)

+--------+---------------+-----+------+-------------------------------------+----------+
|folder  |filename       |width|height|dogs                                 |dog_pixels|
+--------+---------------+-----+------+-------------------------------------+----------+
|02110627|n02110627_12938|200  |300   |[{affenpinscher, 0, 9, 173, 298}]    |49997     |
|02093754|n02093754_1148 |500  |378   |[{Border_terrier, 73, 127, 341, 335}]|55744     |
+--------+---------------+-----+------+-------------------------------------+----------+
only showing top 2 rows



In [45]:
# Percentage of the image covered by dog bounding boxes:
# dog_pixels / (width * height) * 100. width and height are cast to double first.
# Despite its name, this DataFrame still holds every row; the 60% filter is applied later.
dog_pixels_60_df = dog_pixels_df.withColumn(
    'dog_percent',
    F.col('dog_pixels')
    / (F.col('width').cast('Double') * F.col('height').cast('Double'))
    * 100,
)
dog_pixels_60_df.show(2, truncate=False)

+--------+---------------+-----+------+-------------------------------------+----------+------------------+
|folder  |filename       |width|height|dogs                                 |dog_pixels|dog_percent       |
+--------+---------------+-----+------+-------------------------------------+----------+------------------+
|02110627|n02110627_12938|200  |300   |[{affenpinscher, 0, 9, 173, 298}]    |49997     |83.32833333333333 |
|02093754|n02093754_1148 |500  |378   |[{Border_terrier, 73, 127, 341, 335}]|55744     |29.494179894179894|
+--------+---------------+-----+------+-------------------------------------+----------+------------------+
only showing top 2 rows



In [46]:
# Show the first 10 annotations with more than 60% dog, using a SQL-style filter string
dog_pixels_60_df.filter('dog_percent > 60').limit(10)

folder,filename,width,height,dogs,dog_pixels,dog_percent
2110627,n02110627_12938,200,300,"[{affenpinscher, ...",49997,83.32833333333333
2104029,n02104029_63,500,375,"[{kuvasz, 0, 0, 4...",163173,87.0256
2105056,n02105056_2834,500,375,"[{groenendael, 16...",112574,60.03946666666666
2093647,n02093647_541,500,333,[{Bedlington_terr...,144640,86.87087087087087
2098413,n02098413_1355,500,375,"[{Lhasa, 39, 1, 4...",171120,91.264
2093859,n02093859_2309,330,500,[{Kerry_blue_terr...,131878,79.92606060606062
2109961,n02109961_1017,475,500,"[{Eskimo_dog, 43,...",189189,79.65852631578947
2108000,n02108000_3491,600,450,"[{EntleBucher, 30...",168667,62.46925925925926
2085782,n02085782_1731,600,449,[{Japanese_spanie...,250125,92.84521158129176
2110185,n02110185_2736,259,500,"[{Siberian_husky,...",113088,87.32664092664093


## Close session

In [47]:
# Stop the Spark session now that the notebook is done.
spark.stop()