# Cleaning Data with PySpark - Part 4

## Complex processing and data pipelines
Learn how to process complex real-world data using Spark and the basics of pipelines.

In [2]:
BUCKET = 'driven-actor-210609'

In [46]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType

In [4]:
spark = SparkSession.builder.getOrCreate()


### Quick pipeline
Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

The `spark` session is defined, and `pyspark.sql.functions` is aliased as `F`, as is customary.

In [None]:
file_path = f'gs://{BUCKET}/pyspark/datasets/AA_DFW_2015_Departures.csv.gz'
# file_path = 'datasets/AA_DFW_2017_Departures.csv.gz'

In [None]:
# Import the data to a DataFrame
departures_df = spark.read.csv(file_path, header=True)

# Remove any rows with a duration of 0 (keep only non-zero durations)
departures_df = departures_df.filter(departures_df['Actual elapsed time (Minutes)'] != 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')
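
The IDs produced by `F.monotonically_increasing_id()` are unique and increasing, but not consecutive. A minimal plain-Python sketch of the layout described in the Spark documentation (partition index in the upper 31 bits, row position within the partition in the lower 33 bits) is shown here. The helper name `sketch_monotonic_id` and the two-partition layout are assumptions made for illustration only; this is not Spark's implementation.

```python
# Sketch of the documented ID layout: the partition index occupies the
# upper 31 bits and the row position within the partition the lower 33 bits.
# `sketch_monotonic_id` is a hypothetical helper, not part of PySpark.
def sketch_monotonic_id(partition_index: int, row_in_partition: int) -> int:
    return (partition_index << 33) + row_in_partition

# Two hypothetical partitions holding three rows each
ids = [sketch_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)
```

The jump between the third and fourth IDs is why the `id` column should be treated as a unique key rather than a row number.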

### Removing commented lines
Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

The `spark` session and the base CSV file (`annotations.csv.gz`) are available for you to work with. The `col` function is also available as `F.col`.

In [35]:
file_path = f'gs://{BUCKET}/pyspark/datasets/annotations.csv.gz'
# file_path = 'datasets/annotations.csv.gz'

# Import the data to a DataFrame
annotations_df = spark.read.csv(file_path, sep='|')

annotations_df.show(10, truncate=False)

+------------------------------------------------------------------+
|_c0                                                               |
+------------------------------------------------------------------+
|025865917\tn023521131_781                                         |
|022684404\tn029380957_9768                                        |
|021267273\tn022910760_9023                                        |
|02110627\tn02110627_12938\t200\t300\taffenpinscher,0,9,173,298    |
|02093754\tn02093754_1148\t500\t378\tBorder_terrier,73,127,341,335 |
|%s\t%s\t800\t600\tShetland_sheepdog,124,87,576,514                |
|023200662\tn023050402_6068                                        |
|028666219\tn025734155_5660                                        |
|02104029\tn02104029_63\t500\t375\tkuvasz,0,0,499,327              |
|02111500\tn02111500_5137\t500\t375\tGreat_Pyrenees,124,225,403,374|
+------------------------------------------------------------------+
only showing top 10 rows



In [36]:
# Perform a row count
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.filter(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv(file_path, sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

Full count: 32794
Comment count: 1416
Remaining count: 31378
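
As a Spark-free illustration of what the `comment='#'` reader option does, the sketch here drops lines that start with the comment marker and then reconciles the counts. The sample lines and the `strip_comments` helper are hypothetical; only the three totals come from the output shown in this notebook.

```python
# Hypothetical sample lines; real rows are tab-separated annotation records
sample_lines = [
    "# hypothetical comment line",
    "02110627\tn02110627_12938\t200\t300\taffenpinscher,0,9,173,298",
    "#02093754\tn02093754_1148\t500\t378\tBorder_terrier,73,127,341,335",
    "02104029\tn02104029_63\t500\t375\tkuvasz,0,0,499,327",
]

def strip_comments(lines, marker="#"):
    """Keep only the lines that do not start with the comment marker."""
    return [line for line in lines if not line.startswith(marker)]

kept = strip_comments(sample_lines)
removed = len(sample_lines) - len(kept)
print(len(sample_lines), removed, len(kept))

# The counts reported by the notebook reconcile the same way
full_count, comment_count, no_comments_count = 32794, 1416, 31378
print(full_count - comment_count == no_comments_count)
```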


### Removing invalid rows
Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at least 5 tab-separated columns in the DataFrame. Remember that your original DataFrame has only a single column, so you'll need to split the data on the tab (`\t`) characters.

The DataFrame `annotations_df` is already available, with the commented rows removed. The `pyspark.sql.functions` library is available under the alias `F`. The initial number of rows available in the DataFrame is stored in the variable `initial_count`.

In [37]:
annotations_df = no_comments_df
initial_count = annotations_df.count()

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(F.col('colcount') >= 5)

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

Initial count: 31378
Final count: 20580
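
The same validity rule can be sketched in plain Python to see which raw lines survive. The sample lines are taken from the rows displayed earlier; `MIN_FIELDS` and `field_count` are names introduced here for illustration, and `str.split` stands in for `F.split` followed by `F.size`.

```python
MIN_FIELDS = 5  # minimum number of tab-separated fields for a valid row

def field_count(line: str) -> int:
    """Plain-Python stand-in for F.size(F.split(col, '\t'))."""
    return len(line.split("\t"))

sample = [
    "025865917\tn023521131_781",
    "02110627\tn02110627_12938\t200\t300\taffenpinscher,0,9,173,298",
    "%s\t%s\t800\t600\tShetland_sheepdog,124,87,576,514",
]

counts = [field_count(line) for line in sample]
valid = [line for line in sample if field_count(line) >= MIN_FIELDS]
print(counts)
print(len(valid))
```

Note that the `%s` row passes this check because it has five fields; a field-count filter only validates the shape of a row, not its values.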


### Splitting into columns
You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

You have the `spark` session and the latest version of the `annotations_df` DataFrame. `pyspark.sql.functions` is available under the alias `F`.

In [38]:
annotations_df = annotations_df_filtered

# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

split_df.show(10)

+--------------------+--------+--------+---------------+-----+------+--------------------+
|                 _c0|colcount|  folder|       filename|width|height|          split_cols|
+--------------------+--------+--------+---------------+-----+------+--------------------+
|02110627\tn021106...|       5|02110627|n02110627_12938|  200|   300|[02110627, n02110...|
|02093754\tn020937...|       5|02093754| n02093754_1148|  500|   378|[02093754, n02093...|
|%s\t%s\t800\t600\...|       5|      %s|             %s|  800|   600|[%s, %s, 800, 600...|
|02104029\tn021040...|       5|02104029|   n02104029_63|  500|   375|[02104029, n02104...|
|02111500\tn021115...|       5|02111500| n02111500_5137|  500|   375|[02111500, n02111...|
|02104365\tn021043...|       5|02104365| n02104365_7518|  500|   333|[02104365, n02104...|
|02105056\tn021050...|       5|02105056| n02105056_2834|  500|   375|[02105056, n02105...|
|02093647\tn020936...|       5|02093647|  n02093647_541|  500|   333|[02093647, n02093...|

### Further parsing
You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

The `spark` session is available and `pyspark.sql.functions` is aliased as `F`. The types from `pyspark.sql.types` are already imported. The `split_df` DataFrame is as you last left it. Remember, you can call `.printSchema()` on a DataFrame to view the column names and types.

In [39]:
def retriever(cols, colcount):
    # Return a list of dog data
    return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df['split_cols'], split_df['colcount']))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('colcount').drop('split_cols')

split_df.show(10, truncate=False)

+--------+---------------+-----+------+----------------------------------+
|folder  |filename       |width|height|dog_list                          |
+--------+---------------+-----+------+----------------------------------+
|02110627|n02110627_12938|200  |300   |[affenpinscher,0,9,173,298]       |
|02093754|n02093754_1148 |500  |378   |[Border_terrier,73,127,341,335]   |
|%s      |%s             |800  |600   |[Shetland_sheepdog,124,87,576,514]|
|02104029|n02104029_63   |500  |375   |[kuvasz,0,0,499,327]              |
|02111500|n02111500_5137 |500  |375   |[Great_Pyrenees,124,225,403,374]  |
|02104365|n02104365_7518 |500  |333   |[schipperke,146,29,416,309]       |
|02105056|n02105056_2834 |500  |375   |[groenendael,168,0,469,374]       |
|02093647|n02093647_541  |500  |333   |[Bedlington_terrier,10,12,462,332]|
|02098413|n02098413_1355 |500  |375   |[Lhasa,39,1,499,373]              |
|02093859|n02093859_2309 |330  |500   |[Kerry_blue_terrier,17,16,300,482]|
+--------+---------------+-----+------+----------------------------------+
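
Because a UDF wraps an ordinary Python function, the slicing logic in `retriever` can be checked without a Spark session. The sketch here reuses the function as defined in the cell; the two-dog row is a hypothetical example, since all rows displayed so far contain a single dog.

```python
def retriever(cols, colcount):
    # Return a list of dog data: everything from the fifth field onward
    return cols[4:colcount]

# A row taken from the sample output
row = "02110627\tn02110627_12938\t200\t300\taffenpinscher,0,9,173,298"
cols = row.split("\t")
single = retriever(cols, len(cols))

# Hypothetical row with two dog entries after the four fixed fields
multi_cols = ["folder", "file", "500", "375", "breed_a,0,0,10,10", "breed_b,5,5,20,20"]
multi = retriever(multi_cols, len(multi_cols))

print(single)
print(multi)
```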

### Validate rows via join
Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named `valid_folders_df`. The DataFrame `split_df` is as you last left it with a group of split columns.

The `spark` object is available, and `pyspark.sql.functions` is imported as `F`.

In [40]:
file_path = f'gs://{BUCKET}/pyspark/datasets/valid_folders.txt.gz'
# file_path = 'datasets/valid_folders.txt.gz'

# Import the data to a DataFrame
valid_folders_df = spark.read.csv(file_path)

valid_folders_df.show(10)

+--------+
|     _c0|
+--------+
|02085620|
|02085782|
|02085936|
|02086079|
|02086240|
|02086646|
|02086910|
|02087046|
|02087394|
|02088094|
+--------+
only showing top 10 rows



In [41]:
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

Before: 20580
After: 19956


### Examining invalid rows
You've successfully filtered out the rows using a join, but sometimes you'd like to examine the data that is invalid. This data can be stored for later processing or for troubleshooting your data sources.

You want to find the difference between two DataFrames and store the invalid rows.

The `spark` object is defined and `pyspark.sql.functions` is imported as `F`. The original DataFrame `split_df` and the joined DataFrame `joined_df` are available as they were in their previous states.

In [42]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

 split_df:	20580
 joined_df:	19956
 invalid_df: 	624


1 distinct invalid folders found
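
The relationship between the inner join and the anti join can be sketched with plain Python collections: every row either has a matching key (kept by the inner join) or does not (returned by the anti join). The sample rows and folder set are hypothetical, and the sketch assumes the keys in the lookup table are unique; duplicate keys on the lookup side would multiply rows in a real inner join.

```python
# Hypothetical lookup of valid folder names (unique keys assumed)
valid_folders = {"02110627", "02093754"}

# Hypothetical (folder, filename) rows
rows = [
    ("02110627", "n02110627_12938"),
    ("02093754", "n02093754_1148"),
    ("%s", "%s"),
]

# Inner-join semantics: keep rows whose key exists in the lookup
joined = [r for r in rows if r[0] in valid_folders]
# Anti-join semantics: keep rows whose key has no match
invalid = [r for r in rows if r[0] not in valid_folders]

print(len(rows), len(joined), len(invalid))

# The counts printed by the notebook reconcile the same way
split_count, joined_count, invalid_count = 20580, 19956, 624
print(joined_count + invalid_count == split_count)
```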

### Dog parsing
You've done a considerable amount of cleanup on the initial dataset, but now need to analyze the data a bit deeper. There are several questions that have now come up about the type of dogs seen in an image and some details regarding the images. You realize that to answer these questions, you need to process the data into a specific type. Before you can use it, you'll need to create a schema / type to represent the dog details.

The `joined_df` DataFrame is as you last defined it, and the `pyspark.sql.types` have all been imported.

In [47]:
# Select the dog details and show 10 untruncated rows
# (show() prints the table itself and returns None, so it is not wrapped in print())
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

+----------------------------------+
|dog_list                          |
+----------------------------------+
|[affenpinscher,0,9,173,298]       |
|[Border_terrier,73,127,341,335]   |
|[kuvasz,0,0,499,327]              |
|[Great_Pyrenees,124,225,403,374]  |
|[schipperke,146,29,416,309]       |
|[groenendael,168,0,469,374]       |
|[Bedlington_terrier,10,12,462,332]|
|[Lhasa,39,1,499,373]              |
|[Kerry_blue_terrier,17,16,300,482]|
|[vizsla,112,93,276,236]           |
+----------------------------------+
only showing top 10 rows


### Per image count
Your next task in building a data pipeline for this dataset is to create a few analysis-oriented columns. You've been asked to calculate the number of dogs found in each image based on your `dog_list` column created earlier. You have also created the `DogType` schema, which allows better parsing of the data within some of the data columns.

The `joined_df` is available as you last defined it, and the `DogType` StructType is defined. `pyspark.sql.functions` is available under the `F` alias.

In [48]:
# Create a function that parses each dog entry into a (breed, start_x, start_y, end_x, end_y) tuple
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to create a list of parsed dogs
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list'))

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

+----------+
|size(dogs)|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 10 rows
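
The parsing function can be exercised in plain Python before it is registered as a UDF. The sketch here repeats `dogParse` as defined in the cell and runs it on one entry from the sample output, then on a deliberately malformed, hypothetical entry to show how it fails. Inside a Spark UDF, an unhandled exception like this would cause the task to fail, so malformed entries need to be filtered out or handled beforehand.

```python
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs

# Entry taken from the sample output
parsed = dogParse(["affenpinscher,0,9,173,298"])
print(parsed)

# Hypothetical malformed entry with too few comma-separated values
try:
    dogParse(["affenpinscher,0,9"])
    malformed_failed = False
except ValueError:
    malformed_failed = True
print(malformed_failed)
```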



### Percentage dog pixels
The final task for parsing the dog annotation data is to determine the percentage of pixels in each image that represent a dog (or dogs). You'll need to use the various techniques you've learned in this course to help calculate this information and add it as columns for later analysis.

To calculate the percentage of pixels, first calculate the total number of pixels representing each dog, then sum them for the image. You can calculate the area of each bounding box with the formula:

(Xend - Xstart) * (Yend - Ystart)

NOTE: You can ignore the possibility of overlapping bounding boxes in this instance.

For the percentage, calculate the total number of "dog" pixels divided by the total size of the image, multiplied by 100.

The `joined_df` DataFrame is as you last used it. `pyspark.sql.functions` is aliased to `F`.
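
As a worked example of the arithmetic, the sketch here applies the formula to the first annotation in the sample data (`affenpinscher,0,9,173,298` in a 200 x 300 image). The helper name `dog_pixel_count` and the tuple layout are assumptions that mirror the `DogType` field order; this is plain Python, not the Spark UDF.

```python
def dog_pixel_count(dogs):
    """Sum the bounding-box areas for (breed, start_x, start_y, end_x, end_y) tuples."""
    return sum(
        (end_x - start_x) * (end_y - start_y)
        for _breed, start_x, start_y, end_x, end_y in dogs
    )

dogs = [("affenpinscher", 0, 9, 173, 298)]
width, height = 200, 300

dog_pixels = dog_pixel_count(dogs)                # (173 - 0) * (298 - 9)
dog_percent = dog_pixels / (width * height) * 100
print(dog_pixels, round(dog_percent, 2))          # prints: 49997 83.33
```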

In [49]:
# Calculate total pixels occupied by dogs in the image
def dogPixelCount(doglist):
    totalpixels = 0
    for dog in doglist:
        totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
    return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())

# Add a new column 'dog_pixels' containing the pixel count for dogs in each image
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Add a column 'dog_percent' representing the percentage of the image occupied by dogs
joined_df = joined_df.withColumn('dog_percent', (F.col('dog_pixels') / (F.col('width') * F.col('height'))) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.filter(F.col('dog_percent') > 60).show(10)

+--------+---------------+-----+------+--------------------+--------------------+----------+-----------------+
|  folder|       filename|width|height|            dog_list|                dogs|dog_pixels|      dog_percent|
+--------+---------------+-----+------+--------------------+--------------------+----------+-----------------+
|02110627|n02110627_12938|  200|   300|[affenpinscher,0,...|[{affenpinscher, ...|     49997|83.32833333333333|
|02104029|   n02104029_63|  500|   375|[kuvasz,0,0,499,327]|[{kuvasz, 0, 0, 4...|    163173|          87.0256|
|02105056| n02105056_2834|  500|   375|[groenendael,168,...|[{groenendael, 16...|    112574|60.03946666666666|
|02093647|  n02093647_541|  500|   333|[Bedlington_terri...|[{Bedlington_terr...|    144640|86.87087087087087|
|02098413| n02098413_1355|  500|   375|[Lhasa,39,1,499,373]|[{Lhasa, 39, 1, 4...|    171120|           91.264|
|02093859| n02093859_2309|  330|   500|[Kerry_blue_terri...|[{Kerry_blue_terr...|    131878|79.92606060606062|