In [None]:
import time

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Import the pyspark.sql.types library
from pyspark.sql.types import *

import pyspark.sql.functions as fn

In [None]:
# Create a SparkContext
"""
SparkContext is the entry gate of Apache Spark functionality. The most important step 
of any Spark driver application is to generate SparkContext. It allows your Spark 
Application to access Spark Cluster with the help of Resource Manager (YARN/Mesos).
"""
# getOrCreate() hands back the SparkContext that is already running, or starts one if none exists
sc = SparkContext.getOrCreate()

In [None]:
# Create a SparkSession
"""
SparkSession is the entry point to Spark SQL. It is one of the very first objects you 
create while developing a Spark SQL application. As a Spark developer, you create a 
SparkSession using the SparkSession.builder method (that gives you access to Builder 
API that you use to configure the session).
"""

# Name the application on the builder, then reuse an existing session or start a new one
spark = SparkSession.builder.appName("Spark SQL Query Dataframes").getOrCreate()

In [None]:
# Build the people schema one field at a time; the trailing False marks each column as non-nullable
people_schema = (
    StructType()
    .add('name', StringType(), False)
    .add('age', IntegerType(), False)
    .add('city', StringType(), False)
)

In [None]:
# Read rawdata.csv using the explicit people_schema instead of inferring column types.
# The file location has to be passed as `path`: load() treats any other keyword
# (such as `name`) as a reader option, which leaves the reader without a path.
people_df = spark.read.format('csv').load(path='rawdata.csv', schema=people_schema)

In [None]:

"""
Using lazy processing

Lazy processing operations will usually return in about the same amount of time regardless 
of the actual quantity of data. Remember that this is due to Spark not performing any 
transformations until an action is requested.

Remember when working with Spark that no transformations take effect until you apply an action. 
This can be confusing at times, but it is one of the underpinnings of Spark's power.
"""

data_path = './data'
file_path = f'{data_path}/AA_DFW_2014_Departures_Short.csv.gz'

# Read the gzipped CSV, treating its first line as the header row
aa_dfw_df = spark.read.format('csv').options(Header=True).load(file_path)

# Peek at the rows as loaded
aa_dfw_df.show(3)

# Derive a lower-cased 'airport' column with fn.lower(), then discard the
# original 'Destination Airport' column in the same chain
aa_dfw_df = (
    aa_dfw_df
    .withColumn('airport', fn.lower(aa_dfw_df['Destination Airport']))
    .drop('Destination Airport')
)

# Peek again to see the replacement column
aa_dfw_df.show(3)

In [None]:
"""
Difficulties with CSV files:
--------------------------
No defined schema
Nested data requires special handling
Encoding format limited

Spark and CSV files
-------------------
Slow to parse 
Files cannot be filtered (no "predicate pushdown")
Any intermediate use requires redefining schema

The Parquet Format
-------------------
A columnar data format
Supported in Spark and other data processing frameworks
Supports predicate pushdown
Automatically stores schema information


When working with Spark, you'll often start with CSV, JSON, or other data sources. This provides 
a lot of flexibility for the types of data to load, but it is not an optimal format for Spark. 
The Parquet format is a columnar data store, allowing Spark to use predicate pushdown. This means 
Spark will only process the data necessary to complete the operations you define versus reading 
the entire dataset. This gives Spark more flexibility in accessing the data and often drastically 
improves performance on large datasets.


Reading Parquet Files
---------------------
df = spark.read.format('parquet').load('filename.parquet')

df = spark.read.parquet('filename.parquet')


Writing Parquet Files
---------------------
df.write.format('parquet').save('filename.parquet')

df.write.parquet('filename.parquet')


Parquet as backing stores for SparkSQL operations
-------------------------------------------------

flight_df = spark.read.parquet('flights.parquet')

flight_df.createOrReplaceTempView('flights')

short_flights_df = spark.sql('SELECT * FROM flights WHERE flightduration < 100')
"""

