In [37]:
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.broadcast.Broadcast

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.broadcast.Broadcast


In [38]:
/** Builds the hard-coded sample follower graph as a DataFrame aliased "social_graph".
  *
  * Each edge is a (follower, followee) pair of string ids. The returned
  * DataFrame has `cache()` called on it before it is handed back.
  */
def load_data() = {
    // Both columns are plain strings; build the schema from the column names.
    val edgeSchema = StructType(
        List("follower", "followee").map(name => StructField(name, StringType))
    )

    // (follower, followee) pairs turned into Rows for createDataFrame.
    val edgeRows = Seq("0" -> "2", "1" -> "0", "1" -> "2", "1" -> "3", "2" -> "3")
        .map { case (follower, followee) => Row(follower, followee) }

    spark
        .createDataFrame(sc.parallelize(edgeRows), edgeSchema)
        .as("social_graph")
        .cache()
}

val graphDF = load_data()

load_data: ()org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
graphDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [follower: string, followee: string]


In [39]:
/** Returns the distinct ids that appear on either side of an edge.
  *
  * @param graphDF edge list with `followee` and `follower` columns
  * @return a single-column (`user_id`) DataFrame aliased "users"
  */
def get_users(graphDF: DataFrame) = {
    val followeeIds = graphDF.select(col("followee"))
    val followerIds = graphDF.select(col("follower"))

    // The unioned column is renamed from "followee", the name carried by the
    // first projection, to "user_id".
    followeeIds
        .union(followerIds)
        .withColumnRenamed("followee", "user_id")
        .distinct()
        .as("users")
}


val users = get_users(graphDF)
// The user count is computed once and wrapped in a broadcast variable.
val user_count = sc.broadcast(users.count())

get_users: (graphDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
users: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string]
user_count: org.apache.spark.broadcast.Broadcast[Long] = Broadcast(234)


In [40]:
/** Assigns every user the same starting rank, 1 / (number of users).
  *
  * Reads the notebook-level `user_count` broadcast for the user total.
  *
  * @param users DataFrame with a `user_id` column
  * @return DataFrame [user_id, rank_value] aliased "rank"
  */
def initialize_ranks(users: DataFrame) = {
    val uniformRank = 1.0 / user_count.value

    users
        .select(col("user_id"), lit(uniformRank).as("rank_value"))
        .as("rank")
}

val rank = initialize_ranks(users)

initialize_ranks: (users: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
rank: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string, rank_value: double]


In [41]:
/** Groups the edge list by followee and collects each followee's followers.
  *
  * @param social_graph edge list with `follower` and `followee` columns
  * @return DataFrame [followee, followers] aliased "followers"
  */
def get_followers_per_user(social_graph: DataFrame) = {
    val followerList = collect_list("follower").as("followers")

    social_graph
        .groupBy("followee")
        .agg(followerList)
        .as("followers")
}

val followers = get_followers_per_user(graphDF)

