# Jupyter notebook for INFO303 Lab 11: Apache Spark

In this notebook you can enter one or more lines of Python 3 code inside each "cell". `Control-Enter` or `Shift-Enter` then runs that code, and the result of executing the last line (if there is any output) is shown underneath. If you use `Shift-Enter` the cursor is also moved to the next cell. A new cell is added when you run the last one. *Note:* If you see `[*]` to the left of a cell being run, the code is still running.

You can save your progress using File > Save and Checkpoint.

**Lab 11 and the terms requirement**
* As you run the code cells below, Jupyter will add the code's output to this notebook. Saving that modified version of the notebook to your lab11 repository folder (and committing and pushing it) will satisfy the terms requirement.

**When you have finished these exercises:**
* Save your changes to the notebook into your lab11 folder by selecting `Download as` from the Jupyter File menu above. Select `Notebook (.ipynb)` to get a copy of the notebook with your output that can be pushed to GitBucket. This can also be run and edited in the future. However, it is a JSON file, so it is not easy to read. If you want an easily readable (but non-interactive) version, select `HTML (.html)`. You may want to download both versions.
* Don't forget to commit and push your changed notebook to the GitBucket repository if you wish this lab to count towards the terms requirement.
* Select File > Close and Halt above and then close the browser window. Don't worry if you get error messages displayed in the SSH terminal window in which you entered the `docker-compose up` command. Type `Control-C` in the SSH window, and then enter `poweroff & exit` to shut down the virtual machine and close the SSH window. Finally, close VirtualBox.

<a id='links'></a>
## Useful links

 * [Spark RDD programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
 * [Python API documentation for the RDD class](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)
 * [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
 * [Python API documentation for the DataFrame class](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
 * [Python 3.7 standard library (standard datatypes and functions)](https://docs.python.org/3.7/library/)


## Exercise 1: Exploring the Shakespeare data set

In this exercise we will be using a data file containing the complete works of William Shakespeare (plays and poems). If you did not upload the data file `shakespeare.txt` before opening this notebook, return (temporarily) to the original browser tab showing http://127.0.0.1:8080/tree, click on Upload, and open that file from wherever you downloaded it to from Blackboard. Confirm the upload by clicking on the blue Upload button that appears.

To see what's in the data file, return (temporarily) to the original browser tab showing http://127.0.0.1:8080/tree, and click on New<sub>&#9660;</sub> and select Terminal. In the resulting new tab (showing a Linux terminal prompt) enter:

```
more shakespeare.txt
```

Press the space bar to see each new page of text, and press `q` when you have seen enough.

**Back in this notebook**

To query the Shakespeare data using Spark, run the code in each of the following cells (click on a cell and then type `Control-Enter` or `Shift-Enter`). Then look at the output, and try to understand what the code is doing.

First, note that the variable `sc` is predefined to hold a reference to a "Spark Context" object. This is configured to use as many Spark "worker" or "executor" processes as there are logical cores on the computer. Spark can also be configured to use workers running on external computers in a cluster, but we are not doing that for this lab.

In [1]:
sc

Now load the data.

In [2]:
lines = sc.textFile('shakespeare.txt')
lines

shakespeare.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

The variable `lines` now holds a reference to an object of class RDD: a *resilient distributed dataset*. The RDD will be associated with the file, but because Spark uses lazy execution, the file won't be read yet (you can see that by adding some extra characters to the file name - Spark won't yet realise that the file doesn't exist). When it is read, the dataset will be automatically partitioned, with each partition held in memory in one of the executor processes.

Now let's ask Spark to count the lines of data in the file. It will create the RDD (possibly giving an error message if the file doesn't exist), and then return the answer.

In [3]:
lines.count()

175436

The cell below calls the `take(n)` method. This retrieves the first *n* data items from the RDD and returns them in a Python list (so the list must be small enough to fit into memory).

In [4]:
lines.take(10)

['hamlet@0\t\tHAMLET',
 'hamlet@8\t',
 'hamlet@9\t',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29\t',
 'hamlet@30\t',
 'hamlet@31\tCLAUDIUS\tking of Denmark. (KING CLAUDIUS:)',
 'hamlet@74\t',
 'hamlet@75\tHAMLET\tson to the late, and nephew to the present king.',
 'hamlet@131\t']

Note: `\t` represents a tab character. In Python 3 all strings are Unicode, so the output shows plain quoted strings; in older (Python 2) output you may see a `u` prefix, as in `u'Some text'`.

Note that each line begins with a *key*, made up of the name of the work (a play or poem), followed by "@" and a byte offset from the start of the work. A tab character separates the key from the rest of the line.
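To make the key format concrete, here is a plain-Python sketch (no Spark needed) that pulls a line apart into work name, offset and text. The sample lines are copied from the `take(10)` output above; the helper name `parse_key` is hypothetical, not part of Spark.

```python
from collections import Counter

def parse_key(line):
    """Split 'work@offset<TAB>text' into (work, offset, text)."""
    key, text = line.split('\t', 1)    # the key ends at the first tab
    work, offset = key.split('@', 1)   # e.g. 'hamlet' and '31'
    return work, int(offset), text

sample = [
    'hamlet@0\t\tHAMLET',
    'hamlet@8\t',
    'hamlet@31\tCLAUDIUS\tking of Denmark. (KING CLAUDIUS:)',
]
parsed = [parse_key(line) for line in sample]
print(parsed[2])   # ('hamlet', 31, 'CLAUDIUS\tking of Denmark. (KING CLAUDIUS:)')

# Count lines per work, using only the work name from each key
print(Counter(work for work, _, _ in parsed))   # Counter({'hamlet': 3})
```

In Spark, the same function could be passed to `lines.map(parse_key)` to get an RDD of tuples.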

Now let's see how the data has been partitioned by Spark.

In [5]:
lines.getNumPartitions()

2

The result of `getNumPartitions` shows how many partitions Spark has split the data into (here, 2). The number of partitions would typically be larger if you were running this code on a Spark cluster rather than a single machine, and if the dataset were larger.
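Conceptually, an action such as `count()` is computed on each partition separately, and the partial results are then combined. The plain-Python sketch below illustrates that idea. The helper `split_into_partitions` is hypothetical, and its slicing rule is only an illustration: Spark splits a text file by byte ranges, not by a list index.

```python
def split_into_partitions(items, num_partitions):
    """Cut a list into num_partitions contiguous, roughly equal slices."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

lines = [f'line {i}' for i in range(7)]
partitions = split_into_partitions(lines, 2)

# Each "executor" counts only its own partition...
partial_counts = [len(p) for p in partitions]
print(partial_counts)        # [3, 4]
# ...and the driver adds the partial counts together
print(sum(partial_counts))   # 7, the same as len(lines)
```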

Next, to see the range of methods that are available to be called on an RDD object, position the cursor after the '`.`' in the cell below and press the tab key (you may need to delete and re-enter the "." for this to work, and it may be slow to respond). Use the Page Down key to see all the methods, and `Esc` to close the list. If you want to read what these methods do, see the [documentation for the RDD class](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD).

**Note: You don't need to *run* the cell below (using `Control-Enter` or `Shift-Enter`), but if you do, you will get an error message unless you complete the line to call one of the RDD class's methods with valid arguments.**

In [None]:
lines.

Now let's get rid of the key at the start of each line in the data set. Compare the result of the code below with that from `lines.take(10)` above. The `map` method takes a function as an argument and applies it to each item in the data set (in this case, a line of text). A new RDD is returned with each item replaced by its mapped value. Here we do the mapping using an unnamed "lambda function" that takes one parameter (`line`) and returns the text after the first tab character. It does this by splitting the line into at most two substrings, using the first tab character as the separator, and then returning the second substring (which has index 1 because in Python, as in Java, lists begin at index 0).
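The second argument to `str.split` (the maximum number of splits) is what keeps the tab inside the text intact. A small plain-Python check of the expression used in the lambda passed to `map` in the next cell:

```python
line = 'hamlet@31\tCLAUDIUS\tking of Denmark. (KING CLAUDIUS:)'

print(line.split('\t'))        # every tab splits the line: 3 pieces
print(line.split('\t', 1))     # maxsplit=1: only the first tab splits it
print(line.split('\t', 1)[1])  # what the lambda returns for this line

# A line with nothing after the key still gives two pieces, so [1] is safe
print('hamlet@8\t'.split('\t', 1))   # ['hamlet@8', '']
# A line with no tab at all gives a single piece, and [1] would raise IndexError
print('no tab here'.split('\t', 1))  # ['no tab here']
```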

In [6]:
lines2 = lines.map(lambda line: line.split('\t',1)[1])
lines2.take(10)

['\tHAMLET',
 '',
 '',
 '\tDRAMATIS PERSONAE',
 '',
 '',
 'CLAUDIUS\tking of Denmark. (KING CLAUDIUS:)',
 '',
 'HAMLET\tson to the late, and nephew to the present king.',
 '']

Python's `lambda` notation is very useful for passing short unnamed functions to other functions like map and reduce. The notation is:

  `lambda` *arg1*, ..., *argn*`:` *SomeExpressionUsingTheArguments*

The expression is evaluated and its result is returned when the function is called.

Below we create a new RDD named `scenes` that contains the descriptions of the scenes in all the plays. The RDD class's `filter` method takes a Boolean-valued function as an argument and includes in its result only the items for which the function returns `True`. The `collect` method retrieves all data from an RDD and returns it in a Python list (so it should only be used when the RDD is small enough to fit into memory on the local computer). You can read about these, and other, RDD functions at https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD (which is also listed under [Useful Links](#links) above).
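On a single machine, Python's built-in `filter` (or a list comprehension) followed by `list` plays roughly the role of the RDD's `filter` followed by `collect`. A plain-Python sketch, using a short `sample` list assembled from lines shown in the outputs in this notebook:

```python
sample = [
    '\tHAMLET',
    '',
    'SCENE I\tElsinore. A platform before the castle.',
    'HAMLET\tson to the late, and nephew to the present king.',
    'SCENE II\tA room of state in the castle.',
]

# The same Boolean-valued lambda as in the Spark cell
scenes = list(filter(lambda line: 'SCENE ' in line, sample))

# The equivalent list comprehension
scenes_too = [line for line in sample if 'SCENE ' in line]

print(scenes)
```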

In [7]:
scenes = lines2.filter(lambda line: 'SCENE ' in line)
scenes.collect()

['SCENE I\tElsinore. A platform before the castle.',
 'SCENE II\tA room of state in the castle.',
 "SCENE III\tA room in Polonius' house.",
 'SCENE IV\tThe platform.',
 'SCENE V\tAnother part of the platform.',
 "SCENE I\tA room in POLONIUS' house.",
 'SCENE II\tA room in the castle.',
 'SCENE I\tA room in the castle.',
 'SCENE II\tA hall in the castle.',
 'SCENE III\tA room in the castle.',
 "SCENE IV\tThe Queen's closet.",
 'SCENE I\tA room in the castle.',
 'SCENE II\tAnother room in the castle.',
 'SCENE III\tAnother room in the castle.',
 'SCENE IV\tA plain in Denmark.',
 'SCENE V\tElsinore. A room in the castle.',
 'SCENE VI\tAnother room in the castle.',
 'SCENE VII\tAnother room in the castle.',
 'SCENE I\tA churchyard.',
 'SCENE II\tA hall in the castle.',
 'SCENE I\tLondon. A street.',
 'SCENE II\tThe same. Another street.',
 'SCENE III\tThe palace.',
 'SCENE IV\tLondon. The Tower.',
 'SCENE I\tLondon. The palace.',
 'SCENE II\tThe palace.',
 'SCENE III\tLondon. A street.',
 

Next, we generate a sorted RDD of pairs (count, word) for each word in the data set. The backslashes at the end of the line tell Python that the following line continues the current one (otherwise it will complain about the syntax because line endings are significant to Python).
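Python also continues a line implicitly while a bracket is still open, which is why the first `map(...)` line in the next cell ends without a backslash. A plain-Python comparison of the two styles on a chain of string methods:

```python
text = 'Its fun to have fun, but you have to know how.'

# Explicit continuation with backslashes, as in the Spark cell
words1 = text.replace(',', ' ') \
    .replace('.', ' ') \
    .lower() \
    .split()

# Wrapping the whole expression in parentheses needs no backslashes
words2 = (text.replace(',', ' ')
              .replace('.', ' ')
              .lower()
              .split())

print(words1 == words2)   # True
```

The parenthesised form is often preferred, because a stray space after a backslash is a syntax error.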

In [8]:
wordcounts = \
    lines2.map(lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').replace(':',' ')
                          .replace('?',' ').replace(';',' ').replace('!',' ').lower()) \
    .flatMap(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x,y: x+y) \
    .map(lambda x: (x[1],x[0])) \
    .sortByKey(True)
wordcounts.take(10)

[(1, '(gentlemen'),
 (1, '[francisco'),
 (1, 'illume'),
 (1, 'polacks'),
 (1, "shark'd"),
 (1, 'sheeted'),
 (1, 'precurse'),
 (1, 'demonstrated'),
 (1, 'crows]'),
 (1, "saviour's")]

To understand how this works, try out the code in the section titled "Revisiting the wordcount example" at the top of the web page at http://www.mccarroll.net/blog/pyspark2/. The code on that web page from the second shaded box onwards appears below. Read the description of each step and then try the code in this notebook.

#### Code from the 2nd shaded box:

In [9]:
lines = sc.parallelize(['Its fun to have fun,','but you have to know how.']) 
wordcounts = lines.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \
        .flatMap(lambda x: x.split()) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x,y:x+y) \
        .map(lambda x:(x[1],x[0])) \
        .sortByKey(False) 
wordcounts.take(10)

[(2, 'have'),
 (2, 'to'),
 (2, 'fun'),
 (1, 'but'),
 (1, 'you'),
 (1, 'how'),
 (1, 'know'),
 (1, 'its')]

Rather than chaining together method calls, an alternative approach would be to assign the result of each method call to a variable, and progressively call the methods on each intermediate RDD. As noted above, Spark uses a "lazy" approach to generating RDDs. It usually does no computation on data (including the generation of intermediate RDDs) until some results are requested, e.g. using `take` or `collect`. 
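Python generator expressions give a rough single-machine analogy for this laziness: intermediate variables can be assigned without any data being processed until a final result is requested. This is only an analogy (Spark also plans and distributes the work), and the names `normalise` and `calls` are illustrative.

```python
calls = []   # records every time the "map" function actually runs

def normalise(line):
    calls.append(line)
    return line.replace(',', ' ').replace('.', ' ').lower()

lines = ['Its fun to have fun,', 'but you have to know how.']

# Building the pipeline step by step: no work is done yet
r1 = (normalise(x) for x in lines)               # like map(...)
r2 = (word for x in r1 for word in x.split())    # like flatMap(...)
print(len(calls))   # 0, nothing has been computed

result = list(r2)   # like collect(): forces the whole chain to run
print(len(calls))   # 2
print(result[:3])   # ['its', 'fun', 'to']
```

One difference: a generator can be consumed only once, whereas each action on an RDD can recompute it from its source.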

#### Code for step 1: `map(<function>)`

In [10]:
r1 = lines.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())
r1.take(10)

['its fun to have fun ', 'but you have to know how ']

#### Code for step 2: `flatMap(<function>)`

In [11]:
r2 = r1.flatMap(lambda x: x.split())
r2.take(20)

['its', 'fun', 'to', 'have', 'fun', 'but', 'you', 'have', 'to', 'know', 'how']

#### Code for step 3: `map(<function>)` (again)

In [12]:
r3 = r2.map(lambda x: (x, 1))
r3.take(20)

[('its', 1),
 ('fun', 1),
 ('to', 1),
 ('have', 1),
 ('fun', 1),
 ('but', 1),
 ('you', 1),
 ('have', 1),
 ('to', 1),
 ('know', 1),
 ('how', 1)]

#### Code for step 4: `reduceByKey(<function>)`

In [13]:
r4 = r3.reduceByKey(lambda x,y:x+y)
r4.take(20)

[('have', 2),
 ('but', 1),
 ('to', 2),
 ('you', 1),
 ('how', 1),
 ('know', 1),
 ('its', 1),
 ('fun', 2)]
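The plain-Python sketch below shows what `reduceByKey` computes. The function is first applied within each partition, and the per-partition results are then merged. Because Spark may group and order these applications however it likes, the function passed to `reduceByKey` should be associative and commutative, as addition is. The helper `reduce_by_key` and the two-partition split are assumptions made for illustration.

```python
from functools import reduce

def reduce_by_key(partitions, func):
    """Combine values per key inside each partition, then merge the partials."""
    def combine(acc, pair):
        key, value = pair
        acc[key] = func(acc[key], value) if key in acc else value
        return acc

    # Step 1: local combine within each partition (done before any shuffle)
    partials = [reduce(combine, part, {}) for part in partitions]
    # Step 2: merge the partial results from all partitions
    merged = {}
    for partial in partials:
        merged = reduce(combine, partial.items(), merged)
    return merged

pairs = [('its', 1), ('fun', 1), ('to', 1), ('have', 1), ('fun', 1),
         ('but', 1), ('you', 1), ('have', 1), ('to', 1), ('know', 1), ('how', 1)]
partitions = [pairs[:5], pairs[5:]]   # pretend the RDD has two partitions
counts = reduce_by_key(partitions, lambda x, y: x + y)
print(counts)   # the same counts as r4 above (the order may differ)
```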

#### Code for step 5: `map(<function>)` (yet again!)

In [14]:
r5 = r4.map(lambda x:(x[1],x[0]))
r5.take(20)

[(2, 'have'),
 (1, 'but'),
 (2, 'to'),
 (1, 'you'),
 (1, 'how'),
 (1, 'know'),
 (1, 'its'),
 (2, 'fun')]

#### Code for step 6: `sortByKey( ascending=True|False )`

In [15]:
r6 = r5.sortByKey(ascending=False)
r6.take(20)

[(2, 'have'),
 (2, 'to'),
 (2, 'fun'),
 (1, 'but'),
 (1, 'you'),
 (1, 'how'),
 (1, 'know'),
 (1, 'its')]

Now, leaving that example, make sure that you understand the difference between the functions `map` and `flatMap`, by predicting what each of these cells will do, and then running them.

In [16]:
words = sc.parallelize(['aardvark', 'baby', 'carrot']) # Create an RDD from a Python list
word_chars = words.map(list)  # The function 'list' converts a string to a list of characters (which are just short strings)
word_chars.collect() # Collect gathers the RDD into a Python list.

[['a', 'a', 'r', 'd', 'v', 'a', 'r', 'k'],
 ['b', 'a', 'b', 'y'],
 ['c', 'a', 'r', 'r', 'o', 't']]

In [17]:
all_chars = words.flatMap(list)
all_chars.collect()

['a',
 'a',
 'r',
 'd',
 'v',
 'a',
 'r',
 'k',
 'b',
 'a',
 'b',
 'y',
 'c',
 'a',
 'r',
 'r',
 'o',
 't']

In [18]:
distinct_chars = all_chars.distinct()
distinct_chars.collect()

['r', 'b', 'y', 'c', 'a', 'd', 'v', 'k', 'o', 't']
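The same distinction can be expressed in plain Python. `map` produces one output item per input item, while `flatMap` concatenates the lists that the function returns. A `set` gives the distinct characters, and, as with Spark's `distinct` in the output above, the order of the result should not be relied on.

```python
from itertools import chain

words = ['aardvark', 'baby', 'carrot']

word_chars = [list(w) for w in words]                            # like map(list)
all_chars = list(chain.from_iterable(list(w) for w in words))   # like flatMap(list)
distinct_chars = set(all_chars)                                  # like distinct()

print(len(word_chars), len(all_chars), len(distinct_chars))     # 3 18 10
```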

## Exercise 2: Exploring the Enron dataset

The Enron dataset is a corpus of over 600,000 email messages from the Enron corporation. You can read about it [here](https://en.wikipedia.org/wiki/Enron_Corpus). If you did not upload the data file `messages.json.zip` before opening this notebook, return (temporarily) to the original browser tab showing http://127.0.0.1:8080/tree, click on Upload, and open that file from wherever you downloaded it to from Blackboard. Confirm the upload by clicking on the blue Upload button that appears.

The code in the cell below removes the Exercise 1 datasets from memory and unzips the Enron email dataset file. This may take a while. You will know it is finished when the '`*`' in `[*]` (to the left of the cell) is replaced with a number. Also, the circle to the right of "Python 3" at the top left of this Jupyter notebook will turn solid black while the computation is running, then change to a hollow circle once it is finished. The same applies to the other cells below.

In [20]:
%reset -sf
import gc
gc.collect()

!unzip -o messages.json.zip

Archive:  messages.json.zip
  inflating: messages.json


Rather than reading the data into a Spark "resilient distributed dataset" (RDD), we will create a Spark "data frame" from the JSON data file. A data frame includes schema information (inferred from the JSON file in this case). The third command below prints the schema, the fourth prints the number of messages, and the last command displays the first message in the data set. The variable `spark` is predefined to hold a reference to a "Spark Session" object.

In [21]:
messages = spark.read.json("messages.json")
print(messages, '\n')
messages.printSchema() # Print the schema
print(f'Number of messages: {messages.count()}')
messages.first() # Look at the first email message

DataFrame[_id: struct<$oid:string>, body: string, filename: string, headers: struct<Attendees:string,Bcc:string,Cc:string,Content-Transfer-Encoding:string,Content-Type:string,Date:string,From:string,Message-ID:string,Mime-Version:string,Re:string,Subject:string,Time:string,To:string,X-FileName:string,X-Folder:string,X-From:string,X-Origin:string,X-To:string,X-bcc:string,X-cc:string>, mailbox: string, subFolder: string] 

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- body: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- headers: struct (nullable = true)
 |    |-- Attendees: string (nullable = true)
 |    |-- Bcc: string (nullable = true)
 |    |-- Cc: string (nullable = true)
 |    |-- Content-Transfer-Encoding: string (nullable = true)
 |    |-- Content-Type: string (nullable = true)
 |    |-- Date: string (nullable = true)
 |    |-- From: string (nullable = true)
 |    |-- Message-ID: string (nullable = true)
 |    |-- Mime

Row(_id=Row($oid='4f16fc97d1e2d32371003e27'), body="the scrimmage is still up in the air...\n\n\nwebb said that they didnt want to scrimmage...\n\nthe aggies  are scrimmaging each other... (the aggie teams practiced on \nSunday)\n\nwhen I called the aggie captains to see if we could use their field.... they \nsaid that it was tooo smalll for us to use...\n\n\nsounds like bullshit to me... but what can we do....\n\n\nanyway... we will have to do another practice Wed. night....    and I dont' \nknow where we can practice.... any suggestions...\n\n\nalso,  we still need one  more person...", filename='450.', headers=Row(Attendees=None, Bcc=None, Cc=None, Content-Transfer-Encoding='7bit', Content-Type='text/plain; charset=us-ascii', Date='Tue, 14 Nov 2000 08:22:00 -0800 (PST)', From='michael.simmons@enron.com', Message-ID='<6884142.1075854677416.JavaMail.evans@thyme>', Mime-Version='1.0', Re=None, Subject='Re: Plays and other information', Time=None, To='eric.bass@enron.com', X-FileName='e

Spark data frames support the use of SQL (extended to allow nested column/property names, e.g. `headers.To`), as shown below. Before doing an SQL query, you must create a "view" of the data that can be treated like a table.

In [22]:
messages.createOrReplaceTempView("messages")
# Count the distinct values of the To header (a single value may list several recipients)
spark.sql("select count(distinct headers.To) tocount from messages").collect()

[Row(tocount=57828)]
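Note that `count(distinct headers.To)` counts distinct *values* of the To header, so a header listing several recipients counts as one value. A plain-Python sketch of the same query on three made-up messages in JSON Lines format (one JSON object per line, the layout `spark.read.json` expects by default):

```python
import json

# Three hypothetical messages, one JSON object per line
raw = '''{"headers": {"From": "a@enron.com", "To": "b@enron.com"}}
{"headers": {"From": "c@enron.com", "To": "b@enron.com"}}
{"headers": {"From": "a@enron.com", "To": "b@enron.com, d@enron.com"}}'''

messages = [json.loads(line) for line in raw.splitlines()]

# Rough equivalent of: select count(distinct headers.To) from messages
to_values = {m['headers'].get('To') for m in messages}   # nested access, like headers.To
to_values.discard(None)    # SQL count(distinct ...) ignores NULLs
print(len(to_values))      # 2: 'b@enron.com' and 'b@enron.com, d@enron.com'
```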

There is also a range of useful methods that we can call on our data frame, for example:

In [23]:
# Count emails to louis.soldano@...
messages.filter(messages.headers.To == 'louis.soldano@enron.com').count()

111

Now, let's create a data frame with a row for each sender/recipient pair, and a count of how many messages were sent from the sender to the recipient. Note that this may require splitting a single email message with multiple recipients into multiple records, each with a single recipient.
This could be done using map-reduce on the RDD underlying the data frame. Here, we do this using the "Hive SQL" *lateral view* construct (documented [here](https://docs.databricks.com/spark/latest/spark-sql/language-manual/select.html#lateral-view)) and the `explode` and `split` functions (documented [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)).

The following query creates a new data frame with columns named `sender`, `recipient` and `count`. Because the query groups by sender and recipient, there is one row for each distinct sender-recipient pair, and `count` is the number of times that recipient appears in the To headers of messages from that sender. The change of column names from `from` and `to` to `sender` and `recipient` is to avoid confusing Hive SQL, which doesn't like having a column named "from".
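A plain-Python sketch of what the `lateral view explode(split(...))` and `group by` steps compute, using made-up `(From, To)` header pairs. `re.split` with the same pattern `[,\s]+` stands in for Spark's `split`, and the inner loop plays the part of `explode`, producing one row per recipient.

```python
import re
from collections import Counter

# Hypothetical (From, To) header pairs
headers = [
    ('a@enron.com', 'b@enron.com, c@enron.com'),
    ('a@enron.com', 'b@enron.com'),
    ('c@enron.com', 'a@enron.com'),
    ('d@enron.com', None),   # no To header
]

pair_counts = Counter()
for sender, to in headers:
    if to is None:           # a lateral view over a NULL array yields no rows
        continue
    for recipient in re.split(r'[,\s]+', to):   # split on runs of commas/whitespace
        pair_counts[(sender, recipient)] += 1   # group by sender, recipient; count(*)

# order by count desc
for (sender, recipient), count in pair_counts.most_common():
    print(sender, recipient, count)
```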

In [24]:
sender_recipient_pairs = spark.sql("select messages.headers.From as sender, recipient, count(*) as count "
                                   "from messages lateral view "
                                      "explode(split(messages.headers.To, '[,\\\\s]+')) tolist as recipient "
                                   "group by sender, recipient order by count desc")
sender_recipient_pairs.printSchema()
sender_recipient_pairs.show(5, truncate=False)

root
 |-- sender: string (nullable = true)
 |-- recipient: string (nullable = true)
 |-- count: long (nullable = false)

+------------------------+-------------------------+-----+
|sender                  |recipient                |count|
+------------------------+-------------------------+-----+
|pete.davis@enron.com    |pete.davis@enron.com     |9141 |
|vince.kaminski@enron.com|vkaminski@aol.com        |4316 |
|jeff.dasovich@enron.com |richard.shapiro@enron.com|2898 |
|jeff.dasovich@enron.com |paul.kaufman@enron.com   |2768 |
|jeff.dasovich@enron.com |susan.mara@enron.com     |2735 |
+------------------------+-------------------------+-----+
only showing top 5 rows



Notice that this data frame includes emails that senders addressed to themselves (such as the top row above). Let's get rid of those.

In [25]:
sender_recipient_pairs.createOrReplaceTempView("sender_recipient_pairs")
no_self_mails_sender_recipient_pairs = spark.sql("select sender, recipient, count from sender_recipient_pairs "
                                                 "where sender != recipient")
no_self_mails_sender_recipient_pairs.show(5, truncate=False)

+------------------------+-------------------------+-----+
|sender                  |recipient                |count|
+------------------------+-------------------------+-----+
|vince.kaminski@enron.com|vkaminski@aol.com        |4316 |
|jeff.dasovich@enron.com |richard.shapiro@enron.com|2898 |
|jeff.dasovich@enron.com |paul.kaufman@enron.com   |2768 |
|jeff.dasovich@enron.com |susan.mara@enron.com     |2735 |
|jeff.dasovich@enron.com |james.steffes@enron.com  |2711 |
+------------------------+-------------------------+-----+
only showing top 5 rows



Next, we will use a third-party "graphframes" library that will let us view the data as a "graph" (or network) with a directed edge between each sender and recipient. First we must create data frames representing the vertices and edges of the graph, and these must have specific column names (`id` for the vertices, and `src` and `dst` for the edges). Then we can search for pairs (a,b) of users that satisfy the "motif" (or pattern) `(a)-[]->(b); !(b)-[]->(a)`. This means that there is an edge (i.e. emails were sent) from `a` to `b`, but there is **no** edge from `b` to `a`.
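Stripped of the graph machinery, this motif keeps every edge whose reverse edge does not exist. A plain-Python sketch on a few made-up edges:

```python
# Hypothetical directed edges (src, dst), with self-mails already removed
edges = {('alice', 'bob'), ('bob', 'alice'), ('alice', 'carol'), ('dave', 'carol')}

# (a)-[]->(b); !(b)-[]->(a): keep edges with no edge going back the other way
one_way = sorted((a, b) for a, b in edges if (b, a) not in edges)
print(one_way)   # [('alice', 'carol'), ('dave', 'carol')]
```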

You can read about the graphframes library [here](https://graphframes.github.io/graphframes/docs/_site/user-guide.html).

In [26]:
from graphframes import *
no_self_mails_sender_recipient_pairs.createOrReplaceTempView("no_self_mails_sender_recipient_pairs")
vertices = spark.sql("select sender as id from no_self_mails_sender_recipient_pairs "
                     "union "
                     "select recipient from no_self_mails_sender_recipient_pairs")
edges = no_self_mails_sender_recipient_pairs.toDF('src','dst','count')
gf = GraphFrame(vertices, edges)
gf.find('(a)-[]->(b); !(b)-[]->(a)').show(5, truncate=False)

+---------------------------------------------------------------+---------------------------+
|a                                                              |b                          |
+---------------------------------------------------------------+---------------------------+
|[40enron@enron.com]                                            |[shona.wilson@enron.com]   |
|[412170.167547968.1@news.forbesdigital.com]                    |[brapp@enron.com]          |
|[6.757.0a-znicpactg56oyau8uoxbt0ev3srr.1@mail3.travelocity.com]|[kholst@enron.com]         |
|[a..connor@enron.com]                                          |[peggy.hedstrom@enron.com] |
|[aalmgren@capstoneturbine.com]                                 |[senator.morrow@sen.ca.gov]|
+---------------------------------------------------------------+---------------------------+
only showing top 5 rows



Can you think of other interesting queries to try using `find` and graph motifs? 

Other useful graph frame attributes and methods (besides `find`) include `inDegrees` (for each user, how many other users have mailed the user), `outDegrees` (for each user, how many other users have been emailed by that user), `pageRank(...)` and `stronglyConnectedComponents(...)`. Note that `inDegrees` and `outDegrees` are graphframe *attributes*: no brackets are added after these.
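In- and out-degrees are just counts of edges per vertex. Because our edge data frame has one row per sender-recipient pair, these counts are numbers of distinct correspondents. A plain-Python sketch on made-up edges (in this sketch, a vertex with no incoming edges simply has no entry in `in_degrees`):

```python
from collections import Counter

# Hypothetical (src, dst) edges, one per sender-recipient pair
edges = [('alice', 'bob'), ('bob', 'alice'), ('alice', 'carol'), ('dave', 'carol')]

out_degrees = Counter(src for src, _ in edges)   # edges leaving each vertex
in_degrees = Counter(dst for _, dst in edges)    # edges arriving at each vertex
print(out_degrees['alice'], in_degrees['carol'])  # 2 2
```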

`pageRank(tol=...)` implements PageRank, the original web page ranking algorithm used by Google. In the context of the Enron emails, the results could be interpreted as the probability for each user that a piece of gossip randomly arriving at one of the users will be passed on through a chain of emails to that specified user (see [Wikipedia](https://en.wikipedia.org/wiki/PageRank)). The `tol` parameter specifies how much error can be tolerated (e.g. 0.01). The result of `pageRank` is another graphframe; if you store it in a variable named, say, `pageRankResult`, you can find the page rank for the first vertex using `pageRankResult.vertices.take(1)`.
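To show what the algorithm does, here is a small power-iteration PageRank in plain Python. At each step, a user keeps a share `reset_prob` of "random arrival" and otherwise receives rank from the users who email them. Iteration stops once the ranks change by less than `tol`. The function `page_rank` and its parameters are hypothetical. GraphFrames' own `pageRank` (which takes `resetProbability`, `tol` and `maxIter`) may scale its results differently, for example not normalising them to sum to 1, so treat this only as an illustration.

```python
def page_rank(vertices, edges, reset_prob=0.15, tol=1e-6, max_iter=100):
    """Power-iteration PageRank; the returned ranks sum to 1."""
    n = len(vertices)
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    rank = {v: 1.0 / n for v in vertices}
    for _ in range(max_iter):
        # Rank held by vertices with no out-links is shared evenly by everyone
        dangling = sum(rank[v] for v in vertices if not out_links[v])
        new_rank = {v: reset_prob / n + (1 - reset_prob) * dangling / n
                    for v in vertices}
        # Every other vertex passes its rank on equally along its out-links
        for v in vertices:
            for dst in out_links[v]:
                new_rank[dst] += (1 - reset_prob) * rank[v] / len(out_links[v])
        delta = sum(abs(new_rank[v] - rank[v]) for v in vertices)
        rank = new_rank
        if delta < tol:   # stop once ranks change by less than tol
            break
    return rank

ranks = page_rank(['a', 'b', 'c'], [('a', 'c'), ('b', 'c'), ('c', 'a')])
print(max(ranks, key=ranks.get))   # 'c': both other vertices link to it
```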

See [Wikipedia](https://en.wikipedia.org/wiki/Strongly_connected_component) for a definition of strongly connected components. Running `stronglyConnectedComponents(maxIter)` on the Enron data (even with our 10% sample) may take a long time, depending on what value you provide for the `maxIter` argument (`maxIter` is the maximum number of iterations to perform). Also, rather than printing the components, it might be best just to count them, using `.count()`.
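A strongly connected component is a maximal group of vertices in which every vertex can reach every other by following edge directions. As a single-machine illustration, here is Kosaraju's algorithm (two depth-first searches) in plain Python. This is a different, sequential technique from the iterative, distributed method GraphFrames uses, and the recursive searches make it suitable only for small graphs. The function name and the sample graph are hypothetical.

```python
from collections import defaultdict

def strongly_connected_components(vertices, edges):
    """Kosaraju's algorithm (recursive, so for small graphs only)."""
    graph, reverse = defaultdict(list), defaultdict(list)
    for src, dst in edges:
        graph[src].append(dst)
        reverse[dst].append(src)

    visited, order = set(), []

    def visit(v):                  # first pass: record finishing order
        visited.add(v)
        for w in graph[v]:
            if w not in visited:
                visit(w)
        order.append(v)

    for v in vertices:
        if v not in visited:
            visit(v)

    components, assigned = [], set()

    def collect(v, component):     # second pass, on the reversed graph
        assigned.add(v)
        component.add(v)
        for w in reverse[v]:
            if w not in assigned:
                collect(w, component)

    for v in reversed(order):      # latest-finishing vertices first
        if v not in assigned:
            component = set()
            collect(v, component)
            components.append(component)
    return components

sccs = strongly_connected_components(
    ['a', 'b', 'c', 'd'],
    [('a', 'b'), ('b', 'a'), ('b', 'c'), ('c', 'd'), ('d', 'c')])
print(len(sccs))   # 2: {'a', 'b'} and {'c', 'd'}
```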

In [32]:
gf.outDegrees

DataFrame[id: string, outDegree: int]

Finally, note that we can use graphframes to solve the "friends in common" problem, i.e., given a list of *friend* relations between users, for each pair of users, compute who is a friend of both of them. Here we use a small test graphframe so the computation is quick.

In [33]:
import pyspark.sql.functions as F
# Vertex DataFrame
v = sqlContext.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36)
], ["id", "name", "age"])
# Edge DataFrame
e = sqlContext.createDataFrame([
    ("a", "b", "friend"),
    ("a", "f", "friend"),
    ("b", "c", "friend"),
    ("c", "b", "friend"),
    ("c", "f", "friend"),
    ("f", "c", "friend"),
    ("e", "f", "friend"),
    ("e", "d", "friend"),
    ("d", "a", "friend")
], ["src", "dst", "relationship"])
gf2 = GraphFrame(v,e)
gf2.find('(x)-[]->(z); (y)-[]->(z)').filter("x != y").groupBy("x", "y").agg(F.collect_set("z")).show(5, truncate=False)

+----------------+----------------+------------------------------+
|x               |y               |collect_set(z)                |
+----------------+----------------+------------------------------+
|[a, Alice, 34]  |[e, Esther, 32] |[[f, Fanny, 36]]              |
|[e, Esther, 32] |[c, Charlie, 30]|[[f, Fanny, 36]]              |
|[f, Fanny, 36]  |[b, Bob, 36]    |[[c, Charlie, 30]]            |
|[c, Charlie, 30]|[a, Alice, 34]  |[[f, Fanny, 36], [b, Bob, 36]]|
|[e, Esther, 32] |[a, Alice, 34]  |[[f, Fanny, 36]]              |
+----------------+----------------+------------------------------+
only showing top 5 rows
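The motif `(x)-[]->(z); (y)-[]->(z)` with the filter `x != y`, grouped by `(x, y)`, can be sketched in plain Python using the same edges as `gf2`. For each `z`, the sketch finds everyone who points at `z`, then records `z` for every ordered pair of those users.

```python
from collections import defaultdict
from itertools import permutations

# Edges of the test graph gf2 above, as (src, dst) pairs
edges = [('a', 'b'), ('a', 'f'), ('b', 'c'), ('c', 'b'), ('c', 'f'),
         ('f', 'c'), ('e', 'f'), ('e', 'd'), ('d', 'a')]

# For each z, who has an edge to z?
pointing_at = defaultdict(set)
for src, dst in edges:
    pointing_at[dst].add(src)

# For every ordered pair x != y that share some z, collect the set of such z
common = defaultdict(set)
for z, sources in pointing_at.items():
    for x, y in permutations(sources, 2):
        common[(x, y)].add(z)

print(sorted(common[('c', 'a')]))   # ['b', 'f'], matching the Charlie/Alice row above
```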



You may like to explore other graphframes queries using this data. When you have finished with this notebook, follow the instructions in the top cell for closing this notebook (and exporting the output to an HTML file, if you wish).

# Reflection

In this lab, you have seen how Hadoop's model of map-reduce programming (as discussed in a lecture) is generalised in Apache Spark. Spark's "resilient distributed dataset" (RDD) class allows operations such as filter, map, reduce and sort to be applied to a dataset that is split into multiple partitions because of its size. These operations can be chained together, rather than being limited to a single map and reduce sequence (as in Hadoop). RDDs are read-only, so each transformation (such as `map` or `filter`) creates a new RDD rather than modifying an existing one, while actions (such as `count` or `collect`) return results to your program.

In Exercise 2, you saw how you can use Spark data frames rather than RDDs, and perform queries on them using an extended version of SQL. Finally, you also saw an example of an alternative way of viewing data: as a graph (which in this context means a network of nodes and the edges that connect them). The graphframes library allowed you to query the Enron dataset using "motifs": patterns expressed in terms of nodes and edges. You may have also experimented with calling graph algorithms such as `pageRank` on the Enron dataset.
