# Table of Contents
 <p><div class="lev1 toc-item"><a href="#Assumes-Spark-2.4" data-toc-modified-id="Assumes-Spark-2.4-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Assumes Spark 2.4</a></div><div class="lev1 toc-item"><a href="#Imports" data-toc-modified-id="Imports-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Imports</a></div><div class="lev1 toc-item"><a href="#Functions" data-toc-modified-id="Functions-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Functions</a></div><div class="lev1 toc-item"><a href="#Settings" data-toc-modified-id="Settings-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Settings</a></div><div class="lev1 toc-item"><a href="#Set-Normalized-URL" data-toc-modified-id="Set-Normalized-URL-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Set Normalized URL</a></div><div class="lev1 toc-item"><a href="#Load-Data" data-toc-modified-id="Load-Data-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Load Data</a></div><div class="lev1 toc-item"><a href="#Manual-Tweaking-for-Potential-ER-Issues" data-toc-modified-id="Manual-Tweaking-for-Potential-ER-Issues-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Manual Tweaking for Potential ER Issues</a></div><div class="lev1 toc-item"><a href="#Enter-Constants" data-toc-modified-id="Enter-Constants-8"><span class="toc-item-num">8&nbsp;&nbsp;</span>Enter Constants</a></div><div class="lev1 toc-item"><a href="#Filter-for-Minimal-Rev-Count" data-toc-modified-id="Filter-for-Minimal-Rev-Count-9"><span class="toc-item-num">9&nbsp;&nbsp;</span>Filter for Minimal Rev Count</a></div><div class="lev2 toc-item"><a href="#Check-in-Reviews-with-Enough-Reviews-of-Reviews-at-All" data-toc-modified-id="Check-in-Reviews-with-Enough-Reviews-of-Reviews-at-All-91"><span class="toc-item-num">9.1&nbsp;&nbsp;</span>Check in Reviews with Enough Reviews of Reviews at All</a></div><div class="lev2 toc-item"><a href="#Optional-Filter-for-Product" data-toc-modified-id="Optional-Filter-for-Product-92"><span class="toc-item-num">9.2&nbsp;&nbsp;</span>Optional Filter for Product</a></div><div class="lev1 toc-item"><a href="#Review-Rating" data-toc-modified-id="Review-Rating-10"><span class="toc-item-num">10&nbsp;&nbsp;</span>Review Rating</a></div><div class="lev1 toc-item"><a href="#Tokenize-to-Words" data-toc-modified-id="Tokenize-to-Words-11"><span class="toc-item-num">11&nbsp;&nbsp;</span>Tokenize to Words</a></div><div class="lev1 toc-item"><a href="#TFIDF" data-toc-modified-id="TFIDF-12"><span class="toc-item-num">12&nbsp;&nbsp;</span>TFIDF</a></div><div class="lev1 toc-item"><a href="#TFIDF-sklearn-implementation" data-toc-modified-id="TFIDF-sklearn-implementation-13"><span class="toc-item-num">13&nbsp;&nbsp;</span>TFIDF sklearn implementation</a></div><div class="lev1 toc-item"><a href="#TFIDF-by-Sentiment" data-toc-modified-id="TFIDF-by-Sentiment-14"><span class="toc-item-num">14&nbsp;&nbsp;</span>TFIDF by Sentiment</a></div><div class="lev1 toc-item"><a href="#TFIDF-by-Sentiment-sklearn-implementation" data-toc-modified-id="TFIDF-by-Sentiment-sklearn-implementation-15"><span class="toc-item-num">15&nbsp;&nbsp;</span>TFIDF by Sentiment sklearn implementation</a></div>

# Assumes Spark 2.4

Some array function only in spark >2.3. I use docker image with statsmodels and plotly and vaderSentiment based on spark 2.4.5

**must include "--docker-tag cici-nlp-sentiment" in cluster deploy command input**

In [1]:
sc

# Imports

In [2]:
%%capture
!pip install watermark
!pip3 install --upgrade --no-cache-dir --extra-index-url http://pypi.cu/root/circleup/+simple/ --trusted-host pypi.cu cu-helio-insights==0.0.12

In [3]:
import numpy as np
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import CountVectorizer, IDF, HashingTF
from pyspark.ml.pipeline import Pipeline
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.util import bigrams
from nltk.stem import WordNetLemmatizer
from nltk import sent_tokenize
import string
import plotly
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.express as px
from plotly.subplots import make_subplots
from scipy import stats
from sklearn import linear_model

In [4]:
import insights
from insights.investor_tools.widgets.style import (
    Font,
    CU_PLOTLY_COLOR_SEQUENCE,
    CU_PLOTLY_COLORSCALE,
    Color
)
from spark_tools import T, F, c, read_google_sheet
import apollo
from apollo import OverrideConfiguration
from apollo import dataset
import apollo_artifacts
from apollo_artifacts import datasets
from transform.attributes.utils import tf_idf_attributes

In [5]:
%load_ext watermark
%watermark -v -m --iversions -g

numpy            1.15.1
platform         1.0.8
pandas           0.20.2
apollo_artifacts 1.1.26
py4j             0.10.7
apollo           1.11.7
plotly           4.5.0
insights         0.0.12
CPython 3.6.8
IPython 6.1.0

compiler   : GCC 6.3.0 20170516
system     : Linux
release    : 4.9.43-17.39.amzn1.x86_64
machine    : x86_64
processor  : 
CPU cores  : 4
interpreter: 64bit
Git hash   :


# Functions

In [6]:
@F.udf(T.ArrayType(T.StringType()))
def sent_tokenize_udf(s):
    return sent_tokenize(s)

@F.udf(T.ArrayType(T.StringType()))
def tokenize(s, custom_words_str='', strip_str="=-_/\+.:,'* 1234567890—"):
    custom_words_li = custom_words_str.split()
    filter_words = added_stopwords_li + stopwords_li + punkts_li + custom_words_li
    return [w.lower().strip(strip_str) for w in word_tokenize(str(s)) if w.lower() not in filter_words]

@F.udf(T.ArrayType(T.StringType()))
def lemmatize(tokens):
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(w) for w in tokens]

