In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

Quiz: Topics between week 3-4. More difficult than quiz 2 and less straightforward. Do homework + practice example. 

## Pair RDDs Operations

### Actions (on 2 pairs RDDs)

- **`subtrackByKey(otherDatset)`**: Remove elements	with a key present in the	other RDD.
    ```python 
    A = [(2,3), (1,2), (1,3)]
    B = [(1,2)]
    A.subtract(B) = [(2,3), (1,3)]
    A.subtractByKey(B) = [(2,3)]
    ``` 

- **`join(otherDataset)`** - Perform an inner join between two RDDs. Return in format `(key, (value1, value2))`
- **`leftOuterJoin(otherDataset)`** - Perform a join between two RDDs where the key must be present in the **first** RDD.
- **`rightOuterJoin(otherDataset)`** - Perform a join between two RDDs where the key must be present in the **second** RDD.

- **`cogroup(otherDataset)`**: groupByKey() --> outerjoin()
 + output format is: (key, (Result Iterable, Result Iterable)
 + Group data from both RDDs sharing the same key.
 + Go over two RDDs sharing the same key.
 + Return the key and the respective lists from two RDD values.
 + Pairs of (Key, (Resulterable, ResultIteratable)).
 + Can work on more than two RDDs at once
 
 ```python
 A = [(1,2), (1,2)]
 B = [(2,2), (2,3), (1,3)]
 
 # groupByKey
 (1, [2,3])
 (2, [2,3]), (1, [3])
 
 # outerjoin
 (1, ([2,3] , [3]) # the 1st element contains value from rdd1 and the 2nd contains value from rdd2
 (2, ([], [2,3]) # the 1st element contains value from rdd1 and the 2nd contains value from rdd2
 
 #mapValues(lambda x: (list(x[0]), list(x[1]))
 ```

### Example 1

In [5]:
# load and split data
biz_raw = sc.textFile('/Users/ThyKhueLy/msan694/data/filtered_registered_business_sf.csv')
biz = biz_raw.flatMap(lambda l: l.split("\n")).distinct().map(lambda x: x.split(","))

In [3]:
# load and split data
sup_raw = sc.textFile('/Users/ThyKhueLy/msan694/data/supervisor_sf.csv')
sup = sup_raw.flatMap(lambda l: l.split("\n")).distinct().map(lambda x: x.split(","))

In [47]:
# map key and values
biz = biz.map(lambda x: (x[0], x[1]))
sup = sup.map(lambda x: (x[0], x[1]))

In [48]:
no_sup = biz.subtractByKey(sup).values().distinct()

In [49]:
no_sup.count()

39422

In [50]:
no_sup.take(10)

[u'Precision Communication Serv',
 u'Schefer Thomas R',
 u'Lucid Systems',
 u'Jacob Abraham',
 u'Daniel Dela Rosa',
 u'Sudhir Marahatta',
 u'Batista Luis S',
 u'"Wti',
 u'Boutin Jacqueline M',
 u'Avinesh P Singh']

### Example 2

In [19]:
first_num_pairs = sc.parallelize({(2,3),(1,2),(1,3),(2,4),(3,6)})
second_num_pairs = sc.parallelize({(1,3),(2,2),(4,6)})

In [21]:
print first_num_pairs.count()
print second_num_pairs.count()

5
3


In [22]:
first_num_pairs.countByKey()

defaultdict(int, {1: 2, 2: 2, 3: 1})

In [23]:
first_num_pairs.glom().collect()

[[(1, 2)], [(1, 3)], [(2, 3)], [(2, 4), (3, 6)]]

In [24]:
second_num_pairs.glom().collect()

[[], [(1, 3)], [(4, 6)], [(2, 2)]]

In [25]:
first_num_pairs.subtract(second_num_pairs).collect()

[(1, 2), (2, 4), (2, 3), (3, 6)]

In [26]:
first_num_pairs.subtractByKey(second_num_pairs).collect()

[(3, 6)]

In [27]:
second_num_pairs.subtractByKey(first_num_pairs).collect()

[(4, 6)]

In [28]:
first_num_pairs.join(second_num_pairs).collect()

[(1, (2, 3)), (1, (3, 3)), (2, (3, 2)), (2, (4, 2))]

In [29]:
first_num_pairs.join(second_num_pairs).countByKey()

defaultdict(int, {1: 2, 2: 2})

In [31]:
first_num_pairs.leftOuterJoin(second_num_pairs).collect()

[(1, (2, 3)), (1, (3, 3)), (2, (3, 2)), (2, (4, 2)), (3, (6, None))]

In [32]:
first_num_pairs.rightOuterJoin(second_num_pairs).collect()

[(1, (2, 3)), (1, (3, 3)), (2, (3, 2)), (2, (4, 2)), (4, (None, 6))]

In [33]:
first_num_pairs.leftOuterJoin(second_num_pairs).countByKey()

defaultdict(int, {1: 2, 2: 2, 3: 1})

In [43]:
first_num_pairs.cogroup(second_num_pairs).map(lambda x: (x[0], list(x[1][0]), list(x[1][1]))).collect()

[(1, [2, 3], [3]), (2, [3, 4], [2]), (3, [6], []), (4, [], [6])]

### Example 3

Using Example 1, list (zip,(business_name,supervisor_id)) pairs ordered by
supervisor_id.
- Only if both business and supervisor exist.
- If a business exists.
- If a supervisor exists.

In [5]:
# only if both business and supervisor exist
biz_sup0 = biz.join(sup)
biz_sup0.sortBy(lambda x: x[1][1], ascending=False).collect()[:10]

[(u'94112', (u'Vos Barbara E', u'9')),
 (u'94112', (u'W J Britton & Co', u'9')),
 (u'94112', (u'Fernandez Ervin A', u'9')),
 (u'94112', (u'"Legaspi', u'9')),
 (u'94112', (u'Murphy Doris C', u'9')),
 (u'94112', (u'Daley Margaret T', u'9')),
 (u'94112', (u'Deng Bisheng', u'9')),
 (u'94112', (u'Liu Meizhen', u'9')),
 (u'94112', (u'Tan Shu Kun', u'9')),
 (u'94112', (u'Rasmussen Maureen Woelfel Katy', u'9'))]

In [6]:
# only if business exists
biz_sup1 = biz.leftOuterJoin(sup)
biz_sup1.sortBy(lambda x: x[1][1], ascending=False).take(10)

[(u'94112', (u'Vos Barbara E', u'9')),
 (u'94112', (u'W J Britton & Co', u'9')),
 (u'94112', (u'Fernandez Ervin A', u'9')),
 (u'94112', (u'"Legaspi', u'9')),
 (u'94112', (u'Murphy Doris C', u'9')),
 (u'94112', (u'Daley Margaret T', u'9')),
 (u'94112', (u'Deng Bisheng', u'9')),
 (u'94112', (u'Liu Meizhen', u'9')),
 (u'94112', (u'Tan Shu Kun', u'9')),
 (u'94112', (u'Rasmussen Maureen Woelfel Katy', u'9'))]

In [7]:
# only if supervisor exists
biz_sup2 = sup.rightOuterJoin(biz)
biz_sup2.sortBy(lambda x: x[1][1]).take(10)

[(u'94103', (u'10', u'"1')),
 (u'94103', (u'6', u'"1')),
 (u'94103', (u'8', u'"1')),
 (u'94103', (u'3', u'"1')),
 (u'94103', (u'9', u'"1')),
 (u'94103', (u'5', u'"1')),
 (u'94105', (u'6', u'"1-2-3 Deli')),
 (u'94105', (u'3', u'"1-2-3 Deli')),
 (u'94105', (u'6', u'"1055 Pine Street')),
 (u'94105', (u'3', u'"1055 Pine Street'))]

In [8]:
biz_sup0.subtract(biz_sup2).count()

439999

### Example 4

In [19]:
# cogroup
biz_zip_cogroup = biz.cogroup(sup)

In [27]:
biz_zip_cogroup.collect()[:5]

[(u'',
  (<pyspark.resultiterable.ResultIterable at 0x10f62c350>,
   <pyspark.resultiterable.ResultIterable at 0x110906f50>)),
 (u'70363',
  (<pyspark.resultiterable.ResultIterable at 0x110906f90>,
   <pyspark.resultiterable.ResultIterable at 0x110906fd0>)),
 (u'19443',
  (<pyspark.resultiterable.ResultIterable at 0x1109067d0>,
   <pyspark.resultiterable.ResultIterable at 0x110906750>)),
 (u'28036',
  (<pyspark.resultiterable.ResultIterable at 0x110906490>,
   <pyspark.resultiterable.ResultIterable at 0x1118a7050>)),
 (u'97302',
  (<pyspark.resultiterable.ResultIterable at 0x1118a7090>,
   <pyspark.resultiterable.ResultIterable at 0x1118a7150>))]

In [20]:
biz_zip_cogroup.map(lambda x: (x[0], (list(x[1][0]), list(x[1][1])))).collect()[:1]

[(u'',
  ([u'221 7th Street Residences Llc',
    u'Arconas Corporation',
    u'Miniclip America Inc',
    u'1625 Leavenworth Street Llc',
    u'Atc Managed Sites Llc',
    u'Lexa Mary C',
    u'Red Oxygen Inc',
    u'Leo And Janis Paslin Trust',
    u'Atc Managed Sites Llc',
    u'Hampton Court Sf Lp',
    u'Joseph D & Candice M Harney Trust Jc & Cm Harney Trust',
    u'Opower Inc',
    u'Woo H Woo S',
    u'Ultra Electronics Forensic Technology Inc',
    u'New Flyer Industriescanada Ulc',
    u'Willis Supply Corporation',
    u'Endurance Wind Power Inc',
    u'Built 1925 Llc',
    u'Paybyphone Technologies Inc',
    u'Atc Managed Sites Llc',
    u'Hartmann Studios Incorporated',
    u'Nada Pacific Corporation',
    u'Atc Managed Sites Llc',
    u'Siemens Public',
    u'Iotum Inc',
    u'Philz Coffee Inc',
    u'Allstream Inc',
    u'Vip Plumbing And Drain Cleanin',
    u'Bond Blacktop Inc',
    u'Htut Chris',
    u'Ramon Chavez',
    u'Viavid Broadcasting Inc',
    u'Cardno Entrix',
 

In [30]:
biz_zip_cogroup.map(lambda x : (x[0],(len(list(x[1][0])), len(list(x[1][1]))))).collect()[:5]

[(u'', (92, 0)),
 (u'70363', (1, 0)),
 (u'19443', (1, 0)),
 (u'28036', (1, 0)),
 (u'97302', (2, 0))]

### Example 5

From ”filtered_registered_business_sf.csv”, create a pair RDD of (zip, (store name, city))
+ Count pairs which donot have a key.
+ Filter pairs that donot include “San Francisco” in the city value.

In [46]:
biz_raw = sc.textFile('/Users/ThyKhueLy/msan694/data/filtered_registered_business_sf.csv')
biz = biz_raw.flatMap(lambda l: l.split("\n")).map(lambda x: x.split(","))

In [62]:
biz.take(5)

[[u'94123',
  u'Tournahu George L',
  u'3301 Broderick St',
  u'San Francisco',
  u'CA'],
 [u'94124',
  u'Stephens Institute Inc',
  u'2225 Jerrold Ave',
  u'San Francisco',
  u'CA'],
 [u'94105',
  u'Stephens Institute Inc',
  u'180 New Montgomery St',
  u'San Francisco',
  u'CA'],
 [u'94108',
  u'Stephens Institute Inc',
  u'540 Powell St',
  u'San Francisco',
  u'CA'],
 [u'94107',
  u'Stephens Institute Inc',
  u'460 Townsend St',
  u'San Francisco',
  u'CA']]

In [55]:
store_info = biz.map(lambda x: (x[0], (x[1],x[3])))

In [64]:
store_info.take(5)

[(u'94123', (u'Tournahu George L', u'San Francisco')),
 (u'94124', (u'Stephens Institute Inc', u'San Francisco')),
 (u'94105', (u'Stephens Institute Inc', u'San Francisco')),
 (u'94108', (u'Stephens Institute Inc', u'San Francisco')),
 (u'94107', (u'Stephens Institute Inc', u'San Francisco'))]

In [65]:
# count number which do not have key
store_info.countByKey()[u'']

92

In [67]:
# store without San Francisco
store_info.filter(lambda x: 'San Francisco' not in x[1][1]).distinct().take(5)

[(u'94110', (u'"Rlm Partners', u'60 29th St Ste 537')),
 (u'90805', (u'Lopez Roberto Frias', u'Long+beach')),
 (u'95492', (u'Denbeste Transportation Inc', u'Windsor')),
 (u'94510', (u'Trueman Ben', u'Benicia')),
 (u'94111', (u'Rosewood Venture Associates Iv', u'San+francisco'))]

## Tuning Spark – Persist in Memory/Disk

RDDs are by default recomputed each time.
However, if you want to reuse an RDD for multiple actions, you can ask Spark to store the content in memory/disk and query repeatedly.
```
line_with_spark.persist(StorageLevel.persistency_level)
```

### Persistence Level

1. **MEMORY_ONLY**
Store RDDs in memory, If an RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

2. **MEMORY_AND_DISK**
If an RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

3. **MEMORY_ONLY_SER (Java and Scala)**
Store RDDs as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
 
4. **MEMORY_AND_DISK_SER (Java and Scala)**
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

5. **DISK_ONLY**
Store RDD partitions only on disk.

6. **MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.**
Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental). Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

There are replicated storage options available with each of the storage level.
   + Replicated storage levels provide much faster fault recovery than RDD lineage in the event of a task or node failure.

- `persists(storage_level)`: Many options - memory/disk and replication. 
    + When using disk options, the persisted data on disk can be used to reconstitute partitions lost due to executor or memory failure.

+ `.cache() = .persist(MEMORY_ONLY)`: 
   + Once persisted, RDDs can be reused multiple times without requiring reevaluation (recalculation).
   + If there is not enough memory available to cache the RDD, it will be reevaluated for each lineage triggered by an action.
   + If we want to change `.persist(MEMORY_ONLY)` --> `.persist(MEMORY_AND_DISK)` will return an error. We will have to call `unpersist()` first


+ `.unpersist()` lets you unpersist the RDD.

### Example without persisting: 
Input: 
```python
# without persisting
start=time.time()
lines.count()
print "first "
print time.time() - start

start=time.time()
lines.count()
print "first "
print time.time() - start
```

Output: 
```python 
first 59.704123
second 59.72811
```

### Example with persisting
Input: 
```python
# with persisting
lines.persist(StorageLevel.MENORY_AND_DISK)
start=time.time()
lines.count()
print "first "
print time.time() - start # trigger reevalution

start=time.time()
lines.count()
print "second "
print time.time() - start # does not trigger reevalution

start=time.time()
lines.count()
print "third "
print time.time() - start # does not trigger reevalution
```


Output: 
```python 
first 79.704123
second 20.72811
third 20.44669
```

### Check the persistency level
getSotrageLevel() – returns different storage option flags set for an RDD.

StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
+ useDisk : If set, partitions that do not fit in memory will be written to disk.
+ useMemory : If set, the RDDs will be stored in-memory.
+ useOffHeap : If set, the RDD will be stored outside of the Spark executor in an external system such as Tachyon.
+ deserialization : If set, the RDD will be stored as deserialized Java objects.
+ replication : An integer that controls the number of copies of the persisted data to be stored.

- `MEMORY_ONLY`: Best-Disk only if reevalution is expensive and cannot fit in memory (although sometimes recalculation can be cheaper or fast than reading from the disk)
- `OffHeap`: Stored outside of the spark executor in an external system (e.g. Tachyon.) => Use when there is memory issue or noisy cluster
- `serialization`: when job is too big to fit in memory
- `replication`: faster for recovery, however it will cost more space by making 2 copies and will take sometime to create a second copy. It will be useful when bad connection happens to your cluster. 

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

If your RDDs fit comfortably with the default storage level (**MEMORY_ONLY**), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

If not, try using **MEMORY_ONLY_SER** and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.



### Example 7 

In [75]:
file_name = "./data/README.md"
lines = sc.textFile(file_name)
lines_with_Spark = lines.filter(lambda x: "Spark" in x)

lines_with_Spark.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [76]:
lines_with_Spark.cache()
lines_with_Spark.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [80]:
lines_with_Spark.is_cached

True

In [81]:
lines_with_Spark.persist()

PythonRDD[95] at RDD at PythonRDD.scala:48

In [82]:
lines_with_Spark.unpersist()

PythonRDD[95] at RDD at PythonRDD.scala:48

In [83]:
lines_with_Spark.cache()

PythonRDD[95] at RDD at PythonRDD.scala:48

In [84]:
lines_with_Spark.is_cached

True

In [86]:
from pyspark import StorageLevel
lines_with_Spark.unpersist()
lines_with_Spark.persist(StorageLevel.DISK_ONLY)

PythonRDD[95] at RDD at PythonRDD.scala:48

In [87]:
lines_with_Spark.unpersist()
lines_with_Spark.persist(StorageLevel(useDisk=False,
                                     useMemory=True,
                                     useOffHeap=False,
                                     deserialized=False,
                                     replication=3))

PythonRDD[95] at RDD at PythonRDD.scala:48

### Notes
+ If your RDDs fit comfortably with MEMORY_ONLY, leave them that way. 
    + This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible. Iterative algorithms are often good candidates for caching.
    
    
+ Don’t spill to disk unless the functions that computed your datasets are expensive, or they use a large amount of the data. 
    + Otherwise, recomputing a partition may be as fast as reading it from disk.


+ Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). 
    + All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.