In [None]:
# Install a headless OpenJDK 8 (quiet mode, output discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack the Spark 3.1.1 / Hadoop 3.2 binary distribution into the
# current working directory.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# Install the findspark helper that is imported and initialised below.
!pip install -q findspark

import os
# Tell child processes where the JDK and the unpacked Spark distribution live.
# NOTE(review): the /content prefix assumes the archive was unpacked there
# (looks like a Google Colab working directory) - confirm for other hosts.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
# Must run after SPARK_HOME is set and before the pyspark imports in the
# following cell.
findspark.init()

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Reuse the running context when this cell is executed again: constructing a
# second SparkContext in the same kernel raises "Cannot run multiple
# SparkContexts at once". Master and application name are unchanged.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Filter app")
)
sqlc = SQLContext(sc)

# Idempotent: only creates the database the first time.
sqlc.sql("CREATE DATABASE IF NOT EXISTS BIGDATA")

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse

# Seed URL of the crawl and the domain that followed links must belong to.
page = 'https://tdtu.edu.vn/'
domain = 'tdtu.edu.vn'

# Upper bound on the number of (Page, Successor) edges the crawler records.
limit = 1000

# Crawl state shared with run(): number of edges counted so far and the set of
# URLs already scheduled for crawling (the seed is scheduled from the start).
count = 0
pages = {page}

# Seconds to wait for a page before giving up on it.
_REQUEST_TIMEOUT = 10


def _save_edges(edges):
  """Append the buffered (Page, Successor) pairs to BIGDATA.parquet, if any."""
  if len(edges) > 0:
    df = sqlc.createDataFrame(edges, ['Page', 'Successor'])
    df.write.mode("append").save("BIGDATA.parquet", format="parquet")


def _in_domain(host):
  """Return True when `host` is `domain` itself or one of its sub-domains."""
  host = (host or '').lower()
  wanted = domain.lower()
  return host == wanted or host.endswith('.' + wanted)


def run(page):
  """Crawl `page` depth-first and store its in-domain links as graph edges.

  Every distinct in-domain link found on `page` is recorded as a
  (Page, Successor) row and appended to BIGDATA.parquet in batches of ten.
  Links that have not been scheduled before are crawled recursively.

  The function reads the module-level `domain`, `limit` and `sqlc`, and
  updates the module-level `pages` (URLs already scheduled) and `count`
  (edges counted so far). Crawling stops once `count` exceeds `limit`.

  Args:
    page: absolute URL of the page to download and scan.

  Returns:
    None. Results are written to BIGDATA.parquet as a side effect.

  Note:
    The crawl is recursive, so the call depth grows with the length of the
    chain of newly discovered pages.
  """
  global pages, count

  # Edge budget already spent: do not download anything else.
  if count > limit:
    return

  print(page)

  # Edges found on this page that have not been written yet.
  data = []

  pageParse = urlparse(page)

  # Successors already recorded for this page.
  successors = set()

  # Download the page; an unreachable page is skipped instead of aborting the
  # whole crawl (and losing the edges still buffered by the callers).
  try:
    resp = requests.get(page, timeout=_REQUEST_TIMEOUT)
  except requests.RequestException as err:
    print('skipped', page, '-', err)
    return

  # Parse the raw source as HTML.
  soup = BeautifulSoup(resp.content, "html.parser")

  # Anchors whose href starts with "/" (relative or protocol-relative) or
  # with "https://".
  links = soup.select('a[href^="/"], a[href^="https://"]')

  for link in links:
    successor = link['href']

    # Turn protocol-relative and root-relative hrefs into absolute URLs.
    if successor.startswith('//'):
      successor = pageParse.scheme + ':' + successor
    elif successor.startswith('/'):
      successor = pageParse.scheme + '://' + pageParse.netloc + successor

    # Normalise to scheme://host/path (query string and fragment dropped).
    try:
      successorParse = urlparse(successor)
    except ValueError:
      # Malformed href (e.g. a broken IPv6 literal): ignore this link.
      continue
    successor = successorParse.scheme + '://' + successorParse.netloc + successorParse.path

    # Keep only the target domain and its sub-domains. A plain substring test
    # would also accept hosts such as "tdtu.edu.vn.example.com".
    if not _in_domain(successorParse.hostname):
      continue

    if successor not in successors:  # first time this link is seen on this page
      # Enforce the global edge budget.
      count += 1
      print(count)
      if count > limit:
        _save_edges(data)
        return

      # Record the edge and flush the buffer every ten edges.
      data.append((page, successor))
      if len(data) > 9:
        _save_edges(data)
        data = []
      successors.add(successor)

    if successor not in pages:  # page has not been scheduled yet
      pages.add(successor)

      # Continue the crawl with the successor as the new page.
      run(successor)

  # Flush whatever is left in the buffer.
  _save_edges(data)

