<center>TP3: Spark RDD</center>

## 1 Initialisation de Spark


In [1]:
#commencer ici avec jupyter
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .master('local[*]')\
        .getOrCreate()
sc = spark.sparkContext

In [5]:
def parseRatings(row):
    splitted=row.split("::")
    return (int(splitted[0]),int(splitted[1]),int(splitted[2]),int(splitted[3]))

In [6]:
ratings_rdd = sc.textFile("MovieData/ratings.dat").map(parseRatings).cache()


In [7]:
ratings_rdd.map(lambda x: 1 if x[2] == 1 else 0).reduce(lambda x, y: x + y)

56174

In [8]:
ratings_rdd.map(lambda row: row[1]).distinct().count()

3706

In [10]:
from operator import add
user_ratings_count = ratings_rdd.map(lambda x: (x[0], 1)).reduceByKey(add)
top_user = user_ratings_count.max(key=lambda x: x[1])
print(f"UserID: {top_user[0]}, Number of Ratings: {top_user[1]}")

UserID: 4169, Number of Ratings: 2314


In [12]:
def parseMovies(row):
    fields = row.split("::")
    return (int(fields[0]), fields[1], fields[2])

movies_rdd = sc.textFile("MovieData/movies.dat").map(parseMovies).cache()

In [14]:
def parseUsers(row):
    fields = row.split("::")
    return (int(fields[0]), fields[1], int(fields[2]), int(fields[3]), fields[4])

users_rdd = sc.textFile("MovieData/users.dat").map(parseUsers).cache()

In [15]:
# Extraire et séparer les genres
genres_rdd = movies_rdd.flatMap(lambda x: x[2].split("|"))

# Supprimer les doublons et trier
unique_sorted_genres = genres_rdd.distinct().collect()
unique_sorted_genres.sort()

# Afficher la liste des genres triée
print(unique_sorted_genres)

