# PySpark Tutorial Part 2

## RDD (Resilient Distributed Dataset)

### Terminologies

RDD stands for Resilient Distributed Dataset. An RDD is a collection of elements partitioned across the nodes of a cluster so that they can be processed in parallel.

RDDs are:
* immutable
* fault tolerant, with automatic recovery
* able to have multiple operations applied to them

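RDD operations fall into two kinds:
* Transformation: builds a new RDD from an existing one (for example `filter`, `map`, `join`) and is evaluated lazily
* Action: triggers the computation and returns a result to the driver (for example `count`, `collect`, `reduce`)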
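
The split between transformations and actions is easier to see with a small analogy. The sketch below uses plain Python generators, not Spark, to mimic the idea that building a pipeline does no work until something asks for the results. The names `shout`, `calls`, and `pipeline` are made up for illustration, and Spark's real scheduling is far more involved than this.

```python
# Plain-Python analogy only: a generator stands in for a lazy transformation.
calls = []  # records every time the mapping function actually runs


def shout(word):
    """Hypothetical per-element function; logs each call so laziness is visible."""
    calls.append(word)
    return word.upper()


words = ["scala", "java", "spark"]

# "Transformation": describing the pipeline runs nothing yet.
pipeline = (shout(w) for w in words if "a" in w)
calls_before_action = len(calls)

# "Action": materializing the results forces the work to happen.
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

In the same spirit, calling `filter` or `map` on an RDD only records what should be done, and an action such as `collect()` or `count()` is what makes Spark run it.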

### Basic Operations (Ops)
- `count()`: Returns the number of elements in the RDD.
- `collect()`: Returns all the elements of the RDD to the driver as a list.
- `foreach(f)`: Takes a callable and applies it to every element of the RDD for its side effects. It returns nothing.
- `filter(f)`: Takes a callable and returns a new RDD containing only the elements for which the callable returns true.
- `map(f, preservesPartitioning = False)`: Returns a new RDD by applying a function to each element of the RDD.
- `reduce(f)`: Reduces the elements of the RDD to a single value using the specified commutative and associative binary operation, and returns that value.
- `join(other, numPartitions = None)`: Returns an RDD of `(key, (value1, value2))` pairs, one for each pair of elements with matching keys in the two RDDs.
- `cache()`: Persists this RDD with the default storage level (MEMORY_ONLY). The `is_cached` attribute tells you whether the RDD is cached.

### Examples

In [1]:
### import modules
from pyspark import SparkContext

In [2]:
### count() ###

sc = SparkContext('local', 'count app') # master, appName
words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark","pyspark and spark"]

rdd = sc.parallelize(words)
count = rdd.count()
print(f'Num of elements in RDD: {count}')
sc.stop()

Num of elements in RDD: 8


In [3]:
### Collect ###

sc = SparkContext('local', 'collect app')
words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark","pyspark and spark"]

rdd = sc.parallelize(words)
coll = rdd.collect()
print(f'elements in rdd, using collect(): \n{coll}')
sc.stop()

elements in rdd, using collect(): 
['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [11]:
### foreach(f)
# here we pass in print function as arg

sc = SparkContext('local', 'foreach app')
words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark","pyspark and spark"]

rdd = sc.parallelize(words)
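def f(x): print(f'print func: {x}')
rdd.foreach(f) # foreach() returns None, so there is nothing to assign
sc.stop()
## No output shows up in the notebook: f runs in a Spark worker process,
## so its prints typically go to that process's stdout (the console that
## launched Jupyter). To inspect elements in the notebook, use collect().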

In [19]:
### Filter ###
# keep only the words that include 'spark'
sc.stop()
sc = SparkContext('local', 'filter app')
words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark","pyspark and spark"]

rdd = sc.parallelize(words)
filtered = rdd.filter(lambda arg: 'spark' in arg) # returns a new RDD, evaluated lazily
collected = filtered.collect()
print(f'filtered words, contains \'spark\': \n{collected}')
sc.stop()

filtered words, contains 'spark': 
['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [30]:
### map ###

sc.stop()
sc = SparkContext('local', 'map app')
words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark","pyspark and spark"]

rdd = sc.parallelize(words)

def cipher(word):
    '''
    Encode a single word character by character: take each character's
    Unicode code point and shift it by 8.
    '''
    return [ord(ch) + 8 for ch in word]

mapped = rdd.map(cipher)
ans = mapped.collect()
print(f'mapped words, map(): {ans}')
sc.stop()

mapped words, map(): [[123, 107, 105, 116, 105], [114, 105, 126, 105], [112, 105, 108, 119, 119, 120], [123, 120, 105, 122, 115], [105, 115, 115, 105], [123, 120, 105, 122, 115, 40, 126, 123, 40, 112, 105, 108, 119, 119, 120], [120, 129, 123, 120, 105, 122, 115], [120, 129, 123, 120, 105, 122, 115, 40, 105, 118, 108, 40, 123, 120, 105, 122, 115]]
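
As a quick sanity check on the shift cipher, the encoding can be reversed with `chr`. This is a small sketch in plain Python that needs no SparkContext. `decipher` and `SHIFT` are names introduced here for illustration and are not part of the original notebook.

```python
SHIFT = 8  # the same shift used by cipher() in the map example


def cipher(word):
    """Encode a word as a list of shifted Unicode code points."""
    return [ord(ch) + SHIFT for ch in word]


def decipher(codes):
    """Hypothetical inverse of cipher(): shift back and rebuild the string."""
    return "".join(chr(code - SHIFT) for code in codes)


encoded = cipher("spark")
decoded = decipher(encoded)
print(encoded)  # [123, 120, 105, 122, 115]
print(decoded)  # spark
```

The same `decipher` function could presumably be passed to a second `map` call to decode a whole RDD of encoded words.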


In [33]:
### reduce() ###
# sum the elements of the list by reducing with add

from operator import add
sc.stop()
sc = SparkContext('local', 'reduce app')
nums = [1, 2, 3, 4, 5]
rdd = sc.parallelize(nums)
result = rdd.reduce(add)
print(f'addition using reduce. this returns a single value.\nResult: {result}')
sc.stop()

addition using reduce. this returns a single value.
Result: 15
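
The requirement that the operation passed to `reduce` be commutative and associative matters because the data is split into partitions that are reduced separately and then combined. The sketch below imitates that in plain Python with `functools.reduce`. `partitioned_reduce` is a hypothetical helper, and the partition layouts are assumptions chosen for illustration, not what Spark would necessarily produce.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(op, partitions):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2], [3, 4, 5]]

# Addition is commutative and associative: the layout does not matter.
add_one = partitioned_reduce(add, one_partition)   # 15
add_two = partitioned_reduce(add, two_partitions)  # 15

# Subtraction is neither: the answer changes with the layout.
sub_one = partitioned_reduce(sub, one_partition)   # 1-2-3-4-5 = -13
sub_two = partitioned_reduce(sub, two_partitions)  # (1-2) - (3-4-5) = 5

print(add_one, add_two, sub_one, sub_two)
```

This is why an operation like subtraction can give different answers from run to run on a real cluster, while `add` is safe.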


In [34]:
### join() ###

'''In the following example, two RDDs each hold two key-value pairs.
Joining them yields an RDD that pairs each matching key with the
values from both RDDs.'''

sc.stop()
sc = SparkContext('local', 'join app')
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([('spark', 7), ('hadoop', 2)])

joined = x.join(y)  # inner join on keys; this is not a union
result = joined.collect()
print(f'joined RDD: \n{result}')
sc.stop()

joined RDD: 
[('hadoop', (4, 2)), ('spark', (1, 7))]
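
The example above has exactly one value per key on each side, which hides two details of an inner join: keys missing from either side are dropped, and repeated keys produce one output pair per combination. The following is a plain-Python sketch of those semantics. `inner_join` is a hypothetical helper written for illustration and is not how Spark implements `join`.

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair up values from two lists of (key, value) tuples that share a key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # One output pair per matching (left value, right value) combination.
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


x = [("spark", 1), ("hadoop", 4), ("akka", 9)]   # "akka" has no match in y
y = [("spark", 7), ("spark", 8), ("hadoop", 2)]  # "spark" appears twice

joined = sorted(inner_join(x, y))
print(joined)
# [('hadoop', (4, 2)), ('spark', (1, 7)), ('spark', (1, 8))]
```

The result is sorted here only to make the printed order predictable. The order of a collected join in Spark should not be relied on.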


In [36]:
### cache ###

sc.stop()
sc = SparkContext('local', 'cache app')
words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark","pyspark and spark"]

rdd = sc.parallelize(words)
caching = rdd.is_cached
print(f'before cache: {caching}')
rdd.cache()  # equivalent to persist() with the default storage level
caching = rdd.is_cached
print(f'after persist: {caching}')
sc.stop()

before cache: False
after persist: True


### Resources, links

- [How to stop a running SparkContext before opening the new one](https://stackoverflow.com/questions/36844075/how-to-stop-a-running-sparkcontext-before-opening-the-new-one)
- [\*args and \*\*kwargs in python explained](https://pythontips.com/2013/08/04/args-and-kwargs-in-python-explained/)