## TP3

In [15]:


case class Employee(
  employeeId: Int,
  lastName: String,
  firstName: String,
  title: String,
  reportsTo: Int
)

case class MediaType(
  mediaTypeId: Int,
  name: String
)

case class Genre(
  genreId: Int,
  name: String
)

case class Track(
  trackId: Int,
  name: String,
  mediaType: Int,
  genre: Int
)

case class InvoiceItem(
  invoiceLineId: Int,
  invoiceId: Int,
  trackId: Int,
  unitPrice: BigDecimal,
  quantity: Int
  )

case class Invoice(
  invoiceId: Int,
  items: List[InvoiceItem]
)

val employees = Array(
  Employee(1, "Adams", "Andrew", "General Manager", 0),
  Employee(2, "Edwards", "Nancy", "Sales Manager", 1),
  Employee(3, "Peacock", "Jane", "Sales Support Agent", 2),
  Employee(4, "Park", "Margaret", "Sales Support Agent", 2),
  Employee(5, "Johnson", "Steve", "Sales Support Agent", 2),
  Employee(6, "Mitchell", "Michael", "IT Manager", 1),
  Employee(7, "King", "Robert", "IT Staff", 6),
  Employee(8, "Callahan", "Laura", "IT Staff", 6)
)

val mediaTypes = Array(
  MediaType(1, "MPEG audio file"),
  MediaType(2, "Protected AAC audio file"),
  MediaType(3, "Protected MPEG-4 video file"),
  MediaType(4, "Purchased AAC audio file"),
  MediaType(5, "AAC audio file")
)

val genres = Array(
  Genre(1, "Rock"),
  Genre(2, "Jazz"),
  Genre(3, "Metal"),
  Genre(4, "Alternative & Punk"),
  Genre(5, "Rock And Roll"),
  Genre(6, "Blues"),
  Genre(7, "Latin"),
  Genre(8, "Reggae"),
  Genre(9, "Pop"),
  Genre(10, "Soundtrack"),
  Genre(11, "Bossa Nova"),
  Genre(12, "Easy Listening"),
  Genre(13, "Heavy Metal"),
  Genre(14, "R&B/Soul"),
  Genre(15, "Electronica/Dance"),
  Genre(16, "World"),
  Genre(17, "Hip Hop/Rap"),
  Genre(18, "Science Fiction"),
  Genre(19, "TV Shows"),
  Genre(20, "Sci Fi & Fantasy"),
  Genre(21, "Drama"),
  Genre(22, "Comedy"),
  Genre(23, "Alternative"),
  Genre(24, "Classical"),
  Genre(25, "Opera")
)

val adjectives = Array("Cosmic", "Electric", "Velvet", "Mystic", "Atomic", "Phantom", "Silent", "Savage", "Eternal", "Digital", "Lost", "Sacred", "Wild", "Urban", "Golden", "Crystal", "Liquid", "Toxic", "Neon", "Crimson", "Stellar", "Raging", "Arctic", "Sonic", "Primal", "Midnight", "Screaming", "Infinite", "Royal", "Lunar")
val nouns = Array("Echo", "Giants", "Wolves", "Horizon", "Empire", "Void", "Thunder", "Dragons", "Saints", "Kings", "Ghosts", "Ravens", "Pirates", "Heroes", "Rebels", "Demons", "Machines", "Angels", "Knights", "Lions", "Shadows", "Zombies", "Wizards", "Killers", "Titans", "Outlaws", "Rebels", "Prophets", "Bandits", "Warriors")
val complements = Array("of Doom", "in Chains", "from Mars", "of the North", "of Death", "in Disguise", "of the Night", "from Hell", "of Tomorrow", "in Flames", "of the Deep", "from Beyond", "of Destruction", "in Shadow", "from the Sky", "of the Abyss", "in Exile", "from the East", "of Eternity", "from the Desert", "of the Underground")

import scala.util.Random

object Track {
  def generateTrack(trackId: Int): Track = {
    val genre = genres(Random.nextInt(genres.length)).genreId
    val mediaType = mediaTypes(Random.nextInt(mediaTypes.length)).mediaTypeId
    val name = s"${adjectives(Random.nextInt(adjectives.length))} ${nouns(Random.nextInt(nouns.length))} ${complements(Random.nextInt(complements.length))}"
    Track(trackId, name, mediaType, genre)
  }
}