['Action', 'Adventure', 'Animation', "Children's", 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']


In [16]:
# Extraire et séparer les genres, puis les transformer en paires clé-valeur
genre_counts_rdd = movies_rdd.flatMap(lambda x: [(genre, 1) for genre in x[2].split("|")])

# Compter le nombre de films pour chaque genre
genre_counts = genre_counts_rdd.reduceByKey(lambda x, y: x + y).collect()

# Trier les résultats par genre
genre_counts.sort(key=lambda x: x[0])

# Afficher le nombre de films pour chaque genre
for genre, count in genre_counts:
    print(f"{genre}: {count}")


Action: 503
Adventure: 283
Animation: 105
Children's: 251
Comedy: 1200
Crime: 211
Documentary: 127
Drama: 1603
Fantasy: 68
Film-Noir: 44
Horror: 343
Musical: 114
Mystery: 106
Romance: 471
Sci-Fi: 276
Thriller: 492
War: 143
Western: 68


In [18]:
# Filtrer les utilisateurs hommes de plus de 45 ans
filtered_users_rdd = users_rdd.filter(lambda x: x[1] == 'M' and int(x[2]) > 45)

# Joindre les RDD des utilisateurs filtrés et des évaluations
filtered_ratings_rdd = filtered_users_rdd.map(lambda x: (x[0], x)).join(ratings_rdd.map(lambda x: (x[0], x)))

# Filtrer pour ne garder que les évaluations avec une note >= 4
high_ratings_rdd = filtered_ratings_rdd.filter(lambda x: x[1][1][2] >= 4)

# Joindre avec le RDD des films pour obtenir les genres
high_rated_movies_rdd = high_ratings_rdd.map(lambda x: (x[1][1][1], x)).join(movies_rdd.map(lambda x: (x[0], x)))

# Extraire les genres et compter leur occurrence
popular_genres_rdd = high_rated_movies_rdd.flatMap(lambda x: [(genre, 1) for genre in x[1][1][2].split("|")])
popular_genre_counts = popular_genres_rdd.reduceByKey(lambda x, y: x + y).collect()

# Afficher les résultats
for genre, count in popular_genre_counts:
    print(f"{genre}: {count}")

Action: 11988
Sci-Fi: 7137
Western: 2024
Animation: 1361
Thriller: 9586
Children's: 2200
Drama: 22972
Horror: 2385
Mystery: 2883
Comedy: 14944
Adventure: 6336
Film-Noir: 1840
Fantasy: 1307
Musical: 2181
Romance: 7549
Documentary: 420
Crime: 4331
War: 5800


In [19]:
from operator import itemgetter

# Extraire l'année de sortie et les genres de chaque film
movies_with_year_rdd = movies_rdd.map(lambda x: (x[0], x[1], x[2].split("|"), x[1][-5:-1]))

# Compter le nombre de films pour chaque genre et chaque année
genre_year_counts_rdd = movies_with_year_rdd.flatMap(lambda x: [((x[3], genre), 1) for genre in x[2]])
genre_year_counts = genre_year_counts_rdd.reduceByKey(lambda x, y: x + y)

# Transformer en (année, (genre, nombre de films)) et trouver le genre le plus populaire pour chaque année
popular_genre_by_year = genre_year_counts.map(lambda x: (x[0][0], (x[0][1], x[1])))
most_popular_genre_by_year = popular_genre_by_year.reduceByKey(lambda x, y: x if x[1] > y[1] else y).collect()

# Trier et afficher les résultats
sorted_results = sorted(most_popular_genre_by_year, key=itemgetter(0))
for year, genre_count in sorted_results:
    print(f"{year}: {genre_count[0]}")


1919: Drama
1920: Comedy
1921: Action
1922: Drama
1923: Comedy
1925: Comedy
1926: Drama
1927: Drama
1928: Comedy
1929: Thriller
1930: Drama
1931: Horror
1932: Romance
1933: Horror
1934: Comedy
1935: Romance
1936: Comedy
1937: Drama
1938: Comedy
1939: Drama
1940: Comedy
1941: Drama
1942: Drama
1943: Drama
1944: Horror
1945: Drama
1946: Drama
1947: Film-Noir
1948: Drama
1949: Drama
1950: Sci-Fi
1951: Drama
1952: Drama
1953: Drama
1954: Drama
1955: Drama
1956: Sci-Fi
1957: Drama
1958: Drama
1959: Horror
1960: Drama
1961: Drama
1962: Drama
1963: Drama
1964: Musical
1965: Drama
1966: Drama
1967: Comedy
1968: Drama
1969: Western
1970: Comedy
1971: Drama
1972: Drama
1973: Drama
1974: Drama
1975: Drama
1976: Drama
1977: Comedy
1978: Drama
1979: Drama
1980: Drama
1981: Drama
1982: Drama
1983: Comedy
1984: Drama
1985: Drama
1986: Drama
1987: Comedy
1988: Comedy
1989: Comedy
1990: Drama
1991: Drama
1992: Drama
1993: Drama
1994: Drama
1995: Drama
1996: Drama
1997: Drama
1998: Drama
1999: Drama
200

In [20]:
# Filtrer les utilisateurs de moins de 18 ans
young_users_rdd = users_rdd.filter(lambda x: x[2] == 1)

# Joindre les RDD pour obtenir les notes données par ces utilisateurs
young_users_ratings_rdd = young_users_rdd.map(lambda x: (x[0], None)).join(ratings_rdd.map(lambda x: (x[0], x[2])))

# Compter le nombre de fois que chaque note apparaît
note_frequencies = young_users_ratings_rdd.map(lambda x: (x[1][1], 1)).reduceByKey(lambda x, y: x + y).collect()

# Trier et afficher les résultats
sorted_note_frequencies = sorted(note_frequencies, key=lambda x: x[0])
print(sorted_note_frequencies)


[(1, 2238), (2, 2983), (3, 6380), (4, 8808), (5, 6802)]
