In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=5f8a8071c274c45d2e4a5e37e3c841a085cd33ab58db998df4981ff405776978
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.types import IntegerType, FloatType, StringType
from pyspark.sql.functions import (
    col, udf, count, length, when, isnan, split, size,
    array_min, array_max, array_distinct, to_timestamp,
    lower, regexp_replace, concat_ws
)

import nltk
import string
from nltk.corpus import stopwords, wordnet
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import re

nltk.download('wordnet')

import psycopg2

[nltk_data] Downloading package wordnet to /root/nltk_data...


True

In [6]:
spark = SparkSession \
    .builder \
    .config("spark.driver.memory", "10g") \
    .config("spark.jars", "postgresql-42.6.0.jar") \
    .appName("Yelp Review Sense - NLP - Text Cleaning & Processing") \
    .getOrCreate()

## Retrieving Data from AWS RDS

In [None]:
# Define the JDBC connection details
jdbc_url = "jdbc:postgresql://intentionally-removed.amazonaws.com/yelpreviewsense"
db_table = ""
db_user = "postgres"
db_password = ""

In [9]:
# Specify the read options
read_options = {
    "url": jdbc_url,
    "driver": "org.postgresql.Driver",
    "dbtable": f"(SELECT review_id, text FROM {db_table}) AS review_data",
    "user": db_user,
    "password": db_password,
}

# Read the table into a PySpark DataFrame
df = spark.read.format("jdbc").options(**read_options).load()

df.show(5, 80)

+----------------------+--------------------------------------------------------------------------------+
|             review_id|                                                                            text|
+----------------------+--------------------------------------------------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aware it is going to take about 2 hours fr...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over the years, and nothing compares to the ...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclectic assortment: a large chicken leg, fried...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.   Our favorite is the lamb curry and korm...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us tour of upcoming patio/rooftop area which...|
+----------------------+--------------------------------------------------------------------------------+
only showing top 5 rows



## Text Cleaning and Preprocessing

Convert text to lowercase

In [10]:
df = df.withColumn("cleaned_text", lower(col("text")))
df.show(10, 40)

+----------------------+----------------------------------------+----------------------------------------+
|             review_id|                                    text|                            cleaned_text|
+----------------------+----------------------------------------+----------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aw...|if you decide to eat here, just be aw...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over...|i've taken a lot of spin classes over...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclecti...|family diner. had the buffet. eclecti...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.  ...|wow!  yummy, different,  delicious.  ...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us t...|cute interior and owner (?) gave us t...|
|JrIxlS1TzJ-iCu79ul40cQ|I am a long term frequent customer of...|i am a long term frequent customer of...|
|6AxgBCNX_PNTOxmbRSwcKQ|Loved this to

Remove special characters and punctuation

In [11]:
df = df.withColumn("cleaned_text", regexp_replace(col("cleaned_text"), '[' + re.escape(string.punctuation) + ']', ''))
df.show(10, 40)

+----------------------+----------------------------------------+----------------------------------------+
|             review_id|                                    text|                            cleaned_text|
+----------------------+----------------------------------------+----------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aw...|if you decide to eat here just be awa...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over...|ive taken a lot of spin classes over ...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclecti...|family diner had the buffet eclectic ...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.  ...|wow  yummy different  delicious   our...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us t...|cute interior and owner  gave us tour...|
|JrIxlS1TzJ-iCu79ul40cQ|I am a long term frequent customer of...|i am a long term frequent customer of...|
|6AxgBCNX_PNTOxmbRSwcKQ|Loved this to

Remove stopwords

In [12]:
# Tokenize the 'cleaned_text' column into a new 'tokens' column
df = Tokenizer(inputCol="cleaned_text", outputCol="tokens").transform(df)

# Remove stopwords
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
df = remover.transform(df)

# Join the 'filtered_tokens' back into a text, overwriting the 'cleaned_text' column
df = df.withColumn("cleaned_text", concat_ws(" ", col("filtered_tokens")))
df = df.drop("tokens", "filtered_tokens")

df.show(10, 40)

+----------------------+----------------------------------------+----------------------------------------+
|             review_id|                                    text|                            cleaned_text|
+----------------------+----------------------------------------+----------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aw...|decide eat aware going take 2 hours b...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over...|ive taken lot spin classes years noth...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclecti...|family diner buffet eclectic assortme...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.  ...|wow  yummy different  delicious   fav...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us t...|cute interior owner  gave us tour upc...|
|JrIxlS1TzJ-iCu79ul40cQ|I am a long term frequent customer of...|long term frequent customer establish...|
|6AxgBCNX_PNTOxmbRSwcKQ|Loved this to

