In [1]:
# Import relevant packages
import shutil

import findspark
findspark.init()  # must run before the pyspark import below so pyspark can be located
from pyspark import SparkContext, SparkConf
from Utils import Utils

In [2]:
# Run locally on all available CPU cores ("local[*]").
conf = SparkConf().setAppName('pairRDDExamples').setMaster("local[*]")
# getOrCreate() returns the already-running SparkContext if there is one, so re-executing
# this cell no longer raises "Cannot run multiple SparkContexts at once".
# Note: when a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Build a pair RDD directly from a Python list of (key, value) tuples.
listOfTuples = [('Lily', 23), ('Jack', 29), ('Mary', 29), ('James', 8)]
pairRDD = sc.parallelize(listOfTuples)
outFolder = "pairRDDfromTupleList"
# saveAsTextFile refuses to write into an existing directory, so remove the output of a
# previous run to keep the cell re-runnable.
# NOTE(review): assumes Spark's default filesystem is local, so outFolder resolves
# relative to the notebook's working directory.
shutil.rmtree(outFolder, ignore_errors=True)
# coalesce(1) reduces the RDD to a single partition, so a single part-file is written.
pairRDD.coalesce(1).saveAsTextFile(outFolder)

In [None]:
# Turn a regular RDD of "name age" strings into a pair RDD.
inputStrings = ['Lily 23', 'Jack 29', 'Mary 29', 'James 8']
regularRDD = sc.parallelize(inputStrings)


def toNameAgePair(s):
    """Split a "name age" string once and return (name, age); age stays a string."""
    fields = s.split(" ")
    return fields[0], fields[1]


pairRDDs = regularRDD.map(toNameAgePair)
outFolder = "pairRDDfromRDD"
# saveAsTextFile fails if the directory exists; clear the previous run's output first.
# NOTE(review): assumes Spark's default filesystem is local.
shutil.rmtree(outFolder, ignore_errors=True)
pairRDDs.coalesce(1).saveAsTextFile(outFolder)

In [None]:
# Transformations on pair RDDs

# Filter transformation: keep only airports whose country is not the United States.
# Same file:/// URI form as the other cells that read from this input directory.
filePath = 'file:///E:/Eskills-Academy-projects/python-spark-tutorial-master/in/'
fileName = "airports.text"
textFile = filePath + fileName
airportsRDD = sc.textFile(textFile)


def airportNameAndCountry(line):
    """Split one airports.text record once and return (field 1, field 3).

    Field 1 is used as the airport name and field 3 as the country (see the
    group-by-key cell, which uses the same positions).
    """
    fields = Utils.COMMA_DELIMITER.split(line)
    return fields[1], fields[3]


airportsPairRDD = airportsRDD.map(airportNameAndCountry)

# The comparison value includes literal double quotes, matching how the country field
# comes out of the split raw line.
airportsNotInUSA = airportsPairRDD.filter(lambda keyValue: keyValue[1] != "\"United States\"")
outFolder = "airportsNotInUSA"
# saveAsTextFile fails if the directory exists; clear the previous run's output first.
# NOTE(review): assumes Spark's default filesystem is local.
shutil.rmtree(outFolder, ignore_errors=True)
airportsNotInUSA.saveAsTextFile(outFolder)

In [None]:
# mapValues: transform only the value of each (airport name, country) pair -- here the
# country is upper-cased -- while the keys are left untouched.
upperCase = airportsPairRDD.mapValues(lambda countryName: countryName.upper())
# NOTE: Spark writes a *directory* with this name (containing part-files), not a single file.
outFolder = "airportsUpperCase.text"
# saveAsTextFile fails if the directory exists; clear the previous run's output first.
# NOTE(review): assumes Spark's default filesystem is local.
shutil.rmtree(outFolder, ignore_errors=True)
upperCase.saveAsTextFile(outFolder)


In [None]:
# Transformation: reduceByKey() -- word count, printed from most to least frequent.
filePath = 'file:///E:/Eskills-Academy-projects/python-spark-tutorial-master/in/'
fileName = "word_count.text"
textFile = filePath + fileName
lines = sc.textFile(textFile)

# split() with no argument splits on any run of whitespace and drops empty strings, so
# repeated or leading/trailing spaces do not get counted as an empty "" word.
wordRDD = lines.flatMap(lambda line: line.split())
wordPairRDD = wordRDD.map(lambda word: (word, 1))
wordCounts = wordPairRDD.reduceByKey(lambda x, y: x + y)
# Order (word, count) pairs by count, highest first.
sortedWordCount = wordCounts.sortBy(lambda wordAndCount: wordAndCount[1], ascending=False)