object Invoice {
  def generateInvoice(invoiceId: Int, availableTracks: Array[Track]): Invoice = {
    val itemCount = Random.nextInt(5) + 1
    val selectedTracks = Random.shuffle(availableTracks.toList).take(itemCount)

    val items = selectedTracks.zipWithIndex.map { case (track, idx) =>
      val quantity = if (Random.nextDouble() < 0.8) 1 else Random.nextInt(3) + 1
      val unitPrice = BigDecimal(0.99 + Random.nextDouble() * 1.01).setScale(2, BigDecimal.RoundingMode.HALF_UP)
      InvoiceItem(
        invoiceLineId = invoiceId * 10 + idx + 1,
        invoiceId = invoiceId,
        trackId = track.trackId,
        unitPrice = unitPrice,
        quantity = quantity
      )
    }
    Invoice(invoiceId, items)
  }
}

val tracks = (1 to 200).map(Track.generateTrack).toArray
val invoices = (1 to 2000).map(id => Invoice.generateInvoice(id, tracks)).toArray

tracks.foreach { track =>
  println(f"ID: ${track.trackId}%3d | Name: ${track.name}%-45s | MediaType ID: ${track.mediaType}%2d | Genre ID: ${track.genre}%2d")
}

val media = List(1, 2, 3)
val filteredTracks = tracks.filter(track => media.contains(track.mediaType))
val invoiceFiltered = invoices.filter { invoice =>
    invoice.items.forall { item =>
        tracks.find(_.trackId == item.trackId).exists(_.genre == 1)
    }
}

println("Filtered invoices")
invoiceFiltered.foreach { invoice =>
  println(s"Invoice ID: ${invoice.invoiceId}")
  invoice.items.foreach { item =>
    println(f"  Invoice Line ID: ${item.invoiceLineId}%3d | Track ID: ${item.trackId}%3d | Prix: ${item.unitPrice}%.2f | Quantité: ${item.quantity}%2d")
  }
}

val topSales = tracks
    .map { track =>
        val totalSales = invoices.flatMap(_.items).filter(_.trackId == track.trackId).map(_.quantity).sum
        (track, totalSales)
    }
    .sortBy { case (_, sales) => -sales }
    .take(5)

println("Top 5")
topSales.foreach { case (track, sales) =>
    println(f"Track ID: ${track.trackId}%3d | Nom: ${track.name}%-45s | Total ventes: $sales")
}

val totalOrdersByGenre = genres.map { genre =>
    val trackIds = tracks.filter(_.genre == genre.genreId).map(_.trackId).toSet
    val totalOrders = invoices.flatMap(_.items).count(item => trackIds.contains(item.trackId))
    (genre.name, totalOrders)
}.toMap

println("Total commande par genre")
totalOrdersByGenre.foreach { case (genre, total) =>
    println(f"Genre: $genre%-20s | Total commande: $total")
}

def findTopManager(employeeId: Int): Employee = {
    val employee = employees.find(_.employeeId == employeeId).get
    if (employee.reportsTo == 0) employee
    else findTopManager(employee.reportsTo)
}

val emp = employees(2)
val topManager = findTopManager(emp.employeeId)
println(s"Le plus haut responsable de ${emp.firstName} ${emp.lastName} est ${topManager.firstName} ${topManager.lastName}")


ID:   1 | Name: Atomic Ravens of Doom                         | MediaType ID:  4 | Genre ID:  5
ID:   2 | Name: Wild Titans in Chains                         | MediaType ID:  3 | Genre ID: 10
ID:   3 | Name: Screaming Void of Eternity                    | MediaType ID:  4 | Genre ID:  1
ID:   4 | Name: Mystic Knights of Tomorrow                    | MediaType ID:  3 | Genre ID: 20
ID:   5 | Name: Lost Heroes from the Desert                   | MediaType ID:  5 | Genre ID: 10
ID:   6 | Name: Digital Wizards from the East                 | MediaType ID:  4 | Genre ID: 10
ID:   7 | Name: Arctic Knights of Destruction                 | MediaType ID:  3 | Genre ID: 14
ID:   8 | Name: Arctic Knights of the Night                   | MediaType ID:  3 | Genre ID: 19
ID:   9 | Name: Atomic Ravens in Disguise                     | MediaType ID:  1 | Genre ID: 25
ID:  10 | Name: Crimson Lions from the Desert                 | MediaType ID:  4 | Genre ID: 14
ID:  11 | Name: Sonic Wolves from Beyond

