In [27]:
import timeit
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

In [28]:
start_time = timeit.default_timer()

spark = SparkSession.builder \
    .appName("IcebergLocalDevelopment") \
    .master("local[*]") \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2') \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()


end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ",elapsed_time)

Elapsed time :  0.010302204999788955


25/01/24 11:49:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [29]:
start_time = timeit.default_timer()

import os, json
df1=spark.read.option("multiline","true").json("amenities_sample_output.json")
df2=spark.read.option("multiline","true").json("amenity_category.json")
df3=spark.read.json("expedia-lodging-amenities-en_us-1-all.jsonl")

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ",elapsed_time)



Elapsed time :  16.594971634999638


                                                                                

In [92]:
df1.show(truncate=False)

+-------------------+---------------+--------------------------+----------------+
|amenities          |amenities_count|amenity_categories        |themes          |
+-------------------+---------------+--------------------------+----------------+
|[list of amenities]|45             |[list of amenity category]|[list of themes]|
+-------------------+---------------+--------------------------+----------------+



In [31]:
from pyspark.sql.functions import col, concat_ws, lower, split, size

# Flatten propertyAmenities and roomAmenities
flattened_df = df3.select(
    col("propertyId.expedia").alias("expedia_id"),
    col("popularAmenities").alias("themes"),
    concat_ws(", ", *[
        col(f"propertyAmenities.{col_name}") for col_name in df3.schema["propertyAmenities"].dataType.fieldNames()
    ]).alias("property_amenities"),
    concat_ws(", ", *[
        col(f"roomAmenities.{col_name}") for col_name in df3.schema["roomAmenities"].dataType.fieldNames()
    ]).alias("room_amenities")
)

# Combine both amenities into one field
combined_df = flattened_df.select(
    col("expedia_id"),
    col("themes"),
    lower(concat_ws(", ", col("property_amenities"), col("room_amenities"))).alias("combined_amenities")
)

# Add amenities_count column by splitting combined_amenities into an array and calculating its size
# Instead of recreating combined_amenities, use the existing column
result_df = combined_df.withColumn(
    "combined_amenities",
    split(col("combined_amenities"), ",\\s*")  # Split the existing combined_amenities column
)

# Step 2: Create the 'amenities_count' column based on the size of the 'combined_amenities' list
result_df = result_df.withColumn(
    "amenities_count",
    size(col("combined_amenities"))
)

# Show the result
result_df.show(truncate=False)

+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [32]:
# Replace 33554978 with the specific expedia_id you want to view
specific_expedia_id = 33554978

# Filter the DataFrame
specific_sample = result_df.filter(result_df.expedia_id == specific_expedia_id)

# Show the result
specific_sample.show(truncate=False)



+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [33]:
result_df.printSchema()

root
 |-- expedia_id: string (nullable = true)
 |-- themes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- combined_amenities: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- amenities_count: integer (nullable = false)



In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, trim, array, collect_list
import json

# Load the amenity_category.json file
with open("amenity_category.json", "r") as file:
    amenity_categories = json.load(file)

# Broadcast the amenity category mapping
broadcast_amenity_categories = spark.sparkContext.broadcast(amenity_categories)

# Function to map amenities to categories
def map_amenity_to_category(amenity):
    normalized_amenity = amenity.lower().replace(" ", "_")
    category_map = broadcast_amenity_categories.value
    if normalized_amenity in category_map:
        return category_map[normalized_amenity] if isinstance(category_map[normalized_amenity], list) else [category_map[normalized_amenity]]
    return ["Uncategorized"]

# Register the function as a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

map_amenity_udf = udf(map_amenity_to_category, ArrayType(StringType()))

# Explode combined_amenities and map them to categories
mapped_df = result_df.withColumn("amenity", explode(col("combined_amenities"))) \
    .withColumn("categories", map_amenity_udf(col("amenity"))) \
    .withColumn("categories", explode(col("categories")))

# Final DataFrame for visualization
final_df = mapped_df.select("expedia_id", "amenity", "categories")

# Show the results
final_df.show(truncate=False)

[Stage 39:>                                                         (0 + 1) / 1]

+----------+-------------------+-------------------+
|expedia_id|amenity            |categories         |
+----------+-------------------+-------------------+
|33554978  |hair dryer         |Wellness Facilities|
|33554978  |shampoo            |Uncategorized      |
|33554978  |soap               |Uncategorized      |
|33554978  |toilet paper       |Uncategorized      |
|33554978  |towels provided    |Bedding/linens     |
|33554978  |beach sun loungers |Ocean View         |
|33554978  |beach sun loungers |View               |
|33554978  |beach towels       |Bedding/linens     |
|33554978  |beach towels       |View               |
|33554978  |beach towels       |Ocean View         |
|33554978  |near the beach     |View               |
|33554978  |near the beach     |Ocean View         |
|33554978  |near the beach     |Oceanfront         |
|33554978  |bed sheets provided|Uncategorized      |
|33554978  |air conditioning   |Air Conditioner    |
|33554978  |dining table       |Uncategorized 

                                                                                

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, collect_list, struct, array_distinct, flatten
import json


