# Automated Temporal Brand & Product Perception Discovery from Online Reviews with Topical Phrase Mining.

## 0. Setup

### 0.1. Imports

In [None]:
import findspark
findspark.init()

import glob
import multiprocessing
import os

from gensim.parsing.preprocessing import preprocess_string, strip_short, remove_stopwords, strip_punctuation, strip_numeric
import nltk
from nltk.corpus import stopwords
from nltk.util import ngrams
import numpy as np 
import pandas as pd
from pyarrow import json
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from sklearn import preprocessing
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

### 0.2. Configuration

In [None]:
# Raw inputs: Amazon Reviews 2018 dumps (reviews + product metadata)
RAW_REVIEWS_FILE = './data/Cell_Phones_and_Accessories.json.gz'
RAW_REVIEWS_METADATA_FILE = './data/meta_Cell_Phones_and_Accessories.json.gz'
# Category value used to filter the product metadata
CATEGORY = 'Cell Phones'

# Locations of the processed artefacts read by the later sections
REVIEWS_FILE = './data/cleaned_reviews.json'
TOPICS_DIR = './data/topics'
# Number of CPUs reported by the OS; used to size the worker pools
CORES = multiprocessing.cpu_count()

### 0.3 Multiprocessing Helper

In [None]:
def run_parallel(fn, arg):
    """Apply ``fn`` to every element of ``arg`` and return the results as a list.

    On a single-core machine the mapping is done serially in this process;
    otherwise it is distributed over a pool of ``CORES - 1`` worker processes.

    Args:
        fn: Callable taking one element of ``arg``. When a pool is used it is
            sent to worker processes, so it must be picklable.
        arg: Iterable of inputs.

    Returns:
        list: ``fn(x)`` for each ``x`` in ``arg``, in input order.
    """
    if CORES == 1:
        # Map element-wise, exactly like the pooled branch, instead of
        # calling fn on the whole collection.
        return [fn(item) for item in arg]

    # The context manager terminates the pool even if fn raises in a worker.
    with multiprocessing.Pool(CORES - 1) as pool:
        return pool.map(fn, arg)

def filter_parallel(fn, arg):
    """Return the elements of ``arg`` for which ``fn(element)`` is truthy.

    The predicate is evaluated serially on a single-core machine and on a
    pool of ``CORES - 1`` worker processes otherwise.

    Args:
        fn: Predicate taking one element of ``arg``. When a pool is used it
            is sent to worker processes, so it must be picklable.
        arg: Iterable of candidate elements.

    Returns:
        list: The elements that passed the predicate, in input order.
    """
    # Materialise once: the items are needed both to evaluate the predicate
    # and to build the result, and a one-shot iterator would be exhausted
    # after the first pass.
    items = list(arg)

    if CORES == 1:
        checks = [fn(item) for item in items]
    else:
        # The context manager terminates the pool even if fn raises.
        with multiprocessing.Pool(CORES - 1) as pool:
            checks = pool.map(fn, items)

    return [item for item, check in zip(items, checks) if check]

# 1. Prepare the Raw Reviews Data Set

Start the Spark Session.

In [None]:
# Local Spark session using all available cores ("local[*]")
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Product Reviews")
    .getOrCreate()
)

## 1.1. Get the Products Associated with Each Brand

In [None]:
def get_products(category):
    """Return the branded products that belong to ``category``.

    Reads the product metadata from ``RAW_REVIEWS_METADATA_FILE``, explodes
    each product's ``category`` array and keeps the rows whose exploded value
    equals ``category`` and whose brand is not null.

    Args:
        category: Category label to match exactly (e.g. ``'Cell Phones'``).

    Returns:
        Spark DataFrame with columns ``asin``, ``brand`` (lower-cased) and
        ``product`` (lower-cased title).
    """
    df = spark.read.json(RAW_REVIEWS_METADATA_FILE)

    products_df = (
        df.select("asin",
                  F.lower(df.brand).alias("brand"),
                  F.lower(df.title).alias("product"),
                  F.explode("category").alias("category"))
          # Compare with a column expression rather than an interpolated SQL
          # string, so labels containing quotes cannot break the filter.
          .filter(F.col("category") == category)
          .drop("category")
          .filter("brand is not null")
    )

    return products_df