defined class Employee
defined class MediaType
defined class Genre
defined class Track
defined class InvoiceItem
defined class Invoice
employees = Array(Employee(1,Adams,Andrew,General Manager,0), Employee(2,Edwards,Nancy,Sales Manager,1), Employee(3,Peacock,Jane,Sales Support Agent,2), Employee(4,Park,Margaret,Sales Support Agent,2), Employee(5,Johnson,Steve,Sales Support Agent,2), Employee(6,Mitchell,Michael,IT Manager,1), Employee(7,King,Robert,IT Staff,6), Employee(8,Callahan,Laura,IT Staff,6))
mediaTypes = Array(MediaType(1,MPEG audio file), MediaType(2,Protected AAC audio file), MediaType(3,Protected MPEG-4 video file), MediaType(4,Purchased AAC audio file), MediaType(5,AAC audio file))
genres = Array(Genre(1,Rock), Genre(2,Jazz), G...


Array(Genre(1,Rock), Genre(2,Jazz), G...

## TP4

### 1 Manipulation des données générées

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

val tracksRDD: RDD[Track] = sc.parallelize(tracks)
val invoicesRDD: RDD[Invoice] = sc.parallelize(invoices)
val genresRDD: RDD[Genre] = sc.parallelize(genres)
val employeesRDD: RDD[Employee] = sc.parallelize(employees)

//filtrage

val filteredTracksRDD = tracksRDD.filter(track => media.contains(track.mediaType))

println("Tracks filtrés par type de média:")
filteredTracksRDD.take(10).foreach { track =>
  println(f"ID: ${track.trackId}%3d | Name: ${track.name}%-45s | MediaType ID: ${track.mediaType}%2d")
}

val rockTrackIds = tracksRDD
  .filter(_.genre == 1)
  .map(_.trackId)
  .collect()
  .toSet


val invoiceFilteredRDD = invoicesRDD.filter { invoice =>
  invoice.items.forall { item =>
    rockTrackIds.contains(item.trackId)
  }
}

println("Commandes filtrées avec le genre 1:")
invoiceFilteredRDD.take(25).foreach { invoice =>
  println(s"Invoice ID: ${invoice.invoiceId}")
  invoice.items.foreach { item =>
    println(f"  Invoice Line ID: ${item.invoiceLineId}%3d | Track ID: ${item.trackId}%3d | Prix: ${item.unitPrice}%.2f | Quantité: ${item.quantity}%2d")
  }
}


Tracks filtrés par type de média:
ID:   2 | Name: Wild Titans in Chains                         | MediaType ID:  3
ID:   4 | Name: Mystic Knights of Tomorrow                    | MediaType ID:  3
ID:   7 | Name: Arctic Knights of Destruction                 | MediaType ID:  3
ID:   8 | Name: Arctic Knights of the Night                   | MediaType ID:  3
ID:   9 | Name: Atomic Ravens in Disguise                     | MediaType ID:  1
ID:  12 | Name: Digital Rebels of Tomorrow                    | MediaType ID:  3
ID:  13 | Name: Liquid Horizon of the Deep                    | MediaType ID:  2
ID:  14 | Name: Wild Outlaws of the Night                     | MediaType ID:  1
ID:  15 | Name: Crystal Demons in Flames                      | MediaType ID:  3
ID:  17 | Name: Raging Empire of Eternity                     | MediaType ID:  2
Invoice ID: 89
  Invoice Line ID: 891 | Track ID:  55 | Prix: 1.58 | Quantité:  1
Invoice ID: 147
  Invoice Line ID: 1471 | Track ID:  55 | Prix: 1.83 | Qua

tracksRDD = ParallelCollectionRDD[66] at parallelize at <console>:70
invoicesRDD = ParallelCollectionRDD[67] at parallelize at <console>:71
genresRDD = ParallelCollectionRDD[68] at parallelize at <console>:72
employeesRDD = ParallelCollectionRDD[69] at parallelize at <console>:73
filteredTracksRDD = MapPartitionsRDD[70] at filter at <console>:77
rockTrackIds = Set(125, 152, 71, 113, 140, 155, 3, 127, 55, 107)
invoiceFilteredRDD = MapPartitionsRDD[73] at filter at <console>:98


MapPartitionsRDD[73] at filter at <console>:98

In [None]:
// agrégation

val allInvoiceItems = invoicesRDD.flatMap(_.items)

val salesByTrack = allInvoiceItems
  .map(item => (item.trackId, item.quantity))
  .reduceByKey(_ + _)

val topSalesRDD = tracksRDD
    .map(track => (track.trackId, track))
    .join(salesByTrack)
    .map { case (trackId, (track, totalSales)) => (track, totalSales) }
    .sortBy { case (_, sales) => -sales }
    .take(5)

println("Top 5 des titres en ventes:")
topSalesRDD.foreach { case (track, sales) =>
  println(f"Track ID: ${track.trackId}%3d | Nom: ${track.name}%-45s | Total ventes: $sales")
}


val itemsWithGenre = allInvoiceItems
  .map(item => (item.trackId, item))
  .join(tracksRDD.map(track => (track.trackId, track.genre)))
  .map { case (trackId, (item, genre)) => (genre, 1) }

val totalOrdersByGenreRDD = itemsWithGenre
  .reduceByKey(_ + _)
  .join(genresRDD.map(genre => (genre.genreId, genre.name)))
  .map { case (genreId, (totalOrders, genreName)) => (genreName, totalOrders) }

println("\nTotal commandes par genre:")
totalOrdersByGenreRDD.collect().foreach { case (genre, total) =>
  println(f"Genre: $genre%-20s | Total commandes: $total")
}

Top 5 des titres en ventes:
Track ID:  96 | Nom: Wild Zombies of the North                     | Total ventes: 55
Track ID: 169 | Nom: Royal Dragons in Chains                       | Total ventes: 53
Track ID:  39 | Nom: Sonic Ravens of the North                     | Total ventes: 52
Track ID: 161 | Nom: Cosmic Ghosts of Tomorrow                     | Total ventes: 52
Track ID: 125 | Nom: Liquid Thunder of Destruction                 | Total ventes: 50

Total commandes par genre (RDD):
Genre: Alternative & Punk   | Total commandes: 199
Genre: World                | Total commandes: 157
Genre: Comedy               | Total commandes: 346
Genre: R&B/Soul             | Total commandes: 350
Genre: Classical            | Total commandes: 239
Genre: Blues                | Total commandes: 202
Genre: Reggae               | Total commandes: 152
Genre: Easy Listening       | Total commandes: 184
Genre: Science Fiction      | Total commandes: 216
Genre: Sci Fi & Fantasy     | Total commandes: 27

allInvoiceItems = MapPartitionsRDD[134] at flatMap at <console>:61
salesByTrack = ShuffledRDD[136] at reduceByKey at <console>:65
topSalesRDD = Array((Track(96,Wild Zombies of the North,2,18),55), (Track(169,Royal Dragons in Chains,4,15),53), (Track(39,Sonic Ravens of the North,2,2),52), (Track(161,Cosmic Ghosts of Tomorrow,1,13),52), (Track(125,Liquid Thunder of Destruction,5,1),50))
itemsWithGenre = MapPartitionsRDD[152] at map at <console>:83
totalOrdersByGenreRDD = MapPartitionsRDD[158] at map at <console>:88


MapPartitionsRDD[158] at map at <console>:88

## 2 Analyse des transformations

In [None]:
val filteredTracksRDD_debug = tracksRDD.filter(track => media.contains(track.mediaType))
println("1. filtrage des tracks:")
println(filteredTracksRDD_debug.toDebugString)
println("Shuffle: Non")

1. Pipeline filtrage des tracks:
(2) MapPartitionsRDD[169] at filter at <console>:54 []
 |  ParallelCollectionRDD[66] at parallelize at <console>:70 []
Shuffle: Non


filteredTracksRDD_debug = MapPartitionsRDD[169] at filter at <console>:54


MapPartitionsRDD[169] at filter at <console>:54

In [None]:
val rockTrackIds_debug = tracksRDD
  .filter(_.genre == 1)
  .map(_.trackId)

val invoiceFilteredRDD_debug = invoicesRDD.filter { invoice =>
  invoice.items.forall { item =>
    rockTrackIds.contains(item.trackId)
  }
}

println("2. filtrage des commandes:")
println("rockTrackIds:")
println(rockTrackIds_debug.toDebugString)
println("Shuffle: Non")
println()
println("InvoiceFiltered:")
println(invoiceFilteredRDD_debug.toDebugString)
println("Shuffle: Non")

2. Pipeline filtrage des factures Rock:
2a. RockTrackIds:
(2) MapPartitionsRDD[167] at map at <console>:58 []
 |  MapPartitionsRDD[166] at filter at <console>:57 []
 |  ParallelCollectionRDD[66] at parallelize at <console>:70 []

2b. InvoiceFiltered:
(2) MapPartitionsRDD[168] at filter at <console>:60 []
 |  ParallelCollectionRDD[67] at parallelize at <console>:71 []



rockTrackIds_debug = MapPartitionsRDD[167] at map at <console>:58
invoiceFilteredRDD_debug = MapPartitionsRDD[168] at filter at <console>:60


MapPartitionsRDD[168] at filter at <console>:60

In [29]:
val allInvoiceItems_debug = invoicesRDD.flatMap(_.items)

val salesByTrack_debug = allInvoiceItems_debug
  .map(item => (item.trackId, item.quantity))
  .reduceByKey(_ + _)

val topSalesRDD_debug = tracksRDD
  .map(track => (track.trackId, track))
  .join(salesByTrack_debug)
  .map { case (trackId, (track, totalSales)) => (track, totalSales) }
  .sortBy { case (_, sales) => -sales }
  .take(5)

println("3. Top 5 des ventes:")
println("AllInvoiceItems:")
println(allInvoiceItems_debug.toDebugString)
println("Shuffle: Non")
println()
println("SalesByTrack:")
println(salesByTrack_debug.toDebugString)
println("Shuffle: Oui (reduceByKey)")
println()



3. Top 5 des ventes:
AllInvoiceItems:
(2) MapPartitionsRDD[196] at flatMap at <console>:55 []
 |  ParallelCollectionRDD[67] at parallelize at <console>:71 []
Shuffle: Non

SalesByTrack:
(2) ShuffledRDD[198] at reduceByKey at <console>:59 []
 +-(2) MapPartitionsRDD[197] at map at <console>:58 []
    |  MapPartitionsRDD[196] at flatMap at <console>:55 []
    |  ParallelCollectionRDD[67] at parallelize at <console>:71 []
Shuffle: Oui (reduceByKey)



allInvoiceItems_debug = MapPartitionsRDD[196] at flatMap at <console>:55
salesByTrack_debug = ShuffledRDD[198] at reduceByKey at <console>:59
topSalesRDD_debug = Array((Track(96,Wild Zombies of the North,2,18),55), (Track(169,Royal Dragons in Chains,4,15),53), (Track(39,Sonic Ravens of the North,2,2),52), (Track(161,Cosmic Ghosts of Tomorrow,1,13),52), (Track(125,Liquid Thunder of Destruction,5,1),50))


Array((Track(96,Wild Zombies of the North,2,18),55), (Track(169,Royal Dragons in Chains,4,15),53), (Track(39,Sonic Ravens of the North,2,2),52), (Track(161,Cosmic Ghosts of Tomorrow,1,13),52), (Track(125,Liquid Thunder of Destruction,5,1),50))

In [None]:
val itemsWithGenre_debug = allInvoiceItems_debug
  .map(item => (item.trackId, item))
  .join(tracksRDD.map(track => (track.trackId, track.genre)))
  .map { case (trackId, (item, genre)) => (genre, 1) }

val totalOrdersByGenreRDD_debug = itemsWithGenre_debug
  .reduceByKey(_ + _)
  .join(genresRDD.map(genre => (genre.genreId, genre.name)))
  .map { case (genreId, (totalOrders, genreName)) => (genreName, totalOrders) }

println("4. Total commandes par genre:")
println("ItemsWithGenre:")
println(itemsWithGenre_debug.toDebugString)
println("Shuffle: Oui (join)")
println()
println("TotalOrdersByGenreRDD:")
println(totalOrdersByGenreRDD_debug.toDebugString)
println("Shuffle: Oui (reduceByKey et join)")

itemsWithGenre_debug = MapPartitionsRDD[214] at map at <console>:57
totalOrdersByGenreRDD_debug = MapPartitionsRDD[220] at map at <console>:62


4. Pipeline Total commandes par genre:
4a. ItemsWithGenre:
(2) MapPartitionsRDD[214] at map at <console>:57 []
 |  MapPartitionsRDD[213] at join at <console>:56 []
 |  MapPartitionsRDD[212] at join at <console>:56 []
 |  CoGroupedRDD[211] at join at <console>:56 []
 +-(2) MapPartitionsRDD[209] at map at <console>:55 []
 |  |  MapPartitionsRDD[196] at flatMap at <console>:55 []
 |  |  ParallelCollectionRDD[67] at parallelize at <console>:71 []
 +-(2) MapPartitionsRDD[210] at map at <console>:56 []
    |  ParallelCollectionRDD[66] at parallelize at <console>:70 []
Shuffle: Oui (join)

4b. TotalOrdersByGenreRDD:
(2) MapPartitionsRDD[220] at map at <console>:62 []
 |  MapPartitionsRDD[219] at join at <console>:61 []
 |  MapPartitionsRDD[218] at join at <console>:61 []
 |  CoGroupedRDD[217] at join at <console>:61 []
 |  ShuffledRDD[215] at reduceByKey at <console>:60 []
 +-(2) MapPartitionsRDD[214] at map at <console>:57 []
    |  MapPartitionsRDD[213] at join at <console>:56 []
    |  Map

MapPartitionsRDD[220] at map at <console>:62

## 3 Chargement de CSV

In [None]:
val tracksCSV = sc.textFile("./csv/Track.csv").map(_.split(";"))
val invoiceItemsCSV = sc.textFile("./csv/InvoiceItem.csv").map(_.split(";"))

val tracksFromCSV: RDD[Track] = tracksCSV.map(cols =>
    Track(
        trackId = cols(0).toInt,
        name = cols(1),
        mediaType = cols(2).toInt,
        genre = cols(3).toInt
    )
)

val invoiceItemsFromCSV: RDD[InvoiceItem] = invoiceItemsCSV.map(cols =>
    InvoiceItem(
        invoiceLineId = cols(0).toInt,
        invoiceId = cols(1).toInt,
        trackId = cols(2).toInt,
        unitPrice = BigDecimal(cols(3)),
        quantity = cols(4).toInt
    )
)

lastException = null
tracksCSV = MapPartitionsRDD[232] at map at <console>:59
invoiceItemsCSV = MapPartitionsRDD[235] at map at <console>:60
tracksFromCSV = MapPartitionsRDD[236] at map at <console>:62
invoiceItemsFromCSV = MapPartitionsRDD[237] at map at <console>:71


MapPartitionsRDD[237] at map at <console>:71

In [36]:
val filteredTracksFromCSV = tracksFromCSV.filter(track => media.contains(track.mediaType))
println("Tracks filtrés par type de média:")
filteredTracksFromCSV.take(10).foreach { track =>
    println(f"ID: ${track.trackId}%3d | Name: ${track.name}%-45s | MediaType ID: ${track.mediaType}%2d")
}

Tracks filtrés par type de média:
ID:   1 | Name: For Those About To Rock (We Salute You)       | MediaType ID:  1
ID:   2 | Name: Balls to the Wall                             | MediaType ID:  2
ID:   3 | Name: Fast As a Shark                               | MediaType ID:  2
ID:   4 | Name: Restless and Wild                             | MediaType ID:  2
ID:   5 | Name: Princess of the Dawn                          | MediaType ID:  2
ID:   6 | Name: Put The Finger On You                         | MediaType ID:  1
ID:   7 | Name: Let's Get It Up                               | MediaType ID:  1
ID:   8 | Name: Inject The Venom                              | MediaType ID:  1
ID:   9 | Name: Snowballed                                    | MediaType ID:  1
ID:  10 | Name: Evil Walks                                    | MediaType ID:  1


filteredTracksFromCSV = MapPartitionsRDD[245] at filter at <console>:54


MapPartitionsRDD[245] at filter at <console>:54

In [None]:
val rockTrackIdsCSV = tracksFromCSV.filter(_.genre == 1).map(_.trackId).collect().toSet
val invoiceFilteredFromCSV = invoiceItemsFromCSV.groupBy(_.invoiceId).filter { case (_, items) =>
    items.forall(item => rockTrackIdsCSV.contains(item.trackId))
}
println("Commandes filtrées avec le genre 1:")
invoiceFilteredFromCSV.take(10).foreach { case (invoiceId, items) =>
    println(s"Invoice ID: $invoiceId")
    items.foreach { item =>
        println(f"  Invoice Line ID: ${item.invoiceLineId}%3d | Track ID: ${item.trackId}%3d | Prix: ${item.unitPrice}%.2f | Quantité: ${item.quantity}%2d")
    }
}

Commandes filtrées avec le genre 1 (CSV):
Invoice ID: 408
  Invoice Line ID: 2207 | Track ID: 2953 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 2208 | Track ID: 2955 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 2209 | Track ID: 2957 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 2210 | Track ID: 2959 | Prix: 0.99 | Quantité:  1
Invoice ID: 302
  Invoice Line ID: 1635 | Track ID: 2972 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 1636 | Track ID: 2974 | Prix: 0.99 | Quantité:  1
Invoice ID: 146
  Invoice Line ID: 796 | Track ID: 1367 | Prix: 0.99 | Quantité:  1
Invoice ID: 226
  Invoice Line ID: 1219 | Track ID: 424 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 1220 | Track ID: 426 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 1221 | Track ID: 428 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 1222 | Track ID: 430 | Prix: 0.99 | Quantité:  1
Invoice ID: 288
  Invoice Line ID: 1559 | Track ID: 2508 | Prix: 0.99 | Quantité:  1
  Invoice Line ID: 1560 | Track ID: 2510 | Prix: 0.99 |

rockTrackIdsCSV = Set(2163, 2199, 3021, 1322, 1665, 2630, 1586, 1501, 2452, 809, 2094, 1411, 2612, 1024, 2744, 1369, 1168, 2306, 3053, 760, 3008, 1633, 1995, 2263, 347, 2412, 1237, 1031, 2463, 1315, 2427, 962, 2544, 777, 555, 88, 2280, 352, 1750, 1211, 1158, 2309, 2512, 582, 2976, 762, 3072, 1005, 2210, 2117, 2940, 1596, 1406, 683, 2231, 2622, 2644, 2381, 2395, 1443, 2659, 1618, 3285, 994, 2953, 1401, 2947, 1569, 3098, 2671, 2676, 2014, 5, 3108, 3012, 1205, 449, 1591, 2114, 440, 2380, 1793, 677, 1305, 1437, 1173, 1486, 1497, 10, 2195, 1705, 3017, 1608, 56, 1655, 2686, 1310, 3004, 550, 2740, 500, 2184, 1164, 1999, 797, 2448, 2933, 2141, 2434, 814, 2627, 1260, 698, 1988, 2459, 747, 1640, 340, 2146, 3276, 829, 1746, 3103, 2348, 1001, 2972, 2432, 1243, 2...


Set(2163, 2199, 3021, 1322, 1665, 2630, 1586, 1501, 2452, 809, 2094, 1411, 2612, 1024, 2744, 1369, 1168, 2306, 3053, 760, 3008, 1633, 1995, 2263, 347, 2412, 1237, 1031, 2463, 1315, 2427, 962, 2544, 777, 555, 88, 2280, 352, 1750, 1211, 1158, 2309, 2512, 582, 2976, 762, 3072, 1005, 2210, 2117, 2940, 1596, 1406, 683, 2231, 2622, 2644, 2381, 2395, 1443, 2659, 1618, 3285, 994, 2953, 1401, 2947, 1569, 3098, 2671, 2676, 2014, 5, 3108, 3012, 1205, 449, 1591, 2114, 440, 2380, 1793, 677, 1305, 1437, 1173, 1486, 1497, 10, 2195, 1705, 3017, 1608, 56, 1655, 2686, 1310, 3004, 550, 2740, 500, 2184, 1164, 1999, 797, 2448, 2933, 2141, 2434, 814, 2627, 1260, 698, 1988, 2459, 747, 1640, 340, 2146, 3276, 829, 1746, 3103, 2348, 1001, 2972, 2432, 1243, 2...

In [None]:
val salesByTrackCSV = invoiceItemsFromCSV
    .map(item => (item.trackId, item.quantity))
    .reduceByKey(_ + _)

val topSalesCSV = tracksFromCSV
    .map(track => (track.trackId, track))
    .join(salesByTrackCSV)
    .map { case (_, (track, totalSales)) => (track, totalSales) }
    .sortBy { case (_, sales) => -sales }
    .take(5)

println("Top 5 des titres en ventes:")
topSalesCSV.foreach { case (track, sales) =>
    println(f"Track ID: ${track.trackId}%3d | Nom: ${track.name}%-45s | Total ventes: $sales")
}

Top 5 des titres en ventes (CSV):
Track ID:   2 | Nom: Balls to the Wall                             | Total ventes: 2
Track ID: 2073 | Nom: Saber Amar                                    | Total ventes: 2
Track ID:   8 | Nom: Inject The Venom                              | Total ventes: 2
Track ID: 2085 | Nom: Meu Erro                                      | Total ventes: 2
Track ID:  20 | Nom: Overdose                                      | Total ventes: 2


salesByTrackCSV = ShuffledRDD[252] at reduceByKey at <console>:55
topSalesCSV = Array((Track(2,Balls to the Wall,2,1),2), (Track(2073,Saber Amar,1,7),2), (Track(8,Inject The Venom,1,1),2), (Track(2085,Meu Erro,1,7),2), (Track(20,Overdose,1,1),2))


Array((Track(2,Balls to the Wall,2,1),2), (Track(2073,Saber Amar,1,7),2), (Track(8,Inject The Venom,1,1),2), (Track(2085,Meu Erro,1,7),2), (Track(20,Overdose,1,1),2))

In [39]:
val itemsWithGenreCSV = invoiceItemsFromCSV
    .map(item => (item.trackId, item))
    .join(tracksFromCSV.map(track => (track.trackId, track.genre)))
    .map { case (_, (item, genre)) => (genre, 1) }

val totalOrdersByGenreCSV = itemsWithGenreCSV
    .reduceByKey(_ + _)
    .join(genresRDD.map(genre => (genre.genreId, genre.name)))
    .map { case (genreId, (totalOrders, genreName)) => (genreName, totalOrders) }

println("Total commandes par genre:")
totalOrdersByGenreCSV.collect().foreach { case (genre, total) =>
    println(f"Genre: $genre%-20s | Total commandes: $total")
}

Total commandes par genre:
Genre: Alternative & Punk   | Total commandes: 244
Genre: World                | Total commandes: 13
Genre: Comedy               | Total commandes: 9
Genre: R&B/Soul             | Total commandes: 41
Genre: Classical            | Total commandes: 41
Genre: Blues                | Total commandes: 61
Genre: Reggae               | Total commandes: 30
Genre: Easy Listening       | Total commandes: 10
Genre: Science Fiction      | Total commandes: 6
Genre: Sci Fi & Fantasy     | Total commandes: 20
Genre: Soundtrack           | Total commandes: 20
Genre: Jazz                 | Total commandes: 80
Genre: Heavy Metal          | Total commandes: 12
Genre: TV Shows             | Total commandes: 47
Genre: Electronica/Dance    | Total commandes: 12
Genre: Drama                | Total commandes: 29
Genre: Bossa Nova           | Total commandes: 15
Genre: Alternative          | Total commandes: 14
Genre: Rock                 | Total commandes: 835
Genre: Hip Hop/Rap     

itemsWithGenreCSV = MapPartitionsRDD[268] at map at <console>:57
totalOrdersByGenreCSV = MapPartitionsRDD[274] at map at <console>:62


MapPartitionsRDD[274] at map at <console>:62