In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, lit, sum as spark_sum

In [2]:
import os
import json
from dotenv import dotenv_values

import pandas as pd
from tqdm import tqdm

In [3]:
config = dotenv_values("../.env")

In [4]:
config.items()

odict_items([('SPLIT_DATASET_DIR', '/dataset/splitupDataset'), ('LOG_DIR', '/Crawler/logs'), ('OUTPUT_DIR', '../Crawler/output'), ('SCRAPED_DATA', '../Crawler/output')])

In [5]:
spark = SparkSession.builder.appName("PageRank").getOrCreate()

24/05/19 12:57:48 WARN Utils: Your hostname, sunaam-pc resolves to a loopback address: 127.0.1.1; using 10.7.94.48 instead (on interface wlx001ea643c8cb)
24/05/19 12:57:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/19 12:57:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
class PageNode:
    def __init__(self, url, forwardLinks):
        self.url = url
        self.forwardLinks = forwardLinks
    def getUrl(self):
        return self.url
    def getOutGoingLinks(self):
        return self.forwardLinks
    
    def __str__(self):
        return f"{self.url}\t=>\t{self.forwardLinks}\n"
    def __repr__(self):
        return f"{self.url}\t=>\t{self.forwardLinks}\n"

In [7]:
pageNodes = {}

dataFiles = os.listdir(config["SCRAPED_DATA"])
for file in (dataFiles):
    with open(os.path.join(config["SCRAPED_DATA"], file)) as f:
        edgeData = json.load(f)
        pageNodes[edgeData['url']]=edgeData['forwardLinks']
        
print(pageNodes)


