**Importar librerías**

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from const import PATH_DATA_SOURCE, PATH_DATA_DEST
from funciones import guardar_dataframe_csv,data_filled

**1. Configurar el entorno**

In [3]:
# Crear una sesión de Spark
spark = SparkSession.builder.appName("Proyectopyspark").getOrCreate()

24/01/18 12:20:38 WARN Utils: Your hostname, PC01JOSEF resolves to a loopback address: 127.0.1.1; using 192.168.0.19 instead (on interface wlo1)
24/01/18 12:20:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/18 12:20:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/18 12:20:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


**2. Cargar datos**

In [4]:
# Reemplaza 'ruta/del/archivo' con la ruta de tu archivo de datos
#data = spark.read.csv('../data/srd/hotel_bookings.csv', header=True, inferSchema=True)
data = spark.read.csv(PATH_DATA_SOURCE, header=True, inferSchema=True)

                                                                                

**3. Primera exploración en los datos**

In [31]:
#data.show()
data.show(5, truncate=False)
#print(data.head(5))
#print(data.collect())

+-----+------------+-----------+---------+-----------------+------------------+------------------------+-------------------------+-----------------------+--------------------+------+--------+------+----+-------+--------------+--------------------+-----------------+----------------------+------------------------------+------------------+------------------+---------------+------------+--------+----------+--------------------+-------------+----+---------------------------+-------------------------+------------------+-----------------------+
|index|hotel       |is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_week_number|arrival_date_day_of_month|stays_in_weekend_nights|stays_in_week_nights|adults|children|babies|meal|country|market_segment|distribution_channel|is_repeated_guest|previous_cancellations|previous_bookings_not_canceled|reserved_room_type|assigned_room_type|booking_changes|deposit_type|agent   |company   |days_in_waiting_list|customer_type|adr |required_c

In [6]:
data.printSchema()

root
 |-- index: integer (nullable = true)
 |-- hotel: string (nullable = true)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_month: string (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: double (nullable = true)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = true)
 |-- country: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = true)
 

In [7]:
##usamos pandas para ver el formato tabla
pandas_df = data.toPandas()
print(pandas_df)

                                                                                

         index         hotel  is_canceled  lead_time  arrival_date_year  \
0            0  Resort Hotel            0        342               2015   
1            1  Resort Hotel            0        737               2015   
2            2  Resort Hotel            0          7               2015   
3            3  Resort Hotel            0         13               2015   
4            4  Resort Hotel            0         14               2015   
...        ...           ...          ...        ...                ...   
119385  119385    City Hotel            0         23               2017   
119386  119386    City Hotel            0        102               2017   
119387  119387    City Hotel            0         34               2017   
119388  119388    City Hotel            0        109               2017   
119389  119389    City Hotel            0        205               2017   

       arrival_date_month  arrival_date_week_number  \
0                    July                   

**3.1. Contamos los nulos por columna**

