In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import logging, re

In [2]:
## Creo el archivo de logs, y redirijo las salidas de los print de control para que en produccion queden como logs

logging.basicConfig(filename='logs_pipeline.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
print = logger.info

In [11]:
## Cargo todas las hojas (tablas) del archivo excel en un diccionario

excel = pd.ExcelFile("Films_2 .xlsx")
tables = { tb:excel.parse(tb) for tb in excel.sheet_names if tb != "MER"}  # Excluyo la hoja del modelo lógico

In [12]:
## Creo el pipeline que ejecutará las tareas de limpieza en las tablas

class PipelineClean:
    integers_cols = ("rental_duration", "length", "release_year", "num_voted_users")
    decimal_cols = ("rental_rate", "replacement_cost")


    def clean_headers(self, df):
        print("Limpiando encabezados")
        df.columns = df.columns.map(lambda x: x.strip())
        return df


    def clean_integers(self, df):
        def clean(x):
            try:
                ok = int(x)
            except ValueError:
                digit = re.sub("\D", "", x)   # Substituyo todo lo que NO sea digito con nada... para que sea parseable a entero
                if digit == "":
                    ok = np.nan
                else:
                    ok = int(digit)
            return ok

        for col in df.columns:
            if "_id" in col or col in self.integers_cols:
                print(f"Limpiando enteros, col: {col}")
                df[col] = df[col].apply(clean)    
        return df


    def clean_decimals(self, df):
        for col in df.columns:
            if col in self.decimal_cols:
                print(f"Limpiando decimales, col: {col}")
                df[col] = df[col].apply(lambda x: float(re.sub("\D", "", x)) )    
        return df


    def parse_dates(self, df):
        for col in df.columns:
            if "date" in col or col=="last_update":
                print(f"Casteando fechas, col: {col}")
                df[col] = pd.to_datetime(df[col], errors="coerce")
        return df


    def clean_strings(self, df):
        for col, t in df.dtypes.items():
            if t == "object":
                print(f"Eliminando whitespaces en strings, col: {col}")
                df[col] = df[col].apply(lambda x: x.strip())
        return df


    def drop_duplicates(self, df, name):
        print(f"Eliminando posibles duplicados")
        for col in df.columns:
            if col == name+"_id":
                df.drop_duplicates(col, inplace=True)
        return df


    def execute_all(self, df, name):
        print(f"Ejecutando pipeline para tabla: {name}")
        return (
            df.pipe(self.clean_headers)
            .pipe(self.clean_integers)
            .pipe(self.clean_decimals)
            .pipe(self.parse_dates)
            .pipe(self.clean_strings)
            .pipe(self.drop_duplicates, name)
        )

In [13]:
## Ejecuto el pipeline para todas las tablas sobrescribiendo el mismo diccionario que las contiene

pipeline = PipelineClean()

for name, df in tables.items():
    tables[name] = pipeline.execute_all(df, name)
    print("")

In [14]:
## Ahora que están limpias las tablas, convierto a Pyspark para hacer las consultas

spark = SparkSession.builder.appName("DML_Spark").getOrCreate()

for name, df in tables.items():
    spark.createDataFrame(df).createOrReplaceTempView(name)  # Creo vista para cada tabla para usar sintaxis sql y responder las preguntas

## El enunciado dice textual: "Formular 5 posibles preguntas de negocio a las cuales los datos procesados
# puedan dar respuesta. Responde las preguntas."

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 04:26:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/06 04:26:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [15]:
## 1. Cuántas copias existen por pelicula para alquilar ?

spark.sql("""
SELECT title, COUNT(*) as Cantidad_inventario
FROM film f
JOIN inventory i
ON f.film_id = i.film_id
GROUP BY title
ORDER BY Cantidad_inventario DESC
""").show()

                                                                                

+--------------------+-------------------+
|               title|Cantidad_inventario|
+--------------------+-------------------+
|        TORQUE BOUND|                  8|
|       APACHE DIVINE|                  8|
|      GILMORE BOILED|                  8|
|         HARRY IDAHO|                  8|
|       BOUND CHEAPER|                  8|
|      INNOCENT USUAL|                  8|
| EXPENDABLE STALLION|                  8|
|     RUSH GOODFELLAS|                  8|
|      GRIT CLOCKWORK|                  8|
| RIDGEMONT SUBMARINE|                  8|
|        DOGMA FAMILY|                  8|
|    CUPBOARD SINNERS|                  8|
|        NETWORK PEAK|                  8|
|SWEETHEARTS SUSPECTS|                  8|
|          PITY BOUND|                  8|
|       MUSCLE BRIGHT|                  8|
|       GARDEN ISLAND|                  8|
|     VIRGINIAN PLUTO|                  8|
|      GIANT TROOPERS|                  8|
|       HUSTLER PARTY|                  8|
+----------

In [16]:
## 2. Cuál es el numero de copias (inventario) de peliculas por tienda?

spark.sql("""
SELECT s.store_id AS Tienda,  COUNT(*) as Total_peliculas
FROM film f
JOIN inventory i
ON f.film_id = i.film_id
JOIN store s
ON s.store_id = i.store_id    
GROUP BY s.store_id
""").show()

                                                                                

+------+---------------+
|Tienda|Total_peliculas|
+------+---------------+
|     1|              4|
|     2|           4577|
+------+---------------+



In [17]:
## 3. Cuál o cuales son las peliculas que alquila la tienda 1?

spark.sql("""
SELECT DISTINCT title
FROM film f
JOIN inventory i
ON f.film_id = i.film_id
JOIN store s
ON s.store_id = i.store_id   
WHERE s.store_id == 1
""").show()

                                                                                

+----------------+
|           title|
+----------------+
|ACADEMY DINOSAUR|
+----------------+



In [18]:
## 4. Cuales son las 5 peliculas que más se alquilaron durante el tiempo disponible en el dataset?

spark.sql("""
SELECT title, COUNT(*) AS Veces_alquilada
FROM film f
JOIN inventory i
ON f.film_id = i.film_id 
JOIN rental r
ON i.inventory_id = r.inventory_id
GROUP BY title
ORDER BY Veces_alquilada DESC
LIMIT 5
""").show()


                                                                                

+-------------------+---------------+
|              title|Veces_alquilada|
+-------------------+---------------+
| BUCKET BROTHERHOOD|             34|
|   ROCKETEER MOTHER|             33|
|     JUGGLER HARDLY|             32|
|RIDGEMONT SUBMARINE|             32|
|     FORWARD TEMPLE|             32|
+-------------------+---------------+



In [21]:
## 5. Cuánto fue el ingreso total de la empresa para el mes 06-2005?

spark.sql("""
SELECT EXTRACT(month from rental_date) AS Mes, SUM(rental_rate*rental_duration) AS Ingreso_total
FROM film f
JOIN inventory i
ON f.film_id = i.film_id
JOIN rental r
ON i.inventory_id = r.inventory_id
WHERE EXTRACT(month from rental_date) = 6
GROUP BY Mes
""").show()

                                                                                

+---+-------------+
|Mes|Ingreso_total|
+---+-------------+
|  6|    3387231.0|
+---+-------------+



In [22]:
## 6. Quienes son los 5 clientes más asiduos?

spark.sql(f"""
SELECT first_name, last_name, COUNT(*) AS Total_rentas
FROM rental r
JOIN customer c
ON r.customer_id = c.customer_id
GROUP BY first_name, last_name
ORDER BY total_rentas DESC
LIMIT 5
""").show()

                                                                                

+----------+---------+------------+
|first_name|last_name|Total_rentas|
+----------+---------+------------+
|   ELEANOR|     HUNT|          46|
|      KARL|     SEAL|          45|
|    MARCIA|     DEAN|          42|
|     CLARA|     SHAW|          42|
|     TAMMY|  SANDERS|          41|
+----------+---------+------------+

