# Proyecto de consolidacion: Inside AirBnb 
##### Herramientas para la calidad del dato

In [7]:
%region us-east-1
%number_of_workers 2
%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%additional_python_modules TextBlob

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Previous region: us-east-1
Setting new region to: us-east-1
Region is set to: us-east-1
Previous number of workers: None
Setting new number of workers to: 2
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Additional python modules to be included:
TextBlob


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.testing as Test
from pyspark.sql import Window as W
import pandas as pd
import numpy as np
from textblob import TextBlob
from datetime import datetime
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 60
Session ID: e8e1bb70-0549-4d78-ac85-993c94b4f10a
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--additional-python-modules TextBlob
Waiting for session e8e1bb70-0549-4d78-ac85-993c94b4f10a to get into ready status...
Session e8e1bb70-0549-4d78-ac85-993c94b4f10a has been created.



In [2]:
# Constants
BUCKET_NAME = "insideairbnbbucket"
listings_raw = f"s3://{BUCKET_NAME}/raw/listings/listings_q4.csv.gz"
reviews_raw = f"s3://{BUCKET_NAME}/raw/reviews/reviews_q4.csv.gz"
listings_bronze = f"s3://{BUCKET_NAME}/bronze_listings/"
reviews_bronze = f"s3://{BUCKET_NAME}/bronze_reviews/"
listings_silver = f"s3://{BUCKET_NAME}/silver_listings/"
reviews_silver = f"s3://{BUCKET_NAME}/silver_reviews/"

neighborhood_room_type_report_gold = f"s3://{BUCKET_NAME}/gold_report_neighborhood_and_room_type/"
neighborhood_host_type_report_gold = f"s3://{BUCKET_NAME}/gold_report_neighborhood_and_host_type/"
host_name_report_gold = f"s3://{BUCKET_NAME}/gold_report_host_name/"
benefits_report_gold = f"s3://{BUCKET_NAME}/gold_report_benefits/"




# 2. Bronze Layer

> The Bronze layer is where we land all the data from external source systems. The table structures in this layer correspond to the source system table structures "as-is," along with any additional metadata columns that capture the load date/time, process ID, etc.

## 1.1 listings

#### read raw listings from the **raw layer** of the datalake

