In [5]:
#!export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

# Creating RDDs

In [1]:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/Library/Frameworks/Python.framework/Versions/3.6/bin/python3'

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("rdd_demo").getOrCreate()

#sc.stop()
conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext.getOrCreate()

In [262]:
##### read using datasource API

flightData2015 = spark\
  .read\
  .option("inferSchema", "false")\
  .option("header", "true")\
  .csv("/Users/aakash/training/spark/data/flight-data/csv/2015-summary.csv")

# COMMAND ----------

flightData2015=flightData2015.toDF("dest","source","count").rdd
print(type(flightData2015))
print(flightData2015.take(1))

<class 'pyspark.rdd.RDD'>
[Row(dest='United States', source='Romania', count='15')]


In [263]:
##### read using sparkcontext
spth="/Users/aakash/training/spark/data/flight-data/csv/2015-summary.csv"
sc_flightData2015=spark.sparkContext.textFile(spth)
print(type(sc_flightData2015))
print(sc_flightData2015.take(2))

<class 'pyspark.rdd.RDD'>
['DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count', 'United States,Romania,15']


In [264]:
##### convert pandas file to RDD
import pandas as pd

spth="/Users/aakash/training/spark/data/flight-data/csv/2015-summary.csv"
pd_flightData2015=pd.read_csv(spth, header=0)
print(type(pd_flightData2015))
print(pd_flightData2015.head())
pd_flightData2015=spark.createDataFrame(pd_flightData2015).rdd
print(type(pd_flightData2015))
print(pd_flightData2015.take(1))



<class 'pandas.core.frame.DataFrame'>
  DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME  count
0     United States             Romania     15
1     United States             Croatia      1
2     United States             Ireland    344
3             Egypt       United States     15
4     United States               India     62
<class 'pyspark.rdd.RDD'>
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)]


In [265]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
words.take(5)

['Spark', 'The', 'Definitive', 'Guide', ':']

In [266]:
##### from a collection of text
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

words.setName("myWords")
words.name() # myWords
print(type(words))
print(words.count())

<class 'pyspark.rdd.RDD'>
10


In [309]:
### from a range of numbers
myRange = spark.range(1000000000000).toDF("number").rdd.map(lambda row: row[0])
#myRange = spark.range(1000).toDF("number")
#myRange.take(5)
myRange.getNumPartitions()

4

# a look into some low level issues

In [268]:
#### some low level access issues
pd_flightData2015.count()
pd_flightData2015.take(2)[-1]

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)

# filtering

In [269]:
def parseLine(line):
    fields = line.split(',')
    date = fields[0]
    p_open = fields[1]
    p_close = fields[5]
    return (date, p_open, p_close)

In [270]:
spth="/Users/aakash/Sarasvati/NSE/RELIANCE.csv"
sdt=spark.sparkContext.textFile(spth)
sdt=sdt.map(parseLine)
sdt.take(2)

[('Date', 'Open', 'Close'), ('1998-03-23', '178.5', '180.2')]

In [271]:
spth="/Users/aakash/Sarasvati/NSE/RELIANCE.csv"
o_sdt = spark.read.format("CSV").option("header","true").option("inferSchema", "true") \
    .load(spth)
o_sdt=o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd \
    .map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
#print(type(o_sdt))
#print(o_sdt.take(2))
o_sdt=o_sdt.filter(lambda row: row[2] > row[1])
#print(o_sdt.take(5))
#print(type(o_sdt))
print(o_sdt.count())

5366
2525


## filter function

In [272]:
def HighClose(row):
    ## Discuss
    if row[2] > row[1]:
        return(row)

Notice anything in output below?

In [273]:
o_sdt = spark.read.format("CSV").option("header","true").load(spth)
o_sdt=o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
o_sdt=o_sdt.filter(lambda row: HighClose(row))
print(o_sdt.take(1))
#print(type(o_sdt))
print(o_sdt.count())

5366
[('1998-03-23', '178.5', '180.2')]
2536


How about now?

In [274]:
o_sdt = spark.read.format("CSV").option("header","true").option("inferSchema", "true").load(spth)
o_sdt=o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
o_sdt=o_sdt.filter(lambda row: HighClose(row))
print(o_sdt.take(5))
print(type(o_sdt))
print(o_sdt.count())


