# **CODIGO CARGADO EN EL JOB DE AWS GLUE**

# In [None]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

# Resolve the arguments passed to the Glue job (only JOB_NAME is requested).
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create the Spark context, wrap it in a Glue context and take its Spark session.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialise the Glue job with its name and arguments.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load the CSV files from S3. Every file is read with a header row and ';'
# as field separator, so a single configured reader is shared by all loads.
lector_csv = spark.read.option("header", True).option("sep", ";")
df_film = lector_csv.csv("s3://pruebatecnicajosemqe/Films.csv")
df_customer = lector_csv.csv("s3://pruebatecnicajosemqe/customer.csv")
df_inventory = lector_csv.csv("s3://pruebatecnicajosemqe/inventory.csv")
df_rental = lector_csv.csv("s3://pruebatecnicajosemqe/rental.csv")
df_store = lector_csv.csv("s3://pruebatecnicajosemqe/store.csv")

#INSPECCION INICIAL DE LAS TABLAS
def inspeccion_inicial(df, nombre_df):
    """Print a quick overview of a DataFrame.

    Prints, in order: the schema, the first 5 rows (untruncated), the total
    number of rows and the list of column names.

    Args:
        df: DataFrame to inspect.
        nombre_df: Label shown in the report header.
    """
    print(f"\n=== Inspección del DataFrame: {nombre_df} ===")

    # Each step is a (title, action) pair; the title is printed right before
    # its action runs, so output order matches the order of this tuple.
    pasos = (
        ("Esquema del DataFrame:", df.printSchema),
        ("\nPrimeras 5 filas:", lambda: df.show(5, truncate=False)),
        ("\nNúmero total de registros:", lambda: print(df.count())),
        ("\nNombres de columnas:", lambda: print(df.columns)),
    )
    for titulo, accion in pasos:
        print(titulo)
        accion()

# Run the initial inspection on every raw table, in the original order.
for _etiqueta, _tabla in (
    ("df_film", df_film),
    ("df_customer", df_customer),
    ("df_inventory", df_inventory),
    ("df_rental", df_rental),
    ("df_store", df_store),
):
    inspeccion_inicial(_tabla, _etiqueta)

#LIMPIEZA DE ESPACIOS

from pyspark.sql.functions import trim, col
from pyspark.sql.types import StringType

def limpiar_espacios(df):
    """Strip surrounding whitespace from column names and from string values.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame whose column names are stripped and whose StringType
        columns have their values trimmed.
    """
    nombres_limpios = [nombre.strip() for nombre in df.columns]
    resultado = df.toDF(*nombres_limpios)

    # Trimming does not change a column's type, so the string columns can be
    # selected once, before the loop.
    columnas_texto = [
        nombre for nombre in resultado.columns
        if resultado.schema[nombre].dataType == StringType()
    ]
    for nombre in columnas_texto:
        resultado = resultado.withColumn(nombre, trim(col(nombre)))

    return resultado

# Apply the whitespace clean-up to each raw table.
(
    df_film_sinespacios,
    df_customer_sinespacios,
    df_inventory_sinespacios,
    df_rental_sinespacios,
    df_store_sinespacios,
) = map(limpiar_espacios, (df_film, df_customer, df_inventory, df_rental, df_store))

#CAMBIO DE VALORES NULL Y VACIOS POR None

from pyspark.sql.functions import when, col, trim


def reemplazar_nulls(df):
    """Turn textual null markers in string columns into real nulls.

    For every StringType column the trimmed value is compared against
    "NULL", "Null" and the empty string; matches become null, any other
    value is kept in its trimmed form.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame with the same columns and the markers replaced by null.
    """
    marcadores_nulos = ("NULL", "Null", "")
    columnas_texto = [
        nombre for nombre in df.columns
        if df.schema[nombre].dataType == StringType()
    ]

    resultado = df
    for nombre in columnas_texto:
        valor = trim(col(nombre))
        resultado = resultado.withColumn(
            nombre,
            when(valor.isin(*marcadores_nulos), None).otherwise(valor),
        )
    return resultado