@F.udf(T.ArrayType(T.StringType()))
def bigram(tokens):
    bi_tup = list(bigrams(tokens))
    bi_li = ['_'.join(tup) for tup in bi_tup]
    return bi_li

@F.udf(T.ArrayType(T.StringType()))
def unique_list(li):
    deduped_li = list(set(li))
    return deduped_li

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return F.udf(to_array_, T.ArrayType(T.DoubleType()))(col)

# Settings

In [9]:
np.random.seed(42) # set seed for models for reproducibility

stopwords_li = stopwords.words('english')

punkts_li = list(string.punctuation)

added_stopwords_li = [
    "it’s",
    "'d",
    "'s",
    "n't",
    "'m",
    "i've",
    "it's",
    "'ve",
    "'re",
    "'ll",
    "``",
    "''",
    "...",
    "--",
    "https",
    "voxbox",
    "influenster",
    "cracker",
    "or",
    'it‚äôs',
    'don‚äôt',
    'i‚äôm',
    'doesn‚äôt'
]

pd.options.display.max_rows = 300
OverrideConfiguration(default_to_production=True).apply()

spark.conf.set('spark.sql.execution.arrow.enabled', 'false')

# Set Normalized URL

# Load Data

In [10]:
REV_COLS = [
    'source_name',
    'normalized_url',
    'review_source_id',
    'reviewer_source_id',
    'brand_name',
    'product_name',
    'product_source_id',
    'review_date',
    'review_rating',
    'review_content'
]

reviews = (
    apollo
    .dataset('online_reviews__deduped_review__1_0')
    .latest_segment_df()
    .select(REV_COLS)
    .persist()
)

# removing hyperlinks from reviews
review = reviews.withColumn(
    'review_content', F.regexp_replace(
        F.regexp_replace("review_content", r"(?i)<a\s*[^>]*>", ""), r"(?i)<\s*/\s*a\s*>", ""
    ).alias('review_content'))

# Manual Tweaking for Potential ER Issues

In [11]:
ER_BRAND_URL_MAP = {}
# ER_BRAND_URL_MAP = {'dae':'daehair.com'} # manually add {brand_id : normalized_url}

URL_BLACKLIST = []
# Some urls sneak into the list somehow. Add them here to remove them from the charts

In [12]:
review.show()

+-----------+--------------------+--------------------+--------------------+------------------+--------------------+-----------------+--------------------+-------------+--------------------+
|source_name|      normalized_url|    review_source_id|  reviewer_source_id|        brand_name|        product_name|product_source_id|         review_date|review_rating|      review_content|
+-----------+--------------------+--------------------+--------------------+------------------+--------------------+-----------------+--------------------+-------------+--------------------+
|     amazon|   hayabusafight.com|      R1HEF263H49T6Q|amzn1.account.AHW...|          Hayabusa|Hayabusa Marvel H...|       B08466SJGQ| 2020-08-24 00:00:00|          3.0| These wraps are ...|
|     amazon|       goodsense.com|       RXAHDISSUI2KC|amzn1.account.AG4...|        Good Sense|Good Sense Sunflo...|       B004GU3Z5M| 2021-04-17 00:00:00|          5.0|   Man I love these |
|     amazon|       budweiser.com|      R2TTX

In [8]:
for brand_name, url in ER_BRAND_URL_MAP.items():
    reviews = reviews.withColumn(
        "normalized_url",
        F.when(
            F.col("brand_name") == brand_name,
            url
        ).otherwise(F.col("normalized_url"))
    )

# Enter Constants

In [36]:
# Unique reviewers required for analysis (closedly related to but not quite reviews)
# usually 200, could be 100. larger is better (less noisy)
MIN_REV_COUNT = 100

In [13]:
NORMALIZED_URL_LI = [
    'hellobubble.com',
    'neutrogena.com',
    'cerave.com',
    'cetaphil.com',
    'cleanandclear.com',
]




BRAND_NAME_LI = [
    'Bubble',
    'Neutrogena',
    'Cerave',
    'Cetaphil',
    'Clean & Clear'
]

In [14]:
brand_map = dict(zip(BRAND_NAME_LI, NORMALIZED_URL_LI))
url_map = dict(zip(NORMALIZED_URL_LI, BRAND_NAME_LI))

In [20]:
GROUP_NAME = "Skincare" # just a title for use in charts

In [21]:
# if a brand sells more than just one type of product, should focus on the main product
# format: dict of string (comma separated lower case words) in order matching that in NORMALIZED_URL_LI
# if there is a string for one brand, must enter empty strings for all others to keep matching order.
# in the future this can be replaced potentially by product ER (category or attributes)

PRODUCT_NAME_CONTAINS_LI = [
    '',
    #'',
    #'',
    #'',
    #'',
    #''

] # the csv list is an OR operation, it includes product with ANY of the listed words

PRODUCT_NAME_NOT_CONTAINS_LI = [
    #'',
    #'',
    #'',
    #'',
    #'',
    ''
] # the csv list is an AND operation, it excludes product with ANY of the listed words

In [22]:
brand_name_di = dict(zip(NORMALIZED_URL_LI, BRAND_NAME_LI))

In [23]:
product_name_include_di = dict(zip(NORMALIZED_URL_LI, [w.split(", ") if w else [] for w in PRODUCT_NAME_CONTAINS_LI ]))
product_name_exclude_di = dict(zip(NORMALIZED_URL_LI, [w.split(", ") if w else [] for w in PRODUCT_NAME_NOT_CONTAINS_LI]))

In [24]:
product_name_include_di

{'hellobubble.com': []}

# Filter for Minimal Rev Count

In [25]:
reviews = reviews.withColumn('brand_name', regexp_replace('brand_name','Bug Soother', 'Simply Soothing'))

In [26]:
#Map missing/incorrect brand names or URLs their respective values  
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from itertools import chain
from pyspark.sql.functions import create_map, lit

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())
mapping_expr = create_map([lit(x) for x in chain(*url_map.items())])
#reviews = reviews.withColumn('normalized_url', translate(brand_map)('brand_name'))
reviews = reviews.withColumn('brand_name', translate(url_map)('normalized_url'))
#reviews = reviews.withColumn('brand_name', mapping_expr[reviews['normalized_url']])

