# GraphFrames User Guide (Python)
##### This notebook demonstrates examples from the <a href="https://graphframes.github.io/graphframes/docs/_site/user-guide.html">GraphFrames User Guide</a>.

##### The GraphFrames package is available from <a href="https://spark-packages.org/package/graphframes/graphframes">Spark Packages</a>.

In [2]:
from graphframes import *

In [3]:
vertices = spark.createDataFrame([('1', 'Carter', 'Derrick', 50),
                                  ('2', 'May', 'Derrick', 26),
                                  ('3', 'Mills', 'Jeff', 80),
                                  ('4', 'Hood', 'Robert', 65),
                                  ('5', 'Banks', 'Mike', 93),
                                  ('98', 'Berg', 'Tim', 28),
                                  ('99', 'Page', 'Allan', 16)],
                                 ['id', 'name', 'firstname', 'age'])
edges = spark.createDataFrame([('1', '2', 'friend'),
                               ('2', '1', 'friend'),
                               ('3', '1', 'friend'),
                               ('1', '3', 'friend'),
                               ('2', '3', 'follows'),
                               ('3', '4', 'friend'),
                               ('4', '3', 'friend'),
                               ('5', '3', 'friend'),
                               ('3', '5', 'friend'),
                               ('4', '5', 'follows'),
                               ('98', '99', 'friend'),
                               ('99', '98', 'friend')],
                              ['src', 'dst', 'type'])
g = GraphFrame(vertices, edges)
## Take a look at the DataFrames
g.vertices.show(20)
g.edges.show(20)
## Check the number of edges of each vertex
g.degrees.show(20)

## Directed vs undirected edges
Undirected graphs have edges without a direction: each edge represents a two-way relationship and can be traversed in both directions. If your edges DataFrame consists only of reciprocal pairs of directed edges, you may prefer to analyze it as an undirected graph. You can convert it by filtering the edges DataFrame so that every row with src ≥ dst is dropped (or, the other way around, every row with src ≤ dst). In NetworkX you could call to_undirected() to create a deep, undirected copy of a graph; unfortunately, GraphFrames does not provide an equivalent.

In [5]:
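from pyspark.sql.functions import udf

copy = edges

# Flag each edge: keep only the direction where src < dst (string comparison on the IDs)
@udf("string")
def to_undir(src, dst):
    if src >= dst:
        return 'Delete'
    else:
        return 'Keep'

# Keep the flagged rows, then drop the helper column
copy.withColumn('undir', to_undir(copy.src, copy.dst)) \
    .filter('undir == "Keep"').drop('undir').show()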

In [6]:
from pyspark.sql.functions import udf

copy = edges

# The other way around: keep only the direction where src > dst
@udf("string")
def to_undir(src, dst):
    if src <= dst:
        return 'Delete'
    else:
        return 'Keep'

# Keep the flagged rows, then drop the helper column
copy.withColumn('undir', to_undir(copy.src, copy.dst)) \
    .filter('undir == "Keep"').drop('undir').show()
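
The two cells above do not keep the same relationships: filtering on src < dst retains the one-way `follows` edges 2→3 and 4→5, while filtering on src > dst drops them. As a minimal sketch of a direction-independent alternative, the plain-Python snippet below (no Spark needed) normalizes every edge to a (low, high) pair and removes duplicates. The helper name `to_undirected` is hypothetical and not part of GraphFrames, and the IDs are compared as strings, exactly as in the cells above.

```python
# Plain-Python sketch: collapse directed edges into undirected pairs.
# The edge list mirrors the edges DataFrame defined earlier in this notebook.
edges = [("1", "2", "friend"), ("2", "1", "friend"), ("3", "1", "friend"),
         ("1", "3", "friend"), ("2", "3", "follows"), ("3", "4", "friend"),
         ("4", "3", "friend"), ("5", "3", "friend"), ("3", "5", "friend"),
         ("4", "5", "follows"), ("98", "99", "friend"), ("99", "98", "friend")]


def to_undirected(edge_list):
    """Return one (low, high) pair per connected vertex pair, ignoring direction."""
    return sorted({(min(src, dst), max(src, dst)) for src, dst, _ in edge_list})


undirected = to_undirected(edges)
for pair in undirected:
    print(pair)
```

Because each pair is normalized before deduplication, a one-way edge survives regardless of the direction in which it was recorded.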

In [7]:
g.inDegrees.show()

In [8]:
g.outDegrees.show()
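
To make the relationship between the three degree DataFrames concrete, here is a small plain-Python sketch that recomputes them from the same edge list. It assumes that a vertex's degree is simply its in-degree plus its out-degree; the variable names are illustrative only.

```python
from collections import Counter

# (src, dst) pairs mirroring the edges DataFrame defined earlier in this notebook.
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]

out_degree = Counter(src for src, _ in edges)  # edges leaving each vertex
in_degree = Counter(dst for _, dst in edges)   # edges arriving at each vertex
degree = out_degree + in_degree                # total edges touching each vertex

for vertex in sorted(degree):
    print(vertex, in_degree[vertex], out_degree[vertex], degree[vertex])
```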

## Filtering and connected components
A GraphFrame is not itself a DataFrame, but the DataFrames derived from it (vertices, edges, degrees) can be filtered like any other DataFrame. Consequently, the filter function (or any other DataFrame function) can be used just as you would use it elsewhere. The only pitfall is the correct use of quotation marks: the whole condition must be quoted, and string literals inside it need the other kind of quote. The examples below should clarify this.

In [10]:
g.vertices.filter("age > 30").show()

In [11]:
g.inDegrees.filter("inDegree >= 2").sort("inDegree", ascending=False).show()

In [12]:
g.edges.filter('type == "friend"').show()

## Motif finding
Motif finding lets you query a graph for structural patterns. Network motifs are patterns that occur repeatedly in the graph and represent the relationships between the vertices. GraphFrames motif finding uses a declarative Domain Specific Language (DSL) for expressing structural queries.

The query is invoked with the find function, where the motif (in quotation marks) is passed as the first parameter.

The following example searches for pairs of vertices a, b that are connected by an edge e from a to b and by an edge e2 from b back to a. It returns a DataFrame of all such structures in the graph, with a column for each of the named elements (vertices or edges) in the motif.

In [14]:
g.find("(a)-[e]->(b); (b)-[e2]->(a)").show()
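
As a rough illustration of what this motif matches, the plain-Python sketch below scans the same edge list for every edge whose reverse edge also exists. It is only a conceptual stand-in, not how GraphFrames evaluates motifs, and the names `edge_set` and `matches` are illustrative.

```python
# (src, dst) pairs mirroring the edges DataFrame defined earlier in this notebook.
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]

edge_set = set(edges)

# An (a, b) pair matches "(a)-[e]->(b); (b)-[e2]->(a)" when both directions exist.
matches = [(a, b) for (a, b) in edges if (b, a) in edge_set]

for a, b in matches:
    print(a, "<->", b)
```

Each reciprocal relationship appears twice, once per direction, and the one-way `follows` edges 2→3 and 4→5 do not match.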

In [15]:
t = g.find("(a)-[]->(b); (b)-[]->(a)")
display(t)

a,b
"List(98, Berg, Tim, 28)","List(99, Page, Allan, 16)"
"List(2, May, Derrick, 26)","List(1, Carter, Derrick, 50)"
"List(99, Page, Allan, 16)","List(98, Berg, Tim, 28)"
"List(3, Mills, Jeff, 80)","List(5, Banks, Mike, 93)"
"List(1, Carter, Derrick, 50)","List(3, Mills, Jeff, 80)"
"List(3, Mills, Jeff, 80)","List(1, Carter, Derrick, 50)"
"List(5, Banks, Mike, 93)","List(3, Mills, Jeff, 80)"
"List(4, Hood, Robert, 65)","List(3, Mills, Jeff, 80)"
"List(1, Carter, Derrick, 50)","List(2, May, Derrick, 26)"
"List(3, Mills, Jeff, 80)","List(4, Hood, Robert, 65)"


In [16]:
t.printSchema()

In [17]:
t = t \
  .withColumn("src", t.a.id) \
  .withColumn("dst", t.b.id) \
  .select("src", "dst")

# Right-join back to the edges DataFrame to get all the bidirectional edges
t = g.edges.join(t, on=["src", "dst"], how="right")

t.show()

In [18]:
f = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(b); (b)-[]->(a)").dropDuplicates()
display(f)

a,b,c
"List(4, Hood, Robert, 65)","List(3, Mills, Jeff, 80)","List(4, Hood, Robert, 65)"
"List(3, Mills, Jeff, 80)","List(1, Carter, Derrick, 50)","List(2, May, Derrick, 26)"
"List(5, Banks, Mike, 93)","List(3, Mills, Jeff, 80)","List(1, Carter, Derrick, 50)"
"List(4, Hood, Robert, 65)","List(3, Mills, Jeff, 80)","List(1, Carter, Derrick, 50)"
"List(3, Mills, Jeff, 80)","List(1, Carter, Derrick, 50)","List(3, Mills, Jeff, 80)"
"List(5, Banks, Mike, 93)","List(3, Mills, Jeff, 80)","List(5, Banks, Mike, 93)"
"List(5, Banks, Mike, 93)","List(3, Mills, Jeff, 80)","List(4, Hood, Robert, 65)"
"List(1, Carter, Derrick, 50)","List(2, May, Derrick, 26)","List(1, Carter, Derrick, 50)"
"List(1, Carter, Derrick, 50)","List(3, Mills, Jeff, 80)","List(4, Hood, Robert, 65)"
"List(2, May, Derrick, 26)","List(1, Carter, Derrick, 50)","List(3, Mills, Jeff, 80)"


In [19]:
f.filter("b.age > 30 or a.age > 30").show()

## TriangleCount and PageRank
Next, we'll look at two additional built-in algorithms. TriangleCount counts the number of triangles passing through each vertex in the graph. A triangle is a group of three mutually connected vertices, i.e. a has an edge to b, b has an edge to c, and c has an edge to a. Our simple example graph contains two triangles in total: vertices 1, 2, 3 and vertices 3, 4, 5.

In the GraphFrames package you can count the number of triangles passing through each vertex by invoking the triangleCount function. Triangles are used for various tasks on real-life networks, including community discovery, link prediction, and spam filtering.

In [22]:
g.triangleCount().show()
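
To show what is being counted, the following plain-Python sketch computes per-vertex triangle counts for the same graph. It assumes edge direction is ignored, which is consistent with the two triangles noted above; `triangle_count` is a hypothetical helper, not the GraphFrames implementation.

```python
from itertools import combinations

# (src, dst) pairs mirroring the edges DataFrame defined earlier in this notebook.
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]

# Build an undirected neighbor set for every vertex.
neighbors = {}
for src, dst in edges:
    if src != dst:
        neighbors.setdefault(src, set()).add(dst)
        neighbors.setdefault(dst, set()).add(src)


def triangle_count(neighbors):
    """Count, for each vertex, the pairs of its neighbors that are themselves connected."""
    counts = {}
    for vertex, nbrs in neighbors.items():
        counts[vertex] = sum(
            1 for u, w in combinations(sorted(nbrs), 2) if w in neighbors[u]
        )
    return counts


counts = triangle_count(neighbors)
for vertex in sorted(counts):
    print(vertex, counts[vertex])
```

Vertex 3 belongs to both triangles, so it is the only vertex with a count of 2.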

PageRank works by counting the number and quality of links to a page to determine a rough estimate of how important the website is. The underlying assumption is that more important websites are likely to receive more links from other websites.

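The PageRank algorithm models an imaginary surfer who randomly clicks on links and will eventually stop clicking. The probability, at any step, that the surfer continues is the damping factor. In GraphFrames it is set indirectly through the resetProbability parameter, which is the probability of jumping to a random vertex instead of following a link, so resetProbability=0.15 corresponds to a damping factor of 0.85. Other important parameters are the tolerance (tol) and the maximum number of iterations (maxIter); GraphFrames expects exactly one of the two to be set.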

In [24]:
pr = g.pageRank(resetProbability=0.15, tol=0.01)

Look at the PageRank score of every vertex:

In [26]:
pr.vertices.show()

Look at the weight of every edge:

In [28]:
pr.edges.show()
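
For intuition about how resetProbability and tol interact, here is a minimal textbook-style power iteration in plain Python. It is a sketch under stated assumptions, not the GraphFrames implementation: the ranks here are normalized to sum to 1, so the absolute scores will differ from those shown by `pr.vertices`, and the function name `pagerank` and its `max_iter` safety cap are illustrative.

```python
# Vertex IDs and (src, dst) pairs mirroring the DataFrames defined earlier.
vertex_ids = ["1", "2", "3", "4", "5", "98", "99"]
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]


def pagerank(vertex_ids, edges, reset_probability=0.15, tol=1e-6, max_iter=100):
    """Normalized PageRank by power iteration; stops when the total change drops below tol."""
    n = len(vertex_ids)
    out_links = {v: [] for v in vertex_ids}
    for src, dst in edges:
        out_links[src].append(dst)

    ranks = {v: 1.0 / n for v in vertex_ids}
    for _ in range(max_iter):
        # Every vertex receives the same share from random jumps.
        new_ranks = {v: reset_probability / n for v in vertex_ids}
        for v, targets in out_links.items():
            # A vertex without out-links spreads its rank over all vertices.
            receivers = targets if targets else vertex_ids
            share = (1.0 - reset_probability) * ranks[v] / len(receivers)
            for target in receivers:
                new_ranks[target] += share
        delta = sum(abs(new_ranks[v] - ranks[v]) for v in vertex_ids)
        ranks = new_ranks
        if delta < tol:
            break
    return ranks


ranks = pagerank(vertex_ids, edges, reset_probability=0.15, tol=1e-6)
for vertex in sorted(ranks, key=ranks.get, reverse=True):
    print(vertex, round(ranks[vertex], 4))
```

A smaller tol means more iterations before the loop stops; a larger resetProbability pulls all scores toward the uniform value 1/n.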

## Subgraphs
GraphFrames provides APIs for building subgraphs by filtering on edges and vertices. These filters can be composed; for example, the following subgraph only includes people who are more than 30 years old and have friends who are more than 30 years old.

In [30]:

g2 = g.filterEdges("type = 'friend'").filterVertices("age > 30").dropIsolatedVertices()
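
The three chained steps can be traced by hand with a plain-Python sketch on the same data. It assumes that filtering vertices also removes every edge attached to a removed vertex, which is consistent with the output displayed below; the variable names are illustrative.

```python
# Tuples mirroring the vertices and edges DataFrames defined earlier.
vertices = [("1", "Carter", "Derrick", 50), ("2", "May", "Derrick", 26),
            ("3", "Mills", "Jeff", 80), ("4", "Hood", "Robert", 65),
            ("5", "Banks", "Mike", 93), ("98", "Berg", "Tim", 28),
            ("99", "Page", "Allan", 16)]
edges = [("1", "2", "friend"), ("2", "1", "friend"), ("3", "1", "friend"),
         ("1", "3", "friend"), ("2", "3", "follows"), ("3", "4", "friend"),
         ("4", "3", "friend"), ("5", "3", "friend"), ("3", "5", "friend"),
         ("4", "5", "follows"), ("98", "99", "friend"), ("99", "98", "friend")]

# Step 1: keep only 'friend' edges.
friend_edges = [e for e in edges if e[2] == "friend"]

# Step 2: keep vertices older than 30 and drop edges touching a removed vertex.
kept_ids = {v[0] for v in vertices if v[3] > 30}
sub_edges = [e for e in friend_edges if e[0] in kept_ids and e[1] in kept_ids]

# Step 3: drop vertices that no remaining edge touches.
connected_ids = {e[0] for e in sub_edges} | {e[1] for e in sub_edges}
sub_vertices = [v for v in vertices if v[0] in kept_ids and v[0] in connected_ids]

print(sub_vertices)
print(sub_edges)
```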

In [31]:
display(g2.vertices)

id,name,firstname,age
3,Mills,Jeff,80
5,Banks,Mike,93
1,Carter,Derrick,50
4,Hood,Robert,65


In [32]:
display(g2.edges)

src,dst,type
5,3,friend
1,3,friend
4,3,friend
3,5,friend
3,1,friend
3,4,friend


## Standard graph algorithms
GraphFrames comes with a number of standard graph algorithms built in:

* Breadth-first search (BFS)
* Connected components
* Strongly connected components
* Label Propagation Algorithm (LPA)
* PageRank (regular and personalized)
* Shortest paths
* Triangle count

#### Breadth-first search (BFS)
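Search from users with the first name "Derrick" for users of age < 32. Vertex 2 (May, Derrick, 26) satisfies both conditions, so the shortest path found has length zero.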

In [35]:
paths = g.bfs("firstname = 'Derrick'", "age < 32")
display(paths)

from,to
"List(2, May, Derrick, 26)","List(2, May, Derrick, 26)"

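The search above can be mimicked with a level-by-level breadth-first search in plain Python. This is a sketch of the idea rather than the GraphFrames implementation: `bfs_paths` is a hypothetical helper that follows edge direction and returns the paths found at the first depth where any target is reached.

```python
# Tuples mirroring the vertices and edges DataFrames defined earlier.
vertices = [("1", "Carter", "Derrick", 50), ("2", "May", "Derrick", 26),
            ("3", "Mills", "Jeff", 80), ("4", "Hood", "Robert", 65),
            ("5", "Banks", "Mike", 93), ("98", "Berg", "Tim", 28),
            ("99", "Page", "Allan", 16)]
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]


def bfs_paths(vertices, edges, is_source, is_target):
    """Return the shortest paths (as lists of vertex IDs) from any source to any target."""
    by_id = {v[0]: v for v in vertices}
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    frontier = [[v[0]] for v in vertices if is_source(v)]
    visited = {path[0] for path in frontier}
    while frontier:
        # Stop at the first depth where at least one path ends in a target.
        hits = [path for path in frontier if is_target(by_id[path[-1]])]
        if hits:
            return hits
        next_frontier = []
        for path in frontier:
            for neighbor in adjacency.get(path[-1], []):
                if neighbor not in visited:
                    next_frontier.append(path + [neighbor])
        visited.update(path[-1] for path in next_frontier)
        frontier = next_frontier
    return []


paths = bfs_paths(vertices, edges,
                  is_source=lambda v: v[2] == "Derrick",
                  is_target=lambda v: v[3] < 32)
print(paths)

# A search that needs two hops: from "Jeff" to the nearest user younger than 32.
jeff_paths = bfs_paths(vertices, edges,
                       is_source=lambda v: v[2] == "Jeff",
                       is_target=lambda v: v[3] < 32)
print(jeff_paths)
```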

#### Connected components
Compute the connected component membership of each vertex and return a DataFrame with each vertex assigned a component ID. The GraphFrames connected components implementation can take advantage of checkpointing to improve performance.

In [37]:
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
result = g.connectedComponents()
display(result)

id,name,firstname,age,component
1,Carter,Derrick,50,154618822656
2,May,Derrick,26,154618822656
3,Mills,Jeff,80,154618822656
4,Hood,Robert,65,154618822656
5,Banks,Mike,93,154618822656
98,Berg,Tim,28,317827579904
99,Page,Allan,16,317827579904
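
The grouping in this result can be reproduced with a small union-find sketch in plain Python. It illustrates the concept only and is not the algorithm GraphFrames runs; the helper `connected_components` is hypothetical, and it labels each component with its smallest vertex ID (compared as a string) instead of the long numeric IDs shown above.

```python
# Vertex IDs and (src, dst) pairs mirroring the DataFrames defined earlier.
vertex_ids = ["1", "2", "3", "4", "5", "98", "99"]
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]


def connected_components(vertex_ids, edges):
    """Map each vertex to a component label, ignoring edge direction."""
    parent = {v: v for v in vertex_ids}

    def find(v):
        # Follow parent links to the root, shortening the path along the way.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            # Attach the larger root under the smaller one so labels are stable.
            parent[max(root_src, root_dst)] = min(root_src, root_dst)

    return {v: find(v) for v in vertex_ids}


components = connected_components(vertex_ids, edges)
for vertex in vertex_ids:
    print(vertex, components[vertex])
```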


#### Shortest paths
Computes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID.

In [39]:
results = g.shortestPaths(landmarks=["1", "5"])
display(results)

id,name,firstname,age,distances
1,Carter,Derrick,50,"Map(1 -> 0, 5 -> 2)"
3,Mills,Jeff,80,"Map(5 -> 1, 1 -> 1)"
2,May,Derrick,26,"Map(1 -> 1, 5 -> 2)"
4,Hood,Robert,65,"Map(5 -> 1, 1 -> 2)"
98,Berg,Tim,28,Map()
5,Banks,Mike,93,"Map(5 -> 0, 1 -> 2)"
99,Page,Allan,16,Map()
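
The distances above are hop counts along directed edges from each vertex to each landmark. A plain-Python sketch that reproduces them runs one breadth-first search per landmark over the reversed edges. The helper `shortest_paths` below is hypothetical and only illustrates the idea; it is not the GraphFrames implementation.

```python
from collections import deque

# Vertex IDs and (src, dst) pairs mirroring the DataFrames defined earlier.
vertex_ids = ["1", "2", "3", "4", "5", "98", "99"]
edges = [("1", "2"), ("2", "1"), ("3", "1"), ("1", "3"), ("2", "3"), ("3", "4"),
         ("4", "3"), ("5", "3"), ("3", "5"), ("4", "5"), ("98", "99"), ("99", "98")]


def shortest_paths(vertex_ids, edges, landmarks):
    """For every vertex, return {landmark: hop count} for each reachable landmark."""
    # Reverse adjacency: for each vertex, the vertices that have an edge into it.
    predecessors = {}
    for src, dst in edges:
        predecessors.setdefault(dst, []).append(src)

    distances = {v: {} for v in vertex_ids}
    for landmark in landmarks:
        distances[landmark][landmark] = 0
        queue = deque([landmark])
        while queue:
            current = queue.popleft()
            for pred in predecessors.get(current, []):
                if landmark not in distances[pred]:
                    distances[pred][landmark] = distances[current][landmark] + 1
                    queue.append(pred)
    return distances


distances = shortest_paths(vertex_ids, edges, landmarks=["1", "5"])
for vertex in vertex_ids:
    print(vertex, distances[vertex])
```

Vertices 98 and 99 get an empty map because neither landmark is reachable from their component.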
