In [23]:
from typing import Iterable, Tuple
from operator import add as add_op

import numpy as np
import pandas as pd

import pyspark
from pyspark.resultiterable import ResultIterable
from pyspark.sql.functions import sum, col, count, monotonically_increasing_id, mean, lit

from pyspark.sql import DataFrame, SparkSession

P_RETWEET = 0.00037

In [2]:
sc = SparkSession.builder.master("local[*]").appName("TunkRank").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/17 16:01:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
arrs = np.array(
    [[1, 2],
     [2, 3],
     [1, 3],
     [1, 4],
     [2, 4],
     [3, 1],
     [5, 3],
     [4, 1],
    ], dtype=np.uint32,
)

df = pd.DataFrame(arrs, columns=["a", "b"])
graph = sc.createDataFrame(df)

# Method 1

In [4]:
def compute_contibutes(following: ResultIterable[int], influence: float) -> Iterable[Tuple[int, float]]:
    """Calculates URL contributions to the rank of other URLs."""
    n_following = len(following)
    contrib = (1 + P_RETWEET * influence) / n_following
    for t in following:
        yield t, contrib

In [5]:
rdd = graph.rdd
following = rdd.map(lambda x: (x[1], x[0])).groupByKey().cache()
follower = rdd.groupByKey()
influence = follower.map(lambda neighbors: (neighbors[0], 0.0))

In [6]:
iteration = 0
while True:
    iteration += 1
    old_influence = influence
    print(f">>> Tunk rank (iteration={iteration})")
    # Calculates URL contributions to the rank of other URLs.
    contribs = following.join(influence).flatMap(lambda x: compute_contibutes(x[1][0], x[1][1]))
    # Re-calculates URL ranks based on neighbor contributions.
    influence = contribs.reduceByKey(add_op)
    if iteration > 1:
        delta = influence.join(old_influence).map(lambda x: (1, (x[1][0] - x[1][1]) ** 2)).reduceByKey(add_op).collect()[0][1]
        print(delta)
        if delta < 0.001:
            break

>>> Tunk rank (iteration=1)
>>> Tunk rank (iteration=2)


[Stage 0:>                  (0 + 8) / 8][Stage 1:>                  (0 + 0) / 8]

4.7154444444476226e-07


                                                                                

In [163]:
influence_sorted = influence.sortBy(lambda x: x[1], ascending=False)
influence_sorted.toDF(["a", "influence"]).toPandas()

Unnamed: 0,a,influence
0,1,1.833796
1,2,0.833488
2,3,0.500339
3,4,0.500339
4,5,0.333395


In [7]:
sc.stop()

# Alternative

In [37]:
influence = graph.select("a").withColumnRenamed("a", "uid").distinct().withColumn("influence", lit(0.))
following = graph.withColumnRenamed("b", "uid")
n_following = following.groupby("uid").agg(count("a").alias("count"))
ratio = following.join(n_following, on="uid", how="left").cache()

22/10/17 16:33:07 WARN CacheManager: Asked to cache already cached data.


In [40]:
old_influence = influence.withColumnRenamed("influence", "old_influence")
full_map = ratio.join(influence, on="uid", how="left").select(col("a").alias("uid"), "count", "influence")
contribs = full_map.select("uid", ((1 + P_RETWEET * col("influence")) / col("count")).alias("contrib"))
influence = contribs.groupby("uid").agg(sum("contrib").alias("influence"))

delta = influence.join(old_influence, on="uid").select(((col("influence") - col("old_influence")) ** 2).alias("squ")).agg(mean("squ").alias("mean_squ"))
print(delta.first()["mean_squ"])

1.0689659032927415e-14


In [10]:
influence.collect()

                                                                                

[Row(a=1, influence=0.0),
 Row(a=2, influence=0.0),
 Row(a=3, influence=0.0),
 Row(a=5, influence=0.0),
 Row(a=4, influence=0.0)]

In [18]:
sc.sql?