5366
[(datetime.datetime(1998, 3, 23, 0, 0), 178.5, 180.2), (datetime.datetime(1998, 3, 25, 0, 0), 181.5, 183.85), (datetime.datetime(1998, 3, 27, 0, 0), 179.1, 180.4), (datetime.datetime(1998, 4, 1, 0, 0), 177.5, 182.85), (datetime.datetime(1998, 4, 3, 0, 0), 181.5, 184.95)]
<class 'pyspark.rdd.PipelinedRDD'>
2525


In [275]:
o_sdt = spark.read.format("CSV").option("header","true").option("inferSchema", "true").load(spth)
o_sdt=o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
o_sdt=o_sdt.filter(lambda row: HighClose(row))
print(o_sdt.take(5))
print(type(o_sdt))
print(o_sdt.count())


5366
[(datetime.datetime(1998, 3, 23, 0, 0), 178.5, 180.2), (datetime.datetime(1998, 3, 25, 0, 0), 181.5, 183.85), (datetime.datetime(1998, 3, 27, 0, 0), 179.1, 180.4), (datetime.datetime(1998, 4, 1, 0, 0), 177.5, 182.85), (datetime.datetime(1998, 4, 3, 0, 0), 181.5, 184.95)]
<class 'pyspark.rdd.PipelinedRDD'>
2525


## Map

In [276]:
def to_to_mill(row):
    return (row[0], row[1], row[2], round(row[3],0))

In [277]:
spth="/Users/aakash/Sarasvati/NSE/TATASTEEL.csv"
o_sdt = spark.read.format("CSV").option("header","true").option("inferSchema", "true").load(spth)
o_sdt=o_sdt.toDF("Date","Open","High","Low","Last","Close","Volume","Turnover").rdd.map(lambda row: (row[0], row[1], row[5], row[7]))
print(o_sdt.take(2))
o_sdt=o_sdt.map(to_to_mill)
print(o_sdt.take(2))
print(type(o_sdt))
print(o_sdt.count())



[(datetime.datetime(2005, 10, 17, 0, 0), 380.0, 384.35, 17639.41), (datetime.datetime(2005, 10, 18, 0, 0), 386.85, 374.85, 13024.16)]
[(datetime.datetime(2005, 10, 17, 0, 0), 380.0, 384.35, 17639.0), (datetime.datetime(2005, 10, 18, 0, 0), 386.85, 374.85, 13024.0)]
<class 'pyspark.rdd.PipelinedRDD'>
3461


# flatMap

In [278]:
def Func(lines):
    lines = lines.lower()
    lines = lines.split(" ")
    return lines

#sc.stop()
conf = SparkConf().setMaster("local").setAppName("wordcount")
sc = SparkContext.getOrCreate()

spth="/Users/aakash/training/spark/data/sherlock_holmes.txt"
input_file = sc.textFile("data/sherlock_holmes.txt")
#print(input_file.take(5))
rdd1 = input_file.flatMap(Func)
rdd2=rdd1.map(lambda x: (x,1)).groupByKey().mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)
rdd2.take(5)

[(1134897, 'the'),
 (624263, ''),
 (573518, 'and'),
 (548842, 'of'),
 (541280, 'to')]

## Reduce

In [127]:
spark.sparkContext.parallelize(range(1,200)).reduce(lambda x, y: x+y)

19900

## count

In [135]:
rdd1.count()

110739

In [141]:
rdd1.countApprox(1, 0.95)

110739

In [240]:
rdd1.countByValue()

AttributeError: 'collections.defaultdict' object has no attribute 'countByValue'

In [279]:
spth="/Users/aakash/training/spark/data/s1.txt"
spth_1="/Users/aakash/training/spark/data/sherlock_holmes.txt"
spth_2="/Users/aakash/training/spark/data/little_sherlock_holmes.txt"
input_file = sc.textFile(spth_1,6)


input_file.pipe("wc -l").collect()
### why 2 outputs???

['  408379', '  408212', '  408267', '  408337', '  408257', '  408238']

In [245]:
input_file.count()

1514130

In [365]:
print(type(input_file))
print(input_file.count())
print(input_file.getNumPartitions())
print(sc.defaultParallelism)

<class 'pyspark.rdd.RDD'>
2449690
5
4


## saving file

In [None]:
input_file.saveAsTextFile('/Users/aakash/training/spark/wc.txt')
## check ouatputs

## glom

In [300]:
spark.sparkContext.parallelize(["Hello"," World"],2).glom().collect()

[['Hello'], [' World']]

