In [None]:
# Mount Google Drive at /content/drive so the dataset under MyDrive can be read.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install pyspark
!pip install sparknlp
!pip install nltk

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Fetch the NLTK English stop-word corpus used in the imports cell.
# The call's return value (True on success) is left as the cell output.
import nltk
nltk.download(info_or_id='stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
import sparknlp

# Do NOT call sparknlp.start() here. It would create a SparkSession with Spark NLP's
# defaults. The custom builder in the session cell below would then get that existing
# session back from getOrCreate(), and its JVM-level settings (driver memory, jars)
# could no longer take effect. Just report the Python package version instead, which
# should match the spark-nlp JVM package version configured for the session.
sparknlp.version()

In [None]:
# Import the libraries used by the notebook.
# NOTE(review): the wildcard imports below are order-sensitive. `sparknlp.annotator` and
# `pyspark.ml.feature` both export classes with the same names (e.g. `Tokenizer`,
# `Normalizer`). Because `pyspark.ml.feature` is star-imported last, the bare name
# `Tokenizer` used later refers to the PySpark ML transformer, not the Spark NLP
# annotator. `pyspark.sql.functions` likewise shadows Python builtins such as `max`,
# `min`, `sum`, `abs` and `round`.
import re
from nltk.corpus import stopwords
# English stop-word list (needs the nltk 'stopwords' corpus downloaded earlier), extended
# with 'xxxx' -- presumably a redaction placeholder in the review text; confirm.
eng_stopwords = stopwords.words('english')
eng_stopwords.append('xxxx')
# NOTE(review): not referenced by the cleaning cells in this notebook. Using them would
# also require the nltk 'wordnet' and POS-tagger resources, which are not downloaded here.
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag

import sparknlp
from pyspark.sql import *
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline

# Star-imported after sparknlp.annotator, so its names (e.g. Tokenizer) win.
from pyspark.ml.feature import *

# Presumably for a later modelling/evaluation step; not used by the cleaning cells here.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

In [None]:
# Start (or reuse) a local Spark session configured for Spark NLP.
# NOTE(review): if a SparkSession already exists in this kernel (e.g. one created by
# sparknlp.start()), getOrCreate() returns it and the JVM-level settings below
# (driver memory, jars) cannot take effect.
# NOTE(review): Spark NLP is put on the classpath twice -- a standalone GPU jar via
# spark.jars and the spark-nlp 4.4.0 Maven package via spark.jars.packages. Confirm
# only one is intended.
spark_settings = [
    ("spark.driver.memory", "80g"),
    ("spark.driver.maxResultSize", "0"),
    ("spark.kryoserializer.buffer.max", "2000M"),
    ("spark.jars", "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/tmp/spark-nlp-gpu-multi-fix4.jar"),
    ("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.0"),
]
session_builder = SparkSession.builder.master('local[50]').appName('Spark NLP')
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

In [None]:
# Load the raw Amazon reviews TSV (all columns are read as strings; no inferSchema).
# quote="" turns off Spark's quote handling. The file is presumably plain tab-separated
# with no quote/escape characters, while review text routinely contains '"'. With the
# default quote character, a field starting with '"' is parsed as a quoted value and can
# swallow tabs and line breaks, merging neighbouring columns/rows.
df = spark.read.csv(
    '/content/drive/MyDrive/amazon_reviews_multilingual_US_v1_00.tsv',
    sep="\t",
    header=True,
    quote="",
).repartition(30)

In [None]:
# Preview the first 10 rows of the raw data.
df.show(n=10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   52946003|R3H88ZRH7E8F41|0783235615|     152746530|         Topaz [VHS]|           Video|          2|           10|         17|   N|                N|Uninvolving, but ...|The wooden Freder...| 2001-01-15|
|         US|   52389844|R2K3ZKNSHEIHL8|B00000K31V|     710018450|            eXistenZ|       Video DVD|          4|    

In [None]:
# List the column names of the raw data.
print(df.schema.names)

['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'product_category', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date']


In [None]:
# Keep only the ids, the review text and the star rating needed downstream.
df = df.select("customer_id", "review_id", "product_id", "review_headline", "review_body", "star_rating")

In [None]:
# Count missing values per column before dropping them, so the impact of dropna() is visible.
# Every column is a string here (the CSV is read without inferSchema), so isnan() would only
# cast the text to double -- matching literal "NaN" text -- and isNull() is the right check.
null_counts = df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]
).toPandas()

