This tutorial is an introduction to PySpark and the RDD API. Most of the examples are based on or inspired by the following books:


*   [Learning Spark, 2nd Edition](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/)
*   [Spark: The Definitive Guide](https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/)

# PySpark 
PySpark is an interface for Apache Spark that allows users to write Spark applications using Python APIs. PySpark supports most of Spark’s features, such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core. For detailed information on these components and APIs, please refer to the [official PySpark Documentation](https://spark.apache.org/docs/latest/api/python/index.html).

## SparkSession

[pyspark.sql.SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) is the entry point to programming in Spark. It is mainly used to work with Spark's higher-level structured APIs (DataFrames, plus Datasets in Scala and Java), as we will see in the next tutorial. In this tutorial, we use it only to access the SparkContext and work with the low-level API (the RDD API).

In [0]:
# Check Spark Session Information
spark

## SparkContext
[pyspark.SparkContext](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html#pyspark.SparkContext) is the entry point for low-level APIs in Spark.

In [0]:
sc = spark.sparkContext
sc

# Resilient Distributed Datasets (RDDs)
The RDD is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel. While [Python's RDD API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD) provides the main methods for working with RDDs, [Scala's RDD API](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html) is Spark's native and most complete API for working with RDDs.

## Parallelized Collections: Creating RDDs from Local Collections 
Parallelized collections are created by calling SparkContext’s [`parallelize`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html#pyspark.SparkContext.parallelize) method on an existing iterable or collection in your driver program. `parallelize` turns a local collection into a parallel collection: the elements of the collection are copied to form a distributed dataset that can be operated on in parallel. When creating this parallel collection, you can also explicitly state the number of partitions into which you would like to distribute it.
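To build intuition for what partitioning means, here is a plain-Python sketch (no Spark required) that slices a collection into contiguous, near-equal chunks, similar in spirit to what `parallelize` does. The helper name `split_into_partitions` is hypothetical, and Spark's exact slice boundaries may differ for some inputs. The list of lists it returns has the same shape as the `glom().collect()` output used below.

```python
def split_into_partitions(data, num_partitions):
    """Hypothetical helper: split data into num_partitions contiguous slices.

    A simplified local model of parallelize; Spark's real boundaries may differ.
    """
    items = list(data)
    n = len(items)
    # Partition i covers positions [i * n // k, (i + 1) * n // k)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

partitions = split_into_partitions(range(0, 30, 1), 5)
print([len(p) for p in partitions])  # → [6, 6, 6, 6, 6]
```

When the size does not divide evenly (for example, 15 elements into 4 partitions), some partitions end up one element larger than others.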

In [0]:
# parallelize a local collection 
rangeRDD = sc.parallelize(range(0, 30, 1))

In [0]:
# rangeRDD is just a logical object; nothing has been computed yet
rangeRDD

In [0]:
# an action (here, collect) is needed to trigger execution
rangeRDD.glom().collect() 


[pyspark.RDD.glom](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.glom.html) returns an RDD created by coalescing all elements within each partition into a list. It is used here just to show the contents of each partition.

In [0]:
#try with a list of words
words = "Singapore Data Science Consortium : Scalable Data Science with Apache Spark - Apache Spark Fundamentals"\
.split(" ")

In [0]:
wordsRDD = sc.parallelize(words)
wordsRDD

In [0]:
wordsRDD.glom().collect() 

## Number of Partitions 
[getNumPartitions](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.getNumPartitions.html) returns the number of partitions in the RDD.

In [0]:
rangeRDD.getNumPartitions()

In [0]:
wordsRDD.getNumPartitions()

One important parameter for parallel collections is the number of partitions to cut the dataset into. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize`.

In [0]:
# parallelize a local collection with a specific number of partitions
rangeRDD = sc.parallelize(range(0, 30, 1), 5)
rangeRDD.getNumPartitions()

In [0]:
rangeRDD.glom().collect()

In [0]:
# try with the list of words
wordsRDD = sc.parallelize(words,4)
wordsRDD.getNumPartitions()

In [0]:
wordsRDD.glom().collect()

## Transformations
Check the list of common [transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) in Spark.

### distinct

[pyspark.RDD.distinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.distinct.html) is used to return a new RDD that contains the distinct elements of the source RDD.

In [0]:
wordsRDD.distinct().count()
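`distinct` has to compare elements that may live in different partitions, so it involves a shuffle. For a small local list, the expected result can be cross-checked with a Python set. This sketch repeats the `words` list from above so it runs on its own.

```python
words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

# A set keeps one copy of each element, like distinct()
distinct_words = set(words)
print(len(words), len(distinct_words))  # → 15 11
```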

### filter
[pyspark.RDD.filter](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.filter.html) returns a new RDD containing only the elements that satisfy a predicate.

In [0]:
rangeRDD.filter(lambda x: x % 3 == 0).glom().collect()

In [0]:
def startsWithS(word):
  return word.startswith("S")

In [0]:
wordsRDD.filter(lambda word: startsWithS(word)).glom().collect()
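Because `filter` accepts any Python function that returns a Boolean, the named predicate could also be passed directly, as in `wordsRDD.filter(startsWithS)`. Since it is an ordinary function, it can be checked locally with Python's built-in `filter` before running it on a cluster. The sketch redefines `words` and `startsWithS` so it is self-contained.

```python
def startsWithS(word):
    return word.startswith("S")

words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

# Same predicate, applied locally with Python's built-in filter
local_result = list(filter(startsWithS, words))
print(local_result)
```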

### map
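[pyspark.RDD.map](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html) returns a new RDD by applying a function to each element of the source RDD. In this example, we map each word to a tuple containing the word, its starting letter, and whether the word begins with “S”.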

In [0]:
wordsRDDmap = wordsRDD.map(lambda word: (word, word[0], word.startswith("S")))

In [0]:
wordsRDDmap.glom().collect()

You can subsequently filter on this by selecting the relevant Boolean value:

In [0]:
wordsRDDmap.filter(lambda record: not record[2]).take(5)
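The same map-then-filter pipeline can be mirrored with list comprehensions to check the expected records locally. This is a plain-Python sketch; on the RDD, `take(5)` returns the first five matching records in partition order.

```python
words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

# map: exactly one output tuple per input word
mapped = [(word, word[0], word.startswith("S")) for word in words]

# filter on the Boolean field, then keep the first five records
not_s = [record for record in mapped if not record[2]][:5]
print(not_s)
```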

### flatMap 
[pyspark.RDD.flatMap](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap.html) provides a simple extension of the map function we just looked at. Sometimes, each current item should return multiple items. flatMap returns a new RDD by first applying a function to all elements of the RDD, and then flattening the results.
For example, you might want to take your set of words and flatMap it into a set of characters.

In [0]:
wordsRDD.flatMap(lambda word: list(word)).glom().take(2)

In [0]:
wordsRDD.map(lambda word: list(word)).glom().take(2)
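The difference between the two outputs above comes down to flattening. Here is a plain-Python sketch of both on the same words (no Spark involved): `map` produces exactly one output per input, here a list of characters, while `flatMap` splices each returned list into a single flat sequence.

```python
from itertools import chain

words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

mapped = [list(word) for word in words]                          # map: one list per word
flat = list(chain.from_iterable(list(word) for word in words))  # flatMap: flattened
print(len(mapped), len(flat))  # → 15 89
```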

### mapPartitions
[pyspark.RDD.mapPartitions](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitions.html) is similar to map, but applies the function once per partition rather than once per element: the function receives an iterator over the partition's elements and must return an iterable.

In [0]:
rangeRDD.mapPartitions(lambda part: [sum(part)]).collect()
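The cell above wraps the sum in a list because the function has to return an iterable. A generator function works too, and is a common pattern when some setup (for example, opening a connection) should happen once per partition rather than once per element. The sketch below is plain Python: `sum_partition` is a hypothetical name, and it is tested locally against five even slices of `range(0, 30)`, matching the earlier `glom()` output for the five-partition `rangeRDD`.

```python
def sum_partition(iterator):
    """Hypothetical partition function: yields a single sum per partition."""
    # Any once-per-partition setup would go here, before consuming the iterator
    yield sum(iterator)

# Local stand-in for the five partitions of rangeRDD (six elements each)
partitions = [range(i, i + 6) for i in range(0, 30, 6)]
partial_sums = [s for part in partitions for s in sum_partition(iter(part))]
print(partial_sums)  # → [15, 51, 87, 123, 159]
```

On the RDD, the same function would be used as `rangeRDD.mapPartitions(sum_partition).collect()`.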

### sample
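[pyspark.RDD.sample](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample.html) returns a random sample of the RDD. Without replacement, `fraction` is the probability that each element is kept; with replacement, it is the expected number of times each element is chosen. Either way, the sample size is only approximately `fraction` times the number of elements.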

In [0]:
rangeRDD.sample(withReplacement = False,
                fraction = 0.2,
                seed = 33).count() 
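A plain-Python model of sampling without replacement, where each element is kept independently with probability `fraction`. It uses Python's `random` module rather than Spark's random number generator, so it will not reproduce Spark's sample for the same seed; the helper name `bernoulli_sample` is hypothetical.

```python
import random

def bernoulli_sample(data, fraction, seed):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

data = list(range(0, 30))
# Different seeds give different sample sizes around 0.2 * 30 = 6
sizes = [len(bernoulli_sample(data, 0.2, seed)) for seed in range(5)]
print(sizes)
```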

### sortBy
To sort an RDD, you can use the [pyspark.RDD.sortBy](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortBy.html) method. You specify a function that extracts a value from each element of your RDD, and Spark sorts based on that value. For instance, the following example sorts by word length from longest to shortest:

In [0]:
wordsRDD.sortBy(lambda word: len(word), ascending=False).collect()
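For a small list, the expected result can be checked with Python's `sorted` and the same key function. Several words share a length, and the relative order of such ties is not something to rely on in Spark, so only the sequence of lengths is guaranteed to match.

```python
words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

# Same key function as sortBy, longest first
local_sorted = sorted(words, key=len, reverse=True)
print(local_sorted[:3])  # → ['Fundamentals', 'Consortium', 'Singapore']
```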

### randomSplit
We can also randomly split an RDD into a list of RDDs by using the [pyspark.RDD.randomSplit](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.randomSplit.html) method, which accepts a list of weights (normalized if they do not sum to 1) and an optional random seed:

In [0]:
# the seed value is arbitrary; fixing it makes the split reproducible
thirtySeventySplit = wordsRDD.randomSplit([0.3, 0.7], seed=42)
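Each element ends up in exactly one of the resulting RDDs. A plain-Python model in the same spirit, assuming one uniform random draw per element compared against the cumulative, normalized weights. Spark's random number generator differs, so actual assignments will not match; `random_split` is a hypothetical helper name.

```python
import random
from itertools import accumulate

def random_split(data, weights, seed):
    """Assign each element to one split according to normalized weights."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.3, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for x in data:
        r = rng.random()
        # First split whose upper bound exceeds r; fall back to the last one
        idx = next((i for i, b in enumerate(bounds) if r < b), len(bounds) - 1)
        splits[idx].append(x)
    return splits

splits = random_split(range(30), [0.3, 0.7], seed=42)
print([len(s) for s in splits])
```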

## Actions

Check the list of common [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) in Spark.

### reduce
You can use the [pyspark.RDD.reduce](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduce.html) method to specify a function that “reduces” an RDD of any kind of value to one value. For instance, given a set of numbers, you can reduce them to their sum by specifying a function that takes two values as input and reduces them into one. Because Spark applies the function first within each partition and then across partitions, the function should be commutative and associative.

In [0]:
rangeRDD.reduce(lambda a, b: a + b)

You can also use this to get something like the longest word in our set of words. The key is just to define the correct function:

In [0]:
def wordLengthReducer(leftWord, rightWord):
  if len(leftWord) > len(rightWord):
    return leftWord
  else:
    return rightWord

wordsRDD.reduce(wordLengthReducer)
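Spark computes `reduce` in two stages: it reduces each partition locally, then combines the per-partition results. This plain-Python model (using `functools.reduce`; `two_stage_reduce` is a hypothetical name) shows why the function should be associative and commutative: addition gives the same answer for any partitioning, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

def two_stage_reduce(partitions, f):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

one_partition = [list(range(30))]
five_partitions = [list(range(i, i + 6)) for i in range(0, 30, 6)]

print(two_stage_reduce(one_partition, add), two_stage_reduce(five_partitions, add))  # → 435 435
print(two_stage_reduce(one_partition, sub), two_stage_reduce(five_partitions, sub))  # → -435 285
```

The same concern applies to `wordLengthReducer` above: when two words have the same length, which one wins can depend on how the data is partitioned.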

### count
[pyspark.RDD.count](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.count.html) is a commonly used action to count the number of elements in the RDD.

In [0]:
wordsRDD.count()

In [0]:
rangeRDD.count()

### first
The [pyspark.RDD.first](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.first.html) method returns the first value in the dataset:

In [0]:
wordsRDD.first()

In [0]:
rangeRDD.first()

### min and max
[pyspark.RDD.min](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.min.html) and [pyspark.RDD.max](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.max.html) return the minimum and maximum values, respectively:

In [0]:
rangeRDD.min()

In [0]:
rangeRDD.max()

### take
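[pyspark.RDD.take](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.take.html) returns the first `n` elements of your RDD. Related methods such as `takeOrdered` and `takeSample` also return a fixed number of elements, chosen by ordering or at random.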

In [0]:
wordsRDD.take(3)

You can use [pyspark.RDD.takeSample](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.takeSample.html) to take a fixed-size random sample from your RDD.

In [0]:
wordsRDD.takeSample(withReplacement= True, 
                    num = 3,
                    seed = 100)

## Working with Key-Value RDDs

There are many methods on RDDs that require you to put your data in a key–value format. The most common operations on key-value RDDs are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

The easiest way to make a key-value RDD is to map over your current RDD to a basic key–value structure.

In [0]:
wordsPairRDD = wordsRDD.map(lambda word: (word.lower(), len(word)))

In [0]:
wordsPairRDD

In [0]:
wordsPairRDD.collect()

### keyBy
The preceding example demonstrated a simple way to create a key for your RDD. However, you can also use the [pyspark.RDD.keyBy](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.keyBy.html) method to achieve the same result by specifying a function that creates the key from
your current value. In this case, you are keying by the first letter in the word. Spark then keeps
the record as the value for the keyed RDD:

In [0]:
wordsPairRDD = wordsRDD.keyBy(lambda word: word.lower()[0])

In [0]:
wordsPairRDD.collect()
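`keyBy(f)` produces the same pairs as `map(lambda x: (f(x), x))`: the key is computed from the element, and the whole element becomes the value. A plain-Python sketch of that equivalence on the local word list (`key_fn` is a hypothetical name):

```python
words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

def key_fn(word):
    return word.lower()[0]

# What keyBy(key_fn) yields: (key, original element)
keyed = [(key_fn(word), word) for word in words]
print(keyed[:3])  # → [('s', 'Singapore'), ('d', 'Data'), ('s', 'Science')]
```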

### mapValues
If you have a tuple, Spark assumes that the first element is the key and the second is the value. When your data is in this format, you can map over the values without changing the keys by using the [pyspark.RDD.mapValues](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapValues.html) method.

In [0]:
wordsPairRDD.mapValues(lambda word: word.upper()).collect()
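`mapValues(f)` is equivalent to mapping each `(key, value)` pair to `(key, f(value))`. Because the keys are untouched, Spark can retain the RDD's existing partitioning, which a plain `map` cannot assume. A plain-Python sketch of the transformation on a few sample pairs:

```python
pairs = [("s", "Singapore"), ("d", "Data"), ("s", "Science")]

# Only the value changes; each key stays exactly as it was
upper = [(key, value.upper()) for key, value in pairs]
print(upper)  # → [('s', 'SINGAPORE'), ('d', 'DATA'), ('s', 'SCIENCE')]
```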

### Extracting Keys and Values
You can extract the [pyspark.RDD.keys](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.keys.html) and [pyspark.RDD.values](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.values.html) of your key-value RDD:

In [0]:
wordsPairRDD.keys().collect()

In [0]:
wordsPairRDD.values().collect()

### lookup
One common task with a key-value RDD is to look up the values for a particular key with [pyspark.RDD.lookup](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.lookup.html), which returns them as a list.

In [0]:
wordsPairRDD.lookup('s')
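Locally, `lookup` corresponds to filtering the pairs on the key and keeping the values. When the RDD has no known partitioner, as here, Spark has to scan every partition to answer the lookup. A plain-Python sketch using the same first-letter keys:

```python
words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

pairs = [(word.lower()[0], word) for word in words]

# Keep the values whose key matches
s_values = [value for key, value in pairs if key == "s"]
print(s_values)
```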

### countByKey
With [pyspark.RDD.countByKey](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.countByKey.html) you can count the number of elements for each key, collecting the results to a local dictionary.

In [0]:
wordsPairRDD.countByKey()
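Because `countByKey` brings its result back to the driver as a dictionary, it is only appropriate when the number of distinct keys is small. Its result can be mirrored locally with `collections.Counter` over the keys:

```python
from collections import Counter

words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

pairs = [(word.lower()[0], word) for word in words]

# Count how many pairs share each key
key_counts = Counter(key for key, _ in pairs)
print(key_counts.most_common(3))
```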

### countByValue
[pyspark.RDD.countByValue](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.countByValue.html) is used to return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

In [0]:
wordsRDD.countByValue()
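`countByValue` counts whole elements rather than keys, so on `wordsRDD` it behaves like a word count collected to the driver. The local equivalent is `collections.Counter` over the elements:

```python
from collections import Counter

words = ("Singapore Data Science Consortium : Scalable Data Science with "
         "Apache Spark - Apache Spark Fundamentals").split(" ")

value_counts = Counter(words)
# Only the words that appear more than once
repeated = {word: n for word, n in value_counts.items() if n > 1}
print(repeated)  # → {'Data': 2, 'Science': 2, 'Apache': 2, 'Spark': 2}
```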

### groupByKey
[pyspark.RDD.groupByKey](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.groupByKey.html) is used to group the values for each key in the RDD into a single sequence.

In [0]:
wordsPairRDD.groupByKey().mapValues(list).collect()

However, this is, for the majority of cases, the wrong way to approach the problem. The fundamental issue here is that each executor must hold all values for a given key in memory before applying the function to them. If you have massive key skew, some partitions might be completely overloaded with a ton of values for a given key, and you will get OutOfMemoryErrors. This obviously doesn’t cause an issue with our current dataset, but it can cause serious problems at scale. 

There are use cases when groupByKey does make sense. If you have consistent value sizes for each key and know that they will fit in the memory of a given executor, you’re going to be just fine. It’s just good to know exactly what you’re getting yourself into when you do this.

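There are other functions for aggregations, such as `reduceByKey`, `aggregateByKey` and `combineByKey`, which combine values within each partition before shuffling and therefore do not need to hold all values for a key in memory at once. Other key-value operations, such as `sortByKey`, are also available.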
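The practical difference between `groupByKey` and `reduceByKey` is how much data crosses the network. In this plain-Python model, each partition holds `(word, 1)` pairs: the `groupByKey` path ships every record to the shuffle and sums afterwards, while `reduceByKey` first combines values per key inside each partition (a map-side combine) and ships one record per key per partition. The partition contents and helper names are hypothetical.

```python
from collections import defaultdict
from operator import add

partitions = [
    [("spark", 1), ("data", 1), ("spark", 1)],
    [("spark", 1), ("data", 1), ("data", 1)],
]

def group_by_key_then_sum(parts):
    """Every record is shuffled; values are grouped and summed afterwards."""
    shuffled = [record for part in parts for record in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return {k: sum(v) for k, v in groups.items()}, len(shuffled)

def reduce_by_key(parts, f):
    """Values are combined per key within each partition before the shuffle."""
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = f(local[key], value) if key in local else value
        shuffled.extend(local.items())
    result = {}
    for key, value in shuffled:
        result[key] = f(result[key], value) if key in result else value
    return result, len(shuffled)

print(group_by_key_then_sum(partitions))  # → ({'spark': 3, 'data': 3}, 6)
print(reduce_by_key(partitions, add))     # → ({'spark': 3, 'data': 3}, 4)
```

On the RDD, a word count in this style would be `wordsRDD.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).collect()`.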

## RDD Persistence
You can mark an RDD to be [persisted](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence) using the persist() or cache() methods on it. In addition, each persisted RDD can be stored using a different storage level set by passing a [pyspark.StorageLevel](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html#pyspark.StorageLevel) object to persist().

### Cache
The [pyspark.RDD.cache](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cache.html) method is a shorthand for persisting with the default storage level, StorageLevel.MEMORY_ONLY. In PySpark, stored objects are always serialized with the Pickle library.

In [0]:
wordsRDD.is_cached

In [0]:
# mark the RDD for caching; it is stored the first time an action computes it
wordsRDD.cache()
wordsRDD.getStorageLevel()

In [0]:
wordsRDD.is_cached

### Persist
The [pyspark.RDD.persist](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.persist.html) method is used to set the RDD's storage level to persist its values across operations after the first time it is computed.

In [0]:
from pyspark.storagelevel import StorageLevel
rangeRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

In [0]:
rangeRDD.getStorageLevel()

## Text Files
Text file RDDs can be created using SparkContext’s [`textFile`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.textFile.html#pyspark.SparkContext.textFile) method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines, returning one record per line in the file.

In [0]:
# Databricks has its own file system (DBFS) too
# you can list the available sample datasets (display and dbutils are Databricks-specific):
display(dbutils.fs.ls('/databricks-datasets/'))

In [0]:
# the second argument is the minimum number of partitions
textRDD = sc.textFile("/databricks-datasets/definitive-guide/README.md", 8)

In [0]:
textRDD.take(5)

In [0]:
# define lineLengths as the result of a map transformation; due to laziness, it is not computed immediately
lineLengths = textRDD.map(lambda s: len(s))

# run reduce, which is an action: only now is the file read, the map applied, and the sum computed
totalLength = lineLengths.reduce(lambda a, b: a + b)

# this is the total number of characters in the source file (newline characters are not included)
totalLength
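`len(s)` counts the characters of each line, and `textFile` strips the line terminators, so the total above is a character count without newlines. To count words instead, split each line and count the pieces, for example with `textRDD.flatMap(lambda line: line.split()).count()`. A plain-Python sketch of both computations on a small hypothetical text:

```python
text = "hello spark\nrdd basics\n"  # hypothetical file contents

lines = text.splitlines()  # like textFile: one record per line, no newline
total_chars = sum(len(line) for line in lines)
total_words = sum(len(line.split()) for line in lines)
print(total_chars, total_words)  # → 21 4
```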

### CSV Files

In [0]:
# define an RDD from a CSV file
# using the same textFile method: each line, including the header, becomes one string record
flightRDD = sc.textFile("/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv", 10)

In [0]:
flightRDD.take(5)

In [0]:
flightRDD.count()
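Because `textFile` treats a CSV file as plain lines, the header row is just another record and each line still has to be split into fields. Here is a minimal sketch of one way to do it: `parse_line` is a hypothetical helper built on Python's `csv` module (which also handles quoted commas), tested here on hypothetical lines. The column names shown are an assumption about the file's layout, so compare them with the output of `flightRDD.take(5)`.

```python
import csv

def parse_line(line):
    """Split one CSV line into a list of string fields."""
    return next(csv.reader([line]))

# Hypothetical sample lines, not read from the dataset
lines = [
    "DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count",
    "Country A,Country B,15",
    '"Country, with comma",Country C,3',
]
header, rows = lines[0], lines[1:]
parsed = [parse_line(line) for line in rows]

# Convert the last field to an integer count
typed = [(dest, origin, int(count)) for dest, origin, count in parsed]
print(typed)
```

On the RDD, the same function could be applied with `header = flightRDD.first()` followed by `flightRDD.filter(lambda line: line != header).map(parse_line)`.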