In [1]:
import os

# Bootstrap PySpark inside this kernel by running Spark's interactive-shell
# script; per the banner it prints, it makes a SparkSession available as `spark`.
# exec(compile(...)) does what Python 2's execfile() did but also runs on
# Python 3, and the `with` block closes the script file afterwards.
sparkShellScript = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
with open(sparkShellScript) as sparkShellFile:
    exec(compile(sparkShellFile.read(), sparkShellScript, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:05:08)
SparkSession available as 'spark'.


In [2]:
# Display the SparkSession created by shell.py in the first cell.
spark

<pyspark.sql.session.SparkSession at 0x110ef3890>

In [3]:
# Location of the input graph (tab-separated user / friends-string file).
# Override with the GRAPH_PATH environment variable instead of editing a
# machine-specific absolute path; the original path remains the default.
graphPath = os.environ.get("GRAPH_PATH", "/Users/pritykovskaya/Desktop/EY/graph/part-00000")

In [4]:
# Read the tab-separated graph file without a header row (so Spark names the
# columns _c0, _c1) and give the two columns meaningful names.
data = (
    spark.read
    .csv(graphPath, sep="\t")
    .withColumnRenamed("_c0", "user")
    .withColumnRenamed("_c1", "friendsString")
)

In [5]:
# Preview the first 20 rows; long friend strings are truncated in the display.
data.show(n=20)

+-----+--------------------+
| user|       friendsString|
+-----+--------------------+
|  423|{(76034,0),(91316...|
| 1431|{(25244,0),(37528...|
| 2039|{(1378599,0),(502...|
| 4999|{(188333,0),(1999...|
| 5159|{(168092,0),(1866...|
| 5975|{(9800,0),(14904,...|
| 6519|{(222812,0),(2726...|
| 7159|{(40034,0),(33552...|
| 7735|{(567383,0),(1339...|
|10119|{(2657110,0),(345...|
|10311|{(225212,1024),(3...|
|13719|{(113697,0),(3491...|
|14951|      {(27292725,0)}|
|15815|{(939274,0),(1910...|
|17111|{(205443,1024),(2...|
|18487|{(3371513,0),(709...|
|20199|{(160621,0),(2896...|
|20359|{(351400,0),(4019...|
|20567|{(74837,0),(86428...|
|23735|{(164356,0),(3772...|
+-----+--------------------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import abs, col, explode, collect_list, sort_array, size, split
from pyspark.sql.types import *

In [7]:
from pyspark.sql.functions import udf

def cutStartEndBrackets(s):
    """Strip the two leading and two trailing characters of a friends string.

    Turns ``"{(76034,0),(91316,0)}"`` into ``"76034,0),(91316,0"`` so the
    remainder can be split on ``"),("``.

    Args:
        s: the raw friends string, or None when the Spark column is null.

    Returns:
        The inner string, or None for a None input (instead of raising a
        TypeError inside the Spark Python worker and failing the job).
    """
    if s is None:
        return None
    return s[2:-2]

# Expose the bracket stripper to Spark SQL as a string-returning UDF.
cutStartEndBracketsUDF = udf(cutStartEndBrackets, returnType=StringType())

# One row per (user, friend): strip the outer brackets of the friends string,
# split the remaining "id,x),(id,x" text into "id,x" items, and keep the number
# before the first comma as the friend id.
# Raw string: the regex needs literal "\)" and "\(", which a plain string only
# tolerates as invalid escape sequences (warned about since Python 3.6).
userFriend = \
    data.select(col("user"), split(cutStartEndBracketsUDF(col("friendsString")), r"\),\(").alias("friendsMasks"))\
    .withColumn("friendMask", explode('friendsMasks'))\
    .withColumn("friend", split(col("friendMask"), ",")[0])\
    .select(col("user").cast("integer"), col("friend").cast("integer"))

# For each friend, the ascending-sorted list of users who have that friend.
# Friends shared by fewer than two users cannot form a pair, so drop them.
usersWithCommonFriend = (
    userFriend
    .groupBy("friend")
    .agg(sort_array(collect_list("user")).alias("usersWithCommonFriend"))
    .filter(size(col("usersWithCommonFriend")) >= 2)
    .select(col("usersWithCommonFriend"))
)

In [8]:
# Confirm the column type the pair UDF will receive: an array of integer user ids.
usersWithCommonFriend.printSchema()

root
 |-- usersWithCommonFriend: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [8]:
from pyspark.sql.functions import udf

def orderedPairsWithCommonFriend(usersWithCommonFriend):
    """List every pair of users that share one common friend.

    Args:
        usersWithCommonFriend: list of user ids that all have the same friend
            (sorted ascending by the caller in this notebook), or None.

    Returns:
        A list of 2-tuples ``(a, b)`` where ``a`` appears before ``b`` in the
        input, in the same order nested ``i < j`` index loops would produce.
        Empty for None or for fewer than two users.
    """
    # Local import keeps this UDF body self-contained.
    from itertools import combinations

    if usersWithCommonFriend is None:
        return []
    return list(combinations(usersWithCommonFriend, 2))

# Spark return type of the pair UDF: an array of (user1, user2) structs whose
# fields are non-nullable integers.
schema = ArrayType(
    elementType=StructType([
        StructField(name="user1", dataType=IntegerType(), nullable=False),
        StructField(name="user2", dataType=IntegerType(), nullable=False),
    ])
)

orderedPairsWithCommonFriendUdf = udf(orderedPairsWithCommonFriend, returnType=schema)
        
# Number of common friends for every pair of users: expand each friend's user
# list into pairs, then count how many friends produced each pair.
# Columns: count, user1, user2 (user1 precedes user2 in the sorted user list).
commonFriendsCounts = usersWithCommonFriend\
    .select(orderedPairsWithCommonFriendUdf("usersWithCommonFriend").alias("orderedPairsWithCommonFriend"))\
    .withColumn("orderedPairWithCommonFriend", explode("orderedPairsWithCommonFriend"))\
    .groupBy("orderedPairWithCommonFriend")\
    .count()\
    .select(col("count"),
            col("orderedPairWithCommonFriend.user1").alias("user1"),
            col("orderedPairWithCommonFriend.user2").alias("user2"))

# `.write.parquet(...)` returns None, so keep the DataFrame above and write it
# in a separate statement. Overwrite so the notebook can be re-run end to end
# (the default mode fails when the output directory already exists).
commonFriendsCounts.write.mode("overwrite").parquet("commonFriendsCounts")


In [11]:
from pyspark.sql.functions import udf, row_number, concat_ws, struct
from pyspark.sql import Window

# The pair counts hold each pair once, with user1 ordered before user2 (pairs
# are built from a sorted user list), so on their own a user would only get
# candidates that sort after it. Mirror every pair so both users get the other.
pairCounts = spark.read.parquet("commonFriendsCounts")
symmetricPairCounts = pairCounts.select(col("count"), col("user1"), col("user2"))\
    .union(pairCounts.select(col("count"), col("user2").alias("user1"), col("user1").alias("user2")))

# Rank each user's candidates by number of common friends; ties are broken by
# candidate id so the ranking (and hence the output file) is reproducible.
window = Window.partitionBy(col("user1")).orderBy(col("count").desc(), col("user2"))

# collect_list gives no ordering guarantee after groupBy, so collect
# (rank, candidate) structs and sort them to list candidates best-first.
rankedCandidate = struct(col("row_number"), col("user2").cast("string").alias("user2"))

# row_number starts at 1, so `< 100` keeps at most 99 candidates per user.
# NOTE(review): use `<= 100` if a top-100 list was intended.
candidates = symmetricPairCounts\
    .withColumn("row_number", row_number().over(window))\
    .filter(col("row_number") < 100)\
    .groupBy(col("user1"))\
    .agg(sort_array(collect_list(rankedCandidate)).alias("rankedCandidates"))\
    .withColumn("candidatesStr", concat_ws(" ", col("rankedCandidates.user2")))\
    .select(col("user1").cast("string"), col("candidatesStr"))

# One space-separated output file; overwrite so the notebook can be re-run.
candidates\
    .repartition(1)\
    .write.mode("overwrite").csv('candidates.csv', sep=" ")