## Resilient Distributed Dataset - Transformations

In [88]:
import numpy as np
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc =SparkContext()

random_numbers = sc.parallelize([np.random.rand() for _ in range(1000)], 4)
random_numbers

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

In [89]:
display(random_numbers)

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

In [90]:
random_numbers.take(3)

[0.3908827457638425, 0.9706352129593969, 0.28054105344126723]

In [91]:
rdd_random_tuples = sc.parallelize([("Mark", 43, "6'1\""), ("Stella", 23, "5'6\""), ("Skye", 6, "3'11\""), ("Albert", 1, "2'7\"")])

In [92]:
rdd_random_tuples.take(3)

[('Mark', 43, '6\'1"'), ('Stella', 23, '5\'6"'), ('Skye', 6, '3\'11"')]

In [93]:
rdd_from_file = sc.textFile('sample_data.csv', 4)
rdd_from_file.take(3)

['OrderDate,Region,Rep,Item,Units,Unit Cost,Total',
 '1-6-16,East,Jones,Pencil,95, 1.99 , 189.05 ',
 '1-23-16,Central,Kivell,Binder,50, 19.99 , 999.50 ']

In [94]:
rdd_from_file.take(5)

['OrderDate,Region,Rep,Item,Units,Unit Cost,Total',
 '1-6-16,East,Jones,Pencil,95, 1.99 , 189.05 ',
 '1-23-16,Central,Kivell,Binder,50, 19.99 , 999.50 ',
 '2-9-16,Central,Jardine,Pencil,36, 4.99 , 179.64 ',
 '2-26-16,Central,Gill,Pen,27, 19.99 , 539.73 ']

### Structured

In [95]:
rdd_valid_structured = sc.parallelize([('Tom', 0), ('Spark', 1)])
rdd_valid_structured.take(2)

[('Tom', 0), ('Spark', 1)]

In [96]:
display(rdd_valid_structured)

ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:175

### Unstructured

In [97]:
rdd_valid_unstructured = sc.parallelize([('Jul 17 22:04:25 router dnsprobe[276]: dnsquery failed'), 
                                         ('Jul 17 22:04:29 router last message repeated two times'), 
                                        ('Jul 17 22:04:29 router dnsprob[276]: Primary DNS server is Down... Switching to'), 
                                        ('Jul 17 22:05:08 router dnsprob[276]: Switching back to primary DNS server'), 
                                        ('Jul 17 22:26:11 debian -- MARK --'), 
                                        ('Jul 17 22:46:11 debian -- Mark --'), 
                                        ('Jul 17 22:47:36 router -- MARK --'), 
                                        ('Jul 17 22:47:36 router dnsprob[276]: dns query failed')])
rdd_valid_unstructured.take(3)

['Jul 17 22:04:25 router dnsprobe[276]: dnsquery failed',
 'Jul 17 22:04:29 router last message repeated two times',
 'Jul 17 22:04:29 router dnsprob[276]: Primary DNS server is Down... Switching to']

### Heterogeneous Data

In [98]:
rdd_valid_heterogeneous = sc.parallelize([('Toll invoice number', 'TRE2321'), 
                                         {'Last bill balance': 0.00, 'New bill balance:': 23.92}, 
                                         ['2017-11-12 12:32:34PM', '2017-11-14 1:32:56PM']])
rdd_valid_heterogeneous.take(3)

[('Toll invoice number', 'TRE2321'),
 {'Last bill balance': 0.0, 'New bill balance:': 23.92},
 ['2017-11-12 12:32:34PM', '2017-11-14 1:32:56PM']]

### Understanding lazy execution

In [99]:
names_only = rdd_valid_structured.map(lambda element: element[0])
names_only.collect()

['Tom', 'Spark']

In [100]:
names_only

PythonRDD[18] at collect at <ipython-input-99-9e08de0ad59a>:2

### .map(...) transformation

### Using lambdas

In [101]:
def splitOnComma(inputString):
    return inputString.split(',')
for el in rdd_from_file.map(splitOnComma).take(2):
    print(el)

['OrderDate', 'Region', 'Rep', 'Item', 'Units', 'Unit Cost', 'Total']
['1-6-16', 'East', 'Jones', 'Pencil', '95', ' 1.99 ', ' 189.05 ']


In [102]:
for el in rdd_from_file.map(lambda element: element.split(',')).take(2):
    print(el)

['OrderDate', 'Region', 'Rep', 'Item', 'Units', 'Unit Cost', 'Total']
['1-6-16', 'East', 'Jones', 'Pencil', '95', ' 1.99 ', ' 189.05 ']


