### Packages

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

### SparkContext

In [4]:
# Create a SparkContext that runs Spark locally with 4 worker threads ("local[4]"). It is the driver's connection to the executors
sc = SparkContext(master="local[4]")

In [5]:
print(sc)
#sc.stop() ; # to stop context. Do this before starting new one. Only 1 at a time

<SparkContext master=local[4] appName=pyspark-shell>


In [6]:
sc.version

'2.2.1'

### Basics

#### RDDs (Resilient distributed datasets)

In [7]:
# Main data structure in Spark.
# Driver node runs this program. 
#    It has pointer to object that has locations of RDD elements (partitions)
# The elements (partitions) of RDD are distributed on worker nodes.  
#    Executors on worker nodes work on them

In [8]:
# Create RDD
x = range(3)
rdd = sc.parallelize(x)
print(rdd)

# Collect RDD on head node
y = rdd.collect()
print(y, type(y))

PythonRDD[1] at RDD at PythonRDD.scala:48
[0, 1, 2] <class 'list'>


#### Map Reduce

In [9]:
## Map 
# 1:1. 
# Operation function is parameter. 
# All operations in parallel on data local to it
out = rdd.map(lambda x: x*x)
out.collect()

[0, 1, 4]

In [10]:
## Reduce 
# many:1. 
# Operation function is input
# Repeatedly does reduce on data local to it.
# Operation function should be commutative and associative (independent of order and grouping),
#   because reduce combines partial results like a tree with the final result at the root.
out = rdd.reduce(lambda x,y: x+y)
print(out)

3


In [11]:
## Reduce - more complex:

words = ["this", "is", "a", "test"]

# 1: find min length word
rdd2= sc.parallelize(words)
out = rdd2.reduce(lambda x,y: x if len(x)<len(y) else y)
print(out)

# 2: find the lexicographically last word among the longest words
rdd3 = sc.parallelize(words)
def last_lex_word(x,y):
    """Reducer that keeps the longer of two words, breaking length ties lexicographically.

    Returns whichever of ``x`` and ``y`` is longer. When both have the same
    length, returns ``x`` only if it compares strictly greater than ``y``;
    otherwise returns ``y``.
    """
    # Different lengths: the longer word wins outright.
    if len(x) != len(y):
        return x if len(x) > len(y) else y
    # Same length: the lexicographically later word wins (y on a full tie).
    return x if x > y else y
out = rdd3.reduce(last_lex_word)
print(out)    

a
this


#### Number of workers

In [12]:
print(sc)
sc.stop()

<SparkContext master=local[4] appName=pyspark-shell>


In [13]:
data = [10, 11] * 100000
len(data)

200000

In [14]:
# Recommendation is 1 per core . 
# It improves till 8 here and gets worse after that (hyperthreading with 4 cores => 8)

from time import time

# Time the same dummy job with 1..15 local worker threads, one fresh context per run.
for num_workers in range(1,16):
    sc = SparkContext(master = "local[%d]" % num_workers)
    try:
        tic = time()
        # some dummy operation
        for dummy_itn in range(10):
            sc.parallelize(data).reduce(lambda x,y: x*y)
        exec_time = time() - tic
        print("num of workers: %2d, time taken: %3.2f" %(num_workers,exec_time))
    finally:
        # Only one SparkContext can be active at a time, so always stop this one
        # (even if the job above fails) before the next iteration creates a new one.
        sc.stop()

num of workers:  1, time taken: 18.14
num of workers:  2, time taken: 5.56
num of workers:  3, time taken: 3.16
num of workers:  4, time taken: 2.27
num of workers:  5, time taken: 2.33
num of workers:  6, time taken: 2.10
num of workers:  7, time taken: 1.97
num of workers:  8, time taken: 1.83
num of workers:  9, time taken: 1.95
num of workers: 10, time taken: 1.98
num of workers: 11, time taken: 2.15
num of workers: 12, time taken: 2.19
num of workers: 13, time taken: 2.14
num of workers: 14, time taken: 2.20
num of workers: 15, time taken: 2.06


#### Lazy evaluation (pipelined execution), Execution plan (dependence graph)

In [15]:
# Computation starts only when result is needed. (not when command is executed).
# Map/reduce only constructs the execution plan
# Minimizes number of memory accesses (and cache misses) and single pass through data. 
# Also, smaller memory footprint since no intermediate results are stored

