In [8]:
import org.apache.spark

import org.apache.spark


In [None]:
// Reuse the already-running SparkContext if there is one, otherwise create it.
val sc: spark.SparkContext = spark.SparkContext.getOrCreate()

In [9]:
// Base directory (relative path) that holds the processed CSV datasets.
val path_to_datasets = "../../../datasets/processed"

// One CSV file per entity, each located directly under the base directory.
val path_to_tracks = s"$path_to_datasets/tracks.csv"
val path_to_playlists = s"$path_to_datasets/playlists.csv"
val path_to_track_in_playlists = s"$path_to_datasets/tracks_in_playlist.csv"
val path_to_artists = s"$path_to_datasets/artists.csv"

path_to_datasets: String = ../../../datasets/processed
path_to_tracks: String = ../../../datasets/processed/tracks.csv
path_to_playlists: String = ../../../datasets/processed/playlists.csv
path_to_track_in_playlists: String = ../../../datasets/processed/tracks_in_playlist.csv
path_to_artists: String = ../../../datasets/processed/artists.csv


In [10]:
/** Line-level parsers for the CSV datasets.
  *
  * Every parser splits the line on commas that are not inside double quotes,
  * trims the fields it uses and returns `None` when the line cannot be parsed
  * (too few columns, or a numeric column that is not a valid `Int`).
  * Surrounding double quotes are NOT stripped from the field values.
  */
object CsvParser {

  import java.util.regex.Pattern
  import scala.util.Try

  val noGenresListed = "(no genres listed)"
  // A comma followed by an even number of double quotes up to the end of the
  // line, i.e. a comma that is outside any quoted field.
  val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
  // Same look-ahead, for the pipe separator.
  val pipeRegex = "\\|(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
  val quotes = "\""

  // Compiled once; String.split(commaRegex) would recompile the pattern for every line.
  private val commaPattern: Pattern = Pattern.compile(commaRegex)

  /** Splits `line` into trimmed fields and applies `build` to them.
    *
    * Any non-fatal failure raised while splitting or building (missing column,
    * malformed number, null line) yields `None` instead of propagating.
    */
  private def parseWith[A](line: String)(build: Array[String] => A): Option[A] =
    Try(build(commaPattern.split(line).map(_.trim))).toOption

  /** Parses a playlist row.
    *
    * @return `Some((PID, playlist_name, num_followers))`, or `None` if the line is malformed
    */
  def parsePlayListLine(line: String): Option[(String, String, Int)] =
    parseWith(line)(f => (f(0), f(1), f(2).toInt))

  /** Parses a track row.
    *
    * @return `Some((track_uri, track_name, duration_ms, artist_uri, album_uri, album_name))`,
    *         or `None` if the line is malformed
    */
  def parseTrackLine(line: String): Option[(String, String, Int, String, String, String)] =
    parseWith(line)(f => (f(0), f(1), f(2).toInt, f(3), f(4), f(5)))

  /** Parses an artist row.
    *
    * @return `Some((artist_uri, artist_name))`, or `None` if the line has fewer than two columns
    */
  def parseArtistLine(line: String): Option[(String, String)] =
    parseWith(line)(f => (f(0), f(1)))

  /** Parses a track-in-playlist row.
    *
    * @return `Some((PID, track_uri, pos))`, or `None` if the line is malformed
    */
  def parseTrackInPlaylistLine(line: String): Option[(String, String, Int)] =
    parseWith(line)(f => (f(0), f(1), f(2).toInt))
}

defined object CsvParser


In [11]:
// Load every CSV as an RDD of typed tuples. A line for which the parser
// returns None contributes no element to the resulting RDD.
val rddTracks = sc.textFile(path_to_tracks)
  .flatMap(line => CsvParser.parseTrackLine(line))

val rddPlaylists = sc.textFile(path_to_playlists)
  .flatMap(line => CsvParser.parsePlayListLine(line))

val rddTrackInPlaylists = sc.textFile(path_to_track_in_playlists)
  .flatMap(line => CsvParser.parseTrackInPlaylistLine(line))

val rddArtists = sc.textFile(path_to_artists)
  .flatMap(line => CsvParser.parseArtistLine(line))

rddTracks: org.apache.spark.rdd.RDD[(String, String, Int, String, String, String)] = MapPartitionsRDD[9] at flatMap at <console>:33
rddPlaylists: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[12] at flatMap at <console>:36
rddTrackInPlaylists: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[15] at flatMap at <console>:39
rddArtists: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[18] at flatMap at <console>:42


In [28]:
// URI of the track whose co-occurring tracks are analysed in the cells below
val idSong: String = "spotify:track:0UaMYEvWZi0ZqiDOoHU3YI"