def get_products_by_brand(df):
    """Count the rows of ``df`` per brand, most frequent brand first.

    Returns a Spark DataFrame with columns ``brand`` and ``num_products``.
    NOTE(review): this equals the number of unique SKUs only if ``df`` has
    one row per product -- confirm against the caller.
    """
    per_brand = df.select("brand").groupby("brand").count()
    renamed = per_brand.withColumnRenamed("count", "num_products")
    return renamed.sort(F.col("num_products").desc())

### 1.2 Get the reviews for the desired category and filter to most reviewed brands.

In [None]:
def get_reviews(products_df):
    """Load the raw reviews and attach product information.

    Reviews without text are discarded, ``reviewYear`` is derived from the
    last four characters of ``reviewTime`` (cast to integer), and the result
    is inner-joined with ``products_df`` on ``asin``.
    """
    review_year = F.substring(F.col("reviewTime"), -4, 4).cast(IntegerType())

    raw_df = spark.read.json(RAW_REVIEWS_FILE)
    selected_df = raw_df.select("asin", "overall", "reviewText", "reviewTime", "vote")
    with_text_df = selected_df.filter("reviewText is not null")
    with_year_df = with_text_df.withColumn("reviewYear", review_year)

    return with_year_df.join(products_df, "asin")


def get_pop_brand_reviews(reviews_df, min_reviews=1000):
    """Restrict the reviews to brands with at least ``min_reviews`` reviews.

    Returns a pair ``(brands_df, reviews_df)``: the per-brand review counts
    of the retained brands (sorted by count, descending) and the reviews that
    belong to those brands.
    """
    counts_df = reviews_df.groupby("brand").count()
    ranked_df = counts_df.sort(F.col("count").desc())
    brands_df = ranked_df.where(f"count >= {min_reviews}")

    # Inner join keeps only reviews of the retained brands; the helper
    # "count" column brought in by the join is dropped again.
    popular_df = reviews_df.join(brands_df, "brand").drop("count")

    return brands_df, popular_df
    

### 1.3 Load the dataframes.

In [None]:
# Get brand/product meta information
products_df = get_products(CATEGORY)

# Get reviews for the desired category of products
reviews_df = get_reviews(products_df)
# Keep only brands with at least 400 reviews (overrides the default of 1000)
top_brands_df, reviews_df = get_pop_brand_reviews(reviews_df, 400)
# Mark the filtered reviews for caching; they are reused by the export below
reviews_df.cache()

# NOTE(review): products_df is never cached in the code shown here, so this
# call looks like a no-op -- confirm before relying on it to free memory.
products_df.unpersist()

## 1.4 Save the compressed, processed data set to disk.

In [None]:
# Column order of the exported data set
orderedCols = ["brand", "product", "asin", "reviewYear", "overall", "reviewText"]

# Select the export columns, sort them and tag every row with a unique id.
clean_df = (
    reviews_df.select(orderedCols)
    .coalesce(1)
    .sort("brand", "product", "asin", "reviewYear", "overall")
    .withColumn("review_id", F.monotonically_increasing_id())
)

# Write gzip-compressed JSON.
# NOTE(review): the output goes to "prepared_reviews", while the later
# sections read REVIEWS_FILE -- presumably the file is moved/decompressed
# manually in between; confirm.
(
    clean_df.write.format("json")
    .option("compression", "org.apache.hadoop.io.compress.GzipCodec")
    .save("prepared_reviews")
)

# Clean up Spark
spark.stop()

## 2. Import & Prepare Topical Phrases

Load the topics from the ToPMine outputs (containing phrases and frequencies).

