In [2]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

In [3]:
# Cargar los dataframes desde archivos parquet
df_items = pd.read_parquet('../data/interim/parquet/final_items.parquet')
df_games = pd.read_parquet('../data/interim/parquet/final_games.parquet')
df_reviews = pd.read_parquet('../data/interim/parquet/final_reviews.parquet')

In [4]:
# Asegurar que la columna 'item_id' sea de tipo int
df_items['item_id'] = df_items['item_id'].astype(int)

In [5]:
# Primer merge: unir df_items y df_games en 'item_id' y 'id'
merged_df_1 = pd.merge(df_items, df_games, left_on='item_id', right_on='id')

# Segundo merge: unir el DataFrame resultante con df_reviews en 'user_id'
final_merged_df = pd.merge(merged_df_1, df_reviews, on='user_id')



In [6]:
# Eliminar la columna item_id_y y renombrar item_id_x a item_id
final_merged_df = final_merged_df.drop(columns=['item_id_y']).rename(columns={'item_id_x': 'item_id'})

# Definir el nuevo orden de las columnas (mover las columnas de géneros al final para una mejor organización)

new_column_order = [
    'user_id', 'steam_id', 'item_id', 'developer', 'item_name', 'playtime_forever',
    'app_name', 'id', 'price', 'release_year', 'funny', 'posted', 
    'helpful', 'recommend', 'sentiment_analysis',
    # Mover las columnas de géneros al final
    'Indie', 'Action', 'Adventure', 'Casual', 'Simulation', 'Strategy', 'RPG',
    'Singleplayer', 'Free to Play', 'Multiplayer', 'Great Soundtrack', 'Puzzle',
    'Early Access', '2D', 'Atmospheric', 'VR', 'Sports', 'Platformer',
    'Story Rich', 'Sci-fi', 'Fantasy', 'Horror', 'Open World', 'Difficult',
    'Anime', 'Massively Multiplayer', 'Pixel Graphics', 'Co-op', 'Shooter',
    'Racing', 'Female Protagonist', 'Funny', 'First-Person', 'FPS', 'Sandbox',
    'Arcade'
]

# Reorganizar las columnas
final_merged_df = final_merged_df.reindex(columns=new_column_order)

# Seleccionar las columnas relevantes para análisis de género y tiempo de juego
genre_playtime_df = final_merged_df[['user_id', 'playtime_forever', 'release_year', 'Indie', 'Action', 'Adventure', 'Casual', 'Simulation', 'Strategy', 'RPG', 'Singleplayer', 'Free to Play', 'Multiplayer', 'Great Soundtrack', 'Puzzle', 'Early Access', '2D', 'Atmospheric', 'VR', 'Sports', 'Platformer', 'Story Rich', 'Sci-fi', 'Fantasy', 'Horror', 'Open World', 'Difficult', 'Anime', 'Massively Multiplayer', 'Pixel Graphics', 'Co-op', 'Shooter', 'Racing', 'Female Protagonist', 'Funny', 'First-Person', 'FPS', 'Sandbox', 'Arcade']]

# Crear una copia y asegurando que 'release_year' es de tipo int
genre_playtime_df = genre_playtime_df.copy()
genre_playtime_df['release_year'] = genre_playtime_df['release_year'].astype(int)

In [7]:
# Creando dataframes para análisis específicos

recommendations_df = final_merged_df[['app_name', 'release_year', 'recommend', 'sentiment_analysis']]

developer_df = final_merged_df[['developer', 'item_id', 'release_year', 'Free to Play']]

userdata_df = final_merged_df[['user_id', 'price', 'recommend']]

best_developer_year_df = final_merged_df[['developer', 'release_year', 'recommend']]

developer_reviews_analysis_df = final_merged_df[['developer', 'sentiment_analysis']]