In [None]:
"""
DataFrame refresher

DataFrames:
    Made up of rows & columns
    Immutable
    Use various transformation operations to modify data



# Return rows where name starts with "M"
----------------------------------------
voter_df.filter(voter_df.name.like('M%'))



# Return name and position only 
-------------------------------
voters = voter_df.select(fn.col('name'), fn.col('position'))


# Filter/Where
--------------
voter_df.filter(voter_df.date > '1/1/2019')

voter_df.where(...)


# Select
--------
voter_df.select(voter_df.name)


# withColumn
------------
voter_df.withColumn('year', fn.year(voter_df.date))


# drop
------
voter_df.drop('unused_column')



Filtering data
---------------
    Remove nulls
    Remove odd entries
    Split data from combined sources
    Negate with ~


# Illustrations
---------------
voter_df.filter(voter_df['name'].isNotNull())

voter_df.filter(fn.year(voter_df.date) > 1800)

voter_df.where(voter_df['_c0'].contains('VOTE'))

voter_df.where(~ voter_df._c1.isNull())

"""

In [None]:
"""
# Applied per column as transformation
--------------------------------------
voter_df.withColumn('upper', fn.upper('name'))

# Can create intermediary columns
---------------------------------
voter_df.withColumn('splits', fn.split('name', ' '))

# Can cast to other types
-------------------------
voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))

# Various utility functions / transformations to interact with ArrayType()
------------------------------------------------------------------------
    .size(<column>) - returns length of arrayType() column
    
    .getItem(<index>) - used to retrieve a specific item at index of list column.
    
"""

In [None]:
"""
# Conditional Clauses are: Inline version of if / then / else
-------------------------------------------------------------
.when()

.otherwise()

.when(<if condition>, <then x>)

# Example 1
-----------
df.select(df.Name, df.Age, fn.when(df.Age >= 18, "Adult"))


# Example 2
-----------
df.select(df.Name, df.Age,
            fn.when(df.Age >= 18, "Adult")
            .when(df.Age < 18, "Minor"))
            
            
# Example 3: .otherwise() is like else
------------------------------------
df.select(df.Name, df.Age,
            fn.when(df.Age >= 18, "Adult")
            .otherwise("Minor"))

"""

In [None]:
"""
# User defined functions or UDFs
---------------------------------
    Python method
    Wrapped via the pyspark.sql.functions.udf method
    Stored as a variable
    Called like a normal Spark function


# Define a Python method
-------------------------
def reverseString(mystr):
    return mystr[::-1]
    

# Wrap the function and store as a variable
--------------------------------------------
udfReverseString = udf(reverseString, StringType())


# Use with Spark
-----------------
user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))


# Argument-less example
-----------------------
def sortingCap():
    return random.choice(['G', 'H', 'R', 'S'])

udfSortingCap = udf(sortingCap, StringType())

user_df = user_df.withColumn('Class', udfSortingCap())
"""

In [None]:
"""

Partitioning 
------------
DataFrames are broken up into partitions
Partition size can vary 
Each partition is handled independently



Lazy processing
---------------

Transformations are lazy
    .withColumn(...)
    
    .select(...)

Nothing is actually done until an action is performed
    .count()
    
    .write(...)


Transformations can be re-ordered for best performance

Sometimes causes unexpected behavior

"""

In [None]:
# Show the distinct VOTER_NAME entries
voter_df.select(voter_df['VOTER_NAME']).distinct().show(40, truncate=False)

