Import the libraries

In [1]:
from datetime import datetime

import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, Request, Response

Tell pandas to display floating-point numbers with only 2 decimal places

In [2]:
# Render every float pandas displays with exactly two decimal places.
pd.options.display.float_format = '{:.2f}'.format

# 1. ETL

### 1.1 Extraction: start by retrieving the data from various csv files.

##### 1.1.1 Create all the necessary dataframes

In [3]:
# Folder (relative to the working directory) that holds every CSV read below.
DATA_DIR = 'MLOpsReviews'

# The ratings are split across eight numbered files: ratings/1.csv ... ratings/8.csv.
# The individual names are kept because the next step concatenates them by name.
(df1_ratings, df2_ratings, df3_ratings, df4_ratings,
 df5_ratings, df6_ratings, df7_ratings, df8_ratings) = (
    pd.read_csv(f'{DATA_DIR}/ratings/{file_number}.csv') for file_number in range(1, 9)
)

# One titles file per streaming platform.
df_amazon = pd.read_csv(f'{DATA_DIR}/amazon_prime_titles.csv')
df_disney = pd.read_csv(f'{DATA_DIR}/disney_plus_titles.csv')
df_hulu = pd.read_csv(f'{DATA_DIR}/hulu_titles.csv')
df_netflix = pd.read_csv(f'{DATA_DIR}/netflix_titles.csv')

##### 1.1.2 Combine all the ratings datasets

In [4]:
# Stack the eight ratings frames into one.
# ignore_index=True gives the result a fresh 0..n-1 index; without it each source
# frame keeps its own row labels, so the same label would appear several times.
df_ratings = pd.concat(
    [df1_ratings, df2_ratings, df3_ratings, df4_ratings,
     df5_ratings, df6_ratings, df7_ratings, df8_ratings],
    ignore_index=True,
)

### 1.2 Transformation: basic EDA and data cleaning/preparation.

### _First group of datasets  (ratings 1 - 8)_

##### 1.2.1 Checking for null values

In [5]:
# Number of missing values in each column of the ratings frame.
df_ratings.isna().sum()

userId       0
rating       0
timestamp    0
movieId      0
dtype: int64

##### 1.2.2 Checking and dropping duplicates

In [6]:
# A cell only displays its last expression, so the duplicate count has to be
# printed explicitly; otherwise it is computed and silently discarded.
print(f"Duplicated rows: {df_ratings.duplicated().sum()}")

# Rebind instead of using inplace=True so the step reads as input -> output.
df_ratings = df_ratings.drop_duplicates()

##### 1.2.3 Checking data types of each column.

In [7]:
# Show the dtype pandas holds for every column of the ratings frame.
df_ratings.dtypes

userId         int64
rating       float64
timestamp      int64
movieId       object
dtype: object

##### 1.2.4 Create a new column called date that holds the timestamp formatted as a 'YYYY-MM-DD' string. I will NOT drop the timestamp column for now.

In [8]:
# Interpret `timestamp` as seconds since the epoch and store it as 'YYYY-MM-DD' text
# in a new `date` column; the original `timestamp` column is kept.
df_ratings = df_ratings.assign(
    date=lambda frame: pd.to_datetime(frame['timestamp'], unit='s').dt.strftime('%Y-%m-%d')
)

##### 1.2.5 Get the average rating grouped by unique movies and store the resulting Series in a variable

In [9]:
# Mean of `rating` for each distinct `movieId` (a Series indexed by movieId).
average_score = df_ratings['rating'].groupby(df_ratings['movieId']).mean()

##### 1.2.6 Make a new dataframe containing the average rating and the movie ID from the previous variable by resetting the index. This dataframe will be used later on to create a bigger dataframe with more data.

In [10]:
# Turn the movieId index back into a regular column and keep the two columns
# in a fixed order: movieId first, then its average rating.
df_average_score = average_score.reset_index().loc[:, ['movieId', 'rating']]

