![image](https://user-images.githubusercontent.com/1651790/221876073-61ef4edb-adcd-4f10-b3fc-8ddc24918ea1.png)

In [None]:
# install ng_ai in the first run
%pip install ng_ai

## AI Suite Spark Engine Examples
### read data with spark engine, scan mode

This example uses the Spark Engine of NebulaGraph AI Suite in Storage Scan mode.

#### Step 1, get dataframe by scanning the Graph

First, we scan all edges of type `follow` into a dataframe, `df`:

In [1]:
from ng_ai import NebulaReader

# read data with spark engine, scan mode
reader = NebulaReader(engine="spark")
reader.scan(edge="follow", props="degree")
df = reader.read()
df.show(2)


+---------+---------+-----+------+
|   _srcId|   _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100|    0|    70|
|player105|player104|    0|    83|
+---------+---------+-----+------+
only showing top 2 rows




                                                                                

#### Step 2, run Pagerank Algorithm

In [2]:
pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)

+---------+
|       id|
+---------+
|player108|
|player129|
|player120|
|player103|
|player128|
|player148|
|player117|
|player139|
|player140|
|player134|
|player149|
|player150|
|player125|
|player137|
|player143|
|player101|
|player141|
|player144|
|player102|
|player121|
+---------+
only showing top 20 rows

+-----+------+-----------+----------+
|_rank|degree|     _srcId|    _dstId|
+-----+------+-----------+----------+
|    0|    90|34359738371|         3|
|    0|    90|25769803786|         6|
|    0|    90|34359738369|         6|
|    0|    80| 8589934596|         2|
|    0|    99|25769803784|         2|
|    0|    90|25769803777|         2|
|    0|    90|          1|         4|
|    0|    90|17179869189|         4|
|    0|    90|          4|         1|
|    0|    10| 8589934598|         1|
|    0|    90|17179869189|         1|
|    0|    80| 8589934598|         5|
|    0|    85|25769803786|         5|
|    0|    70|34359738373|         5|
|    0|    95|17179869185|8589934597|
| 

23/03/01 13:17:34 WARN BlockManager: Block rdd_75_2 already exists on this machine; not re-adding it
23/03/01 13:17:34 WARN BlockManager: Block rdd_75_3 already exists on this machine; not re-adding it
23/03/01 13:17:34 WARN BlockManager: Block rdd_75_1 already exists on this machine; not re-adding it


#### Step 3, check results of the algorithm


In [3]:
pr_result.show(5)

+---------+-------------------+
|      _id|           pagerank|
+---------+-------------------+
|player133|0.18601069183310504|
|player126|0.18601069183310504|
|player130|  1.240071278887367|
|player108|0.18601069183310504|
|player102| 1.6602373739502536|
+---------+-------------------+
only showing top 5 rows
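
To make the `reset_prob` and `max_iter` arguments more concrete, here is a minimal pure-Python sketch of textbook PageRank on a tiny edge list. It is not how ng_ai or Spark computes the result: the function name `pagerank_sketch` and the sample edges are made up for illustration, and this version normalizes scores so they sum to 1, so the scale may differ from the `pagerank` column above.

```python
def pagerank_sketch(edges, reset_prob=0.15, max_iter=10):
    """Textbook power-iteration PageRank over (src, dst) pairs (illustrative only)."""
    nodes = sorted({v for edge in edges for v in edge})
    n = len(nodes)

    # Count outgoing edges per vertex.
    out_degree = {v: 0 for v in nodes}
    for src, _ in edges:
        out_degree[src] += 1

    # Start from a uniform distribution.
    ranks = {v: 1.0 / n for v in nodes}

    for _ in range(max_iter):
        # Rank held by vertices without outgoing edges is spread evenly.
        dangling = sum(ranks[v] for v in nodes if out_degree[v] == 0)

        # Each vertex shares its rank equally among its outgoing edges.
        incoming = {v: 0.0 for v in nodes}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]

        # reset_prob is the chance of jumping to a random vertex.
        ranks = {
            v: reset_prob / n + (1 - reset_prob) * (incoming[v] + dangling / n)
            for v in nodes
        }
    return ranks


# Hypothetical sample graph: a -> b, c -> b, b -> a
sample_edges = [("a", "b"), ("c", "b"), ("b", "a")]
sample_ranks = pagerank_sketch(sample_edges, reset_prob=0.15, max_iter=10)
for vertex, score in sorted(sample_ranks.items()):
    print(vertex, round(score, 4))
```

A larger `max_iter` lets the scores settle closer to their fixed point, while a larger `reset_prob` pulls all scores toward the uniform value.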



------------------
### read data with spark engine, query mode

This example uses the Spark Engine of NebulaGraph AI Suite in Graph Query mode.

#### Step 1, get dataframe by querying the Graph with a Cypher query

We will query up to 100000 edges of type `follow` as a dataframe, `df`:

In [4]:
from ng_ai import NebulaReader

# read data with spark engine, query mode
reader = NebulaReader(engine="spark")
query = """
    MATCH ()-[e:follow]->()
    RETURN e LIMIT 100000
"""
reader.query(query=query, edge="follow", props="degree")
df = reader.read()  # this will take some time
df.show(2)

+---------+---------+-----+------+
|   _srcId|   _dstId|_rank|degree|
+---------+---------+-----+------+
|player102|player100|    0|    75|
|player102|player101|    0|    75|
+---------+---------+-----+------+
only showing top 2 rows



#### Step 2, run Connected Components Algorithm

In [5]:
cc_result = df.algo.connected_components(max_iter=10)

+---------+
|       id|
+---------+
|player129|
|player120|
|player148|
|player103|
|player128|
|player108|
|player117|
|player150|
|player125|
|player137|
|player139|
|player140|
|player134|
|player149|
|player102|
|player135|
|player147|
|player121|
|player143|
|player101|
+---------+
only showing top 20 rows

+-----+------+-----------+----------+
|_rank|degree|     _srcId|    _dstId|
+-----+------+-----------+----------+
|    0|    90|34359738368|         3|
|    0|    90|25769803781|         6|
|    0|    90|34359738371|         6|
|    0|    80| 8589934592|         1|
|    0|    99|25769803779|         1|
|    0|    90|25769803784|         1|
|    0|    90|          0|         4|
|    0|    90|17179869187|         4|
|    0|    90|          4|         0|
|    0|    10| 8589934594|         0|
|    0|    90|17179869187|         0|
|    0|    80| 8589934594|         2|
|    0|    85|25769803781|         2|
|    0|    70|34359738370|         2|
|    0|    95|17179869189|8589934593|
| 

#### Step 3, check results of the algorithm


In [6]:
cc_result.show(5)

+---------+---------+
|      _id|       cc|
+---------+---------+
|player115|player129|
|player113|player129|
|player100|player129|
|player129|player129|
|player137|player129|
+---------+---------+
only showing top 5 rows
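
In the result above, every vertex carries the id of one representative vertex of its component in the `cc` column. The following union-find sketch shows the idea in plain Python. It is an illustration only, not the Spark implementation behind `connected_components`: the function name and sample edges are hypothetical, and using the smallest id as the representative is an arbitrary choice made here.

```python
def connected_components_sketch(edges):
    """Label every vertex with a representative vertex of its component."""
    parent = {}

    def find(v):
        # Register unseen vertices as their own root, then walk up to the root,
        # shortening the path along the way.
        parent.setdefault(v, v)
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        root_a, root_b = find(src), find(dst)
        if root_a != root_b:
            # Keep the smaller id as the root (arbitrary, for determinism).
            if root_b < root_a:
                root_a, root_b = root_b, root_a
            parent[root_b] = root_a

    return {v: find(v) for v in list(parent)}


# Hypothetical sample: two separate groups of vertices.
sample_edges = [("p1", "p2"), ("p2", "p3"), ("p4", "p5")]
components = connected_components_sketch(sample_edges)
print(components)
```

Edge direction is ignored here, which matches the usual definition of (weakly) connected components.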



### Write back algo result to NebulaGraph as TAG

Assume that we have a Spark DataFrame `df_result` computed with `df.algo.label_propagation()` with the following schema:

```python
df_result.printSchema()
# result:
root
 |-- _id: string (nullable = false)
 |-- lpa: string (nullable = false)
```

The DataFrame has two columns: `_id` is the vertex ID (VID), and `lpa` is the cluster ID detected by label propagation. We will write them back to the tag `label_propagation(cluster_id)`, so we first create the TAG `label_propagation` in the same NebulaGraph space with the following schema:

```ngql
CREATE TAG IF NOT EXISTS label_propagation (
    cluster_id string NOT NULL
);
```

Then we can write the label propagation result to NebulaGraph, mapping the column `lpa` to the property `cluster_id`:
```python
properties = {
    "lpa": "cluster_id"
}
```
We then pass this mapping to `NebulaWriter`, using the `spark` engine and the `nebulagraph_vertex` sink.
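
As a rough mental model of what the mapping means, the sketch below splits one result row into a vertex ID and a dictionary of tag properties. The helper `map_row_to_vertex` is hypothetical and not part of ng_ai; it only illustrates how the `_id` column and the `properties` mapping relate to the tag schema, assuming a row is represented as a plain dict.

```python
def map_row_to_vertex(row, vid_field, properties):
    """Split a result row into (vid, tag_properties) using a column -> property mapping."""
    vid = row[vid_field]
    tag_props = {prop: row[column] for column, prop in properties.items()}
    return vid, tag_props


# Illustrative row shaped like the label propagation result schema.
row = {"_id": "player103", "lpa": "player101"}
vid, tag_props = map_row_to_vertex(
    row, vid_field="_id", properties={"lpa": "cluster_id"}
)
print(vid, tag_props)
```

The keys of `properties` are DataFrame column names, and the values are property names on the tag `label_propagation`.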

In [7]:
# Run the label propagation algorithm
df_result = df.algo.label_propagation()

+---------+
|       id|
+---------+
|player129|
|player120|
|player148|
|player103|
|player128|
|player108|
|player117|
|player150|
|player125|
|player137|
|player139|
|player140|
|player134|
|player149|
|player102|
|player135|
|player147|
|player121|
|player143|
|player101|
+---------+
only showing top 20 rows

+-----+------+----------+-----------+
|_rank|degree|    _srcId|     _dstId|
+-----+------+----------+-----------+
|    0|    70|         3|17179869184|
|    0|    80|         5|25769803785|
|    0|    80|         5|17179869189|
|    0|    80|         1|25769803784|
|    0|    90|         4|25769803778|
|    0|    90|         4|17179869187|
|    0|    90|         4|          0|
|    0|    90|         0|25769803778|
|    0|    90|         0|17179869187|
|    0|    90|         0|          4|
|    0|    80|         2|34359738370|
|    0|    90|         2|25769803781|
|    0|    85|         2| 8589934594|
|    0|    90|8589934593|25769803785|
|    0|    -1|8589934597|17179869187|
| 

23/03/01 13:18:01 WARN CacheManager: Asked to cache already cached data.
23/03/01 13:18:01 WARN CacheManager: Asked to cache already cached data.


In [8]:
# check the result schema
df_result.printSchema()

root
 |-- _id: string (nullable = false)
 |-- lpa: string (nullable = false)
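
The `lpa` column is a string because each cluster is identified by the id of one of its vertices. The sketch below shows one simple, deterministic variant of label propagation in plain Python to illustrate why that is. It is not the algorithm implementation used by ng_ai: the function name, the update order, the tie-breaking rule, and the `max_iter` parameter are assumptions made for this example.

```python
from collections import Counter


def label_propagation_sketch(edges, max_iter=10):
    """Each vertex repeatedly adopts the most common label among its neighbors."""
    neighbors = {}
    for src, dst in edges:
        neighbors.setdefault(src, set()).add(dst)
        neighbors.setdefault(dst, set()).add(src)

    # Every vertex starts in its own cluster, labelled with its own id.
    labels = {v: v for v in neighbors}

    for _ in range(max_iter):
        changed = False
        for v in sorted(neighbors):
            counts = Counter(labels[n] for n in neighbors[v])
            best = max(counts.values())
            # Break ties by taking the smallest label (arbitrary, for determinism).
            new_label = min(label for label, c in counts.items() if c == best)
            if new_label != labels[v]:
                labels[v] = new_label
                changed = True
        if not changed:
            break
    return labels


# Hypothetical sample: two disjoint triangles.
sample_edges = [
    ("a", "b"), ("b", "c"), ("a", "c"),
    ("x", "y"), ("y", "z"), ("x", "z"),
]
clusters = label_propagation_sketch(sample_edges)
print(clusters)
```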



In [9]:
from ng_ai import NebulaWriter
from ng_ai.config import NebulaGraphConfig

config = NebulaGraphConfig()
writer = NebulaWriter(
    data=df_result, sink="nebulagraph_vertex", config=config, engine="spark"
)

# map column lpa into property cluster_id
properties = {"lpa": "cluster_id"}

writer.set_options(
    space="basketballplayer",
    tag="label_propagation",
    vid_field="_id",
    properties=properties,
    batch_size=256,
    write_mode="insert",
)
# write back to NebulaGraph
writer.write()
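
The `batch_size=256` option above can be pictured as grouping rows into chunks before they are sent. The sketch below illustrates that idea in plain Python, under the assumption that `batch_size` is the number of rows written per request. The helper `batched_rows` is hypothetical and is not how ng_ai or the underlying connector is implemented.

```python
from itertools import islice


def batched_rows(rows, batch_size=256):
    """Yield consecutive lists of at most batch_size rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 600 placeholder rows are split into batches of 256, 256 and 88.
batch_sizes = [len(batch) for batch in batched_rows(range(600), batch_size=256)]
print(batch_sizes)
```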

We can then query the result in NebulaGraph:

```cypher
MATCH (v:label_propagation)
RETURN id(v), v.label_propagation.cluster_id LIMIT 10;
```
Result:

```cypher
(root@nebula) [basketballplayer]> """
                               -> MATCH (v:label_propagation)
                               -> RETURN id(v), v.label_propagation.cluster_id LIMIT 10;
                               -> """
+-------------+--------------------------------+
| id(v)       | v.label_propagation.cluster_id |
+-------------+--------------------------------+
| "player103" | "player101"                    |
| "player113" | "player129"                    |
| "player121" | "player129"                    |
| "player128" | "player129"                    |
| "player130" | "player130"                    |
| "player136" | "player136"                    |
| "player127" | "player137"                    |
| "player135" | "player101"                    |
| "player147" | "player148"                    |
| "player148" | "player148"                    |
+-------------+--------------------------------+
```

### Write back algo result to NebulaGraph as EDGE

As with the TAG, we first need to create the schema:

```ngql
CREATE EDGE jaccard_similarity(similarity double);
```

Then we run an algorithm whose results will be written as edges:

In [None]:
# Run Jaccard Algorithm
df_result = df.algo.jaccard()
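
For intuition about the `similarity` value, the Jaccard similarity of two vertices is the size of the intersection of their neighbor sets divided by the size of the union. The sketch below computes it exactly in plain Python on a tiny sample. It is illustrative only: the function name and sample edges are hypothetical, and the library may compute or approximate the value differently.

```python
from itertools import combinations


def jaccard_sketch(edges):
    """Return (u, v, similarity) for vertex pairs sharing at least one neighbor."""
    neighbors = {}
    for src, dst in edges:
        neighbors.setdefault(src, set()).add(dst)
        neighbors.setdefault(dst, set()).add(src)

    results = []
    for u, v in combinations(sorted(neighbors), 2):
        shared = neighbors[u] & neighbors[v]
        if shared:
            combined = neighbors[u] | neighbors[v]
            results.append((u, v, len(shared) / len(combined)))
    return results


# Hypothetical sample graph.
sample_edges = [("a", "c"), ("b", "c"), ("a", "d")]
similarities = jaccard_sketch(sample_edges)
print(similarities)
```

Each returned triple has the same shape as one edge to be written: a source vertex, a destination vertex, and a `similarity` property.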

Next, we write the result to NebulaGraph, mapping the column `similarity` to the edge property `similarity`:

In [None]:
writer = NebulaWriter(
    data=df_result, sink="nebulagraph_vertex", config=config, engine="spark"
)

# map column similarity into property similarity
properties = {"similarity": "similarity"}

writer.set_options(
    space="basketballplayer",
    type="edge",
    edge_type="jaccard_similarity",
    src_id="srcId",
    dst_id="dstId",
    src_id_policy="",
    dst_id_policy="",
    properties=properties,
    batch_size=256,
    write_mode="insert",
)

# write back to NebulaGraph
writer.write()

Check result:

```
(root@nebula) [basketballplayer]> MATCH ()-[e:jaccard_similarity]->() RETURN e LIMIT 3
+-------------------------------------------------------------------------------------+
| e                                                                                   |
+-------------------------------------------------------------------------------------+
| [:jaccard_similarity "player102"->"player100" @0 {similarity: 0.07692307692307687}] |
| [:jaccard_similarity "player102"->"player101" @0 {similarity: 0.11111111111111116}] |
| [:jaccard_similarity "player102"->"player104" @0 {similarity: 0.33333333333333326}] |
+-------------------------------------------------------------------------------------+
Got 3 rows (time spent 39.984ms/44.574542ms)

Wed, 06 Sep 2023 13:04:38 CST

(root@nebula) [basketballplayer]>
```

## How to run other algorithm examples

In [None]:
# lpa_result  = df.algo.label_propagation()
# louvain_result = df.algo.louvain()
# k_core_result = df.algo.k_core()
# degree_statics_result = df.algo.degree_statics()
# betweenness_centrality_result = df.algo.betweenness_centrality()
# clustering_coefficient_result = df.algo.clustering_coefficient()
# bfs_result = df.algo.bfs()
# hanp_result = df.algo.hanp()
# jaccard_result = df.algo.jaccard()
# strong_connected_components_result = df.algo.strong_connected_components()
# triangle_count_result = df.algo.triangle_count()