# Drop rows with a null in any of the selected columns.
df = df.dropna()

null_counts

#### **The data contains information about customer reviews for various products in different marketplaces. To ensure data quality, the following requirements were applied to the dataset:**

1. Only reviews with a minimum length of 20 characters are included in the dataset.
2. Only products with at least 2 reviews are included in the dataset.
3. At most 20 reviews per product and at most 20 reviews per customer are included in the dataset, to avoid bias.

In [None]:
# Apply the data-quality requirements listed above, keeping the review rows themselves.
# Window is imported here so this cell does not depend on star-import side effects.
from pyspark.sql.window import Window

MIN_REVIEW_CHARS = 20       # requirement 1: minimum review body length
MIN_PRODUCT_REVIEWS = 2     # requirement 2: products need at least this many reviews
MAX_REVIEWS_PER_GROUP = 20  # requirement 3: cap per product and per customer

# 1. Keep only reviews whose body has at least 20 characters.
reviews_long = df.filter(length(col("review_body")) >= MIN_REVIEW_CHARS)

# 3. Keep at most 20 reviews per product and at most 20 per customer. Reviews are ranked
#    within their product and within their customer by review_id (arbitrary but
#    deterministic). A review survives only if it is in the first 20 of both rankings.
by_product = Window.partitionBy("product_id").orderBy("review_id")
by_customer = Window.partitionBy("customer_id").orderBy("review_id")
reviews_capped = (
    reviews_long
    .withColumn("product_rank", row_number().over(by_product))
    .withColumn("customer_rank", row_number().over(by_customer))
    .filter(
        (col("product_rank") <= MAX_REVIEWS_PER_GROUP)
        & (col("customer_rank") <= MAX_REVIEWS_PER_GROUP)
    )
    .drop("product_rank", "customer_rank")
)

# 2. Keep only products that still have at least 2 reviews after capping.
#    NOTE(review): the requirements above set no per-customer minimum; add one here if intended.
eligible_products = (
    reviews_capped.groupBy("product_id").count()
    .filter(col("count") >= MIN_PRODUCT_REVIEWS)
    .select("product_id")
)

# eligible_products has one row per product, so this join cannot duplicate reviews.
# Joining on customer_id alone would multiply each customer's reviews and re-add reviews
# that failed the filters. The column order is the one the following cells expect.
df_new = reviews_capped.join(eligible_products, "product_id").select(
    "customer_id", "review_id", "product_id", "review_headline", "review_body", "star_rating"
)

In [None]:
# Preview the first 10 filtered reviews.
df_new.show(n=10)

+-----------+--------------+----------+--------------------+--------------------+-----------+
|customer_id|     review_id|product_id|     review_headline|         review_body|star_rating|
+-----------+--------------+----------+--------------------+--------------------+-----------+
|   10000810|R1BX560EJUMMZL|B0055SWM08|Not my usual choi...|The reviews for t...|          3|
|   10000810|R2N0CSF286FXGO|B007N85POS|     Angry Birds Rio|For the last few ...|          4|
|   10000810|R1JUKQQCYWRJEM|B008P1EWIM|Weather Live - Ap...|I was disapointte...|          2|
|   10000810|R3VUV7D1XA10RA|B00ATSBIIG| Beautiful graphics.|The graphics are ...|          3|
|   10000810|R2BA2QT6K5MJW5|B008HK3XOU|A sturdy cover to...|I absolutely love...|          5|
|   10000810|R2IEYW5OUH23HD|B009C7ZQM4|        Clean master|I love having thi...|          5|
|   10000810|R22IBFORFYK61I|B00AFCEIB0|It's good to have...|I like that the f...|          3|
|   10000810|R2Q7BD4ROMCHRQ|B0063GORYU|Very simplistic g...|

