# Labo 3: Spark

Authors: Christopher MEIER and Guillaume HOCHET

Based on the work of: Gary MARIGLIANO and Miguel SANTAMARIA

For MAC course given by Nastaran FATEMI

Date: November 2019

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.4`
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.WARN)

[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36morg.apache.spark.rdd.RDD
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._

[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

In [2]:
// Create a spark session
// To have better integration with Jupyter, we use a wrapper class provided by almond-spark
import org.apache.spark.sql._
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}
import spark.implicits._

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/11/28 15:43:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[32mimport [39m[36morg.apache.spark.sql._
[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@609b29dc
[32mimport [39m[36mspark.implicits._[39m

In [3]:
// Retrieve the Spark context
def sc = spark.sparkContext

defined [32mfunction[39m [36msc[39m

In [4]:
case class Movie(id: Int, title: String, genres: Seq[String],
                 description: String, director: String, actors: Seq[String],
                 year: Int, rating: Float, votes: Int)

defined [32mclass[39m [36mMovie[39m

In [5]:
  def parseRow(row: Row): Movie = {
    val id = row.getInt(0)
    val title = row.getString(1)
    val genres = row.getString(2).split(",").toList
    val description = row.getString(3)
    val director = row.getString(4)
    val actors = row.getString(5).split(",").toList
    val year = row.getInt(6)
    val rating = row.getDouble(8).toFloat
    val votes = row.getInt(9)

    Movie(id, title, genres, description, director, actors, year, rating, votes)
  }

defined [32mfunction[39m [36mparseRow[39m

In [6]:
val filename = "../data/IMDB-Movie-Data.csv"
val moviesDF = spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(filename)
val rddMovies = moviesDF.rdd.map(parseRow)

[36mfilename[39m: [32mString[39m = [32m"../data/IMDB-Movie-Data.csv"[39m
[36mmoviesDF[39m: [32mDataFrame[39m = [Rank: int, Title: string ... 10 more fields]
[36mrddMovies[39m: [32mRDD[39m[[32mMovie[39m] = MapPartitionsRDD[13] at map at cmd5.sc:7

In [7]:
// Print the title of the first 10 movies to see if they were correctly added.
rddMovies.take(10).map(m => m.title).foreach(println)

Guardians of the Galaxy
Prometheus
Split
Sing
Suicide Squad
The Great Wall
La La Land
Mindhorn
The Lost City of Z
Passengers


## Part 1 - Playing with the movies using RDD functions

The goal of this part is to play (i.e. query, filter and transform the data) with the movies.

### Ex1 - Print the movies whose title contains "City" 

Goal: 

* use `map()` and `filter()` methods to get the title of the movies that contains "City" in their title
 
Output example:

```plain
City of Tiny Lights
The Mortal Instruments: City of Bones
```

Steps:

* Use `filter()` to only keep the movies that contains "City" in their title
* Use `map()` to retrieve the titles of these filtered movies
* Use `foreach()` to pretty print the results


In [8]:
rddMovies.map(m => m.title).filter(t => t.contains("City")).foreach(println)



The Lost City of Z
Sin City: A Dame to Kill For
City of Tiny Lights
The Mortal Instruments: City of Bones
Sex and the City
Sex and the City 2


### Ex2- Print the title of the movies rated between `rateMin` and `rateMax`. Take the 10 worst ratings.

Goal:
    
* Take the titles of the movies that were rated between `rateMin` and `rateMax` (exclusing `rateMin` and including`rateMax`).
* This list is sorted by rating ASC
    
Output example:

```plain
...
3.5 - Wrecker
3.7 - The Last Face
...
```
    
Steps:

* Use `filter()` to only keep the movies released between `rateMin` and `rateMax`
* Sort the filtered movies by decreasing rating
* Use `map()` to keep only the relevant attributes (i.e. rating and title)
* Use `foreach()` to pretty print the results

In [9]:
val rateMin = 1
val rateMax = 10

rddMovies
    .filter(m => (m.rating <= rateMax) && (m.rating > rateMin))
    .sortBy(m => m.rating, true)
    .map(m => m.title + " " + m.rating)
    .take(10).foreach(println)

Disaster Movie 1.9
Don't Fuck in the Woods 2.7
Dragonball Evolution 2.7
Tall Men 3.2
The Intent 3.5
Wrecker 3.5
The Last Face 3.7
Satanic 3.7
The Disappointments Room 3.9
The Black Room 3.9


[36mrateMin[39m: [32mInt[39m = [32m1[39m
[36mrateMax[39m: [32mInt[39m = [32m10[39m

### Ex3 - Print the 10 top genres

Goals:

* Print the list of the genres that appears the most
* Use `flatMap()`

Output example:

```plain
Drama (513)
Action (303)
Comedy (279)
Adventure (259)
```

Theory:

When an operation is giving you a sequence of sequences like:

```scala
Array("hello", "world").map(word => word.split(""))
res91: Array[Array[String]] = Array(Array(h, e, l, l, o), Array(w, o, r, l, d))
```

You may want to flatten this to only have a single list like:
```scala
Array("hello", "world").map(_.split("")).flatten
res93: Array[String] = Array(h, e, l, l, o, w, o, r, l, d)
```

You can achieve the same result (i.e. `map` + `flatten`) using `flatMap`:
```scala
Array("hello", "world").flatMap(_.split(""))
res95: Array[String] = Array(h, e, l, l, o, w, o, r, l, d)
```

We are going to apply this same technique with the `genres` member.

Steps:

* Use `flatMap()` to get the list with all the genres
* Make sure to remove trailling whitespaces
* Count the genres
* Sort them by decreasing order
* Show the top N genres

In [10]:
val N = 10 // N top genres

rddMovies
    .flatMap(m => m.genres)
    .map(g => (g, 1)) // in order to count 
    .reduceByKey(_ + _)
    .sortBy(_._2, false)
    .take(N)
    .foreach(println)

(Drama,513)
(Action,303)
(Comedy,279)
(Adventure,259)
(Thriller,195)
(Crime,150)
(Romance,141)
(Sci-Fi,120)
(Horror,119)
(Mystery,106)


[36mN[39m: [32mInt[39m = [32m10[39m

### Ex4 - Print the average number of votes per year, order by descreasing number of votes

Goal:

* Print the average votes per year
* This output is sorted by descreasing votes

Output example:

```plain
...
year: 2008 average votes: 275505.3846153846
year: 2009 average votes: 255780.64705882352
year: 2010 average votes: 252782.31666666668
...
```

Theory:

We are going to use `reduceByKey()` which has the following signature `reduceByKey(func: (V, V) => V): RDD[(K, V)]`. 

`reduceByKey()` works on a RDD like `RDD[(K,V)]` (i.e. sort of "list of key/values pairs"). 

`reduceByKey()` takes a function that, from two elements, returns one i.e. the `func: (V, V) => V` in the signature.
The difference with `reduce()` is that `reduceByKey()` uses two elements sharing the same key.

For example (pseudo code):

```plain
 year, count