In [None]:
def load_topics(topic_dir, ext="txt", min_freq=None):
    """Load topic phrase files into a single dataframe.

    Every ``*.<ext>`` file in ``topic_dir`` is read as a headerless,
    tab-separated table of ``(phrase, freq)`` rows and treated as one topic.
    Files are processed in sorted name order so that topic ids are
    reproducible across runs and machines.

    Args:
        topic_dir: Directory containing one file per topic.
        ext: File extension (without the dot) of the topic files.
        min_freq: If truthy, keep only phrases whose frequency is strictly
            greater than this value.

    Returns:
        pandas.DataFrame with columns ``phrase``, ``freq`` (int) and
        ``topic`` (int, the position of the source file in sorted order).
        The frame is empty when no file matches.
    """
    col_names = ["phrase", "freq", "topic"]

    # glob returns files in arbitrary order; sort for stable topic ids
    file_pattern = os.path.join(topic_dir, f"*.{ext}")
    topic_files = sorted(glob.glob(file_pattern))

    frames = []
    for topic_id, path in enumerate(topic_files):
        df = pd.read_csv(path, sep="\t", header=None, names=col_names[:2])
        df['topic'] = topic_id

        if min_freq:
            df = df[df['freq'] > min_freq]
        frames.append(df)

    if not frames:
        # pd.concat rejects an empty list; return an empty, typed frame
        return pd.DataFrame({
            "phrase": pd.Series(dtype=object),
            "freq": pd.Series(dtype=int),
            "topic": pd.Series(dtype=int),
        })

    # DataFrame.append was removed in pandas 2.0; concatenate once instead
    phrases_df = pd.concat(frames, ignore_index=True)
    phrases_df['freq'] = phrases_df['freq'].astype(int)
    phrases_df['topic'] = phrases_df['topic'].astype(int)

    return phrases_df

Because a phrase may appear in multiple topics, assign each phrase to its dominant topic, i.e. the topic in which it is most frequent, so that every phrase maps to exactly one topic.

In [None]:
def _english_stopwords():
    """Return the NLTK English stop words as a frozenset, loaded only once."""
    try:
        return _english_stopwords.cache
    except AttributeError:
        # Cached on the function object so the corpus is read once per
        # process instead of once per token.
        _english_stopwords.cache = frozenset(stopwords.words('english'))
        return _english_stopwords.cache


def preprocess_phrase(phrase):
    """Normalise a topical phrase.

    Lower-cases the phrase, removes tokens shorter than two characters,
    strips digits and drops English stop words.

    Args:
        phrase: Raw phrase string.

    Returns:
        The cleaned tokens joined by single spaces, or ``None`` when no
        token survives.
    """
    MIN_TOKEN_LEN = 2

    phrase = phrase.lower()
    phrase = strip_short(phrase, MIN_TOKEN_LEN)
    tokens = preprocess_string(phrase, filters=[strip_numeric])

    stop_words = _english_stopwords()
    tokens = [w for w in tokens if w not in stop_words]

    return ' '.join(tokens) if tokens else None


def unique_phrases(df, f=preprocess_phrase):
    """Assign every phrase to exactly one topic.

    Phrases are normalised with ``f``, frequencies of identical
    (phrase, topic) pairs are summed, and each phrase is kept only in the
    topic where its summed frequency is highest. When several topics tie,
    the lowest topic id wins, so a tied phrase is never lost.

    Args:
        df: DataFrame with columns ``phrase``, ``topic`` and ``freq``.
            It is not modified.
        f: Function mapping a raw phrase to its normalised form. Phrases
            mapped to ``None`` are dropped by the grouping step.

    Returns:
        New DataFrame with columns ``phrase``, ``topic`` and ``freq``, sorted
        by topic and frequency (both descending), with a fresh 0..n-1 index.
    """
    # preprocess phrases on a copy so the caller's frame is left untouched
    df = df.assign(phrase=df['phrase'].map(f))

    # combine phrases by topic
    df = df.groupby(['phrase', 'topic'], as_index=False)['freq'].sum()

    # assign to the topic where the phrase is most frequently seen;
    # method='first' breaks ties by row order (ascending topic id after the
    # groupby) instead of giving every tied row an average rank above 1
    df['rank'] = df.groupby(['phrase'])['freq'].rank(method='first', ascending=False)
    df = df[df['rank'] == 1]

    # clean up dataframe
    df = df.drop(columns=['rank'])
    df = df.sort_values(['topic', 'freq'], ascending=False)
    df = df.reset_index(drop=True)

    return df

def top_phrases(df, max_topic_size, topic_col='topic', phrase_col='phrase', freq_col='freq'):
    """Keep at most ``max_topic_size`` of the most frequent phrases per topic.

    Args:
        df: DataFrame with at least the topic and frequency columns.
            It is not modified.
        max_topic_size: Maximum number of rows to keep for each topic.
        topic_col: Name of the topic column.
        phrase_col: Name of the phrase column (unused; kept for interface
            compatibility).
        freq_col: Name of the frequency column.

    Returns:
        New DataFrame containing the retained rows in their original order,
        with a fresh 0..n-1 index.
    """
    # method='first' gives distinct ranks to tied frequencies, so ties at the
    # cut-off keep exactly max_topic_size rows instead of dropping every
    # tied row.
    rank = df.groupby([topic_col])[freq_col].rank(method='first', ascending=False)

    # Filter with a standalone Series so no helper column is written to the
    # caller's frame.
    return df[rank <= max_topic_size].reset_index(drop=True)

