In [0]:
"""
Here we are going to solve all the business logics that were requested in the requirements.
"""

from scripts.helper import *
from pyspark.sql.functions import col, lit, count, length

PROCESS_DATE = '2023-07-02' # get_process_date() # date of data processed in YYYY-MM-DD format


# Descripción del ejercicio

Airbnb es la plataforma líder de alquiler de alojamientos del mundo. Habiendo comenzado
sus operaciones en el año 2007, hoy ya cuenta con más de 4 millones de huéspedes, 6
millones de alojamientos disponibles y ha sido usado por más de 150 millones de personas
en más de 100,000 ciudades del mundo.

Para el siguiente ejercicio, buscaremos responder a ciertos interrogantes de negocio que
podrían darse en las oficinas de Airbnb en París, y que serán respondidas por el equipo de
Data Engineers y Data Scientists.

Para esto, se incluyen los siguientes datasets:
* `listings.csv`: incluye información de los alojamientos que existen en París, incluyendo
su descripción, cantidad de habitaciones, precio por noche, entre otras variables.
* `listings_summary.csv`: consiste en el mismo dataset de `listings.csv`, solo que con
una menor cantidad de columnas.
* `reviews.csv`: incluye información de las reseñas que los usuarios dieron en la
plataforma. Incluye la fecha en la que se escribió, el texto de la reseña, el nombre
del usuario, entre otras variables.
* `reviews_summary.csv`: consiste en el mismo dataset de `reviews.csv`, solo que con
una menor cantidad de columnas.
* `neighbourhoods.geojson`: incluye el nombre de los distintos barrios que conforman
la ciudad, además de un objeto que incluye un arreglo de las coordenadas que
delimitan al barrio.
* `calendar.csv`: incluye información a futuro de los días en el año para reservar cada
alojamiento (es análogo al calendario que se observa en la página web al querer
reservar un alojamiento). Incluye disponibilidad día a día, el precio para ese día, entre
otros.

### A partir de estos datasets, se busca responder a las siguientes preguntas:

1. Dos ejecutivos de Airbnb están discutiendo respecto a cuáles son las zonas con más
departamentos en Airbnb. Les ofreces hacer un análisis con buen nivel de detalle
para enviarles un informe respecto a esto. Ellos dicen que no tienen tiempo de
leerlo, que solo quieren ver una foto, imagen o tabla que les permita saberlo
rápidamente. ¿Qué les muestras? (Desarrollar en código)

In [0]:
"""
SELECT neighbourhood, COUNT(*) AS total_apartments
FROM bi_corp_staging.listings
GROUP BY neighbourhood
ORDER BY total_apartments DESC;
"""

df_listings = spark.sql(f"SELECT id, neighbourhood FROM bi_corp_staging.listings WHERE partition_date='{PROCESS_DATE}'")

df_final = df_listings.groupBy("neighbourhood") \
    .agg(count("id").alias("total_apartments")) \
    .withColumn("partition_date", lit(PROCESS_DATE))
df_final.orderBy(col("total_apartments").desc()).show()

spark_write_df(df_final, BUSINESS_SCHEMA, 'apartments_per_zone', BUSINESS)

+--------------------+----------------+--------------+
|       neighbourhood|total_apartments|partition_date|
+--------------------+----------------+--------------+
|Paris, Île-de-Fra...|           29241|    2023-07-02|
|                null|           26146|    2023-07-02|
|       Paris, France|             383|    2023-07-02|
|Boulogne-Billanco...|             377|    2023-07-02|
|Clichy, Île-de-Fr...|             263|    2023-07-02|
|  Paris, IDF, France|             247|    2023-07-02|
|Montreuil, Île-de...|             229|    2023-07-02|
|Pantin, Île-de-Fr...|             209|    2023-07-02|
|Paris, Île-de-Fra...|             206|    2023-07-02|
|Vincennes, Île-de...|             204|    2023-07-02|
|Levallois-Perret,...|             203|    2023-07-02|
|Neuilly-sur-Seine...|             183|    2023-07-02|
|Saint-Ouen, Île-d...|             175|    2023-07-02|
|Issy-les-Moulinea...|             169|    2023-07-02|
|Ivry-sur-Seine, Î...|             156|    2023-07-02|
|Montrouge

