In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark aggregation functions") \
    .config("spark.python.worker.faulthandler.enabled", "true") \
    .config("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true") \
    .getOrCreate()

In [2]:
listings = spark.read.csv("../data/listings.csv.gz", 
    header=True,
    inferSchema=True,
    sep=",", 
    quote='"',
    escape='"', 
    multiLine=True,
    mode="PERMISSIVE" 
)
listings.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

In [3]:
reviews = spark.read.csv("../data/reviews.csv.gz", 
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE"
)
reviews.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [4]:
# 1. For each listing compute string category depending on its price, and add it as a new column.
# A category is defined in the following way:
#
# * price < 50 -> "Budget"
# * 50 <= price < 150 -> "Mid-range"
# * price >= 150 -> "Luxury"
# 
# Only include listings where the price is not null.
# Count the number of listings in each category
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace

listings = listings.withColumn('price_numeric', regexp_replace('price', '[$,]', '').cast('float'))

# TODO: Implement a UDF
@F.udf(StringType())
def categorize_listing_price_udf(price_num):
    if price_num is None:
        return 'Unknown'
    elif price_num < 50:
        return 'Budget'
    elif 50 <= price_num < 150:
        return 'Mid-range'
    elif price_num >= 150:
        return 'Luxury'
    return 'Unknown'
# TODO: Apply the UDF to create a new DataFrame

listings_categorized = listings \
    .filter(listings.price_numeric.isNotNull()) \
    .withColumn('category', categorize_listing_price_udf(listings.price_numeric)) \
    .select('id', 'name', 'price', 'category') \
    .groupby('category') \
    .count() \
    .show()

+---------+-----+
| category|count|
+---------+-----+
|Mid-range|28108|
|   Budget| 6612|
|   Luxury|27964|
+---------+-----+



In [24]:
# 2. In this task you will need to compute a santiment score per review, and then an average sentiment score per listing.
# A santiment score indicates how "positive" or "negative" a review is. The higher the score the more positive it is, and vice-versa.
#
# To compute a sentiment score per review compute the number of positive words in a review and subtract the number of negative
# words in the same review (the list of words is already provided)
#
# To complete this task, compute a DataFrame that contains the following fields:
# * name - the name of a listing
# * average_sentiment - average sentiment of reviews computed using the algorithm described above
from pyspark.sql.types import FloatType

# Lists of positive and negative words
positive_words = {'good', 'great', 'excellent', 'amazing', 'fantastic', 'wonderful', 'pleasant', 'lovely', 'nice', 'enjoyed'}
negative_words = {'bad', 'terrible', 'awful', 'horrible', 'disappointing', 'poor', 'hate', 'unpleasant', 'dirty', 'noisy'}

# TODO: Implement the UDF
def sentiment_score(comment):
    if comment is None:
        return 0
    comment_lower = comment.lower()
    pos_score = sum(1 for word in positive_words if word in comment_lower)
    neg_score = sum(1 for word in negative_words if word in comment_lower)
    return float(pos_score - neg_score)

sentiment_score_udf = F.udf(sentiment_score, FloatType())

reviews_with_sentiment = reviews \
  .withColumn(
    'santiment_score',
    sentiment_score_udf(reviews.comments)
  )

# TODO: Create a final DataFrame
final_df = listings \
    .join(reviews_with_sentiment, listings.id == reviews_with_sentiment.listing_id, how='inner') \
    .groupBy('listing_id','name') \
    .agg(F.round(F.avg(F.col('santiment_score')), 2).alias('avg_sant_score')) \
    .orderBy(F.desc('avg_sant_score'))

final_df.select('name', 'avg_sant_score').show()

+--------------------+--------------+
|                name|avg_sant_score|
+--------------------+--------------+
|Central and Cozy ...|           6.0|
|Boutique 3bd Lond...|           6.0|
|Tranquil terraced...|           6.0|
|London NW3. Feel ...|           5.0|
|Canal Side stylis...|           5.0|
|Twickenham double...|           5.0|
|Luxury Holiday Le...|           5.0|
|Luxury  Modern  S...|           5.0|
|     Heart of Fulham|           5.0|
|Light-filled 1 be...|           5.0|
|Gorgeous family h...|           5.0|
|Modern cozy room ...|           5.0|
|Spacious Victoria...|           5.0|
|Bright&cosy flat ...|           5.0|
|Bright Battersea ...|           5.0|
|Impressive Flat i...|           5.0|
|Charming Period o...|           5.0|
|Beautifully Conte...|           5.0|
|   Home Sweet home !|           5.0|
|Maison Blooms-Lux...|           5.0|
+--------------------+--------------+
only showing top 20 rows