Transform topics into sets of phrases.

In [None]:
def topics_to_sets(df, topic_col='topic', phrase_col='phrase'):
    """Return one set of phrases per topic id.

    The result has ``max(topic) + 1`` entries; entry ``i`` holds the phrases
    of topic ``i`` and is empty when no row carries that id.
    """
    topic_count = df[topic_col].max() + 1
    return [
        set(df.loc[df[topic_col] == topic_id, phrase_col])
        for topic_id in range(topic_count)
    ]

Load all topic phrases into sets.

In [None]:
# Phrases must occur strictly more often than this to be loaded
MIN_PHRASE_FREQ = 50
# Upper bound on the number of phrases kept per topic
MAX_TOPIC_SIZE = 500

# Load -> de-duplicate across topics -> cap topic size -> one set per topic
topics_df = load_topics(TOPICS_DIR, min_freq=MIN_PHRASE_FREQ)
phrases_df = unique_phrases(topics_df)
phrases_df = top_phrases(phrases_df, MAX_TOPIC_SIZE)
phrase_sets = topics_to_sets(phrases_df)

## 2. Build Vocabulary

The vocab data structure aids both compression and efficient lookup of topical phrases in later computations.

In [None]:
class TopicVocab:
    """Bidirectional lookup between phrases, phrase ids and topics.

    A phrase id is the position of the phrase's row in the dataframe passed
    to the constructor (0 for the first row), independent of the frame's
    index labels.
    """

    def __init__(self, df, topic_col='topic', phrase_col='phrase'):
        """Build the vocabulary from a phrase/topic dataframe.

        Args:
            df: DataFrame with one row per phrase.
            topic_col: Name of the column holding each phrase's topic.
            phrase_col: Name of the column holding the phrase text.
        """
        self.__vocab, self.__inverse_vocab = self.__create_vocab(df, phrase_col)
        self.__topic_vocab = self.__create_topic_vocab(df, topic_col, phrase_col)
        self.__topics = list(df[topic_col])
        self.num_topics = len(set(self.__topics))

    def __create_vocab(self, df, phrase_col):
        """Return ``(id -> phrase list, phrase -> id dict)``."""
        vocab = list(df[phrase_col])
        inverse_vocab = {phrase: idx for idx, phrase in enumerate(vocab)}
        return vocab, inverse_vocab

    def __create_topic_vocab(self, df, topic_col, phrase_col):
        """Return a dict mapping each topic to the list of its phrase ids."""
        topic_vocab = {}

        # Use the df argument (not a module-level frame) and positional ids,
        # so the mapping agrees with the vocab list whatever the frame's
        # index labels are.
        for phrase_id, topic in enumerate(df[topic_col]):
            topic_vocab.setdefault(topic, []).append(phrase_id)

        return topic_vocab

    def get_phrase_id(self, phrase):
        """Return the id of ``phrase``; raises ``KeyError`` if unknown."""
        return self.__inverse_vocab[phrase]

    def get_topic_id(self, phrase):
        """Return the topic of ``phrase``; raises ``KeyError`` if unknown."""
        phrase_id = self.__inverse_vocab[phrase]
        return self.__topics[phrase_id]

    def phrase_ids(self, topic):
        """Return the set of phrase ids assigned to ``topic``."""
        return set(self.__topic_vocab[topic])

    def phrases(self, topic):
        """Return the set of phrases assigned to ``topic``."""
        topic_vocab = self.__topic_vocab[topic]
        phrases = [self[idx] for idx in topic_vocab]
        return set(phrases)

    def size(self):
        """Return the number of phrases in the vocabulary."""
        return len(self.__vocab)

    def intersection(self, other):
        """Return the members of the set ``other`` that are vocabulary phrases."""
        return other.intersection(set(self.__vocab))

    def __and__(self, other):
        """``vocab & other`` is shorthand for ``vocab.intersection(other)``."""
        return self.intersection(other)

    def __getitem__(self, idx):
        """Return the phrase with id ``idx``, or ``None`` when ``idx`` is too large."""
        phrase = None
        if idx < len(self.__vocab):
            phrase = self.__vocab[idx]
        return phrase
        