In [8]:
def precalculate_and_save(endpoint_function, dataframe, column, output_filename):
    """
    Precalcula datos utilizando una función de endpoint específica y los guarda en un archivo Parquet.
    
    Parameters:
    endpoint_function (function): La función que procesará los datos.
    dataframe (pd.DataFrame): El DataFrame que contiene los datos a procesar.
    column (str): La columna que contiene los valores únicos para procesar.
    output_filename (str): El nombre del archivo donde se guardarán los datos procesados.
    
    """
    # Obtener los valores únicos en la columna especificada
    unique_values = dataframe[column].unique()
    
    # Utilizar ThreadPoolExecutor para ejecutar las operaciones en paralelo
    with ThreadPoolExecutor() as executor:
        # Mapear la función endpoint a cada valor único
        all_processed_data_list = list(executor.map(endpoint_function, unique_values))
        
    # Concatenar todos los datos procesados
    all_processed_data = pd.concat(all_processed_data_list)
    
    # Establecer el índice del DataFrame resultante a la columna especificada
    all_processed_data.set_index(column, inplace=True)
    
    # Guardar todos los datos procesados en un archivo Parquet
    all_processed_data.to_parquet(output_filename)
    
    # Imprimir cómo se vería el código en main.py para acceder a estos datos precalculados
    endpoint_name = endpoint_function.__name__
    print(f"""
@app.get("/{endpoint_name}/{{arg}}")
async def {endpoint_name}(arg: str):
    try:
        # Cargar los datos precalculados
        data = pd.read_parquet('{output_filename}')
        # Suponiendo que los datos están indexados por el argumento proporcionado
        result = data.loc[arg]
        return result.to_dict()
    except KeyError:
        raise HTTPException(status_code=404, detail="No se encontraron datos para el argumento proporcionado.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    """)

# Ejemplo de uso:
# precalculate_and_save(mi_funcion, mi_dataframe, 'mi_columna', 'mi_archivo.parquet')


In [9]:
def developer(desarrollador: str):
    """
    Obtener la cantidad de ítems y el porcentaje de contenido gratuito por año, según la empresa desarrolladora.

    Parameters:
    desarrollador (str): El nombre de la empresa desarrolladora.

    Returns:
    pd.DataFrame: Un DataFrame con los datos organizados por año.
    """
    # Filtrar los datos por desarrollador
    dev_df = developer_df[developer_df['developer'] == desarrollador]
    # Agrupar los datos por año y calcular las métricas deseadas
    yearly_data = dev_df.groupby('release_year').agg(
        item_count=('item_id', 'count'), 
        free_content=('Free to Play', 'mean')
    )
    # Convertir la proporción de contenido gratuito a porcentaje
    yearly_data['free_content'] = (yearly_data['free_content'] * 100).apply(lambda x: f'{x:.0f}%')
    # Reorganizar el DataFrame y renombrar las columnas
    yearly_data = yearly_data.reset_index().rename(columns={'release_year': 'Año', 'item_count': 'Cantidad de Items', 'free_content': 'Contenido Free'})
    # Añadir la columna 'developer' al DataFrame
    yearly_data['developer'] = desarrollador
    
    return yearly_data



In [10]:
developer('Ubisoft')

Unnamed: 0,Año,Cantidad de Items,Contenido Free,developer
0,2003,1475,0%,Ubisoft
1,2004,586,0%,Ubisoft
2,2005,234,0%,Ubisoft
3,2007,51,0%,Ubisoft
4,2014,4240,0%,Ubisoft
5,2016,449,0%,Ubisoft


In [11]:
def userdata(User_id: str):
    """
    Obtener información del usuario, incluyendo el dinero gastado, el porcentaje de recomendación y la cantidad de ítems.

    Parameters:
    User_id (str): El ID del usuario.

    Returns:
    pd.DataFrame: Un DataFrame con la información del usuario.
    """
    # Filtrar los datos por usuario
    user_df = userdata_df[userdata_df['user_id'] == User_id]
    # Filtrar los precios válidos (excluir -1)
    valid_prices_df = user_df[user_df['price'] != -1]
    # Calcular el total gastado
    total_spent = valid_prices_df['price'].sum()
    # Calcular el porcentaje de recomendación
    recommend_percentage = user_df['recommend'].mean() * 100
    # Contar la cantidad de ítems
    item_count = len(user_df)
    # Crear un diccionario con los resultados
    result = {"user_id": User_id, "Dinero gastado": f"{total_spent} USD", "% de recomendación": f"{recommend_percentage:.2f}%", "cantidad de items": item_count}
    # Convertir el diccionario result a un DataFrame
    result_df = pd.DataFrame([result])
    return result_df

