In [1]:
import pandas as pd
from ydata_profiling import ProfileReport
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Clean-Recommender") \
    .config("spark.driver.memory", "12g") \
    .getOrCreate()

  from .autonotebook import tqdm as notebook_tqdm


Genera el Profile Report

In [2]:
movies_pd = pd.read_csv("../data/raw_data/peliculas.csv")

In [3]:
# profile = ProfileReport(movies_pd, title="Movies Profiling Report", explorative=True)
# profile.to_file("../data/ProfileReport/peliculas_report.html")

In [4]:
# movies = spark.read.csv("../data/raw_data/peliculas.csv", header=True, inferSchema=True, sep=",")
movies = spark.createDataFrame(movies_pd)
# ratings = spark.read.csv("../data/raw_data/ratings.csv", header=True, inferSchema=True, sep=",")
# links = spark.read.csv("../data/raw_data/links.csv", header=True, inferSchema=True, sep=",")

Limpieza de datos

In [5]:
from pyspark.sql.types import ArrayType, StructType, \
StructField, IntegerType, StringType
from pyspark.sql.functions import col, regexp_replace, \
from_json, expr, explode, trim, to_date, concat_ws

In [6]:
movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- tmdbId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- id: long (nullable = true)



In [7]:
movies.show(1)
print(movies.count())