Remove emojis and emoticons

In [13]:
emoji_pattern = r'[' \
                r'\\U0001F600-\\U0001F64F' \
                r'\\U0001F300-\\U0001F5FF' \
                r'\\U0001F680-\\U0001F6FF' \
                r'\\U0001F700-\\U0001F77F' \
                r'\\U0001F780-\\U0001F7FF' \
                r'\\U0001F800-\\U0001F8FF' \
                r'\\U0001F900-\\U0001F9FF' \
                r'\\U0001FA00-\\U0001FA6F' \
                r'\\U0001FA70-\\U0001FAFF' \
                r'\\U0001FB00-\\U0001FBFF' \
                r'\\U0001FC00-\\U0001FCFF' \
                r'\\U0001FD00-\\U0001FDFF' \
                r'\\U0001FE00-\\U0001FEFF' \
                r'\\U0001FF00-\\U0001FFFF' \
                r']'

emoticon_pattern = r'[:;][-\'\)\(\]\[dDpPoO/\\|33*]+|<3'

df = df.withColumn("cleaned_text", regexp_replace(df["cleaned_text"], emoji_pattern, ""))
df = df.withColumn("cleaned_text", regexp_replace(df["cleaned_text"], emoticon_pattern, ""))

df.show(10, 40)

+----------------------+----------------------------------------+----------------------------------------+
|             review_id|                                    text|                            cleaned_text|
+----------------------+----------------------------------------+----------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aw...|decide eat aware going take  hours be...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over...|ive taken lot spin classes years noth...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclecti...|family diner buffet eclectic assortme...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.  ...|wow  yummy different  delicious   fav...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us t...|cute interior owner  gave us tour upc...|
|JrIxlS1TzJ-iCu79ul40cQ|I am a long term frequent customer of...|long term frequent customer establish...|
|6AxgBCNX_PNTOxmbRSwcKQ|Loved this to

Remove URLs and HTML Tags

In [14]:
html_pattern = r"<.*?>"
url_pattern = r"https?://\S+|www\.\S+"

df = df.withColumn("cleaned_text", regexp_replace(df["cleaned_text"], html_pattern, ""))
df = df.withColumn("cleaned_text", regexp_replace(df["cleaned_text"], url_pattern, ""))

df.show(10, 40)

+----------------------+----------------------------------------+----------------------------------------+
|             review_id|                                    text|                            cleaned_text|
+----------------------+----------------------------------------+----------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aw...|decide eat aware going take  hours be...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over...|ive taken lot spin classes years noth...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclecti...|family diner buffet eclectic assortme...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.  ...|wow  yummy different  delicious   fav...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us t...|cute interior owner  gave us tour upc...|
|JrIxlS1TzJ-iCu79ul40cQ|I am a long term frequent customer of...|long term frequent customer establish...|
|6AxgBCNX_PNTOxmbRSwcKQ|Loved this to

Limit consecutive whitespaces to 1

In [15]:
df = df.withColumn("cleaned_text", regexp_replace(df["cleaned_text"], r'\s+', ' '))
df.show(10, 40)

+----------------------+----------------------------------------+----------------------------------------+
|             review_id|                                    text|                            cleaned_text|
+----------------------+----------------------------------------+----------------------------------------+
|KU_O5udG6zpxOg-VcAEodg|If you decide to eat here, just be aw...|decide eat aware going take hours beg...|
|BiTunyQ73aT9WBnpR9DZGw|I've taken a lot of spin classes over...|ive taken lot spin classes years noth...|
|saUsX_uimxRlCVr67Z4Jig|Family diner. Had the buffet. Eclecti...|family diner buffet eclectic assortme...|
|AqPFMleE6RsU23_auESxiA|Wow!  Yummy, different,  delicious.  ...|wow yummy different delicious favorit...|
|Sx8TMOWLNuJBWer-0pcmoA|Cute interior and owner (?) gave us t...|cute interior owner gave us tour upco...|
|JrIxlS1TzJ-iCu79ul40cQ|I am a long term frequent customer of...|long term frequent customer establish...|
|6AxgBCNX_PNTOxmbRSwcKQ|Loved this to

## Data Export to AWS

In [15]:
# Specify the new options
options = {
    "url": jdbc_url,
    "driver": "org.postgresql.Driver",
    "dbtable": db_table,
    "user": db_user,
    "password": db_password,
}

# Read the table into a PySpark DataFrame
master = spark.read.format("jdbc").options(**options).load()

# Merge the df with master df on review_id
master = master.join(df.select("review_id", "cleaned_text"), on = "review_id", how = "inner")

# Update dataset
master.write.format("jdbc").options(**options).mode("overwrite").save()