# Replace the textual null markers in every whitespace-cleaned table.
(
    df_film_None,
    df_customer_None,
    df_inventory_None,
    df_rental_None,
    df_store_None,
) = map(
    reemplazar_nulls,
    (
        df_film_sinespacios,
        df_customer_sinespacios,
        df_inventory_sinespacios,
        df_rental_sinespacios,
        df_store_sinespacios,
    ),
)

#ELIMINAR COLUMNAS CON 100% VALORES NULOS

def eliminar_columnas_nulas(df):
    """Drop every column that contains no non-null value.

    The non-null counts of all columns are computed in a single aggregation
    (one Spark job) instead of one filter + count job per column.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame restricted to the columns with at least one non-null
        value, in their original order.
    """
    # Imported locally: the script's module-level import of `count` appears
    # further down than the first call to this function.
    from pyspark.sql.functions import count as contar_no_nulos

    columnas = df.columns
    if not columnas:
        return df

    # count(<column>) ignores nulls, so each aggregated value is the number of
    # non-null entries of that column. The values are read positionally.
    conteos = df.agg(*[contar_no_nulos(col(c)) for c in columnas]).first()
    non_null_cols = [c for c, total in zip(columnas, conteos) if total > 0]
    return df.select(*non_null_cols)

# Remove the fully-null columns from every table.
(
    df_film_sincolumnasnulas,
    df_inventory_sincolumnasnulas,
    df_rental_sincolumnasnulas,
    df_customer_sincolumnasnulas,
    df_store_sincolumnasnulas,
) = map(
    eliminar_columnas_nulas,
    (df_film_None, df_inventory_None, df_rental_None, df_customer_None, df_store_None),
)

# Per the author's analysis, customer_id_old and segment are about 43% null and
# are neither critical nor required for the analysis, so they are dropped.
columnas_descartadas_customer = ('customer_id_old', 'segment')
df_customer_sincolumnasnulas = df_customer_sincolumnasnulas.drop(*columnas_descartadas_customer)

#LIMPIAR COLUMNAS NUMERICAS

from pyspark.sql.functions import regexp_replace, col

def limpiar_columnas_numericas(df, columnas):
    """Remove every character that is not a digit from the given columns.

    Note that signs and decimal separators are removed as well, since only
    the characters 0-9 are kept.

    Args:
        df: Input DataFrame.
        columnas: Names of the columns to clean.

    Returns:
        A DataFrame with the listed columns cleaned.
    """
    patron_no_digito = "[^0-9]"
    resultado = df
    for nombre in columnas:
        solo_digitos = regexp_replace(col(nombre), patron_no_digito, "")
        resultado = resultado.withColumn(nombre, solo_digitos)
    return resultado

# Integer-like columns of each table that must contain digits only.
columnas_numericas_df_film = [
    'film_id',
    'release_year',
    'language_id',
    'rental_duration',
    'length',
    'num_voted_users',
]
columnas_numericas_df_inventory = ['inventory_id', 'film_id', 'store_id']
columnas_numericas_df_rental = ['rental_id', 'inventory_id', 'customer_id', 'staff_id']
columnas_numericas_df_customer = ['customer_id', 'store_id', 'address_id', 'active']
columnas_numericas_df_store = ['store_id', 'manager_staff_id', 'address_id']

# Clean those columns in every table.
df_film_limpio = limpiar_columnas_numericas(
    df=df_film_sincolumnasnulas, columnas=columnas_numericas_df_film
)
df_inventory_limpio = limpiar_columnas_numericas(
    df=df_inventory_sincolumnasnulas, columnas=columnas_numericas_df_inventory
)
df_rental_limpio = limpiar_columnas_numericas(
    df=df_rental_sincolumnasnulas, columnas=columnas_numericas_df_rental
)
df_customer_limpio = limpiar_columnas_numericas(
    df=df_customer_sincolumnasnulas, columnas=columnas_numericas_df_customer
)
df_store_limpio = limpiar_columnas_numericas(
    df=df_store_sincolumnasnulas, columnas=columnas_numericas_df_store
)

#LIMPIAR COLUMNAS DECIMALES