+-----+--------------------+------+---------+--------------------+-------------+--------------------+------------+------------+----------+---+
|adult|              genres|tmdbId|    title|            overview|  poster_path|production_companies|release_date|vote_average|vote_count| id|
+-----+--------------------+------+---------+--------------------+-------------+--------------------+------------+------------+----------+---+
|False|[{'id': 16, 'name...|   862|Toy Story|Led by Woody, And...|posters\1.jpg|[{'name': 'Pixar ...|  1995-10-30|         7.7|    5415.0|  1|
+-----+--------------------+------+---------+--------------------+-------------+--------------------+------------+------------+----------+---+
only showing top 1 row

45466


Col: adult

In [8]:
movies.select("adult").distinct().count()

5

In [9]:
movies.groupBy("adult").count().show()

+--------------------+-----+
|               adult|count|
+--------------------+-----+
|               False|45454|
|  - Written by Ørnås|    1|
|                True|    9|
| Rune Balot goes ...|    1|
| Avalanche Sharks...|    1|
+--------------------+-----+



In [10]:
movies_clean = movies.filter(
    (movies.adult == True) | (movies.adult == False))

In [11]:
movies_clean.count()

45463

In [12]:
movies_clean = movies_clean.withColumn(
    "adult", col("adult").cast("boolean"))

Col: genres

In [13]:
movies_clean.select("genres").distinct().count()

4066

In [14]:
movies_clean.select("genres").distinct().show()

+--------------------+
|              genres|
+--------------------+
|[{'id': 12, 'name...|
|[{'id': 18, 'name...|
|[{'id': 36, 'name...|
|[{'id': 53, 'name...|
|[{'id': 14, 'name...|
|[{'id': 28, 'name...|
|[{'id': 28, 'name...|
|[{'id': 99, 'name...|
|[{'id': 18, 'name...|
|[{'id': 35, 'name...|
|[{'id': 53, 'name...|
|[{'id': 35, 'name...|
|[{'id': 35, 'name...|
|[{'id': 12, 'name...|
|[{'id': 35, 'name...|
|[{'id': 12, 'name...|
|[{'id': 35, 'name...|
|[{'id': 28, 'name...|
|[{'id': 14, 'name...|
|[{'id': 14, 'name...|
+--------------------+
only showing top 20 rows



Si tus datos usan comillas simples (') en lugar de dobles ("), lo primero es reemplazarlas, porque el parser JSON requiere comillas dobles.

In [15]:
# Reparar comillas simples a dobles si es necesario
movies_clean = movies_clean.withColumn(
    "genres_json", regexp_replace(col("genres"), "'", '"'))

In [16]:
# Esquema para los géneros
genre_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)

In [17]:
# Parsear el string JSON a array de structs
movies_clean = movies_clean.withColumn(
    "genres_parsed", from_json(col("genres_json"), genre_schema))

In [18]:
# Extraer solo los IDs
movies_clean = movies_clean.withColumn(
    "genres", expr("transform(genres_parsed, x -> x.id)"))

In [19]:
movies_clean.select("genres").distinct().show()

+--------------------+
|              genres|
+--------------------+
|  [53, 80, 18, 9648]|
|        [35, 28, 12]|
|[12, 18, 28, 1074...|
|       [35, 18, 878]|
|     [10749, 28, 35]|
|     [28, 10752, 37]|
|        [80, 35, 14]|
|            [12, 37]|
|       [878, 35, 80]|
|     [16, 10751, 35]|
|     [10749, 12, 18]|
|  [18, 10751, 10749]|
|       [878, 27, 28]|
|[18, 10402, 10749...|
| [10749, 35, 80, 18]|
|           [53, 878]|
|     [28, 35, 10751]|
| [53, 18, 10402, 80]|
|     [878, 12, 9648]|
|        [35, 27, 14]|
+--------------------+
only showing top 20 rows



In [20]:
# Explota los géneros para tener una fila por cada id-name
genre_pairs_df = movies_clean.select(explode("genres_parsed").alias("genre")) \
                       .select("genre.id", "genre.name") \
                       .distinct()

In [21]:
# Recoge los resultados como diccionario
genres_id = {row["id"]: row["name"] for row in genre_pairs_df.collect()}

In [22]:
sorted(genres_id.items())

[(12, 'Adventure'),
 (14, 'Fantasy'),
 (16, 'Animation'),
 (18, 'Drama'),
 (27, 'Horror'),
 (28, 'Action'),
 (35, 'Comedy'),
 (36, 'History'),
 (37, 'Western'),
 (53, 'Thriller'),
 (80, 'Crime'),
 (99, 'Documentary'),
 (878, 'Science Fiction'),
 (9648, 'Mystery'),
 (10402, 'Music'),
 (10749, 'Romance'),
 (10751, 'Family'),
 (10752, 'War'),
 (10769, 'Foreign'),
 (10770, 'TV Movie')]

Col: tmdbId

In [23]:
movies_clean.select("tmdbId").distinct().count()

45433

In [24]:
movies_clean.count()

45463

In [25]:
movies_clean.groupBy("tmdbId").count().filter("count > 1").show()

+------+-----+
|tmdbId|count|
+------+-----+
| 22649|    2|
|105045|    2|
| 84198|    2|
|132641|    2|
|  4912|    2|
| 12600|    2|
| 10991|    2|
| 15028|    2|
|109962|    2|
|110428|    2|
| 69234|    2|
| 14788|    2|
| 77221|    2|
|  5511|    2|
| 23305|    2|
| 18440|    2|
|168538|    2|
|159849|    2|
| 11115|    2|
|141971|    3|
+------+-----+
only showing top 20 rows



In [26]:
duplicated_ids = movies_clean.groupBy("tmdbId") \
                             .count() \
                             .filter("count > 1") \
                             .select("tmdbId") \
                             .rdd.flatMap(lambda x: x).collect()

In [27]:
movies_clean.filter(col("tmdbId").isin(duplicated_ids)).show(5)

+-----+------------------+------+--------------------+--------------------+----------------+--------------------+------------+------------+----------+----+--------------------+--------------------+
|adult|            genres|tmdbId|               title|            overview|     poster_path|production_companies|release_date|vote_average|vote_count|  id|         genres_json|       genres_parsed|
+-----+------------------+------+--------------------+--------------------+----------------+--------------------+------------+------------+----------+----+--------------------+--------------------+
|false|       [18, 10749]|105045|         The Promise|East-Berlin, 1961...| posters\687.jpg|[{'name': 'Studio...|  1995-02-16|         5.0|       1.0| 687|[{"id": 18, "name...|[{18, Drama}, {10...|
|false|              [18]|132641|                Wife|Ten years into a ...| posters\853.jpg|[{'name': 'Toho C...|  1953-04-29|         0.0|       0.0| 853|[{"id": 18, "name...|       [{18, Drama}]|
|false|[18

In [28]:
movies_clean = movies_clean.dropDuplicates(["tmdbId"])

Con esto hemos eliminado las peliculas duplicadas por tmdbId

In [29]:
movies_clean.count()

45433

In [30]:
movies_clean.groupBy("tmdbId").count().filter("count > 1").show()

+------+-----+
|tmdbId|count|
+------+-----+
+------+-----+



Col: title

In [31]:
movies_clean.select("title").distinct().count()

42278

In [32]:
movies_clean.groupBy("title").count().filter("count > 1").show()

+--------------------+-----+
|               title|count|
+--------------------+-----+
|              Heaven|    2|
|                Silk|    3|
|   The Biscuit Eater|    2|
|          Riverworld|    2|
|      A Woman's Face|    2|
|              Crisis|    2|
|Diary of a Chambe...|    2|
|       The Encounter|    2|
|          Nightmares|    2|
|              Hawaii|    2|
|The Time of Your ...|    2|
|             Salvage|    2|
|          Charleston|    2|
|             The Fan|    2|
|           Detention|    2|
|            Midnight|    4|
|                Deal|    2|
|          The Double|    4|
|       The Last Days|    2|
|            Madhouse|    3|
+--------------------+-----+
only showing top 20 rows



In [33]:
duplicated_title = movies_clean.groupBy("title") \
                             .count() \
                             .filter("count > 1") \
                             .select("title") \
                             .rdd.flatMap(lambda x: x).collect()

In [34]:
movies_clean = movies_clean.dropDuplicates(["title"])

Con esto hemos eliminado las peliculas duplicadas por title

Col: overview

In [35]:
movies_clean.select("overview").distinct().count()

41196

In [36]:
movies_clean.groupBy("overview").count().filter("count > 1").show()

+--------------------+-----+
|            overview|count|
+--------------------+-----+
|A Russian enginee...|    2|
|With friends like...|    2|
|Director Michael ...|    2|
|Poor but happy, y...|    2|
|An abstract anima...|    2|
|Funny, entertaini...|    2|
|No movie overview...|    3|
| A film by Jem Cohen|    2|
|Karol has everyth...|    2|
|  No overview found.|  128|
|The Kingdom of th...|    2|
|The ghost of a sa...|    2|
|                 NaN|  928|
|Alien pods come t...|    2|
|         No Overview|    7|
|Experimental film...|    2|
|When four women m...|    2|
|    No overview yet.|    2|
|                    |    4|
|A few funny littl...|    3|
+--------------------+-----+
only showing top 20 rows



In [37]:
duplicated_overview = movies_clean.groupBy("overview") \
                             .count() \
                             .filter("count > 1") \
                             .select("overview") \
                             .rdd.flatMap(lambda x: x).collect()

In [38]:
duplicates = movies_clean.filter(col("overview").isin(duplicated_overview))

In [39]:
duplicates.select("overview", "title").orderBy("overview").show()

+--------------------+--------------------+
|            overview|               title|
+--------------------+--------------------+
|                    |  Afstiros katallilo|
|                    |           Snow days|
|                    |  Slaves of New York|
|                    |             Iceland|
|A Russian enginee...|Failure of Engine...|
|A Russian enginee...|The Hyperboloid o...|
|A few funny littl...|        The Comics 2|
|A few funny littl...|          The Comics|
|A few funny littl...|      The New Comics|
| A film by Jem Cohen|Building a Broken...|
| A film by Jem Cohen|     Buried in Light|
|Alien pods come t...|    Parasyte: Part 2|
|Alien pods come t...|    Parasyte: Part 1|
|An abstract anima...|             Opus II|
|An abstract anima...|            Opus III|
|      Bollywood 2009|               Gaiir|
|      Bollywood 2009|  Be Dune Saade Char|
|Director Michael ...|               42 Up|
|Director Michael ...|               35 Up|
|Experimental film...|          

In [40]:
duplicates.select("overview", "title").orderBy("overview").count()

1103

In [41]:
movies_clean = movies_clean.filter(
    (col("overview").isNotNull()) &                                   # No null / NaN
    (trim(col("overview")) != "") &                                   # No cadena vacía
    (trim(col("overview")) != "NaN") &                                   # No cadena vacía
    (trim(col("overview")) != "No overview found.")                   # No texto inválido
)

In [42]:
movies_clean.count()

41218

In [43]:
movies_clean.groupBy("overview").count().filter("count > 1").show()

+--------------------+-----+
|            overview|count|
+--------------------+-----+
|A Russian enginee...|    2|
|With friends like...|    2|
|Director Michael ...|    2|
|Poor but happy, y...|    2|
|An abstract anima...|    2|
|Funny, entertaini...|    2|
|No movie overview...|    3|
| A film by Jem Cohen|    2|
|Karol has everyth...|    2|
|The Kingdom of th...|    2|
|The ghost of a sa...|    2|
|Alien pods come t...|    2|
|         No Overview|    7|
|Experimental film...|    2|
|When four women m...|    2|
|    No overview yet.|    2|
|A few funny littl...|    3|
|      Bollywood 2009|    2|
+--------------------+-----+



Col: poster_path

In [44]:
movies_clean.select("poster_path").distinct().count()

40342

In [45]:
movies_clean.groupBy("poster_path").count().filter("count > 1").show()

+-----------+-----+
|poster_path|count|
+-----------+-----+
|        NaN|  877|
+-----------+-----+



In [46]:
movies_clean = movies_clean.filter(                                 
    (trim(col("poster_path")) != "NaN")                
)

In [47]:
movies_clean.count()

40341

Col: production_companies

In [48]:
movies_clean.groupBy("production_companies").count().filter("count > 1").show()

+--------------------+-----+
|production_companies|count|
+--------------------+-----+
|[{'name': 'Hollyw...|    4|
|[{'name': 'Tokyo ...|    4|
|[{'name': 'Motion...|    3|
|[{'name': 'United...|    2|
|[{'name': 'Produc...|    2|
|[{'name': 'Showbo...|    6|
|[{'name': 'First ...|    7|
|[{'name': 'Netfli...|   18|
|[{'name': 'Mattel...|   10|
|[{'name': 'Golden...|   18|
|[{'name': 'Renega...|    3|
|[{'name': 'Walt D...|    3|
|[{'name': 'The Fi...|    3|
|[{'name': 'Charle...|    5|
|[{'name': 'Showbo...|    5|
|[{'name': 'Hanna-...|    7|
|[{'name': 'Greek ...|    3|
|[{'name': 'Touchs...|   63|
|[{'name': 'Montro...|    2|
|[{'name': 'Univer...|    3|
+--------------------+-----+
only showing top 20 rows



In [49]:
movies_clean.filter(                                 
    (trim(col("production_companies")) == "NaN")                
).count()

1

In [50]:
movies_clean = movies_clean.filter(                                 
    (trim(col("production_companies")) != "NaN")                
)

Col: release_date

In [51]:
movies_clean.filter(                                 
    (trim(col("release_date")) == "NaN")                
).count()

43

In [52]:
movies_clean = movies_clean.filter(                                 
    (trim(col("release_date")) != "NaN")                
)

In [53]:
movies_clean.filter(                                 
    (trim(col("release_date")) == "NaN")                
).count()

0

In [54]:
movies_clean.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- tmdbId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- id: long (nullable = true)
 |-- genres_json: string (nullable = true)
 |-- genres_parsed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)



In [55]:
movies_clean = movies_clean.withColumn("release_date", to_date(col("release_date"), "yyyy-MM-dd"))

In [56]:
# Convertir la columna genres (array de enteros) a string
movies_clean = movies_clean.withColumn("genres", concat_ws(",", "genres"))

In [57]:
movies_clean = movies_clean.drop("genres_json", "genres_parsed")

In [58]:
movies_clean.show(5)

+-----+--------------+------+--------------------+--------------------+------------------+--------------------+------------+------------+----------+------+
|adult|        genres|tmdbId|               title|            overview|       poster_path|production_companies|release_date|vote_average|vote_count|    id|
+-----+--------------+------+--------------------+--------------------+------------------+--------------------+------------+------------+----------+------+
|false|53,28,18,10752|252178|                 '71|A young British s...|posters\116889.jpg|[{'name': 'Screen...|  2014-10-10|         6.7|     414.0|116889|
|false|         18,53| 49110|             'R Xmas|A New York drug d...| posters\81214.jpg|[{'name': 'Barnho...|  2001-10-04|         5.3|       3.0| 81214|
|false|         18,28| 85255|...tick... tick.....|Racial tensions t...| posters\26157.jpg|[{'name': 'Metro-...|  1970-01-09|         6.2|       7.0| 26157|
|false|              |253484|    1 Chance 2 Dance|When a 17-year

In [59]:
movies_clean = movies_clean.select(
    "id",
    "title",
    "overview",
    "tmdbId",
    "vote_average",
    "vote_count",
    "genres",
    "adult",
    "release_date",
    "production_companies",
    "poster_path"
)

In [60]:
movies_clean.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- tmdbId: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- genres: string (nullable = false)
 |-- adult: boolean (nullable = true)
 |-- release_date: date (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- poster_path: string (nullable = true)



In [61]:
# movies_clean.write \
#     .option("header", True) \
#     .option("encoding", "UTF-8") \
#     .mode("overwrite") \
#     .csv("../data/clean_data/movies_clean")

In [62]:
movies_clean.write \
    .mode("overwrite") \
    .parquet("../data/clean_data/movies_clean")

In [63]:
moviesc_pd = movies_clean.toPandas()

In [64]:
# profilec = ProfileReport(moviesc_pd, title="Movies Profiling Report", explorative=True)
# profilec.to_file("../data/ProfileReport/peliculas_clean_report.html")

In [65]:
spark.stop()