# Load the data into a PySpark DataFrame
data = [
    (33554978, ["AC", "BARBECUE", "DRYER", "HOT_TUB", "KITCHEN", "MICROWAVE", "OUTDOOR_SPACE", "POOL", "TENNIS_COURT", "WASHER"],
     ["hair dryer", "shampoo", "soap", "toilet paper", "towels provided", "beach sun loungers", "beach towels", "near the beach", "bed sheets provided", "air conditioning", "dining table", "books", "dvd player", "music library", "stereo", "video library", "tv with cable/satellite service", "wifi available", "blender", "dishwasher", "ice maker", "microwave", "oven", "paper towels", "refrigerator", "stovetop", "toaster", "washing machine and dryer", "near the sea", "barbecue grill", "deck or patio", "garden", "car required", "no pets allowed", "fence around the pool", "outdoor pool", "private pool", "spa tub", "housekeeping (on request)", "iron/ironing board", "phone", "if you have requests for specific accessibility needs, please contact the property using the information on the reservation confirmation received after booking. ", "smoke-free property", "wheelchair accessible", "birdwatching nearby", "cycling nearby", "golf nearby", "hiking nearby", "kayaking nearby", "mountain biking nearby", "scuba diving nearby", "swimming nearby", "tennis on site", "whale watching nearby"], 56),
    (33554974, ["BARBECUE", "KITCHEN", "MICROWAVE"],
     ["hair dryer", "shampoo", "toilet paper", "towels provided", "bed sheets provided", "heating", "tv", "wifi available", "coffee/tea maker", "cookware/dishes/utensils", "electric kettle", "microwave", "oven", "refrigerator", "toaster", "barbecue grill", "car not required", "no pets allowed", "fire extinguisher", "smoke detector", "iron/ironing board", "smoke-free property"], 23)
]
schema = ["expedia_id", "themes", "combined_amenities", "amenities_count"]

result_df = spark.createDataFrame(data, schema)

# Load the amenity_category.json file
with open("amenity_category.json", "r") as file:
    amenity_categories = json.load(file)

# Broadcast the amenity category mapping
broadcast_amenity_categories = spark.sparkContext.broadcast(amenity_categories)

# Function to map amenities to categories
def map_amenity_to_category(amenity):
    normalized_amenity = amenity.lower().replace(" ", "_")
    category_map = broadcast_amenity_categories.value
    if normalized_amenity in category_map:
        return category_map[normalized_amenity] if isinstance(category_map[normalized_amenity], list) else [category_map[normalized_amenity]]
    #return ["Uncategorized"]

# Register the function as a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

map_amenity_udf = udf(map_amenity_to_category, ArrayType(StringType()))

# Explode combined_amenities and map them to categories
mapped_df = result_df.withColumn("amenity", explode(col("combined_amenities"))) \
    .withColumn("categories", map_amenity_udf(col("amenity")))

# Flatten and group back the results by expedia_id
grouped_df = mapped_df.groupBy("expedia_id") \
    .agg(
        collect_list("amenity").alias("mapped_amenities"),
        flatten(collect_list("categories")).alias("mapped_categories")
    )

# Deduplicate the categories for each amenity
grouped_df = grouped_df.withColumn("mapped_categories", array_distinct(col("mapped_categories")))

# Show the final result
grouped_df.show(truncate=False)




+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------

                                                                                

In [36]:
# Replace 33554978 with the specific expedia_id you want to view
specific_expedia_id = 33554978

# Filter the DataFrame
specific_sample = result_df.filter(result_df.expedia_id == specific_expedia_id)

# Show the result
specific_sample.show(truncate=False)



+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [73]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, collect_list, array_distinct, lower, regexp_replace, broadcast
import json