In [3]:
listings_schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("listing_url", T.StringType(), True),
    T.StructField("scrape_id", T.LongType(), True),
    T.StructField("last_scraped", T.TimestampType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("description", T.StringType(), True),
    T.StructField("neighborhood_overview", T.StringType(), True),
    T.StructField("picture_url", T.StringType(), True),
    T.StructField("host_id", T.IntegerType(), True),
    T.StructField("host_url", T.StringType(), True),
    T.StructField("host_name", T.StringType(), True),
    T.StructField("host_since", T.DateType(), True),
    T.StructField("host_location", T.StringType(), True),
    T.StructField("host_about", T.StringType(), True),
    T.StructField("host_response_time", T.StringType(), True),
    T.StructField("host_response_rate", T.StringType(), True),
    T.StructField("host_acceptance_rate", T.StringType(), True),
    T.StructField("host_is_superhost", T.BooleanType(), True),
    T.StructField("host_thumbnail_url", T.StringType(), True),
    T.StructField("host_picture_url", T.StringType(), True),
    T.StructField("host_neighbourhood", T.StringType(), True),
    T.StructField("host_listings_count", T.StringType(), True),
    T.StructField("host_total_listings_count", T.StringType(), True),
    T.StructField("host_verifications", T.ArrayType(T.StringType()), True),
    T.StructField("host_has_profile_pic", T.BooleanType(), True),
    T.StructField("host_identity_verified", T.BooleanType(), True),
    T.StructField("neighbourhood", T.StringType(), True),
    T.StructField("neighbourhood_cleansed", T.StringType(), True),
    T.StructField("neighbourhood_group_cleansed", T.StringType(), True),
    T.StructField("latitude", T.FloatType(), True),
    T.StructField("longitude", T.FloatType(), True),
    T.StructField("property_type", T.StringType(), True),
    T.StructField("room_type", T.StringType(), True),
    T.StructField("accommodates", T.IntegerType(), True),
    T.StructField("bathrooms", T.FloatType(), True),
    T.StructField("bathrooms_text", T.StringType(), True),
    T.StructField("bedrooms", T.IntegerType(), True),
    T.StructField("beds", T.IntegerType(), True),
    T.StructField("amenities", T.ArrayType(T.StringType()), True),
    T.StructField("price", T.DoubleType(), True),
    T.StructField("minimum_nights", T.IntegerType(), True),
    T.StructField("maximum_nights", T.IntegerType(), True),
    T.StructField("minimum_minimum_nights", T.IntegerType(), True),
    T.StructField("maximum_minimum_nights", T.IntegerType(), True),
    T.StructField("minimum_maximum_nights", T.IntegerType(), True),
    T.StructField("maximum_maximum_nights", T.IntegerType(), True),
    T.StructField("minimum_nights_avg_ntm", T.FloatType(), True),
    T.StructField("maximum_nights_avg_ntm", T.FloatType(), True),
    T.StructField("calendar_updated", T.DateType(), True),
    T.StructField("has_availability", T.BooleanType(), True),
    T.StructField("availability_30", T.IntegerType(), True),
    T.StructField("availability_60", T.IntegerType(), True),
    T.StructField("availability_90", T.IntegerType(), True),
    T.StructField("availability_365", T.IntegerType(), True),
    T.StructField("calendar_last_scraped", T.DateType(), True),
    T.StructField("number_of_reviews", T.IntegerType(), True),
    T.StructField("number_of_reviews_ltm", T.IntegerType(), True),
    T.StructField("number_of_reviews_l30d", T.IntegerType(), True),
    T.StructField("first_review", T.DateType(), True),
    T.StructField("last_review", T.DateType(), True),
    T.StructField("review_scores_rating", T.FloatType(), True),
    T.StructField("review_scores_accuracy", T.FloatType(), True),
    T.StructField("review_scores_cleanliness", T.FloatType(), True),
    T.StructField("review_scores_checkin", T.FloatType(), True),
    T.StructField("review_scores_communication", T.FloatType(), True),
    T.StructField("review_scores_location", T.FloatType(), True),
    T.StructField("review_scores_value", T.FloatType(), True),
    T.StructField("license", T.StringType(), True),
    T.StructField("instant_bookable", T.BooleanType(), True),
    T.StructField("calculated_host_listings_count", T.IntegerType(), True),
    T.StructField("calculated_host_listings_count_entire_homes", T.IntegerType(), True),
    T.StructField("calculated_host_listings_count_private_rooms", T.IntegerType(), True),
    T.StructField("calculated_host_listings_count_shared_rooms", T.IntegerType(), True),
    T.StructField("reviews_per_month", T.FloatType(), True)
])




In [4]:
df_listings = (
    spark
    .read
    .format("csv")
    .option("header", True)
    .option("schema", listings_schema)
    .option("compression", "gzip")
    .option("sep", ",")
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", True)
    .option("encoding", "UTF-8")
    .option("dateFormat", "yyyy-MM-dd")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .load(listings_raw)
)




In [5]:
df_listings.printSchema()

root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_cou

In [6]:
reviews_schema = T.StructType([
    T.StructField('listing_id', T.LongType(), True), 
    T.StructField('id', T.LongType(), True), 
    T.StructField('date', T.StringType(), True), 
    T.StructField('reviewer_id', T.IntegerType(), True), 
    T.StructField('reviewer_name', T.StringType(), True), 
    T.StructField('comments', T.StringType(), True)
])




In [7]:
df_reviews = (
    spark
    .read
    .format("csv")
    .option("header", True)
    .option("schema", reviews_schema)
    .option("compression", "gzip")
    .option("sep", ",")
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", True)
    .option("encoding", "UTF-8")
    .option("dateFormat", "yyyy-MM-dd")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .load(reviews_raw)
)




In [8]:
df_reviews.printSchema()

root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)


#### write listings and reviews In the **Bronze layer** of the lakehouse,

