In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName("PageRankExample").getOrCreate()
# Handle to the underlying JVM SparkSession; the Scala algorithm API
# (PageRankAlgo.apply) is called with this rather than the Python wrapper.
jspark = spark._jsparkSession

from py4j.java_gateway import java_import

# Python counterpart of these Scala imports:
#   import com.vesoft.nebula.algorithm.config.SparkConfig
#   import com.vesoft.nebula.algorithm.config.PRConfig
#   import com.vesoft.nebula.algorithm.lib.PageRankAlgo
# Afterwards each class is reachable by its short name on spark._jvm
# (e.g. spark._jvm.PRConfig, spark._jvm.PageRankAlgo).
for jvm_class_name in (
    "com.vesoft.nebula.algorithm.config.SparkConfig",
    "com.vesoft.nebula.algorithm.config.PRConfig",
    "com.vesoft.nebula.algorithm.lib.PageRankAlgo",
):
    java_import(spark._jvm, jvm_class_name)


In [3]:
# Load the `follow` edges of the basketballplayer space as a DataFrame.
# Two equivalent sources are available; build only the one selected below
# instead of loading both and discarding the first:
#   option 0 (READ_WITH_NGQL = False): scan edge data from all storaged
#             instances, located through metad.
#   option 1 (READ_WITH_NGQL = True):  run an nGQL query through graphd.
READ_WITH_NGQL = True

edge_reader = (
    spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "edge")
    .option("spaceName", "basketballplayer")
    .option("label", "follow")
    .option("returnCols", "degree")
    .option("metaAddress", "metad0:9559")
    .option("partitionNumber", 1)
)
if READ_WITH_NGQL:
    # Extra options for the graphd/nGQL path (option 1).
    edge_reader = (
        edge_reader
        .option("graphAddress", "graphd:9669")
        .option("ngql", "MATCH ()-[e:follow]->() return e LIMIT 1000")
    )
df = edge_reader.load()

In [None]:
# Scala equivalent: val prConfig = PRConfig(3, 0.85)
# NOTE(review): presumably nebula-algorithm's PRConfig(maxIter, resetProb);
# confirm the argument meanings before tuning these values.
PR_MAX_ITER = 3
PR_RESET_PROB = 0.85
prConfig = spark._jvm.PRConfig(PR_MAX_ITER, PR_RESET_PROB)

In [None]:
# Scala equivalent: val prResult = PageRankAlgo.apply(spark, df, prConfig, false)
#
# This attempt is expected to fail: the vertex IDs in this graph are strings
# (e.g. "player100"), not numbers. The JVM error is caught and reported so that
# "Restart & Run All" continues to the ID-conversion cells below instead of
# aborting here.
from py4j.protocol import Py4JJavaError

try:
    prResult = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, prConfig, False)
except Py4JJavaError as err:
    print("PageRankAlgo failed on non-numeric vertex IDs (expected):")
    print(err.java_exception)

# Next: convert the string vertex IDs into numbers first.

In [4]:
from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window


In [5]:
def convert_string_id_to_long_id(df, mapping_path="file:///tmp/encodeId.csv"):
    """Re-key an edge DataFrame from string vertex IDs to dense numeric IDs.

    Every distinct value found in ``_srcId`` or ``_dstId`` gets a 1-based
    ``encodedId`` from ``dense_rank`` over the sorted IDs. The
    ``id -> encodedId`` table is written as a headered CSV to ``mapping_path``,
    overwriting any previous run. Later cells can then translate PageRank
    results back to the original vids.

    Args:
        df: edge DataFrame with ``_srcId`` and ``_dstId`` columns.
        mapping_path: destination of the ``id,encodedId`` CSV. The default
            keeps the original hard-coded location.

    Returns:
        DataFrame with exactly the columns ``_srcId`` and ``_dstId``, in that
        order, holding the encoded IDs. Any other input columns (e.g.
        ``_rank``, ``degree``) are not carried over.
    """
    src_ids = df.select(col("_srcId").alias("id"))
    dst_ids = df.select(col("_dstId").alias("id"))
    all_ids = src_ids.union(dst_ids).distinct()

    # Contiguous IDs need one global ordering, so the window has no
    # partitionBy; Spark warns that this moves all rows into one partition.
    # Cache the result. It is reused by the CSV write and both joins below, and
    # without caching that single-partition sort would run three times.
    encode_id = all_ids.withColumn(
        "encodedId", dense_rank().over(Window.orderBy("id"))
    ).cache()

    # mode("overwrite") keeps the cell re-runnable. The default save mode
    # ("errorifexists") fails once the CSV directory already exists.
    encode_id.write.mode("overwrite").option("header", True).csv(mapping_path)

    src_joined = (
        df.join(encode_id, df._srcId == encode_id.id)
        .drop("_srcId", "id")
        .withColumnRenamed("encodedId", "_srcId")
    )
    dst_joined = (
        src_joined.join(encode_id, src_joined._dstId == encode_id.id)
        .drop("_dstId", "id")
        .withColumnRenamed("encodedId", "_dstId")
    )
    # Keep only the (src, dst) pair that PageRankAlgo consumes, in that order.
    return dst_joined.select("_srcId", "_dstId")

In [6]:
# Re-run PageRank on the numerically encoded edge list.
df_int = convert_string_id_to_long_id(df)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)

# PageRank result. Note that `_id` here is the numeric encodedId, NOT the
# original vid. Use the id -> encodedId mapping CSV to translate it back.
prResult.show()


23/02/13 08:48:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 08:48:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 08:48:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 08:48:25 WARN BlockManager: Block rdd_38_0 already exists on this machine; not re-adding it


+---+------------------+
|_id|          pagerank|
+---+------------------+
| 20|1.0467769505659303|
| 19| 1.107604367969783|
| 13|0.9985417030429499|
| 41| 0.907496696364626|
| 39|0.9310679357433543|
|  2|1.1995083922141179|
| 21|1.1241377422857828|
|  4| 0.922979999502629|
| 15|1.0374131525148615|
| 16| 1.173844971046562|
| 34|0.9416714112085575|
| 25| 1.020692343026497|
| 22|0.8661097076682367|
| 28|0.8661097076682367|
| 29|0.8661097076682367|
| 11|0.8661097076682367|
| 14|1.2091869945122087|
| 32| 1.009041691134994|
| 35| 0.922979999502629|
| 36| 1.203961967699571|
+---+------------------+
only showing top 20 rows



In [7]:
# Lookup table written by convert_string_id_to_long_id: original vid (`id`)
# -> numeric `encodedId`, used to translate PageRank `_id` values back.
# No schema inference is requested, so both columns are read as strings.
mapping_reader = spark.read.option("header", True)
mapping = mapping_reader.csv("file:///tmp/encodeId.csv")

mapping.show()

+---------+---------+
|       id|encodedId|
+---------+---------+
|player100|        1|
|player101|        2|
|player102|        3|
|player103|        4|
|player104|        5|
|player105|        6|
|player106|        7|
|player107|        8|
|player108|        9|
|player109|       10|
|player113|       11|
|player114|       12|
|player115|       13|
|player116|       14|
|player117|       15|
|player118|       16|
|player119|       17|
|player120|       18|
|player121|       19|
|player124|       20|
+---------+---------+
only showing top 20 rows