(2010, 2)
(2011, 3)
(2011, 4)
(2010, 8)
// use reduceByKey((count1, count2) => count1+count2)
> (2010, 10)
> (2011, 7)
```

Note: here `count` is just an Int but it can be anything e.g. `Movie`

Steps:

* To compute the average we need the **total sum** of votes per year and the **count** of all the movies per year
* Use `map()` to create an RDD made of `(year, (votes, 1))`. Like a word count we use the `1` to be able to count the number of movies per year
* Use `reduceByKey()` to sum the votes and to count the number of movies per year. The output should look like: `(year, (totalVotes, moviePerYearCount))`
* Find a way to compute the average using the result from the last operation
* Sort by number of votes decreasing

In [11]:
rddMovies
    .map(m => (m.year, (m.votes, 1)))
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // (year, (totalVotes, moviePerYearCount))
    .map(result => (result._1, result._2._1.toDouble / result._2._2)) // computes average votes per year
    .map(result => "year : " + result._1 + " average votes : " + result._2) // format output
    .foreach(println)

year : 2010 average votes : 252782.31666666668
year : 2014 average votes : 203930.22448979592
year : 2008 average votes : 275505.3846153846
year : 2006 average votes : 269289.95454545453
year : 2012 average votes : 285226.09375
year : 2015 average votes : 115726.22047244094
year : 2013 average votes : 219049.64835164836
year : 2016 average votes : 48591.75420875421
year : 2009 average votes : 255780.64705882352
year : 2011 average votes : 240790.3015873016
year : 2007 average votes : 244331.03773584907


## Part 2 - Create a basic Inverted Index

The goal of this part is to show you how to create an inverted index that indexes words from all the movies' description.

Goal:

Using `rddMovies` create an inverted that use the movies' description:

```plain
Movie(1,Guardians of the Galaxy,List(Action, Adventure, Sci-Fi),A group of intergalactic [...] of the universe.,James Gunn,List(Chris Pratt, Vin Diesel, Bradley Cooper, Zoe Saldana),2014,8.1,757074.0)
Movie(2,Prometheus,List(Adventure, Mystery, Sci-Fi),Following clues to the origin[...] not alone.,Ridley Scott,List(Noomi Rapace, Logan Marshall-Green, Michael Fassbender, Charlize Theron),2012,7.0,485820.0)
Movie(3,Split,List(Horror, Thriller),Three girls are kidnapped [...] a frightful new 24th.,M. Night Shyamalan,List(James McAvoy, Anya Taylor-Joy, Haley Lu Richardson, Jessica Sula),2016,7.3,157606.0)
...
```

and extract them to produce an inverted index like:

```plain
"reunion" -> (640, 697)
"runner" -> (338)
"vietnam" -> (797, 947, 983)
...
```

Steps

* Tokenize description i.e. produce an RDD like (movId, words)
* Normalize words e.g. toLowercase, trimming,..
* Remove stopwords (ignored here)
* Apply stemming (ignored here)
* Group by document id

In [12]:
    /**
    * Goal: create an inverted index that allows searching a word
    * in the movies description.
    * Features:
    * - case insensitive
    *
    */
    // TODO student
    // In this first function we are going to tokenize and order the descriptions of the movies, then return these data. We are not going to apply any search right now.
    def createInvertedIndex(movies: RDD[Movie]): RDD[(String, Iterable[Int])] = {
        // Define helper functions directly inside this function. In scala you can declare inner functions
        // and use them only inside the function they were declared. Useful to encapsulate/restrict 
        // their use outside this function.
        
        // Split the given string into an array of words (without any formatting), then return it.
        def tokenizeDescription(description: String): Seq[String] = {
            description.split(" ")
        }
        
        // Remove the blank spaces (trim) in the given word, transform it in lowercase, then return it.
        def normalizeWord(word: String): String = {
            word.toLowerCase.replaceAll("-+^.:,", "").stripSuffix(",") // remove all useless chars, such as comma
        }
        
        // For the sake of simplicity let's ignore the implementation (in a real case we would return true if w is a stopword, otherwise false).
        // TODO student nothing here but still call this function in your invertedIndex creation process.
        def isStopWord(w: String): Boolean = {
          false
        }
        
        // For the sake of simplicity let's ignore the implementation (in a real case we would apply stemming to w and return the result, e.g. w=automation -> w=automat).
        // TODO student nothing here but still call this function in your invertedIndex creation process.
        def applyStemming(w: String): String = {
          w
        }
      
       // TODO student
       // Here we are going to work on the movies RDD, by tokenizing and normalizing the description of every movie, then by building a key-value object that contains the tokens as keys, and the IDs of the movies as values
       // (see the example on 4).
       // The goal here is to do everything by chaining the possible transformations and actions of Spark.
       // Possible steps:
       //   1) What we first want to do here is applying the 4 previous methods on any movie's description. Be aware of the fact that we also want to keep the IDs of the movies.
       //   2) For each tokenized word, create a tuple as (word, id), where id is the current movie id
       //        [
       //          ("toto", 120), ("mange", 120), ("des", 120), ("pommes", 120),
       //          ("toto", 121), ("lance", 121), ("des", 121), ("photocopies", 121)
       //        ]
       //      Hint: you can use a `map` function inside another `map` function.
       //   3) We finally need to find a way to remove duplicated keys and thus only having one entry per key, with all the linked IDs as values. For example:
       //        [
       //          ("toto", [120, 121]),
       //          ("mange", [120]),
       //          ...
       //        ]
        
        
       // helper : def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
       val invertedIndex = rddMovies
            .map(m => (m.description, m.id))
            .flatMap(d => tokenizeDescription(d._1)
                .map(word => (applyStemming(normalizeWord(word)), d._2))) // make the tuples (w, id) with w normalized
            .filter(tuple => !isStopWord(tuple._1))
            .filter(tuple => tuple._1.nonEmpty)
            .distinct() // remove duplicated ids 
            .groupByKey()
            

       // Return the new-built inverted index.
       invertedIndex
  }

defined [32mfunction[39m [36mcreateInvertedIndex[39m

Now we would like to use our inverted index to display the top N most used words in the descriptions of movies.

In [13]:
// TODO student
// Here we are going to operate the analytic and display its result on a given inverted index (that will be obtained from the previous function).
def topN(invertedIndex: RDD[(String, Iterable[Int])], N: Int): Unit = {
  // TODO student
  // We are going to work on the given invertedIndex array to do our analytic:
  //   1) Find a way to get the number of movie in which a word appears.
  //   2) Keep only the top N words and their occurence.
  val topMovies = invertedIndex.sortBy(_._2.size, false).take(N)
  
  // Print the words and the number of descriptions in which they appear.
  println("Top '" + N + "' most used words")
  topMovies.foreach(println)
}

defined [32mfunction[39m [36mtopN[39m

In [14]:
// Code used to test your implementation.
// Create the inverted index of the movies.
val invertedIndex = createInvertedIndex(rddMovies)

// Show how the inverted index looks like.
invertedIndex.take(3).foreach(x => println(x._1 + ": " + x._2.mkString(", ")))

// Show the top 10 most used words.
topN(invertedIndex, 10)

divorcee: 63
reunion: 697
confidants: 785


Top '10' most used words
(a,CompactBuffer(112, 320, 78, 762, 980, 145, 683, 300, 463, 876, 940, 422, 161, 547, 694, 440, 664, 563, 612, 119, 365, 222, 31, 302, 926, 156, 952, 627, 577, 121, 228, 59, 81, 21, 4, 449, 498, 985, 735, 715, 894, 444, 562, 837, 450, 594, 669, 816, 3, 496, 69, 806, 369, 372, 107, 403, 519, 864, 22, 332, 2, 288, 730, 823, 157, 208, 170, 445, 50, 443, 461, 483, 904, 855, 329, 564, 357, 16, 110, 352, 240, 924, 339, 987, 951, 148, 822, 298, 491, 230, 410, 336, 268, 195, 933, 388, 599, 387, 70, 32, 626, 42, 393, 138, 542, 912, 703, 513, 698, 773, 454, 825, 283, 617, 953, 244, 801, 757, 932, 573, 348, 17, 363, 961, 711, 251, 767, 537, 297, 984, 113, 370, 314, 95, 807, 878, 201, 242, 676, 517, 821, 813, 258, 838, 962, 692, 778, 60, 857, 171, 149, 988, 631, 430, 255, 51, 947, 942, 135, 648, 653, 724, 180, 185, 795, 539, 512, 111, 13, 130, 41, 557, 262, 420, 139, 847, 637, 986, 104, 120, 673, 331, 311, 660, 720, 353, 131, 351, 799, 533, 490, 903, 731, 83, 827, 862, 213

[36minvertedIndex[39m: [32mRDD[39m[([32mString[39m, [32mIterable[39m[[32mInt[39m])] = ShuffledRDD[38] at groupByKey at cmd11.sc:55

## Part 3 - Dataframe and SparkSQL

For all of the following exercices, write your queries in two different ways: 

* using the sql literal 
* using DataFrame API (select, where, etc.)

### Exercice 1 - Use the moviesDF DataFrame

* Use the dataframe `moviesDF` already created when loading the data 
* Print the schema of moviesDF
* Show the first 10 lines of the moviesDF as a table

In [15]:
moviesDF.printSchema()
moviesDF.show(10)
moviesDF.createOrReplaceTempView("movies")
spark.sql("SELECT * FROM movies LIMIT 10").show

root
 |-- Rank: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Runtime (Minutes): integer (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Votes: integer (nullable = true)
 |-- Revenue (Millions): double (nullable = true)
 |-- Metascore: integer (nullable = true)



+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|   1|Guardians of the ...|Action,Adventure,...|A group of interg...|          James Gunn|Chris Pratt, Vin ...|2014|              121|   8.1|757074|            333.13|       76|
|   2|          Prometheus|Adventure,Mystery...|Following clues t...|        Ridley Scott|Noomi Rapace, Log...|2012|              124|   7.0|485820|            126.46|       65|
|   3|               Split|     Horror,Thriller|Three girls are k...|  M. Night Shyamalan|James McAvoy, Any...

+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|   1|Guardians of the ...|Action,Adventure,...|A group of interg...|          James Gunn|Chris Pratt, Vin ...|2014|              121|   8.1|757074|            333.13|       76|
|   2|          Prometheus|Adventure,Mystery...|Following clues t...|        Ridley Scott|Noomi Rapace, Log...|2012|              124|   7.0|485820|            126.46|       65|
|   3|               Split|     Horror,Thriller|Three girls are k...|  M. Night Shyamalan|James McAvoy, Any...

#### Exercice 2 - Get the movies (id, title, votes, director) whose title contains "City" 

Apply two different ways: 

* use the sql literal 
* use DataFrame API (select, where, etc.)


In [64]:
spark.sql("SELECT title, votes, director FROM movies WHERE title LIKE '%City%'").show

moviesDF.select("title", "votes", "director")
    .where("title LIKE '%City%'")
    .show()

+--------------------+------+--------------------+
|               title| votes|            director|
+--------------------+------+--------------------+
|  The Lost City of Z|  7188|          James Gray|
|Sin City: A Dame ...|122185|        Frank Miller|
| City of Tiny Lights|   291|         Pete Travis|
|The Mortal Instru...|112313|        Harald Zwart|
|    Sex and the City|102547|Michael Patrick King|
|  Sex and the City 2| 62403|Michael Patrick King|
+--------------------+------+--------------------+



+--------------------+------+--------------------+
|               title| votes|            director|
+--------------------+------+--------------------+
|  The Lost City of Z|  7188|          James Gray|
|Sin City: A Dame ...|122185|        Frank Miller|
| City of Tiny Lights|   291|         Pete Travis|
|The Mortal Instru...|112313|        Harald Zwart|
|    Sex and the City|102547|Michael Patrick King|
|  Sex and the City 2| 62403|Michael Patrick King|
+--------------------+------+--------------------+



### Exercice 3 - Get the number of movies which have a number of votes between 500 and 2000 (inclusive range)

In [70]:
spark.sql("SELECT COUNT(*) FROM movies WHERE votes BETWEEN '500' AND '2000'").show

moviesDF.filter($"votes" >= 500 && $"votes" <= 2000).count()

+--------+
|count(1)|
+--------+
|      34|
+--------+



[36mres69_1[39m: [32mLong[39m = [32m34L[39m

# Exercice 4 - Get the minimum, maximum and average rating of films per director. Sort the results by minimum rating.  

In [72]:
spark.sql("SELECT director, MIN(rating) as min, MAX(rating) as max, AVG(rating) as avg FROM movies GROUP BY director ORDER BY min ").show

moviesDF
    .groupBy("director")
    .agg(min("rating"), max("rating"), avg("rating"))
    .orderBy($"min(rating)".asc)
    .show()

+--------------------+---+---+-----------------+
|            director|min|max|              avg|
+--------------------+---+---+-----------------+
|     Jason Friedberg|1.9|1.9|              1.9|
|       Shawn Burkett|2.7|2.7|              2.7|
|          James Wong|2.7|2.7|              2.7|
|   Jonathan Holbrook|3.2|3.2|              3.2|
|       Femi Oyeniran|3.5|3.5|              3.5|
|      Micheal Bafaro|3.5|3.5|              3.5|
|           Sean Penn|3.7|8.1|              5.9|
|     Jeffrey G. Hunt|3.7|3.7|              3.7|
|      Rolfe Kanefsky|3.9|3.9|              3.9|
|        George Nolfi|3.9|7.1|              5.5|
|         D.J. Caruso|3.9|6.9|            5.875|
|         Joey Curtis|4.0|4.0|              4.0|
|  Sam Taylor-Johnson|4.1|4.1|              4.1|
|  M. Night Shyamalan|4.2|7.3|5.533333333333332|
|    Gee Malik Linton|4.2|4.2|              4.2|
|     Elizabeth Banks|4.3|6.5|              5.4|
|         Kevin Smith|4.3|6.6|5.433333333333334|
|          Josh Tran

+--------------------+-----------+-----------+-----------------+
|            director|min(rating)|max(rating)|      avg(rating)|
+--------------------+-----------+-----------+-----------------+
|     Jason Friedberg|        1.9|        1.9|              1.9|
|       Shawn Burkett|        2.7|        2.7|              2.7|
|          James Wong|        2.7|        2.7|              2.7|
|   Jonathan Holbrook|        3.2|        3.2|              3.2|
|       Femi Oyeniran|        3.5|        3.5|              3.5|
|      Micheal Bafaro|        3.5|        3.5|              3.5|
|           Sean Penn|        3.7|        8.1|              5.9|
|     Jeffrey G. Hunt|        3.7|        3.7|              3.7|
|      Rolfe Kanefsky|        3.9|        3.9|              3.9|
|        George Nolfi|        3.9|        7.1|              5.5|
|         D.J. Caruso|        3.9|        6.9|            5.875|
|         Joey Curtis|        4.0|        4.0|              4.0|
|  Sam Taylor-Johnson|   

### Exercice 5 - Find the title of the movie(s) having the minimum rating for each year. Show the title, year and the rating information in the result, order by increasing rating.

**Example output**

```plain
+--------------------+----+------+
|               title|year|rating|
+--------------------+----+------+
|      Disaster Movie|2008|   1.9|
|Don't Fuck in the...|2016|   2.7|
|Dragonball Evolution|2009|   2.7|
...
```

In [83]:
spark.sql("""
SELECT movies.title, movies.year, minRatingsPerYear.min
FROM movies 
    INNER JOIN(
        SELECT year, MIN(rating) as min
        FROM movies
        GROUP BY year
    )minRatingsPerYear
ON minRatingsPerYear.year = movies.year
WHERE movies.rating = minRatingsPerYear.min
ORDER BY min""").show()

val tmpDF = moviesDF.select("title", "year", "rating")

moviesDF
    .groupBy("year")
    .agg(min("rating"))
    .join(tmpDF, "year")
    .filter($"rating" === $"min(rating)")
    .orderBy($"min(Rating)".asc)
    .show()

+--------------------+----+---+
|               title|year|min|
+--------------------+----+---+
|      Disaster Movie|2008|1.9|
|Don't Fuck in the...|2016|2.7|
|Dragonball Evolution|2009|2.7|
|             Wrecker|2015|3.5|
|  The Last Airbender|2010|4.2|
|            Movie 43|2013|4.3|
|Aliens vs Predato...|2007|4.7|
|    Megan Is Missing|2011|4.9|
|The Twilight Saga...|2011|4.9|
|            Sex Tape|2014|5.1|
|     Spring Breakers|2012|5.3|
|   Lady in the Water|2006|5.6|
|   Snakes on a Plane|2006|5.6|
+--------------------+----+---+



+----+-----------+--------------------+------+
|year|min(rating)|               title|rating|
+----+-----------+--------------------+------+
|2008|        1.9|      Disaster Movie|   1.9|
|2016|        2.7|Don't Fuck in the...|   2.7|
|2009|        2.7|Dragonball Evolution|   2.7|
|2015|        3.5|             Wrecker|   3.5|
|2010|        4.2|  The Last Airbender|   4.2|
|2013|        4.3|            Movie 43|   4.3|
|2007|        4.7|Aliens vs Predato...|   4.7|
|2011|        4.9|    Megan Is Missing|   4.9|
|2011|        4.9|The Twilight Saga...|   4.9|
|2014|        5.1|            Sex Tape|   5.1|
|2012|        5.3|     Spring Breakers|   5.3|
|2006|        5.6|   Lady in the Water|   5.6|
|2006|        5.6|   Snakes on a Plane|   5.6|
+----+-----------+--------------------+------+



[36mtmpDF[39m: [32mDataFrame[39m = [title: string, year: int ... 1 more field]

### Exercice 6 - Find the title of movies having the same director. 

**Example output**

```plain
+------------------+--------------------+--------------------+
|         director1|              title1|              title2|
+------------------+--------------------+--------------------+
|        James Gunn|Guardians of the ...|               Super|
|        James Gunn|Guardians of the ...|             Slither|
|      Ridley Scott|          Prometheus|        Body of Lies|
|      Ridley Scott|          Prometheus|         A Good Year|
...
```

Note that when using the dataframe API:
* You can use `===` to test equality of columns and `!==` to test non-equality of them
* You can use the `as` keyword to make alias of column names and table names (check the reference documentation)

In [92]:
spark.sql("""
SELECT movies.director, movies.title as title1, movies2.title as title2
FROM movies
INNER JOIN(
    SELECT director, FIRST_VALUE(title) as title
    FROM movies
    GROUP BY director
)movies2
ON movies.director = movies2.director
WHERE movies.title != movies2.title 
ORDER BY director
""")
.show

+--------------------+--------------------+--------------------+
|            director|              title1|              title2|
+--------------------+--------------------+--------------------+
|          Adam McKay|Talladega Nights:...|       The Big Short|
|          Adam McKay|      The Other Guys|       The Big Short|
|          Adam McKay|       Step Brothers|       The Big Short|
|       Adam Shankman|        Rock of Ages|           Hairspray|
|        Adam Wingard|           The Guest|         Blair Witch|
|         Alan Taylor|  Terminator Genisys|Thor: The Dark World|
|Alejandro Gonzále...|Birdman or (The U...|        The Revenant|
|Alejandro Gonzále...|               Babel|        The Revenant|
|       Alexandre Aja|The 9th Life of L...| The Hills Have Eyes|
|       Alexandre Aja|               Horns| The Hills Have Eyes|
|       Alexandre Aja|          Piranha 3D| The Hills Have Eyes|
|      Alfonso Cuarón|             Gravity|     Children of Men|
|       Andrew Niccol|   

In [110]:
val movies2DF = moviesDF.groupBy("director").agg(first("title").as("title2"))

moviesDF
    .select("director", "title")
    .join(movies2DF, "director")
    .where($"title" !== $"title2")
    .orderBy(moviesDF("director"))
    .withColumnRenamed("title", "title1")
    .show

+--------------------+--------------------+--------------------+
|            director|              title1|              title2|
+--------------------+--------------------+--------------------+
|          Adam McKay|Talladega Nights:...|       The Big Short|
|          Adam McKay|      The Other Guys|       The Big Short|
|          Adam McKay|       Step Brothers|       The Big Short|
|       Adam Shankman|        Rock of Ages|           Hairspray|
|        Adam Wingard|           The Guest|         Blair Witch|
|         Alan Taylor|  Terminator Genisys|Thor: The Dark World|
|Alejandro Gonzále...|Birdman or (The U...|        The Revenant|
|Alejandro Gonzále...|               Babel|        The Revenant|
|       Alexandre Aja|The 9th Life of L...| The Hills Have Eyes|
|       Alexandre Aja|               Horns| The Hills Have Eyes|
|       Alexandre Aja|          Piranha 3D| The Hills Have Eyes|
|      Alfonso Cuarón|             Gravity|     Children of Men|
|       Andrew Niccol|   

[36mmovies2DF[39m: [32mDataFrame[39m = [director: string, title2: string]