In [12]:
# Reemplazar -1 con 'Desconocido' en la columna 'release_year' antes de la iteración
genre_playtime_df['release_year'] = genre_playtime_df['release_year'].replace(-1, 'Desconocido')

def UserForGenre(genero: str):
    """
    Obtener el usuario con más horas jugadas para un género dado y una lista de la acumulación de horas jugadas por año de lanzamiento.
    
    Parameters:
    genero (str): El género del juego.
    
    Returns:
    pd.DataFrame: Un DataFrame con la información del usuario y las horas jugadas por año.
    """
    # Filtrar los datos por género
    genre_df = genre_playtime_df[genre_playtime_df[genero] == 1]
    # Calcular la suma de tiempo jugado por usuario
    user_playtime = genre_df.groupby('user_id')['playtime_forever'].sum()
    # Identificar el usuario con el máximo tiempo jugado
    max_playtime_user = user_playtime.idxmax()
    # Filtrar los datos por el usuario con el máximo tiempo jugado
    user_df = genre_df[genre_df['user_id'] == max_playtime_user]
    # Calcular la suma de tiempo jugado por año
    user_year_playtime = user_df.groupby('release_year')['playtime_forever'].sum().reset_index()
    # Convertir los datos a formato de diccionario
    hours_played = user_year_playtime.to_dict('records')
    # Crear un DataFrame con los resultados
    result_df = pd.DataFrame([{
        "Genero": genero,
        "Usuario con más horas jugadas": max_playtime_user,
        "Horas jugadas": str(hours_played)
    }])
    return result_df


# Crear una lista con todos los géneros
all_genres = [
        'Indie', 'Action', 'Adventure', 'Casual', 'Simulation', 'Strategy', 'RPG',
        'Singleplayer', 'Free to Play', 'Multiplayer', 'Great Soundtrack', 'Puzzle',
        'Early Access', '2D', 'Atmospheric', 'VR', 'Sports', 'Platformer',
        'Story Rich', 'Sci-fi', 'Fantasy', 'Horror', 'Open World', 'Difficult',
        'Anime', 'Massively Multiplayer', 'Pixel Graphics', 'Co-op', 'Shooter',
        'Racing', 'Female Protagonist', 'Funny', 'First-Person', 'FPS', 'Sandbox',
        'Arcade'
    ]

# Crear un DataFrame vacío para almacenar los datos precalculados
all_processed_data_list = []

# Iterar sobre todos los géneros y precalcular los datos
for genero in all_genres:
    processed_data = UserForGenre(genero)
    all_processed_data_list.append(processed_data)

# Concatenar todos los datos procesados
all_processed_data = pd.concat(all_processed_data_list)   

all_processed_data.set_index('Genero', inplace=True)

# Guardar todos los datos procesados en un archivo Parquet
all_processed_data.to_parquet('../data/processed/UserForGenre_data.parquet')



In [13]:
UserForGenre('Multiplayer')

Unnamed: 0,Genero,Usuario con más horas jugadas,Horas jugadas
0,Multiplayer,Sp3ctre,"[{'release_year': 1998, 'playtime_forever': 0}..."


In [18]:

