In [0]:
from pyspark import SparkFiles
class AirportPageRank:
        
    def __init__(self, inputFileLink, maxIter, alpha = 0.15):
        self.inputFileLink = inputFileLink
        self.maxIter = maxIter
        self.alpha = alpha
        
        self.loadFile()
        self.getNodeCount()
        self.getDanglingNodes()
        
        self.createNodePair()
    
    def loadFile(self):
        spark.sparkContext.addFile(self.inputFileLink)
        self.flightsDf = spark.read.csv("file://"+SparkFiles.get("Jan_2021.csv"), header = True, inferSchema = True)
        
    def getNodeCount(self):
        self.origin = self.flightsDf.select(['ORIGIN']).distinct()
        self.dest = self.flightsDf.select(['DEST']).distinct()
        totalNodes = self.origin.union(self.dest).distinct()
        self.N = totalNodes.count()
    
    def getDanglingNodes(self):
        dang = self.dest.subtract(self.origin)
        self.danglingNodes = dang.rdd.map(lambda x : (x.DEST, "no_dest"))
    
    def createNodePair(self):
        flights = self.flightsDf.select(['ORIGIN', 'DEST'])
        flightsRdd = flights.rdd
        # creating source destination pair
        self.flightSrcDest = flightsRdd.map(lambda row : (row.ORIGIN, row.DEST)) # maps over rdd
        
    def calculatePageRank(self):
        
        flightPageRank = self.flightSrcDest.map(lambda x : (x[0], (x[1], 10)))
        
        for iter in range(self.maxIter):
            
            # this represents the number of outgoing links for each node
            flightEdges = flightPageRank.map(lambda x : (x[0], 1)).reduceByKey(lambda x, y : x+y)
            
            rankEdge = flightPageRank.join(flightEdges)
            
            # page rank
            pageRank = rankEdge.map(lambda x : (x[1][0][0], x[1][0][1]/x[1][1])).reduceByKey(lambda x, y: x+y)
            
            # calculating sum of pagerank values of dangling nodes
            mass = sc.accumulator(0)
            pageRank.join(self.danglingNodes).foreach(lambda x : mass.add(x[1][0]))
            dangPageRank = pageRank.join(self.danglingNodes)
            dangMass = mass.value
            
            # calculating final PageRank value of this iteration
            G = self.N
            alphaTemp = self.alpha
            # we cannot use class variables directly in lambda function
            pageRank = pageRank.map(lambda x : (x[0], x[1]+(dangMass/G)))
            pageRank = pageRank.map(lambda x : (x[0], x[1]*(1-alphaTemp)))
            pageRank = pageRank.map(lambda x : (x[0], x[1]+(alphaTemp/G)))
    
            # replacing old page rank with new calculated page rank value
            flightPageRank = self.flightSrcDest.join(pageRank)
        
        # final calculated page after running all iterations
        finalPageRank = pageRank.sortBy(lambda x : -x[1])
        return finalPageRank

In [0]:
datasetUrl = "/Data_repository/main/Jan_2021.csv"
result = AirportPageRank(datasetUrl, 10)
outputRdd = result.calculatePageRank()
schema = ["airport_node", "page_rank_value"]
outputDf = spark.createDataFrame(outputRdd, schema)
display(outputDf)

airport_node,page_rank_value
ORD,45.316867928711964
DEN,42.57875579395185
DFW,33.58704461057069
ATL,33.41913218319272
ANC,30.808857366921195
LAX,28.75991050563562
PHX,28.71297550551396
BET,27.97994002305999
CLT,27.605799826542512
LAS,24.986110619862316


In [0]:
Output File link - "https://Data_repository/main/Assig2_Q1_output.csv"