##### pyspark使用GraphFrames实现PageRank (PageRank with GraphFrames in PySpark)

In [1]:
# Load the GraphFrames package into Spark.
# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this cell must
# run before the SparkContext/SparkSession is created in the next cell.
# The trailing "pyspark-shell" token is required when the variable is set by hand.
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    "graphframes:graphframes:0.2.0-spark2.0-s_2.11",
    "pyspark-shell",
])

In [3]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Create a local Spark session.
# Build the SparkSession first and take its SparkContext, instead of creating a
# SparkContext separately beforehand. If a context already exists, getOrCreate()
# reuses it, and builder options such as appName and static SQL settings like
# spark.sql.warehouse.dir may silently not take effect.
#
# The warehouse directory can be overridden with the SPARK_WAREHOUSE_DIR
# environment variable. The default keeps the original machine-specific path.
WAREHOUSE_DIR = os.environ.get(
    'SPARK_WAREHOUSE_DIR',
    'file:///F:/workspace/work/project/spark/spark-warehouse',
)

spark = (
    SparkSession.builder
    .master('local')
    .appName('pyspark')  # the app name that actually applied in the original setup
    .config('spark.sql.warehouse.dir', WAREHOUSE_DIR)
    .getOrCreate()
)

# Keep `sc` available for code that uses the SparkContext directly.
sc = spark.sparkContext

In [4]:
# Vertex DataFrame. GraphFrames requires a unique vertex-ID column named "id".
# The extra columns ("name", "age") are carried along as vertex attributes.
v = spark.createDataFrame(
    [("a", "Alice", 34), ("b", "Bob", 36), ("c", "Charlie", 30)],
    ["id", "name", "age"],
)
v.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
+---+-------+---+



In [5]:
# Edge DataFrame. GraphFrames requires "src" and "dst" columns holding vertex ids.
# "relationship" is an edge attribute that the queries below filter on.
e = spark.createDataFrame(
    [("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow")],
    ["src", "dst", "relationship"],
)
e.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
+---+---+------------+



In [6]:
# Create a GraphFrame from the vertex (v) and edge (e) DataFrames above.
# Import GraphFrame explicitly rather than using a wildcard import, so the
# notebook namespace makes clear where the name comes from.
from graphframes import GraphFrame

g = GraphFrame(v, e)

In [7]:
# Query: in-degree of each vertex.
# Only vertices with at least one incoming edge get a row. Vertex "a" has none,
# so it is absent from the output.
in_degrees = g.inDegrees
in_degrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+



In [8]:
# Query: count the "follow" edges in the graph. The cell's value is the count.
follow_edges = g.edges.where("relationship = 'follow'")
follow_edges.count()

2

In [9]:
# Run PageRank with reset probability 0.05 and a fixed iteration count
# (maxIter=10), then show each vertex's score from the "pagerank" column.
# Vertex "a" has no incoming edges, so its score equals the reset probability
# (0.05 in the output below).
results = g.pageRank(resetProbability=0.05, maxIter=10)
pagerank_scores = results.vertices.select("id", "pagerank")
pagerank_scores.show()

+---+------------------+
| id|          pagerank|
+---+------------------+
|  a|              0.05|
|  b| 0.626687039889458|
|  c|0.6169126832811621|
+---+------------------+