In [27]:
reviews.filter(F.col('normalized_url').isin(NORMALIZED_URL_LI)).groupby('normalized_url').count().show()

+-----------------+-------+
|   normalized_url|  count|
+-----------------+-------+
|       cerave.com| 339653|
|cleanandclear.com| 287780|
|     cetaphil.com| 295062|
|  hellobubble.com|   1475|
|   neutrogena.com|1309803|
+-----------------+-------+



In [28]:
reviews.filter(F.col('brand_name').isin(BRAND_NAME_LI)).groupby('brand_name').count().show()

+-------------+-------+
|   brand_name|  count|
+-------------+-------+
|Clean & Clear| 287780|
|   Neutrogena|1309803|
|       Cerave| 339653|
|       Bubble|   1475|
|     Cetaphil| 295062|
+-------------+-------+



In [65]:
reviews.filter(F.col('normalized_url').isin(NORMALIZED_URL_LI)).show(truncate = False)

+-----------+-----------------+------------------------------------+------------------------------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|source_name|normalized_url   |revi

In [66]:
# normalized_url and count of reviews
count_revs = (
    reviews
    #.dropna(subset=['normalized_url'])
    .groupby('normalized_url', 'brand_name')
    .agg(
        #changing reviewer_source_id to review_source_id
        F.count(F.concat(F.col('source_name'), F.col('review_source_id'))).alias('reviewer_count')
    )
)

# just normalized_urls where review count > threshold
count_revs_filtered = (
    count_revs
    .filter(F.col('reviewer_count') >= MIN_REV_COUNT)
    .select('normalized_url', 'brand_name')
    .distinct()
)

#filtering original reviews dataframe by joining against list of normalized urls with review count > threshold
filtered_revs = (
    reviews
    .filter(F.col('normalized_url').isin(count_revs_filtered.normalized_url) | F.col('brand_name').isin(count_revs_filtered.brand_name))
)

## Check in Reviews with Enough Reviews of Reviews at All
can't speak to ER misses here which occurs with some frequency

In [67]:
brand_revs = filtered_revs.filter(F.col('normalized_url').isin(NORMALIZED_URL_LI))
brand_revs = brand_revs.withColumn('review_content', regexp_replace(F.col('review_content'),'‚äô',"'")).alias('review_content')

In [68]:
brand_revs = brand_revs.drop('Unnamed: 6')
brand_revs = brand_revs.filter(F.col('source_name').isin(['walmart']))
brand_revs.show()

