# Spark RDD operation Example

## Import spark modules

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

**Setup spark configuration and create a spark context**

In [3]:
sc = spark.sparkContext

## Load data from collection

In [4]:
collData = sc.parallelize([4,3,2,5,8,5])

In [5]:
collData.collect()

[4, 3, 2, 5, 8, 5]

## Load data from csv

**Load data from csv**

In [6]:
autoData = sc.textFile("./data/auto-data.csv")

**Count the number of lines in the csv data**

In [7]:
autoData.count()

198

**Print the header line**

In [8]:
autoData.first()

'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

**Print the head of the data**

In [9]:
autoData.take(5)

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

**Print all the lines in the RDD**

In [10]:
for line in autoData.collect():
    print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,37,41,5389
honda,gas,std,two,hatchback,fwd,four,60,5500,38,42,5399
nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499
dodge,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
plymouth,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
mazda,gas,std,two,hatchback,fwd,four,68,5000,31,38,6095
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,31,38,6189
dodge,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
plymouth,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
chevrolet,gas,std,two,hatchback,fwd,four,70,5400,38,43,6295
toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338
dodge,gas,std,two,hatchback,fwd,four,68,5500,31,38,6377

## Save the RDD to a local data

In [11]:
# Open file handler in write mode
with open("./saved_files/auto-data-saved.csv", 'w') as autoDataFile:
    # Collect the autodata RDD and write it to the local file
    autoDataFile.write("\n".join(autoData.collect()))

## Transformations

**Map and create a new RDD**

In [12]:
tsvData = autoData.map(lambda x: x.replace(",", "\t"))

In [13]:
tsvData.take(10)

['MAKE\tFUELTYPE\tASPIRE\tDOORS\tBODY\tDRIVE\tCYLINDERS\tHP\tRPM\tMPG-CITY\tMPG-HWY\tPRICE',
 'subaru\tgas\tstd\ttwo\thatchback\tfwd\tfour\t69\t4900\t31\t36\t5118',
 'chevrolet\tgas\tstd\ttwo\thatchback\tfwd\tthree\t48\t5100\t47\t53\t5151',
 'mazda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5000\t30\t31\t5195',
 'toyota\tgas\tstd\ttwo\thatchback\tfwd\tfour\t62\t4800\t35\t39\t5348',
 'mitsubishi\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5500\t37\t41\t5389',
 'honda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t60\t5500\t38\t42\t5399',
 'nissan\tgas\tstd\ttwo\tsedan\tfwd\tfour\t69\t5200\t31\t37\t5499',
 'dodge\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5500\t37\t41\t5572',
 'plymouth\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5500\t37\t41\t5572']

**Filter data that the word toyota in it**

In [14]:
toyotaData = autoData.filter(lambda x: "toyota" in x)

In [15]:
toyotaData.count()

32

In [16]:
toyotaData.take(5)

['toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338',
 'toyota,gas,std,four,hatchback,fwd,four,62,4800,31,38,6488',
 'toyota,gas,std,four,wagon,fwd,four,62,4800,31,37,6918',
 'toyota,gas,std,four,sedan,fwd,four,70,4800,30,37,6938']

**Split the toyota data lines using flatmap** 

In [17]:
words = toyotaData.flatMap(lambda line: line.split(","))

In [18]:
words.count()

384

In [19]:
words.take(20)

['toyota',
 'gas',
 'std',
 'two',
 'hatchback',
 'fwd',
 'four',
 '62',
 '4800',
 '35',
 '39',
 '5348',
 'toyota',
 'gas',
 'std',
 'two',
 'hatchback',
 'fwd',
 'four',
 '62']

**Display the distinct data of collData**

In [20]:
for data in collData.distinct().collect():
    print(data)

4
8
5
2
3


### Set operations

**Create 2 new RDDs**

In [21]:
words1 = sc.parallelize(["hello", "war", "peace", "world"])
words2 = sc.parallelize(["war", "peace", "universe"])

**Print unions**

In [22]:
for union in words1.union(words2).distinct().collect():
    print(union)