def limpiar_caracteres_columnas_decimales(df, columnas):
    """Keep only digits and dots in the given columns.

    Every other character (including commas) is removed.

    Args:
        df: Input DataFrame.
        columnas: Names of the columns to clean.

    Returns:
        A DataFrame with the listed columns cleaned.
    """
    patron_no_decimal = r"[^0-9.]"
    resultado = df
    for nombre in columnas:
        valor_limpio = regexp_replace(col(nombre), patron_no_decimal, "")
        resultado = resultado.withColumn(nombre, valor_limpio)
    return resultado

def limpiar_columnas_decimales(df, columnas):
    """Replace every comma with a dot in the given columns.

    Args:
        df: Input DataFrame.
        columnas: Names of the columns to normalise.

    Returns:
        A DataFrame with the listed columns normalised.
    """
    resultado = df
    for nombre in columnas:
        con_punto = regexp_replace(col(nombre), ",", ".")
        resultado = resultado.withColumn(nombre, con_punto)
    return resultado

columnas_decimales_df_film = ['rental_rate', 'replacement_cost']
# Normalise the decimal comma to a dot FIRST. The character filter keeps only
# digits and dots, so running it before this step would delete the comma and
# turn a value such as "4,99" into "499".
df_film_limpio = limpiar_columnas_decimales(df_film_limpio, columnas_decimales_df_film)
df_film_limpio = limpiar_caracteres_columnas_decimales(df_film_limpio, columnas_decimales_df_film)

#RENOMBRAR COLUMNAS

# Give last_update (and address_id, where present) a table-specific name so the
# columns stay distinguishable once the tables are joined.
df_film_limpio = df_film_limpio.withColumnRenamed("last_update", "last_update_film")
df_inventory_limpio = df_inventory_limpio.withColumnRenamed("last_update", "last_update_inventory")
df_rental_limpio = df_rental_limpio.withColumnRenamed("last_update", "last_update_rental")
df_customer_limpio = (
    df_customer_limpio
    .withColumnRenamed("last_update", "last_update_customer")
    .withColumnRenamed("address_id", "address_id_customer")
)
df_store_limpio = (
    df_store_limpio
    .withColumnRenamed("last_update", "last_update_store")
    .withColumnRenamed("address_id", "address_id_store")
)

#CAMBIO DE TIPO DE DATOS

from pyspark.sql.functions import col
from pyspark.sql.types import *

def _convertir_tipos(df, tipos_por_columna):
    """Cast each listed column of df to its target type, one column at a time."""
    for nombre, tipo in tipos_por_columna.items():
        df = df.withColumn(nombre, col(nombre).cast(tipo))
    return df


# Target type of every film column.
df_film_final = _convertir_tipos(df_film_limpio, {
    "film_id": ShortType(),
    "title": StringType(),
    "description": StringType(),
    "release_year": IntegerType(),
    "language_id": ByteType(),
    "rental_duration": ByteType(),
    "rental_rate": DecimalType(4, 2),
    "length": ShortType(),
    "replacement_cost": DecimalType(5, 2),
    "num_voted_users": IntegerType(),
    "rating": StringType(),
    "special_features": StringType(),
    "last_update_film": TimestampType(),
})

# Target type of every inventory column.
df_inventory_final = _convertir_tipos(df_inventory_limpio, {
    "inventory_id": IntegerType(),
    "film_id": IntegerType(),
    "store_id": IntegerType(),
    "last_update_inventory": TimestampType(),
})

# Target type of every rental column.
df_rental_final = _convertir_tipos(df_rental_limpio, {
    "rental_id": IntegerType(),
    "rental_date": TimestampType(),
    "inventory_id": IntegerType(),
    "customer_id": ShortType(),
    "return_date": TimestampType(),
    "staff_id": ByteType(),
    "last_update_rental": TimestampType(),
})

# Target type of every customer column.
df_customer_final = _convertir_tipos(df_customer_limpio, {
    "customer_id": ShortType(),
    "store_id": ByteType(),
    "first_name": StringType(),
    "last_name": StringType(),
    "email": StringType(),
    "address_id_customer": ShortType(),
    "active": BooleanType(),
    "create_date": DateType(),
    "last_update_customer": TimestampType(),
})

