# SparkContext and RDD Basics

#### A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. It is an immutable, distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

#### Spark uses RDDs to make MapReduce-style operations faster and more efficient.

### Import Modules

In [1]:
from pyspark import SparkContext
import numpy as np

### Initialize a SparkContext that runs locally on 4 cores

In [2]:
sc=SparkContext(master="local[4]")
print(sc)

<SparkContext master=local[4] appName=pyspark-shell>


### Spark Parallelization
#### An RDD can be created in two ways:
1. parallelizing an existing collection in your driver program
2. referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop Input Format.

#### Let's create an RDD using the first method

In [6]:
lst = np.random.randint(0,10,20)
print(lst)
A = sc.parallelize(lst)
type(A)

[3 4 8 9 2 2 6 1 5 1 7 0 0 8 0 9 4 7 1 6]


pyspark.rdd.RDD

In [4]:
A

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

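#### Since the context was started with 4 local cores, the list was split into 4 partitions. `glom()` gathers the elements of each partition into a list, so the partitioning can be inspected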

In [7]:
A.glom().collect()

[[3, 4, 8, 9, 2], [2, 6, 1, 5, 1], [7, 0, 0, 8, 0], [9, 4, 7, 1, 6]]

#### Checking the same with 2 cores

In [8]:
sc.stop()
sc=SparkContext(master="local[2]")
A = sc.parallelize(lst)
A.glom().collect()

[[3, 4, 8, 9, 2, 2, 6, 1, 5, 1], [7, 0, 0, 8, 0, 9, 4, 7, 1, 6]]
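
The two `glom()` outputs above are consistent with the list being cut into contiguous chunks of near-equal length, one per partition. The following plain-Python sketch models that behavior without Spark. `slice_into_partitions` is a hypothetical helper, and its index formula is an assumption chosen to reproduce the outputs shown here; it is not PySpark's actual implementation, which may chunk differently for other lengths.

```python
def slice_into_partitions(data, num_slices):
    """Cut data into num_slices contiguous chunks of near-equal length (illustrative model)."""
    n = len(data)
    return [
        data[(i * n) // num_slices : ((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]

# Same values as the list printed earlier in this notebook
lst = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]

four = slice_into_partitions(lst, 4)
two = slice_into_partitions(lst, 2)
print(four)
print(two)
```

In PySpark itself, the number of partitions can also be requested explicitly through the second argument of `sc.parallelize`, independently of the number of cores.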

In [9]:
sc.stop()
sc=SparkContext(master="local[4]")
A = sc.parallelize(lst)

### Basic RDD Operations

#### count

In [10]:
A.count()

20

#### access first element

In [11]:
A.first()

3

#### access first few (4) elements

In [12]:
A.take(4)

[3, 4, 8, 9]

#### create rdd with duplicates removed

In [13]:
A_distinct=A.distinct()
A_distinct.collect()

[4, 8, 0, 9, 1, 5, 2, 6, 3, 7]

### Sum
#### Reduce method

In [14]:
A.reduce(lambda x,y: x+y)

83

#### Direct sum method

In [15]:
A.sum()

83

#### Fold method: like reduce, but takes a zero value as its first argument. It aggregates the elements of each partition first and then combines the per-partition results

In [16]:
A.fold(0,lambda x,y:x+y)

83
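
Because fold works partition by partition, the zero value matters more than it may seem. The sketch below is a plain-Python model (no Spark required) of a fold that starts from the zero value inside every partition and once more when merging the partial results. `fold_model` is a hypothetical helper, and the partitions are copied from the `glom()` output for 4 cores shown earlier.

```python
from functools import reduce
from operator import add

def fold_model(partitions, zero, op):
    """Fold each partition starting from zero, then fold the partial results starting from zero again."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)

partitions = [[3, 4, 8, 9, 2], [2, 6, 1, 5, 1], [7, 0, 0, 8, 0], [9, 4, 7, 1, 6]]

neutral = fold_model(partitions, 0, add)      # 0 is neutral for addition
non_neutral = fold_model(partitions, 1, add)  # 1 is added once per partition and once for the merge
print(neutral)      # 83
print(non_neutral)  # 88 = 83 + 4 partitions + 1 merge
```

Under this model, a zero value that is not neutral for the operation makes the result depend on the number of partitions, which is why 0 is the natural choice for a sum.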

### Maximum element/ longest word

#### maximum element by reduce

In [17]:
A.reduce(lambda x,y: x if x>y else y)

9

#### longest word by reduce

In [21]:
words = 'In Apache Spark a DataFrame is a distributed collection of rows under named columns'.split(' ')
wordsRDD = sc.parallelize(words)
wordsRDD.reduce(lambda x,y: x if len(x)>len(y) else y)

'distributed'

### Functions/filtering in RDD

#### simple filtering

In [23]:
A.filter(lambda x: x%3==0).collect()

[3, 9, 6, 0, 0, 0, 9, 6]

### Lambda functions are short and sweet but we can write regular Python functions to use with reduce

In [29]:
def largerThan(x,y):
    """
    Returns the longer of the two words x and y; returns y when their lengths are equal
    """
    if len(x)> len(y):
        return x
    else:
        return y


In [30]:
wordsRDD.reduce(largerThan)

'distributed'
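
Spark's `reduce` expects a function that is commutative and associative, because partial results from different partitions may be combined in any order. The comparison used above returns `y` on ties, so with several equally long words the winner depends on argument order. The plain-Python sketch below illustrates this with `functools.reduce`; `longer_or_first_alphabetically` is a hypothetical variant that breaks ties alphabetically, and the short word list is made up for the illustration.

```python
from functools import reduce

def larger_than(x, y):
    """Same rule as largerThan above: on equal lengths, return y."""
    return x if len(x) > len(y) else y

def longer_or_first_alphabetically(x, y):
    """Hypothetical variant: break ties alphabetically so argument order does not matter."""
    if len(x) != len(y):
        return x if len(x) > len(y) else y
    return min(x, y)

words = ["named", "under", "rows"]  # two words share the maximum length

forward = reduce(larger_than, words)
backward = reduce(larger_than, list(reversed(words)))
print(forward, backward)  # under named

stable_forward = reduce(longer_or_first_alphabetically, words)
stable_backward = reduce(longer_or_first_alphabetically, list(reversed(words)))
print(stable_forward, stable_backward)  # named named
```

For the sentence used in this notebook the issue does not show up, since 'distributed' is the only word of maximal length.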

### RDD sampling
#### RDDs are often very large. Aggregates, such as averages, can be approximated efficiently by using a sample. This often comes in handy for operations on extremely large datasets, where a sample can tell a lot about the patterns and descriptive statistics of the data.
#### Sampling is done in parallel and requires limited computation.

#### The method RDD.sample(withReplacement,p) generates a sample of the elements of the RDD, where
1. withReplacement is a boolean flag indicating whether or not an element in the RDD can be sampled more than once.
2. p is the probability of accepting each element into the sample (this parameter is named `fraction` in the PySpark API). Note that because each element is accepted independently, within each partition, the number of elements in the sample changes from sample to sample.


In [36]:
# get a sample whose expected size is m
# Note that the size of the sample is different in different runs
m=5
n=20
print('sample1=',A.sample(False,m/n).collect()) 
print('sample2=',A.sample(False,m/n).collect())

sample1= [3, 4, 1, 5, 1, 0, 8]
sample2= [4, 6, 1, 0, 8, 4, 1]
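
The varying sample size can be reproduced with a plain-Python model of sampling without replacement, in which every element is kept independently with probability `p`. This is a sketch under that assumption, not Spark's sampler; `bernoulli_sample` and `A_local` are hypothetical names, and the seed is arbitrary.

```python
import random

def bernoulli_sample(data, p, rng):
    """Keep each element independently with probability p (model of sampling without replacement)."""
    return [x for x in data if rng.random() < p]

# Same values as the RDD A in this notebook
A_local = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]

rng = random.Random(0)  # fixed seed so the run is repeatable
m, n = 5, len(A_local)

# Draw many samples and record only their sizes
sizes = [len(bernoulli_sample(A_local, m / n, rng)) for _ in range(10_000)]
average_size = sum(sizes) / len(sizes)
print("smallest sample:", min(sizes), "largest sample:", max(sizes))
print("average sample size:", average_size)
```

Individual samples differ in size, but the average over many draws stays close to the expected size m.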


### Things to note and think about
### Each time you run the previous cell, you get a different sample, and hence a different estimate of any statistic computed from it. The accuracy of the estimate is determined by the expected size of the sample, $n \cdot p$. Here, the probability is $p=\frac{m}{n}$. See how the error changes as you vary
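
One way to explore the relationship between sample size and accuracy is to estimate the mean from many samples at several probabilities and compare the average error. The sketch below does this in plain Python, assuming each element is kept independently with probability `p`; `mean_abs_error`, `A_local`, the chosen probabilities, and the seed are all illustrative choices, not part of the original notebook.

```python
import random
from statistics import mean

def mean_abs_error(data, p, trials, rng):
    """Average absolute error of the sample mean over many samples drawn with probability p."""
    true_mean = mean(data)
    errors = []
    for _ in range(trials):
        sample = [x for x in data if rng.random() < p]
        if sample:  # an empty sample yields no estimate, so it is skipped
            errors.append(abs(mean(sample) - true_mean))
    return mean(errors)

# Same values as the RDD A in this notebook
A_local = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]

rng = random.Random(42)  # fixed seed so the run is repeatable
errors_by_p = {p: mean_abs_error(A_local, p, 2000, rng) for p in (0.1, 0.25, 0.5, 0.9)}
for p, err in errors_by_p.items():
    print(f"p={p}: mean absolute error of the sample mean = {err:.3f}")
```

The error shrinks as `p` grows, since larger samples cover more of the data.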

### Basic Statistics

In [38]:
print("Maximum: ",A.max())
print("Minimum: ",A.min())
print("Mean (average): ",A.mean())
print("Standard deviation: ",A.stdev())

Maximum:  9
Minimum:  0
Mean (average):  4.15
Standard deviation:  3.10282129682


In [40]:
A.stats()

(count: 20, mean: 4.15, stdev: 3.10282129682, max: 9.0, min: 0.0)
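
The standard deviation reported above agrees with the population formula (dividing by n rather than n - 1). This can be checked locally with Python's `statistics` module; `A_local` is a hypothetical plain-list copy of the RDD's values.

```python
from statistics import mean, pstdev, stdev

# Same values as the RDD A in this notebook
A_local = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]

local_mean = mean(A_local)
population_sd = pstdev(A_local)  # divides by n, agrees with the A.stdev() output above
sample_sd = stdev(A_local)       # divides by n - 1, so it is slightly larger

print("mean:", local_mean)
print("population stdev:", population_sd)
print("sample stdev:", sample_sd)
```

If the sample version is wanted on an RDD, PySpark provides `sampleStdev()` alongside `stdev()`.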

### Mapping

#### with lambda function

In [41]:
B=A.map(lambda x:x*x)
B.collect()

[9, 16, 64, 81, 4, 4, 36, 1, 25, 1, 49, 0, 0, 64, 0, 81, 16, 49, 1, 36]

#### with regular python function

In [42]:
def square_if_odd(x):
    if x%2==1:
        return x*x
    else:
        return x
A.map(square_if_odd).collect()

[9, 4, 8, 81, 2, 2, 6, 1, 25, 1, 49, 0, 0, 8, 0, 81, 4, 49, 1, 6]

### Flatmapping

#### The flatMap method returns a new RDD by first applying a function to all elements of this RDD and then flattening the results

In [45]:
A.flatMap(lambda x:(x,x*x)).collect()

[3,
 9,
 4,
 16,
 8,
 64,
 9,
 81,
 2,
 4,
 2,
 4,
 6,
 36,
 1,
 1,
 5,
 25,
 1,
 1,
 7,
 49,
 0,
 0,
 0,
 0,
 8,
 64,
 0,
 0,
 9,
 81,
 4,
 16,
 7,
 49,
 1,
 1,
 6,
 36]

#### map(func) returns a new distributed dataset formed by passing each element of the source through the function func. N inputs give N outputs.

* sc.parallelize([3,4,5]).map(lambda x: [x,  x*x]).collect() 
#### **Output:**
[[3, 9], [4, 16], [5, 25]]

#### flatMap(func) is similar to map, but each input item can be mapped to 0 or more output items (so func should return an iterable, such as a list, rather than a single item).
* sc.parallelize([3,4,5]).flatMap(lambda x: [x, x*x]).collect() 
#### **Output:** notice the flattened list
[3, 9, 4, 16, 5, 25]
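
The difference between the two can be modeled on a plain Python list, without Spark. In this sketch `map_model` and `flat_map_model` are hypothetical helpers; the last call shows the "0 or more outputs" case, where returning an empty list drops the element.

```python
from itertools import chain

def map_model(func, data):
    """One output per input, whatever func returns."""
    return [func(x) for x in data]

def flat_map_model(func, data):
    """Apply func, then concatenate the iterables it returns into one flat list."""
    return list(chain.from_iterable(func(x) for x in data))

data = [3, 4, 5]
mapped = map_model(lambda x: [x, x * x], data)
flattened = flat_map_model(lambda x: [x, x * x], data)
only_odd = flat_map_model(lambda x: [x] if x % 2 == 1 else [], data)

print(mapped)     # [[3, 9], [4, 16], [5, 25]]
print(flattened)  # [3, 9, 4, 16, 5, 25]
print(only_odd)   # [3, 5]
```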


### Grouping and binning
#### groupBy works much like GROUP BY in SQL. It groups the elements of an RDD by the key that a function computes for each element.
#### Here groupBy is shown with a lambda function, where the elements are grouped according to their remainder when divided by 2. It returns an RDD of (key, iterable of values) pairs, which collect() brings back as a list

In [71]:
result=A.groupBy(lambda x:x%2).collect()
sorted([(x, sorted(y)) for (x, y) in result])

[(0, [0, 0, 0, 2, 2, 4, 4, 6, 6, 8, 8]), (1, [1, 1, 1, 3, 5, 7, 7, 9, 9])]
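
The grouping logic can be sketched locally with a dictionary that collects the elements sharing a key. `group_by_model` and `A_local` are hypothetical names for this plain-Python illustration; the sorting mirrors the cell above so that the output is directly comparable.

```python
from collections import defaultdict

def group_by_model(key_func, data):
    """Collect elements under the key computed by key_func; return sorted (key, values) pairs."""
    groups = defaultdict(list)
    for x in data:
        groups[key_func(x)].append(x)
    return sorted((key, sorted(values)) for key, values in groups.items())

# Same values as the RDD A in this notebook
A_local = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]

grouped = group_by_model(lambda x: x % 2, A_local)
print(grouped)
```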

### The histogram method takes a list of bucket boundaries and returns a tuple containing the boundaries and the number of elements in each bucket (binning)

In [78]:
B.collect()

[9, 16, 64, 81, 4, 4, 36, 1, 25, 1, 49, 0, 0, 64, 0, 81, 16, 49, 1, 36]

In [80]:
B.histogram([x for x in range(0,100,10)])

([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [9, 2, 1, 2, 2, 0, 2, 0, 2])

#### Here the first element of the tuple is the list of bucket boundaries as defined. The second element is the list of the numbers of elements of B that fall in each bucket. For example, 9 elements of B fall in the range [0, 10) and 2 elements fall in [10, 20). Every bucket excludes its upper boundary except the last one, [80, 90], which includes it
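
The bucket rule can be checked with a small plain-Python model: buckets are open on the right, except the last, which is closed, and values outside the overall range are ignored. `histogram_model` and `B_local` are hypothetical names; this is a sketch of the rule, not Spark's implementation.

```python
from bisect import bisect_right

def histogram_model(values, buckets):
    """Count values per bucket; buckets are [lo, hi) except the last, which is [lo, hi]."""
    counts = [0] * (len(buckets) - 1)
    for v in values:
        if v < buckets[0] or v > buckets[-1]:
            continue  # outside the overall range, not counted
        if v == buckets[-1]:
            idx = len(buckets) - 2  # upper edge belongs to the last bucket
        else:
            idx = bisect_right(buckets, v) - 1
        counts[idx] += 1
    return buckets, counts

# Same values as the RDD B in this notebook
B_local = [9, 16, 64, 81, 4, 4, 36, 1, 25, 1, 49, 0, 0, 64, 0, 81, 16, 49, 1, 36]

boundaries, counts = histogram_model(B_local, list(range(0, 100, 10)))
print(counts)  # [9, 2, 1, 2, 2, 0, 2, 0, 2]
```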

### Set Operations

In [81]:
print("A:",A.collect())
print("B:",B.collect())

A: [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]
B: [9, 16, 64, 81, 4, 4, 36, 1, 25, 1, 49, 0, 0, 64, 0, 81, 16, 49, 1, 36]


### Union, intersection, subtraction, and Cartesian product of two RDDs
#### A+B is the union of the two RDDs; duplicates are kept

In [82]:
(A+B).collect()

[3,
 4,
 8,
 9,
 2,
 2,
 6,
 1,
 5,
 1,
 7,
 0,
 0,
 8,
 0,
 9,
 4,
 7,
 1,
 6,
 9,
 16,
 64,
 81,
 4,
 4,
 36,
 1,
 25,
 1,
 49,
 0,
 0,
 64,
 0,
 81,
 16,
 49,
 1,
 36]

In [85]:
A.intersection(B).collect()

[0, 9, 1, 4]

In [86]:
A.subtract(B).collect()

[8, 8, 2, 2, 3, 5, 6, 6, 7, 7]
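
The three outputs above treat duplicates differently: the union keeps them, the intersection removes them, and the subtraction keeps every element of A whose value does not occur in B. The plain-Python sketch below reproduces those results on local lists (sorted, since Spark does not guarantee an order); `A_local` and `B_local` are hypothetical copies of the RDD contents.

```python
# Same values as the RDDs A and B in this notebook
A_local = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]
B_local = [x * x for x in A_local]

# Union: plain concatenation, duplicates are kept
union = A_local + B_local

# Intersection: values present in both, each reported once
intersection = sorted(set(A_local) & set(B_local))

# Subtraction: elements of A whose value never appears in B, duplicates kept
b_values = set(B_local)
subtraction = sorted(x for x in A_local if x not in b_values)

print(len(union))    # 40
print(intersection)  # [0, 1, 4, 9]
print(subtraction)   # [2, 2, 3, 5, 6, 6, 7, 7, 8, 8]
```

A union without duplicates can be obtained in PySpark by following the union with `distinct()`.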

#### The Cartesian product gives all possible pairs of elements from the two RDDs. Each pair is a tuple (a, b), with a taken from A and b taken from B

In [87]:
A.cartesian(B).collect()

[(3, 9),
 (4, 9),
 (8, 9),
 (9, 9),
 (2, 9),
 (3, 16),
 (3, 64),
 (4, 16),
 (4, 64),
 (8, 16),
 (8, 64),
 (9, 16),
 (9, 64),
 (2, 16),
 (2, 64),
 (3, 81),
 (3, 4),
 (4, 81),
 (4, 4),
 (8, 81),
 (8, 4),
 (9, 81),
 (9, 4),
 (2, 81),
 (2, 4),
 (3, 4),
 (4, 4),
 (8, 4),
 (9, 4),
 (2, 4),
 (3, 36),
 (3, 1),
 (4, 36),
 (4, 1),
 (8, 36),
 (8, 1),
 (9, 36),
 (9, 1),
 (2, 36),
 (2, 1),
 (3, 25),
 (3, 1),
 (4, 25),
 (4, 1),
 (8, 25),
 (8, 1),
 (9, 25),
 (9, 1),
 (2, 25),
 (2, 1),
 (3, 49),
 (4, 49),
 (8, 49),
 (9, 49),
 (2, 49),
 (3, 0),
 (3, 0),
 (4, 0),
 (4, 0),
 (8, 0),
 (8, 0),
 (9, 0),
 (9, 0),
 (2, 0),
 (2, 0),
 (3, 64),
 (3, 0),
 (4, 64),
 (4, 0),
 (8, 64),
 (8, 0),
 (9, 64),
 (9, 0),
 (2, 64),
 (2, 0),
 (3, 81),
 (4, 81),
 (8, 81),
 (9, 81),
 (2, 81),
 (3, 16),
 (3, 49),
 (4, 16),
 (4, 49),
 (8, 16),
 (8, 49),
 (9, 16),
 (9, 49),
 (2, 16),
 (2, 49),
 (3, 1),
 (3, 36),
 (4, 1),
 (4, 36),
 (8, 1),
 (8, 36),
 (9, 1),
 (9, 36),
 (2, 1),
 (2, 36),
 (2, 9),
 (6, 9),
 (1, 9),
 (5, 9),
 (1, 9),
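
The size of a Cartesian product grows as the product of the two input sizes, which is why the listing above is so long. A local sketch with `itertools.product` shows the count for the two 20-element collections; `A_local` and `B_local` are hypothetical copies of the RDD contents, and the order of the pairs differs from the partition-wise order Spark returns.

```python
from itertools import product

# Same values as the RDDs A and B in this notebook
A_local = [3, 4, 8, 9, 2, 2, 6, 1, 5, 1, 7, 0, 0, 8, 0, 9, 4, 7, 1, 6]
B_local = [x * x for x in A_local]

pairs = list(product(A_local, B_local))
print(len(pairs))  # 400 = 20 * 20
print(pairs[:3])   # [(3, 9), (3, 16), (3, 64)]
```

On real datasets this quadratic growth makes `cartesian` expensive, so it is usually applied only to small or pre-filtered RDDs.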