### _Second group of datasets (information about movies on Amazon, Disney, Hulu and Netflix)._

##### 1.2.7 Check for duplicates

In [11]:
# Print the number of fully duplicated rows of each platform frame, one per line,
# in the order amazon, disney, hulu, netflix.
for platform_frame in (df_amazon, df_disney, df_hulu, df_netflix):
    print(platform_frame.duplicated().sum())

0
0
0
0


##### 1.2.8 Make a list of the platforms dataframes

In [12]:
# Collect the four platform frames so later steps can loop over them.
# The list holds references, so changes made through it affect the original frames.
platforms = [df_amazon, df_disney, df_hulu, df_netflix]

##### 1.2.9 Assign a name to these dataframes to create a composite ID later

In [13]:
# Tag each frame with its platform label through a plain `name` attribute.
# NOTE(review): this is an ad-hoc Python attribute, not a column — confirm that
# nothing copies these frames before the attribute is read.
for platform_frame, platform_label in (
    (df_amazon, 'amazon'),
    (df_disney, 'disney'),
    (df_hulu, 'hulu'),
    (df_netflix, 'netflix'),
):
    platform_frame.name = platform_label

##### 1.2.10 Create a new column named 'id' at the start of each platform dataframe, whose value is the first letter of the platform name followed by the show_id. A 'platform' column holding the platform name is added as well.

In [14]:
# Add a `platform` column and a composite `id` (first letter of the platform name
# followed by show_id) to every platform frame.
for platform_df in platforms:
    platform_df['platform'] = platform_df.name
    composite_id = platform_df.name[0] + platform_df['show_id']

    # DataFrame.insert raises ValueError when the column already exists, which made
    # this cell fail on a second run; overwrite in that case so it can be re-executed.
    if 'id' in platform_df.columns:
        platform_df['id'] = composite_id
    else:
        platform_df.insert(loc=0, column='id', value=composite_id)

##### 1.2.11 Merge the previous four dataframes into a new one provided that they already have an ID column and a platform column

In [15]:
# Stack the four platform frames into a single catalogue.
# ignore_index=True gives the result a fresh 0..n-1 index; without it each platform
# frame keeps its own row labels, so the same label would appear several times.
df_platforms = pd.concat(
    [df_amazon, df_disney, df_hulu, df_netflix],
    ignore_index=True,
)

##### 1.2.12 Check if the amount of unique movies in these platforms coincides with the amount of unique movies in the ratings data set

In [16]:
# Compare how many distinct ids each side has (a missing value counts as one id).
# This compares the counts only, not the ids themselves.
df_ratings['movieId'].nunique(dropna=False) == df_platforms['id'].nunique(dropna=False)

True

##### 1.2.13 Replace the null values in the rating column with the string "G"

In [17]:
# Missing audience ratings are replaced with "G"; no other column is filled.
df_platforms = df_platforms.fillna({'rating': "G"})

##### 1.2.14 Remove whitespace at the beginning and end of the string and then use the pandas to_datetime function to convert the original string into a proper datetime format

In [18]:
# Trim surrounding whitespace, then parse text in the 'Month day, Year' layout
# required by the format string below.
df_platforms['date_added'] = pd.to_datetime(
    df_platforms['date_added'].str.strip(),
    format='%B %d, %Y',
)

##### 1.2.15 Go through every cell, select only the string type cells and apply the lower function to those, leave the rest as they are.

In [19]:
# Lower-case every text cell and leave all other values untouched.
# Series.map is used column by column because DataFrame.applymap is deprecated
# since pandas 2.1. Numeric and datetime columns are skipped: they cannot hold
# text, so running the function over them would only cost time.
for column_name in df_platforms.columns:
    column = df_platforms[column_name]
    if pd.api.types.is_numeric_dtype(column) or pd.api.types.is_datetime64_any_dtype(column):
        continue
    df_platforms[column_name] = column.map(
        lambda cell: cell.lower() if isinstance(cell, str) else cell
    )

