# Spark - RDDs

## RDD

In [1]:
from pyspark.sql import SparkSession 
spark = SparkSession\
        .builder\
        .appName("Python")\
        .getOrCreate()

In [2]:
spark.range(10).rdd.collect()

[Row(id=0),
 Row(id=1),
 Row(id=2),
 Row(id=3),
 Row(id=4),
 Row(id=5),
 Row(id=6),
 Row(id=7),
 Row(id=8),
 Row(id=9)]

In [3]:
spark.range(10).toDF("id").rdd.map(lambda row: row[0]).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [4]:
spark.range(10).rdd.toDF().collect()

[Row(id=0),
 Row(id=1),
 Row(id=2),
 Row(id=3),
 Row(id=4),
 Row(id=5),
 Row(id=6),
 Row(id=7),
 Row(id=8),
 Row(id=9)]

In [5]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

In [6]:
words.collect()

['Spark',
 'The',
 'Definitive',
 'Guide',
 ':',
 'Big',
 'Data',
 'Processing',
 'Made',
 'Simple']

In [7]:
words.setName("myWords")
words.name() # myWords

'myWords'

In [8]:
def startsWithS(individual):
  return individual.startswith("S")

In [9]:
words.filter(lambda word: startsWithS(word)).collect()

['Spark', 'Simple']

In [10]:
words2 = words.map(lambda word: (word, word[0], word.startswith("S")))
words2.collect()

[('Spark', 'S', True),
 ('The', 'T', False),
 ('Definitive', 'D', False),
 ('Guide', 'G', False),
 (':', ':', False),
 ('Big', 'B', False),
 ('Data', 'D', False),
 ('Processing', 'P', False),
 ('Made', 'M', False),
 ('Simple', 'S', True)]

In [11]:
words2.filter(lambda record: record[2]).take(5)

[('Spark', 'S', True), ('Simple', 'S', True)]

In [12]:
words.flatMap(lambda word: list(word)).take(5)

['S', 'p', 'a', 'r', 'k']

In [13]:
words.sortBy(lambda word: len(word) * -1).take(2)

['Definitive', 'Processing']

In [14]:
fiftyFiftySplit = words.randomSplit([0.5, 0.5])
type(fiftyFiftySplit)
print(fiftyFiftySplit[0])

PythonRDD[36] at RDD at PythonRDD.scala:48


In [15]:
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210

210

In [16]:
def wordLengthReducer(leftWord, rightWord):
  if len(leftWord) > len(rightWord):
    return leftWord
  else:
    return rightWord

words.reduce(wordLengthReducer)

'Processing'

In [17]:
words.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [18]:
words.mapPartitions(lambda part: [1]).sum() # 2

2

In [19]:
def indexedFunc(partitionIndex, withinPartIterator):
  return ["partition: {} => {}".format(partitionIndex, x) for x in withinPartIterator]
words.mapPartitionsWithIndex(indexedFunc).collect()

['partition: 0 => Spark',
 'partition: 0 => The',
 'partition: 0 => Definitive',
 'partition: 0 => Guide',
 'partition: 0 => :',
 'partition: 1 => Big',
 'partition: 1 => Data',
 'partition: 1 => Processing',
 'partition: 1 => Made',
 'partition: 1 => Simple']

In [20]:
spark.sparkContext.parallelize(["Hello", "World"], 2).glom().collect()

[['Hello'], ['World']]

## Advanced RDDs

In [21]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
words.collect()

['Spark',
 'The',
 'Definitive',
 'Guide',
 ':',
 'Big',
 'Data',
 'Processing',
 'Made',
 'Simple']

In [22]:
words.map(lambda word: (word.lower(), 1))

PythonRDD[45] at RDD at PythonRDD.scala:48

In [24]:
keyword = words.keyBy(lambda word: word.lower())
keyword.collect()

[('spark', 'Spark'),
 ('the', 'The'),
 ('definitive', 'Definitive'),
 ('guide', 'Guide'),
 (':', ':'),
 ('big', 'Big'),
 ('data', 'Data'),
 ('processing', 'Processing'),
 ('made', 'Made'),
 ('simple', 'Simple')]

In [25]:
keyword.mapValues(lambda word: word.upper()).collect()

[('spark', 'SPARK'),
 ('the', 'THE'),
 ('definitive', 'DEFINITIVE'),
 ('guide', 'GUIDE'),
 (':', ':'),
 ('big', 'BIG'),
 ('data', 'DATA'),
 ('processing', 'PROCESSING'),
 ('made', 'MADE'),
 ('simple', 'SIMPLE')]

In [28]:
keyword.flatMapValues(lambda word: word.upper()).collect()[:6]

[('spark', 'S'),
 ('spark', 'P'),
 ('spark', 'A'),
 ('spark', 'R'),
 ('spark', 'K'),
 ('the', 'T')]

