In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark aggregation functions") \
    .getOrCreate()

24/11/30 23:00:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
listings = spark.read.csv("../data/listings.csv.gz", 
    header=True,
    inferSchema=True,
    sep=",", 
    quote='"',
    escape='"', 
    multiLine=True,
    mode="PERMISSIVE" 
)
listings.printSchema()

                                                                                

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

In [3]:
reviews = spark.read.csv("../data/reviews.csv.gz", 
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE"
)
reviews.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



                                                                                

In [4]:
# 1. For each listing compute string category depending on its price, and add it as a new column.
# A category is defined in the following way:
#
# * price < 50 -> "Budget"
# * 50 <= price < 150 -> "Mid-range"
# * price >= 150 -> "Luxury"
# 
# Only include listings where the price is not null.
# Count the number of listings in each category

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace

listings = listings.withColumn('price_numeric', regexp_replace('price', '[$,]', '').cast('float'))

def categorize_price(price):
    if price is None:
        return 'Unknown'
    elif price < 50:
        return 'Budget'
    elif 50 <= price < 150:
        return 'Mid-range'
    elif price >= 150:
        return 'Luxury'
    else:
        return 'Unknown'

categorize_price_udf = udf(categorize_price, StringType())

listings_with_category = listings \
  .filter(listings.price_numeric.isNotNull()) \
  .withColumn(
    'price_category',
    categorize_price_udf(listings.price_numeric)
  ) \
  .groupBy('price_category') \
  .count() \
  .show()


                                                                                

+--------------+-----+
|price_category|count|
+--------------+-----+
|     Mid-range|29562|
|        Budget| 5995|
|        Luxury|27648|
+--------------+-----+



In [5]:
# 2. In this task you will need to compute a santiment score per review, and then an average sentiment score per listing.
# A santiment score indicates how "positive" or "negative" a review is. The higher the score the more positive it is, and vice-versa.
#
# To compute a sentiment score per review compute the number of positive words in a review and subtract the number of negative
# words in the same review (the list of words is already provided)
#
# To complete this task, compute a DataFrame that contains the following fields:
# * name - the name of a listing
# * average_sentiment - average sentiment of reviews computed using the algorithm described above

from pyspark.sql.functions import avg
from pyspark.sql.types import FloatType

# Lists of positive and negative words
positive_words = {'good', 'great', 'excellent', 'amazing', 'fantastic', 'wonderful', 'pleasant', 'lovely', 'nice', 'enjoyed'}
negative_words = {'bad', 'terrible', 'awful', 'horrible', 'disappointing', 'poor', 'hate', 'unpleasant', 'dirty', 'noisy'}

def sentiment_score(comment):
    if comment is None:
        return 0.0
    comment_lower = comment.lower()
    score = 0
    
    for word in positive_words:
        if word in comment_lower:
            score += 1
            
    for word in negative_words:
        if word in comment_lower:
            score -= 1
    return float(score)

sentiment_score_udf = udf(sentiment_score, FloatType())

reviews_with_sentiment = reviews \
  .withColumn(
    'sentiment_score',
    sentiment_score_udf(reviews.comments)
  )

listings \
   .join(reviews_with_sentiment, listings.id == reviews.listing_id, 'inner') \
   .groupBy('listing_id', 'name') \
   .agg(
      avg('sentiment_score').alias('average_sentiment')
   ) \
   .orderBy('average_sentiment', ascending=False) \
   .select('listing_id', 'name', 'average_sentiment') \
   .show(truncate=False)


[Stage 8:>                                                          (0 + 1) / 1]

+-------------------+--------------------------------------------------+-----------------+
|listing_id         |name                                              |average_sentiment|
+-------------------+--------------------------------------------------+-----------------+
|8630729            |Central and Cozy 2 BR Flat next to Pimlico station|6.0              |
|1210552487630259444|Cozy flat in London                               |6.0              |
|1213675403351699745|Knightsbridge Two Bedroom Duplex                  |5.0              |
|3804150            |London NW3. Feel at home single room              |5.0              |
|15488286           |Canal Side stylish 1 Bedroom Apartment near tube  |5.0              |
|945592509209998667 |Cozy One Bedroom Full Flat                        |5.0              |
|24763465           |Luxury Holiday Let | Prime SW19 Village Location  |5.0              |
|1091966865591580198|Stunning 3 bedroom apartment in Surbiton          |5.0              |

                                                                                

