In [1]:
# NOTE: each `!cmd` line runs in its own throw-away subshell, so `!export ...` cannot
# change the kernel's environment. To point PySpark at a specific JDK, use IPython's
# `%env` magic or set os.environ["JAVA_HOME"] before the SparkSession is created.
# !export JAVA_HOME=~/jdk1.8.0_211
# !export PATH="$JAVA_HOME/bin:$PATH"
# This PySpark setup works only with Java 1.8 (Java 8).

In [2]:
# Check which `java` is on PATH; this setup expects Java 1.8.
!java -version

java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)


In [2]:
import pandas as pd
import numpy as np
import time
import csv
import json
import pyspark as py
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f
import operator
import pagerankPySpark as PRpy
from pyspark.sql.functions import *

In [3]:
# Single Spark session for the whole notebook.
# getOrCreate() returns an already-running session if there is one; JVM-level settings
# such as spark.driver.memory only take effect when the session is first created, so
# restart the kernel after changing them.
# spark.driver.maxResultSize caps how much data can be collected to the driver
# (e.g. by the toPandas() calls further down).
spark = (
    SparkSession.builder
    .appName("SpamPageRank")
    .config("spark.memory.fraction", 0.8)
    .config("spark.executor.memory", "14g")
    .config("spark.driver.memory", "128g")
    .config("spark.sql.shuffle.partitions", "800")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

In [4]:
# Waterloo spam rankings: space-separated "<score> <clueWebID>" pairs, one per line.
filename = "clueweb09spam_Fusion.csv"
df_structure = StructType([
    StructField('score', IntegerType(), True),
    StructField('clueWebID', StringType(), True),
])
spamScore = spark.read.csv(filename, schema=df_structure, sep=' ')

The Waterloo Spam Score dataset contains 503,903,810 observations, and 150,955,774 of them have a spam score of at least 70. We use 70 as the threshold because these pages are the 30% least spammy pages.

In [5]:
# Tab-separated "<networkID>\t<clueWebID>" pairs: links network node ids to ClueWeb09 ids.
filename = "ClueB-ID-DOCNO.txt"
df_structure = StructType([
    StructField('networkID', StringType(), True),
    StructField('clueWebID', StringType(), True),
])
mapping = spark.read.csv(filename, schema=df_structure, sep='\t')  # 148,148,553 rows

Next we load a new dataset that maps each ClueWeb09 ID to its network ID. It is used to turn the spammiest pages into isolated nodes, and to relate the clueWebID (used in the spam dataset) to the network ID (used in the network dataset).

In [6]:
mapping.count()  # recorded run: 148,148,553 rows

148148553

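The mapping dataset has only 148,148,553 records (versus 503,903,810 spam scores), so the inner join below keeps spam scores only for documents that appear in the mapping.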

In [6]:
# Attach each spam score to its network node id. The default (inner) join keeps only
# documents that appear in `mapping`.
# The result gets a NEW name: rebinding `spamScore` made this cell non-idempotent, because
# re-running it would join `mapping` a second time and duplicate the `networkID` column,
# which makes `notSpamDF.networkID` ambiguous in the edge-filtering cell.
spamScoreMapped = spamScore.join(mapping, on='clueWebID')

# Pages scoring at least this value are treated as non-spam (the ~30% least spammy pages,
# per the note on the Waterloo dataset above).
NOT_SPAM_MIN_SCORE = 70
notSpamDF = spamScoreMapped.filter(spamScoreMapped.score >= NOT_SPAM_MIN_SCORE)  # 61,324,401 rows

In [8]:
# ClueWeb09 edge list: space-separated "<GoingFrom> <PointingTo>" network-id pairs.
filename = "ClueWeb09_50m_Network.csv"
df_structure = StructType([
    StructField('GoingFrom', StringType(), True),
    StructField('PointingTo', StringType(), True),
])
networkDF = spark.read.csv(filename, schema=df_structure, sep=' ')

After importing the network dataset from the Lemur Project, we keep only the edges whose endpoints are both in `notSpamDF` (spam score of at least 70), on both the outgoing (`GoingFrom`) and the incoming (`PointingTo`) side. This leaves the spam nodes isolated.

In [9]:
# Step 1: keep edges whose source node is non-spam, replacing its network id by its clueWebID.
networkDF1 = (
    networkDF
    .join(notSpamDF, networkDF.GoingFrom == notSpamDF.networkID)
    .select(f.col('clueWebID').alias('GoingFrom'), 'PointingTo')
)
# Step 2: of those, keep edges whose target is also non-spam, translating it the same way.
# Spam nodes therefore end up with no edges at all (isolated).
networkDFwithoutSPAM = (
    networkDF1
    .join(notSpamDF, networkDF1.PointingTo == notSpamDF.networkID)
    .select('GoingFrom', f.col('clueWebID').alias('PointingTo'))
)


As a result, 202,015,555 entries in `GoingFrom` are not categorized as spam, and 101,776,085 links from `GoingFrom` are not spam either, but we are going to co… **(TODO: this sentence is cut off in the original — complete it.)**


In [3]:
# Writes 'networkWithoutSPAM.csv' (space-separated, no header), which the PageRank cell
# below reads. Presumably commented out because the file already exists; on a fresh
# machine it must be re-run once or PRpy.networkDataComputation has no input.
# NOTE(review): toPandas() collects every edge on the driver and is bounded by
# spark.driver.maxResultSize (set to 2g above) — very large edge lists may exceed it.
# networkWithoutSPAM = networkDFwithoutSPAM.toPandas()
# networkWithoutSPAM.to_csv('networkWithoutSPAM.csv', index=False, header = None, sep = " ")

In [None]:
# Writes 'nodes.csv' (one clueWebID per line, no header), which the PageRank cell below
# reads as `nodesFilename`. Presumably commented out because the file already exists;
# re-run it on a fresh machine.
# NOTE(review): same driver-collection caveat as the edge export (spark.driver.maxResultSize).
#nodes = mapping.select('clueWebID').toPandas()
#nodes.to_csv('nodes.csv', index=False, header = None)

In [4]:
# Input files produced by the (commented-out) export cells above.
filename = 'networkWithoutSPAM.csv'  # edge list: "<GoingFrom> <PointingTo>"
nodesFilename = 'nodes.csv'          # one clueWebID per line
numNodes = 148148553                 # same as mapping.count() in the recorded run

# PageRank parameters passed to PRpy.networkDataComputation
# (presumably: damping factor, iteration cap, convergence tolerance — TODO confirm in PRpy).
alpha = 0.85
max_iter = 100
tol = 1e-6

In [11]:
# Counts incoming/outgoing links per node, then runs PageRank (progress is printed below).
# NOTE(review): the original note says 37 iterations, while PRpy prints "Expected 42
# iteration" — TODO reconcile / read `iteration` after the run.
outgoingLink, incomingLink, score, iteration = PRpy.networkDataComputation(
    spark,
    filename,
    nodesFilename,
    alpha,
    max_iter,
    tol,
    number_of_nodes=numNodes,
)

Starting Network Analysis: after this computation we will have for each nodes the Number of Outgoing and Incoming Link and the pagerank score
Computing the number of Incoming Links
Computing the number of Outgming Links
Starting with the pagerank Computations. Expected 42 iteration


In [6]:
# Pull the (node, PageRank) pairs to the driver and give the two columns readable names.
# NOTE(review): toPandas() collects every row on the driver (spark.driver.maxResultSize is 2g).
rankPySpark = score.toDF()
pandasDF = rankPySpark.toPandas()
pandasDF.columns = ['clueWebID', 'PR_Score']


Check that we have the right number of nodes:

In [11]:
# Recorded run: 148,147,604 — should match outgoingLink.count(); note it is 949 below
# numNodes (148,148,553). TODO: explain the gap.
incomingLink.count()

148147604

In [12]:
# Recorded run: 148,147,604 — same as incomingLink.count().
outgoingLink.count()

148147604

In [12]:
# Combine the incoming- and outgoing-link results.
# NOTE(review): if these are pair RDDs this joins on the key; if they were DataFrames, a
# join with no condition would be a cross product — confirm the type returned by PRpy.
incoming_outcoming = incomingLink.join(other=outgoingLink)

In [8]:
# Export the per-node incoming-link results to CSV.
# `incomingLinkDF` must be defined here (it was only available from stale kernel state),
# and the pandas copy gets its own name so that `incomingLink` stays the Spark object
# that later cells call `.count()` on.
# NOTE(review): toPandas() collects all rows on the driver (the recorded run was
# interrupted here); spark.driver.maxResultSize is set to 2g.
incomingLinkDF = incomingLink.toDF()
incomingLinkPandas = incomingLinkDF.toPandas()
incomingLinkPandas.columns = ["ClueWebID", "IncomingLink"]  # assumes exactly two columns
incomingLinkPandas.to_csv("SpamIncomingLink.csv", index=False)

KeyboardInterrupt: 

In [9]:
# Re-check after the export: `incomingLink` should still report 148,147,604.
incomingLink.count()

148147604

In [None]:
# Export the per-node outgoing-link results to CSV.
# Converts the DataFrame built on the previous line (the old code referenced an undefined
# name `outgoing` -> NameError), and keeps `outgoingLink` as the Spark object instead of
# shadowing it with a pandas frame.
# NOTE(review): toPandas() collects all rows on the driver; spark.driver.maxResultSize is 2g.
outgoingLinkDF = outgoingLink.toDF()
outgoingLinkPandas = outgoingLinkDF.toPandas()
outgoingLinkPandas.columns = ["ClueWebID", "OutgoingLink"]  # assumes exactly two columns
outgoingLinkPandas.to_csv("SpamOutgoingLink.csv", index=False)

In [None]:
# Persist the PageRank scores (columns: clueWebID, PR_Score) without the pandas index.
pandasDF.to_csv(path_or_buf="SpamPageRankScore.csv", index=False)