In [29]:
keyword.keys().collect()

['spark',
 'the',
 'definitive',
 'guide',
 ':',
 'big',
 'data',
 'processing',
 'made',
 'simple']

In [30]:
keyword.values().collect()

['Spark',
 'The',
 'Definitive',
 'Guide',
 ':',
 'Big',
 'Data',
 'Processing',
 'Made',
 'Simple']

In [31]:
import random
distinctChars = words.flatMap(lambda word: list(word.lower())).distinct()\
  .collect()

In [32]:
sampleMap = dict(map(lambda c: (c, random.random()), distinctChars))
words.map(lambda word: (word.lower()[0], word))\
  .sampleByKey(True, sampleMap, 6).collect()

[(':', ':')]

In [33]:
chars = words.flatMap(lambda word: word.lower())

In [34]:
KVcharacters = chars.map(lambda letter: (letter, 1))

In [35]:
KVcharacters.collect()

[('s', 1),
 ('p', 1),
 ('a', 1),
 ('r', 1),
 ('k', 1),
 ('t', 1),
 ('h', 1),
 ('e', 1),
 ('d', 1),
 ('e', 1),
 ('f', 1),
 ('i', 1),
 ('n', 1),
 ('i', 1),
 ('t', 1),
 ('i', 1),
 ('v', 1),
 ('e', 1),
 ('g', 1),
 ('u', 1),
 ('i', 1),
 ('d', 1),
 ('e', 1),
 (':', 1),
 ('b', 1),
 ('i', 1),
 ('g', 1),
 ('d', 1),
 ('a', 1),
 ('t', 1),
 ('a', 1),
 ('p', 1),
 ('r', 1),
 ('o', 1),
 ('c', 1),
 ('e', 1),
 ('s', 1),
 ('s', 1),
 ('i', 1),
 ('n', 1),
 ('g', 1),
 ('m', 1),
 ('a', 1),
 ('d', 1),
 ('e', 1),
 ('s', 1),
 ('i', 1),
 ('m', 1),
 ('p', 1),
 ('l', 1),
 ('e', 1)]

In [36]:
def maxFunc(left, right):
  return max(left, right)
def addFunc(left, right):
  return left + right

In [37]:
nums = sc.parallelize(range(1,31), 5)
nums.take(5)

[1, 2, 3, 4, 5]

In [38]:
KVcharacters.countByKey()

defaultdict(int,
            {':': 1,
             'a': 4,
             'b': 1,
             'c': 1,
             'd': 4,
             'e': 7,
             'f': 1,
             'g': 3,
             'h': 1,
             'i': 7,
             'k': 1,
             'l': 1,
             'm': 2,
             'n': 2,
             'o': 1,
             'p': 3,
             'r': 2,
             's': 4,
             't': 3,
             'u': 1,
             'v': 1})

In [39]:
from functools import reduce
KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1])))\
  .collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [40]:
nums.aggregate(0, maxFunc, addFunc)

90

In [41]:
depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)

90

In [42]:
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()

[('s', 3),
 ('p', 2),
 ('r', 1),
 ('h', 1),
 ('d', 2),
 ('i', 4),
 ('g', 2),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 3),
 ('k', 1),
 ('t', 2),
 ('e', 4),
 ('f', 1),
 ('n', 1),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [43]:
def valToCombiner(value):
  return [value]
def mergeValuesFunc(vals, valToAppend):
  vals.append(valToAppend)
  return vals
def mergeCombinerFunc(vals1, vals2):
  return vals1 + vals2

outputPartitions = 6

KVcharacters\
  .combineByKey(
    valToCombiner,
    mergeValuesFunc,
    mergeCombinerFunc,
    outputPartitions)\
  .collect()

[('s', [1, 1, 1, 1]),
 ('d', [1, 1, 1, 1]),
 ('l', [1]),
 ('v', [1]),
 (':', [1]),
 ('p', [1, 1, 1]),
 ('r', [1, 1]),
 ('c', [1]),
 ('k', [1]),
 ('t', [1, 1, 1]),
 ('n', [1, 1]),
 ('u', [1]),
 ('o', [1]),
 ('h', [1]),
 ('i', [1, 1, 1, 1, 1, 1, 1]),
 ('g', [1, 1, 1]),
 ('b', [1]),
 ('a', [1, 1, 1, 1]),
 ('e', [1, 1, 1, 1, 1, 1, 1]),
 ('f', [1]),
 ('m', [1, 1])]

In [44]:
KVcharacters.foldByKey(0, addFunc).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [45]:
import random
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
charRDD = distinctChars.map(lambda c: (c, random.random()))
charRDD2 = distinctChars.map(lambda c: (c, random.random()))
charRDD.cogroup(charRDD2).take(5)

