# Analytical/OLAP Graph examples for Cosmos GremlinV1 in Synapse Spark using GraphFrames library
The code is in Scala for ease of configuration with user jars.

## Prerequisites
- Create an Azure Cosmos DB Gremlin API account and load the sample graph data:
```
g.addV('person').property('id', 'a').property('name', 'Alice').property('age', 34)
g.addV('person').property('id', 'c').property('name', 'Charlie').property('age', 30)
g.addV('person').property('id', 'd').property('name', 'David').property('age', 29)
g.addV('person').property('id', 'e').property('name', 'Esther').property('age', 32)
g.addV('person').property('id', 'f').property('name', 'Fanny').property('age', 36)
g.addV('person').property('id', 'g').property('name', 'Gab').property('age', 60)


g.V().hasLabel('person').has('id', 'a').addE('friend').to(g.V().hasLabel('person').has('id', 'g'))
g.V().hasLabel('person').has('id', 'g').addE('follow').to(g.V().hasLabel('person').has('id', 'c'))
g.V().hasLabel('person').has('id', 'c').addE('follow').to(g.V().hasLabel('person').has('id', 'g'))
g.V().hasLabel('person').has('id', 'f').addE('follow').to(g.V().hasLabel('person').has('id', 'c'))
g.V().hasLabel('person').has('id', 'e').addE('follow').to(g.V().hasLabel('person').has('id', 'f'))
g.V().hasLabel('person').has('id', 'e').addE('friend').to(g.V().hasLabel('person').has('id', 'd'))
g.V().hasLabel('person').has('id', 'd').addE('friend').to(g.V().hasLabel('person').has('id', 'a'))
g.V().hasLabel('person').has('id', 'a').addE('friend').to(g.V().hasLabel('person').has('id', 'e'))
g.V().hasLabel('person').has('id', 'e').addE('friend').to(g.V().hasLabel('person').has('id', 'a'))
g.V().hasLabel('person').has('id', 'e').addE('friend').to(g.V().hasLabel('person').has('id', 'c'))
g.V().hasLabel('person').has('id', 'e').addE('friend').to(g.V().hasLabel('person').has('id', 'g'))
g.V().hasLabel('person').has('id', 'e').addE('friend').to(g.V().hasLabel('person').has('id', 'f'))
g.V().hasLabel('person').has('id', 'a').addE('follow').to(g.V().hasLabel('person').has('id', 'c'))
g.V().hasLabel('person').has('id', 'd').addE('follow').to(g.V().hasLabel('person').has('id', 'c'))
g.V().hasLabel('person').has('id', 'e').addE('follow').to(g.V().hasLabel('person').has('id', 'c'))


g.addV('person').property('id', 'b').property('name', 'Sam').property('age', 53)
g.addV('person').property('id', 'h').property('name', 'Samuel').property('age', 54)
g.addV('person').property('id', 'i').property('name', 'Sammmy').property('age', 55)

g.V().hasLabel('person').has('id', 'b').addE('follow').to(g.V().hasLabel('person').has('id', 'h'))
g.V().hasLabel('person').has('id', 'b').addE('friend').to(g.V().hasLabel('person').has('id', 'h'))
g.V().hasLabel('person').has('id', 'i').addE('friend').to(g.V().hasLabel('person').has('id', 'b'))
g.V().hasLabel('person').has('id', 'h').addE('follow').to(g.V().hasLabel('person').has('id', 'i'))
g.V().hasLabel('person').has('id', 'b').addE('follow').to(g.V().hasLabel('person').has('id', 'a'))

```
- Create a Synapse linked service that uses the Cosmos DB Gremlin API account's SDK URL and key, so that the graph data can be read from Synapse Spark with `spark.read.format("cosmos.oltp")`.

### Configuration
The `%%configure` magic command can be used to load user jars. Here we load a GraphFrames jar that has been uploaded to Azure Data Lake Storage Gen2 (the `abfss://` path below).

NOTE: the user must be assigned the **Storage Blob Data Contributor** role on the storage account to access the file (otherwise this command fails).

Also make sure that the Spark and Scala versions of the jar match those of your Spark pool. The jar used here, `graphframes-0.8.0-spark2.4-s_2.11.jar`, targets Spark 2.4 and Scala 2.11.

Replace `<CosmosGremlinLinkedAccount>` and `<CosmoGremlinGraphName>` in the read code with your linked service name and graph name.

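One way to compare versions is to read them off the jar name. The following is a minimal sketch, assuming the jar follows the `graphframes-<version>-spark<X.Y>-s_<A.B>.jar` naming pattern used here; `JarVersionCheck` and its methods are hypothetical helpers, not part of GraphFrames or Synapse.

```scala
// Hypothetical helper: extracts the Spark and Scala versions a GraphFrames jar targets
// from its file name, assuming the pattern graphframes-<ver>-spark<X.Y>-s_<A.B>.jar.
object JarVersionCheck {
  private val JarPattern = """graphframes-[^-]+-spark(\d+\.\d+)-s_(\d+\.\d+)\.jar""".r

  /** Returns (sparkMajorMinor, scalaMajorMinor) if the name follows the assumed pattern. */
  def targets(jarName: String): Option[(String, String)] = jarName match {
    case JarPattern(sparkV, scalaV) => Some((sparkV, scalaV))
    case _                          => None
  }

  /** Keeps only "major.minor" from a full version string such as "2.4.8" or "2.11.12". */
  def majorMinor(version: String): String = version.split('.').take(2).mkString(".")

  /** True if the jar's target versions match the given Spark and Scala versions. */
  def matches(jarName: String, sparkVersion: String, scalaVersion: String): Boolean =
    targets(jarName).exists { case (sparkV, scalaV) =>
      sparkV == majorMinor(sparkVersion) && scalaV == majorMinor(scalaVersion)
    }
}

val jar = "graphframes-0.8.0-spark2.4-s_2.11.jar"
println(JarVersionCheck.targets(jar)) // Some((2.4,2.11))
```

In a running Synapse session you could pass `spark.version` and `scala.util.Properties.versionNumberString` as the second and third arguments of `matches`.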
```
%%configure -f
{
    "jars": [
        "abfss://sourcedata@cosmossynapseadls.dfs.core.windows.net/graphframes-0.8.0-spark2.4-s_2.11.jar"
    ]
}
```

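### Reading data from Azure Cosmos DB into Spark DataFrames

Note that the `cosmos.oltp` format reads from the container's transactional store. If Azure Synapse Link (analytical store) is enabled for the graph, `cosmos.olap` reads from the analytical store instead; the schema of the returned DataFrame may then differ, so the column selections below may need adjusting.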

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame

// Read the Gremlin graph container through the Synapse linked service.
// Vertices and edges are stored as separate documents in the same container.
val df_olap = spark.read.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<CosmosGremlinLinkedAccount>").
    option("spark.cosmos.container", "<CosmoGremlinGraphName>").
    load()

// Preview the raw documents:
// display(df_olap)

// Vertex documents have no _sink field. The age property is stored as an array of
// {id, _value} objects, so take the _value of the first element.
val vertices = df_olap.filter($"_sink".isNull).
    select($"id", $"name", $"age".getItem(0).getItem("_value").as("age"))

// Edge documents have a _sink field (the destination vertex).
val df_edges = df_olap.filter($"_sink".isNotNull).drop("_isEdge")

// GraphFrames expects the edge columns "src" and "dst"; keep the edge label as "relationship".
val edges = df_edges.select(
    $"_vertexId".as("src"),
    $"_sink".as("dst"),
    $"label".as("relationship"))
```

```
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame
df_olap: org.apache.spark.sql.DataFrame = [_attachments: string, _etag: string ... 13 more fields]
vertices: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
df_edges: org.apache.spark.sql.DataFrame = [_attachments: string, _etag: string ... 12 more fields]
edges: org.apache.spark.sql.DataFrame = [src: string, dst: string ... 1 more field]
```

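To make the vertex/edge split concrete without a Spark session, here is a plain-Scala sketch over a hypothetical, heavily simplified model of the container's documents. It assumes, as the `getItem(0).getItem("_value")` call implies, that vertex properties are stored as arrays of `{id, _value}` objects; the case classes `Prop`, `Doc`, `Vertex` and `Edge` are illustrative names, not part of any library.

```scala
// Hypothetical, simplified model of the documents in a Gremlin container.
// Real documents carry more system fields (_etag, _rid, _ts, ...).
final case class Prop(id: String, _value: Any)
final case class Doc(
    id: String,
    label: String,
    _vertexId: Option[String] = None,         // edges only: source vertex id
    _sink: Option[String] = None,             // edges only: destination vertex id
    props: Map[String, Seq[Prop]] = Map.empty // vertices only: property arrays
)

final case class Vertex(id: String, age: Option[Any])
final case class Edge(src: String, dst: String, relationship: String)

object SplitDocs {
  // Mirrors $"age".getItem(0).getItem("_value"): the first element's _value, if any.
  def firstValue(doc: Doc, key: String): Option[Any] =
    doc.props.get(key).flatMap(_.headOption).map(_._value)

  // Mirrors filter($"_sink".isNull): vertex documents have no _sink.
  def vertices(docs: Seq[Doc]): Seq[Vertex] =
    docs.filter(_._sink.isEmpty).map(d => Vertex(d.id, firstValue(d, "age")))

  // Mirrors filter($"_sink".isNotNull) plus renaming to src/dst/relationship.
  def edges(docs: Seq[Doc]): Seq[Edge] =
    docs.collect { case Doc(_, label, Some(src), Some(dst), _) => Edge(src, dst, label) }
}

// Two vertices and one edge from the sample graph (the property and edge ids are made up).
val docs = Seq(
  Doc("a", "person", props = Map("age" -> Seq(Prop("p1", 34)))),
  Doc("c", "person", props = Map("age" -> Seq(Prop("p2", 30)))),
  Doc("e1", "follow", _vertexId = Some("a"), _sink = Some("c"))
)
val vs = SplitDocs.vertices(docs)
val es = SplitDocs.edges(docs)
println(vs) // List(Vertex(a,Some(34)), Vertex(c,Some(30)))
println(es) // List(Edge(a,c,follow))
```

Because every document is either a vertex or an edge, the two outputs together always account for all input documents, which is the same property the notebook later checks with `df_olap.count()`.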

```scala
display(vertices)
```

```scala
display(edges)
```

### Converting the DataFrames to a GraphFrame


```scala
import org.graphframes._
import org.graphframes.GraphFrame

// GraphFrame expects an "id" column on the vertices and "src"/"dst" columns on the edges.
val graph = GraphFrame(vertices, edges)
```

```
import org.graphframes._
import org.graphframes.GraphFrame
graph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])
```


### Counting vertices and edges in the DataFrame and the GraphFrame


```scala
val vert_count = graph.vertices.count()
val edge_count = graph.edges.count()
println()

// Vertices and edges are stored as separate documents in the same container,
// so their sum should equal the number of rows in the initial DataFrame.
println(df_olap.count())
println(vert_count + edge_count)
```

```
vert_count: Long = 9
edge_count: Long = 20

29
29
```


## Performing analysis and queries with GraphFrames


```scala
import org.apache.spark.sql.DataFrame

// Find the youngest user's age in the graph.
// This queries the vertex DataFrame.
graph.vertices.groupBy().min("age").show()

// Count the number of "follow" edges in the graph.
// This queries the edge DataFrame.
val numFollows = graph.edges.filter("relationship = 'follow'").count()

println("")

// Get a DataFrame with columns "id" and "inDegree" (number of incoming edges per vertex).
graph.inDegrees.show()
```

```
import org.apache.spark.sql.DataFrame
+--------+
|min(age)|
+--------+
|      29|
+--------+

numFollows: Long = 10

+---+--------+
| id|inDegree|
+---+--------+
|  g|       3|
|  f|       2|
|  e|       1|
|  h|       2|
|  d|       1|
|  c|       6|
|  i|       1|
|  b|       1|
|  a|       3|
+---+--------+
```

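These three queries are plain aggregations over the vertex and edge tables. As a sanity check, here is a plain-Scala sketch (no Spark) that recomputes them from the sample data loaded in the prerequisites; the `Sample` object is just a hand transcription of the Gremlin statements, not something read from Cosmos DB.

```scala
object Sample {
  // (id, name, age) for each vertex added in the prerequisites
  val vertices = Seq(
    ("a", "Alice", 34), ("c", "Charlie", 30), ("d", "David", 29), ("e", "Esther", 32),
    ("f", "Fanny", 36), ("g", "Gab", 60), ("b", "Sam", 53), ("h", "Samuel", 54),
    ("i", "Sammmy", 55))

  // (src, dst, relationship) for each edge added in the prerequisites
  val edges = Seq(
    ("a", "g", "friend"), ("g", "c", "follow"), ("c", "g", "follow"), ("f", "c", "follow"),
    ("e", "f", "follow"), ("e", "d", "friend"), ("d", "a", "friend"), ("a", "e", "friend"),
    ("e", "a", "friend"), ("e", "c", "friend"), ("e", "g", "friend"), ("e", "f", "friend"),
    ("a", "c", "follow"), ("d", "c", "follow"), ("e", "c", "follow"),
    ("b", "h", "follow"), ("b", "h", "friend"), ("i", "b", "friend"), ("h", "i", "follow"),
    ("b", "a", "follow"))
}

// Equivalent of graph.vertices.groupBy().min("age")
val minAge = Sample.vertices.map(_._3).min

// Equivalent of graph.edges.filter("relationship = 'follow'").count()
val numFollows = Sample.edges.count(_._3 == "follow")

// Equivalent of graph.inDegrees: incoming edges per vertex (vertices with none are omitted)
val inDegrees: Map[String, Int] =
  Sample.edges.groupBy(_._2).map { case (id, es) => id -> es.size }

println(minAge)     // 29
println(numFollows) // 10
println(inDegrees.toSeq.sortBy(_._1).mkString(", "))
// (a,3), (b,1), (c,6), (d,1), (e,1), (f,2), (g,3), (h,2), (i,1)
```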


## Example of a community detection algorithm using `labelPropagation`

```scala
import org.apache.spark.sql.DataFrame
// Label propagation algorithm (LPA): detects communities of vertices based on their connections.
// Vertices that end up with the same "label" belong to the same community.
// See also https://graphframes.github.io/graphframes/docs/_site/user-guide.html

val result = graph.labelPropagation.maxIter(5).run()
result.select("id", "name", "label").show()
```

```
import org.apache.spark.sql.DataFrame
result: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
+---+-------+-------------+
| id|   name|        label|
+---+-------+-------------+
|  g|    Gab| 670014898176|
|  c|Charlie| 670014898176|
|  i| Sammmy| 704374636544|
|  f|  Fanny| 670014898176|
|  b|    Sam| 704374636544|
|  a|  Alice| 670014898176|
|  e| Esther| 670014898176|
|  h| Samuel|1382979469312|
|  d|  David| 670014898176|
+---+-------+-------------+
```

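Label propagation starts with every vertex in its own community and, in each iteration, lets every vertex adopt the most frequent label among its neighbors; GraphX, which GraphFrames calls, treats the edges as undirected for this. Below is a minimal synchronous sketch in plain Scala. The tie-break rule (smallest label wins) is an assumption made for determinism and GraphX may break ties differently, so this sketch is not expected to reproduce the exact labels above. The `label` values in the GraphFrames output appear to be internal numeric vertex identifiers, so only whether two vertices share a label is meaningful.

```scala
object LabelPropagation {
  /** Synchronous label propagation over an undirected view of the edges.
    * Returns a map from vertex id to community label (itself a vertex id). */
  def run(vertices: Seq[String], edges: Seq[(String, String)], maxIter: Int): Map[String, String] = {
    // Each edge contributes a neighbor in both directions; parallel edges count twice.
    val neighbors: Map[String, Seq[String]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

    var labels: Map[String, String] = vertices.map(v => v -> v).toMap
    var iter = 0
    var changed = true
    while (iter < maxIter && changed) {
      // All vertices update at once from the previous iteration's labels.
      val next: Map[String, String] = labels.map { case (v, current) =>
        val ns = neighbors.getOrElse(v, Seq.empty)
        if (ns.isEmpty) v -> current // an isolated vertex keeps its own label
        else {
          val counts: Map[String, Int] =
            ns.groupBy(n => labels(n)).map { case (l, occ) => l -> occ.size }
          val best = counts.values.max
          // Most frequent neighbor label; ties broken by the smallest label (an assumption).
          v -> counts.collect { case (l, c) if c == best => l }.min
        }
      }
      changed = next != labels
      labels = next
      iter += 1
    }
    labels
  }
}

// Two disjoint triangles should end up as two communities.
val communities = LabelPropagation.run(
  Seq("1", "2", "3", "4", "5", "6"),
  Seq("1" -> "2", "2" -> "3", "3" -> "1", "4" -> "5", "5" -> "6", "6" -> "4"),
  maxIter = 5)
println(communities.toSeq.sortBy(_._1).mkString(", "))
// (1,1), (2,1), (3,1), (4,4), (5,4), (6,4)
```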


## Example of the PageRank algorithm using `pageRank`


```scala
// PageRank: a centrality algorithm that scores the importance of vertices based on their
// incoming connections (a higher score means higher importance).
// Personalized PageRank: biases the ranking toward one or more chosen source vertices.
// The result edges get a "weight" column: the normalized weight PageRank assigns to each
// edge (1 / out-degree of the source vertex).

// Run PageRank until convergence (tolerance 0.01) and keep the results
val results = graph.pageRank.resetProbability(0.15).tol(0.01).run()

// Personalized PageRank (biased toward a single source vertex)
val resultsP1 = graph.pageRank.resetProbability(0.15).maxIter(5).sourceId("a").run()

// Parallel personalized PageRank (one ranking per source vertex in the array)
val resultsPm = graph.parallelPersonalizedPageRank.resetProbability(0.15).maxIter(5).sourceIds(Array("a", "b", "c")).run()
```

```
results: org.graphframes.GraphFrame = GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])
resultsP1: org.graphframes.GraphFrame = GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])
resultsPm: org.graphframes.GraphFrame = GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])
```

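For intuition about what `pageRank` and `sourceId` compute, here is a plain-Scala power-iteration sketch on the sample graph. It uses the textbook formulation in which scores sum to 1: `PR(v) = resetProbability * r(v) + (1 - resetProbability) * sum(PR(u) / outDeg(u))` over edges `u -> v`, where the reset vector `r` is uniform (global PageRank) or concentrated on one source (personalized). GraphX, which GraphFrames calls, may scale its scores differently, so compare the ordering rather than absolute values. `PageRankSketch` is a hypothetical name, and the sketch assumes every vertex has at least one outgoing edge, which holds for this sample graph.

```scala
object PageRankSketch {
  /** Power iteration for (personalized) PageRank on a directed multigraph.
    * `reset` is the teleport distribution and should sum to 1.
    * Assumes no dangling vertices (every vertex has an outgoing edge). */
  def run(edges: Seq[(String, String)], reset: Map[String, Double],
          resetProbability: Double = 0.15, maxIter: Int = 100, tol: Double = 1e-12): Map[String, Double] = {
    val vertices = (edges.map(_._1) ++ edges.map(_._2)).distinct
    val outDeg: Map[String, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    var ranks: Map[String, Double] = vertices.map(v => v -> reset.getOrElse(v, 0.0)).toMap
    var iter = 0
    var delta = Double.MaxValue
    while (iter < maxIter && delta > tol) {
      // Each edge u -> v passes PR(u) / outDeg(u) to v (parallel edges pass twice).
      val incoming: Map[String, Double] = edges
        .map { case (u, v) => v -> ranks(u) / outDeg(u) }
        .groupBy(_._1).map { case (v, contribs) => v -> contribs.map(_._2).sum }
      val next = vertices.map { v =>
        v -> (resetProbability * reset.getOrElse(v, 0.0) +
              (1 - resetProbability) * incoming.getOrElse(v, 0.0))
      }.toMap
      delta = vertices.map(v => math.abs(next(v) - ranks(v))).sum // L1 change
      ranks = next
      iter += 1
    }
    ranks
  }

  def uniform(vertices: Seq[String]): Map[String, Double] =
    vertices.map(v => v -> 1.0 / vertices.size).toMap
}

// (src, dst) pairs transcribed from the sample Gremlin statements
val sampleEdges = Seq(
  "a" -> "g", "g" -> "c", "c" -> "g", "f" -> "c", "e" -> "f", "e" -> "d", "d" -> "a",
  "a" -> "e", "e" -> "a", "e" -> "c", "e" -> "g", "e" -> "f", "a" -> "c", "d" -> "c",
  "e" -> "c", "b" -> "h", "b" -> "h", "i" -> "b", "h" -> "i", "b" -> "a")
val vertexIds = (sampleEdges.map(_._1) ++ sampleEdges.map(_._2)).distinct

// Global PageRank: teleport uniformly to any vertex
val global = PageRankSketch.run(sampleEdges, PageRankSketch.uniform(vertexIds))
// Personalized toward "a" (compare sourceId("a")): teleport only to "a"
val personalized = PageRankSketch.run(sampleEdges, Map("a" -> 1.0))

println(global.toSeq.sortBy(-_._2).map(_._1).mkString(" > "))
// c > g > b > i > h > a > e > f > d
```

In the personalized run, `b`, `h` and `i` score exactly 0 because no path leads from `a` into that part of the graph. Conceptually, `parallelPersonalizedPageRank` computes one such personalized vector per entry in `sourceIds`.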

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

// Vertices ordered by PageRank score, most important first
display(results.vertices.select("id", "name", "pagerank").orderBy($"pagerank".desc))
```



```scala
// Personalized PageRank scores (biased toward vertex "a"), highest first
display(resultsP1.vertices.select("id", "name", "pagerank").orderBy($"pagerank".desc))
```



```scala
// Parallel personalized PageRank: "pageranks" holds one score per source vertex in sourceIds
display(resultsPm.vertices.select($"id", $"name", $"pageranks"))
```



```scala
// Edge weights from the PageRank result (1 / out-degree of the source vertex), highest first
display(results.edges.select("src", "dst", "relationship", "weight").orderBy($"weight".desc))
```

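The `weight` column shown by the edge display call is the normalized edge weight that GraphX PageRank uses internally: 1 divided by the out-degree of the edge's source vertex, counting parallel edges separately. A small plain-Scala sketch on the sample edge list shows the values to expect; `EdgeWeights.edgeWeights` is a hypothetical helper.

```scala
object EdgeWeights {
  /** Returns (src, dst, weight) with weight = 1 / outDegree(src). */
  def edgeWeights(edges: Seq[(String, String)]): Seq[(String, String, Double)] = {
    val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    edges.map { case (src, dst) => (src, dst, 1.0 / outDeg(src)) }
  }
}

// (src, dst) pairs transcribed from the sample Gremlin statements
val sampleEdges = Seq(
  "a" -> "g", "g" -> "c", "c" -> "g", "f" -> "c", "e" -> "f", "e" -> "d", "d" -> "a",
  "a" -> "e", "e" -> "a", "e" -> "c", "e" -> "g", "e" -> "f", "a" -> "c", "d" -> "c",
  "e" -> "c", "b" -> "h", "b" -> "h", "i" -> "b", "h" -> "i", "b" -> "a")

val weighted = EdgeWeights.edgeWeights(sampleEdges)
// Highest weights first, as in the display call
weighted.sortBy(-_._3).foreach { case (s, d, w) => println(f"$s -> $d: $w%.3f") }
```

The weights leaving any single vertex always sum to 1, so a vertex with many outgoing edges (like `e`, with 7) spreads its score thinly.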



## Example of finding shortest paths using `shortestPaths`

```scala
// Shortest paths: computes the shortest path length from each vertex to each landmark vertex
// (landmarks are specified by vertex ID). Paths follow the edge direction; the "distances"
// map column goes from landmark ID to hop count and contains only reachable landmarks.
import org.apache.spark.sql.functions._
val paths = graph.shortestPaths.landmarks(Seq("a", "c")).run()

display(paths)
```

```
import org.apache.spark.sql.functions._
paths: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
```

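The same distances can be sketched in plain Scala with one breadth-first search per landmark over the reversed edges: walking backwards from a landmark finds every vertex that can reach it, together with its hop count. This is an illustration on the sample edge list, not how GraphX implements it (GraphX uses iterative Pregel-style message passing), and `ShortestPathsSketch` is a hypothetical name.

```scala
import scala.collection.mutable

object ShortestPathsSketch {
  /** For each vertex, a map landmark -> hop count along directed edges (reachable landmarks only). */
  def run(edges: Seq[(String, String)], landmarks: Seq[String]): Map[String, Map[String, Int]] = {
    val vertices = (edges.map(_._1) ++ edges.map(_._2)).distinct
    // Reverse adjacency: for each vertex, the vertices that have an edge pointing to it.
    val predecessors: Map[String, Seq[String]] =
      edges.groupBy(_._2).map { case (v, es) => v -> es.map(_._1) }

    val perLandmark: Seq[(String, Map[String, Int])] = landmarks.map { lm =>
      // BFS backwards from the landmark: dist(v) = hops from v to lm (the landmark itself is 0).
      val dist = mutable.Map(lm -> 0)
      val queue = mutable.Queue(lm)
      while (queue.nonEmpty) {
        val v = queue.dequeue()
        for (p <- predecessors.getOrElse(v, Seq.empty) if !dist.contains(p)) {
          dist(p) = dist(v) + 1
          queue.enqueue(p)
        }
      }
      lm -> dist.toMap
    }

    vertices.map { v =>
      v -> perLandmark.collect { case (lm, dist) if dist.contains(v) => lm -> dist(v) }.toMap
    }.toMap
  }
}

// (src, dst) pairs transcribed from the sample Gremlin statements
val sampleEdges = Seq(
  "a" -> "g", "g" -> "c", "c" -> "g", "f" -> "c", "e" -> "f", "e" -> "d", "d" -> "a",
  "a" -> "e", "e" -> "a", "e" -> "c", "e" -> "g", "e" -> "f", "a" -> "c", "d" -> "c",
  "e" -> "c", "b" -> "h", "b" -> "h", "i" -> "b", "h" -> "i", "b" -> "a")

val distances = ShortestPathsSketch.run(sampleEdges, Seq("a", "c"))
println(distances("h")) // Map(a -> 3, c -> 4)
println(distances("g")) // Map(c -> 1)
```

Vertex `g` has no entry for `a` because, following edge direction, `g` only reaches `c`.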



```scala
import org.apache.spark.sql.functions._

// explode turns each map entry into a row with "key" (landmark ID) and "value" (distance).
// Vertices that reach no landmark produce no rows; use explode_outer to keep them.
val paths_m = paths.select($"id", explode($"distances"))
display(paths_m)
```

```
import org.apache.spark.sql.functions._
paths_m: org.apache.spark.sql.DataFrame = [id: string, key: string ... 1 more field]
```




## The end
