-
Notifications
You must be signed in to change notification settings - Fork 3
/
usertopics.py
130 lines (108 loc) · 4.73 KB
/
usertopics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from __future__ import division
import logging
import re
import multiprocessing
from gensim.models import LdaModel
from gensim import corpora
from pymongo import MongoClient
from settings import Settings
from sets import Set
# Initialize all collections.
# A single MongoClient is shared by the three collection handles: every
# handle targets the same connection string, so separate clients would only
# duplicate connection pools to the same server.
# NOTE(review): the worker processes started by multiprocessing inherit this
# module-level client; PyMongo documents MongoClient as not fork-safe, so
# consider creating the client inside each worker process instead - confirm
# against the PyMongo version in use.
_mongo_client = MongoClient(Settings.MONGO_CONNECTION_STRING)
# Source reviews (read side).
tags_collection = _mongo_client[Settings.TAGS_DATABASE][Settings.REVIEWS_COLLECTION]
# Per-user topic score profiles (write side).
userscore_collection = _mongo_client[Settings.TOPICS_DATABASE][Settings.USERSCORE_COLLECTION]
# Per-business topic score profiles (write side).
businessscore_collection = _mongo_client[Settings.TOPICS_DATABASE][Settings.BUSINESSSCORE_COLLECTION]
# Predict topics for a review, using the topic model obtained by running LDA on all reviews
class Predict(object):
    """Wrap a saved gensim dictionary and LDA model for topic inference.

    Both artifacts are loaded from disk once, when the instance is created,
    and reused for every call to :meth:`run`.
    """

    def __init__(self, dictionary_path="models/dictionary.dict",
                 lda_model_path="models/lda_model_50_topics.lda"):
        """Load the dictionary and the LDA model.

        Args:
            dictionary_path: File to load with ``corpora.Dictionary.load``.
                Defaults to the path this script has always used.
            lda_model_path: File to load with ``LdaModel.load``.
                Defaults to the path this script has always used.
        """
        self.dictionary = corpora.Dictionary.load(dictionary_path)
        self.lda = LdaModel.load(lda_model_path)

    def run(self, new_review):
        """Return the LDA model's topic scores for one tokenized review.

        Args:
            new_review: The review as a list of word tokens (the caller in
                this file passes the result of ``str.split``).

        Returns:
            Whatever ``self.lda[bow]`` yields for the review's bag-of-words;
            the caller in this file iterates it as ``(topic index, weight)``
            pairs.
        """
        new_review_bow = self.dictionary.doc2bow(new_review)
        return self.lda[new_review_bow]
# Worker process: run LDA over the reviews of each user (or business) in a slice
def worker(identifier, start, end, itemList, itemType, numTopics=50):
    """Build and store a topic-score profile for ``itemList[start:end]``.

    For every item in the slice, all reviews whose ``itemType`` field equals
    the item id are read from ``tags_collection``.  Each review's text is
    tokenized and scored with ``Predict.run``; every returned topic weight is
    multiplied by the review's rating and added to that topic's running
    total, so the rating decides how strongly a topic counts for the item.
    The totals are then averaged over the item's number of reviews and
    inserted into ``userscore_collection`` (when ``itemType == "userId"``)
    or ``businessscore_collection`` (any other ``itemType``).

    Args:
        identifier: Label of this worker; only used in progress output.
        start: Index of the first item to process (inclusive).
        end: Index one past the last item to process (exclusive).
        itemList: Sequence of item ids to index with ``start``/``end``.
        itemType: Name of the review field to match item ids against.
        numTopics: Length of the score list.  Topic indexes returned by the
            model are used as positions in this list, so it must be larger
            than the highest topic index.  Defaults to 50.
    """
    # Load the dictionary and LDA model once per worker process.
    predict = Predict()
    # Same pattern as before, compiled once instead of per review.
    nonWord = re.compile(r"[^\w]")

    for index in range(start, end):
        itemId = itemList[index]
        # One accumulated, rating-weighted score per topic.
        itemScore = [0.0] * numTopics

        # Number of reviews seen for this item; also drives progress output.
        counter = 0
        for review in tags_collection.find({itemType: itemId}):
            tokens = nonWord.sub(" ", review["text"]).split()
            ldaScore = predict.run(tokens)
            counter += 1
            if counter % 1000 == 0:
                print('Thread:'+str(identifier)+' '+str(counter) +' '+'record inserted for'+ str(itemType)+'Profile ')
            rating = float(review["rating"])
            for topicIndex, weight in ldaScore:
                itemScore[topicIndex] += float(weight) * rating

        # Normalize by the total number of reviews of this item.  The
        # previous code divided by the number of topics in the last review,
        # which was also undefined or zero when there was nothing to count.
        if counter:
            reviewTotal = float(counter)
            itemScore = [score / reviewTotal for score in itemScore]

        # Insert the result in the collection matching the profile type.
        if itemType == "userId":
            userscore_collection.insert({"userId": itemId, "userscore": itemScore})
        else:
            businessscore_collection.insert({"business": itemId, "businessscore": itemScore})
def _run_workers(itemList, itemType, workers=6):
    """Process ``itemList`` in ``workers`` parallel processes and wait for them.

    The list is cut into contiguous slices using integer boundaries
    ``i * count // workers``.  The slices tile the whole list exactly, so no
    trailing items are dropped when ``count`` is not a multiple of
    ``workers``, and short lists (``count < workers``) are still processed.

    Args:
        itemList: Item ids handed to every ``worker`` process.
        itemType: Review field name passed through to ``worker``.
        workers: Number of processes to start.
    """
    count = len(itemList)
    jobs = []
    for i in range(workers):
        first = i * count // workers
        last = (i + 1) * count // workers
        p = multiprocessing.Process(target=worker, args=((i + 1), first, last, itemList, itemType))
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
        print('%s.exitcode = %s' % (j.name, j.exitcode))


def main():
    """Build user profiles, then business profiles, from all stored reviews.

    Reads every review in ``tags_collection`` to collect the distinct
    ``userId`` and ``business`` values, then runs the ``worker`` processes
    over the users and, once those have finished, over the businesses.
    """
    # Set up log level as info
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

    # Cursor to extract userId and business from all reviews
    reviews_cursor = tags_collection.find()
    reviews_cursor.batch_size(1000)

    # De-duplicate while streaming so the full id lists are never held in
    # memory with their duplicates.
    userIds = set()
    businessIds = set()
    counter = 0
    for review in reviews_cursor:
        userIds.add(review["userId"])
        businessIds.add(review["business"])
        counter += 1
        if counter % 100000 == 0:
            print(str(counter) + 'Record Read from reviews collection')

    # Workers address items by position, so they need indexable lists.
    userList = list(userIds)
    businessList = list(businessIds)
    print('Number of User in Dataset' + str(len(userList)))
    print('Number of Business in Dataset' + str(len(businessList)))

    # Process user reviews to create user profiles
    _run_workers(userList, "userId")
    # Process business reviews to create business profiles
    _run_workers(businessList, "business")
# Build the profiles only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()