# Keep only rows whose VOTER_NAME is non-empty and shorter than 20 characters
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ fn.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Add a new column called splits separated on whitespace.
# The pattern is a raw string so the regex backslash is not read as a Python escape.
voter_df = voter_df.withColumn('splits', fn.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(fn.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

In [None]:
# Add a random_val column holding a random number for every voter with the
# title **Councilmember**. `when` is reached through the `fn` alias because
# the bare name is never imported in this notebook.
voter_df = voter_df.withColumn('random_val',
                               fn.when(voter_df.TITLE == 'Councilmember', fn.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

In [None]:
# Add a column to voter_df for a voter based on their position:
# a random number for Councilmembers, 2 for the Mayor, 0 for everyone else.
# `when` is reached through the `fn` alias because the bare name is never imported.
voter_df = voter_df.withColumn('random_val',
                               fn.when(voter_df.TITLE == 'Councilmember', fn.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

In [None]:
def getFirstAndMiddle(names):
  """Join every entry of ``names`` except the last one with single spaces.

  Args:
    names: Sequence of name parts (for example a full name split on
      whitespace), or None.

  Returns:
    The space-separated leading parts; an empty string when ``names`` has
    fewer than two entries; None when ``names`` is None.
  """
  # A null column value reaches a Python UDF as None; pass the null through
  # instead of raising TypeError on the slice below.
  if names is None:
    return None
  # Return a space separated string of names
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = fn.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF. The name is split again here because the
# 'splits' helper column is dropped earlier in the notebook and is no longer
# available on voter_df.
voter_df = voter_df.withColumn(
    'first_and_middle_name',
    udfFirstAndMiddle(fn.split(voter_df.VOTER_NAME, r'\s+')))

# Show the DataFrame
voter_df.show()

In [None]:
# Attach a ROW_ID column generated by monotonically_increasing_id()
voter_df = voter_df.withColumn('ROW_ID', fn.monotonically_increasing_id())

# Sort by ROW_ID, largest first, and display the top 10 rows
voter_df.orderBy(fn.desc('ROW_ID')).show(10)

In [None]:
# Report how many partitions back each DataFrame
print(f'There are {voter_df.rdd.getNumPartitions()} partitions in the voter_df DataFrame')
print(f'There are {voter_df_single.rdd.getNumPartitions()} partitions in the voter_df_single DataFrame.')

# Generate a ROW_ID column on both DataFrames
voter_df = voter_df.withColumn('ROW_ID', fn.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', fn.monotonically_increasing_id())

# Display the 10 largest IDs of each DataFrame for comparison
voter_df.orderBy(fn.desc('ROW_ID')).show(10)
voter_df_single.orderBy(fn.desc('ROW_ID')).show(10)

In [None]:
# Largest ROW_ID currently present in the March data
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()['ROW_ID']

# Offset April's generated IDs by that maximum.
# NOTE(review): if the generated IDs begin at 0, the first April ROW_ID equals
# previous_max_ID and repeats March's largest ID - confirm whether +1 is wanted.
voter_df_april = voter_df_april.withColumn(
    'ROW_ID',
    fn.monotonically_increasing_id() + previous_max_ID,
)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

In [None]:
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_0*.txt.gz')

# Print the count and run time for each DataFrame.
# count() is the action that forces the read, so it is what gets timed;
# perf_counter() is the clock the standard library provides for measuring
# short elapsed durations.
start_time_a = time.perf_counter()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.perf_counter() - start_time_a))

start_time_b = time.perf_counter()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.perf_counter() - start_time_b))


"""
Awesome! The results should illustrate that using split files runs more quickly than using one 
large file for import. Note that in certain circumstances the results may be reversed. This is 
a side effect of running as a single node cluster. Depending on the tasks required and resources 
available, it may occasionally take longer than expected. If you perform multiple runs of the 
tasks, you should see the full file import as generally slower than the split file import.
"""

In [None]:
"""
Reading Spark configurations

You've recently configured a cluster via a cloud provider. Your only access is via the command 
shell or your python code. You'd like to verify some Spark settings to validate the configuration 
of the cluster.


Using the spark.conf object allows you to validate the settings of a cluster without having 
configured it initially. This can help you know what changes should be optimized for your needs.
"""


# Look up the three settings in one pass; the keys are listed in the order
# the results are unpacked
app_name, driver_tcp_port, num_partitions = (
    spark.conf.get(setting)
    for setting in (
        'spark.app.name',                # name of the Spark application instance
        'spark.driver.port',             # driver TCP port
        'spark.sql.shuffle.partitions',  # number of join partitions
    )
)

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

In [None]:
# Load the departures data first so this cell also runs on a fresh kernel
# (departures_df used to be read here before any cell had created it).
# NOTE(review): assumes the "before" DataFrame is this same de-duplicated file - confirm.
departures_path = './data/departures.txt.gz'
departures_df = spark.read.csv(departures_path).distinct()

# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv(departures_path).distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())


"""
It's important to remember that modifying the settings in Spark may change objects that already exist. 
Sometimes the changes only take effect after configuring a new DataFrame. Remember to test changes 
you make to Spark configurations to verify it does exactly what you think.
"""

In [None]:
# Join flights_df to airports_df where the flight's "Destination Airport"
# equals the airport's "IATA" column
normal_df = flights_df.join(
    airports_df,
    on=flights_df["Destination Airport"] == airports_df["IATA"],
)

# Print the plan Spark builds for this join
normal_df.explain()

"""
You've implemented a basic join and examined the query plan. Learning to parse a query plan 
will help you understand what Spark is doing and when
"""



In [None]:
"""
Using broadcasting on Spark joins:

Remember that table joins in Spark are split between the cluster workers. If the data is not local, 
various shuffle operations are required and can have a negative impact on performance. Instead, we're 
going to use Spark's broadcast operations to give each node a copy of the specified data.

A couple tips:
    Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer to 
    the worker nodes.
    
    On small DataFrames, it may be better skip broadcasting and let Spark figure out any optimization 
    on its own.
    
    If you look at the query execution plan, a broadcastHashJoin indicates you've successfully configured 
    broadcasting.
"""

# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join flights_df to airports_df on destination == IATA, with airports_df
# wrapped in broadcast()
broadcast_df = flights_df.join(
    broadcast(airports_df),
    on=flights_df["Destination Airport"] == airports_df["IATA"],
)

# Print the query plan so it can be compared with the non-broadcast join
broadcast_df.explain()


"""
You are now using Spark broadcasting to improve the performance of your data operations. You should see that the query 
plan uses the Broadcast operations instead of the default Spark versions. You'll likely use broadcasting 
often with production datasets - checking the query plan will help validate your configuration without 
actually running the tasks.
"""

In [None]:
"""
Your manager would like to see a simple pipeline example including the basic steps. For this example, 
you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.
"""

# Ingest: read the gzipped CSV, using its first line as the header
departures_df = spark.read.csv('./data/2015-departures.csv.gz', header=True)

# Transform: keep rows whose fourth column (index 3) is greater than 0,
# then tag every remaining row with a generated id
departures_df = (
    departures_df
    .filter(departures_df[3] > 0)
    .withColumn('id', fn.monotonically_increasing_id())
)

# Output: write JSON, replacing any previous output at the same location
departures_df.write.mode('overwrite').json('output.json')

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'.
# `col` is reached through the `fn` alias because the bare name is never imported.
comment_count = annotations_df.where(fn.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))



# Split _c0 on the tab character and store the list in a variable
tmp_fields = fn.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
# (the functions module is aliased as `fn` in this notebook; `F` is not defined)
annotations_df = annotations_df.withColumn('colcount', fn.size(tmp_fields))

# Row count before filtering, needed for the comparison printed below
initial_count = annotations_df.count()

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df["colcount"] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))



