## Part 1 Data Preprocessing

In [1]:
import numpy as np
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import col, lower, regexp_replace, split
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import udf
from nltk.stem.porter import *

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1622827002238_0009,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<br></br>

## Preprocessing tripAdvisor review data 

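- Create a binary ``is_helpful`` label from ``num_helpful_votes`` (1 if the review has at least one helpful vote, otherwise 0)
- Text processing
    - Remove URLs and symbols (non-alphanumeric characters)
    - Lowercase letters
    - Tokenize corpus
    - Remove stopwords
    - Stem tokens (Porter stemmer) and drop stems shorter than 3 characters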

#### Load tripAdvisor review data

In [11]:
# Read the TripAdvisor review file from S3 as JSON into a Spark DataFrame
trip = spark.read.json('s3://dse230-project-data1/data/review.txt')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Inspect column names, nesting and types before selecting fields
trip.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- author: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- num_cities: long (nullable = true)
 |    |-- num_helpful_votes: long (nullable = true)
 |    |-- num_reviews: long (nullable = true)
 |    |-- num_type_reviews: long (nullable = true)
 |    |-- username: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_stayed: string (nullable = true)
 |-- id: long (nullable = true)
 |-- num_helpful_votes: long (nullable = true)
 |-- offering_id: long (nullable = true)
 |-- ratings: struct (nullable = true)
 |    |-- business_service_(e_g_internet_access): double (nullable = true)
 |    |-- check_in_front_desk: double (nullable = true)
 |    |-- cleanliness: double (nullable = true)
 |    |-- location: double (nullable = true)
 |    |-- overall: double (nullable = true)
 |    |-- rooms: double (nullable = true)
 |    |-- service: double (nullable = true)
 |    |-- sleep_quality: double (nullable 

<br></br>

#### create a binary label from the number of helpful votes

In [13]:
# Binary label: 1 when the review received at least one helpful vote, else 0.
# `num_helpful_votes` is nullable and a null reaches the Python UDF as None;
# `None > 0` raises TypeError, so nulls are explicitly mapped to 0.
label_udf = udf(lambda x: 1 if x is not None and x > 0 else 0, IntegerType())
trip = trip.withColumn('is_helpful', label_udf("num_helpful_votes"))
# keep only the review text and its label for the text pipeline
trip_review = trip.select(['text', 'is_helpful'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# fraction of reviews labelled helpful (positive-class rate)
trip_review.filter(col('is_helpful') == 1).count() / trip_review.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.4415641031186224

<br></br>

#### clean text string

In [7]:
def clean_text(c):
    """Normalize a text column using Spark SQL column functions.

    Steps, in order: lowercase the text; drop a leading "rt " marker;
    strip http/https URLs; remove every character that is not an ASCII
    letter, a digit or whitespace.

    Args:
        c: Column holding the raw text (e.g. ``col("text")``).

    Returns:
        The column expression with all cleaning steps applied.
    """
    c = lower(c)
    # Raw strings keep the regex backslashes literal and avoid Python's
    # invalid-escape-sequence warnings; the pattern values are unchanged.
    c = regexp_replace(c, r"^rt ", "")
    c = regexp_replace(c, r"(https?\://)\S+", "")
    c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")
    return c

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# overwrite the `text` column with its cleaned version
trip_review = trip_review.withColumn('text', clean_text(trip_review["text"]).alias("text"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<br></br>

#### tokenize the text

In [16]:
# With gaps=False the pattern describes the tokens themselves: each run of
# word characters becomes one token. The raw string avoids Python's
# invalid-escape warning for "\w" while passing the same regex to Spark.
Tokenizer = RegexTokenizer(gaps=False, pattern=r'\w+', inputCol='text', outputCol='text_token')

trip_token = Tokenizer.transform(trip_review)
trip_token.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+--------------------+
|                text|is_helpful|          text_token|
+--------------------+----------+--------------------+
|stayed in a king ...|         0|[stayed, in, a, k...|
|on every visit to...|         0|[on, every, visit...|
|this is a great p...|         0|[this, is, a, gre...|
|the andaz is a ni...|         0|[the, andaz, is, ...|
|i have stayed at ...|         0|[i, have, stayed,...|
|excellent staff t...|         0|[excellent, staff...|
|i stayed at the s...|         0|[i, stayed, at, t...|
|my husband and i ...|         0|[my, husband, and...|
|wonderful boutiqu...|         0|[wonderful, bouti...|
|this hotel is a n...|         0|[this, hotel, is,...|
+--------------------+----------+--------------------+
only showing top 10 rows

<br></br>

#### remove stopwords

In [17]:
# stop-word filter reading `text_token` and writing `token_sw_removed`
remover = (
    StopWordsRemover()
    .setInputCol('text_token')
    .setOutputCol('token_sw_removed')
)

trip_token = remover.transform(trip_token)
trip_token.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+--------------------+--------------------+
|                text|is_helpful|          text_token|    token_sw_removed|
+--------------------+----------+--------------------+--------------------+
|stayed in a king ...|         0|[stayed, in, a, k...|[stayed, king, su...|
|on every visit to...|         0|[on, every, visit...|[every, visit, ny...|
|this is a great p...|         0|[this, is, a, gre...|[great, property,...|
|the andaz is a ni...|         0|[the, andaz, is, ...|[andaz, nice, hot...|
|i have stayed at ...|         0|[i, have, stayed,...|[stayed, us, anda...|
|excellent staff t...|         0|[excellent, staff...|[excellent, staff...|
|i stayed at the s...|         0|[i, stayed, at, t...|[stayed, setai, 3...|
|my husband and i ...|         0|[my, husband, and...|[husband, stayed,...|
|wonderful boutiqu...|         0|[wonderful, bouti...|[wonderful, bouti...|
|this hotel is a n...|         0|[this, hotel, is,...|[hotel, nice, sta...|
+-----------

<br></br>

#### stemming (Porter stemmer)

In [18]:
# Instantiate the Porter stemmer once at module level; the `stem` helper
# defined in this cell looks it up by name when it runs
stemmer = PorterStemmer()

# Create stemmer python function
def stem(in_vec):
    """Stem every token and drop stems of two characters or fewer.

    Args:
        in_vec: Iterable of token strings, or None (a null array value
            reaches a Python UDF as None).

    Returns:
        list: Stems longer than two characters, in input order. An empty
        list when ``in_vec`` is None or empty.
    """
    # Guard null/empty rows: iterating None would raise TypeError.
    if not in_vec:
        return []
    stems = (stemmer.stem(token) for token in in_vec)
    # Keep only stems longer than two characters.
    return [s for s in stems if len(s) > 2]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# expose `stem` to Spark as a UDF that returns an array of strings
stemmer_udf = udf(stem, ArrayType(StringType()))

# append the stemmed tokens as `vector_stemmed`, computed from the
# stop-word-filtered tokens
final_token = trip_token.withColumn("vector_stemmed", stemmer_udf("token_sw_removed"))

final_token.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|is_helpful|          text_token|    token_sw_removed|      vector_stemmed|
+--------------------+----------+--------------------+--------------------+--------------------+
|stayed in a king ...|         0|[stayed, in, a, k...|[stayed, king, su...|[stay, king, suit...|
|on every visit to...|         0|[on, every, visit...|[every, visit, ny...|[everi, visit, ny...|
|this is a great p...|         0|[this, is, a, gre...|[great, property,...|[great, properti,...|
|the andaz is a ni...|         0|[the, andaz, is, ...|[andaz, nice, hot...|[andaz, nice, hot...|
|i have stayed at ...|         0|[i, have, stayed,...|[stayed, us, anda...|[stay, andaz, pro...|
|excellent staff t...|         0|[excellent, staff...|[excellent, staff...|[excel, staff, re...|
|i stayed at the s...|         0|[i, stayed, at, t...|[stayed, setai, 3...|[stay, setai, nig...|
|my husband and i ...|        

In [20]:
# keep only the stemmed tokens (renamed to `unigrams`) and the label
final_token = final_token.select(col('vector_stemmed').alias('unigrams'), 'is_helpful')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# save processed data to parquet (currently disabled: uncomment the next line to write the output)
#final_token.coalesce(5).write.parquet('s3://dse230-project-data1/final_token.parquet', mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<br></br>

## Preprocessing Airbnb reviews

In [2]:
# Review comments are free text, so a quoted field may contain commas,
# doubled quotes or embedded line breaks. multiLine=True keeps such a record
# together instead of splitting it into bogus rows, and escape='"' makes the
# parser read "" inside a quoted field as a literal quote.
# NOTE(review): assumes reviews.csv uses standard double-quote quoting — confirm
# against the raw file; multiLine also makes the read less parallel.
airbnb = spark.read.csv(
    's3://dse230-project-data1/data/reviews.csv',
    header=True,
    multiLine=True,
    quote='"',
    escape='"',
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Inspect the CSV columns; no schema was supplied to the reader, and the
# output below shows every column typed as string
airbnb.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)

#### select data

In [4]:
# keep only the review id and its free-text comment
comments = airbnb.select('id', 'comments')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### remove null rows

In [5]:
# drop rows that have no comment text
comments = comments.filter(col('comments').isNotNull())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### clean text string

In [8]:
# overwrite the `comments` column with its cleaned version
comments = comments.withColumn('comments', clean_text(comments["comments"]).alias("comments"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### tokenize the text

In [9]:
# With gaps=False the pattern describes the tokens themselves: each run of
# word characters becomes one token. The raw string avoids Python's
# invalid-escape warning for "\w" while passing the same regex to Spark.
Tokenizer = RegexTokenizer(gaps=False, pattern=r'\w+', inputCol='comments', outputCol='text_token')
comments = Tokenizer.transform(comments)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### remove stopwords

In [22]:
# Reuses the `remover` built in the TripAdvisor section (reads `text_token`,
# writes `token_sw_removed`), so that cell must have been run first
comments = remover.transform(comments)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### stemming (Porter stemmer)

In [23]:
# add the stemmed tokens as `vector_stemmed`, computed from `token_sw_removed`
final_review = comments.withColumn("vector_stemmed", stemmer_udf("token_sw_removed"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# keep only the stemmed tokens (renamed to `unigrams`) and the review id
final_review = final_review.select(col('vector_stemmed').alias('unigrams'), 'id')

# persist the processed reviews as parquet, replacing any previous output
final_review.write.parquet('s3://dse230-project-data1/final_review.parquet', mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…