<a href="https://colab.research.google.com/github/momo54/large_scale_data_management/blob/main/GraphFramesPageRank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RDF and GraphFrames

Interesting slides: 
* https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L12_2018.pdf
* https://www.slideshare.net/SparkSummit/graphframes-graph-queries-in-spark-sql 

GraphFrames is an add-on package for graph processing in Spark. It is an alternative to GraphX, built on DataFrames, and it is available in Python.


To launch it from a terminal:

```
# Shell: start PySpark with the GraphFrames package
pyspark --packages graphframes:graphframes:0.8.2-spark3.2-s_2.12

# Then, inside the PySpark shell:
from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

# Display the vertex and edge DataFrames
g.vertices.show()
```

Valid configurations are listed at:
```
https://spark-packages.org/package/graphframes/graphframes
```
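
The package coordinate used above seems to follow a `<graphframes version>-spark<Spark version>-s_<Scala version>` pattern. The helper below is only a sketch built on that assumption (the function name `graphframes_coordinate` is made up for illustration); check the page above for the combinations that actually exist.

```python
def graphframes_coordinate(graphframes_version, spark_version, scala_version):
    """Build a coordinate such as graphframes:graphframes:0.8.2-spark3.2-s_2.12.

    The naming pattern is inferred from the coordinate used in this notebook
    and is an assumption, not an official rule.
    """
    return (
        "graphframes:graphframes:"
        f"{graphframes_version}-spark{spark_version}-s_{scala_version}"
    )


coordinate = graphframes_coordinate("0.8.2", "3.2", "2.12")
print(coordinate)
```

The resulting string can be passed to `pyspark --packages` or to the `spark.jars.packages` setting.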


In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=67b94c11a598a057cd933f975fde45253d5cf1267237c923452046806a543398
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
# apparently you should not do this!!
#!pip install graphframes

In [3]:
!wget -nc https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar
!cp graphframes-0.8.2-spark3.2-s_2.12.jar /usr/local/lib/python3.7/dist-packages/pyspark/jars/
!ls /usr/local/lib/python3.7/dist-packages/pyspark/jars/graph*

--2022-10-31 17:35:53--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 13.226.228.36, 13.226.228.104, 13.226.228.25, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|13.226.228.36|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 247880 (242K) [binary/octet-stream]
Saving to: ‘graphframes-0.8.2-spark3.2-s_2.12.jar’


2022-10-31 17:35:53 (8.23 MB/s) - ‘graphframes-0.8.2-spark3.2-s_2.12.jar’ saved [247880/247880]

/usr/local/lib/python3.7/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.2-s_2.12.jar
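
The `python3.7` path above is tied to the Colab image used at the time. As a possible alternative, the sketch below looks up the `jars` directory of whatever `pyspark` installation is importable; the helper name `pyspark_jars_dir` is hypothetical, and it assumes a pip-style install where the jars live inside the package directory.

```python
import importlib.util
import os


def pyspark_jars_dir():
    """Return the 'jars' directory of the installed pyspark package, or None."""
    spec = importlib.util.find_spec("pyspark")
    if spec is None or not spec.submodule_search_locations:
        return None  # pyspark is not installed in this environment
    package_dir = list(spec.submodule_search_locations)[0]
    return os.path.join(package_dir, "jars")


jars_dir = pyspark_jars_dir()
print(jars_dir)
```

The downloaded jar could then be copied into that directory with `shutil.copy` instead of a hard-coded `cp`.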


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12").getOrCreate()  


In [6]:
# checking that everything works...

# Create a Vertex DataFrame with unique ID column "id"
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])
# Create a GraphFrame
from graphframes import *
g = GraphFrame(v, e)

# Query: Get in-degree of each vertex.
g.inDegrees.show()

# Query: Count the number of "follow" connections in the graph.
print(g.edges.filter("relationship = 'follow'").count())

# Run PageRank algorithm, and show results.
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()


  "DataFrame.sql_ctx is an internal property, and will be removed "


+---+--------+
| id|inDegree|
+---+--------+
|  b|       2|
|  c|       1|
+---+--------+

2
+---+------------------+
| id|          pagerank|
+---+------------------+
|  c|1.8994109890559092|
|  b|1.0905890109440908|
|  a|              0.01|
+---+------------------+
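
To get a feeling for where these numbers come from, here is a minimal pure-Python sketch of an iterative PageRank. It assumes every vertex starts with rank 1.0 and that each iteration sets `rank = resetProbability + (1 - resetProbability) * (sum of incoming contributions)`. This is an illustration under those assumptions, not the GraphFrames implementation; on this tiny graph it should give values close to the ones shown above.

```python
def pagerank_sketch(edges, reset_probability=0.01, max_iter=20):
    """Iterative, non-normalised PageRank over a list of (src, dst) pairs."""
    vertices = {v for edge in edges for v in edge}
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 for v in vertices}  # assumed initial rank
    for _ in range(max_iter):
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            # each vertex splits its rank evenly among its out-edges
            incoming[dst] += ranks[src] / out_degree[src]
        ranks = {
            v: reset_probability + (1.0 - reset_probability) * incoming[v]
            for v in vertices
        }
    return ranks


ranks = pagerank_sketch([("a", "b"), ("b", "c"), ("c", "b")])
for vertex in sorted(ranks, key=ranks.get, reverse=True):
    print(vertex, ranks[vertex])
```

Vertex `a` has no incoming edge, so its rank stays at the reset probability, while `b` and `c` keep passing rank back and forth.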



In [7]:
e.printSchema()

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- relationship: string (nullable = true)



In [8]:
# using a more realistic RDF graph (only triples)

#!wget -nc -q https://raw.githubusercontent.com/momo54/large_scale_data_management/main/small_page_links.nt
!wget -nc https://raw.githubusercontent.com/momo54/large_scale_data_management/main/watdiv-100k.nt

--2022-10-31 17:37:49--  https://raw.githubusercontent.com/momo54/large_scale_data_management/main/watdiv-100k.nt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15098125 (14M) [text/plain]
Saving to: ‘watdiv-100k.nt’


2022-10-31 17:37:49 (184 MB/s) - ‘watdiv-100k.nt’ saved [15098125/15098125]



In [9]:
from pyspark.sql.types import StructType,StringType
schema=StructType() \
  .add("src",StringType(),True) \
  .add("relationship",StringType(),True) \
  .add("dst",StringType(),True) 

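import re
def parseTriple(line):
    """Parse one N-Triples line into a (subject, predicate, object) tuple."""
    # Caveat: splitting on whitespace keeps only the first token of a
    # literal that contains spaces, and assumes at least three tokens per line.
    parts = re.split(r'\s+', line)
    return parts[0], parts[1], parts[2]

# reading triples
# the CSV reader does not cope with the space delimiter and the trailing "." (grrr)
# managing quads requires reification :-/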
#edges=spark.read.format("csv") \
#  .schema(schema) \
#  .load(["watdiv-100k.nt"])
#  .load(["multi.txt0.txt","catalog.txt0.txt"])

lines=spark.read.text(["watdiv-100k.nt"]).rdd.map(lambda r: r[0])


edges=lines.map(lambda x:parseTriple(x)).toDF(["src","relationship","dst"])


#generating Vertices from Edges...
vertices=edges.select('src') \
  .union(edges.select('dst')) \
  .distinct() \
  .withColumnRenamed('src', 'id')

print(vertices.take(1))

graph = GraphFrame(vertices, edges)


vertices.show(10)

edges.select("relationship").distinct().show(100,truncate=200)
edges.filter("relationship='<http://purl.org/goodrelations/includes>'").show(10,truncate=200)

[Row(id='<http://db.uwaterloo.ca/~galuc/wsdbm/User0>')]
+--------------------+
|                  id|
+--------------------+
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
|<http://db.uwater...|
+--------------------+
only showing top 10 rows

+---------------------------------------------------+
|                                       relationship|
+---------------------------------------------------+
|                        <http://schema.org/expires>|
| <http://db.uwaterloo.ca/~galuc/wsdbm/purchaseDate>|
|                <http://schema.org/aggregateRating>|
|  <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>|
|                   <http://schema.org/contactPoint>|
|   <http://db.uwaterloo.ca/~galuc/wsdbm/subscribes>|
|                       <http://schema.org/employee>|
|                       <http://schema.org/language>|
| 
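
Because `parseTriple` splits on every whitespace, a literal object such as `"hello world"` is cut after its first word. A possible alternative, sketched below, uses a regular expression that keeps the whole object and drops the trailing dot. The name `parse_triple_keep_literals` is hypothetical, and the pattern is a simplification that does not cover the full N-Triples grammar.

```python
import re

# subject, predicate, then everything up to the final "." as the object
TRIPLE_RE = re.compile(r'^\s*(\S+)\s+(\S+)\s+(.*?)\s*\.\s*$')


def parse_triple_keep_literals(line):
    """Return (subject, predicate, object), or None if the line does not match."""
    match = TRIPLE_RE.match(line)
    if match is None:
        return None
    return match.group(1), match.group(2), match.group(3)


example = '<http://example.org/s> <http://example.org/p> "hello world" .'
print(parse_triple_keep_literals(example))
```

Lines that return `None` (blank lines, for instance) would need to be filtered out before calling `toDF`.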

In [10]:
#subgraphs...
subgraph=graph.filterEdges("relationship='<http://purl.org/goodrelations/includes>'").dropIsolatedVertices()
subgraph.triplets.show(truncate=200)

+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------+
|                                             src|                                                                                                                                        edge|                                               dst|
+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------+
| {<http://db.uwaterloo.ca/~galuc/wsdbm/Offer73>}| {<http://db.uwaterloo.ca/~galuc/wsdbm/Offer73>, <http://purl.org/goodrelations/includes>, <http://db.uwaterloo.ca/~galuc/wsdbm/Product105>}|{<http://db.uwaterloo.ca/~galuc/wsdbm/Product105>}|
|{<http://db.uwaterloo.ca/~g

In [14]:
offers = graph.find("(s)-[p]->(o)")\
  .filter("p.relationship='<http://purl.org/goodrelations/offers>'") 
offers.show(5,truncate=200)

+-------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+
|                                                s|                                                                                                                                        p|                                               o|
+-------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+
|{<http://db.uwaterloo.ca/~galuc/wsdbm/Retailer1>}|{<http://db.uwaterloo.ca/~galuc/wsdbm/Retailer1>, <http://purl.org/goodrelations/offers>, <http://db.uwaterloo.ca/~galuc/wsdbm/Offer367>}|{<http://db.uwaterloo.ca/~galuc/wsdbm/Offer367>}|
|{<http://db.uwaterloo.ca/~galuc/wsdbm/Retai

In [13]:
includes = graph.find("(s)-[p]->(o)")\
  .filter("p.relationship='<http://purl.org/goodrelations/includes>'") 
includes.show(5,truncate=200)

+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------+
|                                               s|                                                                                                                                           p|                                                 o|
+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------+
| {<http://db.uwaterloo.ca/~galuc/wsdbm/Offer73>}| {<http://db.uwaterloo.ca/~galuc/wsdbm/Offer73>, <http://purl.org/goodrelations/includes>, <http://db.uwaterloo.ca/~galuc/wsdbm/Product105>}|{<http://db.uwaterloo.ca/~galuc/wsdbm/Product105>}|
|{<http://db.uwaterloo.ca/~g

In [44]:
chain2 = graph.find("(a)-[p1]->(b);(b)-[p2]->(c)")\
  .filter("p1.relationship='<http://purl.org/goodrelations/offers>'") \
  .filter("p2.relationship='<http://purl.org/goodrelations/includes>'")
chain2.show(10)



+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                  p1|                   b|                  p2|                   c|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|
|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|
|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|
|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|
|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|
|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|{<http://db.uwate...|
|{<http://db.uwate...|{<http://db.uwa
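
Conceptually, the motif `(a)-[p1]->(b);(b)-[p2]->(c)` is a join of the edge list with itself on the shared vertex `b`. The pure-Python sketch below shows that idea on a handful of made-up triples (the `<Retailer1>`, `<Offer1>`, ... identifiers are hypothetical, not taken from the dataset); it is not how GraphFrames evaluates motifs internally.

```python
from collections import defaultdict

OFFERS = "<http://purl.org/goodrelations/offers>"
INCLUDES = "<http://purl.org/goodrelations/includes>"


def two_hop(triples, first_predicate, second_predicate):
    """Return (a, b, c) for every a -first-> b -second-> c chain."""
    second_hop = defaultdict(list)
    for s, p, o in triples:
        if p == second_predicate:
            second_hop[s].append(o)
    return [
        (a, b, c)
        for a, p, b in triples
        if p == first_predicate
        for c in second_hop.get(b, [])
    ]


toy_triples = [
    ("<Retailer1>", OFFERS, "<Offer1>"),
    ("<Offer1>", INCLUDES, "<Product1>"),
    ("<Offer1>", INCLUDES, "<Product2>"),
    ("<Retailer2>", OFFERS, "<Offer2>"),
]
chains = two_hop(toy_triples, OFFERS, INCLUDES)
print(chains)
```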

In [16]:
# offers shared by different retailers
shared = graph.find("(a)-[p]->(b);(a1)-[p1]->(b)") \
  .filter("p.relationship='<http://purl.org/goodrelations/offers>'") \
  .filter("p1.relationship='<http://purl.org/goodrelations/offers>'")
shared.show(10,truncate=200)

+-------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
|                                                a|                                                                                                                                       p|                                              b|                                               a1|                                                                                                                                      p1|
+-------------------------------------------------+-----------------------------------------------------------------------------------------------------
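
As far as I understand, differently named motif vertices such as `a` and `a1` may bind to the same vertex, so this query can also pair a retailer with itself unless an extra filter like `a.id != a1.id` is added. The pure-Python sketch below illustrates the intended result, pairs of distinct retailers per offer, on made-up identifiers; the function name is hypothetical.

```python
from collections import defaultdict
from itertools import combinations


def retailers_sharing_offers(offer_edges):
    """Map each offer to the pairs of distinct retailers that both offer it."""
    retailers_by_offer = defaultdict(set)
    for retailer, offer in offer_edges:
        retailers_by_offer[offer].add(retailer)
    return {
        offer: list(combinations(sorted(retailers), 2))
        for offer, retailers in retailers_by_offer.items()
        if len(retailers) > 1  # an offer with a single retailer is not shared
    }


toy_edges = [
    ("<Retailer1>", "<Offer1>"),
    ("<Retailer2>", "<Offer1>"),
    ("<Retailer2>", "<Offer2>"),
]
shared_pairs = retailers_sharing_offers(toy_edges)
print(shared_pairs)
```

Using sorted pairs also avoids listing both `(Retailer1, Retailer2)` and `(Retailer2, Retailer1)`.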

In [39]:
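# does not work this way: filterVertices also drops every edge that touches
# a removed vertex, so keeping a single vertex leaves no triplets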
ret2 = graph.filterVertices("id=='<http://db.uwaterloo.ca/~galuc/wsdbm/Retailer2>'")
ret2.triplets.show(10)


  "DataFrame.sql_ctx is an internal property, and will be removed "


+---+----+---+
|src|edge|dst|
+---+----+---+
+---+----+---+



In [38]:
# querying from retailers to products...
# One way to do it...
paths = graph.bfs("id = '<http://db.uwaterloo.ca/~galuc/wsdbm/Retailer2>'", \
                  "instr(id,'Product') >0")
paths.show(truncate=200)

+-------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------+
|                                             from|                                                                                                                                       e0|                                              v1|                                                                                                                                          e1|                                                to|
+-------------------------------------------------+---------------------------------------------------------------------------------------
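
The idea behind this query is a breadth-first search: start from the vertices matching the first expression and stop at the first depth where some vertex matches the second one. The following pure-Python sketch shows that idea on made-up identifiers; the function name `bfs_paths` and the default `max_path_length` are assumptions for illustration, not the GraphFrames implementation.

```python
def bfs_paths(edges, is_start, is_goal, max_path_length=10):
    """Return the shortest paths from start vertices to goal vertices."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    starts = sorted({v for edge in edges for v in edge if is_start(v)})
    frontier = [[start] for start in starts]
    visited = set(starts)
    for _ in range(max_path_length + 1):
        hits = [path for path in frontier if is_goal(path[-1])]
        if hits or not frontier:
            return hits  # shortest matches found, or nothing left to explore
        next_frontier = []
        for path in frontier:
            for neighbour in adjacency.get(path[-1], []):
                if neighbour not in visited:
                    next_frontier.append(path + [neighbour])
        visited.update(path[-1] for path in next_frontier)
        frontier = next_frontier
    return []


toy_edges = [
    ("Retailer2", "Offer1"),
    ("Retailer2", "Offer2"),
    ("Offer1", "Product1"),
    ("Offer2", "Product2"),
    ("Retailer1", "Offer1"),
]
paths_found = bfs_paths(
    toy_edges,
    is_start=lambda v: v == "Retailer2",
    is_goal=lambda v: "Product" in v,
)
print(paths_found)
```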

In [48]:
#
# Just to check that this kind of thing really works...
#

from graphframes.examples import Graphs
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, lit, when

# ?? not sure (reduce is used below to fold over the edges of the motif)
from functools import reduce

sqlContext = SQLContext(spark.sparkContext)
g = Graphs(sqlContext).friends()  # Get example graph

chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

# Query on sequence, with state (cnt)
#  (a) Define method for updating state given the next element of the motif.
sumFriends =\
  lambda cnt,relationship: when(relationship == "friend", cnt+1).otherwise(cnt)
#  (b) Use sequence operation to apply method to sequence of elements in motif.
#      In this case, the elements are the 3 edges.
condition =\
  reduce(lambda cnt,e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
#  (c) Apply filter to DataFrame.
chainWith2Friends2 = chain4.where(condition >= 2)
chainWith2Friends2.show()

+---------------+--------------+--------------+--------------+--------------+--------------+----------------+
|              a|            ab|             b|            bc|             c|            cd|               d|
+---------------+--------------+--------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|{d, a, friend}|{a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}|{a, Alice, 34}|{a, b, friend}|  {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+--------------+--------------+----------------+
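
The `reduce` call above folds a counter over the three edges of the motif. The same fold on plain Python values, shown here as a small sketch with a hypothetical helper name, may make the pattern easier to read:

```python
from functools import reduce


def count_friends(relationships):
    """Count the 'friend' entries by folding a counter over the sequence."""
    return reduce(
        lambda count, relationship: count + 1 if relationship == "friend" else count,
        relationships,
        0,  # initial state, like lit(0) in the Spark version
    )


# The relationships of the two rows shown in the output above
print(count_friends(["friend", "friend", "friend"]))
print(count_friends(["friend", "friend", "follow"]))
```

Both rows have at least two `friend` edges, which is why they pass the `condition >= 2` filter.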



In [55]:
#
# is it possible to extract the products?

# get types
classes = graph.find("(s)-[p]->(o)")\
  .filter("p.relationship='<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>'") \
  .select("o") \
  .distinct() 
classes.show(100,truncate=200)

+---------------------------------------------------------+
|                                                        o|
+---------------------------------------------------------+
|          {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre13>}|
| {<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory2>}|
|           {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre9>}|
|          {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre11>}|
|          {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre10>}|
| {<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory8>}|
| {<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory1>}|
| {<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory5>}|
|{<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory13>}|
| {<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory9>}|
|           {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre0>}|
|           {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre8>}|
|           {<http://db.uwaterloo.ca/~galuc/wsdbm/Genre1>}|
|           {<http://db.uwaterloo.ca/~ga

In [60]:
# get the properties of the entities typed as ProductCategory3

products3 = graph.find("(s)-[p]->(o);(s)-[p1]->(o1)")\
  .filter("p.relationship='<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>'") \
  .filter("o.id='<http://db.uwaterloo.ca/~galuc/wsdbm/ProductCategory3>'") \
  .select("p1") 
products3.show(100,truncate=200)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                           p1|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                    {<http://db.uwaterloo.ca/~galuc/wsdbm/Product106>, <http://schema.org/caption>, "wakener}|
|                                                        {<http://db.uwaterloo.ca/~galuc/wsdbm/Product106>, <http://schema.org/description>, "posthypophyseal}|
|                                                                   {<http://db.uwaterloo.ca/~galuc/wsdbm/Product106>, <http://ogp.me/ns#title>, "premunitory}|
|{<http://db.uwaterloo.ca/~galuc/wsdbm/P