In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f 
from pyspark.sql.window import Window
from pyspark.sql.types import StringType,StructField,StructType,IntegerType,DoubleType

In [2]:
# Initialize a Spark session
spark = SparkSession \
    .builder \
    .appName("Day_6") \
    .config("spark.jars", "/usr/lib/jvm/java-11-openjdk-amd64/lib/postgresql-42.6.0.jar") \
    .getOrCreate()

23/09/12 10:57:31 WARN Utils: Your hostname, ubuntu-Lenovo-Legion-5-15ARH05 resolves to a loopback address: 127.0.1.1; using 172.16.5.112 instead (on interface wlp4s0)
23/09/12 10:57:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/12 10:57:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read tables from postgres  to df
listings_df = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/spark_project") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "listing_table") \
    .option("user", "postgres").option("password", "postgres").load()

calendar_df = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/spark_project") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "calendar_table") \
    .option("user", "postgres").option("password", "postgres").load()

reviews_df = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/spark_project") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "reviews_table") \
    .option("user", "postgres").option("password", "postgres").load()

# listing_df_raw.write.jdbc(url=jdbc_url, table='listing_table', mode="append", properties=properties)
# calendar_df_raw.write.jdbc(url=jdbc_url, table='calendar_table', mode="append", properties=properties)
# reviews_df_raw.write.jdbc(url=jdbc_url, table='reviews_table', mode="append", properties=properties)

**Q1) Property Price Categories and Value for Money:**

Objective: Divide properties into cheap, mid, and luxury categories based on prices, analyze total bedrooms and bathrooms, and find value-for-money properties along with its sentiment analysis ratings.


In [6]:
from textblob import TextBlob

def analyze_sentiment(comment):
    if comment is None or not isinstance(comment, str):
        # Handle cases where 'comment' is None or not a string
        return "unknown"
    
    analysis = TextBlob(comment)
    # Classify sentiment as positive, neutral, or negative based on polarity
    if analysis.sentiment.polarity > 0:
        return "positive"
    elif analysis.sentiment.polarity == 0:
        return "neutral"
    else:
        return "negative"

# Register the UDF
sentiment_analysis_udf = f.udf(analyze_sentiment, StringType())

reviews_sentiment = reviews_df.withColumn("sentiment", sentiment_analysis_udf(f.col("comments")))

# reviews_sentiment.show(50, truncate=False)
sentiment_df=reviews_sentiment.select("listing_id","id","sentiment")
sentiment_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+----------+--------+---------+
|listing_id|      id|sentiment|
+----------+--------+---------+
|   9896713|88911455| positive|
|   9896713|89088569| positive|
|   9896713|89654618| positive|
|   9896713|89887887| positive|
|   9896713|91269769| positive|
|   9896713|91839722| positive|
|   9896713|92492337|  neutral|
|   9896713|93045940| positive|
|   9896713|93966900| positive|
|   9896713|95209788| positive|
|   9896713|95481263| positive|
|   9896713|95654848| positive|
|   9896713|96811925| positive|
|   9896713|97851184| positive|
|   9896713|98411027| positive|
|   9896713|98871421| positive|
|   9896713|99171233| positive|
|   9896713|99673240| positive|
|   2701124|18828032| positive|
|   2701124|21222340| positive|
+----------+--------+---------+
only showing top 20 rows



                                                                                

In [21]:
#calculating quartiles for price column. This results in 3 quartiles Q1,Q2 and Q3.
quartiles = listings_df.stat.approxQuantile("price", [0.25, 0.5, 0.75], 0.01)
from pyspark.sql.functions import when
# quantiles

#creating UDF for categorizing listings into Cheap, Mid-range and Luxury according to price and taking quartiles as limit.

# def categorize_price(price):
#     if price <= quartiles[0]: #first quartile
#         return "cheap"
#     elif price <= quartiles[2]: #second quartile
#         return "mid-range"
#     else:
#         return "luxury"

#Registering UDF to spark
# categorize_price_udf = f.udf(categorize_price, StringType())

# Create a new column 'price_category' based on the categorization
# listings_df_1 = listings_df.withColumn("price_category", categorize_price_udf(f.col("price")))
# listings_df_1.show()

listings_df_1 = listings_df.withColumn(
    "price_category",
    when(f.col("price") <= quartiles[0], "cheap")
    .when((f.col("price") > quartiles[0]) & (f.col("price") <= quartiles[1]), "mid-range")
    .otherwise("luxury")
)


category_stats = listings_df_1.select("id","name","bedrooms", "bathrooms", "price","price_category","host_name","number_of_reviews")


# Calculate value for money as bedrooms + bathrooms per dollar spent
category_stats = category_stats.withColumn(
    "value_for_money",
    (f.col("bedrooms") + f.col("bathrooms")) / f.col("price")
)

window_spec=Window.partitionBy(f.col("price_category")).orderBy(f.col("value_for_money").desc())
most_value_for_money = category_stats.withColumn("rank", f.row_number().over(window_spec)).filter(f.col("rank") <=5 ).select("*")
# most_value_for_money.show()



most_value_for_money = most_value_for_money.withColumnRenamed("id", "listing_id")

listings_with_sentiment = most_value_for_money.join(sentiment_df, most_value_for_money["listing_id"] == sentiment_df["listing_id"], how="left")
listings_with_sentiment=listings_with_sentiment.drop(sentiment_df["listing_id"])
listings_with_sentiment.printSchema()
listings_with_sentiment_1 = listings_with_sentiment.groupBy("listing_id").agg(f.collect_list("sentiment").alias("review_sentiments"),
                                                                              f.first("name").alias("name"),
                                                                              f.first("bedrooms").alias("bedrooms"), 
                                                                              f.first("bathrooms").alias("bathrooms"),
                                                                              f.first("price").alias("prince"),
                                                                              f.first("price_category").alias("price_category"),
                                                                              f.first("rank").alias("rank"))