hello
universe
peace
world
war


**Print intersects**

In [23]:
for intersect in words1.intersection(words2).collect():
    print(intersect)

peace
war


### Map data using function

**Map function**

In [24]:
def cleanseRDD(autoStr):
    '''map function'''
    if isinstance(autoStr, int):
        return autoStr
    attlist = autoStr.split(",")
    # convert the doors to number str
    if attlist[3] == "two":
        attlist[3] = "2"
    else:
        attlist[3] = "4"
    # convert drive to upper
    attlist[5] = attlist[5].upper()
    return ",".join(attlist)

**Clean operation**

In [25]:
cleaned_data = autoData.map(cleanseRDD)

**Display the head**

In [26]:
cleaned_data.take(10)

['MAKE,FUELTYPE,ASPIRE,4,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118',
 'chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151',
 'mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195',
 'toyota,gas,std,2,hatchback,FWD,four,62,4800,35,39,5348',
 'mitsubishi,gas,std,2,hatchback,FWD,four,68,5500,37,41,5389',
 'honda,gas,std,2,hatchback,FWD,four,60,5500,38,42,5399',
 'nissan,gas,std,2,sedan,FWD,four,69,5200,31,37,5499',
 'dodge,gas,std,2,hatchback,FWD,four,68,5500,37,41,5572',
 'plymouth,gas,std,2,hatchback,FWD,four,68,5500,37,41,5572']

# Actions

**Reduce action to compute sum**

In [27]:
collData.reduce(lambda x,y: x+y)

27

**Reduce action to compute the shortest line**

In [28]:
autoData.reduce(lambda x,y: x if len(x)<len(y)else y)

'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

### Reduce action to compute avg miles per gallon

**Reduce function**

In [29]:
def getMPG(autoStr):
    '''reduce function'''
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    if attList[9].isdigit():
        return int(attList[9])
    else:
        return 0

**Apply reduce function**

In [30]:
autoData.reduce(lambda x,y: getMPG(x) + getMPG(y)) \
    / (autoData.count()-1) # -1 is bacuase of the header

25.15228426395939

# working with key/value RDDs

**Create a KV RDD of auto Brand and Horsepower**

In [31]:
cylData = autoData.map(lambda x: (x.split(",")[0], x.split(",")[7]))

**Head of KV RDD**

In [32]:
cylData.take(5)

[('MAKE', 'HP'),
 ('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62')]

**List of keys**

In [33]:
cylData.keys().collect()