Build the vocab.

In [None]:
# Vocabulary over the de-duplicated, size-capped topic phrases
vocab = TopicVocab(phrases_df)

print(f"The vocab contains {vocab.size()} interesting product attribute phrases")

## 3. Process the Reviews into Sentences & Phrases

The cleaned and prepared review data is imported and then processed into sentences and phrases.

Read in the reviews.

In [None]:
# `json` here is pyarrow.json (see imports), not the standard-library module
reviews = json.read_json(REVIEWS_FILE)
reviews_df = reviews.to_pandas()
# Release the intermediate object returned by read_json; only the pandas copy is used below
del reviews

Split each review into sentences.

In [None]:
# One entry per review: the list of sentences found by NLTK's sentence tokenizer
review_text = list(reviews_df['reviewText'])
review_sentences = run_parallel(nltk.sent_tokenize, review_text)

Create candidate phrases from the review sentences.

In [None]:
def preprocess_sentence(sentence):
    """Turn a sentence into its set of candidate phrases.

    The sentence is lower-cased, short tokens (fewer than two characters),
    stop words, punctuation and digits are removed, and every n-gram of up
    to three remaining tokens is returned as a space-joined string.
    """
    min_token_len, max_ngram, sep = 2, 3, ' '
    filters = [remove_stopwords, strip_punctuation, strip_numeric]

    cleaned = strip_short(sentence.lower(), min_token_len)
    tokens = preprocess_string(cleaned, filters=filters)

    return {sep.join(gram) for gram in nltk.everygrams(tokens, max_len=max_ngram)}

def preprocess_review(sentences):
    """Return the candidate-phrase set of every sentence of one review."""
    return list(map(preprocess_sentence, sentences))

# Per review: a list with one set of candidate n-grams per sentence
review_ngrams = run_parallel(preprocess_review, review_sentences)

Find all of the candidate phrases that are discovered product attributes.

In [None]:
def encode_phrases(ngram_sets, vocab, encoding_type='phrase'):
    """Encode the vocabulary phrases found in each n-gram set.

    Args:
        ngram_sets: Iterable of sets of candidate n-gram strings.
        vocab: Vocabulary supporting ``vocab & set`` and providing
            ``get_phrase_id`` / ``get_topic_id``.
        encoding_type: ``'phrase'`` to emit phrase ids, ``'topic'`` to emit
            the topic of each matched phrase.

    Returns:
        list[list]: For every n-gram set, the codes of the n-grams present in
        the vocabulary, ordered by the phrase text.

    Raises:
        KeyError: If ``encoding_type`` is not ``'phrase'`` or ``'topic'``.
    """
    encoders = {'phrase': vocab.get_phrase_id,
                'topic': vocab.get_topic_id}
    encode = encoders[encoding_type]

    # Sort the matches: set iteration order is not stable across processes,
    # and the phrase and topic encodings of the same sentence are computed
    # in separate calls but later paired up position by position.
    return [[encode(p) for p in sorted(vocab & ngrams)] for ngrams in ngram_sets]

def phrase_encoder(ngrams):
    """Encode one review's n-gram sets as phrase ids using the global vocab."""
    return encode_phrases(ngrams, vocab, encoding_type='phrase')

def topic_encoder(ngrams):
    """Encode one review's n-gram sets as topic ids using the global vocab."""
    return encode_phrases(ngram_sets=ngrams, vocab=vocab, encoding_type='topic')


Extract interesting phrases from the ngrams and encode using the dictionary.

In [None]:
# Per review: one list of matched phrase ids per sentence
review_phrases = run_parallel(phrase_encoder, review_ngrams)

Encode the topic ids for the extracted phrases.

In [None]:
# Per review: one list of topic ids (of the matched phrases) per sentence
review_topics = run_parallel(topic_encoder, review_ngrams)

Free up memory by releasing the candidate n-grams in favor of the compressed, encoded phrases/topics.

In [None]:
# The raw n-gram sets are no longer needed once phrases and topics are encoded
del review_ngrams

## 4. Sentiment Scoring of Reviews & Topics

The sentiment scoring component computes sentence-level sentiment scores for every review.

In [None]:
# Shared VADER analyser used by score_sentiment
analyser = SentimentIntensityAnalyzer()


def score_sentiment(sentences):
    """Return the VADER ``compound`` score of each sentence, in order."""
    return [analyser.polarity_scores(text)['compound'] for text in sentences]


