# 4. Complex processing and data pipelines

Quick pipeline

  • Import the file 2015-departures.csv.gz to a DataFrame. Note the header is already defined.
  • Filter the DataFrame to contain only flights with a duration over 0 minutes. Use the index of the column, not the column name (remember to use .printSchema() to see the column names / order).
  • Add an ID column.
  • Write the file out as a JSON document named output.json.
import pyspark.sql.functions as F

# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(
    departures_df['Actual elapsed time (Minutes)'] != 0
)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')

Pipeline data issue

A colleague reports there's a problem in the dataset when trying to sort the duration data; she's not sure what the issue is beyond the sorting operation not working as expected.

After analyzing the data, which command would fix the issue?

  • departures_df = departures_df.orderBy(departures_df.Airport)
  • departures_df = departures_df.withColumn('Duration', departures_df['Duration'].cast(IntegerType()))
  • departures_df = departures_df.orderBy(departures_df['Duration'])
  • departures_df = departures_df.select(departures_df['Duration']).cast(LongType())
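The sort misbehaves because spark.read.csv loads every column as a string unless a schema is supplied, so orderBy compares Duration values lexicographically; the cast to IntegerType() (the second option above) is the fix. A minimal sketch of the fix, assuming departures_df still has a string Duration column:

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Duration is a string after the CSV import, so '9' sorts after '100'
departures_df.printSchema()

# Cast the column to an integer, then the sort behaves numerically
departures_df = departures_df.withColumn('Duration', col('Duration').cast(IntegerType()))
departures_df = departures_df.orderBy('Duration')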

# Data handling techniques

# Removing blank lines, headers, and comments

Spark's CSV parser:

# Remove commented lines with the 'comment' option
df1 = spark.read.csv('datafile.csv.gz', comment='#')

# Handle the header row with the 'header' option (it is consumed as column names, not data)
df1 = spark.read.csv('datafile.csv.gz', header=True)

# Automatic column creation with the 'sep' option
df1 = spark.read.csv('datafile.csv.gz', sep=',')

Columns without defined names default to _c0, _c1, and so on.
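
These options can be combined in a single read. A minimal sketch, assuming a hypothetical pipe-delimited datafile.csv.gz with '#' comment lines and no header row:

df1 = spark.read.csv('datafile.csv.gz',
                     sep='|',        # split fields on the pipe character
                     comment='#',    # drop commented lines
                     header=False)   # no header row; columns become _c0, _c1, ...
df1.printSchema()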

Removing commented lines

  • Import the annotations.csv.gz file to a DataFrame and perform a row count. Specify a separator character of |.
  • Query the data for the number of rows beginning with #.
  • Import the file again to a new DataFrame, but specify the comment character in the options to remove any commented rows.
  • Count the new DataFrame and verify the difference is as expected.
from pyspark.sql.functions import col

# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.filter(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

'''
Full count: 32794
Comment count: 1416
Remaining count: 31378
'''

Removing invalid rows

  • Create a new variable tmp_fields using the annotations_df DataFrame column '_c0' splitting it on the tab character.
  • Create a new column in annotations_df named 'colcount' representing the number of fields defined in the previous step.
  • Filter out any rows from annotations_df containing fewer than 5 fields.
# Count the rows before filtering (annotations_df here already has commented rows removed)
initial_count = annotations_df.count()

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(
    ~ (annotations_df['colcount'] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

'''
Initial count: 31378
Final count: 20580
'''

Splitting into columns

  • Split the content of the '_c0' column on the tab character and store in a variable called split_cols.
  • Add the following columns based on the first four entries in the variable above: folder, filename, width, height on a DataFrame named split_df.
  • Add the split_cols variable as a column.
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

Further parsing

  • Create a new function called retriever that takes two arguments, the split columns (cols) and the total number of columns (colcount). This function should return a list of the entries that have not been defined as columns yet (i.e., everything after item 4 in the list).
  • Define the function as a Spark UDF, returning an Array of strings.
  • Create the new column dog_list using the UDF and the available columns in the DataFrame.
  • Remove the columns _c0, colcount, and split_cols.
def retriever(cols, colcount):
    # Return a list of dog data
    return cols[4:colcount]

# Define the method as a UDF, so it can be called like a SQL function in DataFrame calculations
from pyspark.sql.types import ArrayType, StringType
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(
    split_df['split_cols'], split_df['colcount']))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('colcount').drop('split_cols')

# Data validation

# Validating via joins

  • Compares data against known values
  • Easy to find data in a given set
parsed_df = spark.read.parquet('parsed_data.parquet')
company_df = spark.read.parquet('companies.parquet')
verified_df = parsed_df.join(company_df, parsed_df.company == company_df.company)

This join automatically removes any rows with a company not present in company_df!
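
As a toy illustration (the names and totals below are made up, not from the course data), the inner join keeps only the rows whose company appears in the validation DataFrame:

# Hypothetical example data
demo_parsed_df = spark.createDataFrame(
    [('Acme', 100), ('Globex', 200), ('Initech', 300)], ['company', 'total'])
demo_company_df = spark.createDataFrame([('Acme',), ('Globex',)], ['company'])

# The 'Initech' row is dropped because it has no match in demo_company_df
demo_verified_df = demo_parsed_df.join(demo_company_df, 'company')
demo_verified_df.show()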

Validate rows via join

  • Rename the _c0 column to folder on the valid_folders_df DataFrame.
  • Count the number of rows in split_df.
  • Join the two DataFrames on the folder name, and call the resulting DataFrame joined_df. Make sure to broadcast the smaller DataFrame.
  • Check the number of rows remaining in the DataFrame and compare it to the count before the join.
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0','folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames on the folder name
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

'''
Before: 20580
After: 19956
'''

When should I broadcast join in Spark?

Use a broadcast join when joining a small DataFrame with a large one. The requirement is that the small DataFrame fits comfortably in memory, so Spark can ship a copy of it to every executor and join it against the large DataFrame without shuffling the large one, which boosts the performance of the join.
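
A minimal sketch, assuming a hypothetical small_df that fits in memory and a large_df sharing a 'key' column (F.broadcast() and the autoBroadcastJoinThreshold setting are standard Spark features):

# Explicitly mark the smaller DataFrame for broadcast
joined = large_df.join(F.broadcast(small_df), 'key')

# Spark also broadcasts automatically when a table is smaller than this threshold (default is about 10 MB)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 10 * 1024 * 1024)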

Examining invalid rows

  • Determine the row counts for each DataFrame.
  • Create a DataFrame containing only the invalid rows.
  • Validate the count of the new DataFrame is as expected.
  • Determine the number of distinct folder rows removed.
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', how='left_anti') 
# A left_anti join returns only the rows from the left DataFrame that have no match in the right one.

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

'''
 split_df:	20580
 joined_df:	19956
 invalid_df: 	624
1 distinct invalid folders found
'''
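
For reference, a toy sketch (made-up data) contrasting 'left_anti' with its counterpart 'left_semi':

left_df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ['folder', 'n'])
right_df = spark.createDataFrame([('a',), ('b',)], ['folder'])

# left_anti: rows in left_df with no match in right_df (only 'c' remains)
left_df.join(right_df, 'folder', how='left_anti').show()

# left_semi: rows in left_df that do have a match ('a' and 'b')
left_df.join(right_df, 'folder', how='left_semi').show()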

# Final analysis and delivery

# Analysis calculations UDF

# Calculations using UDF
def getAvgSale(saleslist):
    totalsales = 0
    count = 0
    for sale in saleslist:
        totalsales += sale[2] + sale[3]
        count += 2
    return totalsales / count

udfGetAvgSale = F.udf(getAvgSale, DoubleType())
df = df.withColumn('avg_sale', udfGetAvgSale(df.sales_list))

# Analysis calculations (inline)

# Inline calculations
df = spark.read.csv('datafile')
df = df.withColumn("avg", df.totalsales / df.sales_count)
df = df.withColumn("sq_ft", df.width * df.length)
# udfComputeTotal is assumed to be a previously defined UDF
df = df.withColumn("total_avg_size", udfComputeTotal(df.entries) / df.numEntries)

Dog parsing

  • Select the column representing the dog details from the DataFrame and show the first 10 un-truncated rows.
  • Create a new schema as you've done before, using breed, start_x, start_y, end_x, and end_y as the names. Make sure to specify the proper data types for each field in the schema (any number value is an integer).
# Select the dog details and show 10 untruncated rows
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])
'''
+----------------------------------+
|dog_list                          |
+----------------------------------+
|[affenpinscher,0,9,173,298]       |
|[Border_terrier,73,127,341,335]   |
|[kuvasz,0,0,499,327]              |
|[Great_Pyrenees,124,225,403,374]  |
|[schipperke,146,29,416,309]       |
|[groenendael,168,0,469,374]       |
|[Bedlington_terrier,10,12,462,332]|
|[Lhasa,39,1,499,373]              |
|[Kerry_blue_terrier,17,16,300,482]|
|[vizsla,112,93,276,236]           |
+----------------------------------+
only showing top 10 rows
'''

Per image count

  • Create a Python function to split each entry in dog_list to its appropriate parts. Make sure to convert any strings into the appropriate types or the DogType will not parse correctly.
  • Create a UDF using the above function.
  • Use the UDF to create a new column called dogs. Drop the previous column in the same command.
  • Show the number of dogs in the new column for the first 10 rows.
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(
            start_y), int(end_x), int(end_y)))
    return dogs


# Create a UDF that returns an array of DogType structs
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to create the list of dogs and drop the old column
joined_df = joined_df.withColumn(
    'dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

Percentage dog pixels

  • Define a Python function to take a list of tuples (the dog objects) and calculate the total number of "dog" pixels per image.
  • Create a UDF of the function and use it to create a new column called 'dog_pixels' on the DataFrame.
  • Create another column, 'dog_percent', representing the percentage of 'dog_pixels' in the image. Make sure this is between 0-100%. Use the string name of the column alone (ie, "columnname" rather than df.columnname).
  • Show the first 10 rows with more than 60% 'dog_pixels' in the image. Use a SQL style string for this (ie, 'columnname > ____').
def dogPixelCount(doglist):
    totalpixels = 0
    for dog in doglist:
        totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
    return totalpixels


# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn(
    'dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)
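
As a quick sanity check against the dog_list sample shown earlier, the affenpinscher bounding box spans (173 - 0) * (298 - 9) = 49,997 "dog" pixels, which is exactly the value dogPixelCount adds for that entry.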