# Target type of every store column.
df_store_final = _convertir_tipos(df_store_limpio, {
    "store_id": ByteType(),
    "manager_staff_id": ByteType(),
    "address_id_store": ShortType(),
    "last_update_store": TimestampType(),
})

#VERIFICAR Y ELIMINAR FILAS DUPLICADAS EN CADA TABLA

def eliminar_duplicados(df, nombre_tabla):
    """Return the DataFrame without duplicated rows.

    Args:
        df: Input DataFrame.
        nombre_tabla: Table label; accepted by the signature but not used.

    Returns:
        The result of ``df.dropDuplicates()``.
    """
    return df.dropDuplicates()

# Drop duplicated rows from every typed table.
(
    df_film_final,
    df_inventory_final,
    df_rental_final,
    df_customer_final,
    df_store_final,
) = (
    eliminar_duplicados(tabla, etiqueta)
    for tabla, etiqueta in (
        (df_film_final, "df_film_final"),
        (df_inventory_final, "df_inventory_final"),
        (df_rental_final, "df_rental_final"),
        (df_customer_final, "df_customer_final"),
        (df_store_final, "df_store_final"),
    )
)

#Se crea una columna de Return_Status basada en diferentes condicionales y dependencias de otras columnas,
#para no eliminar filas con valores nulos que son importantes para el analisis.

from pyspark.sql.functions import col, when, datediff, avg, lit


# Average rental length in days, computed over the rentals that have a return date.
avg_days = (
    df_rental_final
    .filter(col("return_date").isNotNull())
    .select(avg(datediff(col("return_date"), col("rental_date"))))
    .first()[0]
)

# Building blocks of the classification.
_devuelto = col("return_date").isNotNull() & col("rental_date").isNotNull()
_sin_devolver = col("return_date").isNull()
_dias_transcurridos = datediff(col("last_update_rental"), col("rental_date"))

_estado = when(_devuelto, "returned")
if avg_days is None:
    # No rental yields an average (e.g. nothing has been returned yet), so
    # there is no threshold to flag a rental as lost; `avg_days * 3` would
    # raise TypeError here. Unreturned rentals with a computable elapsed time
    # are reported as still rented.
    _estado = _estado.when(_sin_devolver & _dias_transcurridos.isNotNull(), "rented")
else:
    # An unreturned rental is considered lost once it has been out for more
    # than three times the average rental length.
    _umbral_dias = lit(avg_days * 3)
    _estado = (
        _estado
        .when(_sin_devolver & (_dias_transcurridos > _umbral_dias), "lost")
        .when(_sin_devolver & (_dias_transcurridos <= _umbral_dias), "rented")
    )

# Rows matching none of the branches keep a null return_status.
df_rental_final = df_rental_final.withColumn("return_status", _estado)

# Null clean-up in the customer table: a missing last name gets a placeholder.
df_customer_final = df_customer_final.fillna("no last name", subset=["last_name"])

#CONVERTIR TODOS LOS STRINGS EN MAYUSCULA
from pyspark.sql.functions import upper
from pyspark.sql.types import StringType

def to_uppercase_strings(df):
    """Upper-case the values of every string column.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame whose columns of dtype "string" hold upper-cased values;
        other columns are left untouched.
    """
    columnas_texto = [nombre for nombre, tipo in df.dtypes if tipo == "string"]
    resultado = df
    for nombre in columnas_texto:
        resultado = resultado.withColumn(nombre, upper(resultado[nombre]))
    return resultado

# Upper-case the text columns of the film, rental and customer tables.
df_film_final, df_rental_final, df_customer_final = map(
    to_uppercase_strings,
    (df_film_final, df_rental_final, df_customer_final),
)

#SE REVISA EL % DE VALORES NULOS EN CADA COLUMNA, CON EL FIN DE TOMAR DE DECISIONES SOBRE ESTAS

from pyspark.sql.functions import col, when, count