# Per review: one compound score per sentence
review_scores = run_parallel(score_sentiment, review_sentences)

Compute the topic-specific sentiment for each review.

In [None]:
def topic_sentiment_scorer(topic, scores, topics):
    """Average the scores of the sentences that mention ``topic``.

    ``scores`` and ``topics`` are parallel per-sentence sequences; a sentence
    counts when ``topic`` is contained in its topic collection. Returns
    ``None`` when no sentence mentions the topic.
    """
    total = 0
    hits = 0
    for sentence_topics, score in zip(topics, scores):
        if topic in sentence_topics:
            total += score
            hits += 1

    if hits == 0:
        return None
    return total / hits

def parallel_topic_scorer(args):
    """Single-argument adapter: unpack ``(topic, scores, topics)`` and score it."""
    return topic_sentiment_scorer(args[0], args[1], args[2])
    

In [None]:
NUM_TOPICS = vocab.num_topics


def append_topic_sentiment(df, num_topics, review_scores, review_topics):
    """Add one ``topic_<i>_score`` column per topic to ``df``.

    Each column holds, per review, the average sentiment of the sentences
    mentioning that topic (``None`` when the topic is not mentioned).
    ``df`` is modified in place and also returned.
    """
    for topic_id in range(num_topics):
        jobs = [(topic_id, scores, topics)
                for scores, topics in zip(review_scores, review_topics)]
        df[f"topic_{topic_id}_score"] = run_parallel(parallel_topic_scorer, jobs)

    return df


Add columns with the sentiment scores for each topic.

In [None]:
# Adds the topic_<i>_score columns to reviews_df (modified in place and reassigned)
reviews_df = append_topic_sentiment(reviews_df, NUM_TOPICS,
                                    review_scores, review_topics)

## 5. Final Data Preparation

Flatten the separate 2D data structures holding the extracted phrases and topics. Also aggregate the lists of phrases by distinct topic.

In [None]:
def flatten_topic_phrases(review):
    """Group a review's phrase ids by topic.

    Args:
        review: Tuple ``(review_id, topics, phrases)`` where ``topics`` and
            ``phrases`` are parallel per-sentence lists; ``topics[i][j]`` is
            the topic of ``phrases[i][j]``.

    Returns:
        list[tuple]: One ``(review_id, topic, phrase_ids)`` tuple per
        distinct topic, in order of first appearance, where ``phrase_ids``
        is the sorted list of distinct phrases seen for that topic. Empty
        when the review has no matches.
    """
    review_id, topics, phrases = review
    merged = {}

    for sentence_topics, sentence_phrases in zip(topics, phrases):
        for topic, phrase in zip(sentence_topics, sentence_phrases):
            merged.setdefault(topic, set()).add(phrase)

    # sorted() gives a reproducible order instead of set iteration order
    return [(review_id, topic, sorted(ids)) for topic, ids in merged.items()]

def filter_empty(args):
    """Return True when ``args`` contains at least one element."""
    return len(args) != 0

# Merge review id, topics, phrases into tuples
review_ids = list(reviews_df['review_id'])
# NOTE: zip() is a one-shot iterator; it is consumed by the call below
combined = zip(review_ids, review_topics, review_phrases)
# Per review: a list of (review_id, topic, phrase ids) tuples, possibly empty
flattened_topics = run_parallel(flatten_topic_phrases, combined)

# Only keep review mappings where a topic/phrase was extracted
flattened_topics = filter_parallel(filter_empty, flattened_topics)

Clean up the intermediate columns to free up memory.

In [None]:
# Both structures are now represented by flattened_topics
del review_topics
del review_phrases

Create a dataframe that splits the (review_id, topic, phrases) tuples into columns. Each review is expanded so that a row describes a review_id, topic combination.

In [None]:
def create_review_phrase_df(mappings):
    """Transform per-review lists of (review_id, topic, phrase list) tuples
    into a data frame.

    Args:
        mappings: Iterable with one entry per review; each entry is a list of
            ``(review_id, topic, phrases)`` tuples. Empty entries contribute
            no rows.

    Returns:
        pandas.DataFrame with columns ``review_id``, ``topic`` and
        ``phrases`` and one row per review/topic pair, indexed 0..n-1.
    """
    # Flatten directly; this avoids a round trip through an object column
    # and explode(), which would turn an empty entry into a NaN row.
    rows = [entry for review in mappings for entry in review]

    return pd.DataFrame(rows, columns=['review_id', 'topic', 'phrases'])