2. En otras ciudades del mundo, la gente está usando cada vez menos la aplicación. Al
equipo ejecutivo en París le preocupa esto y quiere saber cómo es la situación en
esa ciudad. Específicamente, le gustaría ver cómo fueron evolucionando la
cantidad de reviews escritas a lo largo de los años.

In [0]:
"""
SELECT SUBSTR(date, 1, 4) AS year, COUNT(*) AS total_reviews
FROM bi_corp_staging.reviews
GROUP BY year
ORDER BY year;
"""

df_reviews = spark.sql(f"SELECT SUBSTR(date, 1, 4) AS year, id FROM bi_corp_staging.reviews WHERE partition_date='{PROCESS_DATE}'")

df_final = df_reviews.groupBy("year") \
    .agg(count("id").alias("total_reviews")) \
    .withColumn("partition_date", lit(PROCESS_DATE))
df_final.orderBy(col("total_reviews").desc()).show()

spark_write_df(df_final, BUSINESS_SCHEMA, 'reviews_per_year', BUSINESS)

+----+-------------+--------------+
|year|total_reviews|partition_date|
+----+-------------+--------------+
|2022|       337588|    2023-07-02|
|2019|       270791|    2023-07-02|
|2018|       208184|    2023-07-02|
|2021|       205582|    2023-07-02|
|2017|       142605|    2023-07-02|
|2020|       106115|    2023-07-02|
|2016|        94078|    2023-07-02|
|2015|        60240|    2023-07-02|
|2014|        26418|    2023-07-02|
|2013|        10130|    2023-07-02|
|2012|         3891|    2023-07-02|
|2011|         1290|    2023-07-02|
|2010|          354|    2023-07-02|
|2009|           26|    2023-07-02|
+----+-------------+--------------+

Finished writing table: bi_corp_business.reviews_per_year


3. El equipo de marketing está trabajando en una campaña global que consiste en
mostrar cuáles son las principales palabras de viajeros asociadas a las principales
ciudades del mundo. Les gustaría saber cuáles son las principales palabras que
dicen de París (realiza la representación que prefieras para responder esta
pregunta).

In [0]:
"""
SELECT word, COUNT(*) AS frequency
FROM (
  SELECT 
    EXPLODE(SPLIT(REGEXP_REPLACE(description, '(<br>|</b>|<b>|<br|\/>)', ' '), ' ')) AS word
  FROM bi_corp_staging.listings
  WHERE lower(neighbourhood) LIKE '%paris%' or lower(neighbourhood) LIKE '%parís%'
) subquery
WHERE len(word) > 4 -- ignore small words
GROUP BY word
ORDER BY frequency DESC
LIMIT 10;
"""

df_words = spark.sql(f"""
SELECT
  EXPLODE(SPLIT(REGEXP_REPLACE(description, '(<br>|</b>|<b>|<br|\/>)', ' '), ' ')) AS word
FROM bi_corp_staging.listings
WHERE partition_date='{PROCESS_DATE}'
AND RLIKE(LOWER(neighbourhood), r'(paris|parís)')
""")
df_final = df_words.where(length(col("word")) > 4).groupBy("word") \
    .agg(count("*").alias("frequency")) \
    .withColumn("partition_date", lit(PROCESS_DATE))
df_final.orderBy(col("frequency").desc()).show()
df_final.show()

spark_write_df(df_final, BUSINESS_SCHEMA, 'word_frequency_paris', BUSINESS)

