In [None]:
import random

import numpy
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array, lit, udf, collect_list, size, explode
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import ArrayType, FloatType, BooleanType, StructType, StructField, IntegerType, LongType

In [None]:
## Literature / references
# https://stanford.edu/~rezab/dao/notes/Partitioning_PageRank.pdf

In [None]:
def create_random_graph(size):
    """Build a random edge list for trying out the PageRank implementation.

    Produces ``size`` regular edges whose source and destination are both drawn
    uniformly from ``1..size`` (self-loops and duplicate edges are possible),
    followed by ``int(size / 10)`` dangling edges ``(src, None)``. The dangling
    sources are drawn from ``size+1..2*size``, so they never collide with the
    node IDs of the regular edges.

    Uses the global ``random`` module; seed it for a reproducible graph.

    :param size: number of regular edges and upper bound of their node IDs
    :returns: list of ``(src, dst)`` tuples, ``dst`` is ``None`` for dangling edges
    """
    def pick_node():
        return random.randint(1, size)

    edges = [(pick_node(), pick_node()) for _ in range(size)]

    n_dangling = int(size / 10)
    edges.extend((random.randint(size + 1, 2 * size), None)
                 for _ in range(n_dangling))
    return edges


def rank_dist(link_list, rank):
    """Split a node's current rank evenly across its outgoing links.

    :param link_list: sequence of destination node IDs this node has edges to
    :param rank: current rank of the node
    :returns: list of ``(destination_id, rank_share)`` tuples, one per entry of
        ``link_list`` (duplicates included). A node without links yields the
        single pair ``(-1, rank)``: the UDF schema declares the ID non-nullable,
        so ``None`` cannot be emitted and -1 stands in for "no known destination".
        NOTE(review): this assumes no real node uses ID -1.
    """
    n_links = len(link_list)
    if n_links == 0:
        # no known out-links: park the whole rank on the sentinel destination
        return [(-1, rank)]

    share = rank / n_links
    return [(dst_id, share) for dst_id in link_list]


# Schema of one element of the array returned by `rank_dist`: a
# (destination node id, rank share) pair.
# `uri_id` is LongType (64-bit): Python ints are inferred as LongType by
# `spark.createDataFrame`, and web-graph node IDs may exceed the 32-bit range
# that IntegerType can hold. Such IDs would not survive the UDF round-trip with
# IntegerType, and their incoming rank would be lost.
inner_schema = StructType([
    StructField("uri_id", LongType(), False),
    StructField("rank", FloatType(), False)
])

# Python UDF wrapping `rank_dist`; yields array<struct<uri_id: bigint, rank: float>>.
ranks_dist_udf = udf(rank_dist, ArrayType(inner_schema))

In [None]:
def _distribute_ranks(ranks_df):
    """Spread every node's rank evenly over its out-links and sum per destination.

    :param ranks_df: DataFrame with columns `dst_list` (array of destination IDs)
        and `rank`.
    :returns: DataFrame with columns `dst_id` and `rank_i`, where `rank_i` is the
        total rank received by `dst_id`. Rank of nodes with an empty `dst_list`
        is sent to the sentinel ID -1 by `rank_dist`.
    """
    exploded_df = ranks_df.select(
        explode(ranks_dist_udf('dst_list', 'rank')).alias('exploded'))
    return exploded_df\
        .select(col('exploded.uri_id').alias('dst_id'),
                col('exploded.rank').alias('rank_i'))\
        .groupby('dst_id')\
        .agg(spark_sum('rank_i').alias('rank_i'))


