In [24]:
from pyspark.sql import SparkSession

In [25]:
# Reuse the active Spark session if one exists, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName("airbnbSparkonHDFS")
    .getOrCreate()
)

In [26]:
# Base HDFS directory; also used later when writing the results back.
hdfs_path = "hdfs://localhost:9000/user/hadoop/"
file_path = f"{hdfs_path}airbnb.csv"

# Read the listings CSV (first row is the header, column types are inferred)
# and replace missing overall_satisfaction values with 3.0.
df_air = (
    spark.read
    .csv(file_path, header=True, inferSchema=True)
    .fillna({"overall_satisfaction": 3.0})
)

In [27]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf, avg, col, expr, count, collect_list, array_distinct, concat_ws
from pyspark.sql.types import StringType


In [28]:
# Edges of the price ranges; eight edges define seven consecutive buckets.
price_splits = [0, 100, 200, 300, 400, 500, 1000, 5000]
bucketizer = Bucketizer(
    splits=price_splits,
    inputCol="price",
    outputCol="buckets",
)

In [29]:
# Keep rows with invalid prices instead of failing or dropping them,
# then add the "buckets" column to the listings.
bucketizer.setHandleInvalid("keep")
df_buck = bucketizer.transform(df_air)

In [30]:
# Per-bucket summary: mean and approximate median of each numeric column,
# the distinct neighborhoods joined into one comma-separated string,
# and the number of listings in the bucket.
bucket_aggs = []
for stat_col in ("accommodates", "bedrooms", "reviews"):
    bucket_aggs.append(avg(stat_col).alias(f"{stat_col}_average"))
    bucket_aggs.append(
        expr(f"percentile_approx({stat_col}, 0.5)").alias(f"{stat_col}_median")
    )
bucket_aggs.append(
    concat_ws(",", array_distinct(collect_list("neighborhood"))).alias("neighbor_list")
)
bucket_aggs.append(count("*").alias("length"))

grouped_df = (
    df_buck
    .groupBy("buckets")
    .agg(*bucket_aggs)
    .orderBy("buckets")
)

In [31]:
# Human-readable label for each bucket index produced by the Bucketizer splits.
scope = {0.0: '0-100', 1.0:'100-200', 2.0:'200-300', 3.0:'300-400', 4.0:'400-500', 5.0:'500-1000', 6.0:'1000-5000'}

# Look the label up with a fallback instead of scope[x]: the bucketizer runs with
# handleInvalid="keep", so rows with an invalid or missing price can yield a bucket
# value that is not a key of `scope` (or None). scope[x] would then raise KeyError
# inside the UDF and fail the whole job; such rows are labelled 'unknown' instead.
udf_scope = udf(lambda x: scope.get(x, 'unknown'), StringType())
grouped_df = grouped_df.withColumn("price_range", udf_scope("buckets"))

# Keep only the reporting columns, with the label first; the raw bucket index
# and the row count ("length") are intentionally not part of the final table.
grouped_df = grouped_df.select('price_range', 'accommodates_average', 'accommodates_median', 'bedrooms_average',
                               'bedrooms_median', 'reviews_average', 'reviews_median', 'neighbor_list')

In [32]:
# Preview the summary table (first 20 rows, long cell values truncated).
grouped_df.show(n=20, truncate=True)

+-----------+--------------------+-------------------+------------------+---------------+-------------------+--------------+--------------------+
|price_range|accommodates_average|accommodates_median|  bedrooms_average|bedrooms_median|    reviews_average|reviews_median|       neighbor_list|
+-----------+--------------------+-------------------+------------------+---------------+-------------------+--------------+--------------------+
|      0-100|   1.974025974025974|                  2|0.9897959183673469|            1.0| 21.000927643784788|             5|Allston,Dorcheste...|
|    100-200|  2.9757869249394675|                  2|1.1011326860841424|            1.0| 17.102502017756255|             5|Jamaica Plain,Bea...|
|    200-300|  3.8719298245614033|                  4| 1.536412078152753|            1.0| 13.719298245614034|             4|Back Bay,Jamaica ...|
|    300-400|   4.812182741116751|                  5|  2.00507614213198|            2.0|  9.309644670050762|             4|

In [33]:
# Persist the summary to HDFS as CSV with a header row.
# mode="overwrite" makes the cell re-runnable: with the default save mode the
# write fails once the output path exists from a previous run.
grouped_df.write.csv(hdfs_path + 'sorted_ranged_price.csv', header=True, mode="overwrite")