
# Resources:

* https://sparkbyexamples.com/

# Big Data Fundamentals with PySpark

# Understanding SparkContext
A SparkContext represents the entry point to Spark functionality. It's like a key to your car. When you run any Spark application, a driver program starts. It contains the main function, and your SparkContext is initialized there. In the PySpark shell, PySpark automatically creates a SparkContext for you (so you don't have to create it yourself) and exposes it through the variable `sc`.

In this simple exercise, you'll inspect the attributes of the SparkContext in your PySpark shell, which you'll be using for the rest of the course.

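* `sc`: the SparkContext
* `sc.version`: the Spark version
* `sc.pythonVer`: the Python version
* `sc.master`: the URL of the cluster, or a `local` string (e.g. `local[*]`) when running in local mode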

# RDD = Resilient Distributed Datasets

*   Resilient = ability to cope with failures
*   Distributed = spread across various machines
*   Collection of partitioned data, e.g. arrays, tables, tuples, etc.

Create/load RDDs:

*   sc.parallelize(list)
*   sc.textFile(path)

Check RDD partitions:
*   rdd.getNumPartitions()

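RDD operations, transformations vs actions:
* Transformations return a new RDD
  > Transformations are lazily evaluated: nothing is computed until an action is called

  > examples: map(), filter(), flatMap() (can return multiple values for each element), union()

  > join() (on pair RDDs)

* Actions perform a computation on the RDD and return the result to the driver (or write it out)
  > collect() returns all elements of the RDD as a list

  > take(x) returns a list with the first x elements of the dataset

  > first() returns the first element

  > count() returns the total number of elements

  > reduce(f) aggregates the elements with a function f, e.g. reduce(lambda x, y: x + y)

  > saveAsTextFile(path) saves the RDD to a directory of text files, one part file per partition

  > rdd.coalesce(1).saveAsTextFile(path) saves it as a single part file (one partition)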
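The transformation/action split can be made concrete with a toy, single-machine model written in plain Python (not PySpark; the `ToyRDD` class and the `shout` helper are hypothetical names for illustration). Transformations only record a step and return a new object; the work happens when an action such as `take()` or `count()` pulls data through the recorded steps.

```python
import functools
import itertools


def _flat_map(f, stream):
    # Generator helper: binds f at call time, then yields every output of f
    for element in stream:
        yield from f(element)


class ToyRDD:
    """Single-machine toy model of lazy RDD semantics (not real Spark)."""

    def __init__(self, source, steps=()):
        self._source = list(source)  # the underlying data
        self._steps = tuple(steps)   # recorded transformations, not yet applied

    # Transformations: record the step and return a new ToyRDD; nothing runs.
    def map(self, f):
        return ToyRDD(self._source, self._steps + (("map", f),))

    def filter(self, pred):
        return ToyRDD(self._source, self._steps + (("filter", pred),))

    def flatMap(self, f):
        return ToyRDD(self._source, self._steps + (("flatMap", f),))

    def _run(self):
        # Chain the recorded steps into a lazy iterator pipeline
        stream = iter(self._source)
        for kind, f in self._steps:
            if kind == "map":
                stream = map(f, stream)
            elif kind == "filter":
                stream = filter(f, stream)
            else:
                stream = _flat_map(f, stream)
        return stream

    # Actions: pull data through the pipeline and return a result.
    def collect(self):
        return list(self._run())

    def take(self, n):
        return list(itertools.islice(self._run(), n))

    def first(self):
        return next(self._run())

    def count(self):
        return sum(1 for _ in self._run())

    def reduce(self, f):
        return functools.reduce(f, self._run())


calls = []


def shout(word):
    calls.append(word)  # side effect shows when work really happens
    return word.upper()


lines = ToyRDD(["hello world", "hi spark"])
words = lines.flatMap(str.split).map(shout)  # transformations only
print(calls)          # [] -> nothing computed yet
print(words.take(2))  # ['HELLO', 'WORLD']
print(words.count())  # 4
```

In this toy model every action re-runs the whole pipeline from the source (`count()` calls `shout` again on every word). By default Spark also recomputes an uncached RDD from its lineage for each action, which is what the caching techniques covered later help avoid.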
  

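Pair RDDs = RDDs of key/value pairs
Specific transformations
  > reduceByKey()
  >> Combines the values of each key with a function, e.g. to sum the values of each unique key

  >> Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)

  > sortByKey() sorts the pairs by key (ascending by default)

  > groupByKey() groups all values that share the same key

Specific actions
  > countByKey() counts the number of elements for each key
  
  > collectAsMap() returns the key/value pairs as a dictionary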
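The semantics of these pair-RDD operations can be sketched on a plain Python list of `(key, value)` tuples. This is a single-machine illustration with hypothetical helper names, not Spark's implementation: Spark performs the same per-key grouping across partitions, which is why these operations shuffle data.

```python
from collections import Counter, defaultdict
from functools import reduce


def group_by_key(pairs):
    """Like rdd.groupByKey(): gather all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())


def reduce_by_key(pairs, f):
    """Like rdd.reduceByKey(f): merge the values of each key with f."""
    return [(key, reduce(f, values)) for key, values in group_by_key(pairs)]


def sort_by_key(pairs, ascending=True):
    """Like rdd.sortByKey(ascending)."""
    return sorted(pairs, key=lambda kv: kv[0], reverse=not ascending)


def count_by_key(pairs):
    """Like rdd.countByKey(): number of elements per key, as a dict."""
    return dict(Counter(key for key, _ in pairs))


def collect_as_map(pairs):
    """Like rdd.collectAsMap(): a dict, keeping one value per key."""
    return dict(pairs)


sales = [("apple", 3), ("pear", 1), ("apple", 2), ("fig", 5)]
totals = reduce_by_key(sales, lambda x, y: x + y)
print(totals)               # [('apple', 5), ('pear', 1), ('fig', 5)]
print(count_by_key(sales))  # {'apple': 2, 'pear': 1, 'fig': 1}
print(sort_by_key(totals))  # [('apple', 5), ('fig', 5), ('pear', 1)]
```

The helpers return keys in first-seen order; Spark makes no such ordering promise except after `sortByKey()`. One design point does carry over: in Spark, `reduceByKey()` merges values inside each partition before shuffling, so it usually moves less data than `groupByKey()` followed by a reduction.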


# DataFrames in PySpark

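Create:
* spark.createDataFrame(rdd, schema)
* spark.read.csv(file_path)

Transformations:
* select()
* filter(df.Age < 21)
* groupBy()
* orderBy()
* dropDuplicates()
* withColumnRenamed()

Actions and inspection:
* printSchema() prints the column names and types
* head()
* show() prints the first 20 rows by default
* count()
* columns (an attribute, not a method: the list of column names)
* describe() returns summary statistics as a new DataFrame; call .show() to display them
* info() is a pandas method and is not available on PySpark DataFrames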


Create a temp table from a dataframe:
* people_df.createOrReplaceTempView("people")

Query with Spark SQL:
* new_df = spark.sql(query)

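Visualization in PySpark:
* pyspark_dist_explore: a plotting library
  > hist()

  > distplot()

  > pandas_histogram()

* toPandas(): converts to a pandas DataFrame (all rows are collected to the driver, so use it only on small data)
* HandySpark library
  > .toHandy()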

# Snippet

In [None]:
# Display the first 10 words and their frequencies from the input RDD
for word in resultRDD.take(10):
    print(word)

# Swap the keys and values from the input RDD
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort by frequency (now the key) in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies from the sorted RDD
for freq, word in resultRDD_swap_sort.take(10):
    print("{},{}".format(word, freq))

In [None]:
# Create a temporary view of fifa_df
fifa_df.createOrReplaceTempView('fifa_df_table')

# Construct the "query"
query = '''SELECT Age FROM fifa_df_table WHERE Nationality == "Germany"'''

# Apply the SQL "query"
fifa_df_germany_age = spark.sql(query)

# Generate basic statistics
fifa_df_germany_age.describe().show()

# Cleaning data using PySpark

PySpark DataFrame properties:


*   Immutable

  >Once created, a DataFrame can't be changed. Applying a transformation returns a new DataFrame and leaves the original untouched; reassigning the result to the same variable (e.g. df = df.withColumn(...)) only replaces the reference.

  >Spark takes advantage of data immutability to efficiently share / create new data representations throughout the cluster.



*   Lazy processing
  
  >"Nothing happens until an action is performed": transformations are lazy

  > Lazy operations usually return in about the same amount of time regardless of the actual quantity of data, because **Spark does not perform any transformations until an action is requested.**

  > NB: Spark's optimizer can reorder transformations for better performance

* Parquet vs CSV

  > Parquet is a columnar format that stores its schema with the data and supports predicate pushdown, so filters can be applied while the data is read, before it is processed, and only the columns a query needs are read.

  > Spark will only process the data necessary to complete the operations you define instead of reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.


# Load, read, write

In [None]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

In [None]:
# Import the Spark SQL functions module (used below as F)
import pyspark.sql.functions as F

# Load the CSV file
aa_dfw_df = spark.read.format("csv").options(header=True).load('AA_DFW_2018.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

In [None]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

In [None]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet("AA_DFW_ALL.parquet")

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)

# Filter & Create new Column

In [None]:
# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Keep rows where VOTER_NAME is 1-19 characters long (length > 0 and < 20)
voter_df = voter_df.where('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

In [None]:
# Add a new column called splits separated on whitespace (raw string for the regex)
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn("first_name", voter_df.splits.getItem(0))

# Get the last entry of the splits list (index -1 counts from the end) and create a column called last_name
voter_df = voter_df.withColumn("last_name", F.element_at('splits', -1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

In [None]:
# Add a column to voter_df for any voter with the title **Councilmember** (all other rows get null)
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

# CONDITIONAL STEP : IF THEN ELSE equivalent

In [None]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0)
                               )

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()
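Row by row, the two `when()` expressions above behave like the plain-Python functions below. This is only a model of the semantics (the function names are hypothetical, and Spark evaluates the column expression itself without calling Python): conditions are tested in order, the first match wins, and without `.otherwise()` unmatched rows get null.

```python
import random


def random_val_when_only(title, rng=random.random):
    """Mirrors F.when(TITLE == 'Councilmember', F.rand()) with no otherwise()."""
    if title == "Councilmember":
        return rng()
    return None  # rows matching no condition become null in Spark


def random_val_chained(title, rng=random.random):
    """Mirrors when(...).when(...).otherwise(0): conditions are tested in order."""
    if title == "Councilmember":
        return rng()
    elif title == "Mayor":
        return 2
    else:
        return 0


titles = ["Councilmember", "Mayor", "Deputy Mayor Pro Tem"]
print([random_val_when_only(t) is None for t in titles])         # [False, True, True]
print([random_val_chained(t, rng=lambda: 0.5) for t in titles])  # [0.5, 2, 0]
```

This is also why `voter_df.filter(voter_df.random_val == 0)` returns exactly the rows that fell through to `.otherwise(0)`.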

# UDF: User-Defined Function

In [None]:
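def getFirstAndMiddle(names):
  # Return a space separated string of all names except the last one
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# The splits column was dropped above; recreate it before using the UDF
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Drop the helper column and show the DataFrame
voter_df = voter_df.drop('splits')
voter_df.show()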
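Since a UDF wraps an ordinary Python function, the function can be tested without a Spark session before it is registered. The sketch below also makes it null-safe, an addition that is not part of the original exercise: Spark passes `None` to a Python UDF for a null input, and `names[:-1]` would then raise a `TypeError`. The sample names are made up.

```python
def getFirstAndMiddle(names):
    """Return all name parts except the last, space separated.

    Null-safe variant (an assumption beyond the original exercise):
    a null input value reaches the Python function as None.
    """
    if names is None:
        return None
    return " ".join(names[:-1])


print(getFirstAndMiddle(["Jane", "Q", "Public"]))  # Jane Q
print(repr(getFirstAndMiddle(["Cher"])))           # '' (no first/middle part)
print(getFirstAndMiddle(None))                     # None
```

Registering the null-safe version is unchanged: `F.udf(getFirstAndMiddle, StringType())`.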

# Create IDs in a distributed system

In [None]:
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)

In [None]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame 
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)
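The difference between the two DataFrames comes from how the IDs are built. The PySpark documentation for `monotonically_increasing_id()` describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The toy functions below (hypothetical names, plain Python) reproduce that layout: a single partition yields 0, 1, 2, ..., while several partitions yield IDs that are unique and increasing but not consecutive.

```python
def monotonic_id(partition_index, record_number):
    """Toy model of the documented ID layout:
    partition index in the upper 31 bits, record number in the lower 33 bits."""
    return (partition_index << 33) + record_number


def ids_for(partition_sizes):
    """IDs a DataFrame with the given rows-per-partition would receive."""
    return [
        monotonic_id(p, r)
        for p, size in enumerate(partition_sizes)
        for r in range(size)
    ]


print(ids_for([5]))        # [0, 1, 2, 3, 4] -> one partition: consecutive
print(ids_for([2, 2, 1]))  # [0, 1, 8589934592, 8589934593, 17179869184]
```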


In [None]:
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting just above previous_max_ID
# (+1 so the first April ID does not collide with the last March ID)
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID + 1)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

# Caching

Caching means storing a DataFrame in memory and/or on disk so that later operations can reuse it.

Advantages:

   * Improves speed on later transformations & actions, as the data doesn't need to be retrieved from the original data source again
   * Reduces overall cluster usage: no network or CPU is spent re-reading and recomputing data that is already cached

Disadvantages:

  * Large datasets may not fit in memory
  * Cached objects may not stay available: Spark can evict cached data (e.g. under memory pressure), in which case it is recomputed


Tips:

  * Cache only if you need the DataFrame again
  * Try caching the DataFrame at different points and compare performance


Example:

    Counting 139358 rows took 2.838447 seconds
    Counting 139358 rows again took 1.051270 seconds

Consider why the first run takes longer even though you've told it to cache() the DataFrame. Remember that even though you've applied the caching transformation, it doesn't take effect until an action is run. The action instantiates the caching after the distinct() function completes. The second time, there is no need to recalculate anything, so it returns much faster.
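The point about `cache()` only taking effect at the first action can be modelled with a small plain-Python class. `ToyCachedFrame` is hypothetical and only mimics the behaviour described above: `cache()` marks the object, the first action computes and stores the rows, and later actions reuse them until `unpersist()` is called.

```python
class ToyCachedFrame:
    """Toy model of lazy caching (not Spark)."""

    def __init__(self, compute):
        self._compute = compute        # expensive function producing the rows
        self._cache_requested = False
        self._cached_rows = None
        self.computations = 0          # how many times the rows were computed

    def cache(self):
        self._cache_requested = True   # lazy: nothing is computed here
        return self

    def unpersist(self):
        self._cache_requested = False
        self._cached_rows = None
        return self

    @property
    def is_cached(self):
        return self._cache_requested

    def count(self):
        if self._cached_rows is not None:
            return len(self._cached_rows)  # served from the cache
        rows = self._compute()
        self.computations += 1
        if self._cache_requested:
            self._cached_rows = rows       # materialised by the first action
        return len(rows)


# sorted(set(...)) stands in for an expensive distinct()
frame = ToyCachedFrame(lambda: sorted(set(range(100_000)))).cache()
print(frame.computations)  # 0 -> cache() alone does nothing
frame.count()
frame.count()
print(frame.computations)  # 1 -> the second count reused the cached rows
```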

In [None]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

# Improve import performance

First it's important to understand a Spark cluster. It's composed of:

  * Driver process
    * Handles task assignment & consolidation of the results sent back by the workers
  * Worker processes
    * Handle the transformation & action steps
    * Once assigned a task, workers operate fairly independently

Import performance:

  * Many smaller files are better than one large file
  * Files of roughly the same size are better
  * Both points above let Spark split the read evenly across workers
  * Define a schema up front: it avoids an extra schema-inference pass over the data and validates the data on import

In [None]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file;
# distinct() triggers a shuffle, whose partition count is set by spark.sql.shuffle.partitions
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

# Performance improvements

Check Spark's query plan with .explain(). It helps validate your configuration without actually running the tasks.

Shuffling refers to moving data around between workers to complete a task.

Side effect of shuffling:

  * Lower throughput of the system, as workers are busy transferring data across the network

How to limit shuffling:
  * Limit the use of .repartition(num_partitions)
    > It triggers a reshuffle of the entire dataset

    > To reduce the number of partitions, use .coalesce(num_partitions) instead, which merges existing partitions together
  * Watch out for .join(), which can trigger a shuffle of both DataFrames
  * If possible, use broadcast() (from pyspark.sql.functions)
    > Provides a copy of the smaller DataFrame to each worker

    > Use it when one DataFrame is much smaller than the other
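A broadcast join is essentially a map-side hash join. The plain-Python sketch below (hypothetical function and sample rows, not Spark code) builds a lookup table from the small side once, conceptually copies it to every worker, and joins each partition of the large side locally, so the large side's rows never move between partitions.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    """Toy model of a broadcast (map-side) inner hash join; not Spark itself.

    large_partitions: list of partitions, each a list of dict rows
    small_rows: the small table, copied in full to every "worker"
    """
    # Build a hash table from the small side once ...
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    # ... then join each large partition locally against its copy.
    # Rows of the large side never leave their partition: no shuffle.
    joined_partitions = []
    for partition in large_partitions:
        local = []
        for row in partition:
            for match in lookup.get(row[key], []):
                local.append({**row, **match})
        joined_partitions.append(local)
    return joined_partitions


flights = [
    [{"flight": 1, "dest": "DFW"}, {"flight": 2, "dest": "JFK"}],  # partition 0
    [{"flight": 3, "dest": "DFW"}, {"flight": 4, "dest": "XXX"}],  # partition 1
]
airports = [{"dest": "DFW", "city": "Dallas"}, {"dest": "JFK", "city": "New York"}]
for i, part in enumerate(broadcast_hash_join(flights, airports, "dest")):
    print(i, part)
```

In PySpark you request this strategy with the `broadcast()` hint, e.g. `large_df.join(F.broadcast(small_df), 'dest')`; the validation section later in these notes uses the same pattern.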

In [None]:
import time

start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

    Normal count:		119910	duration: 2.819368
    Broadcast count:	119910	duration: 1.102918

While the difference in time is minuscule for our example, the ratio between the durations is significant. Depending on the makeup of the data being joined, you can notably cut the run time for Spark operations.

# Data pipeline

General steps:

1.   Input: CSV, JSON, API, etc
2.   Transformation
3.   Output
4.   Validation
5.   Analysis (compute metrics to ease data consumption)
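The five steps can be walked through end to end on a tiny in-memory CSV with the Python standard library. This is a hedged illustration only: the sample rows are made up, and it mirrors the Spark pipeline in the next cell (filter zero durations, add an `id`, write JSON Lines) rather than using Spark itself.

```python
import csv
import io
import json

# Made-up sample input for illustration only
RAW_CSV = """Date,Flight Number,Actual elapsed time (Minutes)
01/01/2015,0005,498
01/01/2015,0007,0
01/02/2015,0023,217
"""


def run_pipeline(raw_csv):
    # 1. Input: parse the CSV text into dict rows
    rows = list(csv.DictReader(io.StringIO(raw_csv)))

    # 2. Transformation: drop zero durations and add an id column
    kept = [r for r in rows if int(r["Actual elapsed time (Minutes)"]) > 0]
    for i, row in enumerate(kept):
        row["id"] = i

    # 3. Output: one JSON object per line (the JSON Lines layout df.write.json uses)
    output = "\n".join(json.dumps(row) for row in kept)

    # 4. Validation: read the output back and check the row count
    reloaded = [json.loads(line) for line in output.splitlines()]
    if len(reloaded) != len(kept):
        raise ValueError("row count changed while writing the output")

    # 5. Analysis: a metric that is easy to consume downstream
    durations = [int(r["Actual elapsed time (Minutes)"]) for r in reloaded]
    return output, sum(durations) / len(durations)


output, avg_minutes = run_pipeline(RAW_CSV)
print(output)
print(avg_minutes)  # 357.5
```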



In [None]:
# Import the data to a DataFrame
departures_df = spark.read.csv("2015-departures.csv.gz", header=True)

departures_df.printSchema()

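# Remove any duration of 0 (backticks quote a column name containing spaces;
# by default, double quotes would make it a string literal in Spark SQL)
departures_df = departures_df.where('`Actual elapsed time (Minutes)` > 0')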

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')
# Handling commented rows is easy in Spark and allows you to quickly remove any row beginning with a defined character. 

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

In [None]:
# Store the initial row count for the comparison below
initial_count = annotations_df.count()

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df["colcount"] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

In [None]:
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

In [None]:
def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')

# Validation

Filter out the rows using a join:

> Using joins in this fashion drastically simplifies a validation task if your data permits it. The validation data doesn't necessarily need to be loaded from a file: it could be calculated on the fly, or based on a previous dataset. Optimizing these tasks will improve your overall data cleaning process. Note: there are multiple ways to define the join statement. As both DataFrames have a column named 'folder', passing "folder" as the join key lets Spark match on it and keep a single folder column in the result.

In [None]:
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0','folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

In [None]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'leftanti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)
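Stripped of Spark, the two joins above amount to set membership on the `folder` column. The toy function below (hypothetical name and made-up rows, plain Python) returns both the rows an inner join keeps and the rows a `leftanti` join returns; the list of valid folders plays the role of the right-hand DataFrame.

```python
def validate_by_folder(rows, valid_folders):
    """Toy model of the validation joins (not Spark).

    kept     ~ inner join on 'folder'
    rejected ~ 'leftanti' join on 'folder'
    """
    valid = set(valid_folders)  # deduplicated lookup, like a broadcast table
    kept = [r for r in rows if r["folder"] in valid]
    rejected = [r for r in rows if r["folder"] not in valid]
    return kept, rejected


rows = [
    {"folder": "02110627", "filename": "a.jpg"},
    {"folder": "02093754", "filename": "b.jpg"},
    {"folder": "bogus", "filename": "c.jpg"},
]
kept, rejected = validate_by_folder(rows, ["02110627", "02093754"])
print(len(kept), len(rejected))  # 2 1
```

One difference worth knowing: if `valid_folders_df` contained the same folder twice, the inner join would duplicate the matching rows of `split_df`, whereas the set-based model would not; calling `.distinct()` on the lookup DataFrame before joining keeps the two equivalent.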

# Parsing

Schemas can be used for importing data, but they can also simplify access to information within pre-parsed data. If you're wondering why we didn't just define a full schema for the import: the Spark CSV parser does not support complex types such as arrays (lists).

In [None]:
# Select the dog details and show 10 untruncated rows (show() prints directly and returns None)
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField('start_y', IntegerType(), False),
    StructField('end_x', IntegerType(), False),
    StructField('end_y', IntegerType(), False)
])

# Analysis

In [None]:
# Create a function that parses each dog entry into a (breed, start_x, start_y, end_x, end_y) tuple
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to parse the list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

In [None]:
print(joined_df.schema.json())
joined_df.printSchema()

# Define a function to determine the number of dog pixels per image
# (each dog is a (breed, start_x, start_y, end_x, end_y) struct)
def dogPixelCount(doglist):
  totalpixels = 0
  for dog in doglist:
    totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
  return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
# (reference the dog_pixels column; the bare string 'dog_pixels' would be a literal, not a column)
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)
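Because `dogParse` and `dogPixelCount` are plain Python functions, their logic can be checked on a hand-made annotation before wrapping them in `F.udf`. The sample entries and the 200 x 120 image size are made up; the percentage uses the same formula as the `dog_percent` column.

```python
def dogParse(doglist):
    # Same logic as the UDF above: "breed,start_x,start_y,end_x,end_y" -> tuple
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(",")
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs


def dogPixelCount(doglist):
    # Sum of bounding-box areas: (end_x - start_x) * (end_y - start_y)
    totalpixels = 0
    for dog in doglist:
        totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
    return totalpixels


# Hypothetical annotation: two dogs in a 200 x 120 image
dogs = dogParse(["husky,10,20,110,70", "pug,0,0,20,50"])
pixels = dogPixelCount(dogs)                 # 100*50 + 20*50 = 6000
dog_percent = (pixels / (200 * 120)) * 100
print(pixels, dog_percent)                   # 6000 25.0
```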