In [0]:
# Створюємо схему 'validated' в каталозі 'airbnb'
spark.sql("CREATE SCHEMA IF NOT EXISTS airbnb.validated")


In [0]:
import pyspark.sql.functions as f

df_airbnb = spark.read.table("airbnb.raw.listings")

columns_to_keep = [
    'host_id','host_since','host_is_superhost', 'host_response_time', 
    'host_response_rate', 'host_acceptance_rate', 'host_neighbourhood',
    'host_listings_count', 'host_identity_verified',
    'latitude','longitude','property_type','room_type','accommodates',
    'bathrooms','bathrooms_text','bedrooms','beds','amenities','price',
    'minimum_nights','maximum_nights', 'number_of_reviews',
    'first_review', 'last_review', 'review_scores_rating',
    'review_scores_cleanliness', 'review_scores_checkin', 
    'review_scores_communication', 'review_scores_location',
    'review_scores_value', 'license','instant_bookable','reviews_per_month'
]

df_subset = df_airbnb.select(columns_to_keep)


### Базове очищення


In [0]:
df_cleaned = df_subset.withColumn(
    "price_clean",
    f.regexp_replace("price", "[$,]", "").cast("double")
).withColumn(
    "host_response_rate_clean",
    f.regexp_replace("host_response_rate", "%", "").cast("double")
).withColumn(
    "host_acceptance_rate_clean",
    f.regexp_replace("host_acceptance_rate", "%", "").cast("double")
)

df_cleaned = df_cleaned.dropna(subset=["latitude", "longitude", "property_type"])

df_cleaned = df_cleaned.fillna({
    "bathrooms": 0,
    "bedrooms": 0,
    "beds": 0,
    "reviews_per_month": 0
})





In [0]:
# Видаляємо стару таблицю
spark.sql("DROP TABLE IF EXISTS airbnb.validated.listings_cleaned")


In [0]:
# Булеві значення
df_cleaned = df_cleaned.withColumn(
    "host_is_superhost_bool",
    f.col("host_is_superhost") == "t"
).withColumn(
    "host_identity_verified_bool",
    f.col("host_identity_verified") == "t"
).withColumn(
    "instant_bookable_bool",
    f.col("instant_bookable") == "t"
)

# Дати
df_cleaned = df_cleaned.withColumn("host_since", f.to_date("host_since")) \
    .withColumn("first_review", f.to_date("first_review")) \
    .withColumn("last_review", f.to_date("last_review"))

# Вік оголошення
df_cleaned = df_cleaned.withColumn(
    "listing_age_days",
    f.datediff(f.current_date(), f.col("host_since"))
)

# Витяг кількості ванних кімнат з тексту
df_cleaned = df_cleaned.withColumn(
    "bathrooms_extracted",
    f.regexp_extract("bathrooms_text", r"([0-9]+\.?[0-9]*)", 1).cast("double")
)



Заповнення

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS airbnb.validated")

df_cleaned.write.mode("overwrite").saveAsTable("airbnb.validated.listings_cleaned")



In [0]:
spark.read.table("airbnb.validated.listings_cleaned").printSchema()
display(spark.read.table("airbnb.validated.listings_cleaned"))


In [0]:
spark.read.table("airbnb.validated.listings_cleaned").printSchema()


In [0]:
display(spark.read.table("airbnb.validated.listings_cleaned"))


Враховуючи, що мета, створити перевірений набір, а не повністю очищений

- Вибрано релевантні колонки з сирої таблиці airbnb.raw.listings.
- Проведено базову перевірку якості даних: очищено price, host_response_rate, host_acceptance_rate від символів;
- Перетворено деякі текстові поля на числові та булеві;
- Витягнуто кількість ванних кімнат з bathrooms_text;
- Обчислено вік лістингу (listing_age_days);
- Перетворено дати на тип date;
- Видалено рядки з критичними відсутніми значеннями (latitude, longitude, property_type).
- Збережено результат у таблицю airbnb.validated.listings_cleaned


# Test

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS airbnb.validated")

df_cleaned.write.mode("overwrite").saveAsTable("airbnb.validated.listings_cleaned_1")


In [0]:
display(spark.read.table("airbnb.validated.listings_cleaned_1"))


In [0]:
import pyspark.sql.functions as f

df_airbnb = spark.read.table("airbnb.raw.listings")

columns_to_keep = [
    'host_id','host_since','host_is_superhost', 'host_response_time', 
    'host_response_rate', 'host_acceptance_rate', 'host_neighbourhood',
    'host_listings_count', 'host_identity_verified',
    'latitude','longitude','property_type','room_type','accommodates',
    'bathrooms','bathrooms_text','bedrooms','beds','amenities','price',
    'minimum_nights','maximum_nights', 'number_of_reviews',
    'first_review', 'last_review', 'review_scores_rating',
    'review_scores_cleanliness', 'review_scores_checkin', 
    'review_scores_communication', 'review_scores_location',
    'review_scores_value', 'license','instant_bookable','reviews_per_month'
]

df_subset = df_airbnb.select(columns_to_keep)

df_cleaned = df_subset \
    .withColumn("price_clean", f.expr("try_cast(regexp_replace(price, '[$,]', '') AS double)")) \
    .withColumn("host_response_rate_clean", f.expr("try_cast(regexp_replace(host_response_rate, '%', '') AS double)")) \
    .withColumn("host_acceptance_rate_clean", f.expr("try_cast(regexp_replace(host_acceptance_rate, '%', '') AS double)")) \
    .withColumn("host_is_superhost_bool", f.col("host_is_superhost") == "t") \
    .withColumn("host_identity_verified_bool", f.col("host_identity_verified") == "t") \
    .withColumn("instant_bookable_bool", f.col("instant_bookable") == "t") \
    .withColumn("host_since", f.to_date("host_since")) \
    .withColumn("first_review", f.to_date("first_review")) \
    .withColumn("last_review", f.to_date("last_review")) \
    .withColumn("listing_age_days", f.datediff(f.current_date(), f.col("host_since"))) \
    .withColumn("bathrooms_extracted", f.expr("try_cast(regexp_extract(bathrooms_text, '([0-9]+\\.?[0-9]*)', 1) AS double)")) \
    .dropna(subset=["latitude", "longitude", "property_type"])

spark.sql("CREATE SCHEMA IF NOT EXISTS airbnb.validated")

df_cleaned.write.mode("overwrite").saveAsTable("airbnb.validated.listings_cleaned")


In [0]:
display(spark.read.table("airbnb.validated.listings_cleaned"))