## controlling partitioning

In [360]:
print(input_file.getNumPartitions())
print(input_file.coalesce(2).getNumPartitions())
#### no shuffle is performed through coalesce

4
2


In [361]:
print(input_file.getNumPartitions())
print(input_file.repartition(5).getNumPartitions())
input_file=input_file.repartition(5)
print(input_file.getNumPartitions())


4
5
5


In [362]:
df=spark.read.option("header","true").option("inferSchema", "true").csv("/Users/aakash/training/spark/data/retail-data/all")
rdd=df.coalesce(10).rdd
df.count()
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [293]:
import random
def partitionFunc(key):  
    if key == 17850 or key == 12583:
        return 0
    else:
        return random.randint(1,2)

keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD\
  .partitionBy(3, partitionFunc)\
  .map(lambda x: x[0])\
  .glom()\
  .map(lambda x: len(set(x)))\
  .take(5)

### This custom partitioning logic is only available at RDD level

[2, 4295, 4302]

## Key value pairs

In [304]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

### using map
words.map(lambda word: (word.lower(),1)).take(5)

[('spark', 1), ('the', 1), ('definitive', 1), ('guide', 1), (':', 1)]

In [315]:
### using keyBy()
keyword=words.keyBy(lambda word: word.lower())
keyword.take(4)

[('spark', 'Spark'),
 ('the', 'The'),
 ('definitive', 'Definitive'),
 ('guide', 'Guide')]

In [316]:
## using mapValues
keyword.mapValues(lambda x: x.upper()).collect()

[('spark', 'SPARK'),
 ('the', 'THE'),
 ('definitive', 'DEFINITIVE'),
 ('guide', 'GUIDE'),
 (':', ':'),
 ('big', 'BIG'),
 ('data', 'DATA'),
 ('processing', 'PROCESSING'),
 ('made', 'MADE'),
 ('simple', 'SIMPLE')]

In [319]:
## extracting keys and/or values
print(keyword.keys().collect())
print(keyword.values().collect())

['spark', 'the', 'definitive', 'guide', ':', 'big', 'data', 'processing', 'made', 'simple']
['Spark', 'The', 'Definitive', 'Guide', ':', 'Big', 'Data', 'Processing', 'Made', 'Simple']


## Aggregations

In [337]:
def addFunc(left, right):
    return(left + right)

def maxFunc(left, right):
    max(left, right)

In [328]:
spth_2="/Users/aakash/training/spark/data/little_sherlock_holmes.txt"
input_file = sc.textFile(spth_2,6)
chars=input_file.flatMap(lambda word: word.split(" "))
KVcharacters=chars.map(lambda letter: (letter,1))
KVcharacters.countByKey()