In [None]:
# Number of rows (reviews) left after filtering.
df_new.select(count(lit(1))).head()[0]

54901614

In [None]:
# Remove HTML tags. Each tag is replaced by a space rather than deleted outright, so the
# words on either side of a tag are not glued together before whitespace tokenization.
df_new = df_new.withColumn("review_body", regexp_replace("review_body", "<.*?>", " "))

# Truncate the review body to its first 2,000 characters (Spark's substring is 1-based).
df_new = df_new.withColumn("review_body", substring("review_body", 1, 2000))

In [None]:
# Build a single "review" text column: the headline, one space, then the body.
# The two source text columns are not carried over; the remaining columns keep their order.
df_new = df_new.select(
    *[name for name in df_new.columns if name not in ("review_headline", "review_body")],
    concat(col("review_headline"), lit(" "), col("review_body")).alias("review"),
)
df_new.show(5)

+-----------+--------------+----------+-----------+--------------------+
|customer_id|     review_id|product_id|star_rating|              review|
+-----------+--------------+----------+-----------+--------------------+
|   10000810|R1BX560EJUMMZL|B0055SWM08|          3|Not my usual choi...|
|   10000810|R2N0CSF286FXGO|B007N85POS|          4|Angry Birds Rio F...|
|   10000810|R1JUKQQCYWRJEM|B008P1EWIM|          2|Weather Live - Ap...|
|   10000810|R3VUV7D1XA10RA|B00ATSBIIG|          3|Beautiful graphic...|
|   10000810|R2BA2QT6K5MJW5|B008HK3XOU|          5|A sturdy cover to...|
+-----------+--------------+----------+-----------+--------------------+
only showing top 5 rows



#### **We also applied a vocabulary-based filter on the reviews. If a review contains a token that doesn’t occur in at least 20 other reviews, then the review is excluded from the dataset.**

In [None]:
# Vocabulary filter: exclude every review that contains a token occurring in fewer than
# 20 *other* reviews. That means the token's document frequency (reviews containing it,
# including this one) is below 20 + 1.
# Document frequencies are computed explicitly: IDF(minDocFreq=...) only zeroes the weight
# of rare terms and never removes a review. No feature vector is kept, because the CSV
# writer cannot store vector columns.
from pyspark.ml.feature import Tokenizer

MIN_OTHER_REVIEWS = 20

# Lower-case the merged headline + body text and split it on whitespace.
tokenizer = Tokenizer(inputCol="review", outputCol="tokens")
df_tokens = tokenizer.transform(df_new)

# One row per (review, distinct token), so a token counts at most once per review.
review_tokens = df_tokens.select(
    "review_id", explode(array_distinct(col("tokens"))).alias("token")
)

# Document frequency of each token = number of distinct reviews containing it.
token_doc_freq = review_tokens.groupBy("token").agg(countDistinct("review_id").alias("doc_freq"))
rare_tokens = token_doc_freq.filter(col("doc_freq") < MIN_OTHER_REVIEWS + 1).select("token")

# Reviews containing at least one rare token ...
reviews_with_rare_tokens = review_tokens.join(rare_tokens, "token").select("review_id").distinct()

# ... are removed from the cleaned reviews with an anti-join (df_new's columns and order kept).
df_final = (
    df_new.join(reviews_with_rare_tokens, "review_id", "left_anti")
    .select(*df_new.columns)
)

In [None]:
# Remove exact duplicate rows before saving.
df_final = df_final.dropDuplicates()

# Gather everything into one partition so the output directory holds a single CSV part file.
# Use repartition(1) (a shuffle) rather than coalesce(1). coalesce would also squeeze the
# final stage of dropDuplicates into a single task, losing all parallelism.
df_final = df_final.repartition(1)

# Save the cleaned data as CSV. Spark writes a *directory* at this path containing the part
# file. mode("overwrite") lets the notebook be re-run without failing on an existing path.
df_final.write.mode("overwrite").csv("/content/drive/MyDrive/df_cleaned.csv", header=True)