['MAKE',
 'subaru',
 'chevrolet',
 'mazda',
 'toyota',
 'mitsubishi',
 'honda',
 'nissan',
 'dodge',
 'plymouth',
 'mazda',
 'mitsubishi',
 'dodge',
 'plymouth',
 'chevrolet',
 'toyota',
 'dodge',
 'honda',
 'toyota',
 'honda',
 'chevrolet',
 'nissan',
 'mitsubishi',
 'dodge',
 'plymouth',
 'mazda',
 'isuzu',
 'mazda',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'mitsubishi',
 'subaru',
 'nissan',
 'subaru',
 'honda',
 'toyota',
 'honda',
 'honda',
 'nissan',
 'nissan',
 'mazda',
 'subaru',
 'nissan',
 'subaru',
 'dodge',
 'plymouth',
 'mitsubishi',
 'toyota',
 'subaru',
 'volkswagen',
 'toyota',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'dodge',
 'plymouth',
 'volkswagen',
 'volkswagen',
 'nissan',
 'subaru',
 'toyota',
 'mitsubishi',
 'volkswagen',
 'toyota',
 'nissan',
 'toyota',
 'toyota',
 'mazda',
 'volkswagen',
 'mitsubishi',
 'toyota',
 'honda',
 'mazda',
 'dodge',
 'plymouth',
 'toyota',
 'nissan',
 'honda',
 'subaru',
 'toyota',
 'mitsubishi',
 'mitsubishi',
 'toyota',
 'vo

### Find average HP by brand

**Remove the header**

In [34]:
header = cylData.first()
cylHPData = cylData.filter(lambda line: line != header)

**Add 1 to the data**

In [35]:
addOne = cylHPData.mapValues(lambda x: (x,1))
addOne.collect()

[('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1)),
 ('honda', ('60', 1)),
 ('nissan', ('69', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('chevrolet', ('70', 1)),
 ('toyota', ('62', 1)),
 ('dodge', ('68', 1)),
 ('honda', ('58', 1)),
 ('toyota', ('62', 1)),
 ('honda', ('76', 1)),
 ('chevrolet', ('70', 1)),
 ('nissan', ('69', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('isuzu', ('78', 1)),
 ('mazda', ('68', 1)),
 ('nissan', ('69', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('62', 1)),
 ('toyota', ('70', 1)),
 ('mitsubishi', ('88', 1)),
 ('subaru', ('73', 1)),
 ('nissan', ('55', 1)),
 ('subaru', ('82', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('70', 1)),
 ('honda', ('76', 1)),
 ('honda', ('76', 1)),
 ('nissan', ('69', 1)),
 ('nissan', ('69', 1)),
 

In [36]:
brandValues = addOne \
                .reduceByKey(lambda x,y: (int(x[0]) + int(y[0]), \
                x[1] + y[1]))
brandValues.collect()

[('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('mitsubishi', (1353, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('saab', (760, 6)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('mercedes-benz', (1170, 8)),
 ('jaguar', (614, 3)),
 ('subaru', (1035, 12)),
 ('toyota', (2969, 32)),
 ('honda', (1043, 13)),
 ('isuzu', (168, 2)),
 ('volkswagen', (973, 12)),
 ('peugot', (1098, 11)),
 ('audi', (687, 6)),
 ('bmw', (1111, 8)),
 ('mercury', ('175', 1)),
 ('porsche', (764, 4))]

**Find average by dividing HP total by count total**

In [37]:
brandValues.mapValues(lambda x: int(x[0])/int(x[1])).collect()

[('chevrolet', 62.666666666666664),
 ('mazda', 86.875),
 ('mitsubishi', 104.07692307692308),
 ('nissan', 102.55555555555556),
 ('dodge', 84.375),
 ('plymouth', 86.71428571428571),
 ('saab', 126.66666666666667),
 ('volvo', 128.0),
 ('alfa-romero', 125.33333333333333),
 ('mercedes-benz', 146.25),
 ('jaguar', 204.66666666666666),
 ('subaru', 86.25),
 ('toyota', 92.78125),
 ('honda', 80.23076923076923),
 ('isuzu', 84.0),
 ('volkswagen', 81.08333333333333),
 ('peugot', 99.81818181818181),
 ('audi', 114.5),
 ('bmw', 138.875),
 ('mercury', 175.0),
 ('porsche', 191.0)]

## Accumulator and Broadcast Variables

**Initialize accumulators**

In [38]:
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)

**Set broadcast variables**

In [39]:
sedanText = sc.broadcast("sedan")
hatchbackText = sc.broadcast("hatchback")

**Map function which counts the instances with sedan and hatchback**

In [40]:
def splitLines(line):
    
    # global variables
    global sedanCount
    global hatchbackCount
    
    if sedanText.value in line:
        sedanCount += 1
    if hatchbackText.value in line:
        hatchbackCount += 1
        
    return line.split(",")

**Execute mapping**

In [41]:
splitData = autoData.map(splitLines)

In [42]:
splitData.count()

198

**Print hte sedan and hatchback counts**

In [43]:
print("Sedan Count:", sedanCount)
print("Hatchback Count:", hatchbackCount)

Sedan Count: 92
Hatchback Count: 67


## Partitions

In [44]:
collData.getNumPartitions()

4

**Specify the number of partitions in RDD**

In [45]:
collData = sc.parallelize([3,5,4,7,4],3)

In [46]:
collData.cache()

ParallelCollectionRDD[48] at parallelize at PythonRDD.scala:175

In [47]:
collData.count()

5

**Display the number of cores**

In [48]:
collData.getNumPartitions()

3