In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
[K     |████████████████████████████████| 204.7MB 66kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 35.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=52dbb686352c20eb040c720a7075c2d280cc4fc3d346ad5de935c6f520cb30cb
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [3]:
#Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [4]:
#Create a Spark Session
SpSession = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
#Get the Spark Context from Spark Session    
SpContext = SpSession.sparkContext

In [6]:
#Test Spark
testData = SpContext.parallelize([3,6,4,2])
testData.count()

4

In [11]:
#Load from a collection
collData = SpContext.parallelize([4,3,8,5,8])
collData.collect()# bring the entire RDD to the driver node, could be expensive
#collData.count()

[4, 3, 8, 5, 8]

In [13]:
#Load the file. Lazy initialization
autoData = SpContext.textFile('/content/drive/My Drive/auto-data.csv')
autoData.cache()
#Loads only now.
autoData.count()


198

In [14]:
autoData.first()


'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

In [15]:
autoData.take(5)

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

In [16]:
for line in autoData.collect():
  print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,37,41,5389
honda,gas,std,two,hatchback,fwd,four,60,5500,38,42,5399
nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499
dodge,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
plymouth,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
mazda,gas,std,two,hatchback,fwd,four,68,5000,31,38,6095
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,31,38,6189
dodge,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
plymouth,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
chevrolet,gas,std,two,hatchback,fwd,four,70,5400,38,43,6295
toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338
dodge,gas,std,two,hatchback,fwd,four,68,5500,31,38,6377

In [17]:
#Save to a local file. First collect the RDD to the master
#and then save as local file.
autoDataFile = open("auto-data-saved.csv","w")
autoDataFile.write("\n".join(autoData.collect()))
autoDataFile.close()

In [19]:
##   Transformations

#Map and create a new RDD
tsvData=autoData.map(lambda x : x.replace(","," "))
tsvData.take(5)

['MAKE FUELTYPE ASPIRE DOORS BODY DRIVE CYLINDERS HP RPM MPG-CITY MPG-HWY PRICE',
 'subaru gas std two hatchback fwd four 69 4900 31 36 5118',
 'chevrolet gas std two hatchback fwd three 48 5100 47 53 5151',
 'mazda gas std two hatchback fwd four 68 5000 30 31 5195',
 'toyota gas std two hatchback fwd four 62 4800 35 39 5348']

In [20]:
toyotaData = autoData.filter(lambda x : "toyota" in x)
toyotaData.count()

32

In [21]:
#FlatMap
words=toyotaData.flatMap(lambda line: line.split(","))
words.count()


384

In [22]:
words.take(20)

['toyota',
 'gas',
 'std',
 'two',
 'hatchback',
 'fwd',
 'four',
 '62',
 '4800',
 '35',
 '39',
 '5348',
 'toyota',
 'gas',
 'std',
 'two',
 'hatchback',
 'fwd',
 'four',
 '62']

In [23]:
#Distinct
for numbData in collData.distinct().collect():
    print(numbData)

4
8
3
5


In [24]:
#Set operations
words1 = SpContext.parallelize(["hello","war","peace","world"])
words2 = SpContext.parallelize(["war","peace","universe"])

for unions in words1.union(words2).distinct().collect():
    print(unions)

hello
peace
world
universe
war


In [26]:
for intersects in words1.intersection(words2).collect():
    print(intersects)

peace
war


#Using functions for transformation
#cleanse and transform an RDD

In [27]:
def cleanseRDD(autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList=autoStr.split(",")
    #convert doors to a number str
    if attList[3] == "two" :
         attList[3]="2"
    else :
         attList[3]="4"
    #Convert Drive to uppercase
    attList[5] = attList[5].upper()
    return ",".join(attList)
    
cleanedData=autoData.map(cleanseRDD)
cleanedData.collect()

['MAKE,FUELTYPE,ASPIRE,4,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118',
 'chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151',
 'mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195',
 'toyota,gas,std,2,hatchback,FWD,four,62,4800,35,39,5348',
 'mitsubishi,gas,std,2,hatchback,FWD,four,68,5500,37,41,5389',
 'honda,gas,std,2,hatchback,FWD,four,60,5500,38,42,5399',
 'nissan,gas,std,2,sedan,FWD,four,69,5200,31,37,5499',
 'dodge,gas,std,2,hatchback,FWD,four,68,5500,37,41,5572',
 'plymouth,gas,std,2,hatchback,FWD,four,68,5500,37,41,5572',
 'mazda,gas,std,2,hatchback,FWD,four,68,5000,31,38,6095',
 'mitsubishi,gas,std,2,hatchback,FWD,four,68,5500,31,38,6189',
 'dodge,gas,std,4,hatchback,FWD,four,68,5500,31,38,6229',
 'plymouth,gas,std,4,hatchback,FWD,four,68,5500,31,38,6229',
 'chevrolet,gas,std,2,hatchback,FWD,four,70,5400,38,43,6295',
 'toyota,gas,std,2,hatchback,FWD,four,62,4800,31,38,6338',
 'dodge,gas,std,2,hatchback

##   Actions

In [28]:
#reduce - compute the sum
collData.collect()
collData.reduce(lambda x,y: x+y)

28

In [29]:
#find the shortest line
autoData.reduce(lambda x,y: x if len(x) < len(y) else y)

'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

In [30]:
#Use a function to perform reduce 
def getMPG( autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList=autoStr.split(",")
    if attList[9].isdigit() :
        return int(attList[9])
    else:
        return 0

#find average MPG-City for all cars    
autoData.reduce(lambda x,y : getMPG(x) + getMPG(y)) \
    / (autoData.count()-1.0)  # account for header line

25.15228426395939

##   Working with Key/Value RDDs

In [32]:
#create a KV RDD of auto Brand and Horsepower
cylData = autoData.map( lambda x: ( x.split(",")[0], \
    x.split(",")[7]))
cylData.take(5)


[('MAKE', 'HP'),
 ('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62')]

In [33]:
cylData.keys().collect()

['MAKE',
 'subaru',
 'chevrolet',
 'mazda',
 'toyota',
 'mitsubishi',
 'honda',
 'nissan',
 'dodge',
 'plymouth',
 'mazda',
 'mitsubishi',
 'dodge',
 'plymouth',
 'chevrolet',
 'toyota',
 'dodge',
 'honda',
 'toyota',
 'honda',
 'chevrolet',
 'nissan',
 'mitsubishi',
 'dodge',
 'plymouth',
 'mazda',
 'isuzu',
 'mazda',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'mitsubishi',
 'subaru',
 'nissan',
 'subaru',
 'honda',
 'toyota',
 'honda',
 'honda',
 'nissan',
 'nissan',
 'mazda',
 'subaru',
 'nissan',
 'subaru',
 'dodge',
 'plymouth',
 'mitsubishi',
 'toyota',
 'subaru',
 'volkswagen',
 'toyota',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'dodge',
 'plymouth',
 'volkswagen',
 'volkswagen',
 'nissan',
 'subaru',
 'toyota',
 'mitsubishi',
 'volkswagen',
 'toyota',
 'nissan',
 'toyota',
 'toyota',
 'mazda',
 'volkswagen',
 'mitsubishi',
 'toyota',
 'honda',
 'mazda',
 'dodge',
 'plymouth',
 'toyota',
 'nissan',
 'honda',
 'subaru',
 'toyota',
 'mitsubishi',
 'mitsubishi',
 'toyota',
 'vo

In [36]:
#Remove header row
header = cylData.first()
cylHPData= cylData.filter(lambda line: line != header)
cylHPData.collect()

[('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62'),
 ('mitsubishi', '68'),
 ('honda', '60'),
 ('nissan', '69'),
 ('dodge', '68'),
 ('plymouth', '68'),
 ('mazda', '68'),
 ('mitsubishi', '68'),
 ('dodge', '68'),
 ('plymouth', '68'),
 ('chevrolet', '70'),
 ('toyota', '62'),
 ('dodge', '68'),
 ('honda', '58'),
 ('toyota', '62'),
 ('honda', '76'),
 ('chevrolet', '70'),
 ('nissan', '69'),
 ('mitsubishi', '68'),
 ('dodge', '68'),
 ('plymouth', '68'),
 ('mazda', '68'),
 ('isuzu', '78'),
 ('mazda', '68'),
 ('nissan', '69'),
 ('honda', '76'),
 ('toyota', '62'),
 ('toyota', '70'),
 ('mitsubishi', '88'),
 ('subaru', '73'),
 ('nissan', '55'),
 ('subaru', '82'),
 ('honda', '76'),
 ('toyota', '70'),
 ('honda', '76'),
 ('honda', '76'),
 ('nissan', '69'),
 ('nissan', '69'),
 ('mazda', '68'),
 ('subaru', '82'),
 ('nissan', '69'),
 ('subaru', '73'),
 ('dodge', '68'),
 ('plymouth', '68'),
 ('mitsubishi', '102'),
 ('toyota', '70'),
 ('subaru', '82'),
 ('volkswagen', '52'),
 ('toyo

In [35]:
#Find average HP by Brand
#Add a count 1 to each record and then reduce to find totals of HP and counts
addOne = cylHPData.mapValues(lambda x: (x, 1))
addOne.collect()

[('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1)),
 ('honda', ('60', 1)),
 ('nissan', ('69', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('chevrolet', ('70', 1)),
 ('toyota', ('62', 1)),
 ('dodge', ('68', 1)),
 ('honda', ('58', 1)),
 ('toyota', ('62', 1)),
 ('honda', ('76', 1)),
 ('chevrolet', ('70', 1)),
 ('nissan', ('69', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('isuzu', ('78', 1)),
 ('mazda', ('68', 1)),
 ('nissan', ('69', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('62', 1)),
 ('toyota', ('70', 1)),
 ('mitsubishi', ('88', 1)),
 ('subaru', ('73', 1)),
 ('nissan', ('55', 1)),
 ('subaru', ('82', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('70', 1)),
 ('honda', ('76', 1)),
 ('honda', ('76', 1)),
 ('nissan', ('69', 1)),
 ('nissan', ('69', 1)),
 

In [37]:
brandValues= addOne \
    .reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), \
    x[1] + y[1])) 
brandValues.collect()

[('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('mitsubishi', (1353, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('saab', (760, 6)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('mercedes-benz', (1170, 8)),
 ('jaguar', (614, 3)),
 ('subaru', (1035, 12)),
 ('toyota', (2969, 32)),
 ('honda', (1043, 13)),
 ('isuzu', (168, 2)),
 ('volkswagen', (973, 12)),
 ('peugot', (1098, 11)),
 ('audi', (687, 6)),
 ('bmw', (1111, 8)),
 ('mercury', ('175', 1)),
 ('porsche', (764, 4))]

In [38]:
#find average by dividing HP total by count total
brandValues.mapValues(lambda x: int(x[0])/int(x[1])). \
    collect()

[('chevrolet', 62.666666666666664),
 ('mazda', 86.875),
 ('mitsubishi', 104.07692307692308),
 ('nissan', 102.55555555555556),
 ('dodge', 84.375),
 ('plymouth', 86.71428571428571),
 ('saab', 126.66666666666667),
 ('volvo', 128.0),
 ('alfa-romero', 125.33333333333333),
 ('mercedes-benz', 146.25),
 ('jaguar', 204.66666666666666),
 ('subaru', 86.25),
 ('toyota', 92.78125),
 ('honda', 80.23076923076923),
 ('isuzu', 84.0),
 ('volkswagen', 81.08333333333333),
 ('peugot', 99.81818181818181),
 ('audi', 114.5),
 ('bmw', 138.875),
 ('mercury', 175.0),
 ('porsche', 191.0)]

##   Advanced Spark : Accumulators & Broadcast Variables

In [39]:
#function that splits the line as well as counts sedans and hatchbacks
#Speed optimization

    
#Initialize accumulator
sedanCount = SpContext.accumulator(0)
hatchbackCount =SpContext.accumulator(0)

#Set Broadcast variable
sedanText=SpContext.broadcast("sedan")
hatchbackText=SpContext.broadcast("hatchback")

def splitLines(line) :

    global sedanCount
    global hatchbackCount

    #Use broadcast variable to do comparison and set accumulator
    if sedanText.value in line:
        sedanCount +=1
    if hatchbackText.value in line:
        hatchbackCount +=1
        
    return line.split(",")


#do the map
splitData=autoData.map(splitLines)

#Make it execute the map (lazy execution)
splitData.count()
print(sedanCount, hatchbackCount)

92 67


##   Advanced Spark : Partitions

In [40]:
collData.getNumPartitions()

#Specify no. of partitions.
collData=SpContext.parallelize([3,5,4,7,4],4)
collData.cache()
collData.count()

collData.getNumPartitions()

4