In [None]:
import os

# Run Spark's own interactive-shell startup script inside this kernel.
# `execfile` exists only in Python 2 (NameError on Python 3), so read the
# script and exec it explicitly; this form works on both Python 2 and 3.
# NOTE(review): shell.py is expected to create its own SparkSession; the next
# cell uses SparkSession.builder...getOrCreate(), which would then reuse that
# session and ignore its .master(...) setting. Confirm whether this cell is
# needed at all (it was interrupted in the saved output).
_shell_path = os.path.join(os.environ["SPARK_HOME"], "python", "pyspark", "shell.py")
with open(_shell_path) as _shell_file:
    _shell_source = _shell_file.read()
# Exec after the file is closed; compile() with the real path keeps tracebacks readable.
exec(compile(_shell_source, _shell_path, "exec"))

KeyboardInterrupt: 

In [None]:
from pyspark.sql import SparkSession

# Local-mode master URLs must not contain spaces: "local[2]" means run locally
# with two worker threads. The previous value "local [2]" is not a valid
# master URL, and Spark rejects it ("Could not parse Master URL").
# getOrCreate() returns an already-running session if one exists.
sparkSession = SparkSession.builder.enableHiveSupport().master("local[2]").getOrCreate()

In [None]:
# Input dataset location; it is loaded with sparkSession.read.parquet when the
# reversed graph is built.
graphPath = "/data/graphDFSample"

In [None]:
from pyspark.sql.functions import udf
import pyspark.sql.types as t

def emit_all_pairs(friends_array):
    """Return every pair of elements taken from two distinct positions.

    For input ``[a, b, c]`` the result is ``[(a, b), (a, c), (b, c)]``. The
    element at the earlier position always comes first, so a sorted input
    yields pairs with ``u1 <= u2``. This function is wrapped as a Spark UDF
    whose return type is an array of ``(u1, u2)`` structs.

    Args:
        friends_array: a sequence of values (the ``users`` array column when
            called from Spark), or ``None`` for a null column value.

    Returns:
        A list of 2-tuples in index order, ``[]`` when there are fewer than
        two elements, or ``None`` when ``friends_array`` is ``None`` so that
        Spark stores a null instead of failing the task with a TypeError.
    """
    # Function-scope import so the notebook needs no extra top-level import.
    from itertools import combinations

    if friends_array is None:
        return None
    # combinations(seq, 2) yields (seq[i], seq[j]) for all i < j, in the same
    # order as a nested i/j loop. Materialize it: the UDF result must be a
    # concrete list, not an iterator.
    return list(combinations(friends_array, 2))

# Spark type of one element produced by emit_all_pairs: a struct of two
# non-nullable integer fields, u1 and u2 (the two users of a pair).
pair_schema = t.StructType([
    t.StructField(field_name, t.IntegerType(), False)
    for field_name in ("u1", "u2")
])

# Wrap the Python pair generator as a UDF returning an array of such structs.
emit_all_pairs_udf = udf(emit_all_pairs, returnType=t.ArrayType(pair_schema))

In [None]:
from pyspark.sql.functions import explode, collect_list, size, col, row_number, sort_array, count, desc, size
from pyspark.sql import Window

# Invert the adjacency lists: for each friend, gather the users who list that
# friend. Sorting each group makes every emitted pair come out ordered
# (u1 <= u2), so the same two users always map to the same pair value.
# A group with a single user cannot produce a pair, so it is dropped.
reversedGraph = (
    sparkSession.read.parquet(graphPath)
    .withColumn("friend", explode(col("friends")))
    .groupBy(col("friend"))
    .agg(collect_list(col("user")).alias("users"))
    .withColumn("users", sort_array(col("users")))
    .where(size(col("users")) > 1)
)

# Expand each group into all of its user pairs, then count the occurrences
# of each pair, i.e. the number of friends the two users share (assuming a
# user lists any given friend at most once).
pairs = (
    reversedGraph
    .withColumn("pairs", emit_all_pairs_udf(col("users")))
    .withColumn("pair", explode(col("pairs")))
    .groupBy(col("pair"))
    .agg(count(col("pair")).alias("pair_count"))
)

In [None]:
# Flatten each pair struct next to its count, then keep the 49 pairs with the
# most common friends. Ties are broken by u1, then u2 (both descending). Each
# (u1, u2) occurs once after the groupBy, so the order is fully determined.
results = (
    pairs
    .select("pair_count", "pair.*")
    .sort(col("pair_count").desc(), col("u1").desc(), col("u2").desc())
    .limit(49)
    .collect()
)

In [None]:
# Each collected Row holds (pair_count, u1, u2), in the column order of the
# select above; print the fields space-separated, one pair per line.
for row in results:
    pair_count, u1, u2 = row
    print("{} {} {}".format(pair_count, u1, u2))