# Spark with Python programming
## Introduction

[Apache Spark](https://spark.apache.org/) is one of the hottest trends in the technology domain. It is arguably the framework with the highest potential to realize the fruits of the marriage between Big Data and Machine Learning.

It runs fast (up to 100x faster than traditional [Hadoop MapReduce](https://www.tutorialspoint.com/hadoop/hadoop_mapreduce.htm) on some workloads, thanks to in-memory operation), offers robust, distributed, fault-tolerant data objects (called [RDD](https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm)), and integrates beautifully with the world of machine learning and graph analytics through supplementary packages like [MLlib](https://spark.apache.org/mllib/) and [GraphX](https://spark.apache.org/graphx/).

<p align='center'>
<img src="https://raw.githubusercontent.com/tirthajyoti/PySpark_Basics/master/Images/Spark%20ecosystem.png" width="400" height="400">
</p>


Spark is often deployed on top of [Hadoop/HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html) and is written mostly in [Scala](https://www.scala-lang.org/), a language that combines functional and object-oriented programming and, like Java, runs on the JVM. In fact, Spark needs a compatible Java installation on your system because it runs on the JVM.

However, Scala is not the first language most beginners learn when they venture into the world of data science. Fortunately, Spark provides a wonderful Python integration, called **PySpark**, which lets Python programmers interface with the Spark framework and learn how to manipulate data at scale and work with objects and algorithms over a distributed file system.

In this article, we will learn the basics of Python integration with Spark. Spark has a large and constantly evolving set of concepts, so we focus on the fundamentals with a few simple examples. Readers are encouraged to build on these and explore more on their own.

## Short history

Apache Spark started as a research project at the UC Berkeley AMPLab in 2009 and was open sourced in early 2010. It began as a class project at UC Berkeley, where the idea was to build a cluster management framework that could support different kinds of cluster computing systems. Many of the ideas behind the system were presented in various research papers over the years.

After being released, Spark grew into a broad developer community, and moved to the Apache Software Foundation in 2013. Today, the project is developed collaboratively by a community of hundreds of developers from hundreds of organizations.

## Spark in a nutshell

One thing to remember is that Spark is not a programming language like Python or Java. It is a general-purpose distributed data processing engine, suitable for use in a wide range of circumstances. It is particularly useful for big data processing both at scale and with high speed.

Application developers and data scientists generally incorporate Spark into their applications to rapidly query, analyze, and transform data at scale.

Some of the tasks most frequently associated with Spark include:
- ETL and SQL batch jobs across large data sets (often terabytes in size),
- processing of streaming data from IoT devices and nodes, various sensors, and financial and transactional systems of all kinds, and
- machine learning tasks for e-commerce or IT applications.

At its core, Spark commonly works with the Hadoop/HDFS framework for handling distributed files, although it can also read from other storage systems. It is mostly implemented in Scala, a functional and object-oriented language that runs on the JVM alongside Java.

There is a core Spark data processing engine, but on top of that, there are many libraries developed for SQL-type query analysis, distributed machine learning, large-scale graph computation, and streaming data processing.

Multiple programming languages are supported by Spark in the form of easy interface libraries: Java, Python, Scala, and R.

## Spark uses the MapReduce paradigm

The basic idea of distributed processing is to divide the data into small, manageable chunks (including some filtering and sorting), bring the computation close to the data, i.e., use small nodes of a large cluster for specific jobs, and then combine the results back together.

The dividing portion is called the ‘Map’ action and the recombination is called the ‘Reduce’ action. Together, they make up the famous ‘MapReduce’ paradigm, which Google introduced around 2004.

For example, if a file has 100 records to be processed, 100 mappers can run together to process one record each. Or maybe 50 mappers can run together to process two records each. After all the mappers complete processing, the framework shuffles and sorts the results before passing them on to the reducers.

A reducer cannot start while a mapper is still in progress. All the map output values that have the same key are assigned to a single reducer, which then aggregates the values for that key.
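The three phases described above (map, shuffle/sort, reduce) can be sketched on a single machine with plain Python and the classic word-count example. The function names are hypothetical and nothing here is distributed; the sketch only mirrors how data flows between the phases.

```python
from collections import defaultdict


def map_phase(record):
    """Map: emit a (word, 1) pair for every word in one input record."""
    return [(word, 1) for word in record.split()]


def shuffle_phase(mapped_pairs):
    """Shuffle/sort: group all values that share the same key, sorted by key."""
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(key, values):
    """Reduce: aggregate all the values collected for a single key."""
    return key, sum(values)


records = ["spark is fast", "spark is fun", "hadoop is reliable"]

# In a real cluster each record could be handled by a separate mapper in parallel
mapped = [pair for record in records for pair in map_phase(record)]
grouped = shuffle_phase(mapped)
# Every reducer sees all the values for its key only after the shuffle completes
word_counts = dict(reduce_phase(k, v) for k, v in grouped.items())
print(word_counts)
```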

## Set up Python for Spark


If you’re already familiar with Python and libraries such as Pandas and NumPy, then PySpark is a great extension/framework to learn in order to create more scalable, data-intensive analyses and pipelines by utilizing the power of Spark in the background.

The exact process of installing and setting up a PySpark environment (on a standalone machine) is somewhat involved and can vary slightly depending on your system and environment. The goal is to get your regular Jupyter data science environment working with Spark in the background using PySpark.

Alternatively, you can use the Databricks setup for practicing Spark. Databricks was founded by the original creators of Spark and offers an excellent ready-to-launch environment for distributed analysis with Spark.

But the idea is always the same. You are distributing (and replicating) your large dataset in small fixed chunks over many nodes. You then bring the compute engine close to them so that the whole operation is parallelized, fault-tolerant and scalable.

By working with PySpark and a Jupyter notebook, you can learn all these concepts without spending anything on AWS or the Databricks platform. You can also easily interface with SparkSQL and MLlib for database manipulation and machine learning.
It will be much easier to start working with real-life large clusters if you have internalized these concepts beforehand!

## RDD and SparkContext

Many Spark programs revolve around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. SparkContext resides in the Driver program and manages the distributed data over the worker nodes through the cluster manager. The good thing about using PySpark is that all this complexity of data partitioning and task management is handled automatically at the back and the programmer can focus on the specific analytics or machine learning job at hand.

![rdd-1](https://raw.githubusercontent.com/tirthajyoti/Spark-with-Python/master/Images/RDD-1.png)

### Spark Context

The Spark Context is the main entry point for Spark functionality and hence the heart of any Spark application. It allows the Spark Driver to access the cluster through a Cluster Resource Manager, and it can be used to create RDDs, accumulators, and broadcast variables on the cluster. The Spark Context also keeps track of live executors through the heartbeat messages they regularly send to the driver.

The Spark Context is created by the Spark Driver for each Spark application when it is first submitted by the user. It exists throughout the entire life of a Spark application.

The Spark Context terminates once the Spark application completes. Only one Spark Context can be active per JVM, so you must `stop()` the active Spark Context before creating a new one.

### Cluster Resource Manager

The Cluster Manager in a distributed Spark application is the process that monitors, governs, and reserves resources in the form of containers on the cluster worker nodes. (The container and Application Master terminology comes from YARN, one of the cluster managers Spark supports.) These containers are reserved upon request by the Application Masters and allocated to them when they are released or become available.

Once the Cluster Manager allocates the containers, the Application Master passes the containers' resources back to the Spark Driver, which is then responsible for executing the various stages and tasks of the Spark application.

### Executors

Executors are processes on the worker nodes whose job is to execute the assigned tasks. They run these tasks on the RDD partitions held on the worker nodes and return the results to the Spark Driver.

By default, executors launch once at the beginning of a Spark application and run for its entire lifetime; this is known as "static allocation of executors". Alternatively, users can opt for dynamic allocation, in which Spark adds or removes executors to match the overall workload. Even if an executor fails, the Spark application can continue, because the failed tasks are rescheduled on other executors.

Executors provide in-memory storage for RDD partitions that are cached by Spark applications.

Other executor properties:

- stores cached data in the JVM heap or on disk
- reads data from external sources
- writes data to external sources
- performs all the data processing

There are two ways to create RDDs:
- parallelizing an existing collection in your driver program, or
- referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat.

You can apply two types of operations to an RDD, and all work with the data is expressed as a sequence of them: transformations and actions.

## Transformations



Applying a transformation to an RDD produces a new RDD. As a rule, transformations are operations that convert the elements of the input data in some way.

Transformations are lazy in nature, meaning that when we call an operation on an RDD, it does not execute immediately. Spark keeps a record of which operations have been called (through the DAG, which we will discuss later).

We can think of a Spark RDD as the data that we build up through transformations. Because transformations are lazy, the computation runs only when we call an action on the data, so data is not loaded until it is necessary. This gives Spark plenty of opportunities to apply low-level optimizations.
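The idea of recording transformations and running them only when an action is called can be illustrated with a hypothetical toy class in plain Python. `LazyPipeline` is not part of Spark; it only mimics the behaviour described above. Like an uncached RDD, it also recomputes everything from the source data on every action.

```python
class LazyPipeline:
    """Hypothetical toy: records transformations and runs them only on an action."""

    def __init__(self, data, steps=None):
        self._data = data
        self._steps = steps or []  # the recorded "plan", similar to RDD lineage

    # Transformations: return a new pipeline with one more recorded step
    def map(self, f):
        return LazyPipeline(self._data, self._steps + [("map", f)])

    def filter(self, pred):
        return LazyPipeline(self._data, self._steps + [("filter", pred)])

    # Actions: actually execute the recorded steps, element by element
    def collect(self):
        result = []
        for item in self._data:
            keep = True
            for kind, f in self._steps:
                if kind == "map":
                    item = f(item)
                elif not f(item):
                    keep = False
                    break
            if keep:
                result.append(item)
        return result

    def count(self):
        return len(self.collect())


calls = []


def noisy_square(x):
    calls.append(x)  # records every real evaluation
    return x * x


plan = LazyPipeline(range(6)).map(noisy_square).filter(lambda x: x % 2 == 0)
print(len(calls))      # 0: nothing has run yet
print(plan.collect())  # [0, 4, 16]
print(len(calls))      # 6: the action triggered the work
```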

At a high level, two groups of transformations can be applied onto the RDDs, namely narrow transformations, and wide transformations.

<img src="transformation.png">

A narrow transformation does not require the data to be shuffled or reorganized across partitions: each partition of the parent RDD is used by at most one partition of the resulting RDD. Examples include `map` and `filter`. Consecutive narrow transformations are grouped (or pipelined) together into a single stage.

A wide transformation, such as `groupByKey` or `reduceByKey`, needs information from other partitions, for example all the values that share a key. This forces a shuffle, in which data is rearranged between partitions: Spark gathers the required data from each partition and combines it into a new partition, likely on a different executor.
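The difference can be simulated in plain Python with a hypothetical list of partitions. The narrow step touches each partition independently, while the wide step must move records between partitions by key first. Python's built-in `hash()` stands in for Spark's partitioner, and the real `reduceByKey` also pre-aggregates within each partition before shuffling, which this sketch skips.

```python
# Hypothetical key/value data already split across 3 partitions
partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
    [("b", 5), ("a", 6)],
]

# Narrow (like map): each output partition is computed from one input partition
doubled = [[(k, v * 2) for k, v in part] for part in partitions]


def shuffle_by_key(parts, num_partitions):
    """Send every record to the partition chosen by hashing its key."""
    out = [[] for _ in range(num_partitions)]
    for part in parts:
        for k, v in part:
            out[hash(k) % num_partitions].append((k, v))
    return out


def reduce_by_key(parts, func, num_partitions=2):
    """Wide (like reduceByKey): shuffle, then combine values per key locally."""
    result = []
    for part in shuffle_by_key(parts, num_partitions):
        acc = {}
        for k, v in part:
            acc[k] = func(acc[k], v) if k in acc else v
        result.append(sorted(acc.items()))
    return result


summed = reduce_by_key(doubled, lambda x, y: x + y)
print(doubled)
# Which partition a key lands in varies, so flatten and sort for display
print(sorted(kv for part in summed for kv in part))
```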

## Actions

Actions are applied when it is necessary to materialize the result, for example to save the data to disk or output part of it to the console. The `collect()` operation, which brings data back to the driver, is also an action.

Actions are not lazy — they actually will trigger the data processing. Actions are RDD operations that produce non-RDD values.

## DAG

Unlike Hadoop, where the user has to break down the whole job into smaller jobs and chain them together to fit the MapReduce model, Spark identifies the tasks that can be computed in parallel with partitioned data on the cluster. With these identified tasks, Spark builds a logical flow of operations that can be represented as a directed acyclic graph, or DAG, in which the nodes are RDDs (with their partitions) and the edges are the transformations applied to the data. Thus Spark builds its execution plan implicitly from the provided Spark application.

<img src="scheduling.jpeg">

The **DAGScheduler** divides operators into stages of tasks. A stage consists of tasks based on the input data partitions. The DAGScheduler pipelines some transformations together; e.g., many map operators can be squashed into a single stage. The final result of the DAGScheduler is a set of stages, which are passed on to the **TaskScheduler**. The number of tasks submitted depends on the number of partitions. The TaskScheduler launches tasks via **the cluster manager** and does not know about the dependencies between stages.

RDDs can define a location preference for computing each partition, i.e., information about where that partition's data lives. The **DAGScheduler** uses these preferences to place tasks as close to their data as possible (data locality).

For illustration with a Python-based approach, we will use the first way of creating an RDD mentioned earlier: parallelizing an existing collection. We can create a simple NumPy array of 20 random integers (between 0 and 9), using NumPy's `random.randint()`, and then create an RDD object as follows:

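In [None]:
from pyspark import SparkContext
import numpy as np

# Only one SparkContext can be active per JVM, so stop any existing one first
try:
    sc.stop()
except NameError:
    pass  # no SparkContext has been created yet in this session

sc = SparkContext(master="local[4]")  # use 4 local cores

lst = np.random.randint(0, 10, 20)  # 20 random integers from 0 to 9

A = sc.parallelize(lst)
A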

**Note the '4' in the argument. It denotes 4 computing cores (in your local machine) to be used for this `SparkContext` object**. If we check the type of the RDD object, we get the following,

In [None]:
type(A)

The opposite of parallelization is collection (with `collect()`), which brings all the distributed elements back and returns them to the driver (head) node.

In [None]:
A.collect(), type(A.collect())

But `A` is no longer a simple NumPy array. We can use the `glom()` method to check how the partitions are created.

In [None]:
A.glom().collect()

Now stop the SC and reinitialize it with 2 cores and see what happens when you repeat the process.

In [None]:
sc.stop()
sc = SparkContext(master="local[2]")  # reinitialize with 2 cores
A = sc.parallelize(lst)
A.glom().collect()

The RDD is now distributed over two chunks, not four! **You have learned the first step in distributed data analytics, i.e., controlling how your data is partitioned into smaller chunks for further processing.**

## Examples of basic operations with RDD
### Count the elements

In [None]:
A.count()

### The first element (`first`) and the first few elements (`take`)

In [None]:
A.first()

In [None]:
A.take(3)

### Removing duplicates with `distinct`
**NOTE**: This operation requires a **shuffle** in order to detect duplication across partitions. So, it is a slow operation. Don't overdo it.

In [None]:
A.glom().collect()

In [None]:
print(A.glom().collect())
A_distinct=A.distinct()
A_distinct.glom().collect()

### To sum all the elements, use the `reduce` method
Note the use of a lambda function in this example.

In [None]:
A.reduce(lambda x,y:x+y)
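Because Spark reduces each partition locally and then combines the partial results, the function passed to `reduce` should be associative and commutative. A plain-Python sketch makes the point; `reduce_in_partitions` is a hypothetical helper that splits a list into equal chunks the way partitions might be laid out, not Spark's own code.

```python
from functools import reduce
from operator import add, sub


def reduce_in_partitions(values, func, num_partitions):
    """Hypothetical sketch of RDD.reduce(): reduce each partition locally,
    then reduce the per-partition results on the driver."""
    size = -(-len(values) // num_partitions)  # ceiling division: items per partition
    chunks = [values[i:i + size] for i in range(0, len(values), size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)


data = list(range(1, 9))

# Addition is associative and commutative: the partitioning does not matter
print([reduce(add, data)] + [reduce_in_partitions(data, add, n) for n in (2, 4)])  # [36, 36, 36]
# Subtraction is not: each partitioning gives a different answer
print([reduce(sub, data)] + [reduce_in_partitions(data, sub, n) for n in (2, 4)])  # [-34, 8, 2]
```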

### Or the direct `sum()` method

In [None]:
A.sum()

### Finding maximum element by `reduce`


In [None]:
A.reduce(lambda x,y: x if x > y else y)


### Finding longest word in a blob of text


In [None]:
words = 'These are some of the best Macintosh computers ever'.split(' ')
wordRDD = sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)

### Use `filter` for logic-based filtering 

In [None]:
# Return RDD with elements (greater than zero) divisible by 3
A.filter(lambda x:x%3==0 and x!=0).collect()

### Write regular Python functions to use with `reduce()`

In [None]:
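def largerThan(x, y):
    """
    Returns the longer of two words; ties in length are broken by
    returning the lexicographically smaller word
    """
    if len(x) > len(y):
        return x
    elif len(y) > len(x):
        return y
    else:
        # Equal length: a deterministic tie-break keeps the result
        # independent of the order in which partitions are combined
        return min(x, y)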

wordRDD.reduce(largerThan)

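Note here that the tie-break uses a lexicographic comparison in which uppercase letters sort before lowercase ones. The two longest words, `Macintosh` and `computers`, both have 9 letters, and since `'Macintosh' < 'computers'`, the function returns `Macintosh`.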


### Mapping operation with a lambda function


In [None]:
B=A.map(lambda x:x*x)
B.collect()

### Mapping with a regular Python function


In [None]:
def square_if_odd(x):
    """
    Squares if odd, otherwise keeps the argument unchanged
    """
    if x%2==1:
        return x*x
    else:
        return x

A.map(square_if_odd).collect()

### `groupBy` returns an RDD of grouped elements (iterable) as per a given group operation

In the following example, we use a list comprehension along with `groupBy` to create a list of two elements, each having a header (the result of the lambda function, simply modulo 2 here) and a sorted list of the elements that gave rise to that result. You can easily imagine that this kind of separation comes in particularly handy for processing data that needs to be binned based on a particular operation performed over it.

In [None]:
result=A.groupBy(lambda x:x%2).collect()
sorted([(x, sorted(y)) for (x, y) in result])

### Using `histogram`
The `histogram()` method takes a list of bins/buckets and returns a tuple of the buckets and the number of elements falling into each bucket.

In [None]:
B.histogram([x for x in range(0,100,10)])
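The bucket boundaries follow a specific rule documented for `RDD.histogram()`: every bucket is open on the right except the last, which is closed, and values outside the overall range are ignored. The hypothetical pure-Python function below reproduces that rule so the counts can be checked by hand; the squares of 0 to 9 are example data, not the random `B` above.

```python
from bisect import bisect_right


def histogram_like_rdd(values, buckets):
    """Hypothetical sketch of the bucket rule in RDD.histogram(): each bucket is
    [lo, hi) except the last, which is [lo, hi]; out-of-range values are ignored."""
    counts = [0] * (len(buckets) - 1)
    for v in values:
        if v < buckets[0] or v > buckets[-1]:
            continue
        if v == buckets[-1]:
            counts[-1] += 1  # the last bucket includes its right edge
        else:
            counts[bisect_right(buckets, v) - 1] += 1
    return buckets, counts


print(histogram_like_rdd([1, 50], [1, 10, 20, 50]))  # ([1, 10, 20, 50], [1, 0, 1])

squares = [x * x for x in range(10)]
print(histogram_like_rdd(squares, list(range(0, 100, 10))))
```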

### Set operations

You can also do regular set operations on RDDs like - `union()`, `intersection()`, `subtract()`, or `cartesian()`.

## Lazy evaluation with Spark (and _Caching_)

Lazy evaluation is an evaluation/computation strategy that prepares a detailed step-by-step internal map of the execution pipeline for a computing task, but delays the final execution until it is absolutely needed. This strategy is at the heart of Spark for speeding up many parallelized Big Data operations.

Let's use two CPU cores for this example,


In [None]:
sc.stop()
sc = SparkContext(master="local[2]")


### Make an RDD with 1 million elements


In [None]:
%%time
rdd1 = sc.parallelize(range(1000000))

### Some computing function - `taketime`

In [None]:
from math import cos

def taketime(x):
    # Busy work: compute 100 cosines and discard them to simulate an expensive function
    [cos(j) for j in range(100)]
    return cos(x)


### Check how much time is taken by the taketime function

In [None]:
%%time
taketime(2)

Remember this result: the `taketime()` function took a wall time of 31.5 µs. Of course, the exact number will depend on the machine you are working on.

### Now do the map operation on the function


In [None]:
%%time
interim = rdd1.map(lambda x: taketime(x))

**Because of lazy evaluation i.e. nothing was computed in the previous step, just a plan of execution was made**. The variable interim does not point to a data structure, instead it points to a plan of execution, expressed as a dependency graph. The dependency graph defines how RDDs are computed from each other.

### The actual execution by `reduce` method

In [None]:
%%time
print('output =',interim.reduce(lambda x,y:x+y))

So, the wall time here is 15.6 seconds. Remember, the `taketime()` function had a wall time of 31.5 us? Therefore, we expect the total time to be on the order of ~ 31 seconds for a 1-million array. Because of parallel operation on two cores, it took ~ 15 seconds.

Now, we have not saved (materialized) any intermediate results in `interim`, so another simple operation (e.g., counting elements > 0) will take almost the same time.

In [None]:
%%time
print(interim.filter(lambda x:x>0).count())

### Caching to reduce computation time on similar operation (spending memory)

Remember the dependency graph that we built in the previous step? We can run the same computation as before, but first call the `cache()` method to tell Spark to keep the `interim` results in memory once they are computed.

The first computation will not improve, but it caches the interim result,

In [None]:
%%time
interim.cache()  # mark interim for caching; nothing is computed yet
print('output =',interim.reduce(lambda x,y:x+y))

Now run the same filter method with the help of cached result,

In [None]:
%%time
print(interim.filter(lambda x:x>0).count())

Wow! The compute time came down to less than a second from 12 seconds earlier! Caching and parallelization, combined with lazy execution, are the core features of programming with Spark.

## Dataframe and SparkSQL

Apart from the RDD, the second key data structure in the Spark framework is the _DataFrame_. If you have worked with Python Pandas or R DataFrames, the concept may seem familiar.

A DataFrame is a distributed collection of rows under named columns. It is conceptually equivalent to a table in a relational database, an Excel sheet with Column headers, or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. It also shares some common characteristics with RDD:

- Immutable in nature: once a DataFrame / RDD is created, it cannot be changed; applying a transformation produces a new DataFrame / RDD.
- Lazy evaluation: a task is not executed until an action is performed.
- Distributed: RDDs and DataFrames are both distributed in nature.

<p align='center'><img src="https://cdn-images-1.medium.com/max/1202/1*wiXLNwwMyWdyyBuzZnGrWA.png" width="600" height="400"></p>

### Advantages of the DataFrame


- DataFrames are designed for processing large collections of structured or semi-structured data.
- Observations in a Spark DataFrame are organised under named columns, which helps Apache Spark understand the schema of a DataFrame. This, in turn, helps Spark optimize the execution plan for queries on it.
- DataFrames in Apache Spark can scale to petabytes of data.
- DataFrames support a wide range of data formats and sources.
- They have API support for different languages like Python, R, Scala, and Java.

### SparkSQL


Relational data stores are easy to build and query. Users and developers often prefer writing easy-to-interpret, declarative queries in a human-like readable language such as SQL. However, as data starts increasing in volume and variety, the relational approach does not scale well enough for building Big Data applications and analytical systems.

We have had success in the domain of Big Data analytics with Hadoop and the MapReduce paradigm. This was powerful, but often slow, and gave users a low-level, **procedural programming interface** that required people to write a lot of code for even very simple data transformations. However, once Spark was released, it really revolutionized the way Big Data analytics was done with a focus on in-memory computing, fault tolerance, high-level abstractions, and ease of use.

Spark SQL essentially tries to bridge the gap between the two models we mentioned previously—the relational and procedural models. Spark SQL works through the DataFrame API that can perform relational operations on both external data sources and Spark's built-in distributed collections—at scale!

![sparksql-1](https://raw.githubusercontent.com/tirthajyoti/Spark-with-Python/master/Images/SparkSQL-1.png)


Why is Spark SQL so fast and optimized? The main reason is its extensible optimizer, **Catalyst**, which is based on functional programming constructs in Scala. Catalyst supports both rule-based and cost-based optimization. While extensible optimizers have been proposed in the past, they have typically required a complex domain-specific language to specify rules, which usually leads to a significant learning curve and maintenance burden. In contrast, Catalyst uses standard features of the Scala programming language, such as pattern-matching, to let developers use the full programming language while still making rules easy to specify.