# One row per (review_id, topic) pair with the list of matched phrase ids
review_phrase_df = create_review_phrase_df(flattened_topics)   

## 5. Summary of Smartphone Review Data.

**Total Reviews by Year**

In [None]:
# Number of reviews (non-null review ids) per review year
by_year = (
    reviews_df.groupby('reviewYear')['review_id']
    .count()
    .to_frame()
)

total_reviews = by_year['review_id'].sum()
by_year.columns = ['review_count']

print(by_year)
print("===============================")
print(f"Total Reviews: {total_reviews}")

**Total Reviews by Year and Brand -- Top 10 Brands**

In [None]:
# Brand x year table of review counts; missing combinations become 0
reviews_by_brand = (
    reviews_df.groupby(['brand', 'reviewYear'])['review_id']
    .count()
    .reset_index()
    .pivot(index='brand', columns='reviewYear', values='review_id')
    .fillna(0)
)

# Row totals across all years
total_by_brand = reviews_by_brand.sum(axis=1)
reviews_by_brand['total'] = total_by_brand

# Ten brands with the most reviews overall
top_10 = reviews_by_brand.sort_values('total', ascending=False).head(10)

top_10

**Average Rating by Brand**

In [None]:
# Brand x year table of mean star ratings
ratings_by_brand = reviews_df.groupby(['brand','reviewYear'])['overall'].mean() \
                     .reset_index() \
                     .pivot(index='brand', columns='reviewYear', 
                            values='overall') 

# Mean star rating per brand over all years
overall_avg_by_brand = reviews_df.groupby(['brand'])['overall'].mean() \
                         .reset_index() \
                         .set_index('brand')

ratings_by_brand['overall'] = overall_avg_by_brand['overall']
# sort_values returns a new frame; keep it so the table is actually sorted
ratings_by_brand = ratings_by_brand.sort_values('overall', ascending=False)
# Show only the ten most reviewed brands (top_10 comes from the previous cell)
ratings_by_brand[ratings_by_brand.index.isin(top_10.index)]


**Distribution of ratings over time**

In [None]:
# Star rating x year table of review counts. A rating that never occurs in a
# year is a missing cell after the pivot; treat it as 0 reviews so it does
# not turn that year's column total (and every share in it) into NaN.
stars_by_year = reviews_df.groupby(['overall','reviewYear'])['review_id'].count().reset_index() \
    .pivot(index='overall', columns='reviewYear', values='review_id') \
    .fillna(0)

sby_np = stars_by_year.to_numpy()
# Total number of reviews per year (column sums)
sum_np = sby_np.sum(axis=0)

# Share of each star rating within every year
pd.DataFrame(sby_np/sum_np, index=stars_by_year.index, columns=stars_by_year.columns)

## 6. Analysis

### 6.1. Product Dimension Perception Over Time.

In [None]:
def avg_score_by_topic_year(df):
    """Return the per-year mean of every numeric column of ``df``.

    Args:
        df: DataFrame with ``review_id`` and ``reviewYear`` columns.

    Returns:
        DataFrame indexed by ``reviewYear`` with one column per numeric
        column of ``df`` other than ``review_id``.
    """
    # numeric_only=True skips text columns explicitly; pandas 2.x raises a
    # TypeError when asked to average strings.
    result_df = (
        df.drop('review_id', axis=1)
          .groupby('reviewYear')
          .mean(numeric_only=True)
    )
    return result_df

# Displayed as the cell output: yearly means of the rating and topic scores
avg_score_by_topic_year(reviews_df)

As an interesting aside, overall star ratings and often sentiment peaked in the 2014/2015 time period. This period also coincided with Amazon's public announcement that it would be cracking down on fake reviews.

## 6.2 Largest Changes in Product Dimension Sentiment

