Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the validate button. If validation takes too long, you can also confirm that you pass all the tests by running the whole notebook without any assertion errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## GraphFrame Exercises
Your job is to implement multiple small methods that analyze people's social data using [GraphFrames](https://graphframes.github.io/graphframes/docs/_site/index.html).

We use a small sample of the "socialgraph.dat" data from https://archive.org/download/201309_foursquare_dataset_umn/fsq.zip (the file is inside the "umn_foursquare_datasets" folder).

The "socialgraph.dat" file contains the social graph edges (connections) between users. Each social connection consists of two users represented by two unique ids (first_user_id and second_user_id). The connections are directed. Suppose we have the following data:

first_user_id | second_user_id 
---------------|----------------
             1 |             2
             2 |             1

This data set shows that there is a connection from user1 (whose id is 1) to user2 (whose id is 2), and another connection from user2 to user1.
             
This link may be helpful: https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html#module-graphframes.


In [47]:
from graphframes import GraphFrame
from pyspark.sql import SparkSession, Row, DataFrame
import re
from pyspark.sql.functions import *

spark = SparkSession.builder\
    .master('local[*]')\
    .appName('main')\
    .getOrCreate()

sampleFile = "socialgraph_sample.dat"

# Path of smaller data set
testFile = "socialgraph_testsample.dat"

In [None]:
# Variable and methods that will be used in more than one test

sc = spark.sparkContext

# Test if two arrays that contain Rows are equal
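def equalArray(array1, array2):
    # Without this check, extra or missing rows in array1 would go unnoticed
    assert len(array1) == len(array2), "expected %s rows but got %s" % (len(array2), len(array1))
    for row1, row2 in zip(array1, array2):
        assert row1.asDict() == row2.asDict(), "the row was expected to be %s but it was %s" % (row2.asDict(), row1.asDict())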

# Test if two dataframes contain same rows
def equalDF(df1, df2, *columns):
    
    # sort dfs before converting them to lists
    array1 = df1.orderBy(list(columns)).collect()
    array2 = df2.orderBy(list(columns)).collect()
    equalArray(array1, array2)

## Create Graph
`createGraph` creates a GraphFrame. Use each user_id as both the vertex id (`id`) and the vertex attribute (`name`). Use the number of times a directed connection appears in the data as its edge weight (`relationship`).

Example: Suppose we have the following data:

 first_user_id | second_user_id 
---------------|----------------
             1 |             2
             1 |             2
             2 |             1
             1 |             3
             2 |             3
            
The graph should be:
![](example.png)
             
param `path`: path of file whose data should be used to create the GraphFrame 

`return`: GraphFrame

Hints:
- You can load the file into RDD and use RDD transformations to extract and parse the data. 
- [Regex](https://docs.python.org/3.7/library/re.html#module-re) is extremely useful when parsing the data in this case.
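The parse-and-count logic behind `createGraph` can be tried out in plain Python before moving it onto RDDs. This is a minimal sketch, assuming each data row holds exactly two integer ids while header and separator lines hold none; `SAMPLE_LINES`, `parse_edges` and `build_graph` are hypothetical names, and the sample layout only mirrors the example table above, not necessarily the real file.

```python
import re
from collections import Counter

# Hypothetical input mirroring the example table; the real file layout is an assumption.
SAMPLE_LINES = [
    " first_user_id | second_user_id",
    "---------------+----------------",
    "             1 |             2",
    "             1 |             2",
    "             2 |             1",
    "             1 |             3",
    "             2 |             3",
]


def parse_edges(lines):
    """Yield (first_user_id, second_user_id) for each data row; skip everything else."""
    for line in lines:
        ids = re.findall(r"\d+", line)
        if len(ids) == 2:  # data rows hold exactly two ids, headers hold none
            yield int(ids[0]), int(ids[1])


def build_graph(lines):
    """Return (vertices, weighted_edges) shaped like createGraph's output."""
    weights = Counter(parse_edges(lines))  # (src, dst) -> number of occurrences
    vertices = sorted({uid for pair in weights for uid in pair})
    edges = sorted((src, dst, w) for (src, dst), w in weights.items())
    return vertices, edges


vertices, edges = build_graph(SAMPLE_LINES)
print(vertices)  # [1, 2, 3]
print(edges)     # [(1, 2, 2), (1, 3, 1), (2, 1, 1), (2, 3, 1)]
```

Because the connections are directed, `(1, 2)` and `(2, 1)` stay separate keys, and the duplicated `1 -> 2` row becomes a single edge of weight 2.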

In [None]:
def createGraph(path):
    # Every data row holds exactly two integer ids (first_user_id, second_user_id);
    # the header, the separator line and blank lines hold none, so they drop out.
    pairs = sc.textFile(path) \
        .map(lambda line: re.findall(r'\d+', line)) \
        .filter(lambda ids: len(ids) == 2) \
        .map(lambda ids: (int(ids[0]), int(ids[1]))) \
        .persist()

    # Vertices: every user id seen on either side of a connection;
    # the id doubles as the vertex attribute "name".
    v = pairs.flatMap(lambda pair: pair).distinct() \
        .map(lambda uid: Row(id=uid, name=uid)).toDF()

    # Edges: weight each directed (src, dst) pair by how often it occurs.
    e = pairs.map(lambda pair: (pair, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .map(lambda kv: Row(src=kv[0][0], dst=kv[0][1], relationship=kv[1])).toDF()
    return GraphFrame(v, e)

In [None]:
# example print

graph = createGraph(sampleFile).persist()
graph.vertices.show()
graph.edges.show()

In [None]:
'''createGraph tests'''

import random

correctVertices = sc.parallelize([Row(id=2, name=2),
                                  Row(id=10, name=10),
                                  Row(id=8, name=8),
                                  Row(id=3, name=3),
                                  Row(id=7, name=7),
                                  Row(id=4, name=4),
                                  Row(id=1, name=1),
                                  Row(id=9, name=9)]).toDF()

correctEdges = sc.parallelize([Row(src=2, dst=10, relationship=1),
                               Row(src=2, dst=8, relationship=1),
                               Row(src=3, dst=7, relationship=1),
                               Row(src=3, dst=10, relationship=1),
                               Row(src=2, dst=3, relationship=1),
                               Row(src=10, dst=4, relationship=1),
                               Row(src=4, dst=10, relationship=1),
                               Row(src=4, dst=2, relationship=1),
                               Row(src=1, dst=9, relationship=1),
                               Row(src=1, dst=10, relationship=2),
                               Row(src=7, dst=9, relationship=1),
                               Row(src=1, dst=3, relationship=1),
                               Row(src=10, dst=1, relationship=1)]).toDF()
testGraph = createGraph(testFile).persist()
testVertices = testGraph.vertices
testEdges = testGraph.edges

assert testVertices.count() == correctVertices.count(), "the vertices count was expected to be %s but it was %s" % (correctVertices.count(), testVertices.count())
assert testEdges.count() == correctEdges.count(), "the edges count was expected to be %s but it was %s" % (correctEdges.count(), testEdges.count())
equalDF(testGraph.vertices, correctVertices, "id")
equalDF(testGraph.edges, correctEdges, "src", "dst", "relationship")



## Both Directions
`bothDirections` finds pairs of users who are connected by an edge in both directions.

param `graph`: GraphFrame containing social data (created by `createGraph`).

`return`: DataFrame with columns "start", "end" and "connections": the starting user id, the ending user id, and the number of connections from the starting user to the ending user.

Example: Suppose we have the graph below:
![](example_bothConnections.png)
The result should be:

|start|end|connections|
|---|---|------------|
| 10|  4|           1|
|  4| 10|           1|
|  1| 10|           2|
| 10|  1|           1|

Hint: Check the find function from [GraphFrames](https://graphframes.github.io/graphframes/docs/_site/index.html)
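The motif `(a)-[e]->(b); (b)-[e2]->(a)` keeps an edge a -> b only when the reverse edge b -> a also exists. Below is a plain-Python sketch of that matching rule, not of how GraphFrames executes it. It assumes the edges are held in a hypothetical dict `TEST_EDGES` mapping `(src, dst)` to its weight, copied from the `createGraph` tests.

```python
# Weighted edges of the test graph, copied from the createGraph tests.
TEST_EDGES = {
    (2, 10): 1, (2, 8): 1, (3, 7): 1, (3, 10): 1, (2, 3): 1,
    (10, 4): 1, (4, 10): 1, (4, 2): 1, (1, 9): 1, (1, 10): 2,
    (7, 9): 1, (1, 3): 1, (10, 1): 1,
}


def both_directions(edges):
    """Keep (start, end, weight) for each edge whose reverse edge also exists."""
    return sorted(
        (start, end, weight)
        for (start, end), weight in edges.items()
        if (end, start) in edges
    )


print(both_directions(TEST_EDGES))
# [(1, 10, 2), (4, 10, 1), (10, 1, 1), (10, 4, 1)]
```

Each mutual pair appears twice, once per direction, and each row carries the weight of its own direction, which is why `1 -> 10` reports 2 while `10 -> 1` reports 1.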

In [None]:
def bothDirections(graph):
    # Motif: an edge a->b together with the reverse edge b->a.
    df = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
    # The weight of the a->b edge is the number of connections from start to end.
    return df.select(col('a.id').alias('start'),
                     col('b.id').alias('end'),
                     col('e.relationship').alias('connections'))

In [None]:
# example print

bothDirections(graph).show()

In [46]:
'''bothDirections tests'''

correctEdges = sc.parallelize([Row(start=10, end=4, connections=1),
                               Row(start=4, end=10, connections=1),
                               Row(start=1, end=10, connections=2),
                               Row(start=10, end=1, connections=1)]).toDF()
equalDF(bothDirections(testGraph), correctEdges, "start", "end", "connections")



## Most Active User
`mostActiveUser` finds which user has the most outward connections. 

param `graph`: GraphFrame containing social data.

`return`: id of the user with the most outward connections. If several users tie, return the smallest id.
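A plain-Python sketch of the selection rule follows. It assumes, as the notebook's implementation does, that outward connections are counted by summing edge weights. `most_active_user` is a hypothetical helper, and `TEST_EDGES` copies the test graph's weighted edges. Taking the minimum of the key `(-total, id)` applies the "most connections, then smallest id" rule in a single pass.

```python
from collections import defaultdict

# Weighted edges of the test graph, copied from the createGraph tests.
TEST_EDGES = {
    (2, 10): 1, (2, 8): 1, (3, 7): 1, (3, 10): 1, (2, 3): 1,
    (10, 4): 1, (4, 10): 1, (4, 2): 1, (1, 9): 1, (1, 10): 2,
    (7, 9): 1, (1, 3): 1, (10, 1): 1,
}


def most_active_user(edges):
    """Return the id with the largest total outward weight; ties go to the smallest id."""
    totals = defaultdict(int)
    for (src, _dst), weight in edges.items():
        totals[src] += weight
    return min(totals, key=lambda uid: (-totals[uid], uid))


print(most_active_user(TEST_EDGES))  # 1
```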

In [None]:
def mostActiveUser(graph):
    # Total outward connections per user: sum the weights of its outgoing edges.
    outward = graph.edges.groupBy('src').sum('relationship') \
        .withColumnRenamed('sum(relationship)', 'outward')
    # Most connections first; among ties, the smallest id comes first.
    return outward.orderBy(col('outward').desc(), col('src').asc()).first()['src']

In [None]:
# example print

mostActiveUser(graph)

In [None]:
'''mostActiveUser tests'''

assert mostActiveUser(graph) == 5, "the most active user was expected to be 5 but it was %s" % mostActiveUser(graph)
assert mostActiveUser(testGraph) == 1, "the most active user was expected to be 1 but it was %s" % mostActiveUser(testGraph)


## Connection Ratio
`connectionRatio` computes, for each user, the ratio of inward connections to outward connections, so users with many inward but few outward connections come first.

param `graph`: GraphFrame containing social data.

`return`: DataFrame with columns "id" and "connectionRatio", where "id" is a user id and "connectionRatio" = number of inward connections / number of outward connections. Each distinct edge counts once (edge weights are ignored). Users with no inward connections or no outward connections should be filtered out. **The DataFrame should be sorted by connectionRatio in descending order. If more than one user has the same connection ratio, these users should be sorted by id in ascending order.**

Example output:

| id|   connectionRatio|
|---|------------------|
| 10|               2.0|
|  3|               1.0|
|  7|               1.0|
|  4|               0.5|
|  1|0.3333333333333333|
|  2|0.3333333333333333|
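The ratios in the example output can be reproduced with a short plain-Python sketch, assuming each distinct `(src, dst)` edge counts once and weights are ignored. `connection_ratio` is a hypothetical helper, and `TEST_EDGES` lists the test graph's distinct edges.

```python
from collections import Counter

# Distinct directed edges of the test graph (copied from the createGraph tests).
TEST_EDGES = [
    (2, 10), (2, 8), (3, 7), (3, 10), (2, 3), (10, 4), (4, 10),
    (4, 2), (1, 9), (1, 10), (7, 9), (1, 3), (10, 1),
]


def connection_ratio(edges):
    """Return [(id, inward / outward)] sorted by ratio desc, then id asc."""
    outward = Counter(src for src, _ in edges)
    inward = Counter(dst for _, dst in edges)
    # Only users with at least one inward and one outward edge get a ratio.
    ratios = [(uid, inward[uid] / outward[uid]) for uid in outward if inward[uid] > 0]
    return sorted(ratios, key=lambda item: (-item[1], item[0]))


for uid, ratio in connection_ratio(TEST_EDGES):
    print(uid, ratio)
```

Users 8 and 9 only receive edges, so they never enter `outward` and are dropped, just as the inner join in the Spark version drops them.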

In [None]:
def connectionRatio(graph):
    connections: DataFrame = graph.edges
    outward = connections.groupBy('src').count().withColumnRenamed('count','outward')
    inward = connections.groupBy('dst').count().withColumnRenamed('count','inward')
    df = outward.join(inward, outward.src == inward.dst).withColumn('connectionRatio', col('inward')/col('outward'))
    return df.select('src', 'connectionRatio').withColumnRenamed('src', 'id').sort(col('connectionRatio').desc(), col('id').asc())    

In [None]:
# example print

connectionRatio(graph).show()

In [None]:
'''connectionRatio tests'''
correct = [Row(id=10, connectionRatio=2.0),
           Row(id=3, connectionRatio=1.0),
           Row(id=7, connectionRatio=1.0),
           Row(id=4, connectionRatio=0.5),
           Row(id=1, connectionRatio=1/3),
           Row(id=2, connectionRatio=1/3)]

test = connectionRatio(testGraph)
equalArray(test.collect(), correct)


## Communities
`communities` uses [label propagation algorithm (LPA)](https://neo4j.com/blog/graph-algorithms-neo4j-label-propagation/) to detect communities for a graph. 

param `graph`: GraphFrame containing social data.

`return`: DataFrame containing columns "community" and "count". "community" is the label assigned by LPA and "count" is the number of users in that community. **The DataFrame should be sorted by "count" in descending order. If more than one community has the same number of users, these communities should be sorted by label in ascending order.**

Note: run LPA for 5 iterations (`maxIter=5`).

Example output:

|community|count|
|---------|-----|
|        1|    4|
|        3|    2|
|       10|    2|
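Label propagation can be sketched in plain Python to build intuition. Every vertex starts with its own id as its label. At each iteration, each vertex adopts the label held by most of its neighbours, with edges treated as undirected. This is a minimal synchronous sketch. Breaking ties by the smallest label is an assumption made only here: GraphFrames (built on GraphX) may break ties differently, so its communities need not match the ones computed below. `label_propagation` and `community_sizes` are hypothetical helpers.

```python
from collections import Counter

# Distinct directed edges of the test graph (copied from the createGraph tests).
TEST_EDGES = [
    (2, 10), (2, 8), (3, 7), (3, 10), (2, 3), (10, 4), (4, 10),
    (4, 2), (1, 9), (1, 10), (7, 9), (1, 3), (10, 1),
]


def label_propagation(edges, max_iter=5):
    """Synchronous LPA sketch: undirected neighbourhoods, ties -> smallest label."""
    vertices = sorted({v for edge in edges for v in edge})
    neighbours = {v: [] for v in vertices}
    for src, dst in edges:
        neighbours[src].append(dst)
        neighbours[dst].append(src)

    labels = {v: v for v in vertices}  # each vertex starts in its own community
    for _ in range(max_iter):
        new_labels = {}
        for v in vertices:
            votes = Counter(labels[u] for u in neighbours[v])
            if not votes:  # an isolated vertex keeps its label
                new_labels[v] = labels[v]
                continue
            top = max(votes.values())
            new_labels[v] = min(label for label, c in votes.items() if c == top)
        labels = new_labels  # every vertex updates at once
    return labels


def community_sizes(edges, max_iter=5):
    """Return [(community, count)] sorted by count desc, then label asc."""
    sizes = Counter(label_propagation(edges, max_iter).values())
    return sorted(sizes.items(), key=lambda item: (-item[1], item[0]))


print(community_sizes(TEST_EDGES))
```

The final `sorted` call applies the same ordering the `communities` method must produce: count descending, then label ascending.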


In [None]:
def communities(graph):
    result = graph.labelPropagation(5).withColumnRenamed('label', 'community').groupBy('community').count()
    return result.select('community', 'count').sort(col('count').desc(), col('community').asc())

In [None]:
# example print

communities(graph).show()

In [None]:
'''communities tests'''
correct = [Row(community=2, count=4),
           Row(community=8, count=3),
           Row(community=10, count=1)]

equalArray(communities(testGraph).collect(), correct)


## Highest Page Rank
`highestPageRank` finds which user has the highest [PageRank](https://en.wikipedia.org/wiki/PageRank).

param `graph`: GraphFrame containing social data.

`return`: id of user with the highest PageRank.

**Set tolerance "tol" as 0.0001 when using the pageRank algorithm.**
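PageRank can be sketched with power iteration. This sketch uses the textbook normalized formulation: ranks sum to 1, damping = 1 - reset probability, rank held by dangling vertices is spread evenly, and iteration stops once the total L1 change drops below `tol`. GraphFrames' implementation (built on GraphX) uses its own normalization and convergence test, so these scores are illustrative rather than identical. `pagerank` and `highest_page_rank` are hypothetical helpers. `reset_probability=0.15` mirrors the GraphFrames default.

```python
def pagerank(edges, reset_probability=0.15, tol=1e-4, max_iter=100):
    """Power-iteration PageRank over distinct directed edges (weights ignored)."""
    edges = set(edges)
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    damping = 1.0 - reset_probability
    rank = {v: 1.0 / n for v in vertices}
    for _ in range(max_iter):
        # Rank held by vertices without outgoing edges is spread over all vertices.
        dangling = sum(rank[v] for v in vertices if not out_links[v])
        new_rank = {v: (reset_probability + damping * dangling) / n for v in vertices}
        for src in vertices:
            for dst in out_links[src]:
                new_rank[dst] += damping * rank[src] / len(out_links[src])
        change = sum(abs(new_rank[v] - rank[v]) for v in vertices)
        rank = new_rank
        if change < tol:
            break
    return rank


def highest_page_rank(edges, **kwargs):
    """Return the id with the highest score; ties go to the smallest id."""
    rank = pagerank(edges, **kwargs)
    return min(rank, key=lambda v: (-rank[v], v))


print(highest_page_rank([(1, 3), (2, 3)]))  # 3
```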

In [None]:
def highestPageRank(graph):
    df: DataFrame = graph.pageRank(tol=0.0001).vertices
    maxPageRank = df.groupBy().max('pagerank').first()[0]
    df = df.filter(col('pagerank') == maxPageRank)
    return df.first()[0]

In [None]:
# example print

highestPageRank(graph)

In [None]:
'''highestPageRank tests'''

#graph = createGraph(testFile)
assert highestPageRank(testGraph) == 10, "the highest page rank was expected to be 10 but it was %s" % highestPageRank(testGraph)


In [None]:
spark.catalog.clearCache()
spark.stop()