# Start the crawl at the seed URL; run() appends the edges it finds to
# BIGDATA.parquet and stops once `count` exceeds `limit`.
run(page)

In [None]:
# Load every (Page, Successor) edge the crawler appended to the parquet dataset.
df = sqlc.read.parquet("BIGDATA.parquet")

# Expose the edges to Spark SQL as "PageRank". createOrReplaceTempView is the
# supported replacement for the deprecated registerTempTable and can be re-run
# safely because it overwrites an existing view of the same name.
df.createOrReplaceTempView("PageRank")

In [None]:
# Annotate every edge of the PageRank view with two extra columns:
#   OutDegree - number of rows in PageRank that share the edge's Page
#               (COUNT(*) per Page, so repeated rows are counted too);
#   DeadEnds  - 1 when the edge's Successor never appears in the Page column
#               (the LEFT JOIN finds no match), otherwise 0.
data = sqlc.sql("\
  SELECT pr.*, od.OutDegree, de.DeadEnds \
  FROM PageRank pr \
  INNER JOIN ( \
    SELECT Page, COUNT(*) as OutDegree \
    FROM PageRank GROUP BY Page\
  ) od ON od.Page = pr.Page \
  INNER JOIN ( \
    SELECT DISTINCT s.Successor, (CASE WHEN p.Page IS NULL THEN 1 ELSE 0 END) AS DeadEnds \
    FROM PageRank s \
    LEFT JOIN PageRank as p ON p.Page = s.Successor \
  ) de ON de.Successor = pr.Successor \
")

# Print the first rows (show() defaults) as a quick sanity check.
data.show()

+--------------------+--------------------+---------+--------+
|                Page|           Successor|OutDegree|DeadEnds|
+--------------------+--------------------+---------+--------+
|https://tdtu.edu....|https://tdtu.edu....|       42|       0|
|https://tdtu.edu....|https://tdtu.edu....|       42|       0|
|https://tdtu.edu....|https://tdtu.edu....|       49|       0|
|https://tdtu.edu....|https://tdtu.edu....|       46|       0|
|https://tdtu.edu....|https://tdtu.edu....|       46|       0|
|https://tdtu.edu....|https://tdtu.edu....|       21|       0|
|https://tdtu.edu....|https://tdtu.edu....|       33|       0|
|https://tdtu.edu....|https://tdtu.edu....|       49|       0|
|https://tdtu.edu....|https://tdtu.edu....|       43|       0|
|https://tdtu.edu....|https://tdtu.edu....|       43|       0|
|https://tdtu.edu....|https://tdtu.edu....|       33|       0|
|https://tdtu.edu....|https://tdtu.edu....|       35|       0|
|https://tdtu.edu....|https://tdtu.edu....|       35|  

In [None]:
# Every distinct URL that occurs as either endpoint of an edge.
Page = sqlc.sql("\
  SELECT Page \
  FROM ( \
    SELECT DISTINCT Page \
    FROM PageRank pr \
    UNION \
    SELECT DISTINCT Successor AS Page \
    FROM PageRank pr \
  ) \
")