+-----------+---------+--------------+
|       word|frequency|partition_date|
+-----------+---------+--------------+
|      space|    28358|    2023-07-02|
|  apartment|    20570|    2023-07-02|
|     access|    16029|    2023-07-02|
|      Guest|    11719|    2023-07-02|
|    located|    11519|    2023-07-02|
|      Paris|    11250|    2023-07-02|
|     living|    10597|    2023-07-02|
|    License|     9794|    2023-07-02|
|     number|     9783|    2023-07-02|
|     double|     9274|    2023-07-02|
|      floor|     8826|    2023-07-02|
|   equipped|     8467|    2023-07-02|
|    kitchen|     8281|    2023-07-02|
|     Paris.|     8270|    2023-07-02|
|      salle|     8132|    2023-07-02|
|appartement|     8058|    2023-07-02|
|    minutes|     7793|    2023-07-02|
|    bedroom|     7499|    2023-07-02|
|    chambre|     7299|    2023-07-02|
|    cuisine|     7218|    2023-07-02|
+-----------+---------+--------------+
only showing top 20 rows

+-----------+---------+--------------+

4. La inflación está haciendo estragos en Europa y al equipo ejecutivo le interesa saber
cuál es la concepción de esto entre los propietarios de alojamientos. En otras
palabras, les interesa saber si ya están teniendo en cuenta el aumento de precios a
futuro en sus departamentos y existe una tendencia alcista en los precios que
publican en Airbnb, o si no le están dando mayor importancia y los precios se ven
bastante estáticos a futuro. La pregunta que escuchaste a un ejecutivo decir es:
"¿Los inquilinos están planificando aumentar los precios?". (Siéntete libre de
responderla de la forma que quieras, a partir de correr algún análisis estadístico,
representación gráfica, etc. Para esta pregunta, puedes consultar el dataset
`calendar.csv` que incluye, para cada propiedad, el precio definido día a día para el
próximo año).

In [0]:
f"""
SELECT
  listing_id,
  SUM(CASE WHEN price > price_past THEN 1 ELSE 0 END) / COUNT(*) AS price_increase_percentage
FROM (
  SELECT
    listing_id,
    LAG(price) OVER (PARTITION BY listing_id ORDER BY date) AS price_past,
    price
  FROM bi_corp_staging.calendar
  WHERE partition_date >= '2023-01-01'
    AND partition_date < '2024-01-01'
) subquery
GROUP BY listing_id;
"""

start_date = '2023-01-01'
end_date = '2024-01-01'

df_prices = spark.sql(f"""
SELECT
    listing_id,
    LAG(price) OVER (PARTITION BY listing_id ORDER BY date) AS price_past,
    price
FROM bi_corp_staging.calendar
WHERE partition_date >= '{start_date}'
AND partition_date < '{end_date}'
""")
df_prices.createOrReplaceTempView("temp_precios")

df_final = spark.sql("""
SELECT
  listing_id,
  SUM(CASE WHEN price > price_past THEN 1 ELSE 0 END) / COUNT(*) AS price_increase_percentage
FROM temp_precios
GROUP BY listing_id
""").withColumn("partition_date", lit(PROCESS_DATE))
df_final.orderBy(col("price_increase_percentage").desc()).show()
df_final.show()

spark_write_df(df_final, BUSINESS_SCHEMA, 'price_increase', BUSINESS)

+------------------+-------------------------+--------------+
|        listing_id|price_increase_percentage|partition_date|
+------------------+-------------------------+--------------+
|          52099296|       0.8273972602739726|    2023-07-02|
|          52099231|       0.8273972602739726|    2023-07-02|
|          52099205|       0.8273972602739726|    2023-07-02|
|          52099305|       0.8273972602739726|    2023-07-02|
|          52099229|       0.8273972602739726|    2023-07-02|
|          52099306|       0.8273972602739726|    2023-07-02|
|          52099295|       0.8273972602739726|    2023-07-02|
|          52099307|       0.8273972602739726|    2023-07-02|
|          52099382|       0.8246575342465754|    2023-07-02|
|633059482852189926|       0.8246575342465754|    2023-07-02|
|          52099290|       0.8246575342465754|    2023-07-02|
|          52099200|        0.821917808219178|    2023-07-02|
|          52099165|        0.821917808219178|    2023-07-02|
|       

