In [71]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# SQLContext's first parameter is a SparkContext, not a SparkSession: pass the
# session's context explicitly and hand over the existing session so no new one
# is looked up. (NOTE: SQLContext is deprecated since Spark 3.0; `spark` itself
# offers createDataFrame.)
sqlc = SQLContext(spark.sparkContext, spark)

# Loading in the data

In [91]:
# One RDD element per input line; each line is a node followed by a tab and its
# comma-separated neighbours (the format parsed by makeEdges below).
rdd = sc.textFile(name="friendship-data.txt")
rdd.count()

49995

# Extracting edges

In [99]:
def makeEdges(row):
    """Expand one adjacency-list line into its directed edges.

    ``row`` is a node id, a tab, then a comma-separated list of neighbour ids.
    Returns a list of ``[node, neighbor]`` pairs (all strings).

    A line with no tab, or with an empty neighbour list, yields no edges
    instead of raising ``IndexError`` or emitting a bogus ``[node, '']`` edge;
    empty entries from stray commas are skipped for the same reason.
    """
    fields = row.split('\t')
    node = fields[0]
    if len(fields) < 2:
        return []
    return [[node, neighbor] for neighbor in fields[1].split(',') if neighbor]

# Flatten every adjacency line into [node, neighbour] pairs and name the two
# endpoint columns 'x' and 'y'.
edgesRDD = rdd.flatMap(makeEdges)
edgesDF = sqlc.createDataFrame(edgesRDD, schema=['x', 'y'])
edgesDF.count()

662467

### `edgesDF` now contains every edge twice, once in each direction,
### e.g. both (0,1) and (1,0) exist in the dataframe.
### We first remove this redundancy by keeping only the orientation with x < y.

In [101]:
# Keep a single orientation of every undirected edge. The ids are strings, so
# `<` is lexicographic ('10007' < '9995'); any strict total order works here.
edgesDF = edgesDF.filter(edgesDF.x < edgesDF.y)
edgesDF.count()

330798

# Part A: Finding all the triangles
### Using two conditional joins of the edge dataframe with itself, we can extract the list of triangles.
#### In each join we must avoid pairing an edge with itself, i.e. walking the same edge back and forth (x → y → x), which gives a degenerate path rather than a triangle.

In [152]:
# Second copy of the deduplicated edge list, with its far endpoint named 'z'.
# It is built from edgesRDD (not by renaming edgesDF) so its columns are
# distinct from edgesDF's in the self-join below.
edgesDF2 = sqlc.createDataFrame(edgesRDD, ['x', 'z'])
edgesDF2 = edgesDF2.where('x<z')
# (The former `edgesDF2.count()` here was removed: its result was never
# displayed, but it still ran a full Spark job.)

# Wedges: two edges sharing their smaller endpoint x. Requiring y < z (instead
# of y != z) never pairs an edge with itself and emits each unordered pair
# {y, z} once rather than twice, halving the join output.
semiCondition = [edgesDF.x == edgesDF2.x, edgesDF.y < edgesDF2.z]
semiOutputStyle = [edgesDF.x, edgesDF.y, edgesDF2.z]
semiTriangles = edgesDF.join(edgesDF2, semiCondition).select(semiOutputStyle)

def sortNodes(row):
    """Put a wedge's two outer nodes in ascending order.

    ``row`` is a record with fields ``x`` (the shared node), ``y`` and ``z``.
    Returns ``[x, min(y, z), max(y, z)]``. The ids are strings in this
    notebook, so the ordering is lexicographic (e.g. '100' < '9').
    """
    lo, hi = sorted((row.y, row.z))
    return [row.x, lo, hi]

# Normalise every wedge to (x, smaller outer node, larger outer node) and turn
# it back into a DataFrame with the same column names.
semiTrianglesRDD = semiTriangles.rdd.map(sortNodes)
semiTriangles = semiTrianglesRDD.toDF(schema=['x', 'y', 'z'])
semiTriangles.head()

Row(x='10096', y='10103', z='10114')

### Closing the triangle: the third side (y, z) must itself be an edge in `edgesDF`

In [153]:
# Close each wedge x-y, x-z: it is a triangle only if (y, z) is itself an edge.
# semiTriangles holds y < z, matching edgesDF's x < y orientation.
condition = [semiTriangles['y'] == edgesDF['x'], semiTriangles['z'] == edgesDF['y']]
outputStyle = [semiTriangles[col] for col in ('x', 'y', 'z')]
triangles = semiTriangles.join(edgesDF, condition).select(outputStyle)

## The number of triangles after dropping the duplicates

In [154]:
# The joins can yield the same (x, y, z) row more than once; collapse exact
# duplicates before counting.
triangles = triangles.dropDuplicates()
triangles.count()

757243

In [155]:
triangles.head()  # peek at one triangle row

Row(x='10002', y='10007', z='9995')

### Saving the output to the `triangles` directory

In [156]:
# Serialise each triangle as " a \t b \t c ": every id is wrapped in spaces,
# which the padded-id substring search in Part B relies on.
to_save = triangles.rdd.map(lambda tri: " {} \t {} \t {} ".format(*tri))
# NOTE(review): Spark will not overwrite an existing output directory, so
# re-running this cell presumably requires removing 'triangles' first.
to_save.coalesce(1).saveAsTextFile('triangles')

# Part B: Finding triangles with specific nodes

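### I will use the resulting `to_save` RDD to search for triangles. Every node id in it is surrounded by spaces, so searching for `' 924 '` matches node 924 exactly and not, e.g., 9245.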

In [158]:
# Query ids, each padded with one space on both sides so a substring match only
# hits whole ids in the space-padded lines of to_save.
to_find = [' {} '.format(node_id) for node_id in ('924', '8941', '8942', '9019', '9020', '9990', '9993')]

def containNode(row, to_find):
    """Return True if any string in ``to_find`` occurs as a substring of ``row``."""
    for needle in to_find:
        if needle in row:
            return True
    return False

# Lines of to_save that mention at least one queried node. Cached because the
# per-node filters below reuse it; without the cache every one of those saves
# would recompute the whole triangle pipeline. The count materialises it.
containing = to_save.filter(lambda row: containNode(row, to_find)).cache()
containing.count()

50

## The results for each node are saved under the `triangles_q_{nodeID}` directory

In [162]:
# One filtered RDD per queried node, keyed by the bare node id.
query_result = {}
for node in to_find:
    # Bind `node` now via a default argument. Spark serialises the lambda
    # lazily (when the save below runs), by which time the loop variable would
    # hold the last id, and every directory would get the same triangles.
    query_result[node.strip()] = containing.filter(lambda row, node=node: node in row)

# `node_rdd` (not `rdd`) so the global input RDD from the loading cell is not
# clobbered. Keys are already stripped above.
for key, node_rdd in query_result.items():
    node_rdd.coalesce(1).saveAsTextFile('triangles_q_' + key)