##### 1.2.16 Do the same for the ratings dataframe

In [20]:
# Lower-case every text cell and leave all other values untouched.
# Series.map is used column by column because DataFrame.applymap is deprecated
# since pandas 2.1. Numeric and datetime columns are skipped: they cannot hold
# text, so running the function over them would only cost time.
for column_name in df_ratings.columns:
    column = df_ratings[column_name]
    if pd.api.types.is_numeric_dtype(column) or pd.api.types.is_datetime64_any_dtype(column):
        continue
    df_ratings[column_name] = column.map(
        lambda cell: cell.lower() if isinstance(cell, str) else cell
    )

##### 1.2.17 I split the duration column into two new columns using the split method. Then I transform all the missing values in the duration_int column to 0 in order to be able to transform it into an integer value.

In [21]:
# Split `duration` on whitespace into two new columns: the amount and its unit.
df_platforms[['duration_int', 'duration_type']] = (
    df_platforms['duration'].str.split(expand=True)
)

# Rows without a duration get 0 so the amount column can be cast to integers.
df_platforms = df_platforms.assign(
    duration_int=lambda frame: frame['duration_int'].fillna(0).astype(int)
)

### 1.3 Load: create and deploy an API using the _fastAPI_ framework and Render

##### Create an instance of the FastAPI class

In [22]:
# Application object on which every endpoint below is registered via @app.get.
app = FastAPI()

##### 1.3.1 First endpoint for getting the movie with the longest duration

