Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the validate button. If the validate command takes too long, you can also confirm that you pass all the tests if you can run through the whole notebook without getting validation errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## GraphFrame Exercises
Your job is to implement multiple small methods that analyze people's social data using [GraphFrames](https://graphframes.github.io/graphframes/docs/_site/index.html).

We use a small sample of the "socialgraph.dat" file from https://archive.org/download/201309_foursquare_dataset_umn/fsq.zip, which is located inside the "umn_foursquare_datasets" folder.

The "socialgraph.dat" file contains the social graph edges (connections) that exist between users. Each social connection consists of two users represented by two unique ids (first_user_id and second_user_id). The connections are directed. Suppose we have the following data:

first_user_id | second_user_id 
---------------|----------------
             1 |             2
             2 |             1

This data set shows that there is a connection from user1 (whose id is 1) to user2 (whose id is 2), and another connection from user2 to user1.
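
As a minimal sketch of how such rows map to directed edges, the plain-Python snippet below parses the two-row table above without Spark. The pattern mirrors the regular expression used later in `createGraph`, with capture groups added; the names `LINE_PATTERN`, `parse_edges` and `sample_lines` are hypothetical and only serve the illustration.

```python
import re

# Two integer ids separated by "|" (capture groups added for illustration)
LINE_PATTERN = re.compile(r'\s*(\d+)\s*\|\s*(\d+)\s*')

def parse_edges(lines):
    """Return one (first_user_id, second_user_id) tuple per data line."""
    edges = []
    for line in lines:
        match = LINE_PATTERN.fullmatch(line)
        if match:  # header and separator lines do not match and are skipped
            edges.append((int(match.group(1)), int(match.group(2))))
    return edges

sample_lines = [
    "first_user_id | second_user_id",
    "---------------|----------------",
    "             1 |             2",
    "             2 |             1",
]
parsed = parse_edges(sample_lines)
print(parsed)  # [(1, 2), (2, 1)]
```

Each tuple is read as "from the first id to the second id", so `(1, 2)` and `(2, 1)` are two different edges.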
             
These links may be helpful:  
https://graphframes.github.io/graphframes/docs/_site/user-guide.html  
https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#module-graphframes  


In [None]:
from graphframes import GraphFrame
from pyspark.sql import SparkSession, Row
import re
from pyspark.sql.functions import *

spark = SparkSession.builder\
    .master('local[*]')\
    .appName('main')\
    .getOrCreate()

sampleFile = "socialgraph_sample.dat"

# Path of smaller data set
testFile = "socialgraph_testsample.dat"

In [None]:
# Variable and methods that will be used in more than one test

sc = spark.sparkContext

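# Test if two arrays that contain Rows are equal
def equalArray(array1, array2):
    # Compare lengths first so that extra or missing rows are also detected
    assert len(array1) == len(array2), "expected %s rows but got %s" % (len(array2), len(array1))
    for i in range(len(array2)):
        assert array1[i].asDict() == array2[i].asDict(), "the row was expected to be %s but it was %s" % (array2[i].asDict(), array1[i].asDict())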

# Test if two dataframes contain same rows
def equalDF(df1, df2, *columns):
    
    # sort dfs before converting them to lists
    array1 = df1.orderBy(list(columns)).collect()
    array2 = df2.orderBy(list(columns)).collect()
    equalArray(array1, array2)

## Create Graph
`createGraph` creates a GraphFrame. The function already includes code that creates the relevant RDDs from the file. Your job is to create a GraphFrame from the RDDs. You can start by first turning the RDDs into dataframes.

Vertices should have fields `id` and `name`. Note that id and name are identical in this exercise.  
Edges should have fields `src`, `dst` and `relationship`.

Example: Suppose we have the data shown below:

 first_user_id | second_user_id 
---------------|----------------
             1 |             2
             1 |             2
             2 |             1
             1 |             3
             2 |             3
            
The graph should be:
![](example.png)
             
param `path`: path of file whose data should be used to create the GraphFrame 

`return`: GraphFrame


Note: If you experience performance issues in later exercises, the graph may have too many partitions. You can try to fix this by repartitioning when creating the graph in the `createGraph` function, for example with `GraphFrame(vertices.repartition(2), edges.repartition(2))`.
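
To make the expected vertices and edges concrete, here is a small plain-Python sketch (no Spark involved) that traces the example table above. It assumes, as the provided RDD code does, that duplicate rows collapse into one edge whose `relationship` is the number of duplicates. The variable names are hypothetical.

```python
from collections import Counter

# Rows of the example table: (first_user_id, second_user_id)
raw_edges = [(1, 2), (1, 2), (2, 1), (1, 3), (2, 3)]

# Every id that appears on either side of an edge is a vertex; id == name
vertices = sorted({user for edge in raw_edges for user in edge})
vertex_rows = [(v, v) for v in vertices]

# Duplicate rows collapse into one edge whose relationship is the row count
edge_counts = Counter(raw_edges)
edge_rows = sorted((src, dst, n) for (src, dst), n in edge_counts.items())

print(vertex_rows)  # [(1, 1), (2, 2), (3, 3)]
print(edge_rows)    # [(1, 2, 2), (1, 3, 1), (2, 1, 1), (2, 3, 1)]
```

The tuples correspond to the `(id, name)` and `(src, dst, relationship)` fields that the DataFrames should carry.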

In [None]:
def createGraph(path):
    
    data = spark.sparkContext.textFile(path)
    regex = r'\s*\d+\s*\|\s*\d+\s*'
    
    filtered = data.map(lambda x: re.findall(regex, x)).filter(lambda y: len(y) > 0)

    verticesRDD = filtered.flatMap(lambda x: x[0].split('|')) \
        .map(lambda x: int(x.strip())) \
        .distinct()\
        .map(lambda v: (v,v))
    
    edgesRDD = filtered.map(lambda x: x[0].split('|'))\
            .map(lambda x: ((int(x[0].strip()), int(x[1].strip())), 1))\
            .reduceByKey(lambda a,b:a+b) \
            .map(lambda x: (x[0][0], x[0][1], x[1]))
    
    ver = verticesRDD.toDF(['id', 'name']).repartition(2)
    edge = edgesRDD.toDF(['src', 'dst', 'relationship']).repartition(2)
    g = GraphFrame(ver, edge)
    return g

In [None]:
# example print

graph = createGraph(sampleFile).persist()
graph.vertices.show()
graph.edges.show()

In [None]:
'''createGraph tests'''

import random

correctVertices = sc.parallelize([Row(id=2, name=2),
                                  Row(id=10, name=10),
                                  Row(id=8, name=8),
                                  Row(id=3, name=3),
                                  Row(id=7, name=7),
                                  Row(id=4, name=4),
                                  Row(id=1, name=1),
                                  Row(id=9, name=9)]).toDF()

correctEdges = sc.parallelize([Row(src=2, dst=10, relationship=1),
                               Row(src=2, dst=8, relationship=1),
                               Row(src=3, dst=7, relationship=1),
                               Row(src=3, dst=10, relationship=1),
                               Row(src=2, dst=3, relationship=1),
                               Row(src=10, dst=4, relationship=1),
                               Row(src=4, dst=10, relationship=1),
                               Row(src=4, dst=2, relationship=1),
                               Row(src=1, dst=9, relationship=1),
                               Row(src=1, dst=10, relationship=2),
                               Row(src=7, dst=9, relationship=1),
                               Row(src=1, dst=3, relationship=1),
                               Row(src=10, dst=1, relationship=1)]).toDF()
testGraph = createGraph(testFile).persist()
testVertices = testGraph.vertices
testEdges = testGraph.edges

assert testVertices.count() == correctVertices.count(), "the vertices count was expected to be %s but it was %s" % (correctVertices.count(), testVertices.count())
assert testEdges.count() == correctEdges.count(), "the edges count was expected to be %s but it was %s" % (correctEdges.count(), testEdges.count())
equalDF(testGraph.vertices, correctVertices, "id")
equalDF(testGraph.edges, correctEdges, "src", "dst", "relationship")



## Both Directions
`bothDirections` finds pairs of users who are connected by an edge in both directions.

param `graph`: GraphFrame containing social data (created by `createGraph`).

`return`: DataFrame with columns "start", "end" and "connections", corresponding to the starting user id, the ending user id and the number of connections from the starting user to the ending user.

Example: Suppose we have the graph below:
![](example_bothConnections.png)
The result should be:

|start|end|connections|
|---|---|------------|
| 10|  4|           1|
|  4| 10|           1|
|  1| 10|           2|
| 10|  1|           1|

Hint: Check the `find` method of [GraphFrames](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding)
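
The idea behind the motif can be sketched in plain Python: an edge is kept only if its reverse edge also exists. The edge data below is copied from the `createGraph` tests. The dictionary layout and the name `both_directions` are assumptions made for this illustration, not part of GraphFrames.

```python
# Edges of the small test graph as {(src, dst): relationship}
edges = {
    (2, 10): 1, (2, 8): 1, (3, 7): 1, (3, 10): 1, (2, 3): 1,
    (10, 4): 1, (4, 10): 1, (4, 2): 1, (1, 9): 1, (1, 10): 2,
    (7, 9): 1, (1, 3): 1, (10, 1): 1,
}

def both_directions(edges):
    """Keep (start, end, connections) for edges whose reverse also exists."""
    return sorted(
        (src, dst, count)
        for (src, dst), count in edges.items()
        if (dst, src) in edges
    )

mutual = both_directions(edges)
print(mutual)  # [(1, 10, 2), (4, 10, 1), (10, 1, 1), (10, 4, 1)]
```

Each mutual pair appears twice, once per direction, and each row carries the count of its own direction.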

In [None]:
def bothDirections(graph):
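    # Motif: an edge from a to b together with an edge from b back to a
    res_df = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
    # Read the struct fields by name instead of by position
    res_df = res_df.select(res_df['a']['id'].alias('start'),
                           res_df['b']['id'].alias('end'),
                           res_df['e']['relationship'].alias('connections'))
    return res_df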

In [None]:
# example print

bothDirections(graph).show()

In [None]:
'''bothDirections tests'''

correctEdges = sc.parallelize([Row(start=10, end=4, connections=1),
                               Row(start=4, end=10, connections=1),
                               Row(start=1, end=10, connections=2),
                               Row(start=10, end=1, connections=1)]).toDF()
equalDF(bothDirections(testGraph), correctEdges, "start", "end", "connections")


## Most Active User
`mostActiveUser` finds which user has the most outward connections. 

param `graph`: GraphFrame containing social data.

`return`: id of the user who has the most outward connections. Return the smallest id if more than one user has the same number of outward connections.
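
The tie-breaking rule can be illustrated without Spark. The sketch below counts one outward connection per distinct edge of the small test graph (an assumption that matches how the tests in this notebook behave) and uses a hypothetical helper `most_active_user`.

```python
from collections import Counter

# Distinct (src, dst) edges of the small test graph
edge_pairs = [
    (2, 10), (2, 8), (3, 7), (3, 10), (2, 3), (10, 4), (4, 10),
    (4, 2), (1, 9), (1, 10), (7, 9), (1, 3), (10, 1),
]

def most_active_user(edge_pairs):
    out_degree = Counter(src for src, _ in edge_pairs)
    # Highest count first; among equal counts the smallest id wins
    return min(out_degree, key=lambda user: (-out_degree[user], user))

print(most_active_user(edge_pairs))  # 1
```

Users 1 and 2 both have three outward edges here, so the smaller id, 1, is returned.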

In [None]:
def mostActiveUser(graph):
    res_df = graph.find("(a)-[e]->()")
    res_df = res_df.groupBy('a').count()
    rdd2=res_df.rdd.map(lambda x: (x[0][0],x[1]))
    res_df = rdd2.toDF(['a', 'amount'])
    res_df = res_df.sort(res_df.amount.desc(), res_df.a.asc())
    res = res_df.collect()
    return res[0].asDict()['a']

In [None]:
# example print

mostActiveUser(graph)

In [None]:
'''mostActiveUser tests'''

assert mostActiveUser(graph) == 5, "the most active user was expected to be 5 but it was %s" % mostActiveUser(graph)
assert mostActiveUser(testGraph) == 1, "the most active user was expected to be 1 but it was %s" % mostActiveUser(testGraph)


## Connection Ratio
`connectionRatio` computes, for each user, the ratio of inward connections to outward connections. Users with many inward but few outward connections get the highest ratio.

param `graph`: GraphFrame containing social data.

`return`: DataFrame with columns "id" and "connectionRatio", where "id" is the id of a user and "connectionRatio" = number of inward connections / number of outward connections. Users that have no inward connections or no outward connections should be filtered out. **The DataFrame should be sorted by connectionRatio in descending order. If more than one user has the same connection ratio, these users should be sorted by their id in ascending order.**

Example output:

| id|   connectionRatio|
|---|------------------|
| 10|               2.0|
|  3|               1.0|
|  7|               1.0|
|  4|               0.5|
|  1|0.3333333333333333|
|  2|0.3333333333333333|
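
The ratios in this table can be reproduced by hand. The plain-Python sketch below does so for the small test graph, counting one connection per distinct edge (an assumption consistent with the expected values above). The function name `connection_ratio` is hypothetical.

```python
from collections import Counter

# Distinct (src, dst) edges of the small test graph
edge_pairs = [
    (2, 10), (2, 8), (3, 7), (3, 10), (2, 3), (10, 4), (4, 10),
    (4, 2), (1, 9), (1, 10), (7, 9), (1, 3), (10, 1),
]

def connection_ratio(edge_pairs):
    out_degree = Counter(src for src, _ in edge_pairs)
    in_degree = Counter(dst for _, dst in edge_pairs)
    # Only users with at least one inward and one outward edge are kept
    both = set(out_degree) & set(in_degree)
    ratios = [(user, in_degree[user] / out_degree[user]) for user in both]
    # Ratio descending, then id ascending
    return sorted(ratios, key=lambda row: (-row[1], row[0]))

ratio_rows = connection_ratio(edge_pairs)
print([user for user, _ in ratio_rows])  # [10, 3, 7, 4, 1, 2]
```

Users 8 and 9 have no outward edges, so they do not appear in the result.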

In [None]:
def connectionRatio(graph):
    out_df = graph.find("(a)-[e]->()")
    in_df = graph.find("()-[e]->(b)")
    out_df = out_df.groupBy('a').count().withColumnRenamed('count', 'out')
    in_df = in_df.groupBy('b').count().withColumnRenamed('count', 'in')
    df = in_df.join(out_df, in_df.b == out_df.a, 'inner')
    rdd2=df.rdd.map(lambda x: (x[0][0],x[1]/x[3]))
    res_df = rdd2.toDF(['id', 'connectionRatio'])
    res_df = res_df.sort(res_df.connectionRatio.desc(), res_df.id.asc())
    return res_df

In [None]:
# example print

connectionRatio(graph).show()

In [None]:
'''connectionRatio tests'''
correct = [Row(id=10, connectionRatio=2.0),
           Row(id=3, connectionRatio=1.0),
           Row(id=7, connectionRatio=1.0),
           Row(id=4, connectionRatio=0.5),
           Row(id=1, connectionRatio=1/3),
           Row(id=2, connectionRatio=1/3)]

test = connectionRatio(testGraph)
equalArray(test.collect(), correct)


## Communities
`communities` uses [label propagation algorithm (LPA)](https://neo4j.com/blog/graph-algorithms-neo4j-label-propagation/) to detect communities for a graph. 

param `graph`: GraphFrame containing social data.

`return`: DataFrame containing columns "community" and "count". "community" is the label assigned by LPA and "count" is the number of users who belong to the community. **The DataFrame should be sorted by "count" in descending order. If more than one community has the same number of users, these communities should be sorted by label in ascending order.**

Note: set the maximum number of iterations to 5 when running LPA.

Example output:

|community|count|
|---------|-----|
|        1|    4|
|        3|    2|
|       10|    2|
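
The grouping and sorting step can be sketched independently of LPA itself. The labels below are hypothetical values chosen to reproduce the table above; they are not computed by label propagation. The helper `community_sizes` is likewise an assumption for illustration.

```python
from collections import Counter

# Hypothetical LPA result: vertex id -> community label (not computed here)
labels = {1: 1, 2: 1, 4: 1, 9: 1, 3: 3, 7: 3, 8: 10, 10: 10}

def community_sizes(labels):
    counts = Counter(labels.values())
    # Count descending, then label ascending
    return sorted(counts.items(), key=lambda row: (-row[1], row[0]))

sizes = community_sizes(labels)
print(sizes)  # [(1, 4), (3, 2), (10, 2)]
```

Communities 3 and 10 have the same size, so the smaller label comes first.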


In [None]:
def communities(graph):
    result = graph.labelPropagation(maxIter=5)
    result = result.groupBy('label').count().withColumnRenamed('label', 'community')
    result = result.select('community', 'count')
    result = result.sort(result['count'].desc(), result['community'].asc())
    return result

In [None]:
# example print

communities(graph).show()

In [None]:
'''communities tests'''
correct = [Row(community=2, count=4),
           Row(community=8, count=3),
           Row(community=10, count=1)]

equalArray(communities(testGraph).collect(), correct)


## Highest Page Rank
`highestPageRank` finds which user has the highest [PageRank](https://en.wikipedia.org/wiki/PageRank).

param `graph`: GraphFrame containing social data.

`return`: id of user with the highest PageRank.

**Set the tolerance `tol` to 0.0001 when using the pageRank algorithm.**
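
To show what a tolerance-based stopping rule means, here is a textbook power-iteration sketch in plain Python. It is not the GraphFrames implementation, and its scores may be scaled differently from what `pageRank` returns. The function `pagerank`, its parameters and the three-node `toy_edges` graph are assumptions made for this illustration.

```python
def pagerank(edges, reset_probability=0.15, tol=1e-4):
    nodes = sorted({n for edge in edges for n in edge})
    out_links = {n: [] for n in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    rank = {n: 1.0 / len(nodes) for n in nodes}
    while True:
        # Every node gets the reset share, plus shares from its in-neighbours
        new_rank = {n: reset_probability / len(nodes) for n in nodes}
        for src, targets in out_links.items():
            # A node without out-links spreads its rank over all nodes
            receivers = targets or nodes
            share = (1 - reset_probability) * rank[src] / len(receivers)
            for dst in receivers:
                new_rank[dst] += share
        # Stop once no score moved by more than the tolerance
        change = max(abs(new_rank[n] - rank[n]) for n in nodes)
        rank = new_rank
        if change < tol:
            return rank

toy_edges = [(1, 3), (2, 3), (3, 1)]
ranks = pagerank(toy_edges)
top_user = max(ranks, key=ranks.get)
print(top_user)  # 3
```

Node 3 receives links from both other nodes, so it ends up with the highest score. A smaller `tol` means more iterations before the loop stops.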

In [None]:
def highestPageRank(graph):
    results = graph.pageRank(resetProbability=0.15, tol=0.0001)
    ranks = results.vertices.select("id", "pagerank")
    ranks = ranks.sort(ranks.pagerank.desc())
    res = ranks.collect()
    return res[0].asDict()['id']

In [None]:
# example print

highestPageRank(graph)

In [None]:
'''highestPageRank tests'''

#graph = createGraph(testFile)
assert highestPageRank(testGraph) == 10, "the highest page rank was expected to be 10 but it was %s" % highestPageRank(testGraph)


In [None]:
spark.catalog.clearCache()
spark.stop()