def porcentaje_nulos_por_tabla(tablas_dict):
    """Compute the percentage of null values per column for several tables.

    Args:
        tablas_dict: Mapping of table name to DataFrame.

    Returns:
        A dict with the same keys; each value is a DataFrame holding, for
        every column of the table, the percentage of rows where it is null.
    """
    def _porcentajes(df):
        total_filas = df.count()
        # when(...) without otherwise yields null for non-matching rows and
        # count() skips nulls, so only the null rows of each column are counted.
        expresiones = [
            ((count(when(col(c).isNull(), c)) / total_filas) * 100).alias(c)
            for c in df.columns
        ]
        return df.select(expresiones)

    return {
        nombre_tabla: _porcentajes(df)
        for nombre_tabla, df in tablas_dict.items()
    }

# Tables whose null percentages are reported, keyed by a short name.
tablas = dict(
    film=df_film_final,
    inventory=df_inventory_final,
    rental=df_rental_final,
    customer=df_customer_final,
    store=df_store_final,
)

porcentajes_nulos = porcentaje_nulos_por_tabla(tablas)

# Show the result of every table.
for nombre_tabla in porcentajes_nulos:
    df_porcentaje = porcentajes_nulos[nombre_tabla]
    print(f"Porcentaje de nulos en la tabla '{nombre_tabla}':")
    df_porcentaje.show()


#MER

from pyspark.sql.functions import col

def generar_df_analitico(df_rental_final, df_customer_final, df_store_final, df_inventory_final, df_film_final):
    """Join the five cleaned tables into a single analytical DataFrame.

    All joins are inner joins: rental -> customer (customer_id) -> store
    (the customer's store id equals the store's id) -> inventory
    (inventory_id) -> film (film_id).

    Args:
        df_rental_final: Rental table.
        df_customer_final: Customer table.
        df_store_final: Store table.
        df_inventory_final: Inventory table.
        df_film_final: Film table.

    Returns:
        The joined DataFrame.
    """
    # store_id exists in three tables; give each copy its own name so the
    # joined result has no ambiguous column.
    clientes = df_customer_final.withColumnRenamed("store_id", "customer_store_id")
    tiendas = df_store_final.withColumnRenamed("store_id", "store_id_store")
    inventario = df_inventory_final.withColumnRenamed("store_id", "store_id_inventory")

    alquileres_con_cliente = df_rental_final.join(clientes, on="customer_id", how="inner")
    con_tienda = alquileres_con_cliente.join(
        tiendas,
        col("customer_store_id") == col("store_id_store"),
        how="inner",
    )
    con_inventario = con_tienda.join(inventario, on="inventory_id", how="inner")
    return con_inventario.join(df_film_final, on="film_id", how="inner")

# Build the analytical DataFrame and preview its schema and first 5 rows.
df_analitico_final = generar_df_analitico(
    df_rental_final=df_rental_final,
    df_customer_final=df_customer_final,
    df_store_final=df_store_final,
    df_inventory_final=df_inventory_final,
    df_film_final=df_film_final,
)
df_analitico_final.printSchema()
df_analitico_final.show(n=5)

#EDA Análisis exploratorio de datos

#Analisis univariado
#Variables numericas

from pyspark.sql.functions import col
from pyspark.sql.types import NumericType

def analisis_univariado_numerico(df):
    """Show the describe() summary of every numeric column.

    Args:
        df: DataFrame to analyse.
    """
    columnas_numericas = []
    for campo in df.schema.fields:
        if isinstance(campo.dataType, NumericType):
            columnas_numericas.append(campo.name)

    df.select(*columnas_numericas).describe().show()

# Print the summary of the numeric columns of the analytical DataFrame.
analisis_univariado_numerico(df_analitico_final)

#Variables categoricas

from pyspark.sql.functions import col, desc
from pyspark.sql import Row