In [16]:
from math import sin
def consume_time(i):
    """Burn a little CPU time, then return ``sin(i)``.

    The 50 throwaway ``sin`` evaluations exist only to make each call
    measurably expensive; their results are discarded.
    """
    # Use a loop variable distinct from the parameter so the returned value
    # can never depend on comprehension/loop scoping rules.
    for k in range(50):
        sin(k)
    return sin(i)

In [17]:
%%time
consume_time(1)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 23.6 µs


0.8414709848078965

In [18]:
# create spark context
sc = SparkContext(master = "local[4]")

In [19]:
%%time
# create rdd
data = range(100000)
rdd = sc.parallelize(data)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 3.22 ms


In [20]:
# View execution plan, as dependence graph
# Top-most line is output. Bottom line is input rdd 
print(rdd.toDebugString().decode())

(4) PythonRDD[1] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 []


In [21]:
%%time

# Transformation only: map is lazy, so nothing is executed here (it just extends the execution plan)
temp_rdd = rdd.map(lambda x: consume_time(x))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 21.7 µs


In [22]:
# View execution plan, as dependence graph
# Top-most line is output. Bottom line is input rdd
print(temp_rdd.toDebugString().decode())

(4) PythonRDD[2] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 []


In [23]:
%%time

# reduce is an action: it has to return a result to the driver.
# So the temp_rdd map is evaluated now
out = temp_rdd.reduce(lambda x,y: x+y)

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 464 ms


In [24]:
%%time 

# Another operation with same temp_rdd
# temp_rdd map is evaluated again instead of re-using it. 
# So, runtime doesnt reduce.
out = temp_rdd.filter(lambda x: x>0).count()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 174 ms


#### Caching

In [25]:
%%time

# Add plan to cache the intermediate result (memory vs runtime tradeoff)
temp_rdd = rdd.map(lambda x: consume_time(x)).cache()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 8.02 ms


In [26]:
# View execution plan, as dependence graph
# Top-most line is output. Bottom line is input rdd 
print(temp_rdd.toDebugString().decode())

(4) PythonRDD[5] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 [Memory Serialized 1x Replicated]


In [27]:
%%time

# reduce is an action: it has to return a result to the driver.
# So the temp_rdd map is evaluated now and its result is cached
out = temp_rdd.reduce(lambda x,y: x+y)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 239 ms


In [28]:
%%time 

# Another operation with same temp_rdd
# The cached temp_rdd map result is re-used instead of being recomputed.
# So, runtime reduces.
out = temp_rdd.filter(lambda x: x>0).count()

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 67.7 ms


#### Partitions

In [29]:
## Partitions:
# Number of partitions of the RDD
# Default: Number of workers in spark context
# After some RDD operations, some worker partitions may not have local data and will be idle.
# Can repartition for load balancing
# Should be at least the number of workers so that the executor in each worker has a unit to work on

In [30]:
# Initial number of partitions
rdd1 = sc.parallelize(range(100))
print(rdd1.getNumPartitions())

rdd2 = sc.parallelize(range(100), numSlices = 8)
print(rdd2.getNumPartitions())

4
8


#### Gloming

In [31]:
# Used to refer to the elements of each RDD partition
# glom() turns each partition into a single list holding that partition's elements

# Print length of each partition
print(rdd2.glom().map(len).collect())

[12, 13, 12, 13, 12, 13, 12, 13]


In [32]:
# Print elements in each glom tuple
def getpartinfo(g, print_level = 1):
    """Summarise one glommed partition ``g`` (the sequence of its elements).

    Returns ``None`` for an empty partition. Otherwise returns
    ``(first element, g, number of elements)`` when ``print_level`` is 1,
    and just the number of elements for any other ``print_level``.
    """
    # Empty partition: nothing to report.
    if len(g) == 0:
        return None
    # Any level other than 1 asks for the size only.
    if print_level != 1:
        return len(g)
    return (g[0], g, len(g))
rdd2.glom().map(lambda g: getpartinfo(g)).collect()

[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 12),
 (12, [12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24], 13),
 (25, [25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36], 12),
 (37, [37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49], 13),
 (50, [50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61], 12),
 (62, [62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74], 13),
 (75, [75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86], 12),
 (87, [87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99], 13)]

#### Editing partitions

In [33]:
rdd1 = sc.parallelize(range(100), numSlices = 10)
print(rdd1.getNumPartitions())
rdd2 = rdd1.map(lambda x: (x,x))

10


