In [1]:
import findspark
findspark.init()

In [2]:
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *


## Creating GraphFrames
Users can create GraphFrames from vertex and edge DataFrames.

Vertex DataFrame: A vertex DataFrame should contain a special column named "id" which specifies unique IDs for each vertex in the graph.
Edge DataFrame: An edge DataFrame should contain two special columns: "src" (source vertex ID of edge) and "dst" (destination vertex ID of edge).
Both DataFrames can have arbitrary other columns. Those columns can represent vertex and edge attributes.

Create the vertices first:

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Graphframes example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/05 03:34:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/05 03:34:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
vertices = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])

In [5]:
vertices.show(3, False)

                                                                                

+---+-------+---+
|id |name   |age|
+---+-------+---+
|a  |Alice  |34 |
|b  |Bob    |36 |
|c  |Charlie|30 |
+---+-------+---+
only showing top 3 rows



And then some edges:

In [6]:
edges = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])

In [7]:
edges.show(3, False)

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|a  |b  |friend      |
|b  |c  |follow      |
|c  |b  |follow      |
+---+---+------------+
only showing top 3 rows



Let's create a graph from these vertices and these edges:

In [8]:
g = GraphFrame(vertices, edges)
print(g)

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])




## Basic graph and DataFrame queries
GraphFrames provide several simple graph queries, such as node degree.

Also, since GraphFrames represent graphs as pairs of vertex and edge DataFrames, it is easy to make powerful queries directly on the vertex and edge DataFrames. Those DataFrames are made available as vertices and edges fields in the GraphFrame.



In [9]:
display(g.vertices)

DataFrame[id: string, name: string, age: bigint]

In [10]:
g.vertices.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
|  g|  Gabby| 60|
+---+-------+---+



In [11]:
g.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+



The incoming degree of the vertices:

In [12]:
g.inDegrees.show()



+---+--------+
| id|inDegree|
+---+--------+
|  b|       2|
|  c|       2|
|  f|       1|
|  d|       1|
|  a|       1|
|  e|       1|
+---+--------+



The outgoing degree of the vertices:

In [13]:
g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  a|        2|
|  b|        1|
|  c|        1|
|  f|        1|
|  e|        2|
|  d|        1|
+---+---------+



The degree of the vertices:

In [14]:
g.degrees.show()

+---+------+
| id|degree|
+---+------+
|  b|     3|
|  a|     3|
|  c|     3|
|  f|     2|
|  e|     3|
|  d|     2|
+---+------+



You can run queries directly on the vertices DataFrame. For example, we can find the age of the youngest person in the graph:

In [15]:
youngest = g.vertices.groupBy().min("age")
youngest.show()

+--------+
|min(age)|
+--------+
|      29|
+--------+



Likewise, you can run queries on the edges DataFrame. For example, let's count the number of 'follow' relationships in the graph:

In [16]:
numFollows = g.edges.filter("relationship = 'follow'").count()
print("The number of follow edges is", numFollows)

The number of follow edges is 4


## Motif finding
Using motifs you can build more complex relationships involving edges and vertices. The following cell finds the pairs of vertices with edges in both directions between them. The result is a DataFrame, in which the column names are given by the motif keys.

Check out the GraphFrame User Guide for more details on the API.

In [17]:
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|
+----------------+--------------+----------------+--------------+



Since the result is a DataFrame, more complex queries can be built on top of the motif. Let us find all the reciprocal relationships in which one person is older than 30:

In [18]:
filtered = motifs.filter("b.age > 30 or a.age > 30")
filtered.show()

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|
+----------------+--------------+----------------+--------------+



## Stateful queries
Most motif queries are stateless and simple to express, as in the examples above. The next example demonstrates a more complex query that carries state along a path in the motif. Such queries can be expressed by combining GraphFrame motif finding with filters on the result where the filters use sequence operations to operate over DataFrame columns.

For example, suppose you want to identify a chain of 4 vertices with some property defined by a sequence of functions. That is, among chains of 4 vertices a->b->c->d, identify the subset of chains matching this complex filter:

Initialize state on path.
Update state based on vertex a.
Update state based on vertex b.
Etc. for c and d.
If final state matches some condition, then the filter accepts the chain. The below code snippets demonstrate this process, where we identify chains of 4 vertices such that at least 2 of the 3 edges are “friend” relationships. In this example, the state is the current count of “friend” edges; in general, it could be any DataFrame Column.

In [19]:
# Find chains of 4 vertices.
chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
 
# Query on sequence, with state (cnt)
#  (a) Define method for updating state given the next element of the motif.
def cumFriends(cnt, edge):
  relationship = col(edge)["relationship"]
  return when(relationship == "friend", cnt + 1).otherwise(cnt)
 
#  (b) Use sequence operation to apply method to sequence of elements in motif.
#   In this case, the elements are the 3 edges.
edges = ["ab", "bc", "cd"]
numFriends = reduce(cumFriends, edges, lit(0))
    
chainWith2Friends2 = chain4.withColumn("num_friends", numFriends).where(numFriends >= 2)
chainWith2Friends2.show()

