In [1]:
#set up evironment
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

In [3]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local","Filter app")
sqlc = SQLContext(sc)

In [4]:
import requests
import collections
main_url = "https://tdtu.edu.vn"
source = requests.get(url = main_url).text

In [5]:
from bs4 import BeautifulSoup
soup = BeautifulSoup(source,'html.parser')


In [6]:
def get_sub_urls(prefix):
  # d = collections.defaultdict(list) #hashmap with value = []
  for link in soup.find_all('a'):
    try:
      link = link.get('href')
      if prefix in link:
        yield link #make generator instead of list
    except:
      continue


def DFS(url, prefix, n):
  d = collections.defaultdict(list) #hashmap with value = []
  stack = [url]
  visited = set()
  try: #handling generator exit
    while stack and n>0:
      vertex = stack.pop() #current url
      d[vertex] = [sub_url for sub_url in get_sub_urls(prefix)]

      #prevent row > n
      if len(d[vertex]) > n:
        d[vertex] = d[vertex][:n]
      n-=len(d[vertex])

      if vertex in visited: #O(1) lookup
        continue
      # if d[vertex]: remove for deadend check
      yield vertex,d[vertex] #ex: (page,[list of it's sub_page])
  
      visited.add(vertex)
      for neighbor in d[vertex]:
        stack.append(neighbor)

  except GeneratorExit:
    print("clean up first")
    

In [8]:
prefix = 'tdtu.edu.vn'
n = 10000 #limit

tree = DFS(main_url,prefix,n) #completed graph
rdd = sc.parallelize(tree)\
        .flatMap(lambda n: [(n[0], x) for x in n[1]])\
        .distinct()

df = rdd.toDF(["Page","Successor"])
df.show()


+-------------------+--------------------+
|               Page|           Successor|
+-------------------+--------------------+
|https://tdtu.edu.vn|https://college.t...|
|https://tdtu.edu.vn|https://vfis.tdtu...|
|https://tdtu.edu.vn|http://it.tdtu.ed...|
|https://tdtu.edu.vn|http://pharmacy.t...|
|https://tdtu.edu.vn|http://feee.tdtu....|
|https://tdtu.edu.vn|http://aaf.tdtu.e...|
|https://tdtu.edu.vn|http://fss.tdtu.e...|
|https://tdtu.edu.vn|https://fas.tdtu....|
|https://tdtu.edu.vn|http://ssh.tdtu.e...|
|https://tdtu.edu.vn|http://civil.tdtu...|
|https://tdtu.edu.vn|http://laborrelat...|
|https://tdtu.edu.vn|http://law.tdtu.e...|
|https://tdtu.edu.vn|http://enlabsafe....|
|https://tdtu.edu.vn|http://ifa.tdtu.e...|
|https://tdtu.edu.vn|http://ffl.tdtu.e...|
|https://tdtu.edu.vn|http://fba.tdtu.e...|
|https://tdtu.edu.vn|http://finance.td...|
|https://tdtu.edu.vn|http://fms.tdtu.e...|
|https://tdtu.edu.vn|https://internati...|
|https://tdtu.edu.vn|https://incos.tdt...|
+----------

PageRank is an algorithm that is used by Google Search to rank websites in their search engine results. This algorithm iteratively updates a rank for each document by adding up contributions from documents that link to it. The algorithm can be summarized in the following steps -


*   Start each page at a rank of 1.
*   On each iteration, have page p contribute rank(p)/|neighbors(p)| to its neighbors.
*   Set each page's rank to 0.15 + 0.85 X contributions.

Src: https://github.com/ashishvshenoy/pagerank-spark





In [9]:
from operator import add
def computeContributes(urls, rank):
    for url in urls:
        yield (url, rank / len(urls))

lines = df.select(["Page","Successor"])\
            .rdd
#input format
#URL      neighbor
#URL      neighbor
#URL      neighbor

links = lines.groupByKey()
ranks = links.mapValues(lambda neigbor: 1.0)
for _ in range(50): #iteration = 50
    contribute = links.join(ranks).flatMap(
        lambda url_rank: computeContributes(url_rank[1][0], url_rank[1][1]))
    ranks = contribute.reduceByKey(add).mapValues(lambda rank: round(rank * 0.85 + 0.15,3))

ranks = ranks.toDF(["Page","PageRank"])
lines = lines.groupByKey()\
             .mapValues(list)\
             .map(lambda n: (n[0], n[1],len(n[1]), 0 if len(n[1])>0 else 1))\
             
lines = lines.toDF(["Page","Successor","Out-degree","Dead-ends"])
lines.join(ranks,on='Page').show()


+--------------------+--------------------+----------+---------+--------+
|                Page|           Successor|Out-degree|Dead-ends|PageRank|
+--------------------+--------------------+----------+---------+--------+
|https://raic.tdtu...|[https://college....|        45|        0|   0.234|
|https://science.t...|[https://college....|        45|        0|   0.234|
|https://admission...|[https://college....|        45|        0|   0.234|
|https://ecc.tdtu....|[https://college....|        45|        0|   0.234|
|https://science.t...|[https://college....|        45|        0|   0.234|
|https://vfis.tdtu...|[https://college....|        45|        0|   0.234|
|https://emas.tdtu...|[https://college....|        45|        0|   0.234|
|https://undergrad...|[https://college....|        45|        0|   0.234|
|https://science.t...|[https://college....|        45|        0|   0.234|
|https://nhatrang....|[https://college....|        45|        0|   0.234|
|https://clc.tdtu....|[https://college

In [10]:
sc.stop()