transform_word2vec_amazon.py
# Transform amazon review summary data
# Kahye Song
# Insight Data Engineering 2014 June
from pyspark import SparkConf, SparkContext
from flask import Flask, Response, request
import json
import numpy as np
import os
import text_nltk
from gensim.models import Word2Vec
from gzip import GzipFile
from redis import Redis
import sys
import time
# configure Spark
conf = SparkConf()
conf.setAppName("Amazon review transform")
sc = SparkContext(conf=conf, pyFiles=['word2vec.py', 'text_nltk.py'])
if len(sys.argv) > 1:
    data_file = sys.argv[1]
else:
    raise Exception("An input file has to be provided.")
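
# Usage (roughly): spark-submit transform_word2vec_amazon.py /path/to/amazon_reviews.json
# where the input path is illustrative and the file holds one JSON review record per line.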
# word2vec vector model file
dic_file = "text8.model"

# fetch the vector associated with each word
def vectors(x):
    try:
        v = vectors.vectors
    except AttributeError:
        # if no model has been loaded yet, load it and cache the vectors on the function object
        vectors.vectors = {}
        m = Word2Vec.load(dic_file)
        for word in m.vocab:
            # the vector dimension has been reduced to 25 for faster computation; in testing, 25 dimensions were sufficient to tell different emotions apart
            vectors.vectors[word] = [1, ] + [float(x) for x in m[word][:25]]
        v = vectors.vectors
    # return the vector associated with the given word; otherwise fall back to the first vector (needs to be fixed)
    return v[x] if x in v else v.itervalues().next()
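
# Roughly, vectors('happy') returns a 26-element list: a leading 1 followed by the first 25
# word2vec components for that word, e.g. [1, 0.12, -0.03, ...] (values illustrative).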
# all the emotions I am interested in finding in Amazon review text
emotion_names = ['good','like','bad','dislike','happy', 'sad', 'surprised', 'angry', 'disgusted']
num_emotions = len(emotion_names)
# calculate emotion scores by finding the normalized correlation between each emotion word vector and a given word vector
def empathy_vec(x):
    k, v = x
    d = []
    myvec2 = norm_vec(np.array(v[1:]))
    for a_emotion in emotion_names:
        myvec = norm_vec(vectors(a_emotion)[1:])
        d.append(np.dot(myvec, myvec2))
    return (k, d)
# normalize a vector
def norm_vec(myvec):
    myvec = np.array(myvec)
    return myvec / np.linalg.norm(myvec)
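
# Because both arguments to np.dot in empathy_vec are unit-normalized, each emotion score is the
# cosine similarity between the review's mean word vector and the emotion word's vector, so the
# scores fall roughly within [-1, 1].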
def reformat_scores(x):
    return np.concatenate([np.array([x[0]]), np.array(x[1][:-1]) / x[1][-1]]).tolist()
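
# reformat_scores takes (review_score, [emotion_sum_1, ..., emotion_sum_9, review_count]) and
# returns [review_score, emotion_avg_1, ..., emotion_avg_9]; the trailing count accumulated by
# reduceByKey below turns the per-score sums into averages.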
# field of interest in the Amazon review: either 'review/summary' or 'review/text'
text_field_name = 'review/summary'

# track the total run time
begin_time = time.time()
#---------------------------------------------------------
# BEGIN the main processing
#--------------------------------------------------------
# load the data file, parse each record as JSON, and filter out records missing the required fields or without any review text
data = sc.textFile(data_file).map(json.loads).cache()
data = data.filter(lambda x: 'review/score' in x and 'product/productId' in x and 'review/time' in x and text_field_name in x)
data = data.filter(lambda x: x[text_field_name] is not None and x['review/score'] is not None)
data = data.filter(lambda x: len(x[text_field_name]) > 0)
# pick the review text, then tokenize and lemmatize it
article_vector = data.map(lambda x: ((x['product/productId'], float(x['review/score']), x['review/time']), text_nltk.lemma_tokenize(x[text_field_name])))
article_vector = article_vector.filter(lambda x: len(x[1]) > 0)
# calculate the mean vector of the review text words
article_vector = article_vector.map(lambda x: (x[0],np.nanmean([vectors(w) for w in x[1]],axis=0)))
# calculate the average emotion scores of all reviews for each review score (1, 2, 3, 4, 5)
avg_all_review = article_vector.map(empathy_vec).map(lambda x: (x[0][1], np.array(x[1] + [1.0]))).reduceByKey(lambda a, b: a + b).sortByKey().map(reformat_scores).collect()
avg_all_review = np.array(avg_all_review)
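
# avg_all_review is now a small matrix with one row per distinct review score: column 0 holds
# the review score and columns 1-9 hold the average emotion scores, in the same order as
# emotion_names.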
run_time = time.time()-begin_time
print 'total run_time: '
print run_time
#---------------------------------------------------------
# BEGIN Output API
#--------------------------------------------------------
# format the API output
def result_format(data_mat, emo_ind):
    return {'review/score': data_mat[:, 0].tolist(), 'emotion_score': data_mat[:, (emo_ind + 1)].tolist()}
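
# For example, result_format(avg_all_review, emotion_names.index('happy')) would return something
# like {'review/score': [1.0, 2.0, ...], 'emotion_score': [...]}, pairing each review score with
# the average 'happy' score of its reviews (values illustrative).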
# Redis server used to cache API outputs that have been queried recently
redis = Redis()
# API output: correlation between the review scores and emotion scores
app = Flask(__name__, static_folder='static')
@app.route('/amazon.json', methods=['GET', 'POST'])
def amazon():
    # fetch the emotion of interest
    emotion = request.args.get('emotion', emotion_names[0])
    emotion_ind = emotion_names.index(emotion)
    # if the emotion scores have already been calculated, fetch them from the Redis cache
    text = redis.hget('amazon.json', emotion)
    if not text:
        # otherwise calculate them now and cache the result
        text = json.dumps(result_format(avg_all_review, emotion_ind))
        redis.hset('amazon.json', emotion, text)
    response = Response(text, mimetype='application/json')
    response.headers['Access-Control-Allow-Origin'] = '*'
    return response
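
# Example request (assuming the default port set below):
#   GET http://<host>:5000/amazon.json?emotion=happy
# returns a JSON object with parallel 'review/score' and 'emotion_score' lists for the requested
# emotion; an emotion outside emotion_names would raise a ValueError from emotion_names.index().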
# run the API
app.run(host='0.0.0.0', port=int(os.getenv('PORT', '5000')), debug=False, threaded=True)