# Load the data into a PySpark DataFrame
data = [
    (33554978, ["AC", "BARBECUE", "DRYER", "HOT_TUB", "KITCHEN", "MICROWAVE", "OUTDOOR_SPACE", "POOL", "TENNIS_COURT", "WASHER"],
     ["hair dryer", "shampoo", "soap", "toilet paper", "towels provided", "beach sun loungers", "beach towels", "near the beach", "bed sheets provided", "air conditioning", "dining table", "books", "dvd player", "music library", "stereo", "video library", "tv with cable/satellite service", "wifi available", "blender", "dishwasher", "ice maker", "microwave", "oven", "paper towels", "refrigerator", "stovetop", "toaster", "washing machine and dryer", "near the sea", "barbecue grill", "deck or patio", "garden", "car required", "no pets allowed", "fence around the pool", "outdoor pool", "private pool", "spa tub", "housekeeping (on request)", "iron/ironing board", "phone", "if you have requests for specific accessibility needs, please contact the property using the information on the reservation confirmation received after booking. ", "smoke-free property", "wheelchair accessible", "birdwatching nearby", "cycling nearby", "golf nearby", "hiking nearby", "kayaking nearby", "mountain biking nearby", "scuba diving nearby", "swimming nearby", "tennis on site", "whale watching nearby"], 56),
    (33554974, ["BARBECUE", "KITCHEN", "MICROWAVE"],
     ["hair dryer", "shampoo", "toilet paper", "towels provided", "bed sheets provided", "heating", "tv", "wifi available", "coffee/tea maker", "cookware/dishes/utensils", "electric kettle", "microwave", "oven", "refrigerator", "toaster", "barbecue grill", "car not required", "no pets allowed", "fire extinguisher", "smoke detector", "iron/ironing board", "smoke-free property"], 23)
]
schema = ["expedia_id", "themes", "combined_amenities", "amenities_count"]

result_df = spark.createDataFrame(data, schema)

In [74]:
# Load the amenity_category.json file
with open("amenity_category.json", "r") as file:
    amenity_categories = json.load(file)

# Convert the amenity category dictionary into a DataFrame
# Flatten the categories if they are lists
amenity_category_data = []

In [75]:
for key, value in amenity_categories.items():
    if isinstance(value, list):
        for category in value:
            amenity_category_data.append((key, category))
    else:
        amenity_category_data.append((key, value))

In [77]:
amenity_category_data