In [24]:
@app.get('/get_max_duration/{anio}/{plataforma}/{dtype}')
def get_max_duration(anio: int, plataforma: str, dtype: str):
    """Return the title of the longest movie for a year, platform and duration unit.

    Args:
        anio: Release year; must be present in df_score['release_year'].
        plataforma: Platform name; lower-cased before it is compared.
        dtype: Duration unit; lower-cased before it is compared with
            df_score['duration_type'].

    Returns:
        {"pelicula": title} when a single movie has the maximum duration,
        {"peliculas": [titles]} when several movies tie for it, or
        {"error": ...} when no movie matches the three filters together.

    Raises:
        HTTPException: 404 when the year, platform or duration unit does not
            appear in the data at all.
    """
    platform_key = plataforma.lower()
    duration_unit = dtype.lower()

    # Explicit checks instead of `assert`: assertions are removed under `python -O`
    # and an AssertionError would reach the client as an opaque HTTP 500.
    if anio not in df_score['release_year'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid year: {anio}")
    if platform_key not in df_score['platform'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid platform: {plataforma}")
    if duration_unit not in df_score['duration_type'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid duration type: {dtype}")

    # Keep only movies of the requested year, platform and duration unit.
    movies = df_score.loc[(df_score['release_year'] == anio) &
                          (df_score['duration_type'] == duration_unit) &
                          (df_score['platform'] == platform_key) &
                          (df_score['type'] == 'movie')]

    if movies.empty:
        return {"error": "No movies found with the specified criteria."}

    # Titles of every movie that reaches the maximum duration. The previous code
    # called .tolist() on the filtered DataFrame, which has no such method, so a
    # tie always raised AttributeError; selecting the 'title' column fixes that.
    max_duration = movies['duration_int'].max()
    longest_titles = movies.loc[movies['duration_int'] == max_duration, 'title'].tolist()

    if len(longest_titles) > 1:
        return {"peliculas": longest_titles}
    return {"pelicula": longest_titles[0]}

##### 1.3.2 Second endpoint that gets the amount of movies with a score higher than a specified amount. The inputs are the platform name, the release year and the score, which are used to apply the corresponding filters.

###### 1.3.2.1 Add the average score from the df_average_score dataframe to df_platforms by combining both tables through a common column (id). First I rename the movieId column to id so that both tables share that column name.

In [25]:
# Give the key column the name used by the platforms frame so the two can be merged on it.
df_average_score = df_average_score.rename({'movieId': 'id'}, axis='columns')

In [26]:
# Inner join on `id`: only titles that have an average score are kept.
# Both inputs carry a `rating` column, so pandas suffixes them as rating_x / rating_y.
df_score = df_platforms.merge(df_average_score, how='inner', on='id')

###### 1.3.2.2 Rename some more columns for better readability

In [27]:
# After the merge, rating_x comes from the platforms frame and rating_y from the
# averaged ratings; rename them to `rating` and `score` respectively.
df_score = df_score.rename({'rating_x': 'rating', 'rating_y': 'score'}, axis='columns')

###### 1.3.2.3 Defining the logic for the endpoint

In [28]:
@app.get('/get_score_count/{plataforma}/{scored}/{anio}')
def get_score_count(plataforma: str, scored: float, anio: int):
    """Count the movies of a platform and year whose score is strictly above `scored`.

    Args:
        plataforma: Platform name; lower-cased before it is compared.
        scored: Score threshold; must lie between 0.5 and 5.0 inclusive.
        anio: Release year; must be present in df_score['release_year'].

    Returns:
        {"plataforma", "cantidad", "anio", "score"} with the number of matching
        movies, or {"error": ...} when no movie matches.

    Raises:
        HTTPException: 404 when the year or platform does not appear in the data,
            400 when the score is outside the accepted range.
    """
    platform_key = plataforma.lower()

    # Explicit checks instead of `assert`: assertions are removed under `python -O`
    # and an AssertionError would reach the client as an opaque HTTP 500.
    if anio not in df_score['release_year'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid year: {anio}")
    if platform_key not in df_score['platform'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid platform: {plataforma}")
    if not 0.5 <= scored <= 5.0:
        raise HTTPException(status_code=400,
                            detail=f"Invalid score (must be between 0.5 and 5): {scored}")

    # Movies of the requested platform and year with a score above the threshold.
    matches = df_score.loc[(df_score['release_year'] == anio) &
                           (df_score['platform'] == platform_key) &
                           (df_score['score'] > scored) &
                           (df_score['type'] == 'movie')]

    if matches.empty:
        return {"error": "No movies found with the specified criteria."}

    return {'plataforma': plataforma,
            'cantidad': matches.shape[0],
            'anio': anio,
            'score': scored}

##### 1.3.3 Third endpoint: shows the amount of movies in the specified platform.

In [29]:
@app.get('/get_count_platform/{plataforma}')
def get_count_platform(plataforma: str):
    """Count the movies available on a platform.

    Args:
        plataforma: Platform name; lower-cased before it is compared.

    Returns:
        {"plataforma": <name as received>, "peliculas": <number of movies>}.

    Raises:
        HTTPException: 404 when the platform does not appear in the data.
    """
    platform_key = plataforma.lower()

    # Explicit check instead of `assert`: assertions are removed under `python -O`
    # and an AssertionError would reach the client as an opaque HTTP 500.
    if platform_key not in df_score['platform'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid platform: {plataforma}")

    # Only rows whose type is 'movie' are counted.
    movies = df_score.loc[(df_score['platform'] == platform_key) &
                          (df_score['type'] == 'movie')]

    return {'plataforma': plataforma, 'peliculas': movies.shape[0]}

##### 1.3.4 Fourth endpoint: shows the actor that appears more frequently in the specified year and platform.

In [30]:
@app.get('/get_actor/{plataforma}/{anio}')
def get_actor(plataforma: str, anio: int):
    """Return the cast member who appears most often for a platform and year.

    Args:
        plataforma: Platform name; lower-cased before it is compared.
        anio: Release year; must be present in df_score['release_year'].

    Returns:
        {"plataforma", "anio", "actor", "apariciones"} for the most frequent name
        in the `cast` column, or {"error": ...} when no title matches or none of
        the matching titles lists a cast.

    Raises:
        HTTPException: 404 when the platform or year does not appear in the data.
    """
    platform_key = plataforma.lower()

    # Explicit checks instead of `assert`: assertions are removed under `python -O`
    # and an AssertionError would reach the client as an opaque HTTP 500.
    if platform_key not in df_score['platform'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid platform: {plataforma}")
    if anio not in df_score['release_year'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid year: {anio}")

    matches = df_score.loc[(df_score['release_year'] == anio) &
                           (df_score['platform'] == platform_key)]

    if matches.empty:
        return {"error": "No result was found with the specified criteria."}

    # One row per cast member: split on commas, flatten, trim, then count.
    # value_counts sorts by frequency and drops missing values.
    appearances = matches['cast'].str.split(',').explode().str.strip().value_counts()

    # Every matching title may lack cast information; indexing position 0 of an
    # empty result would raise IndexError.
    if appearances.empty:
        return {"error": "No result was found with the specified criteria."}

    return {
        'plataforma': plataforma,
        'anio': anio,
        'actor': appearances.index[0],
        # Cast to a built-in int: the count is a numpy integer, which the JSON
        # response encoder cannot serialise.
        'apariciones': int(appearances.iloc[0]),
    }

##### 1.3.5 Fifth endpoint: shows the amount of available contents in the platforms in the specified type of content, year and country.

In [31]:
@app.get('/prod_per_county/{tipo}/{pais}/{anio}')
def prod_per_county(tipo: str, pais: str, anio: int):
    """Count the titles of a given type, country and release year.

    Args:
        tipo: Content type; lower-cased before it is compared with df_score['type'].
        pais: Country; lower-cased and compared for exact equality with the
            `country` column.
        anio: Release year; must be present in df_score['release_year'].

    Returns:
        {"pais", "anio", "tipo", "peliculas"} where "peliculas" is the number of
        matching rows, or {"error": ...} when nothing matches.

    Raises:
        HTTPException: 404 when the type, country or year does not appear in the data.
    """
    content_type = tipo.lower()
    country_key = pais.lower()

    # Explicit checks instead of `assert`: assertions are removed under `python -O`
    # and an AssertionError would reach the client as an opaque HTTP 500.
    if content_type not in df_score['type'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid type of content: {tipo}")
    if country_key not in df_score['country'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid country: {pais}")
    if anio not in df_score['release_year'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid year: {anio}")

    matches = df_score.loc[(df_score['type'] == content_type) &
                           (df_score['country'] == country_key) &
                           (df_score['release_year'] == anio)]

    if matches.empty:
        return {"error": "No result was found with the specified criteria."}

    return {'pais': pais, 'anio': anio, 'tipo': tipo, 'peliculas': matches.shape[0]}

##### 1.3.6 Sixth endpoint: shows the total amount of contents based on the audience rating.

In [32]:
@app.get('/get_contents/{rating}')
def get_contents(rating: str):
    """Count the titles that carry a given audience rating.

    Args:
        rating: Audience rating; lower-cased before it is compared with
            df_score['rating'].

    Returns:
        {"rating": <value as received>, "contenido": <number of titles>}, or
        {"error": ...} when nothing matches.

    Raises:
        HTTPException: 404 when the rating does not appear in the data.
    """
    rating_key = rating.lower()

    # Explicit check instead of `assert`: assertions are removed under `python -O`
    # and an AssertionError would reach the client as an opaque HTTP 500.
    if rating_key not in df_score['rating'].unique():
        raise HTTPException(status_code=404, detail=f"Invalid rating: {rating}")

    matches = df_score.loc[df_score['rating'] == rating_key]

    # Kept as a safety net; the check above already guarantees at least one match.
    if matches.empty:
        return {"error": "No result was found with the specified criteria."}

    return {'rating': rating, 'contenido': matches.shape[0]}

##### 1.3.7 Deployment

##### 2.1 ML QUERY

In [None]:
# @app.get('/get_recomendation/{title}')
# def get_recomendation(title,):
    #    
    # return {'recomendacion':respuesta}