[('s',
  (<pyspark.resultiterable.ResultIterable at 0x112c17048>,
   <pyspark.resultiterable.ResultIterable at 0x112c17f60>)),
 ('p',
  (<pyspark.resultiterable.ResultIterable at 0x112c17748>,
   <pyspark.resultiterable.ResultIterable at 0x112c17c88>)),
 ('r',
  (<pyspark.resultiterable.ResultIterable at 0x112c17ba8>,
   <pyspark.resultiterable.ResultIterable at 0x112c17c50>)),
 ('i',
  (<pyspark.resultiterable.ResultIterable at 0x112c17518>,
   <pyspark.resultiterable.ResultIterable at 0x112c17208>)),
 ('g',
  (<pyspark.resultiterable.ResultIterable at 0x112c17080>,
   <pyspark.resultiterable.ResultIterable at 0x112c17780>))]

In [46]:
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10
KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()

51

In [47]:
numRange = sc.parallelize(range(10), 2)
words.zip(numRange).collect()

[('Spark', 0),
 ('The', 1),
 ('Definitive', 2),
 ('Guide', 3),
 (':', 4),
 ('Big', 5),
 ('Data', 6),
 ('Processing', 7),
 ('Made', 8),
 ('Simple', 9)]

In [48]:
words.coalesce(1).getNumPartitions() # 1

1

In [49]:
df = spark.read.option("header", "true").option("inferSchema", "true")\
  .csv("data/retail-data/all/")
rdd = df.coalesce(10).rdd

In [50]:
rdd.take(5)

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity='6', InvoiceDate='12/1/2010 8:26', UnitPrice='2.55', CustomerID='17850', Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity='6', InvoiceDate='12/1/2010 8:26', UnitPrice='3.39', CustomerID='17850', Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity='8', InvoiceDate='12/1/2010 8:26', UnitPrice='2.75', CustomerID='17850', Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity='6', InvoiceDate='12/1/2010 8:26', UnitPrice='3.39', CustomerID='17850', Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity='6', InvoiceDate='12/1/2010 8:26', UnitPrice='3.39', CustomerID='17850', Country='United Kingdom')]

In [51]:
def partitionFunc(key):
  import random
  if key == 17850 or key == 12583:
    return 0
  else:
    return random.randint(1,2)

keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD\
  .partitionBy(3, partitionFunc)\
  .map(lambda x: x[0])\
  .glom()\
  .map(lambda x: len(set(x)))\
  .take(5)

[0, 4363, 4379]

## Distributed Variables

In [52]:
my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(my_collection, 2)

In [53]:
supplementalData = {"Spark":1000, "Definitive":200,
                    "Big":-300, "Simple":100}

In [54]:
suppBroadcast = spark.sparkContext.broadcast(supplementalData)

In [55]:
suppBroadcast.value

{'Big': -300, 'Definitive': 200, 'Simple': 100, 'Spark': 1000}

In [56]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
  .sortBy(lambda wordPair: wordPair[1])\
  .collect()

[('Big', -300),
 ('The', 0),
 ('Guide', 0),
 (':', 0),
 ('Data', 0),
 ('Processing', 0),
 ('Made', 0),
 ('Simple', 100),
 ('Definitive', 200),
 ('Spark', 1000)]

In [57]:
flights = spark.read\
  .parquet("data/flight-data/parquet/2010-summary.parquet")

In [58]:
accChina = spark.sparkContext.accumulator(0)

In [59]:
def accChinaFunc(flight_row):
  destination = flight_row["DEST_COUNTRY_NAME"]
  origin = flight_row["ORIGIN_COUNTRY_NAME"]
  if destination == "China":
    accChina.add(flight_row["count"])
  if origin == "China":
    accChina.add(flight_row["count"])

In [60]:
flights.foreach(lambda flight_row: accChinaFunc(flight_row))

In [61]:
accChina.value # 953

953

## Spark on a Cluster

In [62]:
# Creating a SparkSession in Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

In [63]:
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)

In [64]:
step1 = df1.repartition(5)
step12 = df2.repartition(6)

In [65]:
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")

step4.collect() # 2500000000000

[Row(sum(id)=2500000000000)]

## Spark Applications

In [66]:
if __name__ == '__main__':
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .master("local") \
        .appName("Word Count") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

print(spark.range(5000).where("id > 500").selectExpr("sum(id)").collect())

[Row(sum(id)=12372250)]


In [67]:
from pyspark import SparkConf
conf = SparkConf().setMaster("local[2]").setAppName("DefinitiveGuide")\
  .set("some.conf", "to.some.value")

In [69]:
# Original loading code that does *not* cache DataFrame
DF1 = spark.read.format("csv")\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .load("data/flight-data/csv/2015-summary.csv")
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()

In [70]:
DF1.cache()
DF1.count()

256

In [71]:
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()