+---------------+--------------+---------------+--------------+---------------+--------------+----------------+-----------+
|              a|            ab|              b|            bc|              c|            cd|               d|num_friends|
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+-----------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|          3|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|          3|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|          2|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}|  {d, David, 29}|          3|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}| {e, Esther, 32}|          3|
| {d, Da

## Subgraphs
GraphFrames provides APIs for building subgraphs by filtering on edges and vertices. These filters can be composed together, for example the following subgraph only includes people who are more than 30 years old and have friends who are more than 30 years old.

In [20]:
g2 = g.filterEdges("relationship = 'friend'").filterVertices("age > 30").dropIsolatedVertices()

In [21]:
g2.vertices.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  a| Alice| 34|
|  b|   Bob| 36|
|  e|Esther| 32|
+---+------+---+



In [22]:
g2.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



## Standard graph algorithms
GraphFrames comes with a number of standard graph algorithms built in:

Breadth-first search (BFS)
Connected components
Strongly connected components
Label Propagation Algorithm (LPA)
PageRank (regular and personalized)
Shortest paths
Triangle count

Breadth-first search (BFS)
Search from "Esther" for users of age < 32.

In [23]:
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|
+---------------+--------------+--------------+



The search may also be limited by edge filters and maximum path lengths.

In [24]:
filteredPaths = g.bfs(
  fromExpr = "name = 'Esther'",
  toExpr = "age < 32",
  edgeFilter = "relationship != 'friend'",
  maxPathLength = 3)
filteredPaths.show()

+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+



## Connected components
Compute the connected component membership of each vertex and return a DataFrame with each vertex assigned a component ID. The GraphFrames connected components implementation can take advantage of checkpointing to improve performance.

In [25]:
spark.sparkContext.setCheckpointDir("/tmp/graphframes-example-connected-components")
result = g.connectedComponents()
result.show()

24/03/05 03:35:20 WARN BlockManager: Block rdd_299_0 already exists on this machine; not re-adding it
                                                                                

+---+-------+---+---------+
| id|   name|age|component|
+---+-------+---+---------+
|  a|  Alice| 34|        0|
|  b|    Bob| 36|        0|
|  c|Charlie| 30|        0|
|  d|  David| 29|        0|
|  e| Esther| 32|        0|
|  f|  Fanny| 36|        0|
|  g|  Gabby| 60|        6|
+---+-------+---+---------+



## Strongly connected components
Compute the strongly connected component (SCC) of each vertex and return a DataFrame with each vertex assigned to the SCC containing that vertex.

In [26]:
result = g.stronglyConnectedComponents(maxIter=10)
result.select("id", "component").show()

24/03/05 03:35:33 WARN BlockManager: Block rdd_552_0 already exists on this machine; not re-adding it


+---+---------+
| id|component|
+---+---------+
|  a|        0|
|  b|        1|
|  c|        1|
|  d|        0|
|  e|        0|
|  f|        5|
|  g|        6|
+---+---------+



## Label Propagation
Run static Label Propagation Algorithm for detecting communities in networks.

Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the most frequent community affiliation of incoming messages.

LPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are identified into a single community).

In [27]:
result = g.labelPropagation(maxIter=5)
result.show()

+---+-------+---+-----+
| id|   name|age|label|
+---+-------+---+-----+
|  a|  Alice| 34|    2|
|  b|    Bob| 36|    2|
|  c|Charlie| 30|    1|
|  d|  David| 29|    2|
|  e| Esther| 32|    5|
|  f|  Fanny| 36|    2|
|  g|  Gabby| 60|    6|
+---+-------+---+-----+



## PageRank
Identify important vertices in a graph based on connections.

In [28]:
results = g.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.show()


+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  a|  Alice| 34|0.44910633706538744|
|  b|    Bob| 36|  2.655507832863289|
|  c|Charlie| 30| 2.6878300011606218|
|  d|  David| 29| 0.3283606792049851|
|  e| Esther| 32|0.37085233187676075|
|  f|  Fanny| 36| 0.3283606792049851|
|  g|  Gabby| 60| 0.1799821386239711|
+---+-------+---+-------------------+



In [29]:
results.edges.show()

+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
|  a|  b|      friend|   0.5|
|  a|  e|      friend|   0.5|
|  b|  c|      follow|   1.0|
|  c|  b|      follow|   1.0|
|  d|  a|      friend|   1.0|
|  e|  d|      friend|   0.5|
|  e|  f|      follow|   0.5|
|  f|  c|      follow|   1.0|
+---+---+------------+------+



In [30]:
# Run PageRank for a fixed number of iterations.
g.pageRank(resetProbability=0.15, maxIter=10)

GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])

In [31]:
# Run PageRank personalized for vertex "a"
g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")

GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])

## Shortest paths
Computes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID.

In [32]:
results = g.shortestPaths(landmarks=["a", "d"])
results.show()

+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  a|  Alice| 34|{d -> 2, a -> 0}|
|  b|    Bob| 36|              {}|
|  c|Charlie| 30|              {}|
|  d|  David| 29|{d -> 0, a -> 1}|
|  e| Esther| 32|{d -> 1, a -> 2}|
|  f|  Fanny| 36|              {}|
|  g|  Gabby| 60|              {}|
+---+-------+---+----------------+



## Triangle count
Computes the number of triangles passing through each vertex.

In [33]:
results = g.triangleCount()
results.show()

+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  f|  Fanny| 36|
|    0|  g|  Gabby| 60|
+-----+---+-------+---+