### Transforming Data

In [103]:
def converToFloat(inputString):
    try:
        return float(inputString)
    except: 
        return -666

to_filter = rdd_from_file.map(lambda element: element.split(',')).map(lambda element:
                                                                     [e for e in element[:4]] + 
                                                                     [converToFloat(e) for e in element[4:]])
to_filter.take(5)

[['OrderDate', 'Region', 'Rep', 'Item', -666, -666, -666],
 ['1-6-16', 'East', 'Jones', 'Pencil', 95.0, 1.99, 189.05],
 ['1-23-16', 'Central', 'Kivell', 'Binder', 50.0, 19.99, 999.5],
 ['2-9-16', 'Central', 'Jardine', 'Pencil', 36.0, 4.99, 179.64],
 ['2-26-16', 'Central', 'Gill', 'Pen', 27.0, 19.99, 539.73]]

### .filter(...) transformation

In [104]:
filtered = to_filter.filter(lambda element: element[4] != -666)
filtered.take(4)

[['1-6-16', 'East', 'Jones', 'Pencil', 95.0, 1.99, 189.05],
 ['1-23-16', 'Central', 'Kivell', 'Binder', 50.0, 19.99, 999.5],
 ['2-9-16', 'Central', 'Jardine', 'Pencil', 36.0, 4.99, 179.64],
 ['2-26-16', 'Central', 'Gill', 'Pen', 27.0, 19.99, 539.73]]

### .flat(...) Map transformation

#### .map to flatMap comparison

In [105]:
def mapExample(inputList):
    outputList = []
    for item in inputList:
        temp = item.copy()
        temp[1] *= 10
        outputList.append(temp)
    return outputList

def flatMapExample(inputList):
    outputList = []
    for item in inputList:
        temp = item.copy()
        temp[1] *= 10
        outputList += temp
    return outputList

sampleList = [[1, 3], [2, 4], [3, 5], [4, 6]]
print(mapExample(sampleList))
print(flatMapExample(sampleList))

[[1, 30], [2, 40], [3, 50], [4, 60]]
[1, 30, 2, 40, 3, 50, 4, 60]


### Filtering malformed record

In [106]:
import datetime as dt

def parseCSVRow(inputRow):
    try:
        rowSplit = inputRow.split(',')
        rowSplit[0] = dt.datetime.strptime(rowSplit[0], '%m-%d-%y')
        rowSplit[4] = int(rowSplit[4])
        
        for i in [5,6]:
            rowSplit[i] = float(rowSplit[i])
            
        return [rowSplit]
    except:
        return []
rdd_from_file.map(parseCSVRow).take(3)

[[],
 [[datetime.datetime(2016, 1, 6, 0, 0),
   'East',
   'Jones',
   'Pencil',
   95,
   1.99,
   189.05]],
 [[datetime.datetime(2016, 1, 23, 0, 0),
   'Central',
   'Kivell',
   'Binder',
   50,
   19.99,
   999.5]]]

In [107]:
rdd_from_file.map(parseCSVRow).filter(lambda element: len(element)>0).take(3)

[[[datetime.datetime(2016, 1, 6, 0, 0),
   'East',
   'Jones',
   'Pencil',
   95,
   1.99,
   189.05]],
 [[datetime.datetime(2016, 1, 23, 0, 0),
   'Central',
   'Kivell',
   'Binder',
   50,
   19.99,
   999.5]],
 [[datetime.datetime(2016, 2, 9, 0, 0),
   'Central',
   'Jardine',
   'Pencil',
   36,
   4.99,
   179.64]]]

In [108]:
rdd_from_file_clean = rdd_from_file.flatMap(parseCSVRow)
rdd_from_file_clean.take(3)

[[datetime.datetime(2016, 1, 6, 0, 0),
  'East',
  'Jones',
  'Pencil',
  95,
  1.99,
  189.05],
 [datetime.datetime(2016, 1, 23, 0, 0),
  'Central',
  'Kivell',
  'Binder',
  50,
  19.99,
  999.5],
 [datetime.datetime(2016, 2, 9, 0, 0),
  'Central',
  'Jardine',
  'Pencil',
  36,
  4.99,
  179.64]]

### .distinct(...) transformation

#### Understanding the distinct method

In [109]:
sampleList = [1,1,1,4,3,4,6,4,8,6,9]
distinct = []
seen = {}
for elem in sampleList:
    if elem in seen:
        continue
    else:
        distinct.append(elem)
        seen[elem] = 1
