In [2]:
import sys
import os

# Agregar la ruta del directorio scripts
sys.path.append(os.path.abspath(os.path.join('..', 'scripts')))

### Extract and Transform

Se convirtio el dataset `movies_dataset.csv` a formato `parquet` con las siguientes modificaciones:

- **Columnas eliminadas**: `video`, `imdb_id`, `adult`, `original_title`, `poster_path`, `homepage`
- **Valores nulos rellenados**:
  - `revenue`: 0
  - `budget`: 0
- **Conversión de tipos**:
  - `popularity`: `float64`
  - `budget`: `float64`
  - `id`: `float64`

En la carpeta `scripts/convert_csv_to_parquet.py` puedes ver el funcionamiento de la funcion. Asegurate de poner bien la ruta y que exista el archivo.
- Una vez transformado se recomienda eliminar el archivo .csv 

In [3]:
from convert_csv_to_parquet import convertir_csv_a_parquet

convertir_csv_a_parquet(
    r'C:\Users\mauri\OneDrive\Escritorio\MLops\data\raw\movies_dataset.csv', # Origen
    r'C:\Users\mauri\OneDrive\Escritorio\MLops\data\raw\movies_dataset.parquet', # Destino
    columns_to_drop=['video','imdb_id','adult','original_title','poster_path','homepage'],
    fillna_values={'revenue': 0, 'budget': 0},
    dtype_conversion={'popularity': float, 'budget': int, 'id': int}
)

In [4]:
import pandas as pd

df = pd.read_parquet(r'C:\Users\mauri\OneDrive\Escritorio\MLops\data\raw\movies_dataset.parquet')
df.head(3)

Unnamed: 0,belongs_to_collection,budget,genres,id,original_language,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count
0,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000.0,"[{'id': 16, 'name': 'Animation'}, {'id': 35, '...",862.0,en,"Led by Woody, Andy's toys live happily in his ...",21.946943,"[{'name': 'Pixar Animation Studios', 'id': 3}]","[{'iso_3166_1': 'US', 'name': 'United States o...",1995-10-30,373554033.0,81.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,7.7,5415.0
1,,65000000.0,"[{'id': 12, 'name': 'Adventure'}, {'id': 14, '...",8844.0,en,When siblings Judy and Peter discover an encha...,17.015539,"[{'name': 'TriStar Pictures', 'id': 559}, {'na...","[{'iso_3166_1': 'US', 'name': 'United States o...",1995-12-15,262797249.0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,6.9,2413.0
2,"{'id': 119050, 'name': 'Grumpy Old Men Collect...",0.0,"[{'id': 10749, 'name': 'Romance'}, {'id': 35, ...",15602.0,en,A family wedding reignites the ancient feud be...,11.7129,"[{'name': 'Warner Bros.', 'id': 6194}, {'name'...","[{'iso_3166_1': 'US', 'name': 'United States o...",1995-12-22,0.0,101.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Still Yelling. Still Fighting. Still Ready for...,Grumpier Old Men,6.5,92.0


Eliminar valores nulos de `release_date`

In [5]:
df = df.dropna(subset=['release_date'])

formato `AAAA-mm-dd` en fechas, y creacion de columna `release_year` para el año de estreno.

In [6]:
# Identificar las filas con valores incorrectos
incorrect_format = df[~df['release_date'].str.match(r'^\d{4}-\d{2}-\d{2}$', na=False)]
# Muestra las filas con formato incorrecto.
incorrect_format

# Ya que los errores no pueden ser corregidos manualmente por falta de informacion
# Y ademas las demas filas parecieran incorrectas y con muchos Nan. Se procede a eliminar estos datos.

Unnamed: 0,belongs_to_collection,budget,genres,id,original_language,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count
19730,0.065736,,"[{'name': 'Carousel Productions', 'id': 11176}...",,104.0,Released,,False,6.0,1,0.0,,,,,,,
29503,1.931659,,"[{'name': 'Aniplex', 'id': 2883}, {'name': 'Go...",,68.0,Released,,False,7.0,12,0.0,,,,,,,
35587,2.185485,,"[{'name': 'Odyssey Media', 'id': 17161}, {'nam...",,82.0,Released,,False,4.3,22,0.0,,,,,,,