In [9]:
(
    df_listings
    .write
    .format("parquet")
    .mode("overwrite")
    .save(listings_bronze)
)




In [10]:
(
    df_reviews
    .write
    .format("parquet")
    .mode("overwrite")
    .save(reviews_bronze)
)




# 2. Silver Layer
> In the **Silver layer** of the lakehouse, the data from the Bronze layer is matched, merged, conformed and cleansed ("just-enough") so that the Silver layer can provide an "Enterprise view" of all its key business entities, concepts and transactions.

## 2.1 Listings Transformations

1. Filter listings with reviews placed more than a year ago
2. Limpieza bedrooms y beds (bedroom missing =1, bed missing =bedroom)
3. Normalize & Cast 'price'column: *$xxxx.xx ->  xxxx.xx* 
4. Fill NaNs in 'price'column with weighted average based on:
    - number of bedrooms (bedrooms
    - room type 
    - neighbourhood_group_cleansed 
5. Bin 'host' into categories based on the amound of units he has active
    - particular
    - tenedor
    - gran_tenedor
    - super_tenedor
6. Get correlation between prior categorization and 'host_is_superhost' column.
7. Bin 'price' into groups based on calculated intervals
8. Apply format convention in 'name' and 'host_name' columns 
    - lower case para unificar criterios, 
    - special character removal ['* / - &'...]

In [11]:
# Columns selected to bronze of listings's table
columns_listings_selected = [
    'id', 
    'name', 
    'description', 
    'host_id', 
    'host_name', 
    'host_since', 
    'host_about', 
    'host_response_time',
    'host_response_rate',
    'host_acceptance_rate',
    'host_is_superhost',
    'host_thumbnail_url',
    'host_picture_url',
    'host_neighbourhood',
    'host_listings_count',
    'host_total_listings_count',
    'host_verifications',
    'host_identity_verified',
    'neighbourhood',
    'neighbourhood_cleansed',
    'neighbourhood_group_cleansed',
    'latitude', 
    'longitude', 
    'property_type',
    'room_type', 
    'accommodates', 
    'bathrooms', 
    'bathrooms_text', 
    'bedrooms', 
    'beds', 
    'amenities', 
    'price', 
    'number_of_reviews', 
    'number_of_reviews_ltm', 
    'number_of_reviews_l30d', 
    'first_review', 
    'last_review', 
    'review_scores_rating', 
    'review_scores_accuracy', 
    'review_scores_cleanliness', 
    'review_scores_checkin',
    'review_scores_communication', 
    'review_scores_location',
    'review_scores_value', 
    'license', 
    'calculated_host_listings_count', 
    'calculated_host_listings_count_entire_homes',
    'calculated_host_listings_count_private_rooms',
    'calculated_host_listings_count_shared_rooms', 
    'minimum_nights', 
    'reviews_per_month',
    'mean_price', 
    'low_boundary_price',
    'high_boundary_price', 
    'price_category', 
    'host_category'  
]




In [12]:
window_price = W.partitionBy(["neighbourhood_group_cleansed", "room_type", "bedrooms"])
window_price_avg = W.partitionBy("neighbourhood_group_cleansed", "room_type")
window_host = W.partitionBy("host_id")
last_review_one_year_ago = F.date_sub(F.lit(df_listings.agg(F.max("last_review").alias("last_review")).collect()[0]["last_review"]), 365)




In [13]:
df_listings_silver = (
    df_listings
    # Convert the last_review column to DateType
    .withColumn("last_review", F.to_date(F.col("last_review"), "yyyy-MM-dd"))
    # Filter reviews placed more than a year ago
    .filter(F.col("last_review") >= last_review_one_year_ago)
    # Normalize & Cast 'price'column
    .withColumn("price", F.regexp_replace(F.col("price"), "[$,]", "").cast(T.FloatType()))
    # c
    .withColumn("price", 
                F.when(F.col("price").isNull(), 
                       F.mean("price").over(window_price_avg))
                        .otherwise(F.col("price"))
               )
    # Get Avg price for each group
    .withColumn("mean_price", F.avg("price").over(window_price_avg))
    # Get boundaries for price categories
    .withColumn("low_boundary_price", F.col("mean_price") / 2)
    .withColumn("high_boundary_price", F.col("mean_price") * 2)
    # Bin 'price' into groups based on intervals
    .withColumn(
        "price_category",
        F.when(F.col("price") < F.col("low_boundary_price"), "Budget")
        .when((F.col("price") >= F.col("low_boundary_price")) & (F.col("price") < F.col("mean_price")), "Moderate")
        .when((F.col("price") >= F.col("mean_price")) & (F.col("price") < F.col("high_boundary_price")), "Standard")
        .otherwise("Luxury")
    )
    # Get # listings by host
    .withColumn("host_listings_count", F.count("id").over(window_host))
    # Bin 'host' into categories based on the amound of units he has active
    .withColumn(
        "host_category",
        F.when(F.col("host_listings_count") < 10, "Particular")
        .when((F.col("host_listings_count") >= 10) & (F.col("host_listings_count") < 50), "Tenedor")
        .when((F.col("host_listings_count") >= 50) & (F.col("host_listings_count") < 100), "Gran_tenedor")
        .when((F.col("host_listings_count") >= 100), "Super_tenedor")
    )
    .withColumn("bedrooms", F.when(F.col("bedrooms").isNull(), 1).otherwise(F.col("bedrooms")))
    .withColumn("beds", F.when(F.col("beds").isNull(), F.col("bedrooms")).otherwise(F.col("beds")))
    .withColumn("host_name", F.lower(F.col("host_name")))
    .withColumn("name", F.lower(F.col("name")))
    .withColumn("name", F.regexp_replace(F.col("name"), "[^a-zA-Z0-9 ]", ""))
    .withColumn("host_name", F.regexp_replace(F.col("host_name"), "[^a-zA-Z0-9 ]", ""))
    .withColumn("name", F.regexp_replace(F.col("name"), "\s+", " "))
    .withColumn("host_name", F.regexp_replace(F.col("host_name"), "\s+", " "))
    .withColumn("name", F.trim(F.col("name")))
    .withColumn("host_name", F.trim(F.col("host_name")))
    .select(columns_listings_selected)
)




In [14]:
(
    df_listings_silver
    .write
    .format("parquet")
    .mode("overwrite")
    .save(listings_silver)
)




In [15]:
reviews_clean = (
    df_reviews
    .withColumn("reviewer_name", F.lower(F.col("reviewer_name")))
    .withColumn("reviewer_name", F.regexp_replace(F.col("reviewer_name"), "[^a-zA-Z0-9 ]", ""))
    .withColumn("reviewer_name", F.regexp_replace(F.col("reviewer_name"), "\s+", " "))
    .withColumn("reviewer_name", F.trim(F.col("reviewer_name")))
)




In [16]:
# Function return the sentiment about the text input
def get_sentiment(text):
    import textblob
    comment = TextBlob(text)
    if comment.sentiment.polarity < 0:
        sentiment = "BE" # Bad Experience
    elif comment.sentiment.polarity == 0:
        sentiment = "NE" # Neutral Experience
    else:
        sentiment = "GE" # Good Experience
    return sentiment


getSentiment = F.udf(get_sentiment, F.StringType(), )





In [17]:
# filtrar las reviews_clean que el listing_id no este en id silver_listings_active y calculo del sentimiento en los comentarios
reviews_clean_active = (
    reviews_clean
    .join(df_listings_silver, reviews_clean.listing_id == df_listings_silver.id, "inner")
    .drop(df_listings_silver.id)
    .select(reviews_clean.columns)
    .filter(F.col("date") >= F.lit("2023-03-22"))
    .withColumn("sentiment", getSentiment(F.col("comments")))
)




In [18]:
(
    reviews_clean_active
    .groupBy("sentiment")
    .count()
    .show()
)

+---------+------+
|sentiment| count|
+---------+------+
|       NE|192029|
|       GE|160809|
|       BE|  4689|
+---------+------+


In [19]:
(
    reviews_clean_active
    .write
    .format("parquet")
    .mode("overwrite")
    .save(reviews_silver)
)




# 4. Gold Layer
> Data in the Gold layer of the lakehouse is typically organized in consumption-ready "project-specific" databases. The Gold layer is for reporting and uses more de-normalized and read-optimized data models with fewer joins.

## 4.1 Aggregations by neighborhood and room type

In [20]:
df_neighbourhood_and_room_type_report_gold = (
    df_listings_silver
    .groupBy("neighbourhood_group_cleansed", "room_type")
    .agg(
        F.countDistinct("id").alias("listings_count"),
        F.avg("price").alias("avg_price"),
        F.avg("review_scores_value").alias("avg_review_scores_value"),
        F.sum("number_of_reviews").alias("total_reviews"),
    )
    .orderBy(F.asc("neighbourhood_group_cleansed"), F.desc("avg_review_scores_value"))
)




In [21]:
df_neighbourhood_and_room_type_report_gold.show()

+----------------------------+---------------+--------------+------------------+-----------------------+-------------+
|neighbourhood_group_cleansed|      room_type|listings_count|         avg_price|avg_review_scores_value|total_reviews|
+----------------------------+---------------+--------------+------------------+-----------------------+-------------+
|                  Arganzuela|   Private room|           263| 53.32894736842108|      4.663384030418251|      14140.0|
|                  Arganzuela|Entire home/apt|           590|120.80679785330943|      4.543475298126069|      45955.0|
|                  Arganzuela|    Shared room|            11|35.199999999999996|     4.2309090909090905|         85.0|
|                     Barajas|   Private room|            68|          46.84375|     4.7327941176470585|       8382.0|
|                     Barajas|    Shared room|             5|              42.4|                  4.672|        364.0|
|                     Barajas|Entire home/apt|  

In [43]:
(
    df_neighbourhood_and_room_type_report_gold
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path", neighborhood_room_type_report_gold)
    .saveAsTable("neighborhood_room_type_report_gold_table")
)




![rules](./images/neighborhood_room_type_report_gold.png)

In [23]:
def plot_avg_price_by_neighbourhood_and_room_type(ddf, neighbourhood_col='neighbourhood_group_cleansed', avg_price_col='avg_price', room_type_col='room_type'):
    """
    Plots the average price by neighbourhood and room type using a Pandas DataFrame and Plotly.
    
    Parameters:
    df (DataFrame): Pandas DataFrame containing the average prices segmented by neighbourhood and room type
    neighbourhood_col (str): Name of the column containing neighbourhood information
    avg_price_col (str): Name of the column containing average price information
    room_type_col (str): Name of the column containing room type information
    """
    # Convert to Pandas DataFrame
    df = ddf.toPandas()
    # Plotting the bar chart using Plotly
    fig = px.bar(df, x=neighbourhood_col, y=avg_price_col, color=room_type_col, barmode='group', title='Average Price by Neighbourhood and Room Type')
    fig.update_layout(xaxis_title='Neighbourhood', yaxis_title='Average Price', xaxis={'categoryorder':'total descending'}, xaxis_tickangle=-45)
    fig.show()




## 4.2 Aggregations by neighborhood and host type

In [25]:
## Ratio listings by Host
# Aggregate by neighbourhood
neighbourhood_agg = (
    df_listings_silver
    .groupBy("neighbourhood_group_cleansed")
    .agg(
        F.countDistinct("host_name").alias("hosts_neighbourhood"),
        F.count("id").alias("listings_neighbourhood")
    )
)

# Aggregate by neighbourhood and host category
neighbourhood_host_agg = (
    df_listings_silver
    .groupBy("neighbourhood_group_cleansed", "host_category")
    .agg(
        F.countDistinct("host_name").alias("hosts_category"),
        F.count("id").alias("listings_category"),
        F.avg("price").alias("avg_price"),
        F.avg("review_scores_value").alias("avg_review_scores_value"),
        F.sum("number_of_reviews").alias("total_reviews"),
        F.countDistinct("id").alias("listings_count")
    )
)

# Join aggregates and calculate ratios
df_neighbourhood_and_host_type_report_gold = (
    neighbourhood_host_agg
    .join(neighbourhood_agg, on="neighbourhood_group_cleansed")
    .withColumn(
        "host_category_ratio",
        F.round(F.col("hosts_category") / F.col("hosts_neighbourhood") * 100, 2)
    )
    .withColumn(
        "listing_category_ratio",
        F.round(F.col("listings_category") / F.col("listings_neighbourhood") * 100, 2)
    )
    .orderBy(F.asc("neighbourhood_group_cleansed"), F.desc("avg_review_scores_value"))
    #Drop calculation columns
    .drop("listings_category", "listings_neighbourhood", "hosts_category", "hosts_neighbourhood")
)




In [26]:
df_neighbourhood_and_host_type_report_gold.show()

+----------------------------+-------------+------------------+-----------------------+-------------+--------------+-------------------+----------------------+
|neighbourhood_group_cleansed|host_category|         avg_price|avg_review_scores_value|total_reviews|listings_count|host_category_ratio|listing_category_ratio|
+----------------------------+-------------+------------------+-----------------------+-------------+--------------+-------------------+----------------------+
|                  Arganzuela|   Particular| 95.78008434477223|      4.622237237237236|      50489.0|           666|              90.57|                 77.08|
|                  Arganzuela|Super_tenedor|128.53667262969586|      4.579302325581396|       2155.0|            44|               1.08|                  5.09|
|                  Arganzuela|      Tenedor|101.00624045676531|      4.448444444444444|       5022.0|            92|              10.24|                 10.65|
|                  Arganzuela| Gran_tene

In [42]:
(
    df_neighbourhood_and_host_type_report_gold
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path", neighborhood_host_type_report_gold)
    .saveAsTable("neighborhood_host_type_report_gold_table")
)




![rules](./images/neighborhoos_host_type_report.png)

## 4.3 Aggregations by host name

In [28]:
# Top ten Host with more listings
df_host_name_report_gold = (
    df_listings_silver
    .groupBy("host_name")
    .agg(
        F.countDistinct("id").alias("listings_count"),
        F.avg("price").alias("avg_price"),
        F.avg("review_scores_value").alias("avg_review_scores_value"),
        F.sum("number_of_reviews").alias("total_reviews"),
    )
    .orderBy(F.desc("listings_count"))
)




In [29]:
df_host_name_report_gold.show(10)

+------------+--------------+------------------+-----------------------+-------------+
|   host_name|listings_count|         avg_price|avg_review_scores_value|total_reviews|
+------------+--------------+------------------+-----------------------+-------------+
|   mit house|           282|223.65048737985506|      4.407340425531914|       8042.0|
|       jorge|           254|152.16343364895604|      4.456614173228345|      12686.0|
|fran y marta|           250| 159.1869410548177|      4.623092369477912|      10602.0|
|      javier|           224|123.33713336169068|       4.62330357142857|      16748.0|
|      daniel|           197|125.72309188868068|      4.646598984771574|      13970.0|
|      emilio|           173|220.07822772889628|      4.589479768786125|       7648.0|
|      carlos|           170| 120.4953069090886|      4.501058823529415|      11981.0|
|        raul|           168|109.31452161870335|     4.5821428571428555|      17813.0|
|       maria|           147|  95.894003555

In [41]:
(
    df_host_name_report_gold
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path", host_name_report_gold)
    .saveAsTable("host_name_report_gold_table")
)




![rules](./images/dq_host_name_report_gold.png)

## 4.4 Aggregations by neighborhood and host type + avg income

In [31]:
listings_counts_reviews = (
    reviews_clean_active
    .groupBy("listing_id")
    .agg(
        F.count("id").alias("reviews_count")
    )
    .filter(F.col("listing_id").isNotNull())
)




In [32]:
dict_neighbourhood_price = {
    "Arganzuela":4755,
    "Barajas":3673,
    "Carabanchel":2676,
    "Centro":6205,
    "Chamartín":6214,
    "Chamberí":6630,
    "Ciudad Lineal":3645,
    "Fuencarral - El Pardo":4005,
    "Hortaleza":4334,
    "Latina":2746,
    "Moncloa - Aravaca":4837,
    "Moratalaz":3083,
    "Puente de Vallecas":2359,
    "Retiro":5410,
    "Salamanca":7671,
    "San Blas - Canillejas":2947,
    "Tetuán":4491,
    "Usera":2465,
    "Vicálvaro":2880,
    "Villa de Vallecas":2812,
    "Villaverde":2077    
}




In [33]:
# transform dictionary in dataframe pyspark
df_neighbourhood_price = (
    spark
    .createDataFrame(dict_neighbourhood_price.items(), ["neighbourhood_group", "avg_price_neighbourhood"])
)




In [34]:
# Avg price by neighbourhood and host type
df_benefits_report_gold = (
    df_listings_silver
    .join(listings_counts_reviews, listings_counts_reviews.listing_id == df_listings_silver.id, "inner")
    .select("neighbourhood_group_cleansed","host_category","id","price","review_scores_value","minimum_nights","bathrooms","bedrooms","beds","reviews_count")
    .groupBy("neighbourhood_group_cleansed", "host_category")
    .agg(
        F.countDistinct("id").alias("listings_count"),
        F.avg("price").cast(T.DecimalType(20,2)).alias("avg_price"),
        F.avg("review_scores_value").cast(T.DecimalType(20,2)).alias("avg_review_scores_value"),
        F.avg("minimum_nights").cast(T.DecimalType(20,2)).alias("avg_minimum_nights"),
        F.avg("bathrooms").cast(T.DecimalType(20,2)).alias("avg_bathrooms"),
        F.avg("bedrooms").cast(T.DecimalType(20,2)).alias("avg_bedrooms"),
        F.avg("beds").cast(T.DecimalType(20,2)).alias("avg_beds"),
        F.avg("reviews_count").cast(T.DecimalType(20,2)).alias("avg_reviews_count")
    )
    .withColumn("avg_benefits_last_year", (F.col("avg_price") * F.col("avg_reviews_count") * F.col("avg_minimum_nights")).cast(T.DecimalType(20,2)))
    .join(df_neighbourhood_price, df_listings_silver.neighbourhood_group_cleansed == df_neighbourhood_price.neighbourhood_group, "inner")
    .select('neighbourhood_group_cleansed', 'host_category', 'listings_count', 'avg_price', 'avg_reviews_count', 'avg_benefits_last_year',"avg_price_neighbourhood")
    .withColumn("price_ratio_benefits", F.round(F.col("avg_benefits_last_year") / F.col("avg_price_neighbourhood") * 100, 2))
    .orderBy(F.asc("neighbourhood_group_cleansed"), F.desc("avg_review_scores_value"))
)




In [35]:
df_benefits_report_gold.orderBy(F.desc("price_ratio_benefits")).show(100, False)

+----------------------------+-------------+--------------+---------+-----------------+----------------------+-----------------------+--------------------+
|neighbourhood_group_cleansed|host_category|listings_count|avg_price|avg_reviews_count|avg_benefits_last_year|avg_price_neighbourhood|price_ratio_benefits|
+----------------------------+-------------+--------------+---------+-----------------+----------------------+-----------------------+--------------------+
|Retiro                      |Gran_tenedor |39            |162.35   |15.23            |31204.09              |5410                   |576.79              |
|Moncloa - Aravaca           |Gran_tenedor |20            |283.70   |7.65             |19098.68              |4837                   |394.85              |
|Tetuán                      |Gran_tenedor |31            |98.71    |10.06            |14954.92              |4491                   |333.00              |
|Arganzuela                  |Gran_tenedor |62            |112.1

In [37]:
(
    df_benefits_report_gold
    .write
    .format("parquet")
    .mode("overwrite")
    .option("path", benefits_report_gold)
    .saveAsTable("benefits_report_gold_table")
)




![rules](./images/dq_benefits_report_gold.png)