In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, concat_ws, lit, concat, desc, count, when, asc, rank, dense_rank, udf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType




In [35]:
spark = SparkSession.builder.appName("DataAnalysis").getOrCreate()

In [36]:
#charge le csv dans un dataframe
film_df = spark.read.csv("film.csv", header=True, inferSchema=True)

In [37]:
#supprime la colonne image
film_df = film_df.drop("*image")

In [38]:
# Ajouter une colonne d'identifiant unique à chaque ligne
film_df = film_df.withColumn("row_id", monotonically_increasing_id())

# Spécifier l'indice de la ligne à supprimer (2 dans votre cas)
index_to_remove = 2

# Supprimer la ligne en fonction de l'indice
film_df = film_df.filter(col("row_id") != index_to_remove)

# Supprimer la colonne d'identifiant unique ajoutée
film_df = film_df.drop("row_id")

In [39]:
# créer la colonne crédit
film_df = film_df.withColumn(
    "Credits",
    concat(
        col("title"), lit(": a"),
        col("Actor"), lit("and"),
        col("actress"), lit("film’s, directed by"),
        col("Director")
    )
)

In [None]:
#affiche le résultat
film_df.show(truncate=False)

In [None]:
#Tri les films par année
#affiche les résultats
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_tries_par_annee = film_df.orderBy(col("Year"))
films_tries_par_annee.show(truncate=False)
films_tries_par_annee.write.csv("films_tries_par_annee.csv", header=True, mode="overwrite")


In [43]:
#Tri les films par popularité
#affiche les résultats
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_tries_par_popularite = film_df.orderBy(col("Popularity"))
films_tries_par_popularite.write.csv("films_tries_par_popularite.csv", header=True, mode="overwrite")
films_tries_par_popularite.show(truncate=False)

In [None]:
#Filtre les films qui durent plus de 2h
#affiche les résultats
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_plus_de_2_heures = film_df.filter(col("Length") > 120)
films_plus_de_2_heures.write.csv("films_plus_de_2_heures.csv", header=True, mode="overwrite")
films_plus_de_2_heures.show(truncate=False)


In [None]:
#Filtre les films d'horreur
#affiche les résultats
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_genre1 = film_df.filter(col("Subject") == "Horror")
films_genre1.write.csv("films_genre1.csv", header=True, mode="overwrite")
films_genre1.show(truncate=False)

In [None]:
#Tri les films sur le mystère
#affiche les résultats
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_genre2 = film_df.filter(col("Subject") == "Mystery")
films_genre2.write.csv("films_genre2.csv", header=True, mode="overwrite")
films_genre2.show(truncate=False)

In [None]:
#Isole la colonne acteurs, compte le nombre de fois que chaque nom revient et les classe par ordre décroissant
#affiche les résultats
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


acteur_plus_de_films = film_df.groupBy("Actor").count().orderBy(desc("count"))
acteur_plus_de_films.show(truncate=False)
acteur_plus_de_films.write.csv("acteur_plus_de_films.csv", header=True, mode="overwrite")

In [None]:
# Compter le nombre de récompenses (Yes) pour chaque directeur
# classe par ordre décroissant
# Afficher le résultat
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


directeur_plus_de_recompenses = film_df.groupBy("Director").agg(count(when(col("Awards") == "Yes", True)).alias("Nombre de Recompenses"))
directeur_plus_de_recompenses = directeur_plus_de_recompenses.orderBy(desc("Nombre de Recompenses"))
directeur_plus_de_recompenses.show(truncate=False)
directeur_plus_de_recompenses.write.csv("directeur_plus_de_recompenses.csv", header=True, mode="overwrite")

In [None]:
# Filtrer les films ayant remporté un prix
# Trouver le film le plus populaire parmi ceux ayant remporté un prix
# Afficher le résultat
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_avec_prix = film_df.filter(col("Awards") == "Yes")
film_plus_populaire_avec_prix = films_avec_prix.orderBy(desc("Popularity")).limit(1)
film_plus_populaire_avec_prix.show(truncate=False)
film_plus_populaire_avec_prix.write.csv("film_plus_populaire_avec_prix.csv", header=True, mode="overwrite")


In [None]:
# Filtrer les films n'ayant pas obtenu de récompense
# Grouper par genre et compter le nombre de films sans récompense pour chaque genre
# Classer par ordre décroissant
# Afficher le résultat
#enregistre le résultat dans un csv et si un fichier existe déjà à ce nom, le remplace


films_sans_recompense = film_df.filter(col("Awards") == "No")
genres_sans_recompense = films_sans_recompense.groupBy("Subject").count()
genres_sans_recompense = genres_sans_recompense.orderBy(desc("count"))
genres_sans_recompense.show(truncate=False)
genres_sans_recompense.write.csv("genres_sans_recompense.csv", header=True, mode="overwrite")

In [None]:
#Classement des films par popularité pour chaque genre
# Ajouter les colonnes de classement
# afficher le résultat
window_spec = Window.partitionBy("Subject").orderBy(desc("Popularity"))


film_df = film_df.withColumn("Rank", rank().over(window_spec))
film_df = film_df.withColumn("DenseRank", dense_rank().over(window_spec))





# Compte le nombre de film par réalisateur
# Création d'une UDF pour convertir le "Title" en majuscules
# Application de la UDF à la colonne "Title"
# Afficher le résultat avec la nouvelle colonne "TitleUpper"


nombre_films_par_directeur = film_df.groupBy("Director").agg(count("Title").alias("NombreFilms"))
nombre_films_par_directeur.show(truncate=False)



convertir_majuscules_udf = udf(lambda title: title.upper(), StringType())


film_df = film_df.withColumn("Title", convertir_majuscules_udf("Title"))



film_df.show(truncate=False)

