In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 37.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=a6732c6ade26f105c00ba7849e278a010c66b051a7f273b109760a82bbea01a0
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


# Spark

Execution is pipelined and parallel. No need to store intermediate results. Lazy execution allows optimization.

RDD has enough information about how it was rderived from to compute its partitions from data in stable storage.

---

**Example:** If a partition of errors is lost, Spark rebuilds it by applying a filter on only the corresponding partition of lines.

Partitions can be recomputed in parallel on different nodes, without having to roll back the whole program.

## Sample spark program

In [3]:
from pyspark.context import SparkContext
import requests

In [4]:
sc = SparkContext.getOrCreate()
text_file = sc.textFile('sample_data/README.md')

Filter each line which contains T

In [5]:
lines = text_file.filter(lambda line: 'T' in line)
lines.collect()

['This directory includes a few sample datasets to get you started.',
 '    [MNIST database](https://en.wikipedia.org/wiki/MNIST_database), which is',
 '    Statistician. 27 (1): 17-21. JSTOR 2682899.']

Average

In [6]:
text_file = sc.textFile('sample_data/README.md', 2)
lines.map(lambda line: len(line.split())).reduce(lambda a, b: (a + b) / 2)

6.75

Average with partition 3

In [None]:
text_file = sc.textFile('sample_data/README.md', 5)
lines.map(lambda line: len(line.split())).reduce(lambda a, b: (a + b) / 2)

6.75

## RDD Operations

Download file

In [7]:
r = requests.get('https://www.cse.ust.hk/msbd5003/data/fruits.txt')
open('fruits.txt', 'wb').write(r.content)

65

### print file

In [8]:
fruits = sc.textFile('fruits.txt')
fruits.collect()

['apple',
 'banana',
 'canary melon',
 'grap',
 'lemon',
 'orange',
 'pineapple',
 'strawberry']

### Map


In [9]:
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
fruitsReversed.collect()

['elppa',
 'ananab',
 'nolem yranac',
 'parg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']

### Filter

In [10]:
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()

['apple', 'grap', 'lemon']

### FlatMap

In [11]:
characters = fruits.flatMap(lambda fruit: list(fruit))
print(characters.collect())

['a', 'p', 'p', 'l', 'e', 'b', 'a', 'n', 'a', 'n', 'a', 'c', 'a', 'n', 'a', 'r', 'y', ' ', 'm', 'e', 'l', 'o', 'n', 'g', 'r', 'a', 'p', 'l', 'e', 'm', 'o', 'n', 'o', 'r', 'a', 'n', 'g', 'e', 'p', 'i', 'n', 'e', 'a', 'p', 'p', 'l', 'e', 's', 't', 'r', 'a', 'w', 'b', 'e', 'r', 'r', 'y']


### Collect

In [13]:
new_fruits = fruits.union(fruits)
new_fruits.collect()

['apple',
 'banana',
 'canary melon',
 'grap',
 'lemon',
 'orange',
 'pineapple',
 'strawberry',
 'apple',
 'banana',
 'canary melon',
 'grap',
 'lemon',
 'orange',
 'pineapple',
 'strawberry']

In [14]:
new_fruits = fruits.intersection(fruits)
new_fruits.collect()

['orange',
 'pineapple',
 'canary melon',
 'lemon',
 'banana',
 'apple',
 'grap',
 'strawberry']

In [15]:
new_fruits = fruits.union(fruits).distinct()
new_fruits.collect()

['orange',
 'pineapple',
 'canary melon',
 'lemon',
 'banana',
 'apple',
 'grap',
 'strawberry']

## RDD Actions

### collect

### take

In [16]:
first3Fruits = fruits.take(3)
print(first3Fruits)

['apple', 'banana', 'canary melon']


### count

In [17]:
fruits.count()

8

### reduce

In [None]:
fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))

{' ',
 'a',
 'b',
 'c',
 'e',
 'g',
 'i',
 'l',
 'm',
 'n',
 'o',
 'p',
 'r',
 's',
 't',
 'w',
 'y'}