{'https://wordpress.com': ['https://wpvip.com', 'https://apps.wordpress.com', 'https://www.bluehost.com'], 'https://123moviesfree.app': [], 'https://10wontips.blogspot.com': ['https://www.blogger.com'], 'https://18jp.fun': [], 'https://hinkhoj.com': [], 'https://dribbble.com': ['http://twitter.com', 'http://facebook.com', 'http://instagram.com', 'http://www.pinterest.com'], 'https://1917.co.nz': ['https://www.facebook.com', 'https://www.instagram.com'], 'https://123moviesfree.icu': ['https://123moviesplay.cc'], 'https://1push.io': [], 'https://1miba.com': ['https://guanliqi.zhiouwang.com', 'https://vr.zhiouwang.com', 'https://zhichan.zhiouwang.com', 'https://wpa1.qq.com'], 'https://012mobile.co.il': [], 'https://123movies.ski': [], 'https://issuu.com': [], 'https://1campus.net': [], 'https://0xprocessing.com': [], 'https://1337.no': [], 'https://123clickz.com': [], 'https://144tehranpharmacy.com': [], 'https://113366.com': [], 'https://1024.no': [], 'https://theguardian.com': ['https:/

In [8]:
vertexGraph = spark.createDataFrame(
    [(value,) for value in pageNodes.keys()]
,["vertices"])
vertexGraph.show()

# vertexGraph = spark.createDataFrame(
#     pd.DataFrame({
#         "vertices":['a','b','c','d']
#     })
# )
# vertexGraph.show()

                                                                                

+--------------------+
|            vertices|
+--------------------+
|https://wordpress...|
|https://123movies...|
|https://10wontips...|
|    https://18jp.fun|
| https://hinkhoj.com|
|https://dribbble.com|
|  https://1917.co.nz|
|https://123movies...|
|    https://1push.io|
|   https://1miba.com|
|https://012mobile...|
|https://123movies...|
|   https://issuu.com|
| https://1campus.net|
|https://0xprocess...|
|     https://1337.no|
|https://123clickz...|
|https://144tehran...|
|  https://113366.com|
|     https://1024.no|
+--------------------+
only showing top 20 rows



In [9]:
edgeData = []
for key, item in pageNodes.items():
    for it in item:
        edgeData.append(Row(source=key, dest=it))

edgesSchema = StructType([StructField("source", StringType(), True),
                     StructField("dest", StringType(), True),]
                     )

edgeGraph = spark.createDataFrame(edgeData, schema=edgesSchema)
edgeGraph.show()

# edgeGraph = spark.createDataFrame(
#     pd.DataFrame({
#     "source":['a','b','b','c','d'],
#     "dest":['b','c','d','a','b']
#     })
# )
# edgeGraph.show()

+--------------------+--------------------+
|              source|                dest|
+--------------------+--------------------+
|https://wordpress...|   https://wpvip.com|
|https://wordpress...|https://apps.word...|
|https://wordpress...|https://www.blueh...|
|https://10wontips...|https://www.blogg...|
|https://dribbble.com|  http://twitter.com|
|https://dribbble.com| http://facebook.com|
|https://dribbble.com|http://instagram.com|
|https://dribbble.com|http://www.pinter...|
|  https://1917.co.nz|https://www.faceb...|
|  https://1917.co.nz|https://www.insta...|
|https://123movies...|https://123movies...|
|   https://1miba.com|https://guanliqi....|
|   https://1miba.com|https://vr.zhiouw...|
|   https://1miba.com|https://zhichan.z...|
|   https://1miba.com| https://wpa1.qq.com|
|https://theguardi...|https://www.wordi...|
|https://linkedin.com|https://about.lin...|
|https://linkedin.com|https://press.lin...|
|https://linkedin.com|https://blog.link...|
|https://linkedin.com|https://de

In [10]:
def pagerank(vertexInfo, edgeInfo, iterations=3, tolerance=1e-06, damping=0.8):
    # Total vertices
    num_vertices = vertexInfo.count()
    initial_rank = 1.0 / num_vertices
    # Initialize with 1/N ranking
    ranks = vertexGraph.withColumn("rank", lit(initial_rank)).withColumnRenamed("vertices", "page")
    # rankGraph.show()

    print(f"Vertices: {num_vertices}, Initial Rank: {initial_rank}")
        
    for _ in tqdm(range(iterations)):
        # Get the Rankings
        contribs = edgeInfo.join(ranks, edgeInfo.source == ranks.page).drop("page")
        contribs.show()
        # 
        # Get the (total outgoing links)    
        counts = spark.createDataFrame(contribs.groupBy("source").count().collect())
        contribs = contribs.join(counts, ['source'])
        # contribs.show()
        
        # Get link value (1 outgoing link)/(total outgoing links)
        contribs = contribs.withColumn("Contributions", col("rank")/col("count"))
        # contribs.show()
        
        # Get sum of link values of incoming links for a dest node
        aggregated_contribs = contribs.groupBy("dest").agg(spark_sum("Contributions").alias("sum_contrib"))
        # aggregated_contribs.show()
        
        ranks = aggregated_contribs.withColumn("rank", lit((1 - damping) / num_vertices) +
                                                damping * col("sum_contrib")).select(col("dest").alias("page"), "rank")
        ranks.show()
        
    ranks = vertexGraph.join(ranks, vertexGraph.vertices == ranks.page, how="left").select(vertexGraph.vertices, ranks.rank).na.fill((1 - damping) / num_vertices, ["rank"])

    return ranks

pagerank(vertexInfo=vertexGraph, edgeInfo=edgeGraph).show()

Vertices: 1085, Initial Rank: 0.0009216589861751152


  0%|          | 0/3 [00:00<?, ?it/s]

+--------------------+--------------------+--------------------+
|              source|                dest|                rank|
+--------------------+--------------------+--------------------+
| https://state.mn.us|https://www.sos.s...|9.216589861751152E-4|
| https://state.mn.us|https://www.reven...|9.216589861751152E-4|
| https://state.mn.us|https://drive.mn.gov|9.216589861751152E-4|
| https://state.mn.us|      https://mn.gov|9.216589861751152E-4|
|    https://1728.org|https://www.nist.gov|9.216589861751152E-4|
|    https://1728.org| http://www.nist.gov|9.216589861751152E-4|
|    https://1728.org|http://www.timean...|9.216589861751152E-4|
|    https://1728.org|https://www.faceb...|9.216589861751152E-4|
|    https://1728.org|https://www.twitt...|9.216589861751152E-4|
|    https://1728.org|https://www.freef...|9.216589861751152E-4|
|    https://1728.org|http://www.standu...|9.216589861751152E-4|
| https://10short.com|https://api.whats...|9.216589861751152E-4|
|https://10wontips...|htt

 33%|███▎      | 1/3 [00:02<00:05,  2.96s/it]

+--------------------+--------------------+
|                page|                rank|
+--------------------+--------------------+
|https://fandom.ze...|2.060178910273786...|
|https://www.lineb...|2.060178910273786...|
|https://www.pinte...|2.410492733073377...|
|https://xmljatsre...|4.301075268817204E-4|
|https://www.netvi...|5.529953917050691E-4|
| https://mochajs.org|2.304147465437787...|
|https://scienceso...|2.138248847926267E-4|
| http://facebook.com|4.608294930875575...|
|https://bigzip.11...|2.764976958525345E-4|
|https://api.stack...|2.073732718894008...|
|     https://qiwa.sa|2.053982883475970...|
|https://unabridge...|3.072196620583717E-4|
|https://stu.17zwd...|2.662570404505888E-4|
|http://www.kavame...|9.216589861751152E-4|
|https://www.nxt.s...|2.580645161290322E-4|
|http://www.histor...|1.950176985240098...|
|https://saudiares...|3.317972350230414...|
|https://help.gymg...|2.896642527978933E-4|
|https://desktop.l...|2.060178910273786...|
|http://www.uclh.n...|1.90003544

                                                                                

+--------------------+--------------------+--------------------+
|              source|                dest|                rank|
+--------------------+--------------------+--------------------+
|  https://www.nhs.uk|https://volunteer...|1.900035448422544...|
|  https://www.nhs.uk|https://www.organ...|1.900035448422544...|
|  https://www.nhs.uk|http://www.blood....|1.900035448422544...|
|https://wordpress...|https://www.blueh...|0.002887864823348...|
|https://wordpress...|https://apps.word...|0.002887864823348...|
|https://wordpress...|   https://wpvip.com|0.002887864823348...|
|https://steamcomm...|https://help.stea...|4.301075268817204E-4|
|https://steamcomm...|https://store.ste...|4.301075268817204E-4|
|     https://line.me|https://www.faceb...|0.001167434715821...|
|     https://line.me|https://www.youtu...|0.001167434715821...|
|     https://line.me|https://apps.micr...|0.001167434715821...|
|     https://line.me|      https://lin.ee|0.001167434715821...|
|     https://line.me|htt

24/05/19 12:58:03 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
 67%|██████▋   | 2/3 [00:06<00:03,  3.46s/it]                                   

+--------------------+--------------------+
|                page|                rank|
+--------------------+--------------------+
|https://www.lineb...|2.118008493720068E-4|
|https://desktop.l...|2.118008493720068E-4|
| https://github.blog|2.241162297263278...|
|https://resources...|2.241162297263278...|
|https://store.ste...|3.563748079877111...|
|https://linefinan...|2.118008493720068E-4|
|https://chirashi....|2.118008493720068E-4|
|https://www.linef...|2.118008493720068E-4|
|https://developer...|2.211981566820276E-4|
|https://line.work...|2.118008493720068E-4|
|https://blog.link...|2.211981566820276E-4|
|https://docs.gith...|2.241162297263278...|
|https://apps.micr...|2.118008493720068E-4|
|https://pop2.game...|2.118008493720068E-4|
|  https://ec.line.me|2.118008493720068E-4|
|https://about.lin...|2.211981566820276E-4|
|https://manga.lin...|2.118008493720068E-4|
|https://reward.li...|2.118008493720068E-4|
| https://pay.line.me|2.118008493720068E-4|
|https://www.faceb...|2.51585281

                                                                                

+------+----+----+
|source|dest|rank|
+------+----+----+
+------+----+----+



 67%|██████▋   | 2/3 [00:09<00:04,  4.70s/it]                                   


PySparkValueError: [CANNOT_INFER_EMPTY_SCHEMA] Can not infer schema from empty dataset.