# ST446 Distributed Computing for Big Data
## Assignment 2 - PART 1
---

**We highly recommend using GCP, as the data sets used are about 20 GB in total.** Alternatively, you can use your own computer.

## P1: Querying the YAGO semantic knowledge base

YAGO is a semantic knowledge base, derived from Wikipedia, WordNet and GeoNames (in its Version 1). YAGO contains knowledge about more than 10 million entities (like persons, organizations and cities) and contains more than 120 million facts about these entities. You may find more about YAGO [here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/#c10444).

In this assignment, you are asked to use parts of the YAGO dataset to demonstrate your knowledge about Spark GraphFrames and motif queries. In particular, you are asked to **_use motif queries_** to find out answers to the following queries stated in English:

**A (max points 10)**. _Politicians who are also scientists_ (sorted alphabetically by name of person)

**B (max points 10)**. _Companies whose founders were born in London_ (sorted alphabetically by name of founder)

**C (max points 10)**. _Writers who have won a Nobel Prize (in any discipline)_ (sorted alphabetically by name of person)

**D (max points 10)**. _Nobel prize winners who were born in the same city as their spouses_ (sorted alphabetically by name of person)

**E (max points 10)**. _Politicians that are affiliated with a right-wing party_ (sorted alphabetically by name of person)

Please always show the first 20 entries of the resulting DataFrame and the total count of relevant entries.

---

## 0.1 Get YAGO data

* You will need to download the following datasets that are part of YAGO (see [here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/downloads/) for more information):

    * A set of relationships between instances (for example, specifying that Emomali Rahmon is the leader of the Military of Tajikistan). Link: http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoFacts.tsv.7z

    * A set of subclass relationships (for example, specifying that *A1086* is *a road in England*, or that *Salmonella Dub* is *a Reggae music group* and also a *New Zealand dub musical group*). Link: http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoTransitiveType.tsv.7z

* Use `wget` to download the data into the master node of your Dataproc cluster (the files are big!).

* Next, you will need to extract `tsv` files from the `7z` archives that you have downloaded. Use the following commands to install `p7zip` on your Dataproc cluster and extract the files. Please note that this can take a while, in particular as `yagoTransitiveType.tsv` is **18GB** large.

* Put the files (`yagoTransitiveType.tsv` and `yagoFacts.tsv`) into the Hadoop file system. 

```
wget http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoFacts.tsv.7z
wget http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoTransitiveType.tsv.7z
sudo apt-get install p7zip-full
7z x yagoTransitiveType.tsv.7z 
7z x yagoFacts.tsv.7z
hadoop fs -put ./ /yago
```

Also, have a look at their first few lines to understand what kind of data they contain (you need this to infer the schemas).

```
head yagoTransitiveType.tsv
head yagoFacts.tsv
```

## 0.2 Read the data into a Spark DataFrame

Please load the data from `yagoFacts.tsv` into a DataFrame called `df` and `yagoTransitiveType.tsv` into a DataFrame called `df_subclasses`.

Have a look at the beginning of the files to understand the schema. Once imported, both DataFrames should have columns labelled as `id`, `subject`, `predicate`, `object` and `value`.
In the case of `yagoTransitiveType.tsv`, some of the predicates can be understood as *is a subclass of* or *is a member of the class*, and the objects can be understood as classes.

In [1]:
from pyspark.sql.types import *

#---df--yagoFacts-----
df_or = spark.read.csv('hdfs:/yago/yagoFacts.tsv', sep = '\t', inferSchema = 'true')

df_or = df_or.withColumn('id', df_or['_c0'].cast(StringType()))
df_or = df_or.withColumn('subject', df_or['_c1'].cast(StringType()))
df_or = df_or.withColumn('predicate', df_or['_c2'].cast(StringType()))
df_or = df_or.withColumn('object', df_or['_c3'].cast(StringType()))
df_or = df_or.withColumn('value', df_or['_c4'].cast(DoubleType()))

df = df_or.drop('_c0', '_c1', '_c2', '_C3', '_c4')
df = df.where('id IS NOT null')
df.createOrReplaceTempView('df')


#---df_subclasses--yagoTransitiveType-----
df_subclasses_or = spark.read.csv('hdfs:/yago/yagoTransitiveType.tsv', sep = '\t', inferSchema = 'true')

df_subclasses_or = df_subclasses_or.withColumn('id', df_subclasses_or['_c0'].cast(StringType()))
df_subclasses_or = df_subclasses_or.withColumn('subject', df_subclasses_or['_c1'].cast(StringType()))
df_subclasses_or = df_subclasses_or.withColumn('predicate', df_subclasses_or['_c2'].cast(StringType()))
df_subclasses_or = df_subclasses_or.withColumn('object', df_subclasses_or['_c3'].cast(StringType()))
df_subclasses_or = df_subclasses_or.withColumn('value', df_subclasses_or['_c4'].cast(DoubleType()))

df_subclasses = df_subclasses_or.drop('_c0', '_c1', '_c2', '_C3', '_c4')
df_subclasses.createOrReplaceTempView('df')

## 0.3 Understand the database schema

Let's look at the schema:

In [2]:
# your code
df.printSchema()

df_subclasses.printSchema()

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)