In [34]:
# Print elements in each glom tuple
def getpartinfo(g, print_level = 1):
    """Summarise one glommed partition ``g`` of (key, value) elements.

    Returns ``None`` for an empty partition. Otherwise:
      * ``print_level == 1``: ``(first element, number of elements)``
      * ``print_level == 2``: ``(key of first element, g, number of elements)``
      * any other level:      the number of elements
    """
    size = len(g)
    # Empty partition: nothing to report.
    if size == 0:
        return None
    if print_level == 1:
        return (g[0], size)
    if print_level == 2:
        return (g[0][0], g, size)
    return size
rdd2.glom().map(lambda g: getpartinfo(g,1)).collect()

[((0, 0), 10),
 ((10, 10), 10),
 ((20, 20), 10),
 ((30, 30), 10),
 ((40, 40), 10),
 ((50, 50), 10),
 ((60, 60), 10),
 ((70, 70), 10),
 ((80, 80), 10),
 ((90, 90), 10)]

In [35]:
# Option 1 to edit: Random partition
# Give number of partitions
# Easy but no control on where elements go

rdd_temp = rdd2.repartition(10)
print(rdd_temp.getNumPartitions())
rdd_temp.glom().map(lambda g: getpartinfo(g)).collect()

10


[((90, 90), 10),
 ((0, 0), 10),
 ((60, 60), 10),
 ((40, 40), 10),
 None,
 ((30, 30), 20),
 ((10, 10), 10),
 ((70, 70), 10),
 ((50, 50), 10),
 ((20, 20), 10)]

In [36]:
# Option 2 to edit: Partition by key
# Needs an RDD of (key, value) elements.
# Give k. By default hash(key) % k is the partition# for each element (= key % k for the small non-negative integer keys used here).
# Extra work to define key but more control

rdd_temp = rdd2.partitionBy(10)
print(rdd_temp.getNumPartitions())
rdd_temp.glom().map(lambda g: getpartinfo(g, 1)).collect()

10


[((0, 0), 10),
 ((1, 1), 10),
 ((2, 2), 10),
 ((3, 3), 10),
 ((4, 4), 10),
 ((5, 5), 10),
 ((6, 6), 10),
 ((7, 7), 10),
 ((8, 8), 10),
 ((9, 9), 10)]

### RDD Operations

#### Chaining

In [37]:
rdd1 = sc.parallelize([1,2,3])
rdd1.map(lambda x: x*x*x)\
    .reduce(lambda x,y: x+y)

36

#### RDD get info

In [38]:
rdd1 = sc.parallelize(range(100))

In [39]:
rdd1.first(), rdd1.take(5)

(0, [0, 1, 2, 3, 4])

In [40]:
print(rdd1.sample(False, 5./100).collect())
print(rdd1.sample(False, 5./100).collect())

[8, 25, 33, 38, 62, 70]
[7, 11, 59, 67, 91, 95, 99]


#### Transformations:  RDD -> RDD

In [41]:
# Transforms RDD -> RDD. 
# No communication across partitions needed.
# Examples: map, filter, flatMap, union (other set operations such as intersection and subtract do shuffle data)

In [42]:
# map, flatMap, filter

# map
rdd1 = sc.parallelize(["test1 a b", "test2 c d"])
print("map:", rdd1.map(lambda x: x.split(" ")).collect())

# flatMap - single list
print("flatmap:", rdd1.flatMap(lambda x: x.split(" ")).collect())

# filter 
rdd1 = sc.parallelize(range(100))
print(rdd1.count())
print(rdd1.filter(lambda x: x >= 20).count())


map: [['test1', 'a', 'b'], ['test2', 'c', 'd']]
flatmap: ['test1', 'a', 'b', 'test2', 'c', 'd']
100
80


In [43]:
# Set operations 
# Distinct set, union, intersection, subtract, cartesian

rdd1 = sc.parallelize([1,1,2,5,6])
rdd2 = sc.parallelize([1,2,4,5])
print("rdd1", rdd1.collect())
print("rdd2", rdd2.collect())

print("union", rdd1.union(rdd2).collect())
print("intersection", rdd1.intersection(rdd2).collect())
print("rdd1 minus rdd2", rdd1.subtract(rdd2).collect())
print("rdd1 cartesian rdd2", rdd1.cartesian(rdd2).collect())


rdd1 [1, 1, 2, 5, 6]
rdd2 [1, 2, 4, 5]
union [1, 1, 2, 5, 6, 1, 2, 4, 5]
intersection [1, 2, 5]
rdd1 minus rdd2 [6]
rdd1 cartesian rdd2 [(1, 1), (1, 2), (1, 4), (1, 5), (1, 1), (1, 2), (1, 4), (1, 5), (2, 1), (2, 2), (2, 4), (2, 5), (5, 1), (6, 1), (5, 2), (6, 2), (5, 4), (6, 4), (5, 5), (6, 5)]