In [6]:
# 3. Rewrite the following code from the previous exercise using SparkSQL:
#
# ```
# from pyspark.sql.functions import length, avg, count
# 
# reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))
# reviews_with_comment_length \
#   .join(listings, reviews_with_comment_length.listing_id == listings.id, 'inner') \
#   .groupBy('listing_id').agg(
#       avg(reviews_with_comment_length.comment_length).alias('average_comment_length'),
#       count(reviews_with_comment_length.id).alias('reviews_count')
#   ) \
#   .filter('reviews_count >= 5') \
#   .orderBy('average_comment_length', ascending=False) \
#   .show()
# ```
# This was a solution for the the task:
#
# "Get top five listings with the highest average review comment length. Only return listings with at least 5 reviews"

reviews.createOrReplaceTempView("reviews")
listings.createOrReplaceTempView("listings")

# Write the SQL query
sql_query = """
SELECT
  r.listing_id,
  AVG(LENGTH(r.comments)) AS average_comment_length,
  COUNT(r.id) AS reviews_count
FROM
  reviews r
JOIN
  listings l
  ON r.listing_id = l.id
GROUP BY
  r.listing_id
HAVING
  COUNT(r.id) >= 5
ORDER BY
  average_comment_length DESC
"""

spark \
  .sql(sql_query) \
  .show()


24/11/30 23:01:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 12:>                                                         (0 + 1) / 1]

+------------------+----------------------+-------------+
|        listing_id|average_comment_length|reviews_count|
+------------------+----------------------+-------------+
|618608352812465378|    1300.1666666666667|            6|
|627425975703032358|    1190.7142857142858|            7|
|          28508447|    1089.3333333333333|            6|
|          42776409|                1089.2|            5|
|          40439200|     1074.857142857143|            7|
|           2197681|                 939.2|            5|
|          13891813|                 905.0|            5|
|            979753|     893.9230769230769|           13|
|630150178279666225|     890.7272727272727|           11|
|           8856894|     890.1666666666666|            6|
|          29469389|                 885.0|            6|
|          22524075|                 885.0|            5|
|           5555679|     878.7169811320755|          106|
|          41515075|                 875.5|            6|
|          543

                                                                                

In [7]:
# 4. [Optional][Challenge]
# Calculate an average time passed from the first review for each listing in the listings dataset. 
# To implmenet a custom aggregation function you would need to use "pandas_udf" function to write a custom aggregation function.
#
# Documentation about "pandas_udf": https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html 
#
# To use "pandas_udf" you would need to install two additional dependencies in the virtual environment you use for PySpark:
# Run these commands:
# ```
# pip install pandas
# pip install pyarrow
# ```

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import PandasUDFType
import pandas as pd


@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def average_days_since_first_review_udf(first_review_series) -> float:
    today = pd.to_datetime('today')
    listing_ages = (today - pd.to_datetime(first_review_series)).dt.days
    if listing_ages.empty:
        return None
    return listing_ages.mean()

listings \
  .filter(
    listings.first_review.isNotNull()
  ) \
  .groupBy('host_id') \
  .agg(
    average_days_since_first_review_udf(listings.first_review).alias('average_days_since_first_review_days')
  ) \
  .show()

[Stage 17:>                                                         (0 + 1) / 1]

+-------+------------------------------------+
|host_id|average_days_since_first_review_days|
+-------+------------------------------------+
|   4775|                  3046.6666666666665|
|   6774|                  1763.1666666666667|
|   9089|                               246.0|
|   9323|                              2756.0|
|  11431|                              3352.0|
|  14596|                              2439.0|
|  19195|                              3403.0|
|  24334|                               133.0|
|  25235|                              1686.0|
|  26648|                              2387.0|
|  30577|                               730.0|
|  30780|                              3040.0|
|  32851|                              3273.5|
|  34007|                               108.0|
|  36808|                               510.0|
|  38691|                               737.0|
|  40515|                              1999.0|
|  40944|                             487.875|
|  41759|    

                                                                                