In [None]:
# Amazon Review Analysis Pipeline using NLP and Spark
# Ravi Patel 12291812
# Lyric Randle 16253122
# Michelle Lu 16127324
# Mason Allaman 16365811

# Abstract:
# Text data from product reviews has become abundant with the growth of Amazon. Extracting actionable information from these reviews
# can be key to informing product decisions. This can range from identifying specific phrases associated with negative and positive
# reviews to training a classifier for sentiment in the context of a particular industry or product segment. The potential value
# is vast, but processing this volume of data can be difficult. This experiment processes
# the 2013 Amazon reviews dataset using Spark and Spark NLP within a Docker Compose environment.

In [None]:
import json
import os

import mysql.connector
import pandas as pd
import seaborn as sns

# Open the MySQL connection used by every query cell below.
# Connection settings are read from the environment when present so that
# credentials do not have to be edited into the notebook; each default is the
# value that was previously hard-coded here, so behaviour is unchanged when
# none of the variables are set.
con = mysql.connector.connect(
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "root"),
    host=os.environ.get("MYSQL_HOST", "mysql-server"),
    database=os.environ.get("MYSQL_DATABASE", "product_analysis"),
)

# dictionary=True: fetched rows are indexed by column name in the cells below
# (e.g. row['total_reviews']).
curs = con.cursor(dictionary=True)

In [None]:
# List every table in the connected database.
curs.execute("show tables")
curs.fetchall()

In [None]:
# Total Reviews ingested:
# The count comes back as a single row and is read through its
# `total_reviews` alias.
curs.execute("select count(*) as total_reviews from reviews")
curs.fetchall()[0]['total_reviews']

In [None]:
# Total Products ingested:
# The count comes back as a single row and is read through its
# `total_products` alias.
curs.execute("select count(*) as total_products from products")
curs.fetchall()[0]['total_products']

In [None]:
# Preview five `processed_review` rows joined to their matching rows in
# `reviews` (processed_review.review_id = reviews.id).
# NOTE(review): `select *` pulls every column of both tables; if the two tables
# share a column name, the name-keyed rows can presumably hold only one of the
# values -- confirm against the schema and alias columns if so.
curs.execute("select * from processed_review inner join reviews on processed_review.review_id = reviews.id limit 5")
curs.fetchall()

In [None]:
# Sample ten (title, category) pairs for reviewed products whose category is
# not 'Books'. The inner join yields one row per review, so the same product
# can appear more than once, and without an ORDER BY the ten rows returned are
# whichever the server produces first.
curs.execute("select p.title, p.category from products p  inner join reviews r on p.id = r.product_id where p.category != 'Books' limit 10")
curs.fetchall()

In [None]:
# Five most-reviewed products: count reviews per product (LEFT JOIN keeps
# products that have no reviews, counted as 0), sort by that count descending
# and keep the top five.
curs.execute("""SELECT products.title, COUNT(reviews.id) AS num_reviews
FROM products
LEFT JOIN reviews ON products.id = reviews.product_id
GROUP BY products.id order by num_reviews desc limit 5""")
top_reviewed_products = pd.DataFrame(data=curs.fetchall())

# Bar chart with one bar per product title, then the table as the cell output.
top_reviewed_products.plot.bar(x='title')
top_reviewed_products

In [None]:
# Count products per category. The rows are kept in `category_counts`
# (one dict per category with keys `category` and `category_counts`) and are
# reused by the charting cells that follow.
curs.execute("""SELECT products.category, COUNT(id) AS category_counts
FROM products
GROUP BY products.category""")
category_counts = curs.fetchall()


In [None]:
# Bar chart of the number of products in each category, followed by the
# underlying table.
cat_df = pd.DataFrame(category_counts)
cat_df.plot(kind='bar',x='category')
# Only the last expression of a cell is displayed, so the table must come
# after the plot call to be shown (same layout as the top-reviewed-products
# cell); a bare `cat_df` in the middle of the cell displays nothing.
cat_df

In [None]:
# Count the products whose `processed` column equals 1.
# The count has no alias here, unlike the total_reviews / total_products
# cells, so the whole fetched row is shown rather than a single value.
curs.execute('select count(*) from products where processed = 1')
curs.fetchall()

In [None]:
# Draw one histogram of review ratings for each product category found in
# `category_counts`.
for category in category_counts:
    curs.execute('select rating from reviews inner join products on products.id = reviews.product_id where products.category=%s',[category['category']])
    cat_ratings = curs.fetchall()
    if not cat_ratings:
        # Categories come from `products`, so a category may have no reviews
        # at all (and a NULL category never matches `= %s`). An empty result
        # builds a frame with zero columns, and assigning one column name to
        # it raises ValueError -- skip instead of aborting the whole loop.
        continue
    cat_rating_df = pd.DataFrame(cat_ratings)
    # Rename the single `rating` column so the histogram title names the category.
    cat_rating_df.columns = [f'{category["category"]}_Ratings']
    cat_rating_df.hist()

In [None]:
# Draw one histogram of review helpfulness values for each product category
# found in `category_counts`.
for category in category_counts:
    curs.execute('select helpfulness from reviews inner join products on products.id = reviews.product_id where products.category=%s',[category['category']])
    cat_helpful = curs.fetchall()
    if not cat_helpful:
        # Categories come from `products`, so a category may have no reviews
        # at all (and a NULL category never matches `= %s`). An empty result
        # builds a frame with zero columns, and assigning one column name to
        # it raises ValueError -- skip instead of aborting the whole loop.
        continue
    cat_helpful_df = pd.DataFrame(cat_helpful)
    # Rename the single `helpfulness` column so the histogram title names the category.
    cat_helpful_df.columns = [f'{category["category"]}_Helpful']
    cat_helpful_df.hist()

In [None]:
# Fetch up to ten rows joining products, their reviews and the matching
# processed_review rows, restricted to reviews with processed = 1,
# helpfulness > 0.5 and rating < 2.0. The rows are kept in `kp` and displayed.
# NOTE(review): `select *` spans three tables and this query itself references
# both products.id and reviews.id, while rows are indexed by column name; the
# two ids cannot both live under the key `id`, so one is lost (presumably the
# later table's value wins -- confirm). Alias the needed columns explicitly if
# both ids matter.
curs.execute('select * from products inner join reviews on reviews.product_id=products.id inner join processed_review on processed_review.review_id = reviews.id where reviews.processed = 1 and reviews.helpfulness > 0.5 and reviews.rating < 2.0 limit 10')
kp = curs.fetchall()
kp

In [None]:
# Fetch up to ten aggregated product-review rows for products outside the
# 'Books' category, decode each row's JSON `metric` column, and keep the rows
# whose decoded metric has more than five entries.
# The SQL string literal uses single quotes: double-quoted text is parsed as
# an identifier when the server runs with ANSI_QUOTES, and single quotes match
# the other queries in this notebook.
curs.execute("select * from agg_product_review inner join products on products.id=agg_product_review.product_id where products.category!='Books' limit 10")
topics = curs.fetchall()
for t in topics:
    # A NULL metric is fetched as None, which json.loads rejects with a
    # TypeError; leave such rows undecoded.
    if t['metric'] is not None:
        t['metric'] = json.loads(t['metric'])
# Rows without a metric cannot satisfy the size threshold, so they are excluded.
most_topics = [t for t in topics if t['metric'] is not None and len(t['metric']) > 5]
most_topics