In [7]:
# Se borraron esas 3 filas del df.
df.drop(incorrect_format.index, inplace=True)

In [8]:
# Ahora si pasamos a datetime con formato aaaa-mm-dd
df['release_date'] = pd.to_datetime(df['release_date'], format='%Y-%m-%d')

# Creacion columna release_year y muestra. 
df['release_year'] = df['release_date'].dt.year
df[['release_date','release_year']].sample(5)


Unnamed: 0,release_date,release_year
17785,2005-11-04,2005
43209,1998-01-01,1998
43941,1970-12-19,1970
30265,1956-07-21,1956
32833,2005-01-08,2005


crear columna  `return` dividiendo  `revenue` / `budget`, cuando no hay datos para calcularlo, tomar valor 0.



In [9]:
df['return'] = df.apply(lambda row: row['revenue'] / row['budget'] if 
                        pd.notnull(row['revenue']) and
                        pd.notnull(row['budget']) and
                        row['budget'] != 0 else
                        0, axis=1)

de `status` eliminaremos todas las peliculas que no hayan sido publicadas. Luego eliminaremos la columna ya que todas seran `Released`.

In [10]:
# de `status` eliminaremos todas las peliculas que no hayan sido publicadas. Luego eliminaremos la columna ya que todas seran `Released`.
df = df[df['status'] == 'Released']
df = df.drop('status', axis=1)

df = df.reset_index(drop=True)

- `belongs_to_collection `,  `production_companies ` ,  `genres ` ,  `production_countries`,  `spoken_languages` están anidados.
- deberán desanidarlos O buscar la manera de acceder sin desanidarlos.

### Empecemos por `belongs_to_collection `

In [11]:
# Esta columna contiene 40888 datos nulos y sus formatos en str . 
df['belongs_to_collection'].apply(type).value_counts() 

belongs_to_collection
<class 'NoneType'>    40470
<class 'str'>          4466
Name: count, dtype: int64

In [12]:
from convert_str_to_list_dict import convert_list_dict

df['belongs_to_collection'] = df['belongs_to_collection'].apply(convert_list_dict)

# Mostrar el DataFrame con los tipos corregidos
df[['belongs_to_collection']].head(4)

Unnamed: 0,belongs_to_collection
0,"{'id': 10194, 'name': 'Toy Story Collection', ..."
1,
2,"{'id': 119050, 'name': 'Grumpy Old Men Collect..."
3,


In [13]:
# Ahora si todos son diccionarios y podemos desanidar.
df['belongs_to_collection'].apply(type).value_counts() 

belongs_to_collection
<class 'NoneType'>    40470
<class 'dict'>         4466
Name: count, dtype: int64

In [14]:
# Desanidar los diccionarios en la columna 'belongs_to_collection'
belong_desanidado = pd.json_normalize(df['belongs_to_collection'])

# Renombrar las columnas agregando el prefijo 'btc_'
belong_desanidado.rename(columns={'id': 'btc_id',
                                  'name':'btc_name',
                                  'poster_path':'btc_poster_path',
                                  'backdrop_path': 'btc_backdrop_path'}, inplace=True)
belong_desanidado.head(4)

Unnamed: 0,btc_id,btc_name,btc_poster_path,btc_backdrop_path
0,10194.0,Toy Story Collection,/7G9915LfUQ2lVfwMEEhDsn3kT4B.jpg,/9FBwqcd9IRruEDUrTdcaafOMKUq.jpg
1,,,,
2,119050.0,Grumpy Old Men Collection,/nLvUdqgPgm3F85NMCii9gVFUcet.jpg,/hypTnLot2z8wpFS7qwsQHW1uV8u.jpg
3,,,,


In [15]:
df = pd.concat([df, belong_desanidado], axis=1)

### Desanidemos las demas columnas anidadas. `genres`. Que tambien tiene sus datos en str.

In [16]:
from convert_str_to_list_dict import convert_list_dict

# Aplicar la función a la columna 'genres'
df['genres'] = df['genres'].apply(convert_list_dict)

In [17]:
# Mas del 99% del dataset fue transformado exitosamente.
df['genres'].apply(type).value_counts()

genres
<class 'list'>    44936
Name: count, dtype: int64