# Split the content of _c0 on the tab character (aka, '\t')
# (the functions module is aliased as `fn` in this notebook; `F` is not defined)
split_cols = fn.split(annotations_df["_c0"], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)




# Column expression holding the tab-separated pieces of _c0
split_cols = fn.split(annotations_df["_c0"], '\t')

# Pull the first four pieces out into named columns, then keep the complete
# list of pieces as its own column as well
split_df = (
    annotations_df
    .withColumn('folder', split_cols.getItem(0))
    .withColumn('filename', split_cols.getItem(1))
    .withColumn('width', split_cols.getItem(2))
    .withColumn('height', split_cols.getItem(3))
    .withColumn('split_cols', split_cols)
)


# Rename _c0 to 'folder' so valid_folders_df shares the join key with split_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Row count before the join
split_count = split_df.count()

# Join on the shared 'folder' column, with valid_folders_df wrapped in broadcast()
joined_df = split_df.join(fn.broadcast(valid_folders_df), on="folder")

# Row count after the join, reported next to the earlier count
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))



# Row counts of both DataFrames, for the summary printed below
split_count = split_df.count()
joined_count = joined_df.count()

# A left_anti join on 'folder' against joined_df yields the invalid rows
invalid_df = split_df.join(fn.broadcast(joined_df), on='folder', how='left_anti')