# mapping:   URL -> node id (as a string, so it can be fed to na.replace)
# mapping_c: node id (string) -> URL
mapping = dict()
mapping_c = dict()

# Number the URLs in a single pass. The previous limit()/subtract() batching
# reused the batch loop variable as the id counter (ids restarted at the batch
# index on every batch and collided), dropped any URL beyond
# n_splits * each_len, and divided by zero when `limit` was below the batch
# size. toLocalIterator() still streams the rows to the driver instead of
# materialising them all at once.
for node_id, row in enumerate(Page.toLocalIterator()):
  key = str(node_id)
  mapping[row.Page] = key
  mapping_c[key] = row.Page

# Number of nodes in the link graph.
n = len(mapping)
mapping

In [None]:
# Raw edges, one (Page, Successor) URL pair per row.
tmp = sqlc.sql("\
  SELECT Page, Successor \
  FROM PageRank \
")

# Replace each URL by the string id assigned in `mapping`; na.replace is given
# no column subset here, so it is applied to both columns.
mappingDF = tmp.na.replace(mapping)
mappingDF.show(truncate=False)

+----+---------+
|Page|Successor|
+----+---------+
|35  |22       |
|41  |35       |
|79  |86       |
|79  |86       |
|35  |35       |
|35  |22       |
|22  |35       |
|22  |22       |
|22  |51       |
|91  |45       |
|91  |53       |
|91  |85       |
|91  |92       |
|91  |80       |
|22  |35       |
|22  |51       |
|13  |48       |
|13  |87       |
|13  |82       |
|13  |5        |
+----+---------+
only showing top 20 rows



In [None]:
import numpy as np

# Adjacency matrix in the orientation findM() expects: column i describes the
# out-links of page i, i.e. matrix[j, i] == 1 when page i links to page j.
matrix = np.zeros([n, n])

# Walk the distinct edges once. The previous limit()/subtract() batching relied
# on an unordered limit() (not guaranteed to return the same rows when the plan
# is re-evaluated), could skip edges beyond n_splits * each_len, and divided by
# zero when `limit` was below the batch size.
for coor in mappingDF.distinct().toLocalIterator():
  matrix[int(coor['Successor']), int(coor['Page'])] = 1

matrix

array([[1., 0., 1., ..., 0., 1., 1.],
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 1., 0.],
       [0., 0., 1., ..., 0., 1., 1.]])

In [None]:
def findM(G, N):
  """Scale the columns of link matrix G into transition probabilities.

  Column j of G holds the out-links of node j. A column whose entries sum to
  a positive value is divided by its number of positive entries (the node's
  out-degree); any other column is treated as a dead end and becomes zeros.

  Args:
    G: 2-D numpy array, indexed as G[:, j] for j in range(N).
    N: number of columns to process.

  Returns:
    The array of scaled columns, with column j in position [:, j].
  """
  def scaled_column(j):
    links = G[:, j]
    if not np.sum(links) > 0:
      # Dead end: the node has no out-links.
      return np.array([0]*N)
    # Spread the node's weight evenly over its out-links.
    return links / (links > 0).sum()

  # Columns are collected as rows, then transposed back into place.
  return np.array([scaled_column(j) for j in range(N)]).T