In [18]:
# Vemos que los NoneTypes son valores nulos en todas sus columnas por ende los eliminaremos. del df.
nulos = df[~df['genres'].apply(lambda x: isinstance(x, list))]
nulos

Unnamed: 0,belongs_to_collection,budget,genres,id,original_language,overview,popularity,production_companies,production_countries,release_date,...,tagline,title,vote_average,vote_count,release_year,return,btc_id,btc_name,btc_poster_path,btc_backdrop_path


In [19]:
df = df.dropna(subset=['genres'])
# Ahora solo quedaron valores listas.
df['genres'].apply(type).value_counts()

genres
<class 'list'>    44936
Name: count, dtype: int64

Como estos son listas de diccionarios y no un solo diccionario con unico valor para cada fila, la logica es que existan multiples valores `id` y `names` para cada fila.
Por lo tanto voy a usar lo logica de joins de SQL. Poniendo solo un identificador unico para cada fila de `genres` llamado `genres_id` en el dataframe orignal, pero que ese `genres_id` pueda compartir multiples valores en la tabla `genres_desanidado`. que luego puede ser mergeada o concatenada para saber sus multiples valores. 

In [20]:
from desanidar_columnas import desanidar_columna
# Aplicamos la funcion y ahora tenemos la tabla desanidada y la tabla original con los id pertenecientes de la tabla desanidada.
#Le asignamos el nombre que en realidad es un Dataframe a una variable.
genres_desanidado = desanidar_columna(df, 'genres', 'genres_desanidado')

genres_desanidado.head(4)

Unnamed: 0,id,name,genres_id
0,16.0,Animation,0
1,35.0,Comedy,0
2,10751.0,Family,0
3,12.0,Adventure,1


### Sigamos desanidado columnas anidadas. Ahora: `production_companies`.

In [21]:
# Vemos que la mayoria de los valores son listas de diccionarios mal tipeados. y que contiene 88 floats.
df['production_companies'].apply(type).value_counts()

production_companies
<class 'str'>    44936
Name: count, dtype: int64

In [22]:
# Aplicamos nuestra funcion creada para automatizar procesos.
df['production_companies'] = df['production_companies'].apply(convert_list_dict)
df['production_companies'].apply(type).value_counts()

production_companies
<class 'list'>    44936
Name: count, dtype: int64

In [23]:
# Aplicamos la funcion que desanida las columnas en otra tabla. 
# 1 es para que no sean igual los id a genres_id
# Lo asignamos como una variable que es ahi donde estaran las tablas
pc_desanidado = desanidar_columna(df, 'production_companies', 'pc_desanidado', 1)

pc_desanidado.head(4)

Unnamed: 0,name,id,production_companies_id
0,Pixar Animation Studios,3.0,1
1,TriStar Pictures,559.0,2
2,Teitler Film,2550.0,2
3,Interscope Communications,10201.0,2


### Sigamos desanidando. Ahora: `production_countries`. Que contiene el mismo formato de datos que production_companies.

In [24]:
# Seguimos aplicando la misma funcion. 
df['production_countries'] = df['production_countries'].apply(convert_list_dict)
df['production_countries'].apply(type).value_counts()

production_countries
<class 'list'>    44936
Name: count, dtype: int64

In [25]:
# Aplicamos la funcion que desanida las columnas en otra tabla. 
# Lo asignamos como una variable que es ahi donde estaran las tablas
pctry_desanidado = desanidar_columna(df, 'production_countries', 'pctry_desanidado', 2)

pctry_desanidado.head(4)

Unnamed: 0,iso_3166_1,name,production_countries_id
0,US,United States of America,2
1,US,United States of America,3
2,US,United States of America,4
3,US,United States of America,5


### Desanidemos `spoken_languages`. Otra lista de diccionarios. Que repite el mismo procedimiento.

In [26]:
df['spoken_languages'] = df['spoken_languages'].apply(convert_list_dict)
df['spoken_languages'].apply(type).value_counts()

spoken_languages
<class 'list'>    44936
Name: count, dtype: int64

In [27]:
# Aplicamos la funcion que desanida las columnas en otra tabla. 
# Lo asignamos como una variable que es ahi donde estaran las tablas
slan_desanidado = desanidar_columna(df, 'spoken_languages', 'slan_desanidado', 3)

slan_desanidado.head(4)