5. En promedio, ¿cuál es el precio que paga una persona que se queda en un Airbnb?

In [0]:
"""
SELECT AVG(CAST(SUBSTR(price, 2) AS FLOAT)) AS average_price
FROM bi_corp_staging.listings_summary;
"""

df_average_price = spark.sql(f"""
SELECT AVG(CAST(SUBSTR(price, 2) AS FLOAT)) AS average_price
FROM bi_corp_staging.listings_summary
WHERE partition_date='{PROCESS_DATE}';
""")

average_price = df_average_price.collect()[0][0]
print(f"Average price: ${round(average_price,2)}")

Average price: $24.01


6. El equipo de Marketing quiere hacerles un regalo a los mejores 10 anfitriónes de
Airbnb en París. Te pide una lista de quienes son para hacerlo. ¿A quiénes elegirías
para darle este regalo? ¿Por qué?

In [0]:
"""
SELECT host_id, host_name, calculated_host_listings_count
FROM bi_corp_staging.listings_summary
ORDER BY calculated_host_listings_count DESC
LIMIT 10;
"""

df_best_hosts = spark.sql(f"""
SELECT distinct host_id, host_name, calculated_host_listings_count
FROM bi_corp_staging.listings_summary
WHERE partition_date='{PROCESS_DATE}';
""")
best_hosts = df_best_hosts.orderBy(col("calculated_host_listings_count").desc()).collect()[:10]
best_hosts_names = list(map(lambda h: h['host_name'], best_hosts))
best_hosts_names

# To improve:
# Other aspects such as number of reviews or review scores could be considered to choose between hosts with the same calculated_host_listings_count

Out[49]: ['Joffrey',
 'Philippe',
 'Welkeys',
 'Hôtel Pavillon Monceau',
 'Hôtel 123 Sébastopol ****',
 'Michel',
 'Home Residence',
 'Charles',
 'Laurent',
 'Marie']

7. Por último: Crear una query SQL que devuelva: el precio promedio del mes de enero
de 2023 de cada una de las propiedades ranqueadas por los 3 usuarios que más
reviews escribieron. Esta query también debe incluir el porcentaje de que tan
disponible está este departamento durante ese mes. También se debe incluir el
nombre del dueño de la propiedad (Debe ser una sola query. Deben ordenarse los
resultados por `listing_id` en orden ascendente.

> Voy a omitir la sugerencia de usar Pandas y SQLite para seguir usando Spark y Hive dentro de Azure Databricks

In [0]:
query = f"""
WITH top_reviewers AS (
  SELECT listing_id, COUNT(*) AS review_count
  FROM bi_corp_staging.reviews_summary
  GROUP BY listing_id
  ORDER BY review_count DESC
  LIMIT 3
)
SELECT
  c.listing_id,
  l.host_name,
  AVG(CAST(SUBSTR(c.price, 2) AS FLOAT)) AS average_price,
  SUM(CASE WHEN c.available = 't' THEN 1 ELSE 0 END) / COUNT(*) AS availability_percentage
FROM bi_corp_staging.calendar c
JOIN top_reviewers t ON c.listing_id = t.listing_id
JOIN bi_corp_staging.listings_summary l ON c.listing_id = l.id
-- WHERE c.partition_date >= '2023-01-01' AND c.partition_date <= '2023-01-31'
-- Commented because I only have one partition, but this will be useful in the future
GROUP BY c.listing_id, l.host_name
ORDER BY c.listing_id ASC;
"""

spark.sql(query).show()

+----------+---------+------------------+-----------------------+
|listing_id|host_name|     average_price|availability_percentage|
+----------+---------+------------------+-----------------------+
|  17222007|Alexandra|157.29041095890412|     0.3287671232876712|
|  26244787|   Lionel| 367.6986301369863|     0.9506849315068493|
|  46255907|   Roxane| 711.9698630136986|     0.2054794520547945|
+----------+---------+------------------+-----------------------+