def resumen_categoricas(df, top_n=10):
    """Summarise the categorical (string and boolean) columns of a DataFrame.

    The columns first_name, last_name, email and description are skipped.
    For each remaining column the summary holds the number of non-null
    values, the number of distinct non-null values, the most frequent
    non-null value and its frequency. Nulls are excluded from every metric so
    that `unique`, `top` and `freq` are consistent with `count`.

    Args:
        df: DataFrame to summarise.
        top_n: Not used; kept so existing callers keep working.

    Returns:
        A DataFrame with the columns columna, count, unique, top and freq.
        `top` is always stored as text so string and boolean columns can
        share the same result column.
    """
    # Imported locally so the function does not depend on which module-level
    # type imports have already been executed by the script.
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    columnas_excluir = {'first_name', 'last_name', 'email', 'description'}
    columnas_filtradas = [
        f.name for f in df.schema.fields
        if f.dataType.simpleString() in ('string', 'boolean')
        and f.name not in columnas_excluir
    ]

    resumen = []
    for columna in columnas_filtradas:
        no_nulos = df.filter(col(columna).isNotNull())
        total = no_nulos.count()
        distintos = no_nulos.select(columna).distinct().count()
        fila_top = no_nulos.groupBy(columna).count().orderBy(desc("count")).first()

        # first() returns None when the column has no non-null value.
        if fila_top is None:
            top, freq = None, None
        else:
            # Stringify: a boolean top mixed with string tops cannot be merged
            # into a single column type.
            top, freq = str(fila_top[columna]), fila_top["count"]

        resumen.append((columna, total, distintos, top, freq))

    # Explicit schema: nothing has to be inferred, so an empty summary or an
    # all-null `top` column still produces a valid DataFrame.
    esquema = StructType([
        StructField("columna", StringType(), True),
        StructField("count", LongType(), True),
        StructField("unique", LongType(), True),
        StructField("top", StringType(), True),
        StructField("freq", LongType(), True),
    ])
    return spark.createDataFrame(resumen, schema=esquema)

# Build and display the summary of the categorical columns.
df_resumen_categoricas = resumen_categoricas(df_analitico_final)
df_resumen_categoricas.show()

#detectar_outliers y Cuartiles en varialbles numericas

from pyspark.sql.functions import col
from pyspark.sql.types import NumericType
from pyspark.sql import Row

def detectar_outliers_iqr_df(df, precision=0.01):
    """Report quartiles and IQR-based outlier counts of the numeric columns.

    Numeric columns whose name contains "id" (case-insensitive) are skipped.
    For each remaining column the approximate quartiles are computed, the
    limits Q1 - 1.5 * IQR and Q3 + 1.5 * IQR are derived and the rows outside
    those limits are counted. The summary is shown ordered by column name.

    Args:
        df: DataFrame to analyse.
        precision: Relative error passed to ``approxQuantile``.

    Returns:
        A DataFrame with the columns columna, Q1, Q2, Q3, Limite_Inferior,
        Limite_Superior and Outliers. Columns whose quartiles could not be
        computed have null metrics.
    """
    # Imported locally so the function does not depend on which module-level
    # type imports have already been executed by the script.
    from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

    columnas_numericas = [
        f.name for f in df.schema.fields
        if isinstance(f.dataType, NumericType) and "id" not in f.name.lower()
    ]

    resumen = []
    for columna in columnas_numericas:
        fila_vacia = (columna, None, None, None, None, None, None)
        try:
            cuartiles = df.approxQuantile(columna, [0.25, 0.5, 0.75], precision)
            if len(cuartiles) != 3:
                # approxQuantile returns an empty list for a column that only
                # contains nulls: there is nothing to measure.
                resumen.append(fila_vacia)
                continue

            q1, q2, q3 = cuartiles
            iqr = q3 - q1
            lim_inf = q1 - 1.5 * iqr
            lim_sup = q3 + 1.5 * iqr
            count_outliers = df.filter(
                (col(columna) < lim_inf) | (col(columna) > lim_sup)
            ).count()
        except Exception as error:
            # Best effort: a failing column must not abort the whole report,
            # but the failure is reported instead of being hidden.
            print(f"No se pudieron calcular los cuartiles de '{columna}': {error}")
            resumen.append(fila_vacia)
        else:
            resumen.append((
                columna,
                round(q1, 2),
                round(q2, 2),
                round(q3, 2),
                round(lim_inf, 2),
                round(lim_sup, 2),
                count_outliers,
            ))

    # Explicit schema: inference fails on an empty list and on metrics that
    # are null in every row.
    esquema = StructType([
        StructField("columna", StringType(), True),
        StructField("Q1", DoubleType(), True),
        StructField("Q2", DoubleType(), True),
        StructField("Q3", DoubleType(), True),
        StructField("Limite_Inferior", DoubleType(), True),
        StructField("Limite_Superior", DoubleType(), True),
        StructField("Outliers", LongType(), True),
    ])
    df_resumen = spark.createDataFrame(resumen, schema=esquema)
    df_resumen.select("columna", "Q1", "Q2", "Q3", "Limite_Inferior", "Limite_Superior", "Outliers") \
              .orderBy("columna") \
              .show()

    return df_resumen