In [44]:
# A shuffle is also a type of RDD -> RDD transformation
# Lots of communication across partitions - expensive
# Examples: distinct, sortBy, sortByKey, reduceByKey, join

In [45]:
rdd1 = sc.parallelize([1,1,2,5,6])
print("rdd1 distinct set", rdd1.distinct().collect())

rdd1 distinct set [1, 5, 2, 6]


#### Actions:  RDD -> Python object on head node

In [46]:
# Transforms RDD to a Python object on the head node
# Some communication needed
# Examples: reduce, count, collect

In [47]:
rdd1 = sc.parallelize([1,2,3])
rdd1.map(lambda x: x*x*x)\
    .reduce(lambda x,y: x+y)

36

In [48]:
rdd1.collect()

[1, 2, 3]

In [49]:
rdd1.count()

3

#### (key, value) Transformations

In [50]:
# Create (key,value) RDD
data = [(3,9), (1,2), (2,4), (2,10)]
rdd1 = sc.parallelize(data)
rdd1.collect()

[(3, 9), (1, 2), (2, 4), (2, 10)]

In [51]:
# reducebykey
rdd1.reduceByKey(lambda v1,v2: v1 + v2).collect()

[(1, 2), (2, 14), (3, 9)]

In [52]:
# sortbykey
rdd1.sortByKey().collect()

[(1, 2), (2, 4), (2, 10), (3, 9)]

In [53]:
# mapvalues
rdd1.mapValues(lambda v: v+3).collect()

[(3, 12), (1, 5), (2, 7), (2, 13)]

In [54]:
# groupbykey
# groupByKey yields (key, iterable-of-values) pairs; printed raw, the iterables show up as opaque objects
print(rdd1.groupByKey().collect())
# Materialise each group's values as a list so they are readable
rdd1.groupByKey().mapValues(list).collect()

[(1, <pyspark.resultiterable.ResultIterable object at 0x7f61057669b0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7f61057668d0>), (3, <pyspark.resultiterable.ResultIterable object at 0x7f6105766e48>)]


[(1, [2]), (2, [4, 10]), (3, [9])]

In [55]:
# flatmapvalues
print(rdd1.mapValues(lambda v: [v,v+3]).collect())
rdd1.flatMapValues(lambda v: [v,v+3]).collect()

[(3, [9, 12]), (1, [2, 5]), (2, [4, 7]), (2, [10, 13])]


[(3, 9), (3, 12), (1, 2), (1, 5), (2, 4), (2, 7), (2, 10), (2, 13)]

In [56]:
# combinebykey

data = [("t1",1), ("t1",2), ("t1",3), ("t1", 4), ("t2",1), ("t2",2), ("t2",3), ("t3",1), ("t3",2), ("t3",3)]
rdd1 = sc.parallelize(data)
print(rdd1.collect())

# (sum, count) of values per key
rdd2 = rdd1.combineByKey(
             # createCombiner() -> creates combiner or accumulator: (acc[0], acc[1]) here   
            (lambda value: (value,1)), 
            # mergeValue() -> merge new value with existing accumulator acc
            (lambda acc, value: (acc[0]+value,acc[1]+1)), 
            # mergeCombiner() -> merges accumulators by key across partitions
            (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) 
        )
print(rdd2.collect())

# avg of values per key
rdd3 = rdd2.mapValues(lambda v: v[0]/v[1])
print(rdd3.collect())

[('t1', 1), ('t1', 2), ('t1', 3), ('t1', 4), ('t2', 1), ('t2', 2), ('t2', 3), ('t3', 1), ('t3', 2), ('t3', 3)]
[('t1', (10, 4)), ('t2', (6, 3)), ('t3', (6, 3))]
[('t1', 2.5), ('t2', 2.0), ('t3', 2.0)]


In [57]:
# subtractbykey
rdd1 = sc.parallelize([(1,2), (1,3), (2,4)])
rdd2 = sc.parallelize([(1,7), (3,6)])
rdd2.subtractByKey(rdd1).collect()

[(3, 6)]

In [58]:
# join
rdd1 = sc.parallelize([(1,2), (1,3), (2,4)])
rdd2 = sc.parallelize([(1,7), (3,6)])
print(rdd1.collect())
print(rdd2.collect())