get_followers_per_user: (social_graph: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
followers: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [followee: string, followers: array<string>]


In [42]:
/** Groups the edge list by follower and collects the users each one follows.
  *
  * @param social_graph edge list with `follower` and `followee` columns
  * @return DataFrame [follower, following] aliased "following"
  */
def get_following_per_user(social_graph: DataFrame) = {
    val followeeList = collect_list("followee").as("following")

    social_graph
        .groupBy("follower")
        .agg(followeeList)
        .as("following")
}
val following = get_following_per_user(graphDF)

get_following_per_user: (social_graph: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
following: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [follower: string, following: array<string>]


In [43]:
// Attach each user's follower list and following list to their rank.
// Both joins are left joins, so users missing from one side keep a null list.
// This stays a `var` because it is reassigned later in the notebook.
var enhanced_rank = {
    // Stage 1: rank + followers (matched on followee id).
    val rankWithFollowers = rank
        .join(followers, col("followers.followee") === col("rank.user_id"), "left")
        .select(col("user_id").as("user_id"), col("rank_value"), col("followers"))
        .as("rank_followers")

    // Stage 2: add the following list (matched on follower id).
    rankWithFollowers
        .join(following, col("following.follower") === col("rank_followers.user_id"), "left")
        .select(col("user_id"), col("rank_value"), col("followers"), col("following"))
        .as("rank_followers_following")
        .as("rank")
}
enhanced_rank.show()

+-------+----------+---------+---------+
|user_id|rank_value|followers|following|
+-------+----------+---------+---------+
|      2|      0.25|   [0, 1]|      [3]|
|      0|      0.25|      [1]|      [2]|
|      3|      0.25|   [1, 2]|     null|
|      1|      0.25|     null|[0, 2, 3]|
+-------+----------+---------+---------+



enhanced_rank: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string, rank_value: double ... 2 more fields]


In [44]:
// val non_dangling_users = following_followers_with_rank.filter(col("following").isNotNull)
// val dangling_users = following_followers_with_rank.filter(col("following").isNull)

In [45]:
/** Adds a `contributions` column: the user's rank divided by the number of
  * users they follow.
  *
  * Rows whose `following` is null are divided by 1, so their whole rank is
  * kept in `contributions`.
  *
  * @param rank DataFrame with `rank_value` and `following` columns
  */
def get_contributions(rank: DataFrame) = {
    val followingList = col("following")
    // Divisor: 1 for a null list, otherwise the list length.
    val divisor = when(followingList.isNull, 1).otherwise(size(followingList))

    rank.withColumn("contributions", col("rank_value") / divisor)
}

val contributions = get_contributions(enhanced_rank)
contributions.show()

+-------+----------+---------+---------+-------------------+
|user_id|rank_value|followers|following|      contributions|
+-------+----------+---------+---------+-------------------+
|      2|      0.25|   [0, 1]|      [3]|               0.25|
|      0|      0.25|      [1]|      [2]|               0.25|
|      3|      0.25|   [1, 2]|     null|               0.25|
|      1|      0.25|     null|[0, 2, 3]|0.08333333333333333|
+-------+----------+---------+---------+-------------------+



get_contributions: (rank: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
contributions: org.apache.spark.sql.DataFrame = [user_id: string, rank_value: double ... 3 more fields]


In [46]:
/** Emits one row per (user, followed user) pair and sums the contributions
  * received by each target.
  *
  * `explode_outer` keeps rows whose `following` is null; those rows end up
  * under a null `contribute_to` key (visible in the sample output).
  *
  * @param contributions DataFrame with `following` and `contributions` columns
  * @return DataFrame [contribute_to, contributions] aliased "summed_contributions"
  */
def explode_and_sum_contributions(contributions: DataFrame) = {
    val perTarget = contributions
        .select(
            col("user_id"),
            col("rank_value"),
            col("followers"),
            explode_outer(col("following")).as("contribute_to"),
            col("contributions")
        )
        .as("exploded_contributions")

    perTarget
        .groupBy("contribute_to")
        .agg(sum("contributions").alias("contributions"))
        .as("summed_contributions")
}

val summed_contributions = explode_and_sum_contributions(contributions)
// val total_dangling_bonus = summed_contributions.select("contributions").where(col("contribute_to").isNull).first.getDouble(0)
summed_contributions.show()

+-------------+-------------------+
|contribute_to|      contributions|
+-------------+-------------------+
|            3| 0.3333333333333333|
|            0|0.08333333333333333|
|         null|               0.25|
|            2| 0.3333333333333333|
+-------------+-------------------+



explode_and_sum_contributions: (contributions: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
summed_contributions: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [contribute_to: string, contributions: double]


In [48]:
/** Computes the next rank value for every user.
  *
  * For each user the new rank is
  * `0.85 * (received contributions + remainder / userCount) + 0.15 / userCount`,
  * where `remainder` is the contribution total stored under the null
  * `contribute_to` key of `summed_contributions`.
  *
  * @param contributions        per-user frame with `user_id`, `followers`,
  *                             `rank_value`, `following` and `contributions`
  * @param summed_contributions frame [contribute_to, contributions]
  * @param user_count           broadcast holding the total number of users
  * @return frame [user_id, followers, rank_value, following] with updated ranks
  */
def calculate_rank(contributions: DataFrame, summed_contributions: DataFrame, user_count: Broadcast[Long]) = {
    // Local import keeps this definition self-contained in the notebook.
    import org.apache.spark.sql.functions.{coalesce, sum}

    val userCount = user_count.value

    // A global aggregate always produces exactly one row. When no row has a
    // null `contribute_to`, the sum is null and is replaced by 0.0, so the
    // cross join below can never drop every user.
    val remainder = summed_contributions
        .filter(col("contribute_to").isNull)
        .agg(coalesce(sum(col("contributions")), lit(0.0)).as("remainder"))

    val new_ranks_without_dangling = contributions
        .drop("contributions")
        .join(summed_contributions, col("contribute_to") === col("user_id"), "left")
        .select(
            col("user_id"),
            col("followers"),
            col("rank_value"),
            col("following"),
            // Users that nobody contributed to have no match in the left join.
            coalesce(col("contributions"), lit(0.0)).as("rank_contributions")
        )
        .crossJoin(remainder)
        .withColumn("rank_value", (col("remainder") / userCount) + col("rank_contributions"))
        .drop("rank_contributions", "remainder")

    // Apply the 0.85 / 0.15 weighting to the redistributed rank.
    new_ranks_without_dangling.withColumn("rank_value", (col("rank_value") * 0.85) + (0.15 / userCount))
}

// def calculate_rank(contributions: DataFrame, summed_contributions: DataFrame) = {
//     val remainder = summed_contributions.agg(sum("contributions").as("remainder")).withColumn("remainder", (lit(1) - col("remainder"))/user_count.value)
//     val new_ranks = contributions
//         .crossJoin(remainder)
//         .drop("contributions")
//         .join(summed_contributions, col("contribute_to") === col("user_id"), "left").drop("contribute_to")
//         .select(col("user_id"), col("followers"), col("rank_value"), col("following"), col("contributions").as("rank_contributions"), col("remainder"))
//         .withColumn("final_rank", (when(col("rank_contributions").isNotNull, col("rank_contributions")).otherwise(lit(0)) + col("remainder")))
//         .withColumn("rank_value", (col("final_rank") * 0.85) + (0.15 / user_count.value))
//         .select(col("user_id"), col("followers"), col("following"), col("rank_value"))
//     new_ranks
// }
// Run a single rank update on the initial contributions and display the result.
val new_ranks = calculate_rank(contributions, summed_contributions, user_count)
new_ranks.show()

+-------+---------+-------------------+---------+
|user_id|followers|         rank_value|following|
+-------+---------+-------------------+---------+
|      2|   [0, 1]| 0.3739583333333333|      [3]|
|      0|      [1]|0.16145833333333331|      [2]|
|      3|   [1, 2]| 0.3739583333333333|     null|
|      1|     null|           0.090625|[0, 2, 3]|
+-------+---------+-------------------+---------+



calculate_rank: (contributions: org.apache.spark.sql.DataFrame, summed_contributions: org.apache.spark.sql.DataFrame, user_count: org.apache.spark.broadcast.Broadcast[Long])org.apache.spark.sql.DataFrame
new_ranks: org.apache.spark.sql.DataFrame = [user_id: string, followers: array<string> ... 2 more fields]


In [None]:
// Repeat the rank update a fixed number of times, feeding each result back in
// as the next input through the notebook-level `enhanced_rank` var.
var number_of_iterations = 10

while (number_of_iterations > 0) {
    // Loop-local names avoid shadowing the notebook-level `contributions`,
    // `summed_contributions` and `new_ranks` values.
    val iterationContributions = get_contributions(enhanced_rank)
    val iterationSums = explode_and_sum_contributions(iterationContributions)
    enhanced_rank = calculate_rank(iterationContributions, iterationSums, user_count)

    // show() prints the table itself and returns Unit; wrapping it in print()
    // emitted a stray "()" after every table.
    enhanced_rank.show()
    number_of_iterations -= 1
}

+-------+---------+-------------------+---------+
|user_id|followers|         rank_value|following|
+-------+---------+-------------------+---------+
|      2|   [0, 1]| 0.3739583333333333|      [3]|
|      0|      [1]|0.16145833333333331|      [2]|
|      3|   [1, 2]| 0.3739583333333333|     null|
|      1|     null|           0.090625|[0, 2, 3]|
+-------+---------+-------------------+---------+

()+-------+---------+-------------------+---------+
|user_id|followers|         rank_value|following|
+-------+---------+-------------------+---------+
|      2|   [0, 1]|0.27988281249999997|      [3]|
|      0|      [1]|0.14264322916666666|      [2]|
|      3|   [1, 2]| 0.4605078124999999|     null|
|      1|     null|0.11696614583333331|[0, 2, 3]|
+-------+---------+-------------------+---------+

()+-------+---------+-------------------+---------+
|user_id|followers|         rank_value|following|
+-------+---------+-------------------+---------+
|      2|   [0, 1]| 0.2897450629340277|     