+-----------+--------------+----------------+--------------------+----------+--------------------+-----------------+-------------------+-------------+--------------------+
|source_name|normalized_url|review_source_id|  reviewer_source_id|brand_name|        product_name|product_source_id|        review_date|review_rating|      review_content|
+-----------+--------------+----------------+--------------------+----------+--------------------+-----------------+-------------------+-------------+--------------------+
|    walmart|neutrogena.com|        97529484|zd9mz31uh7aae1lul...|Neutrogena|Neutrogena Oil-Fr...|         13274448|2014-08-26 00:00:00|          5.0|This has got to b...|
|    walmart|    cerave.com|       140798333|            11077199|    Cerave|CeraVe Moisturizi...|        142429954|2020-01-14 00:00:00|          5.0|[This review was ...|
|    walmart|neutrogena.com|        15051611|z69h8k87zl1yzbcjy...|Neutrogena|4 Pack - Neutroge...|        895126879|2013-02-09 00:00:00|    

In [69]:
fig = px.bar(
    mentions,
    orientation='h',
    y="terms",
    x="proportion",
    color="proportion",
    title=f"Proportion of Reviews Mentioning Broth or Bone Broth",
    labels={

        
    },
    #text='reviewer_count',
    color_discrete_sequence=px.colors.sequential.Plasma[::2][::-1],
)

fig.update_layout(
    width=1450,
    height=800,
    font=Font.plot_title.value,
    plot_bgcolor="white",
    title={"x": 0.5},
    xaxis_tickformat='.0%',
    yaxis={'categoryorder':'total descending'}
)

fig.show()

KeyError: 'proportion'

## Optional Filter for Product

In [25]:
if PRODUCT_NAME_CONTAINS_LI:
    j = 0
    for url, name_include_li in product_name_include_di.items():
        if name_include_li:
            for i, a in enumerate(name_include_li):
                if i==0:
                    prod_filter = F.lower(c.product_name).contains(a) & c.normalized_url.contains(url)
                else:
                    prod_filter |= F.lower(c.product_name).contains(a) & c.normalized_url.contains(url)
        else:
            prod_filter = c.normalized_url.contains(url)

        if j==0:
            full_prod_filter = prod_filter
        else:
            full_prod_filter |= prod_filter
        j+=1

    brand_revs = (
        brand_revs
        .filter(F.when(full_prod_filter, True).otherwise(False))
    )

if PRODUCT_NAME_NOT_CONTAINS_LI:
    j = 0
    for url, name_exclude_li in product_name_exclude_di.items():
        if name_exclude_li:
            for i, a in enumerate(name_exclude_li):
                if i==0:
                    prod_filter = ~F.lower(c.product_name).contains(a) & c.normalized_url.contains(url)
                else:
                    prod_filter &= ~F.lower(c.product_name).contains(a) & c.normalized_url.contains(url)
        else:
            prod_filter = c.normalized_url.contains(url)

        if j==0:
            full_prod_filter = prod_filter
        else:
            full_prod_filter |= prod_filter
        j+=1

    brand_revs = (
        brand_revs
        .filter(F.when(full_prod_filter, True).otherwise(False))
    )

In [26]:
brand_revs.groupby('normalized_url').count().show()

+------------------+-----+
|    normalized_url|count|
+------------------+-----+
|myserenitykids.com|   18|
+------------------+-----+



In [22]:
drop_prods = brand_revs.where((F.col('brand_name') == 'Celsius') 
                & (F.col('product_name').contains('-'))
                & (F.col('source_name') == 'amazon'))

In [23]:
#Additional Product Filter
#Filter specific products (ex. fishing for Celsius, non drinks for Alani Nu) that make it past product filter 

brand_revs = brand_revs.where((~F.col('review_content').contains('fishing') | ~F.col('review_content').contains('rod')) 
                              & (~(F.col('product_name').contains('BCAA'))))
#brand_revs.where(F.col('review_content').contains('fishing')).show(truncate= False)
brand_revs = brand_revs.join(drop_prods,['source_name',
    'normalized_url',
    'review_source_id',
    'reviewer_source_id',
    'brand_name',
    'product_name',
    'product_source_id'], 'leftanti' )

In [76]:
brand_revs.filter(F.col('normalized_url').isin(NORMALIZED_URL_LI)).show()

+-----------+----------------+--------------------+--------------------+-------------+--------------------+-----------------+--------------------+-------------+--------------------+---------+
|source_name|  normalized_url|    review_source_id|  reviewer_source_id|   brand_name|        product_name|product_source_id|         review_date|review_rating|      review_content|sentiment|
+-----------+----------------+--------------------+--------------------+-------------+--------------------+-----------------+--------------------+-------------+--------------------+---------+
|     target|   starbucks.com|d3d3efbc-bf4e-4ba...|d557fbe7-d3d8-583...|    Starbucks|Starbucks Cold Br...|         78295214| 2020-02-13 13:24:35|          3.0|[This review was ...|     null|
|     amazon|kohanacoffee.com|      R3P04HHDOI6BA9|amzn1.account.AE7...|Kohana Coffee|Kohana Cold Brew ...|       B017VSPMFG| 2018-10-11 00:00:00|          5.0|      Great coffee! | Positive|
|     amazon|    stokbrew.com|      R2I7

In [15]:
brand_revs = (
    brand_revs
    .withColumn(
        'sentiment',
        (
            F.when((F.col('review_rating')==5), F.lit('Positive'))
            .otherwise(F.when(F.col('review_rating')==4, 'Neutral')
                       .otherwise(F.when(F.col('review_rating').isin([1, 2, 3]), 'Negative')
                                  .otherwise(F.lit(None)))) # had to do this as there are some review ratings not on 1-5 scale
        )
    )
)

brand_revs = brand_revs.withColumn(
    'review_content', F.regexp_replace(
        F.regexp_replace("review_text", r"(?i)<a\s*[^>]*>", ""), r"(?i)<\s*/\s*a\s*>", ""
    ).alias('review_content'))

#brand_revs = brand_revs.filter(~brand_revs.normalized_url.isin(URL_BLACKLIST))

In [16]:
brand_revs.show()

+-----------+------------------+----------------+--------------------+-------------+--------------------+-----------------+-----------+-------------+--------------------+---------+--------------------+
|source_name|    normalized_url|review_source_id|  reviewer_source_id|   brand_name|        product_name|product_source_id|review_date|review_rating|         review_text|sentiment|      review_content|
+-----------+------------------+----------------+--------------------+-------------+--------------------+-----------------+-----------+-------------+--------------------+---------+--------------------+
|     amazon|myserenitykids.com|               0|amzn1.account.AHE...|Serenity Kids|Style: Turmeric C...|                0|    1/30/21|            5|  this is a great...| Positive|  this is a great...|
|     amazon|myserenitykids.com|               1|amzn1.account.AE7...|Serenity Kids|Style: Turkey Bol...|                1|    2/15/21|            4|  okay, all moms,...|  Neutral|  okay, all 

In [28]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
window = Window.partitionBy(brand_revs['normalized_url']).orderBy(brand_revs['review_date'].desc())

brand_revs_2k = brand_revs.select('*', F.row_number().over(window).alias('row_num')) .filter(col('row_num') <= 2000)

In [29]:
brand_revs_pdf = brand_revs.toPandas()
brand_revs_pdf = brand_revs_pdf.sort_values('review_date',ascending = False).groupby('normalized_url').head(2000)

In [30]:
brand_revs = spark.createDataFrame(brand_revs_pdf)

# Review Rating

In [70]:
rev_rating_sdf = (
    brand_revs
    .filter(F.col('review_rating').isin([1,2,3,4,5]))
    .groupby('normalized_url', 'review_rating')
    .agg(
        F.countDistinct(F.concat(F.col('source_name'), F.col('review_source_id'))).alias('reviewer_count')
    )
)

rev_rating_sdf_sum = rev_rating_sdf.groupBy('normalized_url').sum('reviewer_count').alias('reviewer_sum')

rev_rating_sdf = rev_rating_sdf.join(rev_rating_sdf_sum, 'normalized_url')

#rev_rating_sdf = rev_rating_sdf.filter(~rev_rating_sdf.normalized_url.isin(URL_BLACKLIST))
rev_rating_sdf = rev_rating_sdf.orderBy(c.normalized_url)
    
rev_rating_pdf = rev_rating_sdf.toPandas()

In [71]:
rev_rating_sdf.show()

+-----------------+-------------+--------------+-------------------+
|   normalized_url|review_rating|reviewer_count|sum(reviewer_count)|
+-----------------+-------------+--------------+-------------------+
|       cerave.com|          2.0|           771|              44890|
|       cerave.com|          1.0|          1108|              44890|
|       cerave.com|          5.0|         33742|              44890|
|       cerave.com|          4.0|          7000|              44890|
|       cerave.com|          3.0|          2269|              44890|
|     cetaphil.com|          4.0|          5805|              29363|
|     cetaphil.com|          1.0|           719|              29363|
|     cetaphil.com|          5.0|         19830|              29363|
|     cetaphil.com|          2.0|           763|              29363|
|     cetaphil.com|          3.0|          2246|              29363|
|cleanandclear.com|          1.0|          1190|              12369|
|cleanandclear.com|          4.0| 

In [72]:
rev_rating_pdf['review_rating_frac'] = rev_rating_pdf['reviewer_count'] / rev_rating_pdf.groupby('normalized_url')['reviewer_count'].transform('sum')
rev_rating_pdf['review_rating'] = rev_rating_pdf['review_rating'].astype(str)
rev_rating_pdf = rev_rating_pdf.sort_values(["review_rating", "review_rating_frac"], ascending=[False, True])
rev_rating_pdf['brand_name'] = rev_rating_pdf['normalized_url'].replace(brand_name_di)

rev_rating_counts = rev_rating_sdf.groupBy('normalized_url').sum('reviewer_count')
rev_rating_counts = rev_rating_counts.orderBy('normalized_url')

In [73]:
rev_rating_counts.orderBy(c.normalized_url).collect()

[Row(normalized_url='cerave.com', sum(reviewer_count)=44890),
 Row(normalized_url='cetaphil.com', sum(reviewer_count)=29363),
 Row(normalized_url='cleanandclear.com', sum(reviewer_count)=12369),
 Row(normalized_url='hellobubble.com', sum(reviewer_count)=1394),
 Row(normalized_url='neutrogena.com', sum(reviewer_count)=79221)]

In [74]:
fig = px.bar(
    rev_rating_pdf,
    orientation='h',
    y="brand_name",
    x="review_rating_frac",
    color="review_rating",
    title=f"Review Rating Distributions for {BRAND_NAME_LI}",
    labels={
        "brand_name": "Brand",
        "review_rating_frac": "Percent of Reviewers",
        "sum(reviewer_count)": "Review Sum",
        "review_rating": "Review Rating",
        "reviewer_count": "Review Count",
        
    },
    text='reviewer_count',
    color_discrete_sequence=px.colors.sequential.Plasma[::2][::-1],
)

fig.update_layout(
    width=1450,
    height=800,
    font=Font.plot_title.value,
    plot_bgcolor="white",
    title={"x": 0.5},
    xaxis_tickformat='.0%',
    yaxis={'categoryorder':'total descending'}
)

fig.show()

# Tokenize to Words

In [75]:
# additional words to filter before combining into ngrams
CUSTOM_STOPWORDS = [

]

GROUP_CUSTOM_STOPWORDS = [
     
]

# words to filter after combined into ngrams
BIGRAM_UNIGRAM_WORD_FILTER = []

In [76]:
token_brand_revs = (
    brand_revs
    .withColumn('tokenized_text', lemmatize(tokenize(F.col('review_content'), F.lit(' '.join(GROUP_CUSTOM_STOPWORDS + CUSTOM_STOPWORDS)))))
    .dropna(subset=['tokenized_text'])
)

unigram_sdf = (
    token_brand_revs
    .withColumn('tokenized_word', F.explode(unique_list(F.col('tokenized_text'))))
    .dropna(subset=['tokenized_word'])
    .filter(F.col('tokenized_word') != '')
)

bigram_sdf = (
    token_brand_revs
    .withColumn('bigram', unique_list(bigram(F.col('tokenized_text'))))
    .dropna(subset=['bigram'])
    .withColumn('bigram_word', F.explode(F.col('bigram')))
    .dropna(subset=['bigram_word'])
    .filter(F.col('bigram_word') != '')
)

unibi_sdf = (
    token_brand_revs
    .withColumn('unibi', unique_list(F.concat(bigram(F.col('tokenized_text')), F.col('tokenized_text'))))
    .withColumn('unibi_word', F.explode(F.col('unibi')))
    .dropna(subset=['unibi_word'])
    .filter(F.col('unibi_word') != '')
)

In [77]:
tfidf.show()

+-----------------+--------------------+---------+--------------------+--------------------+
|   normalized_url|              tokens|rev_count|                  tf|               tfidf|
+-----------------+--------------------+---------+--------------------+--------------------+
|       cerave.com|[application_hard...|  1192848|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|
|cleanandclear.com|[bleaching_anyhti...|   395394|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|
|     cetaphil.com|[complicated_pull...|   778897|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|
|  hellobubble.com|[really_sensitive...|    34324|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|
|   neutrogena.com|[none, highly_pro...|  2507956|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|
+-----------------+--------------------+---------+--------------------+--------------------+



# TFIDF

In [78]:
example_sdf = (
    bigram_sdf
    .groupby(['normalized_url'])
    .agg(F.collect_list('bigram_word').alias('tokens'), F.count(F.concat(F.col('source_name'), F.col('reviewer_source_id'))).alias('rev_count'))
#     .filter(F.col('rev_count')>2000)
    .persist()
)

In [79]:
example_sdf = (
    unibi_sdf
    .groupby(['normalized_url'])
    .agg(F.collect_list('unibi_word').alias('tokens'), F.count(F.concat(F.col('source_name'), F.col('reviewer_source_id'))).alias('rev_count'))
#     .filter(F.col('rev_count')>2000)
    .persist()
)

In [80]:
# tf = HashingTF(inputCol="tokens", outputCol="tf").transform(example_sdf)
tf_mod = CountVectorizer(inputCol="tokens", outputCol="tf").fit(example_sdf)
tf = tf_mod.transform(example_sdf)

idf = IDF(inputCol="tf", outputCol="tfidf").fit(tf)
tfidf = idf.transform(tf).persist()


In [81]:
tf_out_sdf = (
    tf
    .withColumn('array', to_array("tf"))
    .select('normalized_url', F.posexplode('array'))
    .withColumnRenamed('col', 'tf')
    #.filter(F.col("tfidf") > 0)
    .join(vocab_sdf, on='pos')
)
tf_out_sdf.show()

NameError: name 'vocab_sdf' is not defined

In [82]:
#total terms used across reviews
len(tf_mod.vocabulary)

262144

In [83]:
#DF for all vocab terms from reviews 
vocab_pdf = pd.DataFrame(tf_mod.vocabulary).reset_index()
vocab_pdf.columns = ['pos', 'vocab']

vocab_sdf = spark.createDataFrame(vocab_pdf)

In [84]:
tfidf_out_sdf = (
    tfidf
    .withColumn('array', to_array("tfidf"))
    .select('normalized_url', F.posexplode('array'))
    .withColumnRenamed('col', 'tfidf')
    #.filter(F.col("tfidf") > 0)
    .join(vocab_sdf, on='pos')
)

In [85]:
tfidf_out_sdf.show()

+---+-----------------+------------------+-----------+
|pos|   normalized_url|             tfidf|      vocab|
+---+-----------------+------------------+-----------+
| 26|       cerave.com|               0.0|       make|
| 26|   neutrogena.com|               0.0|       make|
| 26|cleanandclear.com|               0.0|       make|
| 26|     cetaphil.com|               0.0|       make|
| 26|  hellobubble.com|               0.0|       make|
| 29|       cerave.com|               0.0|        one|
| 29|   neutrogena.com|               0.0|        one|
| 29|cleanandclear.com|               0.0|        one|
| 29|     cetaphil.com|               0.0|        one|
| 29|  hellobubble.com|               0.0|        one|
|474|       cerave.com|36.099668245203006|skin_review|
|474|   neutrogena.com| 57.97825506047756|skin_review|
|474|cleanandclear.com| 9.298399396491684|skin_review|
|474|     cetaphil.com|121.79079993836167|skin_review|
|474|  hellobubble.com|               0.0|skin_review|
|964|     

In [86]:
TOP_N_WORDS = 10
TFIDF_THRESHOLD = 0.5 # depends on amount of reviews to have enough signal

In [87]:
# for words that somehow make it through the stopwords checks
exclude_list = [
    
]
#ADD REGEX REPLACE FOR ‚äô replace with apostrophe
tfidf_out_sdf = (
    tfidf_out_sdf
    .filter(~(F.col('vocab').isin(exclude_list)))
)


#tfidf_out_sdf.where(c.normalized_url=='odelebeauty.com').orderBy('tfidf', ascending=False).show()

In [88]:
#Get all terms with avg rating and tfidf to csv
all_terms_sdf = (
    unibi_sdf
    .withColumnRenamed('unibi_word', 'vocab')
    .join(tfidf_out_sdf.select('normalized_url','vocab'), on=['normalized_url', 'vocab'])
)
terms_avg_rating_sdf = all_terms_sdf.groupby('normalized_url','vocab').agg(F.mean(F.col('review_rating')).alias('avg_rating'))
terms_avg_rating_pdf = terms_avg_rating_sdf.toPandas()

all_terms = tfidf_out_sdf.toPandas().merge(terms_avg_rating_pdf, on=['normalized_url', 'vocab'])
all_terms.to_csv('rowdy_comps_allterms.csv')

In [89]:
window = Window.partitionBy('normalized_url').orderBy(F.col('tfidf').desc())


top_tfidf_sdf = (
    tfidf_out_sdf
    .select('*', F.rank().over(window).alias('rank'))
    .filter(F.col('tfidf') > TFIDF_THRESHOLD)
    .filter(F.col('rank') <= TOP_N_WORDS)
    .drop('rank')
)

top_tfidf_sdf.show(100)

+-----+-----------------+------------------+--------------------+
|  pos|   normalized_url|             tfidf|               vocab|
+-----+-----------------+------------------+--------------------+
|  753|       cerave.com| 516.3946495171592|         love_cerave|
|  589|       cerave.com|395.32848040546025|      cerave_product|
|  569|       cerave.com|295.58406381085183|                cast|
|  823|       cerave.com|275.71627351355176| cerave_moisturizing|
|  633|       cerave.com| 269.6342968919293|          white_cast|
|  392|       cerave.com|  240.846776524814|  moisturizing_cream|
| 1845|       cerave.com| 191.3086218345449|        using_cerave|
| 3036|       cerave.com| 177.9751907642338|       cerave_facial|
|  607|       cerave.com|165.42976410813108|          hyaluronic|
|  344|       cerave.com| 162.2661855466196|             foaming|
|  393|cleanandclear.com|239.57052562725633|         clean_clear|
| 1721|cleanandclear.com|204.47841826518385|       morning_burst|
| 2792|cle

In [90]:
top_tfidf_pdf = top_tfidf_sdf.sort('tfidf', ascending = True).toPandas()
top_tfidf_pdf.sort_values(['normalized_url','tfidf'], ascending = False)

Unnamed: 0,pos,normalized_url,tfidf,vocab
50,47,neutrogena.com,1871.53078,neutrogena
49,318,neutrogena.com,1259.448427,hydro_boost
48,508,neutrogena.com,1256.812458,neutrogena®
46,689,neutrogena.com,897.56624,promotion_neutrogena
45,710,neutrogena.com,873.396769,member_crowdtap
44,720,neutrogena.com,862.410647,reward_program
42,767,neutrogena.com,800.888358,neutrogena®_sent
43,768,neutrogena.com,800.888358,program_neutrogena®
40,769,neutrogena.com,799.789746,neutrogena®_reward
41,772,neutrogena.com,799.789746,crowdtap_neutrogena®


In [91]:
#this join is where peatos.com is getting dropped
joined_tfidf_sdf = (
    unibi_sdf
    .withColumnRenamed('unibi_word', 'vocab')
    .join(top_tfidf_sdf.select('normalized_url','vocab'), on=['normalized_url', 'vocab'])
)

In [92]:
avg_rating_sdf = joined_tfidf_sdf.groupby('normalized_url','vocab').agg(F.mean(F.col('review_rating')).alias('avg_rating'))
avg_rating_pdf = avg_rating_sdf.toPandas()

In [93]:
top_tfidf_pdf = top_tfidf_pdf.merge(avg_rating_pdf, on=['normalized_url', 'vocab'])

In [94]:
n_brands = top_tfidf_pdf.normalized_url.nunique()

In [95]:
fig = make_subplots(
    rows=n_brands,
    cols=1,
)

j = 0

for url, data in top_tfidf_pdf.groupby('normalized_url'):
#for url, data in top_tfidf_pdf.groupby('normalized_url'):

    fig.add_trace(
        go.Bar(
            orientation='h',
            x=data.tfidf,
            y=data.vocab,
            marker=dict(color=data.avg_rating, coloraxis="coloraxis")
        ),
        row=j+1, col=1
    )
    j += 1
    fig.update_yaxes(automargin = True,title_text=brand_name_di.get(url), row=j, col=1, dtick =.5)#use dtick if not all term labels show up 
    fig.update_xaxes(automargin = True,showticklabels=False)
#     fig.update_xaxes(title_text=sentiment, row=j, col=1)
        
    
fig.update_layout(
    autosize = True,
    width=1400,
    height=1800,
    font=Font.plot_title.value,
    plot_bgcolor="white",
    title={"x": 0.5},
    coloraxis=dict(colorscale='Plasma', colorbar_title_text='<b>average rating</b>'),
    title_text=f"Top Unique Terms by Review Sentiment for Select {GROUP_NAME} Brands",
    showlegend=False
)
#fig.update_xaxes(automargin=True)

fig.show()

# TFIDF sklearn implementation

In [289]:
DOC_GROUP = ['normalized_url']

In [290]:
sk_form = (
    brand_revs
    .groupby(DOC_GROUP)
    .agg(F.concat_ws('\n', F.collect_list('review_content')).alias('review_text'))
    .persist()
)

In [291]:
example_pdf = sk_form.toPandas()

In [292]:
# if CUSTOM_STOPWORDS:
#     def tokenize(s, custom_words=CUSTOM_STOPWORDS):
#         filter_words = added_stopwords_li + stopwords_li + punkts_li + custom_words
#         return [w.lower() for w in word_tokenize(str(s)) if w.lower() not in filter_words]

#     tokenize_udf = spark.udf.register("tokenize", tokenize, T.ArrayType(T.StringType()))

In [293]:
REV_WORDS = []

In [294]:
from sklearn.feature_extraction.text import TfidfVectorizer
sk_tfidf = TfidfVectorizer(
    stop_words = GROUP_CUSTOM_STOPWORDS + CUSTOM_STOPWORDS + added_stopwords_li + stopwords_li + punkts_li + \
    REV_WORDS + GROUP_CUSTOM_STOPWORDS + BIGRAM_UNIGRAM_WORD_FILTER,
    ngram_range=(2, 3)
)
sk_out = sk_tfidf.fit_transform(example_pdf['review_text'])

In [295]:
tfidf_pdf = (
    pd.DataFrame(sk_out.toarray(), columns = sk_tfidf.get_feature_names())
    .stack()
    .to_frame()
    .reset_index()
    .rename(columns={0:'tfidf', 'level_1':'tokens'})
    .merge(example_pdf[['normalized_url']], right_index=True, left_on='level_0', how='left')
    .drop(['level_0'], axis=1)
)

In [296]:
len(tfidf_pdf)

18071135

In [297]:
# how many words per brand
TOP_N_PER_GROUP = 6

In [298]:
top_tfidf_pdf = (
    tfidf_pdf
    .sort_values(['tfidf'], ascending=False)
    .groupby(DOC_GROUP)
    .head(TOP_N_PER_GROUP)
    .sort_values(['tfidf'])
)

In [299]:
top_tfidf_pdf

Unnamed: 0,tokens,tfidf,normalized_url
1301242,hair feeling,0.091366,purezerobeauty.com
1362177,hair soft,0.101155,purezerobeauty.com
1301877,hair feels,0.114208,purezerobeauty.com
1300186,hair feel,0.120734,purezerobeauty.com
8530331,hair feels,0.12199,odelebeauty.com
7297789,air dry,0.127948,odelebeauty.com
15501960,fine hair,0.131396,evolvh.com
8893950,leaves hair,0.135763,odelebeauty.com
9068231,makes hair,0.137731,odelebeauty.com
1839777,makes hair,0.140312,purezerobeauty.com


In [300]:
n_brands = top_tfidf_pdf.normalized_url.nunique()

In [301]:
fig = make_subplots(
    rows=n_brands,
    cols=1,
)

j = 0
    
for url, data in top_tfidf_pdf.groupby('normalized_url'):

    fig.add_trace(
        go.Bar(
            orientation='h',
            x=data.tfidf,
            y=data.tokens,

        ),
        row=j+1, col=1
    )
    j += 1
    fig.update_yaxes(title_text=brand_name_di.get(url), row=j, col=1)
    fig.update_xaxes(showticklabels=False)
    #fig.update_xaxes(title_text=sentiment, row=j, col=1)
        
    
fig.update_layout(
    width=1400,
    height=1000,
    font=Font.plot_title.value,
    plot_bgcolor="white",
    title={"x": 0.5},
    colorway=CU_PLOTLY_COLOR_SEQUENCE,
    title_text=f"Top Unique Terms for Select {GROUP_NAME} Brands",
    showlegend=False
)


fig.show()

# TFIDF by Sentiment

In [302]:
example_sdf = (
    bigram_sdf
    .groupby(['normalized_url', 'sentiment'])
    .agg(F.collect_list('bigram_word').alias('tokens'), F.count(F.concat(F.col('source_name'), F.col('reviewer_source_id'))).alias('rev_count'))
#     .filter(F.col('rev_count')>2000)
    .persist()
)

In [303]:
# tf = HashingTF(inputCol="tokens", outputCol="tf").transform(example_sdf)
tf_mod = CountVectorizer(inputCol="tokens", outputCol="tf").fit(example_sdf)
tf = tf_mod.transform(example_sdf)

idf = IDF(inputCol="tf", outputCol="tfidf").fit(tf)
tfidf = idf.transform(tf).persist()


KeyboardInterrupt: 

In [None]:
len(tf_mod.vocabulary)

In [None]:
vocab_pdf = pd.DataFrame(tf_mod.vocabulary).reset_index()
vocab_pdf.columns = ['pos', 'vocab']

In [None]:
vocab_sdf = spark.createDataFrame(vocab_pdf)

In [None]:
tfidf_out_sdf = (
    tfidf
    .withColumn('array', to_array("tfidf"))
    .select('normalized_url', 'sentiment', F.posexplode('array'))
    .withColumnRenamed('col', 'tfidf')
    .filter(F.col("tfidf") > 0)
    .join(vocab_sdf, on='pos')
)

In [None]:
TOP_N_WORDS = 7
TFIDF_THRESHOLD = 2.1 # depends on amount of reviews to have enough signal, could lead to blank areas where there's not enough

In [None]:
window = Window.partitionBy('sentiment', 'normalized_url').orderBy(F.col('tfidf').desc())

top_tfidf_sdf = (
    tfidf_out_sdf
    .select('*', F.rank().over(window).alias('rank'))
    .filter(F.col('tfidf') > TFIDF_THRESHOLD)
    .filter(F.col('rank') <= TOP_N_WORDS)
    .drop('rank')
)

In [None]:
# top_tfidf_sdf = tfidf_out_sdf.filter(F.col('tfidf')>10)

In [None]:
top_tfidf_pdf = top_tfidf_sdf.sort('tfidf').toPandas()

In [None]:
color_map = {'Positive':0, 'Neutral':2, 'Negative':1}

In [None]:
n_brands = top_tfidf_pdf.normalized_url.nunique()

In [None]:
fig = make_subplots(
    rows=n_brands,
    cols=2,
)

i = 0

top_tfidf_pdf = top_tfidf_pdf[top_tfidf_pdf.sentiment!='Neutral']
for sentiment, sent_data in top_tfidf_pdf.groupby('sentiment'):
    
    j = 0
    
    for url, data in sent_data.groupby('normalized_url'):

        fig.add_trace(
            go.Bar(
                orientation='h',
                x=data.tfidf,
                y=data.vocab,
                marker_color=CU_PLOTLY_COLOR_SEQUENCE[color_map[sentiment]]
            ),
            row=j+1, col=i+1
        )
        j += 1
        fig.update_yaxes(title_text=brand_name_di.get(url), row=j, col=1)
        fig.update_xaxes(showticklabels=False)
        fig.update_xaxes(title_text=sentiment, row=j, col=i+1)
        
    i += 1
    
fig.update_layout(
    width=1400,
    height=1800,
    font=Font.plot_title.value,
    plot_bgcolor="white",
    title={"x": 0.5},
    colorway=CU_PLOTLY_COLOR_SEQUENCE,
    title_text=f"Top Unique Terms by Review Sentiment for Select {GROUP_NAME} Brands",
    showlegend=False
)


fig.show()

# TFIDF by Sentiment sklearn implementation

In [None]:
DOC_GROUP = ['sentiment', 'normalized_url']

In [None]:
sk_form = (
    brand_revs
    .groupby(DOC_GROUP)
    .agg(F.concat_ws('\n', F.collect_list('review_content')).alias('review_text'))
    .persist()
)

In [None]:
example_pdf = sk_form.toPandas()

In [None]:
# if CUSTOM_STOPWORDS:
#     def tokenize(s, custom_words=CUSTOM_STOPWORDS):
#         filter_words = added_stopwords_li + stopwords_li + punkts_li + custom_words
#         return [w.lower() for w in word_tokenize(str(s)) if w.lower() not in filter_words]

#     tokenize_udf = spark.udf.register("tokenize", tokenize, T.ArrayType(T.StringType()))

In [None]:
REV_WORDS = []

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
sk_tfidf = TfidfVectorizer(
    stop_words = GROUP_CUSTOM_STOPWORDS + CUSTOM_STOPWORDS + added_stopwords_li + stopwords_li + punkts_li + \
    REV_WORDS + GROUP_CUSTOM_STOPWORDS + BIGRAM_UNIGRAM_WORD_FILTER,
    ngram_range=(2, 2)
)
sk_out = sk_tfidf.fit_transform(example_pdf['review_text'])

In [None]:
tfidf_pdf = (
    pd.DataFrame(sk_out.toarray(), columns = sk_tfidf.get_feature_names())
    .stack()
    .to_frame()
    .reset_index()
    .rename(columns={0:'tfidf', 'level_1':'tokens'})
    .merge(example_pdf[['sentiment','normalized_url']], right_index=True, left_on='level_0', how='left')
    .drop(['level_0'], axis=1)
)

In [None]:
#top_tfidf_pdf

In [None]:
TOP_N_PER_GROUP = 7

In [None]:
top_tfidf_pdf = (
    tfidf_pdf
    .sort_values(['tfidf','sentiment'], ascending=False)
    .groupby(DOC_GROUP)
    .head(TOP_N_PER_GROUP)
    .sort_values(['tfidf'])
)

In [None]:
# IFIDF_THRESHOLD = 0.06

In [None]:
# top_tfidf_pdf = (
#     tfidf_pdf[tfidf_pdf.tfidf>IFIDF_THRESHOLD]
#     .sort_values(['tfidf'])
# )

In [None]:
color_map = {'Positive':0, 'Neutral':2, 'Negative':1}

In [None]:
n_brands = top_tfidf_pdf.normalized_url.nunique()

In [None]:
fig = make_subplots(
    rows=n_brands,
    cols=2,
)

i = 0

top_tfidf_pdf = top_tfidf_pdf[top_tfidf_pdf.sentiment!='Neutral'].sort_values(['normalized_url','tfidf'])
for sentiment, sent_data in top_tfidf_pdf.groupby('sentiment'):
    j = 0
    
    for url, data in sent_data.groupby('normalized_url'):

        fig.add_trace(
            go.Bar(
                orientation='h',
                x=data.tfidf,
                y=data.tokens,
                marker_color=CU_PLOTLY_COLOR_SEQUENCE[color_map[sentiment]]
            ),
            row=j+1, col=i+1
        )
        j += 1
        fig.update_yaxes(title_text=brand_name_di.get(url), row=j, col=1)
        fig.update_xaxes(showticklabels=False)
        fig.update_xaxes(title_text=sentiment, row=j, col=i+1)
        
    i += 1
    
fig.update_layout(
    width=1300,
    height=1200,
    font=Font.plot_title.value,
    plot_bgcolor="white",
    title={"x": 0.5},
    colorway=CU_PLOTLY_COLOR_SEQUENCE,
    title_text=f"Top Unique Terms by Review Sentiment for Select {GROUP_NAME} Brands",
    showlegend=False
)


fig.show()

In [None]:
rev_rating_sdf = (
    brand_revs
    .filter(F.col('review_rating').isin([1,2,3,4,5]))
    .groupby('normalized_url', 'review_rating')
    .agg(
        F.countDistinct(F.concat(F.col('source_name'), F.col('review_source_id'))).alias('reviewer_count')
    )
)

rev_rating_sdf_sum = rev_rating_sdf.groupBy('normalized_url').sum('reviewer_count').alias('reviewer_sum')

rev_rating_sdf = rev_rating_sdf.join(rev_rating_sdf_sum, 'normalized_url')

In [None]:
rev_rating_sdf.show()

In [None]:
# For exporting to CSV if necessary (must be pandas dataframe)
#top_tfidf_pdf.to_csv(f'bain__review_analysis__top_unique_terms__20210309.csv', index=False)