In [15]:
# 3. Rewrite the following code from the previous exercise using SparkSQL:
#
# ```
# from pyspark.sql.functions import length, avg, count
# 
# reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))
# reviews_with_comment_length \
#   .join(listings, reviews_with_comment_length.listing_id == listings.id, 'inner') \
#   .groupBy('listing_id').agg(
#       avg(reviews_with_comment_length.comment_length).alias('average_comment_length'),
#       count(reviews_with_comment_length.id).alias('reviews_count')
#   ) \
#   .filter('reviews_count >= 5') \
#   .orderBy('average_comment_length', ascending=False) \
#   .show()
# ```
# This was a solution for the the task:
#
# "Get top five listings with the highest average review comment length. Only return listings with at least 5 reviews"

reviews.createOrReplaceTempView("reviews")
listings.createOrReplaceTempView("listings")

# Write the SQL query
sql_query = """
SELECT
    r.listing_id,
    l.name,
    AVG(LENGTH(r.comments)) AS average_comment_length,
    COUNT(r.id) AS reviews_count
FROM reviews r
INNER JOIN listings l
    ON r.listing_id = l.id
GROUP BY r.listing_id, l.name
HAVING COUNT(r.id) >= 5
ORDER BY average_comment_length DESC
"""

spark \
  .sql(sql_query) \
  .show()


+------------------+--------------------+----------------------+-------------+
|        listing_id|                name|average_comment_length|reviews_count|
+------------------+--------------------+----------------------+-------------+
|618608352812465378|Beautiful Georgia...|    1300.1666666666667|            6|
|          28508447|The warm and cosy...|    1089.3333333333333|            6|
|627425975703032358|Superb loft beaut...|     951.7777777777778|            9|
|           2197681|Luxurious apartme...|                 939.2|            5|
|          13891813|Beautiful 2 Bedro...|                 905.0|            5|
|            979753|Pop-Art Brand New...|     893.9230769230769|           13|
|630150178279666225|Georgian oasis of...|     890.7272727272727|           11|
|           8856894|Beautiful period ...|     890.1666666666666|            6|
|          29469389|  London Single home|                 885.0|            6|
|          22524075|Large 3 bed 2 bat...|           

In [28]:
# 4. [Optional][Challenge]
# Calculate an average time passed from the first review for each host in the listings dataset. 
# To implmenet a custom aggregation function you would need to use "pandas_udf" function to write a custom aggregation function.
#
# Documentation about "pandas_udf": https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html 
#
# To use "pandas_udf" you would need to install two additional dependencies in the virtual environment you use for PySpark:
# Run these commands:
# ```
# pip install pandas
# pip install pyarrow
# ```

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import PandasUDFType
import pandas as pd

@pandas_udf(DoubleType())
def average_days_since_first_review_udf(first_review_series: pd.Series) -> float:
    # TODO: Implement the UDF
    dates = pd.to_datetime(first_review_series)
    days_passed = (pd.Timestamp('today') - dates).dt.days
    if days_passed.empty:
        return None
    return days_passed.mean()

listings \
  .filter(
    listings.first_review.isNotNull()
  ) \
  .groupBy('host_id') \
  .agg(
    average_days_since_first_review_udf(listings.first_review).alias('average_days_since_first_review_days')
  ) \
  .show()

+-------+------------------------------------+
|host_id|average_days_since_first_review_days|
+-------+------------------------------------+
|   6774|                  2040.1666666666667|
|   9089|                               523.0|
|   9323|                              3033.0|
|  10657|                              2200.0|
|  11431|                              3629.0|
|  14596|                              2716.0|
|  19195|                              3680.0|
|  25235|                              3316.0|
|  26258|                                88.0|
|  30577|                              1007.0|
|  30780|                              3317.0|
|  32851|                              3550.5|
|  34007|                               385.0|
|  36808|                               787.0|
|  38691|                              1014.0|
|  40515|                              2276.0|
|  40944|                   568.0833333333334|
|  41759|                              5406.0|
|  43039|    