In [None]:
def overall_dimension_score_change(reviews_df, start_year, end_year):
    """Compare the yearly average scores of ``start_year`` and ``end_year``.

    Returns a DataFrame indexed by score column (the ``overall`` rating is
    excluded) with the start-year score, the end-year score and their
    difference ``chg_score``, sorted by ``chg_score`` in descending order.
    """
    # Yearly averages restricted to the two years of interest
    yearly = avg_score_by_topic_year(reviews_df).drop('overall', axis=1)
    endpoints = yearly.loc[[start_year, end_year]]

    rows = [
        [first, last, last - first]
        for first, last in (tuple(endpoints[col]) for col in endpoints.columns)
    ]

    labels = [f"{start_year}_score", f"{end_year}_score", "chg_score"]
    change_df = pd.DataFrame(rows, index=endpoints.columns, columns=labels)

    return change_df.sort_values("chg_score", ascending=False)
    
# Change in average score per column between 2009 and 2018
df = overall_dimension_score_change(reviews_df, 2009, 2018)

# Displayed as the cell output, smallest change first
df.sort_values('chg_score', ascending=True)

## 6.3 Changes in Product Dimension Mention Freq

In [None]:
# Names of the per-topic sentiment columns added by append_topic_sentiment
topic_cols = [f"topic_{i}_score" for i in range(vocab.num_topics)]
# Keep only reviews with at least one non-null topic score
reviews_w_topics_df = reviews_df[topic_cols].dropna(thresh=1)

In [None]:
def freq_by_topic_year(df):
    """Count, per year, the reviews that have a score for each topic.

    Returns a DataFrame indexed by ``reviewYear`` with one
    ``topic_<i>_score`` column per topic holding the number of non-null
    scores.
    """
    score_columns = [f"topic_{i}_score" for i in range(vocab.num_topics)]

    without_id = df.drop('review_id', axis=1)
    # Drops only rows in which every remaining column is null
    non_empty = without_id.dropna(thresh=1)
    yearly_counts = non_empty.groupby('reviewYear').count()

    return yearly_counts[score_columns]

# Topic mention counts for the first and last year of the comparison
tmp_df = freq_by_topic_year(reviews_df)
tmp_df = tmp_df.loc[[2009,2018]]
tmp_np = tmp_df.to_numpy()

In [None]:
def relative_topic_freq(df, year):
    """Return, per topic, the percentage of a year's reviews mentioning it.

    Args:
        df: DataFrame with a ``reviewYear`` column and one
            ``topic_<i>_score`` column per topic of the global ``vocab``.
        year: Review year to analyse.

    Returns:
        list: One percentage per topic, in topic order, computed over all
        reviews of ``year`` (whether or not they mention any topic).
        All values are NaN when the year has no reviews.
    """
    topic_cols = [f"topic_{i}_score" for i in range(vocab.num_topics)]

    # Select rows and columns in one step so both come from the same index;
    # re-indexing only one side of a join misaligns rows unless the frame
    # has a default 0..n-1 index.
    year_rows = df.loc[df['reviewYear'] == year, topic_cols]

    n = len(year_rows)
    if n == 0:
        return [float('nan')] * len(topic_cols)

    # count() ignores nulls, i.e. it counts the reviews that have a score
    return list(year_rows.count().to_numpy() / n * 100)

# Percentage of reviews mentioning each topic in 2009 and in 2018
data_2009 = relative_topic_freq(reviews_df, 2009)
data_2018 = relative_topic_freq(reviews_df, 2018)

In [None]:
# Human-readable labels for the discovered topics.
# NOTE(review): presumably entry i names topic i -- confirm against the
# order in which the topic files are loaded.
prod_attributes = [
    'camera',
    'customer service',
    'calls & text',
    'web/email/contacts',
    'carrier',
    'apps',
    'buttons',
    'call quality',
    'chargers',
    'form factor',
    'screen',
    'user friendly',
    'processor speed',
    'entertainment',
    'battery',
    'sim/sd card',
]


In [None]:
# Names of the per-topic sentiment columns
topic_cols = [f"topic_{i}_score" for i in range(vocab.num_topics)]
top_brands = ['samsung']
product_name = 'galaxy s5'

# Reviews whose product title contains the product name. na=False treats
# missing titles as non-matching (a NaN mask cannot be used for boolean
# indexing); regex=False matches the name literally.
product = reviews_df[reviews_df['product'].str.contains(product_name, regex=False, na=False)]
# NOTE(review): the variables are named *_2018 but the filter selects 2017 --
# confirm which year is intended.
reviews_2018 = product[product['reviewYear']==2017].set_index('brand')
# Average topic scores per brand, then across the selected brands
brands_2018 = reviews_2018[topic_cols].groupby('brand').mean()
brands_2018.loc[top_brands].mean()