# Limitations of MapReduce

Hadoop's MapReduce was developed with a fairly narrow vision: it was built to handle just one problem, batch processing.

MapReduce cannot handle:

- Interactive Processing
- Real-time (stream) Processing
- Iterative (delta) Processing
- In-memory Processing
- Graph Processing

Let's discuss the limitations in detail:

1. Issue with Small Files

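Hadoop is not suited to small files. The Hadoop Distributed File System (HDFS) is designed for high capacity, that is, for a modest number of very large files, so it cannot efficiently support random reads of many small files. Every file also adds metadata that the NameNode must hold in memory.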

2. Slow Processing Speed

In Hadoop, MapReduce processes large data sets with a parallel, distributed algorithm built from two kinds of tasks: Map and Reduce. Running these tasks takes a lot of time, which increases latency. Data is distributed and processed across the cluster, and intermediate results are written to disk between the Map and Reduce phases, which reduces processing speed.

3. Support for Batch Processing only

Hadoop supports batch processing only. It does not process streamed data, so overall performance is slower for such workloads. The MapReduce framework of Hadoop also does not make full use of the memory available in the Hadoop cluster.

4. No Real-time Data Processing

Apache Hadoop is designed for batch processing: it takes a huge amount of data as input, processes it, and produces the result. Batch processing is very efficient for a high volume of data, but depending on the size of the data and the computational power of the system, the output can be delayed significantly. Hadoop is therefore not suitable for real-time data processing.

5. No Delta Iteration

Hadoop is not efficient for iterative processing, because it does not support cyclic data flow (i.e. a chain of stages in which the output of each stage is the input to the next stage).

6. Latency

In Hadoop, the MapReduce framework is comparatively slow, since it is designed to support many formats and structures and huge volumes of data. Map takes a set of data and converts it into another set of data in which individual elements are broken down into key-value pairs. Reduce takes the output of Map as its input and processes it further. Performing these tasks takes a lot of time, which increases latency.
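
The Map and Reduce steps described above can be sketched in plain Python. This is only an illustration of the data flow, not Hadoop code: the function names `map_phase`, `shuffle`, and `reduce_phase` are made up for this example, and everything runs in a single process.

```python
from collections import defaultdict


def map_phase(lines):
    """Map: emit a (word, 1) key-value pair for every word in every line."""
    for line in lines:
        for word in line.split():
            yield (word, 1)


def shuffle(pairs):
    """Group values by key, as a framework would between Map and Reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce: collapse each key's list of values into a single result."""
    return {key: sum(values) for key, values in groups.items()}


lines = ["first line", "second line", "third line"]
word_counts = reduce_phase(shuffle(map_phase(lines)))
print(word_counts)
```

In a real cluster each phase runs on many machines and the grouped pairs travel over the network, which is where much of the latency discussed here comes from.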

7. Not Easy to Use

In Hadoop, MapReduce developers need to hand-code each and every operation, which makes it difficult to work with. MapReduce has no interactive mode, although tools such as Hive and Pig make working with MapReduce a little easier for adopters.

8. No Caching

Hadoop is not efficient at caching. MapReduce cannot cache intermediate data in memory for later use, which diminishes the performance of Hadoop.

# What is RDD?

A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. It is an immutable, distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster.

# Features of RDD

RDDs have the following traits:

3.1. In-Memory
The data of a Spark RDD can be stored in memory. This does not depend on the size or the number of records: we can store as much data as we want, subject to the memory available. In-memory computation means operating on data held in main memory (RAM) across jobs, rather than going back to slower disk-based databases for every job.

3.2. Lazy Evaluations
As the name suggests, calling a transformation does not start execution immediately. An action is required to trigger execution. Until that action takes place, the data inside the RDD is not transformed or made available. Spark keeps a record of every operation performed in a DAG (Directed Acyclic Graph).
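
Lazy evaluation can be illustrated with Python's own lazy `map` and `filter`, which behave in a similar spirit. This is an analogy under that assumption, not Spark code; the `log` list and the `double` function exist only to show when work actually happens.

```python
log = []


def double(x):
    log.append(x)  # record that the function really ran
    return x * 2


numbers = [1, 2, 3]

# "Transformations": these only build a lazy pipeline, nothing runs yet.
doubled = map(double, numbers)
over_two = filter(lambda x: x > 2, doubled)
calls_before_action = len(log)

# "Action": consuming the pipeline forces the work to happen.
result = list(over_two)
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
```

No call to `double` is recorded until the pipeline is consumed, which mirrors how an RDD waits for an action.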

3.3. Immutable and Read-only
RDDs are immutable, which means they cannot change over time. This property helps maintain consistency when we perform further computations. Because an RDD cannot be changed once it is created, it can only be transformed into new RDDs, which is what transformations do.

3.4. Cacheable or Persistence
We can keep an RDD's data in persistent storage: in memory (preferred) or on disk (less preferred because of its slower access speed). Data kept in memory can be read back directly, which makes RDDs useful for fast computations. We can therefore perform multiple operations on the same data without recomputing it, and this reuse also helps to compute faster.
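
The benefit of persistence can be shown with a small plain-Python sketch. `CachedDataset` is a hypothetical toy class written for this example, not part of Spark (where the corresponding methods are `cache()` and `persist()`); it simply counts how often the underlying data is recomputed.

```python
class CachedDataset:
    """Toy dataset that recomputes on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute      # function that produces the data
        self._cached = None
        self._use_cache = False
        self.compute_calls = 0       # how many times we recomputed

    def cache(self):
        self._use_cache = True
        return self

    def _data(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.compute_calls += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        return len(self._data())

    def total(self):
        return sum(self._data())


def expensive():
    return [x * x for x in range(5)]


uncached = CachedDataset(expensive)
uncached.count()
uncached.total()

cached = CachedDataset(expensive).cache()
cached.count()
cached.total()

print(uncached.compute_calls, cached.compute_calls)
```

Two actions on the uncached dataset trigger two computations, while the cached one computes once and reuses the result.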

3.5. Partitioned
Each dataset is logically partitioned and distributed across the nodes of the cluster. The partitions are only a logical division made to speed up processing; the data is not divided internally. This arrangement of partitions provides parallelism.

3.6. Parallel
As we discussed earlier, RDDs are logically partitioned across the cluster. When we perform an operation, it executes in parallel on all partitions of the data.
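
Partitioning and parallel execution can be sketched on a single machine with the standard library. This is a simplified stand-in: the `partition` helper is hypothetical, and threads play the role that executors on different nodes would play in a real cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Split data into num_partitions contiguous, roughly equal slices."""
    size, extra = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        parts.append(data[start:end])
        start = end
    return parts


def square_partition(part):
    """The same operation is applied independently to each partition."""
    return [x * x for x in part]


data = list(range(10))
parts = partition(data, 3)

with ThreadPoolExecutor(max_workers=3) as pool:
    # pool.map keeps the results in the order of the input partitions
    squared_parts = list(pool.map(square_partition, parts))

squared = [x for part in squared_parts for x in part]
print(parts)
print(squared)
```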

3.7. Fault Tolerance
If part of an RDD is lost while working on any node, the RDD can recover it by itself. When we apply transformations to RDDs, Spark creates a logical execution plan, generally known as the lineage graph. If a fault arises in a machine, we may lose an RDD or some of its partitions. By re-applying the computations recorded in the lineage graph, we can recover the same dataset again. This is what gives RDDs their fault tolerance.
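
The idea of recovering data from lineage can be sketched as follows. `LineageDataset` is a hypothetical toy class, not Spark's implementation: it remembers the source and the ordered list of transformations, so the result can be rebuilt after the computed data is thrown away.

```python
class LineageDataset:
    """Toy dataset that remembers how it was derived from its source."""

    def __init__(self, source, lineage=()):
        self.source = source            # stable input, e.g. a file on disk
        self.lineage = tuple(lineage)   # ordered transformation functions
        self.partition = None           # computed data, which may be lost

    def map(self, fn):
        # A transformation returns a new dataset with a longer lineage.
        return LineageDataset(self.source, self.lineage + (fn,))

    def compute(self):
        data = list(self.source)
        for fn in self.lineage:
            data = [fn(x) for x in data]
        self.partition = data
        return data

    def get(self):
        # Recompute from lineage whenever the computed data is missing.
        if self.partition is None:
            return self.compute()
        return self.partition


base = LineageDataset([1, 2, 3])
derived = base.map(lambda x: x + 1).map(lambda x: x * 10)

before = derived.get()
derived.partition = None   # simulate losing the data in a node failure
after = derived.get()

print(before, after)
```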

3.8. Location Stickiness
RDDs support placement preferences, that is, information about the location of the RDD's partitions. The DAG (Directed Acyclic Graph) scheduler uses this information to place computation as close to the data as possible. Placing tasks near their data also speeds up computation.

3.9. Typed
RDDs are typed. For example, in the Scala API we have `RDD[Long]`, `RDD[Int]`, and `RDD[String]`.

3.10. Coarse-grained Operations
RDDs support coarse-grained operations. That means an operation is applied to every element of the dataset at once (for example through `map` or `filter`), rather than to individual elements.

3.11. No-Limitations
There is no fixed limit on the number of RDDs we can use. We can use as many RDDs as we require; the practical limit depends on the available memory or disk.

# Spark RDD operations

We're going to start with a 'hello world' example, which is just reading a text file. First let's create a text file to read. We'll use a special Jupyter notebook command for this, but feel free to use any .txt file:

In [2]:
%%writefile example.txt
first line
second line
third line
fourth line

Writing example.txt


### Creating the RDD

Now we can read in the text file using the textFile method of the SparkContext (assumed here to be available as `sc`). This method will read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

In [None]:
textFile = sc.textFile('example.txt')

Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.

### Actions

We have just created an RDD using the textFile method and can perform operations on this object, such as counting the rows.

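RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Let's start with a few actions: `textFile.count()` returns the number of lines in the RDD (`4` for example.txt), and `textFile.first()` returns its first element (`'first line'`).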

### Transformations

Now we can use transformations. For example, the filter() transformation returns a new RDD containing a subset of the items in the file. Just like Python's own filter function, it keeps only the elements that satisfy a condition. Let's look for lines that contain the word 'second', for example with `secfind = textFile.filter(lambda line: 'second' in line)`. Only one line should match.

Notice that transformations don't display any output and aren't run until an action is called: `secfind.collect()` returns `['second line']`.

# RDD Transformations and Actions

## Important Terms

Let's quickly go over some important terms:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

## Creating an RDD

There are two common ways to create an RDD:

Method                      |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile(path/to/file)`                      |Create RDD of lines from file

## RDD Transformations

We can use transformations to create a set of instructions we want to perform on the RDD (before we call an action and actually execute them).

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Create sample of roughly 25% of elements with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order

## RDD Actions

Once you have your 'recipe' of transformations ready, what you will do next is execute them by calling an action. Here are some common actions:

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |3 largest elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)
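
To make the meaning of these actions concrete, here is a rough plain-Python equivalent of each one on a small list. It is a local sketch for comparison only, not Spark code. One detail worth knowing is that Spark's `stdev()` is the population standard deviation (the sample version is `sampleStdev()`), so `statistics.pstdev` is used as its counterpart.

```python
import heapq
import statistics

elements = [3, 1, 4, 2]

as_list = list(elements)                          # like collect()
first_three = elements[:3]                        # like take(3)
top_three = heapq.nlargest(3, elements)           # like top(3)
total = sum(elements)                             # like sum()
mean = statistics.mean(elements)                  # like mean()
population_stdev = statistics.pstdev(elements)    # like stdev()

print(first_three, top_three, total, mean, population_stdev)
```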

____
# Examples

Now the best way to show all of this is by going through examples! We'll review a bit by creating and working with a simple text file.

### Creating an RDD from a text file:

**Creating the text file**

In [1]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Writing example2.txt


Now let's perform some transformations and actions on this text file:

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
# Show RDD
sc.textFile('example2.txt')

MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [5]:
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')

In [7]:
# Map a function (or lambda expression) to each line
# Then collect the results.
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

## Map vs flatMap

In [8]:
# Split each line into words and collect everything as a single flat list
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
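
The difference between the two results can be reproduced without Spark. The sketch below uses the same four lines as example2.txt, held in a local list (an assumption made so the example runs on its own): `map` keeps one result per input line, while `flatMap` concatenates the per-line results into one sequence.

```python
from itertools import chain

lines = ["first ", "second line", "the third line", "then a fourth line"]

# Like map: one list of words per line, so the result is nested.
mapped = [line.split() for line in lines]

# Like flatMap: the per-line lists are flattened into a single list.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped), len(flat_mapped))
```

`mapped` has 4 elements (one per line) and `flat_mapped` has 10 (one per word).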

# RDDs and Key Value Pairs

Now that we've seen how to create RDDs and transform their values, we can begin to look into working with Key Value Pairs.

In [9]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [10]:
services = sc.textFile('services.txt')

In [11]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [12]:
services.map(lambda x: x.split())

PythonRDD[10] at RDD at PythonRDD.scala:43

In [13]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

Let's remove that leading hash sign!

In [26]:
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [27]:
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

## Using Key Value Pairs for Operations

Let us now begin to use methods whose names end in ByKey, such as reduceByKey, and which take a lambda expression as their argument. These ByKey methods assume that each element is in (Key, Value) form.


In [28]:
# From Previous
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())

In [29]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [52]:
# Let's start by practicing grabbing fields
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [43]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

Uh oh! Looks like we forgot that the amounts are still strings! Let's fix that:

In [42]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
         .collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
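
It may help to see why the `('State', 'Amount')` header row passes through unchanged. The sketch below is a plain-Python approximation of what reduceByKey does, not Spark's implementation, and `reduce_by_key` is a hypothetical helper: values are grouped by key, and the function is only called when a key has at least two values. The `'WA'` record is invented for illustration.

```python
from functools import reduce


def reduce_by_key(pairs, fn):
    """Group values by key, then fold each group with fn."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # reduce() never calls fn for a group that has a single value
    return [(key, reduce(fn, values)) for key, values in grouped.items()]


pairs = [('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'),
         ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'),
         ('TX', '200.00')]


def add_as_floats(a, b):
    return float(a) + float(b)


totals = dict(reduce_by_key(pairs, add_as_floats))

# Hypothetical extra record: a state that appears only once.
with_single = dict(reduce_by_key(pairs + [('WA', '10.00')], add_as_floats))

print(totals)
print(with_single['WA'])
```

Because the function is never applied to a key with a single value, such a value stays a string. Converting the amounts to `float` before reducing avoids this.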

We can continue our analysis by sorting this output:

In [69]:
# Grab state and amounts
# Get rid of ('State','Amount')
# Convert amounts to float first, so single-record states are numeric too
# Add them, then sort by the amount value
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.filter(lambda x: x[0] != 'State')\
.mapValues(float)\
.reduceByKey(lambda amt1,amt2 : amt1+amt2)\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]