# ST446 Distributed Computing for Big Data
---

*We highly recommend using GCP, as the data sets used are about 20 GB in total.* Alternatively, you can use your own computer.

## Querying the YAGO semantic knowledge base

YAGO is a semantic knowledge base, derived from Wikipedia, WordNet and GeoNames. YAGO contains knowledge about more than 10 million entities (like persons, organizations and cities) and contains more than 120 million facts about these entities. You may find more about YAGO [here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/#c10444).

In this assignment, you are asked to use parts of the YAGO dataset to demonstrate your knowledge of Spark GraphFrames and motif queries. In particular, you are asked to **_use motif queries_** to answer the following queries, stated in English:

**A**. _Politicians who are also scientists_ (sorted alphabetically by name of person)

**B**. _Companies whose founders were born in London_ (sorted alphabetically by name of founder)

**C**. _Writers who have won a Nobel Prize (in any discipline)_ (sorted alphabetically by name of person)

**D**. _Nobel prize winners who were born in the same city as their spouses_ (sorted alphabetically by name of person)

**E**. _Politicians that are affiliated with a right-wing party_ (sorted alphabetically by name of person)

Please always show the first 20 entries of the resulting DataFrame and the total count of relevant entries.

## 0.1 Get YAGO data

You will need to download the following datasets that are part of YAGO (see [here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/downloads/) for more information):

* A set of relationships between instances (for example, specifying that Emomali Rahmon is the leader of the Military of Tajikistan). Link: http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoFacts.tsv.7z

* A set of subclass relationships (for example, specifying that *A1086* is *a road in England*, or that *Salmonella Dub* is *a Reggae music group* and also a *New Zealand dub musical group*). Link: http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoTransitiveType.tsv.7z

Please use `wget` to download the data to your compute engine (the files are big!).

Next, you will need to extract `tsv` files from the `7z` archives that you have downloaded. Use the following commands to install `p7zip` on your compute engine and extract the files.
```
sudo apt-get install p7zip-full
7z x yagoTransitiveType.tsv.7z 
7z x yagoFacts.tsv.7z 
```
Please note that this can take a while, particularly because `yagoTransitiveType.tsv` is **18 GB** in size.

Put the files (`yagoTransitiveType.tsv` and `yagoFacts.tsv`) into the Hadoop file system. 
Also, have a look at their first few lines to understand what kind of data they contain.

***
**The commands I used to download the data and put it into HDFS**
```
wget http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoFacts.tsv.7z
wget http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoTransitiveType.tsv.7z
sudo apt-get install p7zip-full
7z x yagoTransitiveType.tsv.7z 
7z x yagoFacts.tsv.7z
hadoop fs -mkdir -p /yago
hadoop fs -put yagoFacts.tsv yagoTransitiveType.tsv /yago/
```

## 0.2 Read the data into a Spark DataFrame

Please load the data from `yagoFacts.tsv` into a DataFrame called `df` and `yagoTransitiveType.tsv` into a DataFrame called `df_subclasses`.
Have a look at the beginning of the files to understand the schema.
Once imported, both DataFrames should have columns labelled as `id`, `subject`, `predicate`, `object` and `value`.
In the case of `yagoTransitiveType.tsv`, some of the predicates can be understood as *"is a subclass of"* or *"is a member of the class"*, and the objects can be understood as classes.
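
As a quick illustration of the five-column layout, the plain-Python sketch below splits one tab-separated line into the expected fields. The sample line is made up for illustration (the id `<id_example>` is a placeholder, not a real YAGO id), and the trailing tab is assumed to stand for an empty `value` field.

```python
# Column names expected for both DataFrames.
COLUMNS = ["id", "subject", "predicate", "object", "value"]

# Hypothetical sample line; "<id_example>" is a placeholder id and the
# trailing tab is assumed to represent an empty "value" field.
sample_line = "<id_example>\t<Albert_Einstein>\t<wasBornIn>\t<Ulm>\t"

# Split on tabs and pair each field with its column name.
fields = sample_line.split("\t")
record = dict(zip(COLUMNS, fields))
print(record["subject"], record["predicate"], record["object"])
```

The same tab split is what the RDD-based loading code applies to every line before a schema is attached.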

In [2]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

In [3]:
# The file has no header row, so the schema has to be defined by hand. Its first line is a comment
# describing what the database contains and must be dropped before building the DataFrame. We therefore
# read the file into an RDD and skip that line with the following function:
from itertools import islice

def remove_first(itr_index, itr):
    # Only partition 0 holds the first line of the file; islice skips it lazily
    return islice(itr, 1, None) if itr_index == 0 else itr
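
To see what this function does without a Spark cluster, here is a minimal plain-Python sketch that simulates `mapPartitionsWithIndex` with ordinary lists. The two partitions and their rows are made up, and the sketch assumes that the first line of the file lands in partition 0.

```python
from itertools import islice

def remove_first(partition_index, rows):
    # Skip one row in partition 0 only; other partitions pass through unchanged.
    return islice(rows, 1, None) if partition_index == 0 else rows

# Two made-up partitions standing in for the partitions of an RDD.
partitions = [["header comment", "row 1", "row 2"], ["row 3", "row 4"]]

# Apply the function per partition, as mapPartitionsWithIndex would.
cleaned = [
    row
    for index, part in enumerate(partitions)
    for row in remove_first(index, iter(part))
]
print(cleaned)
```

Only the first row of the first partition is dropped; every other row is kept in order.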

In [4]:
# Import Data to an RDD and remove first line
file = sc.textFile("/yago/yagoFacts.tsv")
data = file.mapPartitionsWithIndex(remove_first) # Maybe this is smarter: rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

In [5]:
# Split by columns
rdd_map = data.map(lambda x: x.split("\t"))   
#rdd_map.take(3)

In [6]:
# We've already seen the data and datatypes it contains, because there's no header describing the dataset
# we create a schema
schema = StructType([
                StructField("id",StringType()),
                StructField("subject", StringType()),
                StructField("predicate", StringType()),
                StructField("object", StringType()),
                StructField("value", StringType())#DoubleType
                    ])
# It didn't work with the value as a DoubleType, so we'll create it with that column as a string
# and change the datatype afterwards
df = sqlContext.createDataFrame(rdd_map, schema)
#df.take(3)

In [7]:
# Change value Datatype
df = df.withColumn("value", df["value"].cast(DoubleType()))
#df.take(3)

Now for the Transitive Type Dataset

In [8]:
file = sc.textFile("/yago/yagoTransitiveType.tsv")
data2 = file.mapPartitionsWithIndex(remove_first) # Maybe this is smarter:
#data = file.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
rdd_map2 = data2.map(lambda x: x.split("\t"))   
schema = StructType([
                StructField("id",StringType()),
                StructField("subject", StringType()),
                StructField("predicate", StringType()),
                StructField("object", StringType()),
                StructField("value", StringType())#DoubleType
                    ])
# It didn't work with the value as a DoubleType, so we'll create it with that column as a string
# and change the datatype afterwards
df_subclasses = sqlContext.createDataFrame(rdd_map2, schema)
df_subclasses = df_subclasses.withColumn("value", df_subclasses["value"].cast(DoubleType()))
#df_subclasses.show(3)

## 0.3 Understand the database schema

Let's look at the schema:

In [9]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)



In [10]:
df_subclasses.printSchema()

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)



The useful information is in columns "subject", "predicate" and "object". "predicate" defines the relation between entities "subject" and "object". For example, for "Albert Einstein was born in Ulm", "Albert Einstein" is the subject, "was born in" is the predicate and "Ulm" is the object.

## 0.4 Simple query example

To get information about where Albert Einstein was born, we load data into Spark using the following query:

In [21]:
born_city_df = df.where("predicate == '<wasBornIn>'")
born_city_df.show(1)

+--------------------+--------------------+-----------+---------------+-----+
|                  id|             subject|  predicate|         object|value|
+--------------------+--------------------+-----------+---------------+-----+
|<id_thPX9b1zg!_7f...|<William_Jones_(W...|<wasBornIn>|<Penrhiwceiber>| null|
+--------------------+--------------------+-----------+---------------+-----+
only showing top 1 row



In [10]:
born_city_df.where("subject = '<Albert_Einstein>'").show()

+--------------------+-----------------+-----------+------+-----+-----+
|                  id|          subject|  predicate|object|value|label|
+--------------------+-----------------+-----------+------+-----+-----+
|<id_sbCVliqDT2_7f...|<Albert_Einstein>|<wasBornIn>| <Ulm>|     | null|
+--------------------+-----------------+-----------+------+-----+-----+



You may wonder how one would know whether to use the predicate `<wasBornIn>` or `<was_born_in>`, and the subject `<Albert_Einstein>` or `<AlbertEinstein>`. For YAGO subjects (and objects), the naming is aligned with Wikipedia. For example, Albert Einstein's Wikipedia page is https://en.wikipedia.org/wiki/Albert_Einstein, so the identifier is `<Albert_Einstein>`.
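
The naming rule for subjects can be sketched as a small helper. The function name `wiki_url_to_yago_id` is hypothetical, and the sketch assumes that the last path segment of the Wikipedia URL can be used unchanged, which may not hold for every page (for example, titles with percent-encoded characters).

```python
def wiki_url_to_yago_id(url):
    # Hypothetical helper: take the last path segment of the URL
    # and wrap it in angle brackets, as YAGO identifiers are written.
    title = url.rstrip("/").rsplit("/", 1)[-1]
    return "<" + title + ">"

yago_id = wiki_url_to_yago_id("https://en.wikipedia.org/wiki/Albert_Einstein")
print(yago_id)
```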

For predicates, you can look at the "property" list from the [Yago Web interface](https://gate.d5.mpi-inf.mpg.de/webyagospotlx/WebInterface?L01=%3Fx&L0R=%3CwasBornIn%3E&L02=%3Fc&L0T=&L03=&L0L=&L04=&L05=&L11=&L1R=&L12=&L1T=&L13=&L1L=&L14=&L15=&L21=&L2R=&L22=&L2T=&L23=&L2L=&L24=&L25=&L31=&L3R=&L32=&L3T=&L33=&L3L=&L34=&L35=&L41=&L4R=&L42=&L4T=&L43=&L4L=&L44=&L45=). 
Try different queries in this web interface to get a better understanding of how to query YAGO.

## 0.5 Simple motif example

In this part of the assignment, you are required to use **motif queries** to answer the five questions above. Please complete the following example to find out "Which city was Albert Einstein born in?" using a motif query instead of SQL queries on the first DataFrame (`df`):

In [11]:
# Import necessary libraries
import graphframes
from graphframes import *

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql.functions import col, lit, when
from pyspark.sql import Row

from datetime import datetime
import re
import numpy as np

In [13]:
def vertices_edges_split(df, condition):
    # Filter once, then derive vertices (subjects and objects) and edges from the same subset
    subset = df.filter(condition)
    sub = subset.select("subject").withColumnRenamed("subject", "id")
    obj = subset.select("object").withColumnRenamed("object", "id")
    v = sub.union(obj).distinct()
    e = subset.select("subject", "object", "predicate")\
    .withColumnRenamed("subject", "src").withColumnRenamed("object", "dst")
    return v, e

v, e = vertices_edges_split(df, "subject='<Albert_Einstein>'")
g = GraphFrame(v, e)
# your code

# Filter on the predicate so that the query does not presuppose the answer
res = g.find("(a)-[ab]->(b)").filter("ab.predicate = '<wasBornIn>'")
res.show() # took 2min 30sec

+-------------------+--------------------+-------+
|                  a|                  ab|      b|
+-------------------+--------------------+-------+
|[<Albert_Einstein>]|[<Albert_Einstein...|[<Ulm>]|
+-------------------+--------------------+-------+
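
For intuition about what a single-edge motif matches, here is a plain-Python sketch that does not use GraphFrames. It treats the motif `(a)-[ab]->(b)` as "one row per edge, with its two endpoints", and then filters on the edge predicate, which is one possible way to select the birthplace. The three facts are the ones visible in the outputs of this notebook.

```python
# Edges as (src, predicate, dst) triples.
edges = [
    ("<Albert_Einstein>", "<wasBornIn>", "<Ulm>"),
    ("<Albert_Einstein>", "<isMarriedTo>", "<Elsa_Einstein>"),
    ("<Albert_Einstein>", "<isMarriedTo>", "<Mileva_Marić>"),
]

# "(a)-[ab]->(b)" binds every edge to a row with its endpoints.
matches = [{"a": src, "ab": pred, "b": dst} for src, pred, dst in edges]

# Plain-Python counterpart of a filter on the edge predicate.
born_in = [m for m in matches if m["ab"] == "<wasBornIn>"]
print(born_in)
```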



Some other operations with graphs for the record

In [None]:
print(g)
g.vertices.count()
g.edges.count()
g.find("(a)-[e]->(b)").count()
g.vertices.filter("id = '<Ulm>'").show(1)
g.edges.groupBy("predicate").count().sort("count", ascending = False).show(5) # Up until here 6min 15sec

# Another motif query
g.find("(a)-[ab]->(b)").filter("ab.predicate = '<isMarriedTo>'").show() # 8min 20sec... 

GraphFrame(v:[id: string], e:[src: string, dst: string ... 1 more field])
+-----+
|   id|
+-----+
|<Ulm>|
+-----+

+-------------+-----+
|    predicate|count|
+-------------+-----+
| <influences>|   25|
|    <worksAt>|   12|
|<isCitizenOf>|   12|
|<hasWonPrize>|    8|
|<isMarriedTo>|    2|
+-------------+-----+
only showing top 5 rows

+-------------------+--------------------+-----------------+
|                  a|                  ab|                b|
+-------------------+--------------------+-----------------+
|[<Albert_Einstein>]|[<Albert_Einstein...|[<Elsa_Einstein>]|
|[<Albert_Einstein>]|[<Albert_Einstein...| [<Mileva_Marić>]|
+-------------------+--------------------+-----------------+



## 0.6 Some useful tips

### Get a subset of YAGO database
The YAGO database is large, so do not try to load the entire database into a DataFrame and then query it. If you do, you may find that you cannot even execute `df.take(1)`, as it would take up too much space (at least on a laptop). Instead, use Spark SQL commands or `df.where` to select a suitable fraction of the data.

### Try the queries in the YAGO Web interface first
It is sometimes tricky to get the right "subject", "predicate" and "object". It is easier to start from the [Yago Web interface](https://gate.d5.mpi-inf.mpg.de/webyagospotlx/WebInterface?L01=%3Fx&L0R=%3CwasBornIn%3E&L02=%3Fc&L0T=&L03=&L0L=&L04=&L05=&L11=&L1R=&L12=&L1T=&L13=&L1L=&L14=&L15=&L21=&L2R=&L22=&L2T=&L23=&L2L=&L24=&L25=&L31=&L3R=&L32=&L3T=&L33=&L3L=&L34=&L35=&L41=&L4R=&L42=&L4T=&L43=&L4L=&L44=&L45=) than to query directly in PySpark. Once your query works, you can convert it to PySpark code. Note that the web version of an object/subject code may sometimes differ from what you need to type here. For example, the company code is `<wordnet_company_108058098>` when you query here, but `<wordnet company 108058098>` in the web interface.

### Be patient and don't do this exercise at the last minute
Some trial and error is needed to get a query right, and it may take some time to get the result of a query. For these reasons, we advise you not to leave this exercise until just before the submission deadline.

### Make sure to get the initialization actions right
For this exercise, you will be using GraphFrames.

## 1. Politicians who are also scientists (Question A)
Find all politicians who are also scientists. Output the top 20 of them. How many people in the dataset are both scientists and politicians?
Please follow these steps:
* Operate on the subsets of `df_subclasses` where the objects are `'<wordnet_scientist_110560637>'` (scientists) and `'<wordnet_politician_110450303>'` (politicians), and where the predicates are `rdf:type`.
* Use graphframes and the right parts of `df_subclasses` to construct a graph whose (directed) edges point from subjects to objects. Hence, its source vertices are subjects and its destination vertices are objects. It may be convenient to use intermediate DataFrames and join all the required DataFrames of edges and vertices.
* The subjects will be people and the objects will be classes (e.g., scientists, politicians).
* Use a motif query to find all instances that fulfil the criteria specified in the question.
* It is a good idea to define a function that takes a DataFrame and outputs a set of data frames for vertices and edges.

Please sort the output alphabetically by the person column.
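
The idea behind a two-edge motif such as `(a)-[ab]->(b); (a)-[ac]->(c)` can be sketched in plain Python, without GraphFrames: look for pairs of edges that share the same source vertex. The first two people below appear in the output of this notebook, while `<Example_Scientist_Only>` is a made-up entity added for contrast.

```python
SCIENTIST = "<wordnet_scientist_110560637>"
POLITICIAN = "<wordnet_politician_110450303>"

# (src, dst) pairs standing in for rdf:type edges from people to classes.
edges = [
    ("<Aad_Kosto>", SCIENTIST),
    ("<Aad_Kosto>", POLITICIAN),
    ("<A._C._Cuza>", SCIENTIST),
    ("<A._C._Cuza>", POLITICIAN),
    ("<Example_Scientist_Only>", SCIENTIST),  # hypothetical entity
]

# Two edges with a common source: one to the scientist class,
# one to the politician class.
matches = [
    (a1, b, c)
    for a1, b in edges
    for a2, c in edges
    if a1 == a2 and b == SCIENTIST and c == POLITICIAN
]

# Sort alphabetically by person and report the total count.
people = sorted(a for a, _, _ in matches)
print(people, len(people))
```

Fixing `b` to the scientist class and `c` to the politician class avoids returning each person twice with the roles swapped.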

In [None]:
df_sci_pol = df_subclasses.where("object = '<wordnet_scientist_110560637>' or object = '<wordnet_politician_110450303>'")

In [11]:
scientists_df = df_subclasses.filter("object = '<wordnet_scientist_110560637>'")\
.filter("predicate = 'rdf:type'").select("subject","object","predicate").withColumnRenamed("predicate", "pred_sci")\
.withColumnRenamed("object", "obj_sci")


politicians_df = df_subclasses.filter("object = '<wordnet_politician_110450303>'")\
.filter("predicate = 'rdf:type'").select("subject","object","predicate").withColumnRenamed("predicate", "pred_pol")\
.withColumnRenamed("object", "obj_pol")

In [13]:
polisci_df = scientists_df.join(politicians_df, on=['subject'], how='inner')

In [None]:
src = polisci_df.select("subject").distinct().withColumnRenamed("subject","id")
dst_sci = polisci_df.select("obj_sci").distinct().withColumnRenamed("obj_sci","id")
dst_pol = polisci_df.select("obj_pol").distinct().withColumnRenamed("obj_pol","id")

v = src.unionAll(dst_sci).unionAll(dst_pol)

sci_edg = polisci_df.select("subject", "obj_sci", "pred_sci").withColumnRenamed("subject","src")\
.withColumnRenamed("pred_sci","pred").withColumnRenamed("obj_sci","dst")
pol_edg = polisci_df.select("subject", "obj_pol", "pred_pol").withColumnRenamed("subject","src")\
.withColumnRenamed("pred_pol","pred").withColumnRenamed("obj_pol","dst")

e = sci_edg.unionAll(pol_edg)

In [None]:
polsciGraph = GraphFrame(v, e)

In [None]:
polsciGraph.find("(a)-[ab]->(b); (a)-[ac]->(c)").filter("b.id = '<wordnet_scientist_110560637>'")\
.filter("b.id != c.id").sort("a").show(20)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                  ab|                   b|                  ac|                   c|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|      [<A._C._Cuza>]|[<A._C._Cuza>, <w...|[<wordnet_scienti...|[<A._C._Cuza>, <w...|[<wordnet_politic...|
|[<A._P._J._Abdul_...|[<A._P._J._Abdul_...|[<wordnet_scienti...|[<A._P._J._Abdul_...|[<wordnet_politic...|
|       [<Aad_Kosto>]|[<Aad_Kosto>, <wo...|[<wordnet_scienti...|[<Aad_Kosto>, <wo...|[<wordnet_politic...|
|        [<Aad_Nuis>]|[<Aad_Nuis>, <wor...|[<wordnet_scienti...|[<Aad_Nuis>, <wor...|[<wordnet_politic...|
| [<Aaron_Aaronsohn>]|[<Aaron_Aaronsohn...|[<wordnet_scienti...|[<Aaron_Aaronsohn...|[<wordnet_politic...|
|  [<Aaron_Farrugia>]|[<Aaron_Farrugia>...|[<wordnet_scienti...|[<Aaron_Farrugia>...|[<wordnet_politic...|
|        [<Ab_Klink>]|[<Ab_Klink>, <w

## 2. Companies whose founders were born in London (Question B)
For companies, use `'<wordnet_company_108058098>'`. 
For *"being founder"*, use `<created>`.

By now, you will understand which DataFrame to use for what. 

Set up a graph and use a motif query to find companies whose founders were born in London.
Please take some time to figure out what a suitable configuration of nodes and edges should look like. How many such companies are there in our dataset?

Please sort the output alphabetically by the founder column.
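
One possible configuration is a three-edge pattern: company to class, founder to company, and founder to birthplace. The plain-Python sketch below (no GraphFrames) illustrates it on a handful of triples. The `<Dare_Comics>` and `<Video_Arts>` rows follow the output of this notebook, and the `<Example_...>` entities are made up to show a non-matching case.

```python
COMPANY = "<wordnet_company_108058098>"

# (src, predicate, dst) triples.
edges = [
    ("<Dare_Comics>", "rdf:type", COMPANY),
    ("<Adam_Hamdy>", "<created>", "<Dare_Comics>"),
    ("<Adam_Hamdy>", "<wasBornIn>", "<London>"),
    ("<Video_Arts>", "rdf:type", COMPANY),
    ("<Antony_Jay>", "<created>", "<Video_Arts>"),
    ("<Antony_Jay>", "<wasBornIn>", "<London>"),
    # Hypothetical entities: a founder who was not born in London.
    ("<Example_Company>", "rdf:type", COMPANY),
    ("<Example_Founder>", "<created>", "<Example_Company>"),
    ("<Example_Founder>", "<wasBornIn>", "<Example_City>"),
]

# Pattern "(c)-[e1]->(t); (f)-[e2]->(c); (f)-[e3]->(l)" with three filters:
# t is the company class, e2 is <created>, and l is <London>.
matches = [
    (f, c)
    for c, p1, t in edges if p1 == "rdf:type" and t == COMPANY
    for f, p2, c2 in edges if p2 == "<created>" and c2 == c
    for f2, p3, city in edges
    if p3 == "<wasBornIn>" and f2 == f and city == "<London>"
]

# Sort alphabetically by founder.
for founder, company in sorted(matches):
    print(founder, company)
```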

In [43]:
df_subclasses.createOrReplaceTempView("df_subclasses")
df.createOrReplaceTempView("df")

In [49]:
comps_found = spark.sql("SELECT s.subject as company, s.object as comp_obj, s.predicate as comp_pred, \
d.subject as founder, d.predicate as found_pred \
FROM df_subclasses s JOIN df d ON s.subject = d.object \
WHERE s.predicate = 'rdf:type' AND s.object = '<wordnet_company_108058098>' AND d.predicate = '<created>'")

In [50]:
comps_found.createOrReplaceTempView("comps_found")

In [51]:
comps_found_Ldn = spark.sql("SELECT cf.company, cf.comp_pred, cf.comp_obj, cf.founder, cf.found_pred, \
l.object as london, l.predicate as ldn_pred \
FROM comps_found cf JOIN df l ON cf.founder = l.subject \
WHERE l.predicate = '<wasBornIn>' AND l.object = '<London>'")

In [52]:
comps_found_Ldn.createOrReplaceTempView("comps_found_Ldn")

In [55]:
v = comps_found_Ldn.select("company",).withColumnRenamed("company", "id")\
.union(comps_found_Ldn.select("comp_obj",).withColumnRenamed("comp_obj", "id"))\
.union(comps_found_Ldn.select("founder",).withColumnRenamed("founder", "id"))\
.union(comps_found_Ldn.select("london",).withColumnRenamed("london", "id")).distinct()


e = comps_found_Ldn.select("company", "comp_obj", "comp_pred").withColumnRenamed("company", "src")\
.withColumnRenamed("comp_obj", "dst").withColumnRenamed("comp_pred", "pred")\
.union(comps_found_Ldn.select("founder", "company", "found_pred").withColumnRenamed("founder", "src")\
.withColumnRenamed("company", "dst").withColumnRenamed("found_pred", "pred"))\
.union(comps_found_Ldn.select("founder", "london", "ldn_pred").withColumnRenamed("founder", "src")\
.withColumnRenamed("london", "dst").withColumnRenamed("ldn_pred", "pred")).distinct()


In [56]:
g_ldn = GraphFrame(v, e)

In [57]:
beginning = datetime.now()

In [None]:
g_ldn.find("(c)-[e1]->(t); (f)-[e2]->(c); (f)-[e3]->(l)").filter("t.id = '<wordnet_company_108058098>'")\
.filter("e2.pred = '<created>'").filter("l.id = '<London>'").sort("f").show(20) 


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                   c|                  e1|                   t|                   f|                  e2|                  e3|         l|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     [<Dare_Comics>]|[<Dare_Comics>, <...|[<wordnet_company...|      [<Adam_Hamdy>]|[<Adam_Hamdy>, <D...|[<Adam_Hamdy>, <L...|[<London>]|
|[<Jawbone_(compan...|[<Jawbone_(compan...|[<wordnet_company...|[<Alexander_Assei...|[<Alexander_Assei...|[<Alexander_Assei...|[<London>]|
|      [<Video_Arts>]|[<Video_Arts>, <w...|[<wordnet_company...|      [<Antony_Jay>]|[<Antony_Jay>, <V...|[<Antony_Jay>, <L...|[<London>]|
|[<SENS_Research_F...|[<SENS_Research_F...|[<wordnet_company...|  [<Aubrey_de_Grey>]|[<Aubrey_de_Grey>...|[<Aubrey_de_Grey>...|[<London>]|
|[<Andreessen_Horo...|[<And

In [None]:
ending = datetime.now()
beginning_time = beginning.strftime("%H:%M:%S")
ending_time = ending.strftime("%H:%M:%S")

print("Started at {} \n Ended at {}".format(beginning_time, ending_time))
elapsed = ending - beginning  # end minus start, so the duration is positive
print("Took", str(elapsed))

Started at 10:52:23 
 Ended at 13:14:31
('Took', '2:22:07.284607')