idSong: String = spotify:track:0UaMYEvWZi0ZqiDOoHU3YI


In [29]:
// Project each (pid, trackUri, pos) row down to (pid, trackUri); pos is not used
val trackInPlaylistReduce = rddTrackInPlaylists.map { case (pid, trackUri, _) => (pid, trackUri) }

trackInPlaylistReduce: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[33] at map at <console>:29


In [30]:
// IDs of the playlists that contain idSong
val playlistForTrack = trackInPlaylistReduce
          .filter(entry => entry._2 == idSong)
          .keys

playlistForTrack: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[35] at map at <console>:31


In [31]:
// Collect the matching playlist IDs on the driver, de-duplicate them into a Set
// and broadcast that Set for use inside later RDD closures
val playlistsBroadcast = {
  val playlistIds = playlistForTrack.collect().toSet
  sc.broadcast(playlistIds)
}

playlistsBroadcast: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Set[String]] = Broadcast(15)


In [32]:
// RDD of (pid, trackUri): every other track found in a playlist that contains
// idSong. The playlist membership test runs first, then idSong itself is excluded.
val trackInSamePlaylists = trackInPlaylistReduce
          .filter { case (pid, trackUri) =>
            playlistsBroadcast.value.contains(pid) && trackUri != idSong
          }

trackInSamePlaylists: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[37] at filter at <console>:32


In [33]:
// Emit ((idSong, otherTrack), 1) for every appearance of another track in a shared playlist
val rddTrackPairs = trackInSamePlaylists
  .values
  .map(otherTrack => ((idSong, otherTrack), 1))


rddTrackPairs: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[38] at map at <console>:30


In [34]:
// Add up the 1s of identical (idSong, otherTrack) keys to get the co-occurrence count
val occurrencesCount = rddTrackPairs.reduceByKey((left, right) => left + right)

occurrencesCount: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[39] at reduceByKey at <console>:28


In [35]:
// Keep the pair with the largest count; on equal counts the left operand wins.
// NOTE(review): this presumably fails on an empty RDD (no track shares a playlist
// with idSong), exactly as max() would — confirm against the Spark RDD docs.
val mostOccurrencesPair = occurrencesCount
          .reduce((best, candidate) => if (best._2 >= candidate._2) best else candidate)

mostOccurrencesPair: ((String, String), Int) = ((spotify:track:0UaMYEvWZi0ZqiDOoHU3YI,spotify:track:0XUfyU2QviPAs6bxSpXYG4),253)


In [36]:
// Attach the track names to the analysed track and to its top co-occurring track
val trackDetails = rddTracks.map { case (uri, name, _, _, _, _) => (uri, name) } // (trackUri, trackName)

// Wrap the single winning pair in an RDD so that it can be joined with trackDetails
val mostOccurrencesPairRDD = sc.parallelize(Seq(mostOccurrencesPair))

val enrichedResults = {
  // Key by the analysed track and join to get its name
  val withMainName = mostOccurrencesPairRDD
    .map { case ((mainUri, otherUri), n) => (mainUri, (otherUri, n)) }
    .join(trackDetails)

  // Re-key by the co-occurring track and join to get its name
  val withBothNames = withMainName
    .map { case (mainUri, ((otherUri, n), mainName)) => (otherUri, (mainUri, mainName, n)) }
    .join(trackDetails)

  // Flatten to (track1, track1Name, track2, track2Name, count)
  withBothNames.map { case (otherUri, ((mainUri, mainName, n), otherName)) =>
    (mainUri, mainName, otherUri, otherName, n)
  }
}

// Save the result as a single part file
enrichedResults.coalesce(1).saveAsTextFile("output/result")

trackDetails: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[40] at map at <console>:31
mostOccurrencesPairRDD: org.apache.spark.rdd.RDD[((String, String), Int)] = ParallelCollectionRDD[41] at parallelize at <console>:34
enrichedResults: org.apache.spark.rdd.RDD[(String, String, String, String, Int)] = MapPartitionsRDD[50] at map at <console>:41