The output should be similar to that:

```
YAGO Facts schema:

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)

YAGO TransitiveType schema:

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)
 ```

The useful information is in columns `subject`, `predicate` and `object`. **predicate** defines the relation between entities **subject** and **object**. For example, for *Albert Einstein was born in Ulm*, `Albert Einstein` is the subject, `was born in` is the predicate and `Ulm` is the object.

## 0.4 Simple query example

To get information about where Albert Einstein was born, we can load data into Spark using the following queries:

In [3]:
# checking for predicate `was born in`
born_city_df = df.where("predicate == '<wasBornIn>'")
born_city_df.show(1)

+--------------------+--------------------+-----------+---------------+-----+
|                  id|             subject|  predicate|         object|value|
+--------------------+--------------------+-----------+---------------+-----+
|<id_thPX9b1zg!_7f...|<William_Jones_(W...|<wasBornIn>|<Penrhiwceiber>| null|
+--------------------+--------------------+-----------+---------------+-----+
only showing top 1 row



In [4]:
# checking for subject
born_city_df.where("subject = '<Albert_Einstein>'").show()

+--------------------+-----------------+-----------+------+-----+
|                  id|          subject|  predicate|object|value|
+--------------------+-----------------+-----------+------+-----+
|<id_sbCVliqDT2_7f...|<Albert_Einstein>|<wasBornIn>| <Ulm>| null|
+--------------------+-----------------+-----------+------+-----+



You may wonder how one would know whether to use the predicate '&lt;wasBornIn&gt;' or '&lt;was_born_in&gt;' and subject '&lt;Albert_Einstein&gt;' or '&lt;AlbertEinstein&gt;'. 

For YAGO subjects (and objects), the naming is aligned with Wikipedia. For example, Albert Einstein's wiki is: https://en.wikipedia.org/wiki/Albert_Einstein and you can see it is 'Albert_Einstein'. 

For predicates, you can look at the "property" list from the [Yago Web interface](https://yago-knowledge.org/) or the documentation on the TAXONOMY theme ([here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/downloads)).

Try different queries with this Web interface query to understand more how to query YAGO.

## 0.5 Simple motif example

To find out "Which city was Albert Einstein born in?", we can use the following motif query on the first dataframe (`df`):

In [None]:
from graphframes import *

# helper function to filter nodes (vertices) by subject and object, and establish edges
def vertices_edges_split(df, condition):
    sub = df.filter(condition).select("subject").withColumnRenamed("subject","id")
    obj = df.filter(condition).select("object").withColumnRenamed("object","id")
    v = sub.union(obj).distinct()
    e = df.filter(condition).select("subject","object","predicate")\
    .withColumnRenamed("subject","src").withColumnRenamed("object","dst")
    return v, e

# build a graph filtering by Albert Einstein
v, e = vertices_edges_split(df, "subject='<Albert_Einstein>'")
g = GraphFrame(v, e)
# find all relationships where Albert Einstein is the subject...
motifs = g.find("(a)-[e]->(b)")
# ... and the predicate is `was born in`
res = motifs.filter("e.predicate='<wasBornIn>'")
res.show()

+-------------------+--------------------+-------+
|                  a|                   e|      b|
+-------------------+--------------------+-------+
|[<Albert_Einstein>]|[<Albert_Einstein...|[<Ulm>]|
+-------------------+--------------------+-------+



## 0.6 Some useful tips

### Get a subset of YAGO database
YAGO database is large, so we don't try to load the entire database into a dataframe and then query it. If you do this, you will find that you won't even be able to execute `df.take(1)`, as it would take up too much of space (at least on a laptop). Instead, you use Spark SQL commands or `df.where` to get a suitable fraction of the data.

### Try the queries in the YAGO Web interface first
It is sometimes tricky to get the right "subject", "predicate" and "object". It is easier if you start from [Yago Web interface](https://yago-knowledge.org/) rather than directly querying in PySpark. Once your query works, you can convert your query to PySpark code. 

Note that sometimes the Web version of object/subject code may be different from what you need to type here. For example, company code is &lt;wordnet_company_108058098&gt; when you do the query here but when you do it via the web interface it is &lt;wordnet company 108058098&gt;. 

### Be patient and don't do this exercise in the last minute
Some trial and error is needed to get the query right and it may take some time get the result for a query. For these reasons, we advise you not to wait to work out this exercise just before the submission deadline. 

---

## 1. Politicians who are also scientists (Question A)
Find all politicians who are also scientists. Output top 20 of them. How many people are in the dataset who are both scientists and politicians?

Please follow these steps:
* Operate on the subsets of `df_subclasses` where the objects are `'<wordnet_scientist_110560637>` (scientists) and `'<wordnet_politician_110450303>'` (politicians), and where the predicates are `rdf:type`.
* Use graphframes and the right parts of `df_subclasses` to construct a graph whose (directed) edges point from subjects to objects. Hence, its source vertices are subjects and it destination vertices are objects. It may be convenient to use intermediate DataFrames and join all the required dataframes of edges and vertices.
* The subjects will be people and the objects will be classes (e.g., scientists, politicians).
* Use a motif query to find all instances that fulfil the criteria specified in the question.
* It is a good idea to define a function that takes a DataFrame and outputs a set of data frames for vertices and edges.

Please sort the output alphabetically by the person column.

In [None]:
# Keep scientists and politicians only
sci_pol = df_subclasses.filter("(object = '<wordnet_scientist_110560637>' \
                     OR object = '<wordnet_politician_110450303>') \
                     AND predicate = 'rdf:type'")

#establish edges and nodes
sub_sci = sci_pol.select("subject").withColumnRenamed("subject","id")
obj_sci = sci_pol.select("object").withColumnRenamed("object","id")
v = sub_sci.union(obj_sci).distinct()
e = sci_pol.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

#create a graph
g = GraphFrame(v, e)

#find all relationships where "someone" is the subject with different objects 
motif = g.find('(Someone)-[is]->(Politician); (Someone)-[is0]->(Scientist)')

#to guarantee the correctness, I set a filter like follow
res1 = motif.filter('Politician.id != Scientist.id').select('Someone').distinct()

res1.count()

7182

The total number of politicians that are also scientists is: 7182

In [None]:
res1.select('Someone').withColumnRenamed('Someone',"PoliticianScientist").distinct().sort('PoliticianScientist').show(truncate = False)

+---------------------------+
|PoliticianScientist        |
+---------------------------+
|[<A._C._Cuza>]             |
|[<A._P._J._Abdul_Kalam>]   |
|[<Aad_Kosto>]              |
|[<Aad_Nuis>]               |
|[<Aaron_Aaronsohn>]        |
|[<Aaron_Farrugia>]         |
|[<Ab_Klink>]               |
|[<Abba_P._Lerner>]         |
|[<Abbas_Ahmad_Akhoundi>]   |
|[<Abbie_Hoffman>]          |
|[<Abbott_Lawrence_Lowell>] |
|[<Abdallah_Salem_el-Badri>]|
|[<Abdelbaki_Hermassi>]     |
|[<Abdellatif_Abid>]        |
|[<Abdelouahed_Souhail>]    |
|[<Abdelwahed_Radi>]        |
|[<Abdesslam_Yassine>]      |
|[<Abdi_Farah_Shirdon>]     |
|[<Abdirahman_Duale_Beyle>] |
|[<Abdiweli_Mohamed_Ali>]   |
+---------------------------+
only showing top 20 rows



## 2. Companies whose founders were born in London (Question B)
For companies, use `'<wordnet_company_108058098>'`. 
For *"being founder"*, use `predicate=<created>`.

By now, you will understand which DataFrame to use for what. 

Set up a graph and use a motif query to find companies whose founders were born in London.
Please take some time to figure out how a suitable configuration of nodes and edges should look like.  How many such companies are there in our dataset?

Please sort the output alphabetically by the founder column.

In [None]:
#keep company founder
founder = df.where("predicate == '<created>'").select('subject', 'object', 'predicate')
sub_f = founder.select('subject').withColumnRenamed("subject","id")
obj_f = founder.select('object').withColumnRenamed("object","id")
v_f = sub_f.union(obj_f).distinct()
e_f = founder.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

#keep people born in London
born_in_ld = df.where("predicate == '<wasBornIn>' AND object == '<London>'").select('subject', 'object', 'predicate')
sub_b = born_in_ld.select('subject').withColumnRenamed("subject","id")
obj_b = born_in_ld.select('object').withColumnRenamed("object","id")
v_b = sub_b.union(obj_b).distinct()
e_b = born_in_ld.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

#keep companies
company = df_subclasses.where("object == '<wordnet_company_108058098>'").select('subject', 'object', 'predicate')
sub_c = company.select('subject').withColumnRenamed("subject","id")
obj_c = company.select('object').withColumnRenamed("object","id")
v_c = sub_c.union(obj_c).distinct()
e_c = company.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

In [None]:
#create vertices and edges
v = v_f.union(v_b).union(v_c).distinct()
e = e_f.union(e_b).union(e_c).distinct()

#create graphframe
g = GraphFrame(v, e)

motifs = g.find("(Someone)-[WasBornIn]->(Birthplace); (Someone)-[Created]->(Organization); (Organization)-[Is]->(Company)")
res2 = motifs.filter("WasBornIn.predicate == '<wasBornIn>' AND Birthplace.id == '<London>' AND Created.predicate == '<created>' AND Company.id == '<wordnet_company_108058098>'")

res2.select('Someone', 'Organization').withColumnRenamed('Someone',"FounderBornInLondon").distinct().sort('FounderBornInLondon').show(truncate = False)

+---------------------------------+---------------------------------------+
|FounderBornInLondon              |Organization                           |
+---------------------------------+---------------------------------------+
|[<Adam_Hamdy>]                   |[<Dare_Comics>]                        |
|[<Alexander_Asseily>]            |[<Jawbone_(company)>]                  |
|[<Antony_Jay>]                   |[<Video_Arts>]                         |
|[<Aubrey_de_Grey>]               |[<SENS_Research_Foundation>]           |
|[<Ben_Horowitz>]                 |[<Andreessen_Horowitz>]                |
|[<Bernard_MacMahon_(filmmaker)>] |[<LO-MAX_Records>]                     |
|[<Brian_Maxwell>]                |[<PowerBar>]                           |
|[<Bruno_Heller>]                 |[<Primrose_Hill_Productions>]          |
|[<Charlie_Chaplin>]              |[<United_Artists>]                     |
|[<Dan_Joyce>]                    |[<Kurrupt_Recordings_HARD>]            |
|[<Daniel_Ja

In [None]:
# If I execute "res2.count()", the result turns to be 61
res2.select('Organization').distinct().count()

There are only 59 different companies, but 61 entries

## 3. Writers who have won a Nobel Prize in any discipline, including economics (Question C)
Tags for nobel prizes look like these: `'<Nobel_Prize_in_Chemistry>`, `<Nobel_Prize_in_Physics>'`, `<Nobel_Prize>` or `<Nobel_Prize>` etc.
We are also counting this one: `'<Nobel_Memorial_Prize_in_Economic_Sciences>'`.

The tag for writers is `'<wordnet_writer_110794014>'`.

You will need to use `'<hasWonPrize>'` as a predicate.

Please sort the output alphabetically by the person column.

**Note: My result includes "Nobel Peace Prize", which may be a bit different from the result which only includes the tags list above.**

In [None]:
nobel_prize = df.where("predicate == '<hasWonPrize>' AND (object LIKE '<Nobel%')").select('subject', 'object', 'predicate')
sub_nob = nobel_prize.select('subject').withColumnRenamed("subject","id")
obj_nob = nobel_prize.select('object').withColumnRenamed("object","id")
v_nob = sub_nob.union(obj_nob).distinct()
e_nob = nobel_prize.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")


writer = df_subclasses.where("object == '<wordnet_writer_110794014>'").select('subject', 'object', 'predicate')
sub_wri = writer.select('subject').withColumnRenamed("subject","id")
obj_wri = writer.select('object').withColumnRenamed("object","id")
v_wri = sub_wri.union(obj_wri).distinct()
e_wri = writer.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

In [None]:
v = v_nob.union(v_wri).distinct()
e = e_nob.union(e_wri).distinct()

g = GraphFrame(v, e)

motifs = g.find("(Someone)-[hasWon]->(Nobel_Prize); (Someone)-[is]->(Writer)")
# ... and the predicate is `was born in`
res3 = motifs.filter("hasWon.predicate == '<hasWonPrize>' AND (Nobel_Prize.id LIKE '<Nobel%') AND Writer.id == '<wordnet_writer_110794014>'")
res3.count()

260

In [None]:
res3.select('Someone').withColumnRenamed('Someone',"Writer").sort('Writer').show(truncate = False)

+--------------------------------+
|Writer                          |
+--------------------------------+
|[<14th_Dalai_Lama>]             |
|[<Adrienne_Clarkson>]           |
|[<Al_Gore>]                     |
|[<Albert_Camus>]                |
|[<Albert_Einstein>]             |
|[<Albert_Lutuli>]               |
|[<Albert_Schweitzer>]           |
|[<Aleksandr_Solzhenitsyn>]      |
|[<Alexander_Prokhorov>]         |
|[<Alexei_Alexeyevich_Abrikosov>]|
|[<Alexis_Carrel>]               |
|[<Alfonso_García_Robles>]       |
|[<Alfred_Hermann_Fried>]        |
|[<Alfred_Kastler>]              |
|[<Alice_Munro>]                 |
|[<Alva_Myrdal>]                 |
|[<Alvin_E._Roth>]               |
|[<Alvin_Toffler>]               |
|[<Amartya_Sen>]                 |
|[<Anatole_France>]              |
+--------------------------------+
only showing top 20 rows



## 4. Nobel prize winners who were born in the same city as their spouses (Question D)
You may find the predicate `'<isMarriedTo>'` useful to create a Dataframe of all mariages.
Please also show the cities in which the Nobel laureates and their spouses were born.

Please sort the output alphabetically by the person (prize winner) column.

In [16]:
nobel_prize = df.where("predicate == '<hasWonPrize>' AND (object LIKE '<Nobel%')").select('subject', 'object', 'predicate')
sub_nob = nobel_prize.select('subject').withColumnRenamed("subject","id")
obj_nob = nobel_prize.select('object').withColumnRenamed("object","id")
v_nob = sub_nob.union(obj_nob).distinct()
e_nob = nobel_prize.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

marriage = df.where("predicate == '<isMarriedTo>'").select('subject', 'object', 'predicate')
sub_marri = marriage.select('subject').withColumnRenamed("subject","id")
obj_marri = marriage.select('object').withColumnRenamed("object","id")
v_marri = sub_marri.union(obj_marri).distinct()
e_marri = marriage.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

born = df.where("predicate == '<wasBornIn>'").select('subject', 'object', 'predicate')
sub_b = born.select('subject').withColumnRenamed("subject","id")
obj_b = born.select('object').withColumnRenamed("object","id")
v_b = sub_b.union(obj_b).distinct()
e_b = born.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")


In [26]:
v = v_nob.union(v_marri).union(v_b).distinct()
e = e_nob.union(e_marri).union(e_b).distinct()

g = GraphFrame(v, e)

motifs = g.find("(someone)-[hasWon]->(Nobel_Prize); (someone)-[isMarriedTo]->(someone0); (someone)-[]->(somewhere);(someone0)-[]->(somewhere0)")

res4 = motifs.filter("hasWon.predicate == '<hasWonPrize>' AND (Nobel_Prize.id LIKE '<Nobel%') AND isMarriedTo.predicate == '<isMarriedTo>' AND somewhere.id == somewhere0.id")
res4.select('someone').withColumnRenamed('someone',"Winner").distinct().sort('Winner').show(truncate = False)

+-------------------------+
|Winner                   |
+-------------------------+
|[<Al_Gore>]              |
|[<Carl_Ferdinand_Cori>]  |
|[<Dud_Dudley>]           |
|[<Edvard_Moser>]         |
|[<Fridtjof_Nansen>]      |
|[<Frédéric_Joliot-Curie>]|
|[<Gerty_Cori>]           |
|[<Irène_Joliot-Curie>]   |
|[<Jimmy_Carter>]         |
|[<Marie_Curie>]          |
|[<May-Britt_Moser>]      |
|[<Pierre_Curie>]         |
|[<Robert_Hofstadter>]    |
+-------------------------+



## 5. Politicians that are affiliated with a right-wing party (Question E)

We are looking for all connections of the form `polician -> party`, where party is a right-wing party and politicians are defined above. If one politician is associated with several right wing parties, you may count them several times.

Use `'<isAffiliatedTo>'` to find membership in organisations and `'<wikicat_Right-wing_parties>'` for right-wing parties organisations.

There are multiple ways to do this.

Please sort the output alphabetically by the person (politician) column.

In [27]:
poli = df_subclasses.where("object == '<wordnet_politician_110450303>'").select('subject', 'object', 'predicate')
sub_poli = poli.select('subject').withColumnRenamed("subject","id")
obj_poli = poli.select('object').withColumnRenamed("object","id")
v_poli = sub_poli.union(obj_poli).distinct()
e_poli = poli.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")


pty = df.where("predicate == '<isAffiliatedTo>'").select('subject', 'object', 'predicate')
sub_pty = pty.select('subject').withColumnRenamed("subject","id")
obj_pty = pty.select('object').withColumnRenamed("object","id")
v_pty = sub_pty.union(obj_pty).distinct()
e_pty = pty.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")


rw = df_subclasses.where("object == '<wikicat_Right-wing_parties>'").select('subject', 'object', 'predicate')
sub_rw = rw.select('subject').withColumnRenamed("subject","id")
obj_rw = rw.select('object').withColumnRenamed("object","id")
v_rw = sub_rw.union(obj_rw).distinct()
e_rw = rw.select("subject","object","predicate").withColumnRenamed("subject","src").withColumnRenamed("object","dst")

In [None]:
v = v_poli.union(v_pty).union(v_rw).distinct()
e = e_poli.union(e_pty).union(e_rw).distinct()

g = GraphFrame(v, e)

motifs = g.find("(Someone)-[]->(Politician); (Someone)-[IsAffiliatedTo]->(Party);(Party)-[]->(RightWing)")

res5 = motifs.filter("Politician.id == '<wordnet_politician_110450303>' AND IsAffiliatedTo.predicate == '<isAffiliatedTo>' AND RightWing.id == '<wikicat_Right-wing_parties>'")
res5.select('Someone').withColumnRenamed('Someone',"RightWingPolitician").distinct().sort('RightWingPolitician').show(truncate = False)


+---------------------------------+
|RightWingPolitician              |
+---------------------------------+
|[<A.N.M._Ehsanul_Hoque_Milan>]   |
|[<A._A._Wijethunga>]             |
|[<A._B._Colton>]                 |
|[<A._C._Clemons>]                |
|[<A._C._Gibbs>]                  |
|[<A._C._Hamlin>]                 |
|[<A._Clifford_Jones>]            |
|[<A._Dean_Jeffs>]                |
|[<A._F._M._Ahsanuddin_Chowdhury>]|
|[<A._G._Crowe>]                  |
|[<A._Homer_Byington>]            |
|[<A._J._M._Muzammil>]            |
|[<A._J._McNamara>]               |
|[<A._J._Ranasinghe>]             |
|[<A._K._A._Firoze_Noon>]         |
|[<A._K._Patel>]                  |
|[<A._Linwood_Holton_Jr.>]        |
|[<A._M._Starr>]                  |
|[<A._Piatt_Andrew>]              |
|[<A._Q._M._Badruddoza_Chowdhury>]|
+---------------------------------+
only showing top 20 rows

