# Extract Subsets from Reviews Data and Clean Up for Modeling and Labelling


In [2]:
!pip install langdetect # Library to detect language of review text
!pip install pyspark
!pip install spark-nlp==5.1.4

Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: langdetect
  Building wheel for langdetect (setup.py) ... [?25l[?25hdone
  Created wheel for langdetect: filename=langdetect-1.0.9-py3-none-any.whl size=993225 sha256=95a4596577bcdfadc683c9c8ccb2a07d29946b1051f98b45dde167b9e276cd59
  Stored in directory: /root/.cache/pip/wheels/95/03/7d/59ea870c70ce4e5a370638b5462a7711ab78fba2f655d05106
Successfully built langdetect
Installing collected packages: langdetect
Successfully installed langdetect-1.0.9
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wh

Import Required Libaries and Methods

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, regexp_replace
from pyspark.ml import Pipeline
import sparknlp

Initiate SparkContext and SparkSession and Import Google Drive Module

In [4]:
# Create a SparkSession
spark=SparkSession.builder.appName('ENSF-612').getOrCreate()

# Check version of the Spark NLP library and Apache Spark being used in the
# current Spark session
print("Spark NLP version: {}".format(sparknlp.version()))
print("Apache Spark version: {}".format(spark.version))

Spark NLP version: 5.1.4
Apache Spark version: 3.5.0


In [5]:
# Imports the drive module from the google.colab library
from google.colab import drive

# Mounts your Google Drive into the Colab environment at the specified directory path /content/drive
drive.mount('/content/drive')


Mounted at /content/drive


Function to Convert CSV to DataFrame

In [6]:
# Method to convert CSV to pandas DataFrame
def csv_to_df(file_path):
    """
      Converts a CSV file to a pandas DataFrame

      Parameters:
         file_path: The path to the CSV file, to be converted into a DataFrame

      Return:
         df: The newly created pandas DataFrame
    """

    # File location and type
    # file_location = "/content/drive/My Drive/ENSF-612/project-files/" + fileName  # Combine the filename with the /Filestore/tables/ path in DataBricks
    file_type = "csv" # Specify that the file type is a CSV. Other file types will be ignored.

    # CSV options
    infer_schema = "true" # Infer schema automatically while reading the CSV
    first_row_is_header = "true" # Treat the first row as header
    delimiter = "," # Define the delimiter used in the CSV file

   # Read the CSV file into a DataFrame using Spark
   # Allowing multiline entries in the CSV
   # Specifying an escape character for CSV entries
    df = spark.read.format(file_type) \
         .option("inferSchema", infer_schema) \
         .option("header", first_row_is_header) \
         .option("sep", delimiter) \
         .option("multiline","True") \
         .option("escape","\"") \
         .load(file_path)

    # Return DataFrame
    return df

Load the reviews

In [7]:
# Load csv data and convert it to a DataFrame

fileName = "reviews.csv"
filePath = "/content/drive/My Drive/ENSF-612/project-files/" + fileName
entire_df = csv_to_df(filePath)

Take a subset of the reviews and inspect the data

In [8]:
# take a subset of ~100,000 reviews for less processing
sample_fraction = 0.1
reviews_df = entire_df.sample(fraction=sample_fraction, seed=42)
reviews_df.show()
reviews_df.count()

+----------+------------------+-----+-----------+--------------+----------------------------------+
|listing_id|                id| date|reviewer_id| reviewer_name|                          comments|
+----------+------------------+-----+-----------+--------------+----------------------------------+
|     13913|          11876590|41746|    5194009|    Alessandro|              Alina was a perfe...|
|     13913|         538005731|43737|    7253695|          Bart|              Alina is an amazi...|
|     13913|         543287825|43745|   28531625|        Philip|              Felt at home - Al...|
|     15400|            528262|40799|     969713|       Valerie|              Delightful, charm...|
|     15400|           1491714|41076|    2384243| José Henrique|              We've been travel...|
|     98541|           3289231|41280|    4000934|       Cameron|              This worked out v...|
|     98541|           5755533|41470|    6650608|           Jim|              We were a group o...|


105142

###Exploratory Data Analysis

In [9]:
# Get the number of rows
num_rows = reviews_df.count()

# Get the number of columns
num_columns = len(reviews_df.columns)

# Display the shape (number of rows and columns)
print("Number of rows: ", num_rows)
print("Number of columns: ", num_columns)

Number of rows:  105142
Number of columns:  6


In [10]:
# Count the original number of rows
original_count = reviews_df.count()

# Remove duplicates and count the remaining rows
distinct_count = reviews_df.dropDuplicates().count()

# Calculate the count of duplicate rows
duplicate_count = original_count - distinct_count

# Display the count of duplicate rows
print("Number of duplicate rows: ", duplicate_count)

Number of duplicate rows:  0


View column names

In [11]:
column_names = reviews_df.columns
print(column_names)

['listing_id', 'id', 'date', 'reviewer_id', 'reviewer_name', 'comments']


Check for missing values

In [12]:
from pyspark.sql.functions import count, isnan, when

# Check for missing values in each column
reviews_df.select([count(when(isnan(c), c)).alias(c) for c in reviews_df.columns]).show()

+----------+---+----+-----------+-------------+--------+
|listing_id| id|date|reviewer_id|reviewer_name|comments|
+----------+---+----+-----------+-------------+--------+
|         0|  0|   0|          0|           10|       0|
+----------+---+----+-----------+-------------+--------+



There are no missing values in the columns that we care about, which are listing_id and comments

### Filtering Out Non-English Reviews

In [13]:
# Import the BooleanType class from the pyspark.sql.types module
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from langdetect import detect # Import the detect method from the langdetect library


# Define a User Defined Function (UDF) named 'check_if_english'
@udf(returnType=BooleanType())
def check_if_english(text):
    """
        Takes text as input and returns a BooleanType indicating if the text is in English.

        Parameters:
            text: The string of text whose language is to be analyzed

        Returns:
            language: A boolean that is True if the language is in english.
    """
    try:
        # Attempt to detect the language of the 'text' using the 'detect' function from langdetect
        language = detect(text)
    except:
        # If an exception occurs (e.g., due to non-text input), set 'language' to "error".
        language = "error"
    # Return True if the detected language is English ('en'), otherwise return False.
    return language == 'en'


####Extract a subset of reviews for listings with >= 20 reviews
We do this because we will eventually take an average sentiment score for each listing

In [14]:
# Calculate the number of reviews for each listing
listing_counts = reviews_df.groupBy("listing_id").count()

# Filter listings with at least 20 reviews
popular_listings = listing_counts.filter(col("count") >= 20)

# Perform an inner join of the reviews DataFrame with the popular listings to
# get only the rows corresponding to listings with at least 20 reviews
df_with_counts = reviews_df.join(popular_listings, "listing_id", "inner")


Get the first 20 reviews for each listing with at least 20 reviews

In [15]:

# Use the 'check_if_english' UDF to filter reviews based on whether the 'comments' are in English.
english_reviews = df_with_counts.filter(check_if_english('comments'))

# Add a row number partitioned by listing_id and ordered by review id
window_spec = Window.partitionBy("listing_id").orderBy("id")

# Alias the count column to avoid naming conflicts
df_with_row_number = english_reviews.withColumn("review_num", F.count("id").over(window_spec).alias("review_num"))

# Here we need to only take the reviews for listings with >= 20 reviews
# Calculate the number of reviews for each listing
df_with_num_reviews = df_with_row_number.groupBy("listing_id").count().withColumnRenamed("count", "num_reviews").cache()



In [16]:
# Filter listings with at least 20 reviews
df_listings_with_20_english_reviews = df_with_num_reviews.filter(col("num_reviews") >= 20)

# limit the subset to 500 listings, therefore ~10,000 reviews
first_500_listings = df_listings_with_20_english_reviews.limit(500)

# Get the remaining listings with over 20 reviews, for extracting a subset for manual labelling
remaining_listings = df_listings_with_20_english_reviews.subtract(first_500_listings)

# perform an inner join to get the reviews
df_20_english_reviews = df_with_row_number.join(first_500_listings, "listing_id", "inner")
remaining_reviews = df_with_row_number.join(remaining_listings, "listing_id", "inner")


# Select the first 20 reviews for 750 popular listing (15,000 rows)
selected_reviews = df_20_english_reviews.filter(col("review_num") <= 20).limit(10000)
reviews_for_labeling = remaining_reviews.filter(col("review_num") <= 20).limit(1000)


In [17]:
# Cache the dataframes for future use
selected_reviews.cache()


DataFrame[listing_id: int, id: bigint, date: int, reviewer_id: int, reviewer_name: string, comments: string, count: bigint, review_num: bigint, num_reviews: bigint]

In [18]:
reviews_for_labeling.cache()
reviews_for_labeling.show(50)
reviews_for_labeling.count()

+----------+---------+-----+-----------+-------------+--------------------+-----+----------+-----------+
|listing_id|       id| date|reviewer_id|reviewer_name|            comments|count|review_num|num_reviews|
+----------+---------+-----+-----------+-------------+--------------------+-----+----------+-----------+
|     90700|   691142|40851|     157881|     Jennifer|We had a fantasti...|   42|         1|         38|
|     90700|  1297706|41047|    1579008|       Lauren|Great Host and ve...|   42|         2|         38|
|     90700|  1718326|41106|    2376989|      Rebecca|This was a great ...|   42|         3|         38|
|     90700|  2700334|41206|    3554326|        Adlan|This apartment is...|   42|         4|         38|
|     90700|  5255190|41446|     174904|     Agustina|Very nice apartme...|   42|         5|         38|
|     90700|  6105473|41485|    1965535|    Christina|Lovely location, ...|   42|         6|         38|
|     90700|  6554999|41503|    3863823|         Anne|w

1000

In [19]:
# clear some cached dataframes from memory
# reviews_count_df.unpersist()
# listing_counts.unpersist()
df_with_num_reviews.unpersist()

DataFrame[listing_id: int, num_reviews: bigint]

In [20]:
reviews_for_labeling.count()

1000

In [21]:
# selected_reviews.count()

Check if any of the listings selected have fewer than 20 reviews

In [22]:
df_listings_for_labeling = reviews_for_labeling.groupBy("listing_id").count().withColumnRenamed("count", "total_reviews").orderBy("total_reviews", ascending=False)
df_listings_for_labeling.cache()
df_listings_for_labeling.show()
df_listings_for_labeling.count()

+----------+-------------+
|listing_id|total_reviews|
+----------+-------------+
|     90700|           20|
|    244125|           20|
|    280234|           20|
|    358360|           20|
|    375006|           20|
|    390319|           20|
|    390320|           20|
|    390356|           20|
|    390618|           20|
|    412652|           20|
|    436458|           20|
|    439489|           20|
|    454008|           20|
|    469187|           20|
|    523749|           20|
|    544824|           20|
|    583705|           20|
|    599058|           20|
|    620164|           20|
|    627123|           20|
+----------+-------------+
only showing top 20 rows



50

In [23]:
df_selected_listings = selected_reviews.groupBy("listing_id").count().withColumnRenamed("count", "total_reviews").orderBy("total_reviews", ascending=True)
df_selected_listings.cache()
# df_selected_listings.show()
# df_selected_listings.count()

DataFrame[listing_id: int, total_reviews: bigint]

In [24]:
# drop unneccessary pre-processing columns
columns_to_drop = ["count", "num_reviews"]
reviews_for_labeling = reviews_for_labeling.drop(*columns_to_drop)
selected_reviews = selected_reviews.drop(*columns_to_drop)

### Filter Out HTML Tags and punctuation in Reviews with regex
This is required for importing the data into Label Studio for manual labeling

In [25]:
def clean_reviews(df, original_column="comments"):
  html_pattern = "<.*?>"                 # replace with a single space
  multiple_spaces_pattern = "\\s+"       # replace multiple space with a single space
  comma_newline_pattern  = ",|\r\n?|\n"  # includes comma, carriage return, newline chars

  # intermediate column names
  no_html_tags_column = "comments_no_html"
  cleaned_column="comments_cleaned"

  # Apply the pattern to remove HTML tags
  df_no_html = df.withColumn(no_html_tags_column, regexp_replace(col(original_column), html_pattern, " "))

  # Apply the patterns to replace multiple spaces with a single space and commas/newline characters with an empty string
  df_cleaned = df_no_html.withColumn(cleaned_column, regexp_replace(regexp_replace(col(no_html_tags_column), multiple_spaces_pattern, " "), comma_newline_pattern, ""))

  # Drop temporary columns
  columns_to_drop = ["count", "num_reviews", original_column, no_html_tags_column]
  df_cleaned = df_cleaned.drop(*columns_to_drop)

  # Rename cleaned comments column back to original column name
  df_cleaned = df_cleaned.withColumnRenamed(cleaned_column, original_column)

  return df_cleaned


In [26]:
# apply cleaning function to dataframes
cleaned_reviews_for_labeling = clean_reviews(reviews_for_labeling)
cleaned_selected_reviews = clean_reviews(selected_reviews)

In [27]:
# Write the DataFrames to a CSV file
path = "/content/drive/MyDrive/ENSF-612/project-files/"
cleaned_reviews_for_labeling.write.csv(path + "temp_1000_reviews_for_labeling.csv", header=True, mode="overwrite")
cleaned_selected_reviews.write.csv(path + "temp_selected_10000_reviews.csv", header=True, mode="overwrite")
