In [6]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse the one that already exists.
spark = (
    SparkSession.builder
    .appName('Projects')
    .getOrCreate()
)

In [None]:
# Load the movies CSV from HDFS into a DataFrame.
BASE_PATH = "hdfs:///user/root/movielens"
movie_path = f"{BASE_PATH}/movies.csv"
# NOTE(review): movie_path is not used by the read below, which targets a
# different, hard-coded HDFS URL -- confirm which location is the intended one.

movies_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("hdfs://hadoop-master:9000/movielens/movies.csv")
)
# The other MovieLens files (ratings, tags, genome-scores, genome-tags, links)
# are not read from HDFS in this cell; only movies.csv is.

In [None]:
# Load every MovieLens CSV from the local dataset directory into a DataFrame.
# Note: this rebinds movies_df, replacing the DataFrame read from HDFS earlier.
LOCAL_DATA_DIR = "ml-latest"


def load_movielens_csv(file_name):
    """Read one CSV file from LOCAL_DATA_DIR.

    Args:
        file_name: Name of the file inside LOCAL_DATA_DIR, e.g. "movies.csv".

    Returns:
        A Spark DataFrame read with the first row as header and with
        column types inferred from the data.
    """
    return spark.read.csv(f"{LOCAL_DATA_DIR}/{file_name}", header=True, inferSchema=True)


movies_df = load_movielens_csv("movies.csv")
ratings_df = load_movielens_csv("ratings.csv")
tags_df = load_movielens_csv("tags.csv")
genomes_scores_df = load_movielens_csv("genome-scores.csv")
genomes_tags_df = load_movielens_csv("genome-tags.csv")
links_df = load_movielens_csv("links.csv")

In [None]:
# Preview the first rows of every loaded DataFrame; the schema is printed
# only for the entries flagged True (movies and ratings).
print("Aperçu des données chargées :")
previews = [
    ("Movies DataFrame:", movies_df, True),
    ("Ratings DataFrame:", ratings_df, True),
    ("Tags DataFrame:", tags_df, False),
    ("Genome Scores DataFrame:", genomes_scores_df, False),
    ("Genome Tags DataFrame:", genomes_tags_df, False),
    ("Links DataFrame:", links_df, False),
]
for heading, frame, with_schema in previews:
    print(heading)
    frame.show(5)
    if with_schema:
        frame.printSchema()


Aperçu des données chargées :
Movies DataFrame:
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

Ratings DataFrame:
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only 

# ratings

In [None]:
# Drop the columns of the ratings DataFrame that are not needed afterwards,
# leaving only movieId and rating.
ratings_df = ratings_df.drop('timestamp', 'userId')
print("Schéma après nettoyage des données :")
ratings_df.printSchema()
# show() prints the rows itself and returns None, so wrapping it in print()
# would emit a stray "None" line after the table.
ratings_df.show(5)



Schéma après nettoyage des données :
root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)

+-------+------+
|movieId|rating|
+-------+------+
|    307|   3.5|
|    481|   3.5|
|   1091|   1.5|
|   1257|   4.5|
|   1449|   4.5|
+-------+------+
only showing top 5 rows
None


In [None]:
# Count how many ratings each movie received.
from pyspark.sql.functions import col, count

num_ratings_col = count('rating').alias('num_ratings')
ratings_count_df = ratings_df.groupBy('movieId').agg(num_ratings_col)
print("Nombre de notes par film :")
ratings_count_df.show(5)

Nombre de notes par film :
+-------+-----------+
|movieId|num_ratings|
+-------+-----------+
|   1591|       6508|
|   1645|      15215|
|    471|      12308|
|   1088|      14100|
|   1580|      44287|
+-------+-----------+
only showing top 5 rows


In [None]:
from pyspark.sql.functions import avg, explode, split

# Group the ratings by movieId and compute the average rating per movie.
# Aliasing the aggregate names the column directly instead of renaming the
# auto-generated 'avg(rating)' column, which would silently do nothing if
# that generated name ever differed.
average_ratings_df = ratings_df.groupBy('movieId').agg(avg('rating').alias('average_rating'))
print("Aperçu des notes moyennes par film :")
average_ratings_df.show(5)