def best_developer_year(año: int):
    """
    Obtener el top 3 de desarrolladores con juegos más recomendados por usuarios para el año dado.
    
    Parameters:
    año (int): El año de lanzamiento.
    
    Returns:
    pd.DataFrame: Un DataFrame con el top 3 de desarrolladores.
    """
    # Filtrar los datos por año
    year_df = best_developer_year_df[best_developer_year_df['release_year'] == año]
    # Filtrar los datos por recomendación
    recommended_df = year_df[year_df['recommend'] == True]
    # Contar las recomendaciones por desarrollador
    developer_recommend_count = recommended_df.groupby('developer').size()
    # Identificar el top 3 de desarrolladores
    top_3_developers = developer_recommend_count.nlargest(3).index.tolist()

    # Verificar la longitud de top_3_developers y llenar con 'N/A' si es necesario
    while len(top_3_developers) < 3:
        top_3_developers.append('N/A')

    # Crear un diccionario con los resultados
    result = {
        "release_year": [año],
        "Puesto 1": [top_3_developers[0]],
        "Puesto 2": [top_3_developers[1]],
        "Puesto 3": [top_3_developers[2]]
    }
    # Crear un DataFrame con los resultados
    result_df = pd.DataFrame(result)
    # Convertir la columna release_year a entero
    result_df['release_year'] = result_df['release_year'].astype(int)
    return result_df




In [15]:
def developer_reviews_analysis(desarrolladora: str):
    """
    Obtener un análisis de las reseñas de una empresa desarrolladora específica.

    Parameters:
    desarrolladora (str): El nombre de la empresa desarrolladora.

    Returns:
    pd.DataFrame: Un DataFrame con el análisis de las reseñas.
    """
    # Filtrar los datos por desarrolladora
    developer_df = developer_reviews_analysis_df[developer_reviews_analysis_df['developer'] == desarrolladora]
    # Contar la cantidad de reseñas positivas y negativas
    sentiment_counts = developer_df['sentiment_analysis'].value_counts()
    # Crear un diccionario con los resultados
    result = {"developer": desarrolladora, "Negative": int(sentiment_counts.get(0, 0)), "Positive": int(sentiment_counts.get(2, 0))}
    # Convertir el diccionario result a un DataFrame
    result_df = pd.DataFrame([result])
    return result_df


In [None]:
# Aplicar la función
precalculate_and_save(developer, developer_df, 'developer', '../data/processed/developer_data.parquet')