def gg_pagerank(G, b, N, tol=10**-8, max_iter=10000):
  """Compute PageRank by power iteration with teleportation.

  Each step computes r_new = b * M . r_old + (1 - b) / N, where M = findM(G, N).
  When rank leaks out through dead ends (the entries sum to less than 1) the
  vector is rescaled to sum to 1. Iteration stops when the L1 distance between
  two successive vectors drops below `tol`, or after `max_iter` steps.

  Args:
    G: N x N link matrix; column j holds the out-links of node j.
    b: damping factor (probability of following a link).
    N: number of nodes.
    tol: convergence threshold on the L1 change between iterations.
    max_iter: upper bound on the number of iterations, so graphs on which the
      iteration oscillates (possible when b == 1) cannot loop forever.

  Returns:
    1-D numpy array of length N with the rank of every node (empty when
    N <= 0).
  """
  if N <= 0:
    # Nothing to rank; also avoids dividing by zero below.
    return np.zeros(0)

  # Link-following part of the update, already scaled by the damping factor.
  M = findM(G, N)*b

  # Teleportation mass every node receives on each step.
  leaked = (1-b)/N

  # Start from the uniform distribution. Previously this vector was
  # overwritten by the zero vector on the first pass, which produced NaN
  # for b == 1 (0/0 in the normalisation step).
  r_j_new = np.full(N, 1/N)

  for _ in range(max_iter):
    r_j_old = r_j_new
    r_j_new = M.dot(r_j_old) + leaked

    # Re-normalise when dead ends leaked rank; the sum is computed once
    # instead of once per element.
    total = np.sum(r_j_new)
    if 0 < total < 1:
      r_j_new = r_j_new/total

    if np.sum(np.absolute(r_j_new - r_j_old)) < tol:
      break

  return r_j_new

In [None]:
# Rank every node of the link graph with a damping factor of 0.8.
res = gg_pagerank(G=matrix, b=0.8, N=n)
print(res)

# Sanity check: the ranks should add up to (approximately) 1.
res.sum()

[0.00368081 0.0071864  0.00230333 0.00247548 0.01088685 0.01473154
 0.00410872 0.00226929 0.00287592 0.0027744  0.002828   0.00341438
 0.00225424 0.00215663 0.00231877 0.00313648 0.0091336  0.00290134
 0.00266387 0.01107566 0.00408479 0.00256076 0.00365177 0.00225424
 0.00290267 0.00220751 0.00219874 0.00239263 0.00230333 0.00322529
 0.00215663 0.06155997 0.00237675 0.0020291  0.00257272 0.00409823
 0.00219876 0.00215663 0.00273058 0.00265579 0.00220711 0.00377502
 0.00222691 0.00558055 0.00218046 0.00376977 0.00664908 0.00256076
 0.07822604 0.00830231 0.00215663 0.00319515 0.00248289 0.00706425
 0.00220798 0.00237675 0.00218137 0.00218181 0.00999907 0.00223908
 0.00241914 0.00309609 0.11045984 0.0020291  0.00334177 0.01319261
 0.00219876 0.00403526 0.00221697 0.01070447 0.00215663 0.0024547
 0.00869227 0.03302918 0.05715121 0.00223908 0.05122504 0.03171036
 0.00215045 0.00273468 0.00215045 0.0020291  0.07761819 0.06156488
 0.00264699 0.00215045 0.00235394 0.1009051  0.00220711 0.00522

0.9999999999999998

In [None]:
# Pair each node id with its URL again and publish the scores as a DataFrame.
# The score is stored in its string form, exactly as computed.
rank_rows = []
for node_id in range(n):
  rank_rows.append((mapping_c[str(node_id)], str(res[node_id])))

ranksDF = sqlc.createDataFrame(rank_rows, ['Page', 'PageRank'])
ranksDF.show(10, truncate=False)

+---------------------------------------------------------------------------------------------+---------------------+
|Page                                                                                         |PageRank             |
+---------------------------------------------------------------------------------------------+---------------------+
|https://tdtu.edu.vn/en/about/mission-vision-quality-policy                                   |0.003680807587862185 |
|https://tdtu.edu.vn/nghien-cuu                                                               |0.007186403744435274 |
|https://tdtu.edu.vn/en/about/department/department-for-facility-management                   |0.0023033343046450143|
|https://college.tdtu.edu.vn/user/password                                                    |0.002475479907157273 |
|https://college.tdtu.edu.vn/tuyensinh                                                        |0.010886852029395913 |
|https://college.tdtu.edu.vn/tin-tuc                    