defaultdict(int,
            {'': 216600,
             'Project': 5592,
             "Gutenberg's": 70,
             'The': 18784,
             'Adventures': 279,
             'of': 187968,
             'Sherlock': 6697,
             'Holmes,': 8765,
             'by': 23472,
             'Arthur': 969,
             'Conan': 279,
             'Doyle': 279,
             'This': 3728,
             'eBook': 347,
             'is': 74130,
             'for': 48105,
             'the': 373183,
             'use': 2485,
             'anyone': 1864,
             'anywhere': 208,
             'at': 50867,
             'no': 19738,
             'cost': 346,
             'and': 193128,
             'with': 58260,
             'almost': 1382,
             'restrictions': 139,
             'whatsoever.': 139,
             'You': 9388,
             'may': 13875,
             'copy': 691,
             'it,': 6421,
             'give': 3798,
             'it': 67425,
             'away': 5798,
      

In [331]:
### groupByKey
spth="/Users/aakash/training/spark/data/sherlock_holmes.txt"
input_file = sc.textFile("data/sherlock_holmes.txt")
#print(input_file.take(5))
rdd1 = input_file.flatMap(Func)
rdd2=rdd1.map(lambda x: (x,1)).groupByKey() \
    .mapValues(sum).map(lambda x: (x[1],x[0])) \
    .sortByKey(False)

rdd2.take(5)

[(1134897, 'the'),
 (624263, ''),
 (573518, 'and'),
 (548842, 'of'),
 (541280, 'to')]

In [335]:
### reduceByKey
KVcharacters.reduceByKey(lambda x,y: x+y).collect()

[('', 216600),
 ('Project', 5592),
 ('of', 187968),
 ('anywhere', 208),
 ('no', 19738),
 ('whatsoever.', 139),
 ('away', 5798),
 ('#1661]', 70),
 ('20,', 70),
 ('Language:', 70),
 ('Character', 70),
 ('UTF-8', 70),
 ('OF', 1313),
 ('GUTENBERG', 208),
 ('anonymous', 139),
 ('II.', 208),
 ('Valley', 277),
 ('Orange', 70),
 ('Bachelor', 70),
 ('XII.', 139),
 ('Copper', 553),
 ('BOHEMIA', 70),
 ('always', 3798),
 ('have', 61706),
 ('heard', 7110),
 ('It', 20020),
 ('love', 1105),
 ('machine', 829),
 ('as', 53557),
 ('placed', 1243),
 ('spoke', 1313),
 ('save', 2416),
 ('admirable', 208),
 ('drawing', 346),
 ('But', 6902),
 ('intrusions', 70),
 ('into', 18773),
 ('distracting', 70),
 ('high-power', 70),
 ('there', 18980),
 ('seen', 4281),
 ('finds', 277),
 ('master', 484),
 ('form', 553),
 ('Baker', 2279),
 ('books,', 208),
 ('cocaine', 208),
 ('nature.', 277),
 ('observation', 277),
 ('out', 18772),
 ('clues,', 70),
 ('official', 967),
 ('account', 1245),
 ('murder,', 139),
 ('Atkinson', 7

In [359]:
### aggregateByKey
KVcharacters.aggregateByKey(0, addFunc, addFunc).collect()

[('', 216600),
 ('Project', 5592),
 ('of', 187968),
 ('anywhere', 208),
 ('no', 19738),
 ('whatsoever.', 139),
 ('away', 5798),
 ('#1661]', 70),
 ('20,', 70),
 ('Language:', 70),
 ('Character', 70),
 ('UTF-8', 70),
 ('OF', 1313),
 ('GUTENBERG', 208),
 ('anonymous', 139),
 ('II.', 208),
 ('Valley', 277),
 ('Orange', 70),
 ('Bachelor', 70),
 ('XII.', 139),
 ('Copper', 553),
 ('BOHEMIA', 70),
 ('always', 3798),
 ('have', 61706),
 ('heard', 7110),
 ('It', 20020),
 ('love', 1105),
 ('machine', 829),
 ('as', 53557),
 ('placed', 1243),
 ('spoke', 1313),
 ('save', 2416),
 ('admirable', 208),
 ('drawing', 346),
 ('But', 6902),
 ('intrusions', 70),
 ('into', 18773),
 ('distracting', 70),
 ('high-power', 70),
 ('there', 18980),
 ('seen', 4281),
 ('finds', 277),
 ('master', 484),
 ('form', 553),
 ('Baker', 2279),
 ('books,', 208),
 ('cocaine', 208),
 ('nature.', 277),
 ('observation', 277),
 ('out', 18772),
 ('clues,', 70),
 ('official', 967),
 ('account', 1245),
 ('murder,', 139),
 ('Atkinson', 7

In [357]:
### inner joins
x = sc.parallelize([("a", 1), ("b", 4), ("c",5)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())


[('a', (1, 2)), ('a', (1, 3))]

In [356]:
## zips
y = sc.parallelize(zip(range(0,5), range(10,25)))
y.take(5)

[(0, 10), (1, 11), (2, 12), (3, 13), (4, 14)]

In [350]:
words.getNumPartitions()

2

## Broadcast variables

In [364]:
my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(my_collection, 2)

supplementalData = {"Spark":1000, \
                    "Definitive":200,\
                    "Big":-300, \
                    "Simple":100}

suppBroadcast = spark.sparkContext.broadcast(supplementalData)

print(suppBroadcast.value)

words.map(lambda word: (word, suppBroadcast.value.get(word, 0))) \
    .sortBy(lambda wordPair: wordPair[1]).collect()

{'Spark': 1000, 'Definitive': 200, 'Big': -300, 'Simple': 100}


[('Big', -300),
 ('The', 0),
 ('Guide', 0),
 (':', 0),
 ('Data', 0),
 ('Processing', 0),
 ('Made', 0),
 ('Simple', 100),
 ('Definitive', 200),
 ('Spark', 1000)]

## Accumulators

In [None]:
#from pyspark import SparkContext 
#sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
    global num 
    num+=x 

rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value
print ("Accumulated value is -> %i" % (final))