In [1]:
import os
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.enableHiveSupport().master("local [2]").getOrCreate()

In [3]:
from pyspark.sql.functions import explode, collect_list, size, col, row_number, sort_array, udf, count
from pyspark.sql import Window
from pyspark.sql.types import *

In [4]:
graphPath = "/data/graphDFSample"

reversedGraph = sparkSession.read.parquet(graphPath) \
    .withColumn("friend", explode('friends')) \
    .groupBy("friend") \
    .agg(collect_list("user").alias("users")) \
    .withColumn("users_size", size("users")) \
    
reversedGraph.printSchema()
reversedGraph.show(5)    

root
 |-- friend: integer (nullable = true)
 |-- users: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- users_size: integer (nullable = false)

+------+--------------------+----------+
|friend|               users|users_size|
+------+--------------------+----------+
|   148|[65051219, 146311...|         4|
|  5518|          [58573511]|         1|
|  9900|          [36844066]|         1|
| 10362|          [65278216]|         1|
| 11458|          [39169321]|         1|
+------+--------------------+----------+
only showing top 5 rows



In [5]:
reversedGraph = reversedGraph.select(reversedGraph.friend, \
                                     sort_array(reversedGraph.users).alias("users_sorted"), \
                                     "users_size") \
                            .where(reversedGraph.users_size > '1')
reversedGraph.show(3)

+------+--------------------+----------+
|friend|        users_sorted|users_size|
+------+--------------------+----------+
| 36538|[5506394, 6170161...|        32|
| 76756|[2387712, 3274722...|        33|
| 93319|[12426490, 295206...|         8|
+------+--------------------+----------+
only showing top 3 rows



In [6]:
def create_pairs(in_list):
    pairs = []
    count_f1 = 0
    while count_f1 < len(in_list):
        for count_f2 in range (count_f1 + 1, len(in_list)):
            pairs.append((in_list[count_f1], in_list[count_f2]))
        count_f1 += 1
    return(pairs)

pairs_udf = udf(lambda y: create_pairs(y), 
                     ArrayType(StructType(
                         (StructField("f1", IntegerType(), True),
                          StructField("f2", IntegerType(), True)))))



In [7]:
reversedGraph = reversedGraph.select(pairs_udf(reversedGraph.users_sorted).alias("users"), \
                                    "users_size")
reversedGraph.printSchema()
reversedGraph.show(5)

root
 |-- users: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- f1: integer (nullable = true)
 |    |    |-- f2: integer (nullable = true)
 |-- users_size: integer (nullable = false)

+--------------------+----------+
|               users|users_size|
+--------------------+----------+
|[[5506394,6170161...|        32|
|[[2387712,3274722...|        33|
|[[12426490,295206...|         8|
|[[2661492,4271861...|         6|
|[[2974302,1086021...|         7|
+--------------------+----------+
only showing top 5 rows



In [11]:
rg1 = reversedGraph.select(explode(reversedGraph.users))
rg1.show(3)

+------------------+
|               col|
+------------------+
| [5506394,6170161]|
| [5506394,6846874]|
|[5506394,15305594]|
+------------------+
only showing top 3 rows



In [12]:
rg2 = rg1.select('col.f1', 'col.f2')
rg2.show(10)

+--------+--------+-----+
|      f1|      f2|count|
+--------+--------+-----+
|19016678|27967558|   25|
|35731241|63987222| 1272|
|38837335|60011356|    2|
|40003405|41101961|   26|
| 3274722|37391049|  100|
|13805420|30729314|   12|
|32763426|42099418|    8|
| 6170161|44110626|   79|
| 6956945|44284974|  181|
|21121725|33793439|  218|
+--------+--------+-----+
only showing top 10 rows



In [13]:
rg3 = rg2.groupBy("f1", "f2").agg(count("*").alias("count"))
rg3.show(10)

+--------+--------+-----+
|      f1|      f2|count|
+--------+--------+-----+
|19016678|27967558|    1|
|35731241|63987222|    1|
|38837335|60011356|    1|
|40003405|41101961|    1|
| 3274722|37391049|    1|
|13805420|30729314|    1|
|32763426|42099418|    1|
| 6170161|44110626|    1|
| 6956945|44284974|    1|
|21121725|33793439|    1|
+--------+--------+-----+
only showing top 10 rows



In [16]:
window = Window.orderBy(col("count").desc())
    
top50 = rg3.withColumn("row_number", row_number().over(window)) \
            .filter(col("row_number") < 50) \
            .select(col("count"), col("f1"), col("f2")) \
            .orderBy(col("count").desc(), col("f1").desc()) \
            .collect()

In [17]:
for val in top50:
    print '%s %s %s' % val

1 62936623 64819882
1 42899683 57516788
1 41609464 52942230
1 40697455 56271262
1 40003405 41101961
1 39314779 50516758
1 38837335 60011356
1 38436695 43700485
1 37250279 63589057
1 35731241 63987222
1 35335957 47237327
1 33141665 59403447
1 32763426 42099418
1 31455939 51755669
1 30829221 31455939
1 30829221 62439121
1 25556203 33990604
1 24659937 45915446
1 23651273 56351013
1 23374539 60440950
1 23263517 51633283
1 21121725 33793439
1 20240688 50516758
1 19016678 27967558
1 16793490 21364043
1 16684463 51126236
1 16501143 49958374
1 15479903 41574601
1 13805420 30729314
1 12426490 39865897
1 12378179 23612791
1 11794870 51268556
1 10812639 42899683
1 10481998 40675360
1 10318279 39892770
1 9554168 55608231
1 8961437 59403447
1 8609655 8819098
1 6956945 44284974
1 6170161 44110626
1 5143474 30127398
1 4858076 51633283
1 4437374 5854674
1 3274722 37391049
1 2932270 14241359
1 1836499 49951442
1 1021695 62936623
1 525850 34092512
1 262805 25701054
