In [1]:
dataset = 'Musical_Instruments'

reviews_filepath = './data/raw_data/reviews_{0}_5.json.gz'.format(dataset)
metadata_filepath = './data/metadata/meta_{0}.json.gz'.format(dataset)

In [2]:
# %load modules/scripts/Load\ datasets.py


# In[6]:

# reviews_filepath = '../../data/raw_data/reviews_Musical_Instruments_5.json.gz'
# metadata_filepath = '../../data/metadata/meta_Musical_Instruments.json.gz'


# In[7]:

all_reviews = (spark
    .read
    .json(reviews_filepath))

all_metadata = (spark
    .read
    .json(metadata_filepath))



In [3]:
# %load modules/scripts/Summarize\ reviews.py


# # Summarize the reviews

# In[1]:

# all_reviews = (spark
#     .read
#     .json('../../data/raw_data/reviews_Musical_Instruments_5.json.gz'))


# In[2]:

from pyspark.sql.functions import col, expr, udf, trim
from pyspark.sql.types import IntegerType
import re

remove_punctuation = udf(lambda line: re.sub('[^A-Za-z\s]', '', line))
make_binary = udf(lambda rating: 0 if rating in [1, 2] else 1, IntegerType())

reviews = (all_reviews
    .na.fill({ 'reviewerName': 'Unknown' })
    .filter(col('overall').isin([1, 2, 5]))
    .withColumn('label', make_binary(col('overall')))
    .select(col('label').cast('int'), remove_punctuation('summary').alias('summary'))
    .filter(trim(col('summary')) != ''))


# ## Splitting data and balancing skewness

# In[3]:

train, test = reviews.randomSplit([.8, .2], seed=5436L)


# In[4]:

def multiply_dataset(dataset, n):
    return dataset if n <= 1 else dataset.union(multiply_dataset(dataset, n - 1))


# In[5]:

reviews_good = train.filter('label == 1')
reviews_bad = train.filter('label == 0')

reviews_bad_multiplied = multiply_dataset(reviews_bad, reviews_good.count() / reviews_bad.count())


train_reviews = reviews_bad_multiplied.union(reviews_good)


# ## Benchmark: predict by distribution

# In[6]:

accuracy = reviews_good.count() / float(train_reviews.count())
print('Always predicting 5 stars accuracy: {0}'.format(accuracy))


# ## Learning pipeline

# In[7]:

from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol='summary', outputCol='words')

pipeline = Pipeline(stages=[
    tokenizer, 
    StopWordsRemover(inputCol='words', outputCol='filtered_words'),
    HashingTF(inputCol='filtered_words', outputCol='rawFeatures', numFeatures=120000),
    IDF(inputCol='rawFeatures', outputCol='features'),
    LogisticRegression(regParam=.3, elasticNetParam=.01)
])


# ## Testing the model accuracy

# In[8]:

model = pipeline.fit(train_reviews)


# In[9]:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

prediction = model.transform(test)
BinaryClassificationEvaluator().evaluate(prediction)


# ## Using model to extract the most predictive words

# In[10]:

from pyspark.sql.functions import explode
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

words = (tokenizer
    .transform(reviews)
    .select(explode(col('words')).alias('summary')))

predictors = (model
    .transform(words)
    .select(col('summary').alias('word'), 'probability'))

first = udf(lambda x: x[0].item(), FloatType())
second = udf(lambda x: x[1].item(), FloatType())

predictive_words = (predictors
   .select(
       'word', 
       second(col('probability')).alias('positive'), 
       first(col('probability')).alias('negative'))
   .groupBy('word')
   .agg(
       F.max('positive').alias('positive'),
       F.max('negative').alias('negative')))

positive_predictive_words = (predictive_words
    .select(col('word').alias('positive_word'), col('positive').alias('pos_prob'))
    .sort('pos_prob', ascending=False))

negative_predictive_words = (predictive_words
    .select(col('word').alias('negative_word'), col('negative').alias('neg_prob'))
    .sort('neg_prob', ascending=False))


# In[11]:

import pandas as pd

pd.concat([
    positive_predictive_words.limit(10).toPandas(),
    negative_predictive_words.limit(10).toPandas() ],
    axis=1)



Always predicting 5 stars accuracy: 0.506650874636


Unnamed: 0,positive_word,pos_prob,negative_word,neg_prob
0,given,0.719084,limited,0.715598
1,job,0.710784,lackluster,0.708946
2,perfect,0.709105,ok,0.7007
3,ticket,0.708017,okay,0.699921
4,maybe,0.707469,fair,0.695551
5,disappointedone,0.706871,lightweightattractive,0.689687
6,supplies,0.704381,worst,0.683845
7,monkey,0.703659,destroyed,0.680854
8,bear,0.702184,disappointed,0.680082
9,awesome,0.700836,schallers,0.679959