def run(edges_df, alpha=0.15, convergence=0.01, max_iterations=None):
    """
    The function takes a spark dataframe as `edges_df` containing all edges (source to destination)
    of the graph and calculates a pagerank value for all sources (nodes). It iterates until the rank
    of every node changes by at most `convergence` between two iterations, or until
    `max_iterations` iterations have run (if given).
    It can deal with destinations not found as sources and with dangling nodes: rank sent to
    unknown destinations or by nodes without known out-links is collected and redistributed
    evenly over all source nodes (damped by `alpha` like ordinary links).
    The dataframe needs to be formatted as follows with `src` and `dst` as the column names

    +---+----+
    |src| dst|
    +---+----+
    |  1|   1|
    |  1|   3|
    |  3|   4|
    |  2|   5|
    |  6|null|
    +---+----+

    Every node starts with rank 1.0, so the ranks sum to (approximately) the number of
    source nodes rather than to 1.

    A checkpoint directory must be set on the SparkContext (`setCheckpointDir`) because the
    lineage is truncated with `DataFrame.checkpoint()` after every iteration.

    :param edges_df: DataFrame with columns `src` and `dst` (`dst` may be null).
    :param alpha: teleport weight; each node receives `alpha` plus `(1 - alpha)` times its
        incoming rank. Defaults to 0.15.
    :param convergence: maximum absolute per-node change of rank that counts as converged.
        Defaults to 0.01.
    :param max_iterations: optional upper bound on the number of iterations (at least one
        iteration always runs). None (default) iterates until convergence.
    :returns: DataFrame with columns `uri_id` (source node ID) and `rank` (float).
    """

    # One row per source node with the list of its destinations. collect_list skips null
    # `dst` values, so a source whose only edge points to null gets an empty list.
    ranks_df = edges_df\
        .groupby('src')\
        .agg(collect_list('dst').alias('dst_list'))\
        .withColumnRenamed('src', 'uri_id')\
        .withColumn('rank', lit(1.0))

    n_nodes = ranks_df.count()
    iteration = 0

    while True:

        print(f'##### Itteration:\t{iteration} #####')
        # caching the input of this iteration: it feeds both the distribution and the join
        ranks_df.cache()
        iteration_input_df = ranks_df

        # first we distribute the current rank to all the linked nodes
        contributions_df = _distribute_ranks(ranks_df)

        # Full outer join so that neither nodes without incoming rank (rank_i null) nor
        # destinations that are not known sources (uri_id null) are lost. The joined frame is
        # cached because the dangling sum, the convergence count and the checkpoint all read it.
        joined_df = ranks_df\
            .join(contributions_df, ranks_df['uri_id'] == contributions_df['dst_id'], 'outer')\
            .drop('dst_id')\
            .cache()

        # Rows without a uri_id carry rank sent to unknown destinations or to the -1 sentinel.
        dangling_rank = joined_df\
            .filter(col('uri_id').isNull())\
            .select(spark_sum('rank_i'))\
            .first()[0]

        ranks_df = joined_df.filter(col('uri_id').isNotNull())

        # dangling rank is handled like a link to every node, so it is damped by alpha too
        if dangling_rank:
            dist_alpha = ((dangling_rank / n_nodes) * (1 - alpha)) + alpha
        else:
            dist_alpha = alpha
        print(f'dangling sum:\t{dangling_rank}')
        print(f'alpha dist :\t{dist_alpha}')

        # Native expression instead of a Python UDF: the same double arithmetic, then stored
        # as float like the previous FloatType UDF did.
        ranks_df = ranks_df.na.fill(0, ['rank_i'])
        ranks_df = ranks_df.withColumn(
            'rank_i', (col('rank_i') * (1 - alpha) + dist_alpha).cast(FloatType()))

        # test for convergence: |new - old| <= convergence, computed in double precision
        rank_delta = col('rank_i').cast('double') - col('rank').cast('double')
        ranks_df = ranks_df.withColumn(
            'converged', rank_delta.between(-convergence, convergence))
        count_not_converged = ranks_df.filter(~col('converged')).count()
        ranks_df = ranks_df\
            .drop('converged', 'rank')\
            .withColumnRenamed('rank_i', 'rank')

        # The dataframe is checkpointed to truncate the lineage, which would otherwise grow
        # with every iteration.
        ranks_df = ranks_df.checkpoint()

        # the checkpointed data no longer depends on this iteration's caches
        joined_df.unpersist()
        iteration_input_df.unpersist()

        if count_not_converged == 0:
            print("converged")
            break
        else:
            print(f'nodes not yet converged {count_not_converged}')

        iteration += 1
        if max_iterations is not None and iteration >= max_iterations:
            print(f'stopped after {iteration} iterations without full convergence')
            break

    return ranks_df.drop('dst_list')

In [None]:
if __name__ == '__main__':

    input_base_path = '../data/results/'
    webgraph_path = input_base_path + 'webgraph_v2_without_null/'
    pagerank_out_path = input_base_path + 'pagerank_df_parquet/'

    # Set graph_path (e.g. to webgraph_path) to rank a parquet edge list;
    # None ranks a small random graph instead.
    graph_path = None
    # Set output_path (e.g. to pagerank_out_path) to write the result as parquet;
    # None skips writing.
    output_path = None
    random_graph_size = 10
    random_seed = 42
    checkpoint_dir = '../data/checkpoints'

    # getOrCreate reuses an already active session (e.g. one provided by the kernel)
    # and otherwise starts a new one, so the cell also works on a fresh kernel.
    spark = SparkSession.builder.getOrCreate()
    # run() checkpoints every iteration and needs a checkpoint directory
    spark.sparkContext.setCheckpointDir(checkpoint_dir)

    if graph_path:
        graph_df = spark.read.parquet(graph_path).cache()
    else:
        # seed so the random graph (and thus the ranks) is reproducible
        random.seed(random_seed)
        graph_df = spark\
            .createDataFrame(create_random_graph(random_graph_size))\
            .withColumnRenamed('_1', 'src')\
            .withColumnRenamed('_2', 'dst')

        # show() prints the table itself and returns None
        graph_df.show()

    ranked_graph_df = run(graph_df)
    ranked_graph_df.show()

    if output_path:
        ranked_graph_df.write.parquet(output_path)