# Report all three counts side by side so the difference can be checked
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Number of distinct folder values among the invalid rows
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)



In [None]:
# Select the dog details and show 10 untruncated rows.
# show() prints the table itself and returns None, so wrapping it in print()
# only adds a stray "None" line to the output.
joined_df.select('dog_list').show(10, truncate=False)

# Schema of one dog entry: a breed string followed by four integer bounding-box
# coordinates; the trailing False marks every field as non-nullable
DogType = StructType(
    [StructField("breed", StringType(), False)]
    + [
        StructField(coordinate, IntegerType(), False)
        for coordinate in ("start_x", "start_y", "end_x", "end_y")
    ]
)


# Parse the raw dog strings into (breed, start_x, start_y, end_x, end_y) tuples
def dogParse(doglist):
    """Convert comma-separated dog descriptions into typed tuples.

    Args:
        doglist: Iterable of strings, each holding exactly five
            comma-separated fields: breed, start_x, start_y, end_x, end_y.

    Returns:
        A list with one ``(breed, int, int, int, int)`` tuple per input string.

    Raises:
        ValueError: If an entry does not have exactly five fields or one of
            the coordinates is not an integer literal.
    """
    def _to_record(entry):
        # Unpacking into five names enforces the expected field count
        breed, x0, y0, x1, y1 = entry.split(',')
        return (breed, int(x0), int(y0), int(x1), int(y1))

    return [_to_record(entry) for entry in doglist]

# Wrap dogParse as a UDF whose result is an array of DogType structs
udfDogParse = fn.udf(dogParse, returnType=ArrayType(DogType))

# Parse dog_list into a new 'dogs' column, then discard the raw column
joined_df = joined_df.withColumn('dogs', udfDogParse(joined_df['dog_list']))
joined_df = joined_df.drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(fn.size('dogs')).show(10)



# Total the bounding-box pixels of all dogs in one image
def dogPixelCount(doglist):
    """Sum the bounding-box areas of every entry in ``doglist``.

    Each entry is read by position: index 1 and 2 are the start x/y and
    index 3 and 4 the end x/y, so one box contributes
    ``(entry[3] - entry[1]) * (entry[4] - entry[2])``.

    Args:
        doglist: Iterable of indexable entries with at least five positions.

    Returns:
        The summed area; 0 for an empty ``doglist``.
    """
    return sum((dog[3] - dog[1]) * (dog[4] - dog[2]) for dog in doglist)

# Register the pixel counter as an integer-returning UDF and apply it to 'dogs'
udfDogPixelCount = fn.udf(dogPixelCount, returnType=IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount(joined_df['dogs']))

# dog_percent = dog_pixels as a percentage of width * height
joined_df = joined_df.withColumn(
    'dog_percent',
    joined_df.dog_pixels / (joined_df.width * joined_df.height) * 100,
)

# Show the first 10 annotations with more than 60% dog
joined_df.where(fn.col('dog_percent') > 60).show(10)