### There are multiple ways to create RDDs in Spark:
- Parallelizing an existing collection in the driver program.
- Loading a dataset from an external storage system (e.g. HDFS, HBase, a shared file system).
- Transforming an existing RDD.

#### Example parallelize statement

In [1]:
import string

In [2]:
import random

In [3]:
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a random ID of `size` characters drawn from `chars`."""
    return ''.join(random.choice(chars) for _ in range(size))

In [4]:
my_list = list()

In [5]:
for i in range(1, 10):   # 9 iterations, so 9 random IDs
    my_list.append(id_generator())

In [6]:
my_list

['V88GTW',
 'HTMZOE',
 'UEMZB0',
 '1JOUNE',
 '9W9EXV',
 'WUBRAZ',
 'TUIF3E',
 '7LXFRM',
 'A7551C']

In [7]:
test1_RDD = sc.parallelize(my_list)

In [8]:
test1_RDD.collect()

['V88GTW',
 'HTMZOE',
 'UEMZB0',
 '1JOUNE',
 '9W9EXV',
 'WUBRAZ',
 'TUIF3E',
 '7LXFRM',
 'A7551C']

#### From a dataset on a file system: here we read a CSV file

In [9]:
data_file = "./mtcars.csv"

In [10]:
data_from_file=sc.textFile(data_file)

In [11]:
data_from_file

./mtcars.csv MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

In [12]:
data_from_file.collect()

['model,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1',
 'Hornet Sportabout,18.7,8,360,175,3.15,3.44,17.02,0,0,3,2',
 'Valiant,18.1,6,225,105,2.76,3.46,20.22,1,0,3,1',
 'Duster 360,14.3,8,360,245,3.21,3.57,15.84,0,0,3,4',
 'Merc 240D,24.4,4,146.7,62,3.69,3.19,20,1,0,4,2',
 'Merc 230,22.8,4,140.8,95,3.92,3.15,22.9,1,0,4,2',
 'Merc 280,19.2,6,167.6,123,3.92,3.44,18.3,1,0,4,4',
 'Merc 280C,17.8,6,167.6,123,3.92,3.44,18.9,1,0,4,4',
 'Merc 450SE,16.4,8,275.8,180,3.07,4.07,17.4,0,0,3,3',
 'Merc 450SL,17.3,8,275.8,180,3.07,3.73,17.6,0,0,3,3',
 'Merc 450SLC,15.2,8,275.8,180,3.07,3.78,18,0,0,3,3',
 'Cadillac Fleetwood,10.4,8,472,205,2.93,5.25,17.98,0,0,3,4',
 'Lincoln Continental,10.4,8,460,215,3,5.424,17.82,0,0,3,4',
 'Chrysler Imperial,14.7,8,440,230,3.23,5.345,17.42,0,0,3,4',
 'F

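Each element is still a raw line of text (the header included), so it needs further parsing before it is usable. A more convenient option is to let Spark parse the CSV into a DataFrame: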

In [13]:
mtcars=sqlContext.read.format('csv').options(header='true', inferSchema='true').load('./mtcars.csv')

In [14]:
mtcars.collect()

[Row(model='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
 Row(model='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4),
 Row(model='Datsun 710', mpg=22.8, cyl=4, disp=108.0, hp=93, drat=3.85, wt=2.32, qsec=18.61, vs=1, am=1, gear=4, carb=1),
 Row(model='Hornet 4 Drive', mpg=21.4, cyl=6, disp=258.0, hp=110, drat=3.08, wt=3.215, qsec=19.44, vs=1, am=0, gear=3, carb=1),
 Row(model='Hornet Sportabout', mpg=18.7, cyl=8, disp=360.0, hp=175, drat=3.15, wt=3.44, qsec=17.02, vs=0, am=0, gear=3, carb=2),
 Row(model='Valiant', mpg=18.1, cyl=6, disp=225.0, hp=105, drat=2.76, wt=3.46, qsec=20.22, vs=1, am=0, gear=3, carb=1),
 Row(model='Duster 360', mpg=14.3, cyl=8, disp=360.0, hp=245, drat=3.21, wt=3.57, qsec=15.84, vs=0, am=0, gear=3, carb=4),
 Row(model='Merc 240D', mpg=24.4, cyl=4, disp=146.7, hp=62, drat=3.69, wt=3.19, qsec=20.0, vs=1, am=0, gear=4, carb=2),
 Row(model

In [15]:
mtcars.show(4)

+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|     Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
| Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 4 rows
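
For comparison, here is a rough plain-Python sketch of the parsing that `header='true', inferSchema='true'` saves us from writing by hand: use the first line as column names, split each remaining line on commas, and pick one type per column (int if every value parses as an int, else float, else string). The helper names `column_type` and `parse_csv_lines` are hypothetical, and Spark's real inference is more elaborate. Types are inferred only from the lines given, so `disp` comes out as an integer here, whereas Spark, seeing values like 146.7 in the full file, made it a double.

```python
import csv

# A few raw lines as returned by data_from_file.collect() above
raw_lines = [
    "model,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb",
    "Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4",
    "Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1",
]

def column_type(values):
    """Return int if every value parses as int, else float if all parse as float, else str."""
    for cast in (int, float):
        try:
            for v in values:
                cast(v)
            return cast
        except ValueError:
            continue
    return str

def parse_csv_lines(lines):
    """Turn CSV text lines into a list of dicts, one typed dict per data row."""
    reader = csv.reader(lines)
    header = next(reader)            # first line holds the column names
    records = list(reader)
    columns = list(zip(*records))    # transpose rows into columns
    casts = [column_type(col) for col in columns]
    return [
        {name: cast(value) for name, cast, value in zip(header, casts, record)}
        for record in records
    ]

rows = parse_csv_lines(raw_lines)
print(rows[0])
```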



#### From an existing RDD

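Transformations take an input RDD and produce a new one. Common transformations include `filter`, `map`, `flatMap`, `sample`, and `distinct` (`count`, by contrast, is an action: it returns a number, not a new RDD). Reminder: RDDs are immutable, so the input RDD does not change; a new RDD is created instead. The cell below applies the same idea to the `mtcars` DataFrame: `select` returns a new DataFrame and leaves `mtcars` untouched.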
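
To make "transformations build a new dataset lazily, actions run the work" concrete, here is a toy single-machine sketch. `ToyRDD` is a hypothetical class invented for illustration, not part of PySpark: each transformation returns a new object that only remembers how to compute its elements, and nothing executes until an action such as `collect` or `count` is called. The input object is never modified.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD (not PySpark's API)."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument function returning a fresh iterator

    @classmethod
    def from_list(cls, items):
        data = list(items)       # private copy, so later edits to `items` don't leak in
        return cls(lambda: iter(data))

    # Transformations: return a new ToyRDD and leave self untouched
    def map(self, f):
        return ToyRDD(lambda: (f(x) for x in self._compute()))

    def filter(self, pred):
        return ToyRDD(lambda: (x for x in self._compute() if pred(x)))

    def flatMap(self, f):
        return ToyRDD(lambda: (y for x in self._compute() for y in f(x)))

    def distinct(self):
        def compute():
            seen = set()
            for x in self._compute():
                if x not in seen:
                    seen.add(x)
                    yield x
        return ToyRDD(compute)

    # Actions: actually run the pipeline
    def collect(self):
        return list(self._compute())

    def count(self):
        return sum(1 for _ in self._compute())


calls = []

def shout(word):
    calls.append(word)           # record when the mapped function really runs
    return word.upper()

words = ToyRDD.from_list(["spark", "rdd", "spark"])
upper = words.map(shout)         # transformation: nothing has run yet
print(calls)                     # []
print(upper.distinct().collect())  # ['SPARK', 'RDD']
print(calls)                     # ['spark', 'rdd', 'spark']
print(words.collect())           # ['spark', 'rdd', 'spark'] (input unchanged)
```

Real Spark additionally splits the data into partitions and records lineage so lost partitions can be recomputed; this sketch only models laziness and immutability.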

In [16]:
mtcars_sub1=mtcars.select(mtcars['model'], mtcars['mpg'], mtcars['hp'])

In [17]:
mtcars_sub1.show()

+-------------------+----+---+
|              model| mpg| hp|
+-------------------+----+---+
|          Mazda RX4|21.0|110|
|      Mazda RX4 Wag|21.0|110|
|         Datsun 710|22.8| 93|
|     Hornet 4 Drive|21.4|110|
|  Hornet Sportabout|18.7|175|
|            Valiant|18.1|105|
|         Duster 360|14.3|245|
|          Merc 240D|24.4| 62|
|           Merc 230|22.8| 95|
|           Merc 280|19.2|123|
|          Merc 280C|17.8|123|
|         Merc 450SE|16.4|180|
|         Merc 450SL|17.3|180|
|        Merc 450SLC|15.2|180|
| Cadillac Fleetwood|10.4|205|
|Lincoln Continental|10.4|215|
|  Chrysler Imperial|14.7|230|
|           Fiat 128|32.4| 66|
|        Honda Civic|30.4| 52|
|     Toyota Corolla|33.9| 65|
+-------------------+----+---+
only showing top 20 rows



### Examples of Transformations

In [18]:
x = sc.parallelize(range(0,20), 2)

In [19]:
x.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
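
The second argument to `sc.parallelize` is the number of partitions. The plain-Python sketch below mirrors the even, contiguous slicing rule used for local collections (partition `i` gets elements `i*n//k` up to `(i+1)*n//k`), so sizes differ by at most one. Treat the exact boundaries as an implementation detail; in a live session, `x.getNumPartitions()` and `x.glom().collect()` show the real layout. `slice_into_partitions` is a hypothetical helper.

```python
def slice_into_partitions(items, num_slices):
    """Split items into num_slices contiguous chunks of (nearly) equal size."""
    data = list(items)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

print(slice_into_partitions(range(0, 20), 2))
print([len(p) for p in slice_into_partitions(range(0, 20), 3)])  # [6, 7, 7]
```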

In [20]:
y = x.filter(lambda x: x % 2 == 0)

In [21]:
y.collect()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

In [22]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]
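
Note that `sc.union` (like `rdd1.union(rdd2)`) simply concatenates its inputs; it does not remove duplicates, so a separate `.distinct()` would be needed for set semantics. A plain-Python analogue, using a hypothetical `union` helper:

```python
from itertools import chain

def union(*collections):
    """Concatenate collections in order, keeping duplicates (like RDD union)."""
    return list(chain.from_iterable(collections))

a = [1, 2, 3]
b = [3, 4]
print(union(a, b))               # [1, 2, 3, 3, 4]
print(sorted(set(union(a, b))))  # [1, 2, 3, 4]: deduplication is an explicit extra step
```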

### Examples of Actions

We already saw the `collect` action above. Let's look at a few more:

In [23]:
y.count()

10

In [24]:
y.take(5)

[0, 2, 4, 6, 8]

In [25]:
y.first()

0
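
Unlike `collect`, which brings every element back to the driver, `take(n)` and `first()` can stop early: Spark scans a first partition and only reads more partitions if it still needs elements. The sketch below models `y` as two plain-Python partitions (the evens from each half of `x`) and reads them lazily in order. The helpers `take` and `first` are hypothetical; the `ValueError("RDD is empty")` mimics what PySpark's `first()` raises on an empty RDD.

```python
from itertools import chain, islice

def take(partitions, n):
    """Return the first n elements, reading partitions in order and stopping early."""
    return list(islice(chain.from_iterable(partitions), n))

def first(partitions):
    """Like take(partitions, 1)[0], but fail clearly on empty input."""
    head = take(partitions, 1)
    if not head:
        raise ValueError("RDD is empty")
    return head[0]

# Even numbers 0..18 split into two partitions, mirroring y above
y_parts = [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18]]
print(take(y_parts, 5))                # [0, 2, 4, 6, 8], answered by the first partition alone
print(first(y_parts))                  # 0
print(sum(len(p) for p in y_parts))    # 10, like y.count()
```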

In [26]:
z=x.reduce(lambda accum, n: accum + n)

In [27]:
z

190

In [28]:
x1 = sc.parallelize(range(1,5), 2)

In [29]:
w=x1.reduce(lambda accum, n: accum * n)

In [30]:
w

24
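
`reduce` runs in two stages: each partition is reduced locally, then the per-partition results are combined on the driver. That is why PySpark's documentation asks for a commutative and associative operator. The plain-Python sketch below reproduces both results above using the two-partition layouts of `x` and `x1`, then shows that a non-associative function (subtraction) gives an answer that depends on how the data was partitioned. `partitioned_reduce` is a hypothetical helper, and it combines partial results in partition order, which Spark does not promise.

```python
from functools import reduce

def partitioned_reduce(partitions, f):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

x_parts = [list(range(0, 10)), list(range(10, 20))]  # like sc.parallelize(range(0,20), 2)
x1_parts = [[1, 2], [3, 4]]                          # like sc.parallelize(range(1,5), 2)

print(partitioned_reduce(x_parts, lambda a, n: a + n))   # 190 (45 + 145)
print(partitioned_reduce(x1_parts, lambda a, n: a * n))  # 24  (2 * 12)

# Subtraction is not associative, so partitioning changes the answer
sub = lambda a, n: a - n
print(reduce(sub, [1, 2, 3, 4]))         # -8, i.e. ((1-2)-3)-4
print(partitioned_reduce(x1_parts, sub)) # 0,  i.e. (1-2) - (3-4)
```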

## Full Examples

### Calculate the value of PI (from Spark examples)

In [31]:
from random import random
from operator import add

partitions = 4
n = 100000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

Pi is roughly 3.134480
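
Why this works: a point drawn uniformly from the square [-1, 1] × [-1, 1] (area 4) lands inside the unit circle (area π) with probability π/4, so `4.0 * count / n` estimates π. The Spark job splits the samples across partitions, counts hits per element with `map(f)`, and adds them with `reduce(add)`. The plain-Python sketch below follows the same map-then-add structure with one random generator per "partition"; the fixed seeds are an assumption added only to make runs reproducible, and the helper names are hypothetical.

```python
import math
import random
from functools import reduce
from operator import add

def count_hits(num_samples, seed):
    """Count random points in [-1, 1] x [-1, 1] that fall inside the unit circle."""
    rng = random.Random(seed)   # one generator per partition (assumption: fixed seeds)
    hits = 0
    for _ in range(num_samples):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x * x + y * y <= 1:
            hits += 1
    return hits

def estimate_pi(partitions=4, samples_per_partition=100000):
    # "map": each partition counts its own hits; "reduce": add the counts together
    counts = [count_hits(samples_per_partition, seed) for seed in range(partitions)]
    total = reduce(add, counts)
    return 4.0 * total / (partitions * samples_per_partition)

estimate = estimate_pi()
print("Pi is roughly %f (error %.4f)" % (estimate, abs(estimate - math.pi)))
```

With 400,000 samples the standard error is roughly 0.003, so estimates like the 3.134480 above are typical; more samples shrink the error only as 1/√n.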


### Housing data example 

#### Data: each house's number of bedrooms, neighborhood, and price

Find the total dollar amount of properties for sale in each neighborhood.

In [32]:
text_RDD = sc.parallelize([
                            "3 Downtown 400000",
                            "2 Downtown 240000",
                            "3 Hilltop 650000"
                          ], 2)

#### Mapper: parse each line into a (key, value) pair

In [33]:
text_RDD.collect()

['3 Downtown 400000', '2 Downtown 240000', '3 Hilltop 650000']

In [34]:
def mapper_parse_lines(line):
    """Parse line into (neighborhood, price) pair"""
    words = line.split()
    return (words[1], float(words[2]))

In [35]:
mapper_parse_lines("1 Downtown 120000")

('Downtown', 120000.0)

In [36]:
house_prices_RDD = text_RDD.map(mapper_parse_lines)

In [37]:
house_prices_RDD.collect()

[('Downtown', 400000.0), ('Downtown', 240000.0), ('Hilltop', 650000.0)]

In [38]:
type(house_prices_RDD)

pyspark.rdd.PipelinedRDD

#### Reducer: sum values across all pairs with the same key

In [39]:
def reducer_sum(a,b):
    return a+b

In [40]:
house_prices_RDD.reduceByKey(reducer_sum)

PythonRDD[41] at RDD at PythonRDD.scala:48

In [41]:
house_prices_RDD.reduceByKey(reducer_sum).collect()

[('Downtown', 640000.0), ('Hilltop', 650000.0)]
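
What `reduceByKey` does can be sketched in plain Python. Spark first merges values for each key inside every partition (a map-side combine, much like a MapReduce combiner), then shuffles the partial results so all values for a key meet, and reduces them again. The partitions below assume `text_RDD`'s three lines were split 1 + 2 across its two partitions; `reduce_by_key` is a hypothetical helper, and unlike this sketch, Spark does not guarantee the order of the output pairs.

```python
def reduce_by_key(partitions, f):
    """Fold values per key inside each partition, then merge the partial results."""
    # 1) map-side combine: one dict of partial results per partition
    local = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = f(acc[key], value) if key in acc else value
        local.append(acc)
    # 2) "shuffle" + final reduce: merge the per-partition dicts key by key
    merged = {}
    for acc in local:
        for key, value in acc.items():
            merged[key] = f(merged[key], value) if key in merged else value
    return list(merged.items())

price_parts = [
    [('Downtown', 400000.0)],
    [('Downtown', 240000.0), ('Hilltop', 650000.0)],
]
print(reduce_by_key(price_parts, lambda a, b: a + b))
# [('Downtown', 640000.0), ('Hilltop', 650000.0)]
```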

### Exercise
By copying, modifying, and rerunning the code above, count the **total** number of bedrooms available in each neighborhood.