# Big Data Exam Report @ UniBo a.y. 2023/2024

- Manuel Andruccioli
- Kelvin Olaiya

In [None]:
// DO NOT RUN THIS CELL -- ONLY FOR TYPE CHECKER
import org.apache.spark.SparkContext
val sc = new SparkContext("local[*]", "BigDataExam")
val spark = org.apache.spark.sql.SparkSession.builder.appName("BigDataExam").getOrCreate()

In [1]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [2]:
sc.applicationId

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1714141781578_0001,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res1: String = application_1714141781578_0001
res3: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1714141781578_0001/


In [3]:
def unpersistRDD(): Unit = {
  sc.getPersistentRDDs.foreach(_._2.unpersist())
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

unpersistRDD: ()Unit


## Data structures and definitions

### Utility function for parsing

In [4]:
def getCharIndexes(line: String, char: Char): Seq[Int] = line.zipWithIndex.filter(_._1 == char).map(_._2) 
def splitAt(s: String, indices: Seq[Int]): Seq[String] = indices match {
  case h +: t => s.splitAt(h) match {
    case (a, b) => a +: splitAt(b, t.map(_ - h))
  }
  case Nil => Seq(s)
}
def parseCSVLine(l: String): Seq[String] = {
  val apices = getCharIndexes(l, '"').grouped(2).map { case Seq(a, b) => (a, b) }.toSeq
  val commas = getCharIndexes(l, ',').filter(i => !apices.exists { case (a, b) => a < i && i < b })
  return splitAt(l, commas).map(_.dropWhile(s => s == ',' || s == ' ')).map(_.replaceAll("^\"|\"$", ""))
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

getCharIndexes: (line: String, char: Char)Seq[Int]
splitAt: (s: String, indices: Seq[Int])Seq[String]
parseCSVLine: (l: String)Seq[String]


### Data structures

In [5]:
case class Track(
  uri: String,
  name: String,
  duration: Int,
  explicit: Boolean,
  artists: String,            // List of artists uri, separated by |
  available_markets: String,  // List of markets, separated by |
  album_uri: String,
  popularity: Int,
)

object Tracks {
  def fromCSVLine(line: String): Option[Track] = 
    parseCSVLine(line) match {
      case Seq(uri, name, duration, explicit, artists, available_markets, album_uri, popularity) =>
        try {
          Some(Track(uri, name, duration.toInt, explicit.toBoolean, artists, available_markets, album_uri, popularity.toInt))
        } catch {
          case _: Throwable => None
        }
    }
}

case class Playlist(
  pid: Int,
  name: String,
  num_follower: Int,
)

object Playlists {
  def fromCSVLine(line: String): Option[Playlist] = 
    parseCSVLine(line) match {
      case Seq(pid, name, num_follower) =>
        try {
            Some(Playlist(pid.toInt, name, num_follower.toInt))
        } catch {
          case _: Throwable => None
        }
    }
}

case class TrackInPlaylist(
  pid: Int,
  track_uri: String,
  pos: Int,
)

object TrackInPlaylists {
  def fromCSVLine(line: String): Option[TrackInPlaylist] = 
    parseCSVLine(line) match {
      case Seq(pid, track_uri, pos) =>
        try {
          Some(TrackInPlaylist(pid.toInt, track_uri, pos.toInt))
        } catch {
          case _: Throwable => None
        }
    }
}

case class Artist(
  uri: String,
  name: String,
  followers: Int,
  genres: String,             // List of genres, separated by |
  popularity: Int,
)

object Artists {
  def fromCSVLine(line: String): Option[Artist] =
    parseCSVLine(line) match {
      case Seq(uri, name, followers, genres, popularity) =>
        try {
          Some(Artist(uri, name, followers.toInt, genres, popularity.toInt))
        } catch {
          case _: Throwable => None
        }
    }
}

case class Album(
  uri: String,
  name: String,
  album_type: String,         // album, compilation, single.
  artists: String,            // List of artists uri, separated by |
  available_markets: String,  // List of markets, separated by |
  release_year: String,
  total_tracks: Int,
)

object Albums {
  def fromCSVLine(line: String): Option[Album] = 
    parseCSVLine(line) match {
      case Seq(uri, name, album_type, artists, available_markets, release_year, total_tracks) =>
        try {
          Some(Album(uri, name, album_type, artists, available_markets, release_year, total_tracks.toInt))
        } catch {
          case _: Throwable => None
        }
    }
}

case class Feature(
  uri: String,
  key: Int,
  loudness: Double,
  tempo: Double,
  mode: Boolean,
  danceability: Double,
  valence: Double,
  instrumentalness: Double,
  liveness: Double,
  acousticness: Double,
  energy: Double,
  speechiness: Double,
)

object Features {
  def fromCSVLine(line: String): Option[Feature] = 
    parseCSVLine(line) match {
      case Seq(uri, key, loudness, tempo, mode, danceability, valence, instrumentalness, liveness, acousticness, energy, speechiness) =>
        try {
          Some(Feature(uri, key.toInt, loudness.toDouble, tempo.toDouble, mode.toInt == 1, danceability.toDouble, valence.toDouble, instrumentalness.toDouble, liveness.toDouble, acousticness.toDouble, energy.toDouble, speechiness.toDouble))
        } catch {
          case e: Throwable => None
        }
    }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defined class Track
defined object Tracks
defined class Playlist
defined object Playlists
defined class TrackInPlaylist
defined object TrackInPlaylists
defined class Artist
defined object Artists
defined class Album
defined object Albums
defined class Feature
defined object Features


## Dataset exploration

In [6]:
val datasetPath = "s3://unibo-bd2324-manuandru/exam/"
val outputPath = s"${datasetPath}output/"

val albumRdd = sc.textFile(s"${datasetPath}albums.csv").flatMap(Albums.fromCSVLine)
val artistRdd = sc.textFile(s"${datasetPath}artists.csv").flatMap(Artists.fromCSVLine)
val featureRdd = sc.textFile(s"${datasetPath}features.csv").flatMap(Features.fromCSVLine)
val playlistRdd = sc.textFile(s"${datasetPath}playlists.csv").flatMap(Playlists.fromCSVLine)
val trackInPlaylistRdd = sc.textFile(s"${datasetPath}tracks_in_playlists.csv").flatMap(TrackInPlaylists.fromCSVLine)
val trackRdd = sc.textFile(s"${datasetPath}tracks.csv").flatMap(Tracks.fromCSVLine)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

datasetPath: String = s3://unibo-bd2324-manuandru/exam/
outputPath: String = s3://unibo-bd2324-manuandru/exam/output/
albumRdd: org.apache.spark.rdd.RDD[Album] = MapPartitionsRDD[2] at flatMap at <console>:26
artistRdd: org.apache.spark.rdd.RDD[Artist] = MapPartitionsRDD[5] at flatMap at <console>:25
featureRdd: org.apache.spark.rdd.RDD[Feature] = MapPartitionsRDD[8] at flatMap at <console>:25
playlistRdd: org.apache.spark.rdd.RDD[Playlist] = MapPartitionsRDD[11] at flatMap at <console>:25
trackInPlaylistRdd: org.apache.spark.rdd.RDD[TrackInPlaylist] = MapPartitionsRDD[14] at flatMap at <console>:25
trackRdd: org.apache.spark.rdd.RDD[Track] = MapPartitionsRDD[17] at flatMap at <console>:25


In [7]:
val albumRddCached = albumRdd.cache()
val artistRddCached = artistRdd.cache()
val featureRddCached = featureRdd.cache()
val playlistRddCached = playlistRdd.cache()
val trackInPlaylistRddCached = trackInPlaylistRdd.cache()
val trackRddCached = trackRdd.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

albumRddCached: albumRdd.type = MapPartitionsRDD[2] at flatMap at <console>:26
artistRddCached: artistRdd.type = MapPartitionsRDD[5] at flatMap at <console>:25
featureRddCached: featureRdd.type = MapPartitionsRDD[8] at flatMap at <console>:25
playlistRddCached: playlistRdd.type = MapPartitionsRDD[11] at flatMap at <console>:25
trackInPlaylistRddCached: trackInPlaylistRdd.type = MapPartitionsRDD[14] at flatMap at <console>:25
trackRddCached: trackRdd.type = MapPartitionsRDD[17] at flatMap at <console>:25


In [8]:
println(s"Number of Albums: ${albumRddCached.count()}")
println(s"Number of Artists: ${artistRddCached.count()}")
println(s"Number of Track's Feature: ${featureRddCached.count()}")
println(s"Number of Playlist: ${playlistRddCached.count()}")
println(s"Number of Tracks add in Playlists: ${trackInPlaylistRddCached.count()}")
println(s"Number of Tracks: ${trackRddCached.count()}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of Albums: 734381
Number of Artists: 414153
Number of Track's Feature: 2261031
Number of Playlist: 2000000
Number of Tracks add in Playlists: 193778287
Number of Tracks: 2261729


### Average number of tracks

In [9]:
println(s"In Playlist: ${(trackInPlaylistRddCached.count().toDouble / playlistRddCached.count().toDouble).round}")
println(s"In Album: ${(albumRddCached.map(_.total_tracks).sum() / albumRddCached.count().toDouble).round}")
println(s"Per Artist (double counting on artist): ${
  trackRddCached.flatMap(t => t.artists.split('|').map(a => (a, 1))).
    reduceByKey(_ + _).
    map { case (_, count) => count }.
    mean().round}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In Playlist: 97
In Album: 11
Per Artist (double counting on artist): 7


### Explicit vs Non-Explicit Tracks

In [10]:
import org.apache.spark.sql.SaveMode

trackRddCached.map(t => (t.explicit, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  toDF().write.format("csv").mode(SaveMode.Overwrite).save(s"${outputPath}explicit_vs_non_explicit.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode


![Explicit vs Non-Explicit Tracks](./img/explicit_vs_non_explicit.png)

### Tracks per year

In [11]:
albumRddCached.map(a => a.release_year).
  distinct().collect().sorted

albumRddCached.map(a => (a.uri, a.release_year)).
  join(trackRddCached.map(t => (t.album_uri, (t.uri, t.popularity)))).
  map { case (albumUri, (releaseYear, (trackUri, popularity))) => (releaseYear, 1) }.
  reduceByKey(_ + _).
  coalesce(1).
  sortByKey().
  toDF().write.format("csv").mode(SaveMode.Overwrite).save(s"${outputPath}tracks_per_year.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res27: Array[String] = Array(0000, 0001, 0013, 1197, 1885, 1889, 1899, 1900, 1901, 1905, 1906, 1908, 1909, 1910, 1911, 1917, 1918, 1919, 1920, 1922, 1923, 1924, 1925, 1926, 1927, 1928, 1929, 1930, 1931, 1932, 1933, 1934, 1935, 1936, 1937, 1938, 1939, 1940, 1941, 1942, 1943, 1944, 1945, 1946, 1947, 1948, 1949, 1950, 1951, 1952, 1953, 1954, 1955, 1956, 1957, 1958, 1959, 1960, 1961, 1962, 1963, 1964, 1965, 1966, 1967, 1968, 1969, 1970, 1971, 1972, 1973, 1974, 1975, 1976, 1977, 1978, 1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024)


![Track per Year](./img/tracks_per_year.png)

### Top

#### Most followed Playlist

In [12]:
playlistRddCached.map(p => (p.pid, p.name, p.num_follower)).
  sortBy(_._3, ascending = false).
  coalesce(1).
  toDF().write.format("csv").mode(SaveMode.Overwrite).save(s"${outputPath}playlist_with_followers.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

![Top 10 playlist per followers](./img/top10_playlist_per_followers.png)

#### Most popular Playlist

The playlist popularity is calculated as the mean of the popularity of the tracks in the playlist.

In [13]:
trackInPlaylistRddCached.
  map(t => (t.track_uri, t.pid)).
  join(trackRddCached.map(t => (t.uri, t.popularity))).
  map { case (_, (pid, popularity)) => (pid, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }.
  join(playlistRddCached.map(p => (p.pid, p.name))).
  map { case (pid, (popularity, name)) => (pid, name, popularity) }.
  sortBy(_._3, ascending = false).
  coalesce(1).
  toDF().write.format("csv").mode(SaveMode.Overwrite).save(s"${outputPath}playlist_with_popularity.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

![Top playlist per popularity](./img/top_playlist_per_popularity.png)

In [14]:
unpersistRDD()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Job 1

Given the following metrics:

  - track's popularity
  - average popularity of tracks in year
  - artist popularity (average if more artists are present in the track)

Understand which metrics influence mostly the playlists, averaging the values of the tracks appearing in each.
Also, aggregate the playlists on the previously calculated influence by averaging for the number of playlist followers.

> **most influent**: the metric that has the highest average value for a playlist. 

The query let us answering the following question:
a playlist influenced most by the popularity of the tracks has, on average, 500 followers. (Same for the other two metrics).

### Non optimized

First of all, we need to calculate the `popularity of the year`.

$$ \texttt{Popularity of the year} = 
\frac{
  \sum{\texttt{popularity of tracks in that year}}
}{
  \texttt{Number of tracks in that year}
}
$$

Given the fact that the `release year` of a track is on the album, we need to join the track with the album to get it.
After this first step, we can calculate the average popularity of the year.

In [15]:
// year -> popularity of the year
val popularityPerYear = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[86] at mapValues at <console>:32


After that, we need to get the `artist's popularity of a track`.
Given the fact that a track can have multiple artists, the `artist's popularity of a track` will be a mean of the `artist popularity` of the artists in the track.

$$ \texttt{Artist's popularity of a track} =
\frac{
  \sum{\texttt{popularity of artists in the track}}
}{
  \texttt{Number of artists in the track}
}
$$

> **Note**: The `artists_uri` is a *denormalized* field in the track, so we need to split it.

In [16]:
// track -> artist's popularity
val trackWithArtistPopularity = trackRdd.flatMap(t => t.artists.split('|').map(artistUri => (artistUri, t.uri))).
  join(artistRdd.map(a => (a.uri, a.popularity))).
  map { case (artistUri, (trackUri, artistPopularity)) => (trackUri, artistPopularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

trackWithArtistPopularity: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[94] at mapValues at <console>:32


At this point we can join all the data to get the `popularity`, `average popularity of the year` and `artist's popularity` of a track.

$$
\texttt{track_uri} \rightarrow (\texttt{popularity}, \texttt{average popularity of the year}, \texttt{artist's popularity})
$$

After that, we can join the `track` with the `playlist` and perform the aggregation, achieving the average of the metrics for each playlist:

$$
\texttt{pid} \rightarrow (\texttt{popularity}, \texttt{average popularity of the year}, \texttt{artist's popularity})
$$

Now, we can calculate the `most influent` metric for each playlist taking the maximum value of the metrics.
  
Finally, we can aggregate the playlists by the `most influent` metric and calculate the average number of followers for each metric.

In [17]:
val job = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }.
  join(trackWithArtistPopularity).
  map { case (trackUri, ((popularity, avgPopularityInYear), avgArtistPopularity)) => (trackUri, (popularity, avgPopularityInYear, avgArtistPopularity)) }.
  join(trackInPlaylistRdd.map(t => (t.track_uri, t.pid))).
  map { case (trackUri, ((popularity, avgPopularityInYear, avgArtistPopularity), pid)) => (pid, (popularity, avgPopularityInYear, avgArtistPopularity)) }.
  join(playlistRdd.map(p => (p.pid, p.num_follower))).
  map { case (pid, ((popularity, avgPopularityInYear, avgArtistPopularity), numFollower)) => (pid, (popularity, avgPopularityInYear, avgArtistPopularity, numFollower)) }.
  aggregateByKey((0.0, 0.0, 0.0, 0, 0))(
    { case ((accPop, accAvgPopInYear, accAvgArtistPop, _, count), (popularity, avgPopularityInYear, avgArtistPopularity, followers)) =>
      (accPop + popularity, accAvgPopInYear + avgPopularityInYear, accAvgArtistPop + avgArtistPopularity, followers, count + 1)
    },
    { case ((accPop1, accAvgPopInYear1, accAvgArtistPop1, followers, count1), (accPop2, accAvgPopInYear2, accAvgArtistPop2, _, count2)) =>
      (accPop1 + accPop2, accAvgPopInYear1 + accAvgPopInYear2, accAvgArtistPop1 + accAvgArtistPop2, followers, count1 + count2)
    }
  ).
  mapValues { case (accPop, accAvgPopInYear, accAvgArtistPop, followers, count) => (accPop / count, accAvgPopInYear / count, accAvgArtistPop / count, followers) }.
  map {
    case (pid, (avgPop, avgPopInYear, avgArtistPop, followers)) =>
      val maxAvg = Math.max(avgPop, Math.max(avgPopInYear, avgArtistPop))
      val indexOfBestAvg = Seq(avgPop, avgPopInYear, avgArtistPop).indexWhere(_ >= maxAvg)
      (indexOfBestAvg, followers)
  }.
  map { case (indexOfBestAvg, followers) => (indexOfBestAvg, (followers, 1)) }.
  reduceByKey { case ((followers1, c1), (followers2, c2)) => (followers1 + followers2, c1 + c2) }.
  mapValues { case (accF, c) => accF / c }

spark.time {
  job.collect.foreach {
    case (0, avgFollowers) => println(s"Playlist influenced most by Track's popularity has $avgFollowers followers on average")
    case (1, avgFollowers) => println(s"Playlist influenced most by popularity of the year has $avgFollowers followers on average")
    case (2, avgFollowers) => println(s"Playlist influenced most by Artist's popularity has $avgFollowers followers on average")
  }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

job: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[124] at mapValues at <console>:57
Playlist influenced most by Track's popularity has 13 followers on average
Playlist influenced most by popularity of the year has 145 followers on average
Playlist influenced most by Artist's popularity has 354 followers on average
Time taken: 592682 ms


### Optimization

The first part of the optimization process will be performed evaluating multiple different execution plan on a restricted part of the entire job.
The best one, according to the execution time, will be chosen and added to the final optimization.

#### Self-join

The first part of the job involves a self-join, and it could be resumed as the following steps:

1. Join tracks on album (only to get the year of the track)
2. Aggregate popularity by year to get the `popularity per year`
3. Join track with their popularity per year

Only for conceptual evaluation, it's been taken the following part:

In [18]:
// year -> popularity of the year
val popularityPerYear = trackRdd.map(t => (t.album_uri, t.popularity)).
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, (popularity, releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

val selfJoin1 = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }

spark.time {
  selfJoin1.count()
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[132] at mapValues at <console>:32
selfJoin1: org.apache.spark.rdd.RDD[(String, (Int, Double))] = MapPartitionsRDD[142] at map at <console>:31
Time taken: 47698 ms
res40: Long = 2261687


##### Broadcast variable

Considering the small size of `Album` table (`289.6 MiB`, `20.1 MiB` after the map), the could be done using a *broadcast variable* as follows:

In [19]:
val albumsBroadcast = sc.broadcast(albumRdd.map(a => (a.uri, a.release_year)).collectAsMap())

// year -> popularity of the year
val popularityPerYear = trackRdd.flatMap(t => albumsBroadcast.value.get(t.album_uri).map((_, t.popularity))).
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

val selfJoin2 = trackRdd.flatMap(t => albumsBroadcast.value.get(t.album_uri).map((_, (t.uri, t.popularity)))).
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }

spark.time {
  selfJoin2.count()
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

albumsBroadcast: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(62)
popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[146] at mapValues at <console>:31
selfJoin2: org.apache.spark.rdd.RDD[(String, (Int, Double))] = MapPartitionsRDD[151] at map at <console>:29
Time taken: 37996 ms
res45: Long = 2261687


##### RDD caching

According to the previous consideration, the sub job's been evaluated also using the RDD caching. 

In [20]:
val albumRddCached = albumRdd.map(a => (a.uri, a.release_year)).cache()

// year -> popularity of the year
val popularityPerYear = trackRdd.map(t => (t.album_uri, t.popularity)).
  join(albumRddCached).
  map { case (albumUri, (popularity, releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

val selfJoin3 = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).
  join(albumRddCached).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }

spark.time {
  selfJoin3.count()
}
unpersistRDD()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

albumRddCached: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[152] at map at <console>:24
popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[159] at mapValues at <console>:33
selfJoin3: org.apache.spark.rdd.RDD[(String, (Int, Double))] = MapPartitionsRDD[168] at map at <console>:31
Time taken: 47603 ms
res50: Long = 2261687


The optimization could be extended also to the `Track` table, considering it's size (`982.8 MiB`, `120.7 MiB` after the map).

In [21]:
val trackRddCached = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).cache()

// year -> popularity of the year
val popularityPerYear = trackRddCached.
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((_, popularity), releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

val selfJoin4 = trackRddCached.
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }

spark.time {
  selfJoin4.count()
}
unpersistRDD()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

trackRddCached: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[169] at map at <console>:24
popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[176] at mapValues at <console>:33
selfJoin4: org.apache.spark.rdd.RDD[(String, (Int, Double))] = MapPartitionsRDD[185] at map at <console>:31
Time taken: 35418 ms
res56: Long = 2261687


Also we can consider to cache both `Track` and `Album` RDDs, given them sizes.

In [22]:
val trackRddCached = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).cache()
val albumRddCached = albumRdd.map(a => (a.uri, a.release_year)).cache()

// year -> popularity of the year
val popularityPerYear = trackRddCached.
  join(albumRddCached).
  map { case (albumUri, ((_, popularity), releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

val selfJoin5 = trackRddCached.
  join(albumRddCached).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }

spark.time {
  selfJoin5.count()
}
unpersistRDD()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

trackRddCached: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[186] at map at <console>:24
albumRddCached: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[187] at map at <console>:24
popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[193] at mapValues at <console>:33
selfJoin5: org.apache.spark.rdd.RDD[(String, (Int, Double))] = MapPartitionsRDD[201] at map at <console>:31
Time taken: 32289 ms
res62: Long = 2261687


##### Results

| Optimization            | Execution Time |
|-------------------------|----------------|
| Original                | 50 sec         |
| Broadcast               | 36 sec         |
| Album Caching           | 46 sec         |
| Track Caching           | 35 sec         |
| Album and Track Caching | 32 sec         |

#### Aggregate before join & Hash partitioning of biggest RDD

##### Aggregate before join

Considering the initial job, the join on `Playlist` table (to get the number of followers) could be done after the first aggregation over pid on `Track_in_Playlist` table.

Usually, aggregate before join is a **good practice**, in order to avoid to shuffle more data than needed.
But, in this particular case, produce worse performance as we can see in the image below.

In particular:
- **Original Job**: `~9.7 min` image 1
- **Aggregate before join**: `~13 min` image 2

[Original Job Spark UI](./img/original_job.png)

[Aggregate before join Spark UI](./img/aggregate_before_join.png)

##### Hash partitioning of biggest RDD

It's been tried also a partitioning of the biggest RDD (`Track_in_Playlist`, `8.6 GiB` input, `~193M` of records) in order to avoid a *reshuffle* during the join part.
It's been taken `6` as number of partitions, considering the number of executors (2) and the number of cores (3) per executor.

The result is similar to the previous one, producing a worse performance than the original job (`~11 min`).

##### Merging the two optimizations

The two optimizations described above have been merged and the job got a performance improvement: now the aggregation is done before the join and the biggest RDD is shuffled, avoiding the *reshuffle* during the join.

The job reach an execution time of `~7.9 min`.

[Aggregate before join & Hash partitioning Spark UI](./img/aggregate_before_join_and_partitioner.png)

The execution has been tested with other number of partitions, but the best result has been reached with `6` partitions.

- **12 partition**: `~8.3 min`
- **18 partition**: `~8.9 min`


In [23]:
// year -> popularity of the year
val popularityPerYear = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

// track -> artist's popularity
val trackWithArtistPopularity = trackRdd.flatMap(t => t.artists.split('|').map(artistUri => (artistUri, t.uri))).
  join(artistRdd.map(a => (a.uri, a.popularity))).
  map { case (artistUri, (trackUri, artistPopularity)) => (trackUri, artistPopularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count }

import org.apache.spark.HashPartitioner
val partitioner = new HashPartitioner(6)

val jobAggregationBeforeJoinAndPartitioner = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity))).
  join(albumRdd.map(a => (a.uri, a.release_year))).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(popularityPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }.
  join(trackWithArtistPopularity).
  map { case (trackUri, ((popularity, avgPopularityInYear), avgArtistPopularity)) => (trackUri, (popularity, avgPopularityInYear, avgArtistPopularity)) }.
  join(trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).partitionBy(partitioner)). // here the partitioner
  map { case (trackUri, ((popularity, avgPopularityInYear, avgArtistPopularity), pid)) => (pid, (popularity, avgPopularityInYear, avgArtistPopularity)) }.
  // the join from here is pushed down to (1)
  aggregateByKey((0.0, 0.0, 0.0, 0))(
    { case ((accPop, accAvgPopInYear, accAvgArtistPop, count), (popularity, avgPopularityInYear, avgArtistPopularity)) =>
      (accPop + popularity, accAvgPopInYear + avgPopularityInYear, accAvgArtistPop + avgArtistPopularity, count + 1)
    },
    { case ((accPop1, accAvgPopInYear1, accAvgArtistPop1, count1), (accPop2, accAvgPopInYear2, accAvgArtistPop2, count2)) =>
      (accPop1 + accPop2, accAvgPopInYear1 + accAvgPopInYear2, accAvgArtistPop1 + accAvgArtistPop2, count1 + count2)
    }
  ).
  mapValues { case (accPop, accAvgPopInYear, accAvgArtistPop, count) => (accPop / count, accAvgPopInYear / count, accAvgArtistPop / count) }.
  map {
    case (pid, (avgPop, avgPopInYear, avgArtistPop)) =>
      val maxAvg = Math.max(avgPop, Math.max(avgPopInYear, avgArtistPop))
      val indexOfBestAvg = Seq(avgPop, avgPopInYear, avgArtistPop).indexWhere(_ >= maxAvg)
      (pid, indexOfBestAvg)
  }.
  // (1) here
  join(playlistRdd.map(p => (p.pid, p.num_follower))).
  map { case (pid, (indexOfBestAvg, numFollower)) => (indexOfBestAvg, (numFollower, 1)) }.
  reduceByKey { case ((accFollowers1, c1), (accFollowers2, c2)) => (accFollowers1 + accFollowers2, c1 + c2) }.
  mapValues { case (accF, c) => accF / c }

spark.time {
  jobAggregationBeforeJoinAndPartitioner.collect.foreach {
    case (0, avgFollowers) => println(s"Playlist influenced most by Track's popularity has $avgFollowers followers on average")
    case (1, avgFollowers) => println(s"Playlist influenced most by popularity of the year has $avgFollowers followers on average")
    case (2, avgFollowers) => println(s"Playlist influenced most by Artist's popularity has $avgFollowers followers on average")
  }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

popularityPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[209] at mapValues at <console>:32
trackWithArtistPopularity: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[217] at mapValues at <console>:33
import org.apache.spark.HashPartitioner
partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@6
jobAggregationBeforeJoinAndPartitioner: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[247] at mapValues at <console>:61
Playlist influenced most by Track's popularity has 13 followers on average
Playlist influenced most by popularity of the year has 145 followers on average
Playlist influenced most by Artist's popularity has 354 followers on average
Time taken: 473760 ms


### Merge all the optimizations

The final job has been executed merging the optimizations described above.

- **RDD caching**: `Album` and `Track`
- **Aggregate before join**
- **Hash partitioning of biggest RDD**

The job has been executed in `~7.3 min`.

In [24]:
unpersistRDD()

val finalTrackRddCached = trackRdd.map(t => (t.album_uri, (t.uri, t.popularity, t.artists))).cache()
val finalAlbumRddCached = albumRdd.map(a => (a.uri, a.release_year)).cache()

val avgPopPerYear = finalTrackRddCached.
  join(finalAlbumRddCached).
  map { case (albumUri, ((trackUri, popularity, _), releaseYear)) => (releaseYear, popularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count } // year -> avg popularity of tracks in that year

val trackWithArtistPopularity = finalTrackRddCached.flatMap(t => t._2._3.split('|').map(artistUri => (artistUri, t._2._1))).
  join(artistRdd.map(a => (a.uri, a.popularity))).
  map { case (artistUri, (trackUri, artistPopularity)) => (trackUri, artistPopularity) }.
  aggregateByKey((0, 0))(
    { case ((acc, count), popularity) => (acc + popularity, count + 1) },
    { case ((acc1, count1), (acc2, count2)) => (acc1 + acc2, count1 + count2) }
  ).mapValues { case (acc, count) => acc.toDouble / count } // track -> avg popularity of artists of that track

import org.apache.spark.HashPartitioner
 
val finalJob = finalTrackRddCached.map(t => (t._1, (t._2._1, t._2._2))).
  join(finalAlbumRddCached).
  map { case (albumUri, ((trackUri, popularity), releaseYear)) => (releaseYear, (trackUri, popularity)) }.
  join(avgPopPerYear).
  map { case (releaseYear, ((trackUri, popularity), avgPopularityInYear)) => (trackUri, (popularity, avgPopularityInYear)) }.
  join(trackWithArtistPopularity).
  map { case (trackUri, ((popularity, avgPopularityInYear), avgArtistPopularity)) => (trackUri, (popularity, avgPopularityInYear, avgArtistPopularity)) }.
  join(trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).partitionBy(new HashPartitioner(6))).
  map { case (trackUri, ((popularity, avgPopularityInYear, avgArtistPopularity), pid)) => (pid, (popularity, avgPopularityInYear, avgArtistPopularity))}.
  aggregateByKey((0.0, 0.0, 0.0, 0))(
    { case ((accPop, accAvgPopInYear, accAvgArtistPop, count), (popularity, avgPopularityInYear, avgArtistPopularity)) =>
      (accPop + popularity, accAvgPopInYear + avgPopularityInYear, accAvgArtistPop + avgArtistPopularity, count + 1)
    },
    { case ((accPop1, accAvgPopInYear1, accAvgArtistPop1, count1), (accPop2, accAvgPopInYear2, accAvgArtistPop2, count2)) =>
      (accPop1 + accPop2, accAvgPopInYear1 + accAvgPopInYear2, accAvgArtistPop1 + accAvgArtistPop2, count1 + count2)
    }
  ).
  mapValues { case (accPop, accAvgPopInYear, accAvgArtistPop, count) => (accPop / count, accAvgPopInYear / count, accAvgArtistPop / count) }.
  map {
    case (pid, (avgPop, avgPopInYear, avgArtistPop)) =>
      val maxAvg = Math.max(avgPop, Math.max(avgPopInYear, avgArtistPop))
      val indexOfBestAvg = Seq(avgPop, avgPopInYear, avgArtistPop).indexWhere(_ >= maxAvg)
      (pid, indexOfBestAvg)
  }.
  join(playlistRdd.map(p => (p.pid, p.num_follower))).
  map { case (pid, (indexOfBestAvg, numFollower)) => (indexOfBestAvg, (numFollower, 1)) }.
  reduceByKey { case ((accFollowers1, c1), (accFollowers2, c2)) => (accFollowers1 + accFollowers2, c1 + c2) }.
  mapValues { case (accF, c) => accF / c }

spark.time {
  finalJob.collect.foreach {
    case (0, avgFollowers) => println(s"Playlist influenced most by Track's popularity has $avgFollowers followers on average")
    case (1, avgFollowers) => println(s"Playlist influenced most by popularity of the year has $avgFollowers followers on average")
    case (2, avgFollowers) => println(s"Playlist influenced most by Artist's popularity has $avgFollowers followers on average")
  }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

finalTrackRddCached: org.apache.spark.rdd.RDD[(String, (String, Int, String))] = MapPartitionsRDD[248] at map at <console>:26
finalAlbumRddCached: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[249] at map at <console>:25
avgPopPerYear: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[255] at mapValues at <console>:33
trackWithArtistPopularity: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[263] at mapValues at <console>:33
import org.apache.spark.HashPartitioner
finalJob: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[292] at mapValues at <console>:60
Playlist influenced most by Track's popularity has 13 followers on average
Playlist influenced most by popularity of the year has 145 followers on average
Playlist influenced most by Artist's popularity has 354 followers on average
Time taken: 450923 ms


![Job 1 result](./img/job1_results.png)

Other tests have been done, such as:

- repartitioning and coalescing of `Tracks`
- repartitioning and coalescing after aggregation during the job part
- unpersisting the RDDs when they are not needed anymore

But the best performance has been reached with the optimizations described above.

Here the resume of the execution time:

| Job                                           | Execution Time |
|-----------------------------------------------|----------------|
| Original Job                                  | 9.7 min        |
| Aggregate before join                         | 13 min         |
| Hash partitioning (6)                         | 11 min         |
| Aggregate before join + Hash partitioning (6) | 7.9 min        |
| Final Job                                     | 7.3 min        |
| Final Job with repartition (300) on Tracks    | 7.7 min        |

The achived speedup is $ S = \frac{9.7 min}{7.3 min} = 1.32 $

## Job 2

Given the following classes: slowly danceable (tempo <= 130BPM, danceability > 0.5), swiftly danceable (tempo >130BPM, danceability > 0.5), slowly undanceable (tempo <= 130BPM, danceability <= 0.5), swiftly undanceable (tempo >130BPM, danceability <= 0.5); and the various keys (C, C#/Db, ...).
  for each class and (key ---OR--- range of followers) get:
  - The number of playlist.
  - Average playlist's explicitness percentage.
  - Average number of tracks in playlist.
  - Average number of playlist followers.
  <!-- - Average playlist danceability.
  - Average playlist tempo. -->
  (The key of a playlist is the most present key among its tracks)

In [25]:
def toClass(tempo: Double, danceablility: Double): String = (tempo, danceablility) match {
  case (t, d) if t <= 130 && d > 0.5 => "slowly danceable"
  case (t, d) if t > 130 && d > 0.5 => "swiftly danceable"
  case (t, d) if t <= 130 && d <= 0.5 => "slowly undanceable"
  case (t, d) if t > 130 && d <= 0.5 => "swiftly undanceable"
}

def toKey(key: Int): String = Seq("C", "C#/Db", "D", "D#/Eb", "E", "F", "F#/Gb", "G", "G#/Ab", "A", "A#/Bb", "B")(key)

def incrementKey(map: Map[Int, Int], key: Int) = {
    val currentValue = map.getOrElse(key, 0)
    map.updated(key, currentValue + 1)
}

def joinMap(map1: Map[Int, Int], map2: Map[Int, Int]): Map[Int, Int] = map1.map { case(k, v) => (k, map2.getOrElse(k, 0) + v) }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

toClass: (tempo: Double, danceablility: Double)String
toKey: (key: Int)String
incrementKey: (map: Map[Int,Int], key: Int)scala.collection.immutable.Map[Int,Int]
joinMap: (map1: Map[Int,Int], map2: Map[Int,Int])Map[Int,Int]


In [26]:
val features = featureRdd.map(t => (t.uri, (t.tempo, t.danceability, t.key))).
  join(trackRdd.map(t => (t.uri, t.explicit))).
  map { case (uri, ((t, d, k), e)) => (uri, (t, d, k, e)) }

val tracksInPlaylist = trackInPlaylistRdd.map(t => (t.track_uri, t.pid))

val playlistWithFollowers = playlistRdd.map(p => (p.pid, p.num_follower))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

features: org.apache.spark.rdd.RDD[(String, (Double, Double, Int, Boolean))] = MapPartitionsRDD[298] at map at <console>:30
tracksInPlaylist: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[299] at map at <console>:28
playlistWithFollowers: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[300] at map at <console>:28


### First implementation

Following the self-join pattern we computed the `playlistClasses` rdd by joining the `tracksInPlaylist`'s rdd with the `features`'s one and then aggregating on the playlist ID (pid) in order to compute the average tempo and danceability of each playlist. 
After that, we joined `tracksInPlaylistWithFeatures` with `playlistWithFollowers` so that by aggregating on the **pid** we could compute the ratio of explict songs and the number of followers of each playlist. Finally, we joined this last result with `playlistClasses` so that we could calculate per each of those classes the number of playlist belonging to the class, the average of explicit song ratio and number of followers.  

In [27]:
// Compute the class of each playlist
val tracksInPlaylistWithFeatures = tracksInPlaylist.join(features)

val playlistClasses = tracksInPlaylistWithFeatures.
    map { case (t_uri, (pid, (t, d, k, _))) => (pid, (t, d, k)) }.
    aggregateByKey((0.0, 0.0, (0 to 11).map((_, 0)).toMap, 0))(
        { case ((accT, accD, accK, c), (t, d, k)) => (accT + t, accD + d, incrementKey(accK, k), c + 1) },
        { case ( (accT1, accD1, accK1, c1), (accT2, accD2, accK2, c2)) => (accT1 + accT2, accD1+ accD2, joinMap(accK1, accK2), c1 + c2) }
    ).
    mapValues { case (accT, accD, accK, c) => (toKey(accK.maxBy(_._2)._1), toClass(accT/c, accD/c)) } //(pid, (k, cls))

val job1 = tracksInPlaylistWithFeatures.
    map { case (t_uri, (pid, (_,_,_, e))) => (pid, if (e) 1 else 0) }.
    join(playlistWithFollowers). // (pid, (e, num_followers)) --> consider --J&A-- or A&J
    aggregateByKey((0, 0, 0))(
        { case ((accE, nF, c), (e, f)) => (accE + e, f, c + 1)  },
        { case ((accE1, nF, c1), (accE2, _, c2)) => (accE1 + accE2, nF, c1 + c2) }
    ).
    mapValues { case (accE, nF, c) => (accE/c, nF) }. // (pid, avgE, NF)
    join(playlistClasses). // (pid ((avgE, NF), (k, cls)))
    map { case (pid, ((avgE, nF), (k, cls))) => ((k, cls), (avgE, nF))}.
    aggregateByKey((0.0, 0, 0))(
        { case ((accE, accF, c), (e, f)) => (accE + e, accF + f, c+1) },
        { case ((accE1, accF1, c1), (accE2, accF2, c2)) => (accE1 + accE2, accF1 + accF2, c1 + c2) }
    ).
    mapValues { case (accE, accF, c) => (accE/c, accF/c, c) }
    
val result = job1.collect    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

tracksInPlaylistWithFeatures: org.apache.spark.rdd.RDD[(String, (Int, (Double, Double, Int, Boolean)))] = MapPartitionsRDD[303] at join at <console>:29
playlistClasses: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[306] at mapValues at <console>:38
job1: org.apache.spark.rdd.RDD[((String, String), (Double, Int, Int))] = MapPartitionsRDD[318] at mapValues at <console>:44
result: Array[((String, String), (Double, Int, Int))] = Array(((D#/Eb,slowly danceable),(0.010009099181073703,1511,1099)), ((D,swiftly undanceable),(0.0,230,5340)), ((A,slowly undanceable),(0.0,852,13619)), ((G#/Ab,swiftly danceable),(0.05002174858634189,397,2299)), ((G#/Ab,slowly undanceable),(5.763688760806917E-4,679,1735)), ((A,swiftly danceable),(0.004842931937172775,809,7640)), ((B,swiftly undanceable),(0.0,282,1264)), ((D,swiftly danceable),(0.010874111250522794,768,7173)), ((A#/Bb,slowly undanceable),(0.0,1249,3216)), ((F,slowly danceable),(0.006729221031591805,2211,90055)), ((F#/Gb,slowly 

This job completed in around `22 minutes`. This is surely due the size of the input data, in particular those regarding the tracks in playlist which is `8.6GB` large.

## Optimizations

As a first step toward optimizing execution time we noticed that we accessed the result of the join between `tracksInPlaylist` and `features` multiple times. So we dediced to cache it in order to avoid loading the dataset more than once and thus avoiding waste of time.

In [28]:
// Compute the class of each playlist
val tracksInPlaylistWithFeatures = tracksInPlaylist.join(features).cache

val playlistClasses = tracksInPlaylistWithFeatures.
    map { case (t_uri, (pid, (t, d, k, _))) => (pid, (t, d, k)) }.
    aggregateByKey((0.0, 0.0, (0 to 11).map((_, 0)).toMap, 0))(
        { case ((accT, accD, accK, c), (t, d, k)) => (accT + t, accD + d, incrementKey(accK, k), c + 1) },
        { case ( (accT1, accD1, accK1, c1), (accT2, accD2, accK2, c2)) => (accT1 + accT2, accD1 + accD2, joinMap(accK1, accK2), c1 + c2) }
    ).
    mapValues { case (accT, accD, accK, c) => (toKey(accK.maxBy(_._2)._1), toClass(accT/c, accD/c)) } //(pid, (k, cls))

val job2 = tracksInPlaylistWithFeatures.
    map { case (t_uri, (pid, (_,_,_, e))) => (pid, if (e) 1 else 0) }.
    join(playlistWithFollowers). // (pid, (e, num_followers))
    aggregateByKey((0, 0, 0))(
        { case ((accE, nF, c), (e, f)) => (accE + e, f, c + 1)  },
        { case ((accE1, nF, c1), (accE2, _, c2)) => (accE1 + accE2, nF, c1 + c2) }
    ).
    mapValues { case (accE, nF, c) => (accE/c, nF) }. // (pid, avgE, NF)
    join(playlistClasses). // (pid ((avgE, NF), (k, cls)))
    map { case (pid, ((avgE, nF), (k, cls))) => ((k, cls), (avgE, nF))}.
    aggregateByKey((0.0, 0, 0))(
        { case ((accE, accF, c), (e, f)) => (accE + e, accF + f, c+1) },
        { case ((accE1, accF1, c1), (accE2, accF2, c2)) => (accE1 + accE2, accF1 + accF2, c1 + c2) }
    ).
    mapValues { case (accE, accF, c) => (accE/c, accF/c, c) }
    
val result = job2.collect    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Interrupted by user


No reduction in time was registered as the job took still `22 minutes` to perform. We therefore tried to change the execution plan. In the previous job, when joining `tracksInPlaylistWithFeatures` with `playlistFollowers`, we performed the so called `Join & Aggregate`. So we tried to perform the `Aggregate and Join` to see if there was any benefit.

In [None]:
// Compute the class of each playlist
val tracksInPlaylistWithFeatures = tracksInPlaylist.join(features).cache

val playlistClasses = tracksInPlaylistWithFeatures.
    map { case (t_uri, (pid, (t, d, k, _))) => (pid, (t, d, k)) }.
    aggregateByKey((0.0, 0.0, (0 to 11).map((_, 0)).toMap, 0))(
        { case ((accT, accD, accK, c), (t, d, k)) => (accT + t, accD + d, incrementKey(accK, k), c + 1) },
        { case ( (accT1, accD1, accK1, c1), (accT2, accD2, accK2, c2)) => (accT1 + accT2, accD1 + accD2, joinMap(accK1, accK2), c1 + c2) }
    ).
    mapValues { case (accT, accD, accK, c) => (toKey(accK.maxBy(_._2)._1), toClass(accT/c, accD/c)) } //(pid, (k, cls))

val playlistsWithClasses = playlistWithFollowers.join(playlistClasses).mapValues { case (f, (k, cls)) => (k, cls, f) }

val job3 = tracksInPlaylistWithFeatures.
    map { case (t_uri, (pid, (_,_,_, e))) => (pid, if (e) 1 else 0) }.
    aggregateByKey((0.0, 0))(
        { case ((accE, c), e) => (accE + e, c + 1)},
        { case ((accE1, c1), (accE2, c2)) => (accE1 + accE2, c1 + c2) }
    ).
    mapValues { case (accE, c) => accE / c }.
    join(playlistsWithClasses). // (pid, (avgE, (k, cls, f)))
    map { case (pid, (avgE, (k, cls, nF))) => ((k, cls), (avgE, nF))}.
    aggregateByKey((0.0, 0, 0))(
        { case ((accE, accF, c), (e, f)) => (accE + e, accF + f, c+1) },
        { case ((accE1, accF1, c1), (accE2, accF2, c2)) => (accE1 + accE2, accF1 + accF2, c1 + c2) }
    ).
    mapValues { case (accE, accF, c) => (accE/c, accF/c, c) }
    
val result = job3.collect    

By performing an *Aggregate and Join* the execution time increase of `2 minutes`. So ultimately we tried to compute the playlist classes and the requested averages in a single aggregation step before joining with `playlistWithFollowers` and aggregating to obtain the final result.

In [None]:
val tracksInPlaylistWithFeatures = tracksInPlaylist.join(features).
    map { case (trackUri, (pid, (t, d, k, e))) => (pid, (t, d, k, e)) }


val tracksInPlaylistWithClasses = tracksInPlaylistWithFeatures.
        aggregateByKey((0.0, 0.0, (0 to 11).map((_, 0)).toMap, 0.0, 0))(
          { case ((accT, accD, ks, ec, c), (t, d, k, e)) => (accT+t, accD+d, incrementKey(ks, k), ec+(if (e) 1 else 0), c+1) },
          { case ((accT1, accD1, k1, ec1, c1), (accT2, accD2, k2, ec2, c2)) => (accT1+accT2, accD1+accD2, joinMap(k1, k2), ec1+ec2, c1+c2) }).
        mapValues({ case (accT, accD, k, ec, c) => (toKey(k.maxBy(_._2)._1), toClass(accT/c, accD/c), ec/c, c) }) // (pid, (k, class, avgE, c))

val job4 = playlistWithFollowers.join(tracksInPlaylistWithClasses). // (pid, (num_follower, (k, class, avgE, c)))
        map { case (pid, (num_follower, (k, cls, avgE, tc))) => ((k, cls), (num_follower, avgE, tc)) }.
        aggregateByKey((0.0, 0.0, 0.0, 0))(
          { case ((accF, accE, accTC, c), (f, e, tc)) => (accF+f, accE+e, accTC+tc, c+1) },
          { case ((accF1, accE1, accTC1, c1), (accF2, accE2, accTC2, c2)) => (accF1+accF2, accE1+accE2, accTC1+accTC2, c1+c2) }
        ).
        mapValues { case (accF, accE, accTC,c) => (accF/c, accE/c, accTC/c, c) } // ((k, class), (avgF, avgE, avgTC, c))
        
val result = job4.collect

This last job took in total `20 minutes`. Overall we obtain a `4 minutes` improvement.

### Further optimizations

Since the last job was in term of time consuption more efficient, it has been choosen for further optimizations.
First of all, by looking at the job details on SparkUI it becomes evident that the stage that is more costsly is the join between the `tracksInPlaylist` with the `features`. This may be due to the fact that the `tracksInPlaylist` form a total of 277 block thus 277 tasks. This very likely results in a big scheduling overhead and to the generation of a lot of intermediate files due to the shuffling strategy. So we tried reducing the number of partitions. 

### Coalescing the number of partitions

In [None]:
unpersistRDD()
// val tracksInPlaylist = trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).coalesce(150)
// val tracksInPlaylist = trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).coalesce(50)
// val tracksInPlaylist = trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).coalesce(10)
val tracksInPlaylist = trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).coalesce(6)


val tracksInPlaylistWithFeatures = tracksInPlaylist.join(features).
    map { case (trackUri, (pid, (t, d, k, e))) => (pid, (t, d, k, e)) }


val tracksInPlaylistWithClasses = tracksInPlaylistWithFeatures.
        aggregateByKey((0.0, 0.0, (0 to 11).map((_, 0)).toMap, 0.0, 0))(
          { case ((accT, accD, ks, ec, c), (t, d, k, e)) => (accT+t, accD+d, incrementKey(ks, k), ec+(if (e) 1 else 0), c+1) },
          { case ((accT1, accD1, k1, ec1, c1), (accT2, accD2, k2, ec2, c2)) => (accT1+accT2, accD1+accD2, joinMap(k1, k2), ec1+ec2, c1+c2) }).
        mapValues({ case (accT, accD, k, ec, c) => (toKey(k.maxBy(_._2)._1), toClass(accT/c, accD/c), ec/c, c) }) // (pid, (k, class, avgE, c))

val job5 = playlistWithFollowers.join(tracksInPlaylistWithClasses). // (pid, (num_follower, (k, class, avgE, c)))
        map { case (pid, (num_follower, (k, cls, avgE, tc))) => ((k, cls), (num_follower, avgE, tc)) }.
        aggregateByKey((0.0, 0.0, 0.0, 0))(
          { case ((accF, accE, accTC, c), (f, e, tc)) => (accF+f, accE+e, accTC+tc, c+1) },
          { case ((accF1, accE1, accTC1, c1), (accF2, accE2, accTC2, c2)) => (accF1+accF2, accE1+accE2, accTC1+accTC2, c1+c2) }
        ).
        mapValues { case (accF, accE, accTC,c) => (accF/c, accE/c, accTC/c, c) } // ((k, class), (avgF, avgE, avgTC, c))
job5.collect        

Here's the the result of the various executions:

|N. of tasks |Join step shuffle data size|Exection time|
|------------|---------------------------|-------------|
|150|12.2 GB|17 min|
|50|7.0 GB|13 min|
|10|1704.8 MB|14 min|
|6|---|8.6 min|
|5|1013.7MB|9.3 min|

So the best number of partitions seems to be `6`.

## Enforcing a partition criteria

Next we tried to enforce the same partion criteria on the rdd involved in the join. We tried with different number of partiotions:

In [None]:
import org.apache.spark.HashPartitioner
//val p = new HashPartitioner(50)
//val p = new HashPartitioner(10)
//val p = new HashPartitioner(6)
val p = new HashPartitioner(5)

unpersistRDD()

val features = featureRdd.map(t => (t.uri, (t.tempo, t.danceability, t.key))).
  join(trackRdd.map(t => (t.uri, t.explicit))).
  map { case (uri, ((t, d, k), e)) => (uri, (t, d, k, e)) }.partitionBy(p)
val tracksInPlaylist = trackInPlaylistRdd.map(t => (t.track_uri, t.pid)).partitionBy(p)


val tracksInPlaylistWithFeatures = tracksInPlaylist.join(features).
    map { case (trackUri, (pid, (t, d, k, e))) => (pid, (t, d, k, e)) }


val tracksInPlaylistWithClasses = tracksInPlaylistWithFeatures.
        aggregateByKey((0.0, 0.0, (0 to 11).map((_, 0)).toMap, 0.0, 0))(
          { case ((accT, accD, ks, ec, c), (t, d, k, e)) => (accT+t, accD+d, incrementKey(ks, k), ec+(if (e) 1 else 0), c+1) },
          { case ((accT1, accD1, k1, ec1, c1), (accT2, accD2, k2, ec2, c2)) => (accT1+accT2, accD1+accD2, joinMap(k1, k2), ec1+ec2, c1+c2) }).
        mapValues({ case (accT, accD, k, ec, c) => (toKey(k.maxBy(_._2)._1), toClass(accT/c, accD/c), ec/c, c) }).partitionBy(p) // (pid, (k, class, avgE, c))

val job4 = playlistWithFollowers.partitionBy(p).join(tracksInPlaylistWithClasses). // (pid, (num_follower, (k, class, avgE, c)))
        map { case (pid, (num_follower, (k, cls, avgE, tc))) => ((k, cls), (num_follower, avgE, tc)) }.
        aggregateByKey((0.0, 0.0, 0.0, 0))(
          { case ((accF, accE, accTC, c), (f, e, tc)) => (accF+f, accE+e, accTC+tc, c+1) },
          { case ((accF1, accE1, accTC1, c1), (accF2, accE2, accTC2, c2)) => (accF1+accF2, accE1+accE2, accTC1+accTC2, c1+c2) }
        ).
        mapValues { case (accF, accE, accTC,c) => (accF/c, accE/c, accTC/c, c) } // ((k, class), (avgF, avgE, avgTC, c))
val result = job4.collect        

Here's the the result of the various executions:

|N. of tasks |Join step shuffle data size|Exection time|
|------------|---------------------------|-------------|
|50|7.0 GB|13 min|
|10|1704.8 MB|12 min|
|6|---|7.5 min|
|5|1013.7 MB|7.5 min|

So the best number of partitions seems to be `6 or 5`.

So overall we obtain a speed-up of $ S=\frac{20 min}{7.5 min} = 2.66 $