<table>
<tr>
    <td>
        <img src="https://www.wordstream.com/wp-content/uploads/2021/07/how-to-get-amazon-reviews.png" width="200"/>
    </td>
    <td style="text-align: left; vertical-align: top;">
        <h1><strong>Amazon Reviews</strong><br></h1>
        <h4>Engineering Large Scale Data Analytics Systems<br>
        ENSF 612 - Fall 2023</h4>
    </td>
</tr>
</table>


*** Note: run all the code the first time. For subsequent runs, set the run_everything flag to False. This avoids resetting Spark, remounting the drive, and recomputing expensive steps that have already been computed.


In [None]:
run_everything = True

**Setting Up Spark & Spark NLP**

In [None]:
%%capture
# The %%capture cell magic must be the first line of the cell; it captures the cell's output to avoid clutter

if run_everything:
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  !wget https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
  !tar -xvf spark-3.3.3-bin-hadoop3.tgz
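  !pip install findspark
  # Pin the Python package to the same version as the Spark NLP jar configured below
  !pip install -q spark-nlp==5.1.4
  # Needed later by the contraction-expansion step
  !pip install -q contractions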

  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

  import findspark
  findspark.init()
  findspark.find()
  from pyspark.sql import SparkSession
  import sparknlp

  # Run Spark locally with 4 worker threads, so a 4-core machine can execute up to 4 tasks in parallel,
  # and add the Spark NLP package to the Spark session
  spark = SparkSession.builder\
      .appName("Colab")\
      .master("local[4]")\
      .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4")\
      .getOrCreate()

  sc = spark.sparkContext

**Mounting Drive & Loading Datasets**

In [None]:
if run_everything:
  from google.colab import drive
  drive.mount('/content/drive')

In [None]:
!ls drive/MyDrive/Big\ Data/datasets

In [None]:
dataset_directory = 'drive/MyDrive/Big Data/datasets'

# Gets the list of files in the dataset directory that end in ".json"
json_files = [file for file in os.listdir(dataset_directory) if file.endswith('.json')]

# Creates a list of full file paths
file_paths = [os.path.join(dataset_directory, file) for file in json_files]

In [None]:
import json

# Function to parse NDJSON (new line-delimited JSON) files and extract specific fields
def parse_ndjson(line):
    try:
        # Parse the JSON line and return only the rating ('overall') and the review text
        json_line = json.loads(line)
        return (
            json_line.get('overall'),  # None (not '') when missing, so the column keeps a single type
            json_line.get('reviewText', '')
        )
    except json.JSONDecodeError:
        # In case of error, skip this record and return None
        return None
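
As a quick sanity check, the parsing logic can be exercised on a couple of hand-written lines without Spark. This is only a sketch: the sample records are made up for illustration, and `extract_fields` is a hypothetical, condensed stand-in for the parser above so that the snippet runs on its own.

```python
import json

# Hypothetical sample lines that mimic the dataset's NDJSON layout
sample_lines = [
    '{"overall": 5.0, "reviewText": "Great product", "asin": "B000TEST01"}',
    '{"overall": 2.0, "reviewText": "Stopped working"',  # truncated, so invalid JSON
]

def extract_fields(line):
    """Keep only the rating and the review text; return None for malformed lines."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return None
    return (record.get("overall"), record.get("reviewText"))

parsed = [extract_fields(line) for line in sample_lines]
valid = [row for row in parsed if row is not None]
print(valid)  # [(5.0, 'Great product')]
```

The malformed second line is dropped by the `is not None` filter, which is the same pattern the RDD code applies with `filter`.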

In [None]:
# Initialize an empty RDD
data_rdd = spark.sparkContext.emptyRDD()

# Read each file into an RDD, parse each NDJSON line, drop the lines that failed to parse, and union with the existing RDD
for file_path in file_paths:
    file_rdd = sc.textFile(file_path, 4)
    parsed_rdd = file_rdd.map(parse_ndjson).filter(lambda x: x is not None)
    data_rdd = data_rdd.union(parsed_rdd)

# convert the data_rdd to a distributed Spark DataFrame
df = spark.createDataFrame(data_rdd, schema=['score', 'review'])

**Data Inspection**

In [None]:
df.printSchema()  # Shows the columns and data types of the DataFrame

In [None]:
df.count()  # Number of records in the DataFrame

In [None]:
df.head(1) # Preview a single record

In [None]:
if run_everything:
  from pyspark.sql.functions import col

  # Check for missing values in each column (if any are found, fill them in with an appropriate method)
  for column in df.columns:
      null_count = df.filter(col(column).isNull()).count()
      print(f"Number of nulls in column {column}: {null_count}")

**Text Pre-Processing**

Expanding contractions

Although this step is not strictly necessary, expanding contractions can make the text clearer and more consistent for the model, which can improve its ability to interpret and analyze the words.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import contractions

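# Define the UDF for expanding contractions (e.g. "don't" -> "do not")
def expand_contractions_text(text):
    # Pass missing reviews through instead of failing inside the UDF
    if text is None:
        return None
    return contractions.fix(text)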

expand_contractions_udf = udf(expand_contractions_text, StringType())

# Apply the UDF to the DataFrame to create a new column with expanded contractions
expanded_df = df.withColumn("expanded_review", expand_contractions_udf("review"))
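
To make the idea concrete without Spark, here is a minimal sketch of dictionary-based contraction expansion. It is not how the `contractions` package is implemented: `CONTRACTION_MAP` and `expand_contractions_sketch` are hypothetical names, and the mapping is deliberately tiny.

```python
import re

# Hypothetical, deliberately tiny mapping; a real library ships a far larger one
CONTRACTION_MAP = {
    "can't": "cannot",
    "won't": "will not",
    "it's": "it is",
    "don't": "do not",
}

# One alternation pattern, matched case-insensitively on word boundaries
_pattern = re.compile(
    r"\b(" + "|".join(re.escape(key) for key in CONTRACTION_MAP) + r")\b",
    flags=re.IGNORECASE,
)

def expand_contractions_sketch(text):
    """Replace each known contraction with its expanded form; None passes through."""
    if text is None:
        return None
    return _pattern.sub(lambda match: CONTRACTION_MAP[match.group(0).lower()], text)

print(expand_contractions_sketch("It's great but I can't recommend it"))
# it is great but I cannot recommend it
```

The sketch loses the original capitalization ("It's" becomes "it is"), which is harmless here because the tokens are lowercased later in the pipeline.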

Defining DocumentAssembler and Spark NLP components

The DocumentAssembler is the initial step in a Spark NLP pipeline. It converts raw text into a structured Annotation format that subsequent Spark NLP annotators can utilize for processing.

In [None]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner

document_assembler = DocumentAssembler() \
    .setInputCol("expanded_review") \
    .setOutputCol("document")

1. Tokenization

In [None]:
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

2. Text Cleaning

In [None]:
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized") \
    .setLowercase(True) \
    .setCleanupPatterns(["[^A-Za-z'\\s]"])  # remove punctuation and digits; keep letters, apostrophes, and whitespace
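
The effect of the cleanup pattern can be previewed with Python's `re` module on a few made-up tokens. This is a sketch of the regular expression only, not of the Normalizer itself; dropping tokens that end up empty is an assumption of this sketch.

```python
import re

# Same character class as the cleanup pattern above: anything that is not
# a letter, an apostrophe, or whitespace is removed
cleanup_pattern = re.compile(r"[^A-Za-z'\s]")

# Hypothetical tokens for illustration
tokens = ["Great", "value", "!!!", "5", "stars", ",", "don't", "miss", "v2.0"]

cleaned = []
for token in tokens:
    stripped = cleanup_pattern.sub("", token).lower()
    if stripped:  # assumption: tokens that become empty are discarded
        cleaned.append(stripped)

print(cleaned)  # ['great', 'value', 'stars', "don't", 'miss', 'v']
```

Note that a token such as "v2.0" is reduced to "v", so single-letter leftovers may be worth filtering in a later step.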

3. Stopword Removal

In [None]:
stop_words_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleanTokens")

4. Stemming/Lemmatization

Stemming and lemmatization are both text normalization techniques that reduce words to a base or root form. Stemming strips suffixes by rule, while lemmatization maps each word to its dictionary form. Applying both can be redundant, so for this application we use lemmatization.


In [None]:
# Use the pretrained LemmatizerModel from Spark NLP
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemmatized")
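
The difference between the two techniques can be illustrated with a toy example in plain Python. Both functions are hypothetical and far cruder than real stemmers or the pretrained lemmatizer: `naive_stem` strips a few suffixes by rule, and `lookup_lemma` reads from a small made-up table.

```python
# Hypothetical lookup table standing in for a pretrained lemma dictionary
LEMMA_TABLE = {
    "studies": "study",
    "better": "good",
    "ran": "run",
    "batteries": "battery",
}

def naive_stem(word):
    """Crude suffix stripping, only meant to illustrate how stemming behaves."""
    for suffix in ("ies", "ing", "es", "s"):
        if word.endswith(suffix) and len(word) > len(suffix) + 2:
            return word[: -len(suffix)]
    return word

def lookup_lemma(word):
    """Dictionary lookup; unknown words are returned unchanged."""
    return LEMMA_TABLE.get(word, word)

for word in ["studies", "better", "ran", "batteries"]:
    print(f"{word}: stem={naive_stem(word)}, lemma={lookup_lemma(word)}")
```

The rule-based stem of "studies" is the non-word "stud", while the lookup returns "study"; irregular forms such as "better" and "ran" are only handled by the dictionary approach.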

In [None]:
# Define the Spark NLP pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    normalizer,
    stop_words_cleaner,
    lemmatizer
])

processed = pipeline.fit(expanded_df).transform(expanded_df)

# Show the processed data
processed.show()

In [None]:
# Get the first row of the DataFrame
first_row = processed.first()

# Print first row with its content
print("Score:", first_row['score'])
print("Review:", first_row['review'])
print("Document:", [doc.result for doc in first_row['document']])
print("Token:", [tok.result for tok in first_row['token']])
print("Normalized:", [norm.result for norm in first_row['normalized']])
print("Clean Tokens:", [clean.result for clean in first_row['cleanTokens']])
print("Lemmatized:", [lemma.result for lemma in first_row['lemmatized']])


**Feature extraction**

Next steps:

Feature extraction (Bag of Words, TF-IDF, word embeddings such as Word2Vec)

Vectorization (CountVectorizer, TfidfVectorizer)

Model selection (Logistic Regression, Naive Bayes, SVM, or unsupervised learning)
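
As a rough preview of the TF-IDF step, the weighting can be computed by hand in plain Python. This is a sketch under stated assumptions: the documents are made up, and the smoothed inverse document frequency log((N + 1) / (df + 1)) is one common variant chosen here for illustration.

```python
import math
from collections import Counter

# Hypothetical lemmatized reviews, standing in for the "lemmatized" column
docs = [
    ["battery", "last", "long"],
    ["battery", "die", "fast"],
    ["screen", "bright", "battery"],
]

# Document frequency: number of documents that contain each term
doc_freq = Counter(term for doc in docs for term in set(doc))
n_docs = len(docs)

def tf_idf(doc):
    """Raw term counts weighted by a smoothed IDF, log((N + 1) / (df + 1))."""
    counts = Counter(doc)
    return {
        term: count * math.log((n_docs + 1) / (doc_freq[term] + 1))
        for term, count in counts.items()
    }

weights = tf_idf(docs[0])
print(weights)
```

"battery" appears in every document, so its weight is 0.0, while "last" and "long" each get log(2). Terms shared by all reviews therefore contribute nothing to telling them apart.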





Additional steps (optional to improve accuracy):

Part-of-speech tagging (before stop word removal)

N-grams to use along with TF-IDF
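
For reference, n-grams are simply contiguous windows of n tokens. The helper below is a hypothetical plain-Python sketch with made-up tokens, meant only to show what the extra features would look like.

```python
def ngrams(tokens, n):
    """Return the contiguous n-token windows of a token list, joined by spaces."""
    return [" ".join(tokens[i : i + n]) for i in range(len(tokens) - n + 1)]

tokens = ["not", "worth", "the", "price"]
print(ngrams(tokens, 2))  # ['not worth', 'worth the', 'the price']
```

Bigrams such as "not worth" keep negations attached to the word they modify, which single-token features would split apart.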