# Introduction to PySpark

### Lambda Functions
Lambda functions are anonymous functions defined inline with the `lambda` keyword; their body is limited to a single expression. They are commonly used for small, throwaway functions that keep no external state, such as a sort key or a filter condition.

```python
x = ['Python', 'programming', 'is', 'awesome']
print(sorted(x))
```

```
['Python', 'awesome', 'is', 'programming']
```


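By default, `sorted` compares strings by their Unicode code points, so uppercase ASCII letters sort before lowercase ones. Passing a `lambda` as the `key` argument makes the sort case-insensitive:

```python
print(sorted(x, key=lambda arg: arg.lower()))
```

```
['awesome', 'is', 'programming', 'Python']
```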
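The `lambda` passed as `key` is just a shorter way to write a one-expression function. As a sketch, a named function (the hypothetical `lowercase_key` below) and the unbound method `str.lower` produce the same ordering:

```python
x = ['Python', 'programming', 'is', 'awesome']

# A named function equivalent to `lambda arg: arg.lower()`.
def lowercase_key(arg):
    return arg.lower()

by_lambda = sorted(x, key=lambda arg: arg.lower())
by_def = sorted(x, key=lowercase_key)
# str.lower is already a one-argument callable, so no wrapper is needed.
by_method = sorted(x, key=str.lower)

print(by_lambda == by_def == by_method)  # True
print(by_lambda)  # ['awesome', 'is', 'programming', 'Python']
```

The `lambda` form is convenient when the function is used exactly once and fits on one line; a named `def` is clearer when it is reused or needs more than one expression.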


### `filter()`, `map()`, and `reduce()`

It is important to understand these functions in a core Python context first. Then you'll be able to translate that knowledge into PySpark programs and the Spark API.

`filter` keeps only the items of an iterable that satisfy a condition, typically expressed as a `lambda` function.

```python
# Using filter()
print(list(filter(lambda arg: len(arg) < 8, x)))
```

```
['Python', 'is', 'awesome']
```


```python
# Much more verbose option
def is_less_than_8_characters(item):
    return len(item) < 8

results = list()

for item in x:
    if is_less_than_8_characters(item):
        results.append(item)
print(results)
```

```
['Python', 'is', 'awesome']
```


Both cells collect all the strings that have fewer than 8 characters. The second one is more verbose than the `filter` example, but produces the same result.

Another, less obvious benefit of `filter` is that it returns a lazy iterator rather than a list, producing items only as you request them. This means `filter` doesn't require your computer to have enough memory to hold all the results at once. This becomes increasingly important with big datasets, which can quickly grow to several gigabytes in size.
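Because `filter` is lazy, it can even work on an endless input, as long as you consume only part of the result. This sketch uses `itertools.count()` as a stand-in for a data source far too large to hold in memory, and `itertools.islice()` to pull just the first few matches:

```python
from itertools import count, islice

# count() yields 0, 1, 2, ... forever, so it could never be stored in a list.
numbers = count()

# filter() returns a lazy iterator; no items have been checked yet.
odds = filter(lambda n: n % 2 != 0, numbers)
print(type(odds).__name__)  # filter

# islice() pulls items through the filter only until it has five matches.
first_five = list(islice(odds, 5))
print(first_five)  # [1, 3, 5, 7, 9]
```

Wrapping the same endless input in `list(...)` before filtering would never finish, which is exactly the memory problem that lazy iteration avoids.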

`map` is similar to `filter` in that it applies a function to each item in an iterable, but it always produces a 1-to-1 mapping of the original items. The new iterable that `map` returns always has the same number of elements as the original iterable, which is not the case with `filter`.
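As a quick sketch of that difference, the block applies `map` and `filter` to the same list `x` and compares the lengths of the results:

```python
x = ['Python', 'programming', 'is', 'awesome']

mapped = list(map(len, x))                        # one output per input
filtered = list(filter(lambda s: len(s) < 8, x))  # only the matching inputs

print(mapped)    # [6, 11, 2, 7]
print(filtered)  # ['Python', 'is', 'awesome']
print(len(x), len(mapped), len(filtered))  # 4 4 3
```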

```python
# Using map()
print(list(map(lambda arg: arg.upper(), x)))
```

```
['PYTHON', 'PROGRAMMING', 'IS', 'AWESOME']
```


```python
# Equivalent for loop
results = list()
for item in x:
    results.append(item.upper())
print(results)
```

```
['PYTHON', 'PROGRAMMING', 'IS', 'AWESOME']
```


The `for` loop has the same result as the `map` example: it collects all items in their upper-case form. However, as with `filter`, `map` returns a lazy iterator, which again makes it possible to process data that is too big to fit entirely in memory.
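To see that `map` really defers the work, this sketch wraps the upper-casing in a hypothetical `shout` helper that records every call it receives:

```python
x = ['Python', 'programming', 'is', 'awesome']
calls = []

def shout(word):
    # Record every call so we can see when map() actually does the work.
    calls.append(word)
    return word.upper()

upper = map(shout, x)
print(calls)  # []  (creating the map object runs nothing yet)

first = next(upper)
print(first, calls)  # PYTHON ['Python']

rest = list(upper)
print(rest)   # ['PROGRAMMING', 'IS', 'AWESOME']
print(calls)  # ['Python', 'programming', 'is', 'awesome']
```

Each item is transformed only at the moment it is requested, so at no point does `map` need to hold more than the current item.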

`reduce` does not return an iterable. Instead, it repeatedly applies the given function to the items, reducing the iterable to a single value:

```python
from functools import reduce # reduce() moved to functools in Python 3
print(reduce(lambda val1, val2: val1 + ' ' + val2, x))
```

```
Python programming is awesome
```


The code combines all the items in the iterable, from left to right, into a single item. There is no call to `list` because `reduce` already returns a single item.
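To see exactly what `reduce` does, the sketch below unrolls the call by hand and then writes the same left-to-right fold as an explicit loop. It also shows the optional third argument of `functools.reduce`, which seeds the accumulator. The helper names `join_words` and `add` are illustrative only:

```python
from functools import reduce

x = ['Python', 'programming', 'is', 'awesome']

def join_words(val1, val2):
    return val1 + ' ' + val2

# reduce() applies the function cumulatively from left to right, so this
# is the same as join_words(join_words(join_words(x[0], x[1]), x[2]), x[3]).
nested = join_words(join_words(join_words(x[0], x[1]), x[2]), x[3])
print(nested == reduce(join_words, x))  # True

# The same fold written as an explicit loop with an accumulator.
result = x[0]
for item in x[1:]:
    result = join_words(result, item)
print(result)  # Python programming is awesome

# An optional third argument seeds the accumulator, which also makes
# reduce() safe to call on an empty iterable.
def add(total, n):
    return total + n

empty_total = reduce(add, [], 0)
numbers_total = reduce(add, [1, 2, 3, 4], 0)
print(empty_total, numbers_total)  # 0 10
```

Without the seed value, `reduce` raises a `TypeError` on an empty iterable, because there is no first item to start from.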

### "Hello World" in PySpark

```python
import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())
```

```
316
52
```


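The program counts the total number of lines in a file named `copyright`, then the number of those lines that contain the word `python` in any letter case. The exact counts depend on the version of the file installed on your system.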
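For comparison, here is the same computation as a plain-Python sketch that reads lines one at a time. It uses `io.StringIO` with made-up text in place of the real file, so its counts differ from the output above; `count_lines` is a hypothetical helper:

```python
import io


def count_lines(lines):
    """Return (total lines, lines that mention 'python' in any letter case)."""
    total = 0
    python_count = 0
    for line in lines:  # reads one line at a time
        total += 1
        if 'python' in line.lower():
            python_count += 1
    return total, python_count


# Hypothetical in-memory text standing in for the real copyright file.
sample = io.StringIO(
    'Python is distributed under the PSF License.\n'
    'Copyright notices follow.\n'
    'See also the CPython source tree.\n'
)
counts = count_lines(sample)
print(counts)  # (3, 2)
```

With a real file you could pass `open(path)` instead. The logic is the same in PySpark; the difference is that Spark splits the file into partitions and can count them in parallel.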

Remember, **a PySpark program is not that much different from a regular Python program**, but the **execution model can be very different** from a regular Python program, especially if it is running on a cluster.

### What is Spark?

Apache Spark is made up of several components, so describing it can be difficult. At its core, Spark is a generic engine for processing large amounts of data.

Spark is written in Scala and runs on the JVM. It has built-in components for processing streaming data, machine learning, graph processing, and even interacting with data via SQL.

### What is PySpark?

Because Spark itself is implemented in Scala, Python code can't use it directly. PySpark is the library that exposes Spark's functionality to Python.

Think of PySpark as a Python-based wrapper on top of the Scala API. This means we have to refer to two sets of documentation:

>1. [PySpark API documentation](http://spark.apache.org/docs/latest/api/python/index.html)
>2. [Spark Scala API documentation](https://spark.apache.org/docs/latest/api/scala/index.html#package)

PySpark communicates with the Spark Scala-based API via the Py4J library. Py4J is not specific to PySpark or Spark. Py4J allows any Python program to talk to JVM-based code.

There are two reasons why PySpark is based on the functional programming paradigm:

>1. Spark's native language, Scala, is functional.
>2. Functional code is much easier to parallelize.

Another way to think of PySpark is that it is a library that allows processing of large amounts of data on a single machine or a cluster of machines.

In a Python context, think of PySpark as a way to handle parallel processing without the need for the `threading` or `multiprocessing` modules. All the complicated communication and synchronization between threads, processes, and even different CPUs is handled by Spark.
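As a rough point of comparison, this is the kind of parallelism you would otherwise manage by hand with the standard library. The sketch uses `concurrent.futures.ThreadPoolExecutor`. Because `square` is a pure function with no shared state, running it on several workers gives the same answer as the built-in `map`, which is also why functional code is easy to parallelize:

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    # A pure function: the output depends only on the input; no shared state.
    return n * n


numbers = list(range(10))

serial = list(map(square, numbers))

# Manually spreading the same work across a pool of worker threads.
# Executor.map returns results in input order, like the built-in map.
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(square, numbers))

print(serial == parallel)  # True
print(parallel[:5])  # [0, 1, 4, 9, 16]
```

In CPython, threads won't speed up CPU-bound work like this because of the global interpreter lock; the point is the structure, not the speed. Spark applies the same idea across processes and machines, and handles the scheduling and communication for you.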

### PySpark API and Data Structures

To interact with PySpark, we create specialized data structures called Resilient Distributed Datasets (RDDs).

RDDs hide the complexity of transforming and distributing data. When we run on a cluster, a scheduler automatically distributes the data across multiple nodes.

To better understand PySpark's API and data structures, recall the `Hello World` program above:

```python
import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())
```

The entry point of a PySpark program like this one is a `SparkContext` object. This object allows us to connect to a Spark cluster and create RDDs. The `local[*]` string is a special string denoting that we are using a *local* cluster, which is another way of saying we are running in single-machine mode. The `*` tells Spark to create as many worker threads as there are logical cores on our machine.
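The `local` master strings follow a small set of rules from Spark's documentation on master URLs: `local` runs one worker thread, `local[K]` runs K threads, and `local[*]` runs one thread per logical core. The hypothetical helper below is not part of PySpark; it only illustrates those three forms and rejects everything else, including other variants Spark accepts such as `local[K,F]`:

```python
import os
import re


def worker_threads_for(master):
    """Hypothetical helper: worker threads requested by a local master string."""
    if master == 'local':
        return 1
    match = re.fullmatch(r'local\[(\*|\d+)\]', master)
    if match is None:
        raise ValueError(f'not a supported local master string: {master!r}')
    value = match.group(1)
    # '*' means one thread per logical core, as reported by the OS.
    return os.cpu_count() if value == '*' else int(value)


print(worker_threads_for('local'))     # 1
print(worker_threads_for('local[4]'))  # 4
print(worker_threads_for('local[*]') == os.cpu_count())  # True
```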

Creating a `SparkContext` can be more involved when we are using a cluster. To connect to a Spark cluster, we might need to handle authentication and a few other pieces of information specific to our cluster. We can set up those details in the following manner:

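```python
import pyspark

conf = pyspark.SparkConf()
conf.setMaster('spark://head_node:56887')  # placeholder URL of the cluster's master
conf.set('spark.authenticate', 'true')  # require authentication between Spark processes
conf.set('spark.authenticate.secret', 'secret-key')  # placeholder: use your cluster's secret
sc = pyspark.SparkContext(conf=conf)
```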

You can only start creating RDDs once you have a `SparkContext`.

RDDs can be created in a number of ways, but one common way is the `SparkContext.parallelize()` method. `parallelize()` turns Python collections such as lists and tuples into RDDs, making the data distributed and fault-tolerant.

To better understand RDDs, consider another example. The following code creates a `range` of 10,000 integers and then uses `parallelize()` to distribute that data into 2 partitions:

```python
big_list = range(10000)
rdd = sc.parallelize(big_list, 2)  # distribute the data across 2 partitions
odds = rdd.filter(lambda x: x % 2 != 0)
print(odds.take(5))
```

```
[1, 3, 5, 7, 9]
```

`parallelize()` turns that `range` into a distributed collection of numbers, giving us all the capabilities of Spark's infrastructure.

Note that we use the RDD's `filter` method instead of Python's built-in `filter`. The result is the same, but what is happening behind the scenes is drastically different. By using the RDD `filter` method, that operation occurs in a distributed manner across several CPUs or computers.
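The following is a toy model, in plain Python, of what `parallelize(big_list, 2)`, `filter`, and `take(5)` do conceptually. The `partition` helper is hypothetical, and Spark's real partitioning, scheduling, and `take` implementation are more involved; the sketch only shows the shape of the idea: the data is split into independent chunks, each chunk can be processed on its own, and `take` stops as soon as it has enough items.

```python
from itertools import islice


def partition(data, num_partitions):
    """Hypothetical helper: split a sequence into contiguous, near-equal chunks."""
    size = len(data)
    return [data[i * size // num_partitions:(i + 1) * size // num_partitions]
            for i in range(num_partitions)]


big_list = range(10000)
partitions = partition(big_list, 2)
print([len(p) for p in partitions])  # [5000, 5000]

# Each partition gets its own lazy filter. In Spark, separate tasks
# (on different cores or machines) would process the partitions.
odd_parts = [(n for n in p if n % 2 != 0) for p in partitions]

# Like take(5): pull items partition by partition and stop once we have five.
taken = []
for part in odd_parts:
    taken.extend(islice(part, 5 - len(taken)))
    if len(taken) == 5:
        break
print(taken)  # [1, 3, 5, 7, 9]
```

In this sketch the second partition is never examined, because the first one already supplies five odd numbers.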

Again, think of this as Spark doing the `multiprocessing` work for you, all encapsulated in the RDD data structure.

`take()` is a way to see the contents of an RDD, but only a small subset of them. `take()` pulls that subset of data from the distributed system onto a single machine: the driver that runs your program.

`take()` is important for debugging because inspecting the entire dataset on a single machine may not be possible. RDDs are optimised for big data, so in a real-world scenario a single machine may not have enough RAM to hold the entire dataset.


<div class="alert alert-block alert-info">
<b>Spark temporarily prints progress information to the console when you run examples like this in the shell. Your console might briefly show something like [Stage 0:> (0 + 1) / 1], where the numbers in parentheses count completed and running tasks out of the stage's total.

This output shows how Spark splits the processing of your RDDs into stages, whose tasks run across different CPUs and machines.
</b>
</div>


Another way to create RDDs is to read a file with `textFile()`, as the Hello World example did. RDDs are one of the foundational data structures in PySpark, so many of the functions in the API return RDDs.

One of the key distinctions between RDDs and other data structures is that processing is delayed until the result is requested. This is similar to a Python generator, and the behavior is known as *lazy evaluation*.

You can stack up multiple transformations on the same RDD without any processing happening. This functionality is possible because Spark maintains a directed acyclic graph (DAG) of the transformations. The underlying graph is only activated when the final results are requested. In the previous example, no computation took place until we requested the results by calling `take()`.
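To make lazy evaluation concrete, here is a toy model in plain Python. The `LazyPipeline` class is hypothetical and is not how Spark is implemented: it records each transformation (`filter`, `map`) as a step and only runs the chain when a result-requesting method (`take`, `collect`, `count`) asks for output, much as Spark waits for results to be requested before executing its DAG.

```python
from itertools import islice


class LazyPipeline:
    """Hypothetical toy model of lazy transformations; not Spark's API."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)  # recorded transformations, in order

    def filter(self, predicate):
        # Return a new pipeline with one more recorded step; nothing runs yet.
        return LazyPipeline(self._source, self._steps + (('filter', predicate),))

    def map(self, func):
        return LazyPipeline(self._source, self._steps + (('map', func),))

    def _run(self):
        # Chain the recorded steps as lazy iterators over the source.
        items = iter(self._source)
        for kind, func in self._steps:
            items = filter(func, items) if kind == 'filter' else map(func, items)
        return items

    # Result-requesting methods: only these trigger any computation.
    def take(self, n):
        return list(islice(self._run(), n))

    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


checked = []

def is_odd(n):
    checked.append(n)  # record which items the filter actually examined
    return n % 2 != 0

pipeline = LazyPipeline(range(10000)).filter(is_odd).map(lambda n: n * 10)
print(checked)  # []  (building the pipeline did no work)

first_three = pipeline.take(3)
checked_by_take = list(checked)
print(first_three)      # [10, 30, 50]
print(checked_by_take)  # [0, 1, 2, 3, 4, 5]

total = pipeline.count()  # re-runs every step from the source
print(total)  # 5000
```

Like an uncached RDD, each result-requesting call here re-runs the recorded steps from the source; in Spark, `cache()` or `persist()` avoids that recomputation.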

There are multiple ways to request the results from an RDD; methods that do so are called *actions*. You can explicitly request results to be evaluated and collected onto a single machine, the driver, by calling `collect()` on an RDD. You can also request results implicitly in various ways, one of which is `count()`, which evaluates the RDD but returns only the number of elements.

<div class="alert alert-block alert-info">
<b>
Be careful with `collect()` because it pulls the entire dataset into the driver's memory, which will not work if the dataset is too big to fit into the RAM of a single machine. `count()` returns only a number, but it still forces Spark to compute the whole dataset.
</b>
</div>

#### References

> [First Steps with PySpark and Big Data Processing](https://realpython.com/pyspark-intro/)