In [8]:
# Contar los valores nulos en cada columna
null_counts = data.agg(*[sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

# Obtener los resultados como un diccionario
null_counts_dict = null_counts.first().asDict()

# Mostrar los columna: valor núlos
for colname, count in null_counts_dict.items():
    print(f"{colname}: {count}")



index: 0
hotel: 0
is_canceled: 0
lead_time: 0
arrival_date_year: 0
arrival_date_month: 0
arrival_date_week_number: 0
arrival_date_day_of_month: 0
stays_in_weekend_nights: 0
stays_in_week_nights: 0
adults: 0
children: 4
babies: 0
meal: 0
country: 488
market_segment: 0
distribution_channel: 0
is_repeated_guest: 0
previous_cancellations: 0
previous_bookings_not_canceled: 0
reserved_room_type: 0
assigned_room_type: 0
booking_changes: 0
deposit_type: 0
agent: 16340
company: 112593
days_in_waiting_list: 0
customer_type: 0
adr: 0
required_car_parking_spaces: 0
total_of_special_requests: 0
reservation_status: 0
reservation_status_date: 0


                                                                                

**3.2. Completamos Nan/nulos**

In [18]:
## columna [Children], tiene 4 nulos completamos con ceros
## columna [country], tiene 488 nulos completamos con "Otros"
## columna [agent], tiene 16340 nulos completamos con "No Agent"
## columna [company], tiene 112593 nulos completamos con "No company"

# Llamar a la función para completar columnas
# Lista de columnas y valores a llenar
columns_to_fill = ['children', 'country', 'agent', 'company']
fill_values = {'children': 0, 'country': 'Otros', 'agent': 'No agent', 'company': 'No company'}

# Aplicar la función data_filled en un bucle
for columna, valor in fill_values.items():
    data = data_filled(data, columna, valor)


In [19]:
# Contar los valores nulos en cada columna
null_counts = data.agg(*[sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

# Obtener los resultados como un diccionario
null_counts_dict = null_counts.first().asDict()

# Mostrar los columna: valor núlos
for colname, count in null_counts_dict.items():
    print(f"{colname}: {count}")

index: 0
hotel: 0
is_canceled: 0
lead_time: 0
arrival_date_year: 0
arrival_date_month: 0
arrival_date_week_number: 0
arrival_date_day_of_month: 0
stays_in_weekend_nights: 0
stays_in_week_nights: 0
adults: 0
children: 0
babies: 0
meal: 0
country: 0
market_segment: 0
distribution_channel: 0
is_repeated_guest: 0
previous_cancellations: 0
previous_bookings_not_canceled: 0
reserved_room_type: 0
assigned_room_type: 0
booking_changes: 0
deposit_type: 0
agent: 0
company: 0
days_in_waiting_list: 0
customer_type: 0
adr: 0
required_car_parking_spaces: 0
total_of_special_requests: 0
reservation_status: 0
reservation_status_date: 0


                                                                                

**Tablas de salida:**

Tabla de Salida 1: Número de cambios de reserva acumulados por mes

arrival_date_month

booking_changes

cumulative_changes

In [20]:
# Calcular la cantidad acumulada de cambios por mes
#agrupamos por año y meses
data_with_accumulated_changes = data.groupBy("arrival_date_year","arrival_date_month","booking_changes") \
                                    .agg(F.sum("booking_changes").alias("cumulative_changes")) \
                                    .orderBy("arrival_date_year","arrival_date_month")


# Mostrar el nuevo DataFrame
data_with_accumulated_changes.show()
num_rows = data_with_accumulated_changes.count()
print(f"cantidad de fila: {num_rows}")

                                                                                

+-----------------+------------------+---------------+------------------+
|arrival_date_year|arrival_date_month|booking_changes|cumulative_changes|
+-----------------+------------------+---------------+------------------+
|             2015|            August|              4|                16|
|             2015|            August|              1|               438|
|             2015|            August|              2|               166|
|             2015|            August|              0|                 0|
|             2015|            August|              5|                20|
|             2015|            August|              3|                45|
|             2015|            August|             20|                20|
|             2015|            August|             11|                11|
|             2015|          December|              0|                 0|
|             2015|          December|              2|               122|
|             2015|          December|

In [21]:
# Llamar a la función para guardar el DataFrame en un archivo CSV
guardar_dataframe_csv(data_with_accumulated_changes, PATH_DATA_DEST, 'Tabla1', modo='overwrite')

Tabla de Salida 2: Promedio diario de tarifas por tipo de comida y año
arrival_date_year

meal

avg_daily_rate

In [22]:
# Calcular el promedio diario de tarifas por tipo de comida y año
avg_daily_rates = data.groupBy("meal", "arrival_date_year", "arrival_date_month", "arrival_date_day_of_month") \
                      .agg(F.mean("adr").alias("avg_daily_rate")) \
                      .orderBy("arrival_date_year","arrival_date_month", "arrival_date_day_of_month")

# Mostrar el resultado
avg_daily_rates.show()
num_rows = avg_daily_rates.count()
print(f"cantidad de fila: {num_rows}")

+----+-----------------+------------------+-------------------------+------------------+
|meal|arrival_date_year|arrival_date_month|arrival_date_day_of_month|    avg_daily_rate|
+----+-----------------+------------------+-------------------------+------------------+
|  HB|             2015|            August|                        1|130.09301886792454|
|  BB|             2015|            August|                        1|151.45228070175438|
|  FB|             2015|            August|                        2|             159.0|
|  HB|             2015|            August|                        2|        162.465625|
|  BB|             2015|            August|                        2|125.63727272727274|
|  FB|             2015|            August|                        3|             142.3|
|  BB|             2015|            August|                        3| 78.82664596273294|
|  HB|             2015|            August|                        3| 168.9185714285714|
|  HB|             20

In [23]:
# Llamar a la función para guardar el DataFrame en un archivo CSV
guardar_dataframe_csv(avg_daily_rates, PATH_DATA_DEST, 'Tabla2', modo='overwrite')

Tabla de Salida 3: Número total de huéspedes por país y tipo de cliente

country

customer_type

total_guests

In [24]:
# Calcular el número total de huéspedes por país y tipo de cliente
total_guests_by_country_type = data.groupBy("country", "customer_type") \
                                    .agg(F.sum("adults").alias("total_adults"),
                                         F.sum("children").alias("total_children"),
                                         F.sum("babies").alias("total_babies"),
                                         F.sum(F.col("adults") + F.col("children") + F.col("babies")).alias("total_guests")) \
                                    .orderBy("country","customer_type")

# Mostrar el resultado
total_guests_by_country_type.show()
num_rows = total_guests_by_country_type.count()
print(f"cantidad de fila: {num_rows}")

+-------+---------------+------------+--------------+------------+------------+
|country|  customer_type|total_adults|total_children|total_babies|total_guests|
+-------+---------------+------------+--------------+------------+------------+
|    ABW|      Transient|           5|           0.0|           0|         5.0|
|    AGO|       Contract|          13|           2.0|           0|        15.0|
|    AGO|          Group|           4|           0.0|           0|         4.0|
|    AGO|      Transient|         532|          75.0|           5|       612.0|
|    AGO|Transient-Party|          21|           0.0|           0|        21.0|
|    AIA|      Transient|           2|           2.0|           0|         4.0|
|    ALB|      Transient|          16|           0.0|           1|        17.0|
|    ALB|Transient-Party|           5|           0.0|           0|         5.0|
|    AND|      Transient|          15|           4.0|           0|        19.0|
|    ARE|      Transient|          91|  

In [25]:
# Llamar a la función para guardar el DataFrame en un archivo CSV
guardar_dataframe_csv(total_guests_by_country_type, PATH_DATA_DEST, 'Tabla3', modo='overwrite')