In [4]:
# %load modules/scripts/User\ trustedness.py


# # User trustedness

# ## Loading data

# In[9]:

# all_reviews = (spark
#     .read
#     .json('../../data/raw_data/reviews_Musical_Instruments_5.json.gz'))


# ## Extracting ranking components

# In[10]:

reviews = all_reviews
reviews_per_reviewer = reviews.groupBy('reviewerID').count()


# In[31]:

from pyspark.sql.functions import col, udf, avg
from pyspark.sql.types import DoubleType

helpfulness_ratio = udf(
    lambda (useful, out_of): useful / float(out_of + 1), 
    returnType=DoubleType())

helpfulness = (reviews
  .select('reviewerID', helpfulness_ratio(col('helpful')).alias('helpfulness'))
  .groupBy('reviewerID')
  .agg(avg(col('helpfulness')).alias('helpfulness')))


# ## Computing rankings & visualizing the good and bad reviews from the most trusted users

# In[32]:

reviewers_trustedness = (helpfulness
    .join(reviews_per_reviewer, 'reviewerID')
    .select('reviewerID', (col('helpfulness') * col('count')).alias('trustedness')))


# In[ ]:

reviewers_trustedness.limit(10).toPandas()



Unnamed: 0,reviewerID,trustedness
0,A17A1KTVI3DG6U,3.047815
1,A36C867ZDP30NQ,2.0
2,A5MC7LP0ZBO4Q,0.0
3,A2DG65AWX5RJ4J,0.666667
4,A2IZ3ST24HSO4H,0.0
5,ACWJDL1ZYX8RE,0.833333
6,A3AOPVQ7EZHTWA,7.267545
7,A3LOJ2QHXITCF7,0.0
8,A2CARFAX5FNQT9,0.833333
9,AX11NOUMV8G95,1.0


In [5]:
# %load modules/scripts/Recommender\ system.py


# ## Loading and indexing the data for training

# In[2]:

# all_reviews = (spark
#     .read
#     .json('../../data/raw_data/reviews_Musical_Instruments_5.json.gz'))


# In[4]:

from pyspark.sql.functions import col, expr, udf, trim
from pyspark.sql.types import IntegerType
import re

remove_punctuation = udf(lambda line: re.sub('[^A-Za-z\s]', '', line))
make_binary = udf(lambda rating: 0 if rating in [1, 2] else 1, IntegerType())

reviews = all_reviews.withColumn('label', make_binary(col('overall')))


# In[5]:

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

indexing_pipeline = Pipeline(stages=[
    StringIndexer(inputCol="reviewerID", outputCol="reviewerIndex"),
    StringIndexer(inputCol="asin", outputCol="asinIndex")
])

indexer = indexing_pipeline.fit(reviews)
indexed_reviews = indexer.transform(reviews)


# In[6]:

train, _, test = [ chunk.cache() for chunk in indexed_reviews.randomSplit([.6, .2, .2], seed=1800009193L) ]


# ## Balancing data

# In[7]:

def multiply_dataset(dataset, n):
    return dataset if n <= 1 else dataset.union(multiply_dataset(dataset, n - 1))

reviews_good = train.filter('label == 1')
reviews_bad = train.filter('label == 0')

reviews_bad_multiplied = multiply_dataset(reviews_bad, reviews_good.count() / reviews_bad.count())

train_reviews = reviews_bad_multiplied.union(reviews_good)


# ## Evaluator

# In[8]:

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    predictionCol='prediction', 
    labelCol='label')


# ## Benchmark: predict by distribution

# In[9]:

from pyspark.sql.functions import lit

average_rating = (train_reviews
    .groupBy()
    .avg('label')
    .collect()[0][0])

average_rating_prediction = test.withColumn('prediction', lit(average_rating))

average_rating_evaluation = evaluator.evaluate(average_rating_prediction)

print('The RMSE of always predicting {0} stars is {1}'.format(average_rating, average_rating_evaluation))


# ## Recommender system

# In[10]:

from pyspark.ml.recommendation import ALS

als = ALS(
        maxIter=15,
        regParam=0.1,
        userCol='reviewerIndex',
        itemCol='asinIndex',
        ratingCol='label',
        rank=24,        
        seed=1800009193L)


# ## Evaluating the model

# In[14]:

recommender_system = als.fit(train_reviews)


# In[15]:

predictions = recommender_system.transform(test)


# In[16]:

evaluation = evaluator.evaluate(predictions.filter(col('prediction') != float('nan')))

print('The RMSE of the recommender system is {0}'.format(evaluation))



The RMSE of always predicting 0.509133408363 stars is 0.491705072053
The RMSE of the recommender system is 0.389991926061


## Select a product

In [6]:
reviewed_products = (all_metadata
    .join(all_reviews, 'asin')
    .filter('''
        categories is not null 
        and related is not null'''))

(reviewed_products
     .sample(
         withReplacement=False, 
         fraction= 10. / reviewed_products.count() , 
         seed=4325535L)
     .select('asin', 'title')
     .toPandas())

Unnamed: 0,asin,title
0,B000068NW5,Hosa Cable GTR210 Guitar Instrument Cable - 10...
1,B0002CZV82,Boss DS1 Distortion Guitar Pedal
2,B0002E1NQ4,"Neotech 5701002 Super Banjo Strap, Black"
3,B0002GW3Y8,Fingerease Guitar String Lubricant
4,B0002M6B2M,"Martin M140 Bronze Acoustic Guitar Strings, Light"
5,B0002M72JS,Electro-Harmonix 12AX7EH Preamp Tube
6,B000KIRT74,Behringer TO800 Vintage Tube-Sound Overdrive P...
7,B000L7MNUM,Mighty Bright Duet Music Stand Light
8,B000LFCXL8,Seiko SQ50-V Quartz Metronome
9,B000WS1QC6,Yamaha PA130 120 Volt Keyboard AC Power Adaptor


In [7]:
selected_product = 'B000L7MNUM'

## Product negative words

In [8]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import explode

product_words_per_reviewer = (
    Tokenizer(inputCol='summary', outputCol='words')
        .transform(all_reviews.filter(col('asin') == selected_product))
    .select('reviewerID', 'words'))

word_ranks = (product_words_per_reviewer
    .select(explode(col('words')).alias('word'))
    .distinct()
    .join(negative_predictive_words, col('word') == negative_predictive_words.negative_word)
#     .select('word', 'neg_prob')
    .sort('neg_prob', ascending=False))

word_ranks.limit(10).toPandas()

Unnamed: 0,word,negative_word,neg_prob
0,ok,ok,0.7007
1,bummer,bummer,0.678174
2,work,work,0.586143
3,light,light,0.542457
4,bright,bright,0.478805
5,these,these,0.463166
6,for,for,0.463166
7,not,not,0.463166
8,they,they,0.463166
9,was,was,0.463166


In [10]:
selected_negative_word = 'ok'

## Trusted users that used the word

In [11]:
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

is_elemen_of = udf(lambda word, words: word in words, BooleanType())

users_that_used_the_word = (product_words_per_reviewer
    .filter(is_elemen_of(lit(selected_negative_word), col('words')))
    .select('reviewerID'))

users_that_used_the_word.toPandas()

Unnamed: 0,reviewerID
0,A2P5U53IUTDUE6


## Suggested products in the same category

In [12]:
from pyspark.sql.functions import col

product_category = (reviewed_products
    .filter(col('asin') == selected_product)
    .select('categories')
    .take(1)[0][0][0][-1])

print('Product category: {0}'.format(product_category))

Product category: Stand Lights


In [13]:
import pandas as pd
pd.set_option('display.max_colwidth', -1)

last_element = udf(lambda categories: categories[0][-1])

products_in_same_category = (reviewed_products
    .limit(100000)
    .filter(last_element(col('categories')) == product_category)
    .select('asin', 'title')
    .distinct())

products_in_same_category.limit(10).toPandas()

Unnamed: 0,asin,title
0,B000L7MNUM,Mighty Bright Duet Music Stand Light
1,B007IHYBV2,Mighty Bright 54810 Hammerhead LED Book Light
2,B003B0I09Y,"Mighty Bright Orchestra Light, with Adapter and Bag"


In [14]:
indexed_products = indexer.transform(
    products_in_same_category.crossJoin(users_that_used_the_word))

alternative_products = recommender_system.transform(indexed_products).sort('prediction', ascending=False)

alternative_products.toPandas()

Unnamed: 0,asin,title,reviewerID,reviewerIndex,asinIndex,prediction
0,B003B0I09Y,"Mighty Bright Orchestra Light, with Adapter and Bag",A2P5U53IUTDUE6,585.0,395.0,0.844104
1,B000L7MNUM,Mighty Bright Duet Music Stand Light,A2P5U53IUTDUE6,585.0,400.0,0.612708
2,B007IHYBV2,Mighty Bright 54810 Hammerhead LED Book Light,A2P5U53IUTDUE6,585.0,596.0,0.262089