Unnamed: 0,iso_639_1,name,spoken_languages_id
0,en,English,3
1,en,English,4
2,fr,Français,4
3,en,English,5


Optimicemos el tamaño eliminando columnas inncesarias. Ademas podemos eliminar las columnas anidadas, que ya estan desanidadas.

In [28]:

# btc_poster_path y btc_backdrop_path son columnas no necesarias. belong_to_collection ya esta desanidado. 
df = df.drop('btc_poster_path', axis=1)
df = df.drop('btc_backdrop_path', axis=1)
df = df.drop('belongs_to_collection', axis=1)

# La frase celebre de una pelicula no es importante para recomendar a travez de un titulo, Se eliminara.
df = df.drop('tagline', axis=1)

In [29]:
#  Ya esta desanidado. eliminamos.
df = df.drop('genres', axis=1)
df = df.drop('production_companies', axis=1)
df = df.drop('production_countries', axis=1)
df = df.drop('spoken_languages', axis=1)

Modificaciones necesarias para la API

In [30]:
# Cambiamos a minisculas las peliculas para que la api no diferencie entre minusculas y mayusculas los titulos.
df['title'] = df['title'].str.lower()
df['overview'] = df['overview'].str.lower()

# columnas tipo float. cambiemoslo a int.
df['release_year'] = df['release_year'].astype(int)
df['vote_count'] = df['vote_count'].astype(int)
df['id'] = df['id'].astype(int)

# Redondear columnas a 2 decimales
df['return'] = df['return'].round(2)
df['popularity'] = df['popularity'].round(2)

df = df.reset_index(drop=True)


Hemos terminado movies_dataset. Asi ha quedado, luego de las transformaciones:

In [31]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 44936 entries, 0 to 44935
Data columns (total 19 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   budget                   44936 non-null  float64       
 1   id                       44936 non-null  int64         
 2   original_language        44926 non-null  object        
 3   overview                 44029 non-null  object        
 4   popularity               44936 non-null  float64       
 5   release_date             44936 non-null  datetime64[ns]
 6   revenue                  44936 non-null  float64       
 7   runtime                  44696 non-null  float64       
 8   title                    44936 non-null  object        
 9   vote_average             44936 non-null  float64       
 10  vote_count               44936 non-null  int64         
 11  release_year             44936 non-null  int64         
 12  return                   44936 n

## Load

Tambien hemos creado tablas extras:
- `genres_desanidado`
- `pc_desanidado`
- `pctry_desanidado`
- `slan_desanidado`

que exportaremos a junto con el dataset principal, a la carpeta processed. Ya que ya esta listo para pasar al eda. 

In [32]:
# Asegúrate de que el directorio de destino exista
output_dir = '../data/processed/movies/'
os.makedirs(output_dir, exist_ok=True)

Exportamos el df principal.

In [33]:
# Ruta y nombre del archivo a guardar.
parquet_path = os.path.join(output_dir, 'movies_dataset_etl.parquet')
# Guardar el DataFrame en la carpeta data/processed/movies
df.to_parquet(parquet_path, engine='pyarrow', compression='snappy', index=False)

print("Datos exportados correctamente a 'data/processed/'")

Datos exportados correctamente a 'data/processed/'


Exportamos las tablas extras.

In [34]:
# Ruta y nombre del archivo a guardar.
parquet_path = os.path.join(output_dir, 'genres_desanidado.parquet')
# Guardar el DataFrame en la carpeta data/processed
genres_desanidado.to_parquet(parquet_path, engine='pyarrow', compression='snappy', index=False)

print("Datos exportados correctamente a 'data/processed'")

Datos exportados correctamente a 'data/processed'


In [35]:
# Ruta y nombre del archivo a guardar.
parquet_path = os.path.join(output_dir, 'pc_desanidado.parquet')
# Guardar el DataFrame en la carpeta data/processed
pc_desanidado.to_parquet(parquet_path, engine='pyarrow', compression='snappy', index=False)

print("Datos exportados correctamente a 'data/processed'")

Datos exportados correctamente a 'data/processed'


La tabla slan_desanidado no sera exportada, y existira solo en este notebook. Ya que es irrelevante para este mvp.

La tabla slan_desanidado no sera exportada, y existira solo en este notebook. Ya que es irrelevante para este mvp.