# 'genres' is pipe-delimited (e.g. Comedy|Drama|Romance); produce one row per genre.
# The guard keeps the cell re-runnable: once 'genres' has been dropped, a second
# run would otherwise fail when looking up movies_df.genres.
if 'genres' in movies_df.columns:
    movies_df = (
        movies_df
        # Raw string: '\|' in a normal string literal is an invalid escape
        # sequence and makes Python emit a warning for this line.
        .withColumn('genre', explode(split(movies_df.genres, r'\|')))
        .drop('genres')
    )
print("Schéma après transformation des genres :")
movies_df.printSchema()
# show() prints the table and returns None; print(show()) would add a "None" line.
movies_df.show(5)



Aperçu des notes moyennes par film :


  movies_df = movies_df.withColumn('genre', explode(split(movies_df.genres, '\|')))


+-------+------------------+
|movieId|    average_rating|
+-------+------------------+
|   1591|2.6466656422864165|
|   1645|3.5352941176470587|
|    471| 3.652908677283068|
|   1088|3.2480141843971633|
|   1580| 3.578533203874726|
+-------+------------------+
only showing top 5 rows
Schéma après transformation des genres :
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = false)

+-------+----------------+---------+
|movieId|           title|    genre|
+-------+----------------+---------+
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
+-------+----------------+---------+
only showing top 5 rows
None


In [None]:
# Extract the release year from the movie title: the first four-digit number
# enclosed in parentheses, e.g. "Toy Story (1995)" -> "1995".
# The resulting 'year' column is a string, not an integer.
from pyspark.sql.functions import regexp_extract
movies_df = movies_df.withColumn('year', regexp_extract('title', r'\((\d{4})\)', 1))
print("Schéma après extraction de l'année :")
movies_df.printSchema()
# show() prints the table and returns None; print(show()) would add a "None" line.
movies_df.show(5)

Schéma après extraction de l'année :
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = false)
 |-- year: string (nullable = true)

+-------+----------------+---------+----+
|movieId|           title|    genre|year|
+-------+----------------+---------+----+
|      1|Toy Story (1995)|Adventure|1995|
|      1|Toy Story (1995)|Animation|1995|
|      1|Toy Story (1995)| Children|1995|
|      1|Toy Story (1995)|   Comedy|1995|
|      1|Toy Story (1995)|  Fantasy|1995|
+-------+----------------+---------+----+
only showing top 5 rows
None


In [None]:
# Display the first five rows of the transformed movies DataFrame.
movies_df.show(n=5)

+-------+----------------+---------+----+
|movieId|           title|    genre|year|
+-------+----------------+---------+----+
|      1|Toy Story (1995)|Adventure|1995|
|      1|Toy Story (1995)|Animation|1995|
|      1|Toy Story (1995)| Children|1995|
|      1|Toy Story (1995)|   Comedy|1995|
|      1|Toy Story (1995)|  Fantasy|1995|
+-------+----------------+---------+----+
only showing top 5 rows


movies->ratings->tags

In [None]:
# Build the complete dataset: each movie/genre row enriched with the movie's
# average rating and its number of ratings. Left joins keep movies that have
# no ratings at all.
from pyspark.sql.functions import col

movies_with_avg_df = movies_df.join(average_ratings_df, on='movieId', how='left')
full_df = movies_with_avg_df.join(ratings_count_df, on='movieId', how='left')

print("Aperçu du dataset complet :")
full_df.show(5)

Aperçu du dataset complet :
+-------+----------------+---------+----+------------------+-----------+
|movieId|           title|    genre|year|    average_rating|num_ratings|
+-------+----------------+---------+----+------------------+-----------+
|      1|Toy Story (1995)|Adventure|1995|3.8866494325899312|      68469|
|      1|Toy Story (1995)|Animation|1995|3.8866494325899312|      68469|
|      1|Toy Story (1995)| Children|1995|3.8866494325899312|      68469|
|      1|Toy Story (1995)|   Comedy|1995|3.8866494325899312|      68469|
|      1|Toy Story (1995)|  Fantasy|1995|3.8866494325899312|      68469|
+-------+----------------+---------+----+------------------+-----------+
only showing top 5 rows


In [None]:
import pandas as pd

# Collect the joined Spark DataFrame into a pandas DataFrame on the driver,
# then write it to a single CSV file without the pandas index column.
output_csv = "full_dataset_v2.csv"
pandas_df = full_df.toPandas()
pandas_df.to_csv(output_csv, index=False)