# default is inner join
print(rdd1.join(rdd2).collect())

# leftOuterJoin
print(rdd1.leftOuterJoin(rdd2).collect())

# rightOuterJoin
print(rdd1.rightOuterJoin(rdd2).collect())

# fullOuterJoin
print(rdd1.fullOuterJoin(rdd2).collect())

[(1, 2), (1, 3), (2, 4)]
[(1, 7), (3, 6)]
[(1, (2, 7)), (1, (3, 7))]
[(1, (2, 7)), (1, (3, 7)), (2, (4, None))]
[(1, (2, 7)), (1, (3, 7)), (3, (None, 6))]
[(1, (2, 7)), (1, (3, 7)), (2, (4, None)), (3, (None, 6))]


#### (key, value) Actions

In [59]:
data = [(3,9), (1,2), (2,4), (2,10)]
rdd1 = sc.parallelize(data)

In [60]:
# count by key
x = rdd1.countByKey()
print(x)
print(x[3], x[2])

defaultdict(<class 'int'>, {3: 1, 1: 1, 2: 2})
1 2


In [61]:
# collect as dict (a dict holds one value per key, so only one value survives for the duplicated key 2)
x = rdd1.collectAsMap()
print(x)
print(x[3], x[2])

{3: 9, 1: 2, 2: 10}
9 10


In [62]:
# search key
rdd1.lookup(2)

[4, 10]

### Dataframes

In [63]:
# Dataframe is an RDD of rows plus schema information
# It is a restricted subtype of RDD
# 2D structure - row is record, columns can be of different datatype

In [64]:
# Create SQLContext similar to SparkContext to use sql
sqc = SQLContext(sc)

In [65]:
# Option 1 to create dataframe: create schema and RDD of tuples

# RDD
data = [("A", 1), ("B", 2), ("C", 3)]
rdd = sc.parallelize(data)
print(rdd.collect())

# Schema
sch = [StructField("name", StringType(), False),
       StructField("number", IntegerType(), False)]
schema = StructType(sch)

# DF
df = sqc.createDataFrame(rdd, schema)
df.printSchema()
print(df.collect())

# Types:
type(rdd), type(df)

[('A', 1), ('B', 2), ('C', 3)]
root
 |-- name: string (nullable = false)
 |-- number: integer (nullable = false)

[Row(name='A', number=1), Row(name='B', number=2), Row(name='C', number=3)]


(pyspark.rdd.RDD, pyspark.sql.dataframe.DataFrame)

In [66]:
# Option 2 to create dataframe: create RDD of Rows

# RDD
data = [Row(name="A", number=1), 
        Row(name="B", number=2), 
        Row(name="C", number=3)]
rdd = sc.parallelize(data)
print(rdd.collect())

# DF
df = sqc.createDataFrame(rdd)
df.printSchema()
print(df.collect())

# Types:
type(rdd), type(df)

[Row(name='A', number=1), Row(name='B', number=2), Row(name='C', number=3)]
root
 |-- name: string (nullable = true)
 |-- number: long (nullable = true)

[Row(name='A', number=1), Row(name='B', number=2), Row(name='C', number=3)]


(pyspark.rdd.RDD, pyspark.sql.dataframe.DataFrame)

### Loading and saving files

#### JSON format

In [67]:
!ls data/people.json

data/people.json


In [68]:
fpath = 'data/people.json'
df = sqc.read.json(fpath)
print(type(df))
df.show()
df.printSchema()
df.select('age').show()

<class 'pyspark.sql.dataframe.DataFrame'>
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



#### Parquet format

In [69]:
!ls data/users.parquet

data/users.parquet


In [70]:
fpath = 'data/users.parquet'
# Use the explicit parquet reader rather than the generic load(), which only
# reads parquet when the session's default data source is left as parquet.
df = sqc.read.parquet(fpath)
print(type(df))
df.show()
df.printSchema()
df.select('name','favorite_color').show(5)

<class 'pyspark.sql.dataframe.DataFrame'>
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+



In [71]:
df1 = df.select('name','favorite_color')
# Use the explicit parquet writer rather than the generic save(), so the output
# format does not depend on the session's default data source.
# mode='overwrite' replaces any output already at this path.
df1.write.parquet('data/temp.parquet', mode='overwrite')
!ls data/temp.parquet

part-00000-40f50b7a-e84a-4f5c-849c-46e8f55630b1-c000.snappy.parquet  _SUCCESS