# Show the quartile/outlier report of the analytical DataFrame; the returned
# summary DataFrame is not kept.
detectar_outliers_iqr_df(df_analitico_final)

#Otros analisis del negocio

from pyspark.sql.functions import col, count, avg, desc, isnull, datediff, to_date

# 1. Total number of films, customers, stores and rentals
print("Número total de películas, clientes, tiendas y alquileres:")
for _etiqueta, _columna in (
    ("Películas:", "film_id"),
    ("Clientes:", "customer_id"),
    ("Tiendas:", "store_id_store"),
    ("Alquileres:", "rental_id"),
):
    print(_etiqueta, df_analitico_final.select(_columna).distinct().count())

# 2. How many rows belong to active vs. inactive customers?
print("\nCantidad de clientes activos vs. inactivos:")
_por_estado_activo = df_analitico_final.groupBy("active").count()
_por_estado_activo.show()

# 3. Number of distinct films per store
print("\nNúmero de películas distintas disponibles por tienda:")
_peliculas_por_tienda = (
    df_analitico_final
    .select("store_id_store", "film_id")
    .distinct()
    .groupBy("store_id_store")
    .count()
)
_peliculas_por_tienda.show()

# 4. Top 10 customers by number of rentals
print("\nTop 10 clientes con más alquileres:")
_ranking_clientes = df_analitico_final.groupBy("customer_id").count().orderBy(desc("count"))
_ranking_clientes.show(10)

# 5. Number of rentals per store
print("\nCantidad de alquileres por tienda:")
_alquileres_por_tienda = df_analitico_final.groupBy("store_id_store").count()
_alquileres_por_tienda.orderBy("store_id_store").show()

# 6. Most rented films
print("\nPelículas con mayor número de alquileres:")
_ranking_peliculas = df_analitico_final.groupBy("title").count().orderBy(desc("count"))
_ranking_peliculas.show(10)

# 7. Average number of rentals per customer
print("\nPromedio de alquileres por cliente:")
alquileres_por_cliente = df_analitico_final.groupBy("customer_id").count()
_promedio_alquileres = alquileres_por_cliente.select(avg("count").alias("avg_rentals"))
_promedio_alquileres.show()

# 8. Customers with films that were not returned
clientes_lost = (
    df_analitico_final
    .filter(col("return_status") == "LOST")
    .select("customer_id")
    .distinct()
)
print(f"Total de clientes con películas no devueltas: {clientes_lost.count()}")
clientes_lost.show()

# 9. Average rental length (days)
print("\nDuración promedio de los alquileres en días:")
df_analitico_final = df_analitico_final.withColumn(
    "dias_alquiler",
    datediff(col("return_date"), col("rental_date")),
)
df_analitico_final.select(avg("dias_alquiler").alias("promedio_dias")).show()


# 10. Valor de perdidas por peliculas no devueltas
from pyspark.sql.functions import col, sum as spark_sum

print("Ingreso potencial perdido por películas con estado LOST:")

# Sum of the rental rate over the rows whose return status is LOST.
ingreso_perdido_lost = (
    df_analitico_final
    .filter(col("return_status") == "LOST")
    .agg(spark_sum("rental_rate"))
    .first()[0]
)

# The sum is None when no row is LOST (or every matching rate is null);
# formatting None with ':.2f' would raise TypeError, so report zero instead.
if ingreso_perdido_lost is None:
    ingreso_perdido_lost = 0

print(f"Perdidas por peliculas no devueltas: ${ingreso_perdido_lost:.2f}")

# Signal Glue that the job finished.
job.commit()