In [1]:
# installing spark in colab and creating spark session

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

In [6]:
from pyspark import SparkFiles
class AirportPageRank:
    """Iterative PageRank over a flight graph, computed with Spark RDDs.

    Every row of the input CSV is treated as a directed edge ORIGIN -> DEST.
    Constructing the object loads the CSV and prepares the graph;
    ``calculatePageRank`` runs the iterations.

    Relies on the notebook-level ``spark`` session and on
    ``pyspark.SparkFiles`` being in scope.

    Args:
        inputFileLink: Path or URI of the CSV file; it must contain the
            columns ``ORIGIN`` and ``DEST``.
        maxIter: Number of PageRank iterations to run (at least 1).
        alpha: Teleport probability used in the rank update (default 0.15).

    NOTE(review): every edge starts with a rank of 10 (not 1/N), and an
    airport that never appears as a DEST receives no rank: it is absent from
    the result and, after the first iteration, its outgoing edges stop
    contributing. This is kept as-is so that existing results do not change.
    """

    def __init__(self, inputFileLink, maxIter, alpha = 0.15):
        self.inputFileLink = inputFileLink
        self.maxIter = maxIter
        self.alpha = alpha

        # Order matters: each step reads attributes set by the previous ones.
        self.loadFile()
        self.getNodeCount()
        self.getDanglingNodes()

        self.createNodePair()

    def loadFile(self):
        """Register the input file with Spark and read it into ``self.flightsDf``."""
        import os

        spark.sparkContext.addFile(self.inputFileLink)
        # SparkFiles.get() looks a file up by its bare name, so derive that name
        # from the link that was actually added instead of hard-coding one.
        fileName = os.path.basename(self.inputFileLink)
        self.flightsDf = spark.read.csv("file://"+SparkFiles.get(fileName), header = True, inferSchema = True)

    def getNodeCount(self):
        """Store the distinct ORIGIN / DEST values and the total node count ``self.N``."""
        self.origin = self.flightsDf.select(['ORIGIN']).distinct()
        self.dest = self.flightsDf.select(['DEST']).distinct()
        # A node is any airport that appears as an origin or as a destination.
        totalNodes = self.origin.union(self.dest).distinct()
        self.N = totalNodes.count()

    def getDanglingNodes(self):
        """Build ``self.danglingNodes``: destinations that never appear as an origin.

        Stored as an RDD of ``(airport, "no_dest")`` pairs so it can be joined
        with the per-airport rank RDD.
        """
        dang = self.dest.subtract(self.origin)
        self.danglingNodes = dang.rdd.map(lambda x : (x.DEST, "no_dest"))

    def createNodePair(self):
        """Build ``self.flightSrcDest``: an RDD of ``(origin, destination)`` edges."""
        flights = self.flightsDf.select(['ORIGIN', 'DEST'])
        # Cached because every PageRank iteration joins against this RDD.
        self.flightSrcDest = flights.rdd.map(lambda row : (row.ORIGIN, row.DEST)).cache()

    def calculatePageRank(self):
        """Run ``self.maxIter`` PageRank iterations.

        Returns:
            RDD of ``(airport, rank)`` pairs sorted by rank, highest first.

        Raises:
            ValueError: if ``self.maxIter`` is smaller than 1 (no iteration
                would run, so there would be no rank to return).
        """
        if self.maxIter < 1:
            raise ValueError("maxIter must be at least 1, got {!r}".format(self.maxIter))

        # Copy to locals: referencing self.* inside a lambda would make Spark
        # serialize the whole object (DataFrames and RDDs included).
        nodeCount = self.N
        alpha = self.alpha

        # Number of outgoing edges per origin. It does not change between
        # iterations, so compute it once instead of once per iteration.
        outDegree = (self.flightSrcDest
                     .map(lambda edge : (edge[0], 1))
                     .reduceByKey(lambda a, b : a + b)
                     .cache())

        # (origin, (destination, rank of origin)); every edge starts at rank 10.
        flightPageRank = self.flightSrcDest.map(lambda edge : (edge[0], (edge[1], 10)))

        for _ in range(self.maxIter):
            # (origin, ((destination, rank of origin), out-degree of origin))
            rankEdge = flightPageRank.join(outDegree)

            # Each origin splits its rank evenly over its outgoing edges;
            # a destination collects the shares of all its incoming edges.
            pageRank = (rankEdge
                        .map(lambda x : (x[1][0][0], x[1][0][1] / x[1][1]))
                        .reduceByKey(lambda a, b : a + b))

            # Total rank currently held by dangling nodes (0 when there are none).
            dangMass = pageRank.join(self.danglingNodes).map(lambda x : x[1][0]).sum()

            # Spread the dangling mass over all nodes, then damp and teleport.
            pageRank = pageRank.mapValues(
                lambda rank : (rank + (dangMass / nodeCount)) * (1 - alpha) + (alpha / nodeCount))

            # Attach the new rank of each origin to its outgoing edges.
            flightPageRank = self.flightSrcDest.join(pageRank)

        # Ranks of the last iteration, highest first.
        return pageRank.sortBy(lambda x : -x[1])

In [7]:
# Location of the flights CSV that is fed to the ranker.
datasetUrl = "/content/Airports_Jan_2022.csv"

# Build the flight graph and run 10 PageRank iterations (default alpha).
result = AirportPageRank(inputFileLink = datasetUrl, maxIter = 10)
outputRdd = result.calculatePageRank()

# Column names for the (airport, rank) pairs returned by calculatePageRank.
schema = ["airport_node", "page_rank_value"]
outputDf = spark.createDataFrame(outputRdd, schema = schema)

In [8]:
# First 10 rows of the ranked DataFrame, returned as a list of Row objects.
outputDf.head(10)

[Row(airport_node='ORD', page_rank_value=47.250122427267016),
 Row(airport_node='DEN', page_rank_value=43.218340813925046),
 Row(airport_node='ATL', page_rank_value=37.34263691741187),
 Row(airport_node='ANC', page_rank_value=31.682317826385457),
 Row(airport_node='LAS', page_rank_value=29.669637219743297),
 Row(airport_node='LAX', page_rank_value=29.210093861252542),
 Row(airport_node='DFW', page_rank_value=28.71073282289035),
 Row(airport_node='IAH', page_rank_value=28.068463479449694),
 Row(airport_node='PHX', page_rank_value=26.550432320607655),
 Row(airport_node='BET', page_rank_value=25.511383054450203)]