# value_for_money_listings = listings_with_sentiment.filter( most_value_for_money["id"] == most_value_for_money["id"])

# Print ratings according to sentimental analysis for these listings
print("\nRatings According to Sentimental Analysis for Most Value for Money Properties:")
listings_with_sentiment_1=listings_with_sentiment_1.orderBy(f.col("price_category"))
listings_with_sentiment_1=listings_with_sentiment_1.drop(listings_with_sentiment_1["listing_id"])

# listings_with_sentiment_1.printSchema()



root
 |-- listing_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- price_category: string (nullable = false)
 |-- host_name: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- value_for_money: double (nullable = true)
 |-- rank: integer (nullable = false)
 |-- id: integer (nullable = true)
 |-- sentiment: string (nullable = true)


Ratings According to Sentimental Analysis for Most Value for Money Properties:


In [22]:
# Define the JDBC connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/spark_project"
properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

listings_with_sentiment_1.write.jdbc(url=jdbc_url, table='question_1_table', mode="append", properties=properties)


                                                                                

Q2) Find out which month has the most booking(Use quartile to get threshold values to determine the off peak and the peak time) Use these values to list out the properties in peak time and off peak time. Then calculate the total revenue of each host during peak months and calculate their average response rate as well to find correlation between them.

In [10]:
calendar_df = calendar_df.withColumn("price", f.regexp_replace(f.col("price"), "[^0-9]", "").cast(IntegerType()))
calendar_df = calendar_df.withColumn("price", f.when(f.col("available") == False, "booked").otherwise(f.col("price")))
calendar_df.show()


+----------+----------+---------+------+
|listing_id|      date|available| price|
+----------+----------+---------+------+
|   7308811|2017-05-08|    false|booked|
|   7308811|2017-05-07|    false|booked|
|   7308811|2017-05-06|    false|booked|
|   7308811|2017-05-05|    false|booked|
|   7308811|2017-05-04|    false|booked|
|   7308811|2017-05-03|    false|booked|
|   7308811|2017-05-02|    false|booked|
|   7308811|2017-05-01|    false|booked|
|   7308811|2017-04-30|    false|booked|
|   7308811|2017-04-29|    false|booked|
|   7308811|2017-04-28|    false|booked|
|   7308811|2017-04-27|    false|booked|
|   7308811|2017-04-26|    false|booked|
|   7308811|2017-04-25|    false|booked|
|   7308811|2017-04-24|    false|booked|
|   7308811|2017-04-23|    false|booked|
|   7308811|2017-04-22|    false|booked|
|   7308811|2017-04-21|    false|booked|
|   7308811|2017-04-20|    false|booked|
|   7308811|2017-04-19|    false|booked|
+----------+----------+---------+------+
only showing top

In [11]:
# Calculate booking counts by month
booking_counts_df = calendar_df.groupBy(f.month("date").alias("month")).agg(f.count("*").alias("booking_count")).orderBy(f.col("booking_count"))
result_df = booking_counts_df.join(calendar_df, (f.month(calendar_df["date"]) == booking_counts_df["month"]), "left")

# Calculate quartiles for booking counts
quartiles = result_df.approxQuantile("booking_count", [0.25, 0.75], 0.001)
q1 = quartiles[0]
q3 = quartiles[1]



#Create a new column in result_df to categorize months
result_df = result_df.withColumn("month_category", (f.col("booking_count") >= q1) & (f.col("booking_count") <= q3))


# Join with listings_df based on the id
result_df_1 = listings_df.join(result_df, (listings_df["id"]) == result_df["listing_id"], "left")
result_df_1 = result_df_1.withColumn(
    "month_category",
    f.when(f.col("month_category") == True, "peak").otherwise("off_peak")
)


result_df_1 = result_df_1.withColumn("revenue", f.when(result_df["available"] == "false", listings_df["price"]).otherwise(0))
# result_df_1.filter(f.col("available") == False).show()
result_df_1 = result_df_1.withColumn("host_response_rate", f.regexp_replace(f.col("host_response_rate"), "%", "").cast("int"))
# result_df_1.show()


total_revenue_by_host = result_df_1.filter(f.col("month_category") == 'peak').groupBy("host_name").agg(f.sum("revenue").alias("total_revenue"),f.coalesce(f.avg("host_response_rate"),f.lit(0)).alias("avg_response_rate"))
# total_revenue_by_host.show()

correlation = total_revenue_by_host.corr("total_revenue", "avg_response_rate")
# correlation


total_revenue_by_host.show()




                                                                                

+---------------+-------------+-----------------+
|      host_name|total_revenue|avg_response_rate|
+---------------+-------------+-----------------+
|          Tyler|      4549500|            100.0|
|           Faye|      4044000|            100.0|
|          Shawn|     14374100|            100.0|
|           Chad|      3856200|            100.0|
|       Giuseppe|      2376000|93.33333333333333|
|        Shannon|      3820000|            100.0|
|         Aubrey|      2190500|              0.0|
|         Andree|       894000|            100.0|
|        Carolyn|      2190000|            100.0|
|         Nicolo|      2550000|            100.0|
|         Kashif|      6080000|              0.0|
| Emily And Carl|      8425000|             90.0|
|            Sue|     13440000|             80.0|
|           Rich|        17900|              0.0|
|    Maria Elena|       810000|            100.0|
|          Scott|     11664100|             91.4|
|        Sanchit|      3471100|              0.0|


In [13]:
# Define the JDBC connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/spark_project"
properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

total_revenue_by_host.write.jdbc(url=jdbc_url, table='question_2_table', mode="append", properties=properties)


                                                                                