print(distinct)        

[1, 4, 3, 6, 8, 9]


### Finding distinct in RDD

In [110]:
items = rdd_from_file_clean.map(lambda element: element[3]).distinct()
items.collect()

['Desk', 'Pencil', 'Pen', 'Binder', 'Pen Set']

In [111]:
rdd_from_file_clean.map(lambda element: element[2]).distinct().collect()

['Jardine',
 'Gill',
 'Thompson',
 'Howard',
 'Smith',
 'Jones',
 'Sorvino',
 'Andrews',
 'Morgan',
 'Parent',
 'Kivell']

In [112]:
rdd_from_file_clean.map(lambda element: element[2]).distinct().count()

11

### .sample(...) transformation

In [113]:
rdd_from_file_clean.count()

37

In [114]:
rdd_from_file_clean.sample(False, 0.2).count()

8

### .join(...) transformation

#### The mechanics of join

In [115]:
a = sc.parallelize([('a', 2), ('b', 3)])
b = sc.parallelize([('a', 4), ('b', 4), ('c', 4)])
a.join(b).collect()

[('a', (2, 4)), ('b', (3, 4))]

### Append location

In [116]:
cities = sc.parallelize([('East', 'Boston'), ('Central', 'Chicago'), ('West', 'Seattle')])

In [117]:
rdd_from_file_clean \
.map(lambda element: (element[1], element)) \
.join(cities)\
.map(lambda element: element[1][0] + [element[1][1]]) \
.take(3)

[[datetime.datetime(2016, 1, 6, 0, 0),
  'East',
  'Jones',
  'Pencil',
  95,
  1.99,
  189.05,
  'Boston'],
 [datetime.datetime(2016, 4, 1, 0, 0),
  'East',
  'Jones',
  'Binder',
  60,
  4.99,
  299.4,
  'Boston'],
 [datetime.datetime(2016, 6, 8, 0, 0),
  'East',
  'Jones',
  'Binder',
  60,
  8.99,
  539.4,
  'Boston']]

### .repartition(...) transformation

In [118]:
rdd_from_file_clean.getNumPartitions()

4

In [119]:
rdd_from_file_repartitioned = rdd_from_file.repartition(2)
rdd_from_file_repartitioned.getNumPartitions()

2

In [120]:
rdd_from_file_repartitioned_sorted = rdd_from_file_clean.map(lambda element: (int(element[6]), element)).repartitionAndSortWithinPartitions(2, lambda x: x).map(lambda element: tuple(element[1]))

rdd_from_file_repartitioned_sorted.glom() \
.collect()

