### What is Spark?

Spark is an open-source distributed querying and processing engine. It lets users read, transform, and aggregate data, as well as train and deploy sophisticated statistical models. Its APIs include:

- MLlib and ML for machine learning

- GraphX and GraphFrames for graph processing

- Spark Streaming (DStreams and Structured)

- SparkSQL

It can read from and write to diverse data sources, including but not limited to HDFS, Cassandra, HBase, and S3.

Spark supports Java, Python, Scala, and R. The tasks most frequently associated with Spark include ETL and SQL batch jobs across large datasets.

SparkSession is the unified entry point into a Spark application. It combines the earlier contexts, such as SparkContext, HiveContext, and SQLContext, behind a single object.

### Spark Execution process

<img src = 'datasets/exec.png'>

Every Spark application starts a single driver process (which runs the application's jobs) on the master node. The driver then directs executor processes (which run tasks) distributed across a number of worker nodes. The driver determines the number and composition of the tasks based on the graph generated for the given job.

https://spark.apache.org/docs/latest/cluster-overview.html

### Spark vs MapReduce

Spark is an alternative to MapReduce.

MapReduce writes most data to disk after each map and reduce operation, whereas Spark keeps data in memory after each transformation. Spark can spill data to disk when memory fills up.

Spark can use data stored in a variety of formats and systems, such as Cassandra, S3, and HDFS, whereas MapReduce is most commonly used with data stored in HDFS.

MapReduce is designed only for batch processing. Spark handles batch processing as well as near-real-time stream processing.

MapReduce is slower than Spark because of its disk I/O latency. Spark is often cited as being up to 100x faster in memory and up to 10x faster on disk, although the actual speedup depends on the workload.

MapReduce is a data processing engine; Spark is a data analytics engine.

MapReduce supports SQL through Hive; Spark supports SQL through Spark SQL.

MapReduce is not interactive; Spark is interactive.

MapReduce programs usually take more lines of code; Spark programs usually take fewer.

### Features of Spark

`In-Memory Computing`: Spark keeps data in the servers' RAM, which makes the data fast to access and lets iterative workloads such as machine learning algorithms run faster.

`Lazy Evaluation`: Execution does not start until an action is triggered.

`Multiple Languages`: Spark lets you write applications in Java, Python, Scala, and R.

`100x faster`: Spark is often cited as running up to 100x faster than MapReduce for in-memory workloads.

`Advanced Analytics`: Beyond 'Map' and 'Reduce', Spark supports SQL queries, streaming data, machine learning algorithms, and graph algorithms.

`Real-Time Processing`: Spark can process streaming data in near real time.

### Differences between Pandas DataFrame and Spark DataFrames


The key difference between pandas and Spark DataFrames is eager versus lazy evaluation.

Example: With a Spark DataFrame you can specify an operation that loads a dataset from S3 and applies a number of transformations, but the operations are not applied immediately. Instead, a graph of transformations is recorded, and the transformations are applied only once the data is actually needed. This approach avoids pulling the full DataFrame into memory. With pandas DataFrames, everything is pulled into memory and every transformation is applied immediately.

When reading CSV files into Spark DataFrames, Spark may do some work eagerly: for example, inferring the schema requires scanning the file before the next step begins. Parquet files store their schema, so reading them stays lazy. For this reason it is usually worth converting large CSV files to Parquet before running a pipeline.

Spark output is commonly written to S3 as Parquet files, or it can be sent directly to a NoSQL database. A simple approach is to store the output in S3 and have the NoSQL database load it from there.

#### Differences between pandas and Spark

In pandas you can read CSV files directly using 'read_csv()'. Spark supports formats such as JSON, Parquet, and Hive tables, and can read from S3, HDFS, the local file system, or an RDBMS. Before Spark 2.0, CSV was not supported natively and required the external 'spark-csv' library; since Spark 2.0, CSV can be read with the built-in reader.

Other differences lie in the behavior of methods such as head(), describe(), and count().

#### Client mode and cluster mode

https://www.youtube.com/watch?v=vJ0eUZxF80s&list=PLkz1SCf5iB4dXiPdFD4hXwheRGRwhmd6K&index=4
https://www.youtube.com/watch?v=fyTiJLKEzME&list=PLkz1SCf5iB4dXiPdFD4hXwheRGRwhmd6K&index=5

### Resilient Distributed Dataset

RDDs are collections of JVM objects and are predominantly stored in memory. They are the most fundamental data objects used in Spark. In PySpark, Python data is stored in these JVM objects. Most Spark programming consists of creating new RDDs by performing operations on existing RDDs.


*`Resilient:`* RDDs are resilient, meaning that if a node performing an operation in Spark is lost, the dataset can be reconstructed. This is because Spark knows the *lineage* of each RDD, which is the sequence of steps used to create the RDD.


*`Distributed:`* RDDs are distributed, meaning the data in an RDD is divided into one or more *partitions*, which are held as in-memory collections of objects across the worker nodes in the cluster.


*`Dataset:`* RDDs are datasets that consist of *records*. Records are uniquely identifiable data collections within a dataset. A record could be a row in a relational database table, a line of text, and so on. RDDs are partitioned such that each partition contains a unique set of records and can be operated on independently.

Another key property of RDDs is their *immutability*, which means that after they are instantiated and populated with data, they cannot be updated. Instead, new RDDs are created by performing transformations such as map or filter functions on existing RDDs.
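
The properties above can be illustrated with a small plain-Python sketch. This is a toy model rather than Spark's implementation: the `partition` and `rebuild_partition` helpers and the `lineage` list are hypothetical names introduced only for this example. It splits a collection into partitions, derives a new dataset by replaying a list of transformations, and recovers a "lost" partition from its lineage without touching the source data.

```python
# Toy model of an RDD: partitioned data plus the lineage needed to rebuild it.
# Plain Python only; the helper names are illustrative, not Spark APIs.

def partition(records, num_partitions):
    """Split records into num_partitions contiguous, non-overlapping chunks."""
    n = len(records)
    bounds = [n * i // num_partitions for i in range(num_partitions + 1)]
    return [records[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

source = list(range(10))
source_parts = partition(source, 3)            # "distributed": three partitions

# Lineage: the ordered transformations that produce the derived dataset.
lineage = [
    lambda part: [x * 2 for x in part],        # map-like step
    lambda part: [x for x in part if x > 4],   # filter-like step
]

def rebuild_partition(index):
    """Recompute one derived partition by replaying the lineage on its source."""
    part = source_parts[index]
    for step in lineage:
        part = step(part)   # each step builds a new list; its input is not modified
    return part

derived_parts = [rebuild_partition(i) for i in range(len(source_parts))]

# "Resilient": simulate losing partition 1, then recover it from the lineage.
lost = derived_parts[1]
derived_parts[1] = None
derived_parts[1] = rebuild_partition(1)
print(derived_parts)
```

Only the lost partition is recomputed, and the source partitions are never modified, which mirrors the immutability property described above.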

### Creating a spark session

In [1]:
# import required libraries
import findspark
findspark.init(r'C:\spark-2.4.4-bin-hadoop2.6')  # raw string so the backslash is not treated as an escape
import pyspark
print(findspark.find())
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

C:\spark-2.4.4-bin-hadoop2.6


In [2]:
# Create the Spark context
# SparkContext (sc) is the entry point to the Spark cluster.
# We use the sc object to read files and then collect the data.
conf = pyspark.SparkConf().setAppName('sparkApp').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

### Creating RDDs from textfiles

**textFile():** Reads a text file from HDFS or the local file system and returns it as an RDD of strings.

**wholeTextFiles():** Reads a directory of text files. Rather than creating a basic RDD, it returns a pair RDD in which the key is the file name with its path and the value is the whole file as a string.

In [24]:
# reading file into RDD. 
# textFile() method creates RDD with each line as an element.

lines = sc.textFile('datasets/days.txt')
print("Type: ",type(lines))

Type:  <class 'pyspark.rdd.RDD'>


You cannot view the contents of an RDD by passing it to the print function. collect() is an action that returns all the elements of an RDD as a list.

In [30]:
col = lines.collect() # returns a list, which you can print

# printing elements one by one
for line in col:
    print(line)

Days
Monday
Tuesday
Wednesday
Thursday
Friday
Saturday
Sunday


In [27]:
# check the number of partitions
lines.getNumPartitions()

1

In [31]:
# counts the number of lines in the text file.
# If the partitions are located on different nodes, it counts the lines across all partitions.
lines.count()

8

In [37]:
pairs = sc.wholeTextFiles('datasets/*.txt')
pairs

org.apache.spark.api.java.JavaPairRDD@6dcec36f

Note the key difference between the two approaches. The textFile() method would create separate partitions when loading the same data because there are different files, whereas the wholeTextFiles() method combines all of the files (which are quite small in this case) into a single partition.

In [38]:
pairs.count()

2

The keys() method returns a new RDD, here named filenames, containing only the keys.

In [40]:
filenames = pairs.keys()
filenames.collect()

['file:/D:/Notebooks/Spark/datasets/days.txt',
 'file:/D:/Notebooks/Spark/datasets/untitled.txt']

In [43]:
filedata = pairs.values()
filedata.take(1)

['Days\nMonday\nTuesday\nWednesday\nThursday\nFriday\nSaturday\nSunday']

In [44]:
filedata.take(2)

['Days\nMonday\nTuesday\nWednesday\nThursday\nFriday\nSaturday\nSunday',
 'January\nFebruary\nMarch\nApril\nMay\nJune\nJuly\nAugust\nSeptember\nOctober\nNovember\nDecember']

### Creating an RDD from a Data Source

When loading data from external sources such as Oracle, MySQL, Postgres, or SQL Server into an RDD, Spark attempts to split the table into multiple partitions across multiple workers. These partitions can be loaded in parallel, and each partition is responsible for fetching a unique set of rows.


The preferred methods of creating an RDD from a relational database table or query use functions from a special context called **SQLContext.** The SQLContext is Spark's entry point for working with tabular data, and DataFrames are created using it.

Launch pyspark with the JDBC database connector supplied, then initialize the SQLContext. From the pyspark shell, the load() method loads data from the table and creates an RDD from the JDBC data source. However, the preferred method is to use the DataFrameReader and create DataFrames.
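
To make the idea of "each partition fetches a unique set of rows" concrete, here is a simplified plain-Python sketch of how a numeric column range can be split into one non-overlapping predicate per partition. It is loosely modeled on the way a JDBC read can be partitioned by a column with lower and upper bounds, but it is an illustration under stated assumptions, not Spark's actual code: the function name `partition_predicates` and the column name `id` are hypothetical.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Return one WHERE-clause predicate per partition for a numeric column.

    Simplified sketch: the first partition also takes rows below `lower`
    (and NULLs), and the last partition takes everything above its start.
    """
    stride = (upper - lower) // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        low = f"{column} >= {current}" if i > 0 else None
        current += stride
        high = f"{column} < {current}" if i < num_partitions - 1 else None
        if low and high:
            predicates.append(f"{low} AND {high}")
        elif high:
            predicates.append(f"{high} OR {column} IS NULL")
        elif low:
            predicates.append(low)
        else:
            predicates.append("1 = 1")  # single partition: no filtering
    return predicates

# Hypothetical example: split ids 0..100 across four partitions.
preds = partition_predicates("id", 0, 100, 4)
for p in preds:
    print(p)
```

Each predicate could back one parallel query, so every row is fetched by exactly one partition.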

### Creating RDD from Collection

In [89]:
# parallelize is used to create RDD out of a collection
rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
type(rdd)

pyspark.rdd.RDD

In [65]:
# get first element of RDD
rdd.first()

0

In [61]:
# get minimum of RDD
rdd.min()

0

In [62]:
# get maximum of RDD
rdd.max()

9

In [67]:
# get first 5 elements of RDD
rdd.take(5)

[0, 1, 2, 3, 4]

In [68]:
# to get no. of partitions
rdd.getNumPartitions()

1

In [69]:
# in order to get all the data to the driver we use collect()
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [90]:
newrdd = rdd.filter(lambda x: x % 2 == 1)  # keep only the odd numbers
newrdd.collect()

[1, 3, 5, 7, 9]

In [85]:
newrdd.count()

5

### Types of RDDs

*`PairRDD`*: RDD of key value pairs.

*`DoubleRDD`*: RDD consisting of a collection of double values only. Because the values are of the same numeric type, several additional statistical functions are available, including mean(), sum(), stdev(), variance(), and histogram().

*`DataFrame`*: A distributed collection of data organized into named columns. DataFrame is equivalent to relational table.

*`SequenceFileRDD`*: RDD created from sequence file

*`ShuffledRDD`*: Resulting RDD from shuffle

*`UnionRDD`*: Resulting RDD from union

#### Paired RDD

Pair RDDs are RDDs whose elements are key-value pairs, where the key is the identifier and the value is the data associated with it. Pair RDDs support "shuffle" operations such as grouping and aggregating elements by key.

We can also use the join() method to merge two pair RDDs by matching elements that have the same key.

In [114]:
lines = sc.textFile('datasets/days.txt')
pairs = lines.map(lambda x: (x,1))
pairs.collect()

[('Days', 1),
 ('Monday', 1),
 ('Monday', 1),
 ('Monday', 1),
 ('Monday', 1),
 ('Tuesday', 1),
 ('Wednesday', 1),
 ('Wednesday', 1),
 ('Wednesday', 1),
 ('Wednesday', 1),
 ('Thursday', 1),
 ('Friday', 1),
 ('Saturday', 1),
 ('Sunday', 1),
 ('Sunday', 1),
 ('Sunday', 1)]

In [115]:
# reduceByKey returns pairs in which the values for each key are aggregated using the reduce function
from operator import add
counts = pairs.reduceByKey(add)  # separate name, so the imported `add` is not overwritten
counts.collect()

[('Days', 1),
 ('Monday', 4),
 ('Tuesday', 1),
 ('Wednesday', 4),
 ('Thursday', 1),
 ('Friday', 1),
 ('Saturday', 1),
 ('Sunday', 3)]
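
As a rough mental model of what reduceByKey does, the following plain-Python sketch combines values per key inside each partition first and then merges the partial results across partitions. It is a toy under stated assumptions, not Spark's implementation: `reduce_by_key` and the two hand-written partitions are hypothetical.

```python
from operator import add

def reduce_by_key(partitions, func):
    """Toy reduceByKey: combine within each partition, then merge across partitions."""
    # Step 1: local combine inside each partition (no data movement needed).
    local_results = []
    for part in partitions:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        local_results.append(combined)
    # Step 2: merge the per-partition results key by key (the "shuffle" step).
    merged = {}
    for combined in local_results:
        for key, value in combined.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Hypothetical data: (word, 1) pairs spread over two partitions.
partitions = [
    [("Monday", 1), ("Monday", 1), ("Tuesday", 1)],
    [("Monday", 1), ("Sunday", 1), ("Sunday", 1)],
]
counts = reduce_by_key(partitions, add)
print(counts)
```

Combining locally before merging means only one value per key and partition has to cross partition boundaries.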

In [120]:
# getting distinct elements
dis = lines.distinct()
dis.collect()

['Days',
 'Monday',
 'Tuesday',
 'Wednesday',
 'Thursday',
 'Friday',
 'Saturday',
 'Sunday']

In [118]:
# countByValue counts how many times each value occurs
sorted(lines.countByValue().items())

[('Days', 1),
 ('Friday', 1),
 ('Monday', 4),
 ('Saturday', 1),
 ('Sunday', 3),
 ('Thursday', 1),
 ('Tuesday', 1),
 ('Wednesday', 4)]

### Lazy Evaluation
Lazy evaluation defers processing until an action is called. After an action such as count() or saveAsTextFile() is requested, a DAG is created along with logical and physical execution plans. These are then orchestrated and managed across executors by the driver.

https://medium.com/analytics-vidhya/being-lazy-is-useful-lazy-evaluation-in-spark-1f04072a3648
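
Python generators give a convenient analogy for lazy evaluation. The sketch below is not Spark code, and `read_numbers` and `log` are hypothetical names: building the pipeline records what should happen, and nothing is read until the final `list(...)` call, which plays the role of an action.

```python
log = []  # records when elements are actually read

def read_numbers():
    """Pretend data source that logs every element it produces."""
    for n in range(5):
        log.append(f"read {n}")
        yield n

# "Transformations": building the pipeline does not read any data yet.
doubled = (n * 2 for n in read_numbers())
large = (n for n in doubled if n > 4)
steps_before_action = len(log)

# "Action": consuming the pipeline triggers all the deferred work.
result = list(large)
steps_after_action = len(log)

print(steps_before_action, steps_after_action, result)
```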

### Persisting

https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist

RDDs are created and exist in memory on executors. By default, RDDs are transient objects that last only while they are required. Once they have been transformed into new RDDs and are no longer needed for any other operations, they are removed permanently.

This can be a problem if an RDD is required for more than one action, because it must be re-evaluated entirely each time. One way to address this is to use cache() or persist().

We cache RDDs when:

- Reusing them in an iterative loop (for example, ML algorithms).

- Reusing the RDD multiple times in a single application, job, or notebook.

There are two methods for RDD persistence:

- persist()

- cache()

When you use these methods, the RDD is kept in memory on the nodes where it was computed, once the first action has been called on it.


cache() persists the RDD with the default MEMORY_ONLY storage level, whereas persist() lets you specify the storage level you want. Calling persist() without any argument is therefore equivalent to calling cache().

cache() is not ideal for datasets larger than the available cluster memory. In that case persist(StorageLevel.MEMORY_AND_DISK) is a better choice, because it spills RDD partitions to the worker's local disk when memory is full.
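
The effect of persisting can be sketched in plain Python by counting how often a dataset is evaluated. `LazyDataset` is a hypothetical toy class, not a Spark API: without `persist()` every action recomputes the data, while with `persist()` the first action stores the result and later actions reuse it.

```python
class LazyDataset:
    """Toy stand-in for an RDD: recomputes on every action unless persisted."""

    def __init__(self, compute):
        self._compute = compute
        self._persist = False
        self._cached = None
        self.evaluations = 0   # how many times the data was actually computed

    def persist(self):
        self._persist = True   # only marks the dataset; nothing is computed yet
        return self

    def _materialize(self):
        if self._persist and self._cached is not None:
            return self._cached
        self.evaluations += 1
        data = self._compute()
        if self._persist:
            self._cached = data
        return data

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())

def evens():
    return [x for x in range(10) if x % 2 == 0]

plain = LazyDataset(evens)
plain.count()
plain.collect()

persisted = LazyDataset(evens).persist()
persisted.count()
persisted.collect()

print(plain.evaluations, persisted.evaluations)
```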


![storage-levels-rdd-persistence-in-spark.jpg](attachment:storage-levels-rdd-persistence-in-spark.jpg)

In [99]:
# persisting the RDD and retrieving

originalrdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
# x + 2 is truthy for every element here, so this filter keeps all ten values
newrdd = originalrdd.filter(lambda x: x+2)
newrdd.persist()  # marks newrdd to be kept in memory once it is first computed
no_elements = newrdd.count()
list_of_elements = newrdd.collect()

# does not have to recompute newrdd
print("There are %s elements in the collection %s" % (no_elements, list_of_elements))

There are 10 elements in the collection [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [100]:
originalrdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
newrdd = originalrdd.filter(lambda x: x+2)
no_elements = newrdd.count()
list_of_elements = newrdd.collect()

# without persist(), newrdd is recomputed for each action
print("There are %s elements in the collection %s" % (no_elements, list_of_elements))

There are 10 elements in the collection [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


### RDD Lineage

RDD lineage is the sequence of transformations that produced an RDD. By default, every action recomputes the entire lineage unless the RDD has been persisted.

In an RDD's lineage, each RDD has a parent RDD and/or a child RDD. Spark creates a DAG (directed acyclic graph) consisting of the dependencies between RDDs. RDDs are processed in stages, which are sets of transformations. RDDs and stages have dependencies that can be narrow or wide.


A shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.

Narrow dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD. No shuffle is necessary, so transformations with narrow dependencies are fast.

Wide dependencies: Each partition of the parent RDD may be used by multiple child partitions. A shuffle is necessary, so these transformations are slow.
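
The difference between the two kinds of dependency can be sketched with plain Python lists standing in for partitions. This is only an illustration, with hypothetical variable names: a map-like step reads one parent partition per child partition, while a group-by-key-like step redistributes records so that each child partition draws from several parents.

```python
parent = [[1, 2, 3], [4, 5, 6], [7, 8]]   # three parent partitions

# Narrow dependency (map-like): child partition i reads only parent partition i.
narrow_child = [[x * 10 for x in part] for part in parent]

# Wide dependency (groupBy-like): records are redistributed by key, so each
# child partition may need records from every parent partition (a shuffle).
num_child_partitions = 2
wide_child = [[] for _ in range(num_child_partitions)]
parents_of_child = [set() for _ in range(num_child_partitions)]
for parent_index, part in enumerate(parent):
    for x in part:
        target = x % num_child_partitions      # key decides the target partition
        wide_child[target].append(x)
        parents_of_child[target].add(parent_index)

print(narrow_child)
print(wide_child)
print(parents_of_child)
```

In the wide case every child partition ends up depending on all three parent partitions, which is why data has to move between workers.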

![datasets/narrow_vs_wide_dependencies.png](attachment:narrow_vs_wide_dependencies.png)

![narrow.jpg](attachment:narrow.jpg)

<img src="datasets/wide.jpg">

### Transformations and Actions

The two types of operations in Spark are transformations and actions.

RDDs are immutable, so performing any operation on an RDD means creating a new RDD. A transformation is a function that produces a new RDD from an existing RDD. It does not modify the RDD it is applied to. Transformations are not executed until an action is called.

Actions return values or data to the driver program. The pending series of transformations is executed when an action is called.

#### Transformations

- Map

- Filter

- FlatMap

- GroupBy

- GroupByKey

- MapPartitions

- MapPartitionsWithIndex

- ReduceByKey

- Sample

- Union

- Join

- Distinct

- Coalesce

- KeyBy

- PartitionBy

- zip


#### Actions

- Collect

- Take

- Reduce

- Aggregate

- Max

- Sum

- Mean

- Stdev

### Data Sampling transformations

During development and discovery, we need to sample the data in RDDs before running a process across the whole input data. Spark provides several functions to sample RDDs and produce new RDDs.

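The **sample** transformation creates a sampled subset RDD from the original RDD based on a fraction of the overall dataset. When sampling without replacement, the fraction is the probability of including each element, so the size of the result is approximate rather than exact.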

In [125]:
data = sc.parallelize(range(0,1000))
data.count()

1000

In [128]:
# sample transformation
sampled_data = data.sample(withReplacement=False, fraction=0.1, seed=None)
sampled_data.count()

112

The **takeSample** action returns a random list containing a fixed number of elements from the RDD being sampled.

In [131]:
# takeSample action: withReplacement=False, num=10, seed=1
sample2 = data.takeSample(False, 10, 1)
print(len(sample2))
sample2

10


[894, 664, 71, 857, 626, 776, 673, 536, 891, 318]
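
The contrast between the two sampling styles can be sketched with Python's standard `random` module. This is an analogy under stated assumptions, not Spark's sampler: fraction-based sampling keeps each element independently with a given probability, so the result size varies, whereas fixed-size sampling always returns exactly the requested number of elements. The seed value 42 is arbitrary.

```python
import random

data = list(range(1000))
rng = random.Random(42)   # fixed seed so the sketch is repeatable

# sample-like: keep each element independently with probability `fraction`.
fraction = 0.1
bernoulli_sample = [x for x in data if rng.random() < fraction]

# takeSample-like: draw exactly `num` elements without replacement.
num = 10
fixed_sample = rng.sample(data, num)

print(len(bernoulli_sample))   # close to 100, but not necessarily exactly 100
print(len(fixed_sample))       # always exactly 10
```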

### Functional Transformations

Functional transformations include mapping and filtering functions. The **map** transformation is the most basic of all transformations. It applies a function to every element of the RDD.

In [150]:
# Map Transformation
print(type(sampled_data))
print("Original: ",sampled_data.take(5))
newRdd = sampled_data.map(lambda x : x+2)
print("+ 2: ",newRdd.take(5))

<class 'pyspark.rdd.PipelinedRDD'>
Original:  [8, 11, 32, 38, 42]
+ 2:  [10, 13, 34, 40, 44]


The **flatMap** transformation is similar to map: it returns a new RDD by applying a function to each element of the RDD, but the output is flattened. The function passed to flatMap can therefore return a list of elements for each input.

In [155]:
rdd1 = sc.parallelize([3,4,5])
# each element maps to a list holding one [x, x*x] pair; flattening removes only the outer list
rdd2 = rdd1.flatMap(lambda x: [[x , x*x]])
rdd2.collect()

[[3, 9], [4, 16], [5, 25]]
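
To see what "flattened" means, compare the two behaviors in plain Python. This sketch uses list comprehensions and `itertools.chain` as stand-ins for map and flatMap; `square_pair` is a hypothetical helper.

```python
from itertools import chain

values = [3, 4, 5]

def square_pair(x):
    """Return a list with the value and its square."""
    return [x, x * x]

# map-like: exactly one output element (here, a list) per input element.
mapped = [square_pair(x) for x in values]

# flatMap-like: the lists returned for each input are concatenated.
flat_mapped = list(chain.from_iterable(square_pair(x) for x in values))

print(mapped)
print(flat_mapped)
```

The map-like result keeps one nested list per input, while the flatMap-like result is a single flat sequence of values.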

The **filter** transformation evaluates a Boolean expression against each element (record) in the dataset. The Boolean value returned determines whether or not the record is included in the resulting output RDD.

In [167]:
txt = sc.textFile('datasets/intro.txt')
print(type(txt))

filtered = txt.map(lambda x: x.upper())     \
              .flatMap(lambda x: x.split()) \
              .filter(lambda x: len(x)>4)


filtered.collect()

<class 'pyspark.rdd.RDD'>


['KALYAN,', 'SPARK', 'NOTES,', 'FLATMAP', 'TRANSFORMATIONS']

### Grouping, Sorting and Distinct Transformations

In [226]:
# groupby
rdd = sc.parallelize([1,2,8,9,3,8,8, 8, 8,6,4,7])
result = rdd.groupBy(lambda x : x%2).collect()
sorted([(x, sorted(y)) for (x,y) in result])

[(0, [2, 4, 6, 8, 8, 8, 8, 8]), (1, [1, 3, 7, 9])]

In [227]:
# sort
sort = rdd.sortBy(lambda x:x)
sort.collect()

[1, 2, 3, 4, 6, 7, 8, 8, 8, 8, 8, 9]

In [228]:
# distinct
sort.distinct().collect()

[1, 2, 3, 4, 6, 7, 8, 9]

### Set Operations

The **union** transformation takes one RDD and appends it to another RDD, resulting in a combined RDD. The RDDs are not required to have the same schema or structure. The union transformation does not filter duplicates. To filter duplicates, you can follow the union transformation with the distinct function.

The **intersection** transformation returns elements that are present in both RDDs.

The **subtract** transformation returns all elements from the first RDD that are not present in second RDD.

In [229]:
# union
odds = sc.parallelize([1,3,5,7,9])
even = sc.parallelize([0,2,4,6,8])
odds.union(even).collect()

[1, 3, 5, 7, 9, 0, 2, 4, 6, 8]

In [232]:
# intersection
set_a = sc.parallelize([1,2,3,5,7])
set_b = sc.parallelize([1,2,4,6,7,8,9,10])
set_b.intersection(set_a).collect()

[2, 1, 7]

In [235]:
# subtract
set_a.subtract(set_b).collect()

[3, 5]
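
The semantics of the three operations can be mirrored with plain Python lists and sets. This is an analogy only, and `left` and `right` are hypothetical example collections: union keeps duplicates, following it with a distinct step removes them, intersection yields each common element once, and subtract keeps the elements of the first collection that are absent from the second.

```python
left = [1, 2, 2, 3, 5, 7]
right = [1, 2, 4, 6, 7]

# union-like: simple concatenation, duplicates are kept.
union = left + right

# union followed by distinct: duplicates removed (sorted here for readability).
distinct_union = sorted(set(union))

# intersection-like: elements present in both, each reported once.
intersection = sorted(set(left) & set(right))

# subtract-like: elements of `left` that do not appear in `right`.
right_values = set(right)
subtract = [x for x in left if x not in right_values]

print(union)
print(distinct_union)
print(intersection)
print(subtract)
```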

### Spark Actions

Actions in Spark return values to the Spark driver program. With lazy evaluation, the complete set of Spark transformations in a program is processed only when an action is requested.

The **count** action takes no arguments; it returns the number of records in the RDD.

The **collect** action returns a list that contains all the elements of the RDD to the Spark driver. Note: collect does not restrict the output, which can be quite large and can potentially cause out-of-memory errors on the driver. It is typically used for small RDDs or during development.

The **take** action returns the first n elements of the RDD. The elements are not in any particular order, and they can differ if the same action is run again. *takeOrdered* is another function, which returns the first n elements in ascending order or as ordered by a key function supplied by the caller.

The **top** action returns the top n elements from an RDD, but unlike *take*, the elements are ordered and returned in descending order. Order is determined by object type, such as numerical order for integers and dictionary order for strings.

The **first** action returns the first element in the RDD.

The **reduce** action is an aggregate action. The aggregation function should have two important properties:

*Commutative:* A+B = B+A, so the result is independent of the order of the elements.

*Associative:* (A+B)+C = A+(B+C), so the way elements are grouped during aggregation does not affect the final result.
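
Why these two properties matter becomes clear once the data is split into partitions. The plain-Python sketch below is not Spark code, and `partitioned_reduce` and the two layouts are hypothetical: it reduces each partition separately and then reduces the partial results. Addition gives the same answer for any partitioning, while subtraction, which is neither commutative nor associative, does not.

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(partitions, func):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

# The same six numbers, partitioned in two different ways.
layout_a = [[1, 2, 3], [4, 5, 6]]
layout_b = [[1, 2], [3, 4], [5, 6]]

sum_a = partitioned_reduce(layout_a, add)
sum_b = partitioned_reduce(layout_b, add)

diff_a = partitioned_reduce(layout_a, sub)
diff_b = partitioned_reduce(layout_b, sub)

print(sum_a, sum_b)     # addition: identical for both layouts
print(diff_a, diff_b)   # subtraction: depends on the partitioning
```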



In [236]:
# count
rdd1 = sc.textFile('datasets/days.txt')
rdd2 = rdd1.flatMap(lambda x : x.split())
rdd2.count()

16

In [237]:
# collect
rdd2.collect()

['Days',
 'Monday',
 'Monday',
 'Monday',
 'Monday',
 'Tuesday',
 'Wednesday',
 'Wednesday',
 'Wednesday',
 'Wednesday',
 'Thursday',
 'Friday',
 'Saturday',
 'Sunday',
 'Sunday',
 'Sunday']

In [238]:
# take
rdd2.take(2)

['Days', 'Monday']

In [239]:
# top
rdd2.top(2)

['Wednesday', 'Wednesday']
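
The ordering rules of top and takeOrdered can be mimicked with Python's `heapq` module. This is an analogy rather than Spark's implementation, and the `words` list is a hypothetical sample: `nlargest` behaves like top (descending order), and `nsmallest` behaves like takeOrdered (ascending order, optionally by a key function).

```python
import heapq

words = ["Days", "Monday", "Tuesday", "Wednesday", "Sunday"]

# top-like: the n largest elements, in descending (dictionary) order.
top_two = heapq.nlargest(2, words)

# takeOrdered-like: the n smallest elements, in ascending order.
smallest_two = heapq.nsmallest(2, words)

# takeOrdered-like with a key: order by negative length to get the longest words.
longest_two = heapq.nsmallest(2, words, key=lambda w: -len(w))

print(top_two)
print(smallest_two)
print(longest_two)
```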

In [None]:
# first
rdd2.first()

In [242]:
# reduce
rdd1 = sc.parallelize([1,2,3,4,5,6])
sum1 = rdd1.reduce(lambda a,b : a+b)
print("The sum is: ", sum1)

The sum is:  21


In [246]:
# reduceByKey

from operator import add
rdd1 = sc.textFile('datasets/days.txt')
rdd2 = rdd1.map(lambda x: (x,1))
rdd2.reduceByKey(add).collect()

[('Days', 1),
 ('Monday', 4),
 ('Tuesday', 1),
 ('Wednesday', 4),
 ('Thursday', 1),
 ('Friday', 1),
 ('Saturday', 1),
 ('Sunday', 3)]