[('1_game_drive_per_night', 'Entertainment'),
 ('24_hour_business_center', 'Business Services'),
 ('24_hour_fitness_facilities', 'Wellness Facilities'),
 ('24_hour_front_desk', 'Guest Services'),
 ('24_hour_health_club', 'Wellness Facilities'),
 ('24_hour_pool_access', 'Pool'),
 ('2_for_1_buffet', 'Restaurant'),
 ('2_game_drives_per_night', 'Entertainment'),
 ('300_thread_count_linen', 'Bedding/linens'),
 ('400_count_egyptian_100%_cotton_sheets', 'Bedding/linens'),
 ('a/c_or_climate_control', 'Air Conditioner'),
 ('above_ground_pool', 'Pool'),
 ('ac', 'Air Conditioner'),
 ('access', 'Wheelchair Accessible'),
 ('access', 'Accessibility'),
 ('access_to_nearby_health_club', 'Wellness Facilities'),
 ('access_to_nearby_indoor_pool', 'Pool'),
 ('access_to_nearby_outdoor_pool', 'Pool'),
 ('access_via_exterior_corridors', 'Accessibility'),
 ('accessible', 'Wheelchair Accessible'),
 ('accessible', 'Accessibility'),
 ('accessible_bathtub', 'Accessibility'),
 ('accessible_bathtub', 'Wheelchair Ac

In [78]:
# Now create the DataFrame with flattened categories
amenity_category_df = spark.createDataFrame(amenity_category_data, ["amenity", "category"])


In [79]:
amenity_category_df.show(truncate=False)

+-------------------------------------+---------------------+
|amenity                              |category             |
+-------------------------------------+---------------------+
|1_game_drive_per_night               |Entertainment        |
|24_hour_business_center              |Business Services    |
|24_hour_fitness_facilities           |Wellness Facilities  |
|24_hour_front_desk                   |Guest Services       |
|24_hour_health_club                  |Wellness Facilities  |
|24_hour_pool_access                  |Pool                 |
|2_for_1_buffet                       |Restaurant           |
|2_game_drives_per_night              |Entertainment        |
|300_thread_count_linen               |Bedding/linens       |
|400_count_egyptian_100%_cotton_sheets|Bedding/linens       |
|a/c_or_climate_control               |Air Conditioner      |
|above_ground_pool                    |Pool                 |
|ac                                   |Air Conditioner      |
|access 

In [80]:
result_df.show(truncate=False)

                                                                                

+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [82]:
from pyspark.sql import functions as F

# Exploding the combined_amenities column into individual rows
result_df_exploded = result_df.withColumn("amenity", F.explode("combined_amenities"))

In [84]:
result_df_exploded.show(5,truncate=False)

+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [85]:
# Joining the exploded result_df with the amenity_category_df to get the category
result_df_mapped = result_df_exploded.join(
    amenity_category_df, 
    result_df_exploded["amenity"] == amenity_category_df["amenity"], 
    "left"
)

In [87]:
result_df_mapped.show(5,truncate=False)



+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [86]:
result_df_mapped.show(5,truncate=False)



+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [93]:
# Grouping by expedia_id and aggregating the categories into a list
result_df_final = result_df_mapped.groupBy("expedia_id", "themes","combined_amenities", "amenities_count") \
                                  .agg(F.collect_list("category").alias("categories"))
result_df_final = result_df_final.withColumn(
    "categories", F.array_distinct(F.col("categories"))
)
# Show the result
result_df_final.show(truncate=False)

                                                                                

+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [49]:
from pyspark.sql.functions import col, explode, lower, regexp_replace, transform

# Normalize amenity names in the combined_amenities array (lowercase and space replaced with underscores)
result_df = result_df.withColumn("normalized_amenities", 
                                 transform(col("combined_amenities"), 
                                           lambda x: regexp_replace(lower(x), " ", "_")))

# Explode the combined_amenities array to have one amenity per row
exploded_df = result_df.withColumn("amenity", explode(col("normalized_amenities")))

# Show the result to ensure everything is correct
exploded_df.show(truncate=False)




+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [56]:
# Normalize the exploded amenities
normalized_amenities_df = exploded_df.withColumn("normalized_amenity", regexp_replace(lower(col("amenity")), " ", "_"))

# Broadcast the amenity_category_df
broadcasted_category_df = broadcast(amenity_category_df)

# Join the exploded amenities with the category mapping DataFrame
mapped_df = normalized_amenities_df.join(broadcasted_category_df, 
                                         normalized_amenities_df["normalized_amenity"] == amenity_category_df["amenity"], 
                                         "left_outer")

In [67]:
mapped_df.show(truncate=False)

                                                                                

+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [68]:
# Group by expedia_id and aggregate amenities and their categories
grouped_df = mapped_df.groupBy("expedia_id") \
    .agg(
        collect_list("amenity").alias("mapped_amenities"),
        collect_list("category").alias("mapped_categories")
    )

# Deduplicate the categories for each amenity
grouped_df = grouped_df.withColumn("mapped_categories", array_distinct(col("mapped_categories")))

# Show the final result
grouped_df.show(truncate=False)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `amenity` cannot be resolved. Did you mean one of the following? [`themes`, `expedia_id`, `amenities_count`, `normalized_amenity`, `combined_amenities`].;
'Aggregate [expedia_id#5805L], [expedia_id#5805L, collect_list('amenity, 0, 0) AS mapped_amenities#6199, collect_list('category, 0, 0) AS mapped_categories#6201]
+- Project [expedia_id#5805L, themes#5806, combined_amenities#5807, amenities_count#5808L, normalized_amenities#5844, normalized_amenity#5978]
   +- Project [expedia_id#5805L, themes#5806, combined_amenities#5807, amenities_count#5808L, normalized_amenities#5844, normalized_amenity#5978, category#5814]
      +- Join LeftOuter, (normalized_amenity#5978 = amenity#5813)
         :- Project [expedia_id#5805L, themes#5806, combined_amenities#5807, amenities_count#5808L, normalized_amenities#5844, amenity#5852, regexp_replace(lower(amenity#5852),  , _, 1) AS normalized_amenity#5978]
         :  +- Project [expedia_id#5805L, themes#5806, combined_amenities#5807, amenities_count#5808L, normalized_amenities#5844, amenity#5852]
         :     +- Generate explode(normalized_amenities#5844), false, [amenity#5852]
         :        +- Project [expedia_id#5805L, themes#5806, combined_amenities#5807, amenities_count#5808L, transform(combined_amenities#5807, lambdafunction(regexp_replace(lower(lambda x_0#5845),  , _, 1), lambda x_0#5845, false)) AS normalized_amenities#5844]
         :           +- LogicalRDD [expedia_id#5805L, themes#5806, combined_amenities#5807, amenities_count#5808L], false
         +- ResolvedHint (strategy=broadcast)
            +- LogicalRDD [amenity#5813, category#5814], false


In [69]:
mapped_df.printSchema()


root
 |-- expedia_id: long (nullable = true)
 |-- themes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- combined_amenities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- amenities_count: long (nullable = true)
 |-- normalized_amenities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- normalized_amenity: string (nullable = true)



In [71]:
mapped_df.show(5, truncate=False)




+----------+---------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [72]:
# Drop any potential duplicate columns (if any)
mapped_df = mapped_df.drop("normalized_amenity")

# Group by expedia_id and aggregate amenities and their categories
grouped_df = mapped_df.groupBy("expedia_id") \
    .agg(
        collect_list("combined_amenities").alias("mapped_amenities"),
        collect_list("themes").alias("mapped_categories")  # If 'themes' is related to categories
    )

# Deduplicate the categories for each amenity
grouped_df = grouped_df.withColumn("mapped_categories", array_distinct(col("mapped_categories")))

# Show the final result
grouped_df.show(truncate=False)




+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                