[[(datetime.datetime(2017, 10, 31, 0, 0),
   'Central',
   'Andrews',
   'Pencil',
   14,
   1.29,
   18.06),
  (datetime.datetime(2017, 11, 17, 0, 0),
   'Central',
   'Jardine',
   'Binder',
   11,
   4.99,
   54.89),
  (datetime.datetime(2017, 5, 14, 0, 0),
   'Central',
   'Gill',
   'Pencil',
   53,
   1.29,
   68.37),
  (datetime.datetime(2016, 12, 12, 0, 0),
   'Central',
   'Smith',
   'Pencil',
   67,
   1.29,
   86.43),
  (datetime.datetime(2016, 8, 15, 0, 0),
   'East',
   'Jones',
   'Pencil',
   35,
   4.99,
   174.65),
  (datetime.datetime(2016, 9, 1, 0, 0),
   'Central',
   'Smith',
   'Desk',
   2,
   125.0,
   250.0),
  (datetime.datetime(2017, 7, 21, 0, 0),
   'Central',
   'Morgan',
   'Pen Set',
   55,
   12.49,
   686.95)],
 [(datetime.datetime(2017, 9, 10, 0, 0),
   'Central',
   'Gill',
   'Pencil',
   7,
   1.29,
   9.03),
  (datetime.datetime(2017, 2, 18, 0, 0),
   'East',
   'Jones',
   'Binder',
   4,
   4.99,
   19.96),
  (datetime.datetime(2016, 7, 12, 0, 0

In [121]:
rdd_from_file_repartitioned_sorted.getNumPartitions()

2

### Read in the data

In [122]:
%%sh
head -n 2 sample_data.csv

OrderDate,Region,Rep,Item,Units,Unit Cost,Total
1-6-16,East,Jones,Pencil,95, 1.99 , 189.05 


In [123]:
def parseCSVRow(inputRow):
    try:
        rowSplit = inputRow.split(',')
        rowSplit[0] = dt.datetime.strptime(rowSplit[0], '%m-%d-%y')
        rowSplit[4] = int(rowSplit[4])
        
        for i in [5,6]:
            rowSplit[i] = float(rowSplit[i])
            
        return [rowSplit]
    except:
        return []
    
rdd_clean = sc.textFile('sample_data.csv', 4).flatMap(parseCSVRow)

### .take(...) action

In [124]:
rdd_clean.take(2)

[[datetime.datetime(2016, 1, 6, 0, 0),
  'East',
  'Jones',
  'Pencil',
  95,
  1.99,
  189.05],
 [datetime.datetime(2016, 1, 23, 0, 0),
  'Central',
  'Kivell',
  'Binder',
  50,
  19.99,
  999.5]]

In [125]:
for element in rdd_clean.takeOrdered(5, key = lambda el: el[0]):
    print(element)

[datetime.datetime(2016, 1, 6, 0, 0), 'East', 'Jones', 'Pencil', 95, 1.99, 189.05]
[datetime.datetime(2016, 1, 23, 0, 0), 'Central', 'Kivell', 'Binder', 50, 19.99, 999.5]
[datetime.datetime(2016, 2, 9, 0, 0), 'Central', 'Jardine', 'Pencil', 36, 4.99, 179.64]
[datetime.datetime(2016, 2, 26, 0, 0), 'Central', 'Gill', 'Pen', 27, 19.99, 539.73]
[datetime.datetime(2016, 3, 15, 0, 0), 'West', 'Sorvino', 'Pencil', 56, 2.99, 167.44]


In [126]:
for element in rdd_clean.takeSample(False, 5, seed = 666):
    print(element)

[datetime.datetime(2017, 6, 17, 0, 0), 'Central', 'Kivell', 'Desk', 5, 125.0, 625.0]
[datetime.datetime(2016, 6, 8, 0, 0), 'East', 'Jones', 'Binder', 60, 8.99, 539.4]
[datetime.datetime(2016, 8, 15, 0, 0), 'East', 'Jones', 'Pencil', 35, 4.99, 174.65]
[datetime.datetime(2017, 3, 7, 0, 0), 'West', 'Sorvino', 'Binder', 7, 19.99, 139.93]
[datetime.datetime(2016, 5, 5, 0, 0), 'Central', 'Jardine', 'Pencil', 90, 4.99, 449.1]


### .collect(...) action

In [127]:
len(rdd_clean.collect()) == rdd_clean.count()

True

In [131]:
for element in rdd_clean.filter(lambda el: el[-1] > 500).collect():
    print(element)

[datetime.datetime(2016, 1, 23, 0, 0), 'Central', 'Kivell', 'Binder', 50, 19.99, 999.5]
[datetime.datetime(2016, 2, 26, 0, 0), 'Central', 'Gill', 'Pen', 27, 19.99, 539.73]
[datetime.datetime(2016, 6, 8, 0, 0), 'East', 'Jones', 'Binder', 60, 8.99, 539.4]
[datetime.datetime(2016, 10, 22, 0, 0), 'East', 'Jones', 'Pen', 64, 8.99, 575.36]
[datetime.datetime(2017, 5, 31, 0, 0), 'Central', 'Gill', 'Binder', 80, 8.99, 719.2]
[datetime.datetime(2017, 6, 17, 0, 0), 'Central', 'Kivell', 'Desk', 5, 125.0, 625.0]
[datetime.datetime(2017, 7, 21, 0, 0), 'Central', 'Morgan', 'Pen Set', 55, 12.49, 686.95]
[datetime.datetime(2017, 8, 24, 0, 0), 'West', 'Sorvino', 'Desk', 3, 275.0, 825.0]


### .reduce(...) action

In [133]:
from operator import add
total_value = rdd_clean.map(lambda el: el[-1]).reduce(add)
total_value

11496.039999999999

In [134]:
total_value = rdd_clean.map(lambda el: el[-1]).reduce(lambda x, y: x+y)
total_value

11496.039999999999

### .reduceByKey(...) action

In [135]:
rdd_clean.take(2)

[[datetime.datetime(2016, 1, 6, 0, 0),
  'East',
  'Jones',
  'Pencil',
  95,
  1.99,
  189.05],
 [datetime.datetime(2016, 1, 23, 0, 0),
  'Central',
  'Kivell',
  'Binder',
  50,
  19.99,
  999.5]]

In [137]:
sales_by_region = rdd_clean.map(lambda el: (el[1], el[-1])).reduceByKey(lambda x, y: x+y)
sales_by_region.collect()

[('East', 3199.64), ('Central', 6949.109999999999), ('West', 1347.29)]

In [138]:
for element in sales_by_region.collect():
    print(element)

('East', 3199.64)
('Central', 6949.109999999999)
('West', 1347.29)


### .count() action

In [139]:
rdd_clean.count()

37

In [140]:
rdd_clean.countApprox(1, confidence=0.7)

37

In [146]:
sales = rdd_clean.map(lambda el: el[2])
sales.countApproxDistinct()

11

In [148]:
sales.distinct().count()

11

### .foreach(...) action

In [151]:
distinct_sales = sales.distinct()
distinct_sales.foreach(print)

### .aggregate(...) action

In [157]:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

rdd_clean.map(lambda el: el[-1]).aggregate((0.0, 0), seqOp, combOp)

(11496.039999999999, 37)

In [158]:
rdd_clean.take(3)

[[datetime.datetime(2016, 1, 6, 0, 0),
  'East',
  'Jones',
  'Pencil',
  95,
  1.99,
  189.05],
 [datetime.datetime(2016, 1, 23, 0, 0),
  'Central',
  'Kivell',
  'Binder',
  50,
  19.99,
  999.5],
 [datetime.datetime(2016, 2, 9, 0, 0),
  'Central',
  'Jardine',
  'Pencil',
  36,
  4.99,
  179.64]]

### .aggregateByKey(...) action

In [159]:
seqOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

for element in rdd_clean.map(lambda el: (el[2], (el[-1], 1))).aggregateByKey((0.0, 0), seqOp, combOp).map(lambda el: (el[0], el[1][0], el[1][1], el[1][0]/el[1][1])).collect():
    print(element)

('Jardine', 933.13, 4, 233.2825)
('Gill', 1749.8700000000001, 5, 349.97400000000005)
('Thompson', 63.68, 1, 63.68)
('Howard', 536.75, 2, 268.375)
('Smith', 336.43, 2, 168.215)
('Jones', 2363.04, 8, 295.38)
('Sorvino', 1283.6100000000001, 4, 320.90250000000003)
('Andrews', 438.37, 4, 109.5925)
('Morgan', 1387.77, 3, 462.59)
('Parent', 299.85, 1, 299.85)
('Kivell', 2103.54, 3, 701.18)


### .coalesce(...) action

In [160]:
rdd_clean.getNumPartitions()

4

In [161]:
rdd_single = rdd_clean.coalesce(1)
rdd_single.getNumPartitions()

1

### .combineByKey(...) action

In [162]:
def combiner(element):
    return [element]

def valueMerger(element1, element2):
    element1.append(element2)
    return element1

def combinerMerger(element1, element2):
    el1 = dict(element1)
    for e in element2:
        if e[0] not in el1:
            el1[e[0]] = 0
        el1[e[0]] += e[1]
        
    return list(el1.items())

for element in rdd_clean.map(lambda el: (el[2], (el[3], el[-1]))).combineByKey(combiner, valueMerger, combinerMerger).collect():
    print(element)

('Jardine', [('Pencil', 449.1), ('Pen Set', 249.5), ('Binder', 54.89)])
('Gill', [('Pen', 539.73), ('Binder', 1132.74), ('Pencil', 77.4)])
('Thompson', [('Pencil', 63.68)])
('Howard', [('Binder', 57.71), ('Pen', 479.04)])
('Smith', [('Desk', 250.0), ('Pencil', 86.43)])
('Jones', [('Pencil', 363.70000000000005), ('Binder', 559.36), ('Pen Set', 565.22), ('Pen', 575.36)])
('Sorvino', [('Pencil', 167.44), ('Binder', 139.93), ('Desk', 825.0), ('Pen', 151.24)])
('Andrews', [('Pencil', 298.65000000000003), ('Binder', 139.72)])
('Morgan', [('Pencil', 449.1), ('Binder', 251.72), ('Pen Set', 686.95)])
('Parent', [('Pen', 299.85)])
('Kivell', [('Binder', 999.5), ('Pen Set', 479.04), ('Desk', 625.0)])


In [165]:
hist = rdd_clean.map(lambda el: el[-1]).histogram(10)

In [169]:
for bucket in [(round(b, 0), v) for b, v in zip(hist[0], hist[1])]:
    print(bucket)

(9.0, 8)
(108.0, 9)
(207.0, 6)
(306.0, 1)
(405.0, 5)
(504.0, 3)
(603.0, 2)
(702.0, 1)
(801.0, 1)
(900.0, 1)