In [6]:
/*
val playlistToTracks = rddTrackInPlaylists.map(x => (x._1, x._2))


playlistToTracks: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[12] at map at <console>:25


In [7]:
// filter the playlist that contains the track
val playlistForTrack = playlistToTracks.filter { case (_,trackUri) =>
  trackUri == idTrack
}.map(_._1)

val playlistsBroadcast = sc.broadcast(playlistForTrack.collect())
val tracksInSamePlaylists = playlistToTracks.filter { case (pid, _) => playlistsBroadcast.value.contains(pid) }


playlistForTrack: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:30
playlistsBroadcast: org.apache.spark.broadcast.Broadcast[Array[String]] = Broadcast(5)
tracksInSamePlaylists: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[15] at filter at <console>:33


In [9]:
val coTracksByPlaylist = tracksInSamePlaylists
        .groupByKey()
        .flatMap { case (_, tracks) =>
          val trackList = tracks.toList
          for {
            coTrack <- trackList if coTrack != idTrack
          } yield (idTrack, coTrack)
        }


coTracksByPlaylist: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[17] at flatMap at <console>:28


In [10]:
val occurrenceCount = coTracksByPlaylist
        .map { case (track, coTrack) => (track, Map(coTrack -> 1)) }
        .aggregateByKey(Map[String, Int]())(
          (acc, value) => {
            value.foldLeft(acc) { case (map, (coTrack, count)) =>
              map + (coTrack -> (map.getOrElse(coTrack, 0) + count))
            }
          },
          (map1, map2) => {
            map2.foldLeft(map1) { case (map, (coTrack, count)) =>
              map + (coTrack -> (map.getOrElse(coTrack, 0) + count))
            }
          }
        )

occurrenceCount: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,Int])] = ShuffledRDD[19] at aggregateByKey at <console>:27


In [11]:
val mostCooccurringTrackPerTrack = occurrenceCount
        .mapValues { occurrences =>
          occurrences.maxBy(_._2)
        }

mostCooccurringTrackPerTrack: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[20] at mapValues at <console>:26


In [13]:
val trackDetails = rddTracks.map(line => (line._1, line._2))

val enrichedResults = mostCooccurringTrackPerTrack
        .join(trackDetails)
        .map { case (trackUri, ((coTrackUri, count), trackName)) =>
          (coTrackUri, (trackUri, trackName, count))
        }
        .join(trackDetails)
        .map { case (coTrackUri, ((trackUri, trackName, count), coTrackName)) =>
          (trackUri, trackName, coTrackUri, coTrackName, count)
        }

enrichedResults.coalesce(1).saveAsTextFile("output/result") */

trackDetails: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[31] at map at <console>:26
enrichedResults: org.apache.spark.rdd.RDD[(String, String, String, String, Int)] = MapPartitionsRDD[39] at map at <console>:34


In [6]:
/*
// FOR ALL SONGS; ONLY RUN THIS WITH A FEW FILES
 
val playlistToTracks = rddTrackInPlaylists
  .map(x => (x._1, x._2))

playlistToTracks: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[12] at map at <console>:28


In [6]:
val coTracksByPlaylist = playlistToTracks
  .groupByKey() // Group the tracks by playlist
  .flatMap { case (_, tracks) =>
    val trackList = tracks.toList
    for {
      track <- trackList
      coTrack <- trackList if track != coTrack
    } yield (track, coTrack) // Generate (track, coTrack) pairs
  }

coTracksByPlaylist: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[14] at flatMap at <console>:27


In [7]:
val cooccurrenceCounts = coTracksByPlaylist
  .map { case (track, coTrack) => (track, Map(coTrack -> 1)) } // Map each track to an initial count
  .aggregateByKey(Map[String, Int]())(
    (acc, value) => { // Local combiner
      value.foldLeft(acc) { case (map, (coTrack, count)) =>
        map + (coTrack -> (map.getOrElse(coTrack, 0) + count))
      }
    },
    (map1, map2) => { // Global combiner
      map2.foldLeft(map1) { case (map, (coTrack, count)) =>
        map + (coTrack -> (map.getOrElse(coTrack, 0) + count))
      }
    }
  )

cooccurrenceCounts: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,Int])] = ShuffledRDD[16] at aggregateByKey at <console>:27


In [8]:
val mostCooccurringTrackPerTrack = cooccurrenceCounts
  .mapValues { cooccurrences =>
    cooccurrences.maxBy(_._2) // Find the track with the highest count
  }

mostCooccurringTrackPerTrack: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[17] at mapValues at <console>:26


In [10]:
// Map the track details
val trackDetail = rddTracks.map(line => (line._1, line._2))

// Add the details
val enrichedResults = mostCooccurringTrackPerTrack
  .join(trackDetail) // Add the name of the main track
  .map { case (trackUri, ((coTrackUri, count), trackName)) =>
    (coTrackUri, (trackUri, trackName, count))
  }
  .join(trackDetail) // Add the name of the co-occurring track
  .map { case (coTrackUri, ((trackUri, trackName, count), coTrackName)) =>
    (trackUri, trackName, coTrackUri, coTrackName, count)
  }

enrichedResults.saveAsTextFile("output/result") */


trackDetail: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[28] at map at <console>:27
enrichedResults: org.apache.spark.rdd.RDD[(String, String, String, String, Int)] = MapPartitionsRDD[36] at map at <console>:36