# Iterate the *sorted* RDD so the printed output is actually ordered by frequency.
for word, count in sortedWordCount.collect():
    print("{}:{}".format(word, count))

In [None]:
# Load RealEstate.csv and drop the header row, recognized by containing the text "Bedrooms".
filePath = 'file:///E:/Eskills-Academy-projects/python-spark-tutorial-master/in/'
fileName = "RealEstate.csv"
textFile = "{}{}".format(filePath, fileName)
lines = sc.textFile(textFile)
linesWithoutHeader = lines.filter(lambda line: not ("Bedrooms" in line))
# 0-based column positions after splitting a row on "," (given in the csv file).
PriceIx = 2
BedRoomCountIx = 3

class AvgCount:
    """Accumulator pairing an observation count with the running total of those observations.

    Used as a reduceByKey value so a per-key mean can be computed as total / count.

    Args:
        count: number of observations folded in so far.
        total: sum of those observations.
    """

    def __init__(self, count: int, total: float) -> None:
        self.count, self.total = count, total

def _bedroomsAndPrice(line):
    """Parse one CSV row into (bedroom count, AvgCount(1, price))."""
    columns = line.split(",")
    # int(float(...)) accepts both "3" and "3.0" style values for the bedroom column.
    bedrooms = int(float(columns[BedRoomCountIx]))
    return bedrooms, AvgCount(1, float(columns[PriceIx]))


def _mergeAvgCounts(left, right):
    """Combine two partial accumulators by summing their counts and totals."""
    return AvgCount(left.count + right.count, left.total + right.total)


housePricePairRdd = linesWithoutHeader.map(_bedroomsAndPrice)
housePriceTotal = housePricePairRdd.reduceByKey(_mergeAvgCounts)
housePriceAvg = housePriceTotal.mapValues(lambda acc: acc.total / acc.count)
# Largest bedroom counts first; display the first 10 (bedrooms, average price) pairs.
sortedHousingPriceAvg = housePriceAvg.sortByKey(ascending=False)
sortedHousingPriceAvg.take(10)

In [None]:
# Group by key: collect every airport name under its country.
filePath = 'file:///E:/Eskills-Academy-projects/python-spark-tutorial-master/in/'
fileName = "airports.text"
textFile = filePath + fileName
lines = sc.textFile(textFile)


def countryAndAirportName(airport):
    """Split one airports.text record once and return (field 3, field 1) = (country, airport name)."""
    fields = Utils.COMMA_DELIMITER.split(airport)
    return fields[3], fields[1]


countryAndAirportNamePair = lines.map(countryAndAirportName)

# groupByKey yields one (country, iterable of airport names) entry per country.
airportsByCountry = countryAndAirportNamePair.groupByKey()
# Keys are already unique after groupByKey, so a plain collect() suffices; no dict is needed.
for country, airportNames in airportsByCountry.collect():
    print("{}:{}".format(country, list(airportNames)))

In [3]:
# Join operations on two small pair RDDs keyed by person name.
ageRecords = [
    ("Tom", 29),
    ("John", 22),
]
addressRecords = [
    ("James", "USA"),
    ("John", "UK"),
]
ages = sc.parallelize(ageRecords)
addresses = sc.parallelize(addressRecords)

# The four join flavours differ only in which unmatched keys they keep.
join = ages.join(addresses)
leftOuterJoin = ages.leftOuterJoin(addresses)
rightOuterJoin = ages.rightOuterJoin(addresses)
fullOuterJoin = ages.fullOuterJoin(addresses)


In [4]:
# Inner join: only keys present in both RDDs; values are (age, address) tuples.
join.collect()


[('John', (22, 'UK'))]

In [5]:
# Left outer join: every key from `ages`; a missing address becomes None.
leftOuterJoin.collect()


[('John', (22, 'UK')), ('Tom', (29, None))]

In [6]:
# Right outer join: every key from `addresses`; a missing age becomes None.
rightOuterJoin.collect()


[('John', (22, 'UK')), ('James', (None, 'USA'))]

In [7]:
# Full outer join: keys from either RDD; whichever side is missing becomes None.
fullOuterJoin.collect()

[('John', (22, 'UK')), ('James', (None, 'USA')), ('Tom', (29, None))]