@app.get("/developer/{arg}")
async def developer(arg: str):
    try:
        # Cargar los datos precalculados
        data = pd.read_parquet('developer_data.parquet')
        # Suponiendo que los datos están indexados por el argumento proporcionado
        result = data.loc[arg]
        return result.to_dict()
    except KeyError:
        raise HTTPException(status_code=404, detail="No se encontraron datos para el argumento proporcionado.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    


In [20]:
precalculate_and_save(best_developer_year, best_developer_year_df, 'release_year', '../data/processed/best_developer_year_data.parquet')



@app.get("/best_developer_year/{arg}")
async def best_developer_year(arg: str):
    try:
        # Cargar los datos precalculados
        data = pd.read_parquet('../data/processed/best_developer_year_data.parquet')
        # Suponiendo que los datos están indexados por el argumento proporcionado
        result = data.loc[arg]
        return result.to_dict()
    except KeyError:
        raise HTTPException(status_code=404, detail="No se encontraron datos para el argumento proporcionado.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    


In [None]:
precalculate_and_save(developer_reviews_analysis, developer_reviews_analysis_df, 'developer', '../data/processed/developer_reviews_analysis_data.parquet')




@app.get("/developer_reviews_analysis/{arg}")
async def developer_reviews_analysis(arg: str):
    try:
        # Cargar los datos precalculados
        data = pd.read_parquet('developer_reviews_analysis_data.parquet')
        # Suponiendo que los datos están indexados por el argumento proporcionado
        result = data.loc[arg]
        return result.to_dict()
    except KeyError:
        raise HTTPException(status_code=404, detail="No se encontraron datos para el argumento proporcionado.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    


In [17]:
precalculate_and_save(userdata, userdata_df, 'user_id', '../data/processed/userdata_data.parquet')



@app.get("/userdata/{arg}")
async def userdata(arg: str):
    try:
        # Cargar los datos precalculados
        data = pd.read_parquet('userdata_data.parquet')
        # Suponiendo que los datos están indexados por el argumento proporcionado
        result = data.loc[arg]
        return result.to_dict()
    except KeyError:
        raise HTTPException(status_code=404, detail="No se encontraron datos para el argumento proporcionado.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    


In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics.pairwise import cosine_similarity

# Funciones auxiliares para generar características y calcular similitud del coseno
def generate_features(df, group_by_col, feature_cols):
    feature_series_list = [df.groupby(group_by_col)[col].agg(['sum', 'mean']) for col in feature_cols]
    features_df = pd.concat(feature_series_list, axis=1)
    return features_df

def compute_cosine_similarity(features_df):
    scaler = MinMaxScaler()
    features_scaled = scaler.fit_transform(features_df)
    cosine_sim_matrix = cosine_similarity(features_scaled, features_scaled)
    cosine_sim_df = pd.DataFrame(cosine_sim_matrix, index=features_df.index, columns=features_df.index)
    return cosine_sim_df

def precalculate_similarity(df, group_by_col, feature_cols, output_filename):
    features_df = generate_features(df, group_by_col, feature_cols)
    cosine_sim_df = compute_cosine_similarity(features_df)
    cosine_sim_df.to_parquet(output_filename)

# Funciones para calcular recomendaciones
def calcular_recomendaciones_juego(df, game_cosine_sim_df, item_id):
    sim_scores = game_cosine_sim_df[str(item_id)]
    recommended_games_ids = sim_scores.sort_values(ascending=False)[1:6].index
    recommended_games_names = df.set_index('item_id').loc[recommended_games_ids, 'app_name'].unique()
    result = {"Juegos recomendados": [recommended_games_names.tolist()]}
    return pd.DataFrame(result, index=[item_id])

def calcular_recomendaciones_usuario(df, user_cosine_sim_df, user_id):
    sim_scores = user_cosine_sim_df[str(user_id)]
    similar_users = sim_scores.sort_values(ascending=False)[1:]
    similar_users_list = similar_users.index.tolist()
    recommended_games_ids = df[df['user_id'].isin(similar_users_list)]['item_id'].unique()
    recommended_games_ids = sorted(recommended_games_ids, key=lambda x: similar_users_list.index(df[df['item_id'] == x]['user_id'].iloc[0]))
    recommended_game_names = df.set_index('item_id').loc[recommended_games_ids[:5], 'app_name'].unique()
    result = {"Juegos recomendados": [recommended_game_names.tolist()]}
    return pd.DataFrame(result, index=[user_id])

def calcular_recomendaciones(df, cosine_sim_df, ids, tipo):
    recommendations = []
    for id in ids:
        if tipo == 'juego':
            recommendations.append(calcular_recomendaciones_juego(df, cosine_sim_df, id))
        elif tipo == 'usuario':
            recommendations.append(calcular_recomendaciones_usuario(df, cosine_sim_df, id))
    return pd.concat(recommendations)

# Precalcular similitudes para juegos y usuarios
game_features_df = generate_features(reduced_df, 'item_id', ['playtime_forever', 'recommend', 'sentiment_analysis'])
user_features_df = generate_features(final_merged_df, 'user_id', ['playtime_forever', 'sentiment_analysis', 'recommend'])

game_cosine_sim_df = compute_cosine_similarity(game_features_df)
user_cosine_sim_df = compute_cosine_similarity(user_features_df)

# Obtén los valores únicos para los juegos y usuarios
unique_game_ids = reduced_df['item_id'].unique()
unique_user_ids = final_merged_df['user_id'].unique()

# Calcular las recomendaciones
recomendaciones_juego = calcular_recomendaciones(reduced_df, game_cosine_sim_df, unique_game_ids, 'juego')
recomendaciones_usuario = calcular_recomendaciones(final_merged_df, user_cosine_sim_df, unique_user_ids, 'usuario')

# Guardar los resultados en archivos Parquet
recomendaciones_juego.to_parquet('../data/processed/recomendaciones_juego.parquet')
recomendaciones_usuario.to_parquet('../data/processed/recomendaciones_usuario.parquet')
