In [1]:
from pyspark import SparkContext

In [2]:
import re

In [3]:
import math

In [4]:
# program start here
print("\033[1;31mProgram starts!\033[0m")
hotelFilePath = '../../datasets/Datafiniti_Hotel_Reviews_Jun19.csv'
sc = SparkContext('local[*]', 'Review_extraction')
sc.setLogLevel("ERROR")

[1;31mProgram starts![0m


In [5]:
# Split lines into words, not using regex to save performance
def splitWords(line):
    words = ''
    neglect = False
    result = []
    for char in line:
        if char == ',' and not neglect and words != '':
            result.append(words)
            words = ''
        else:
            if char == '"':
                neglect = not neglect
            else:
                words += char
    if words != '':
        result.append(words)
    return result

In [6]:
def remove_malformed_data(data):
    if len(data) != 25 or data[7] != 'US':
        return data, -1
    else:
        return data, 0

In [7]:
def reassign_id(review):
    return (review_dict[review], (review[0], review[1][0], review[1][1], review[1][2], review[1][3]))

In [8]:
# We need to tokennize the reviews by tokenize each words, and remove stop words.
def tokenizeReview(review_data):
    return review_data[0], list(filter(lambda r: r.lower() not in stopwords, tokenize(review_data[1][3])))
def tokenize(review_data):
    return list(filter(lambda r: len(r) > 0, re.split(r'\W+', review_data.lower())))
print(tokenize("Hello world! hello!, hello? world!"))

['hello', 'world', 'hello', 'hello', 'world']


In [9]:
# TF - term frequency
def term_frequency(tokens):
    token_counts = {}
    for token in tokens:
        if token not in token_counts:
            token_counts[token] = 1
        else:
            token_counts[token] += 1
    total_tokens = len(tokens)
    term_frequency = {}
    for token in token_counts.keys():
        term_frequency[token] = 1.00 * token_counts[token]/ total_tokens
    return term_frequency
#Test the validity of term_frequency
print(term_frequency(tokenize("Hello world! hello!, hello? world!")))
print(term_frequency(tokenize("Iago advises Roderigo to spoil some of Othello’s pleasure")))

{'hello': 0.6, 'world': 0.4}
{'iago': 0.1, 'advises': 0.1, 'roderigo': 0.1, 'to': 0.1, 'spoil': 0.1, 'some': 0.1, 'of': 0.1, 'othello': 0.1, 's': 0.1, 'pleasure': 0.1}


In [10]:
# IDF - Inverse Document Frequency
def inverse_document_frequency(rdd):
    tokensRDD = rdd.flatMap(lambda review: list(set(review[1]))).map(lambda t: (t,1)).reduceByKey(lambda a,b:a+b)
    return (tokensRDD.map(lambda t: (t[0], float(total_document_count/t[1]))))

In [11]:
# TF-IDF Term Frequency - Inverse Document Frequency
def tf_idf(review_data, idfMapBroadCasted):
    tfs = term_frequency(review_data)
    tfidf = {}
    for token in tfs.keys():
        tfidf[token] = tfs[token] * idfMapBroadCasted.value[token]
    return tfidf

In [12]:
# Compute cossine similarity between review based on their TF-IDF scores on duplicate tokens
def dotProduct(review_a, review_b):
    return sum([review_a[t] * review_b[t] for t in review_a if t in review_b])
print(dotProduct({"a": 0.5, "b": 0.2}, {"a": 0.5, "b": 0.2, "c": 0.2}))

def norm(review_a):
    return math.sqrt(dotProduct(review_a, review_a))
print(norm({"a": 0.5, "b": 0.2}))

def cossine(review_a, review_b):
    return dotProduct(review_a, review_b) /(norm(review_a) * norm(review_b))
print(cossine({"a": 0.5, "b": 0.2}, {"a": 0.5, "b": 0.2, "c": 0.2}))

0.29000000000000004
0.5385164807134505
0.9374368665610919


In [13]:
# Repartition the RDD to improve performance
m_prime = 23
def custom_partition(review_id):
    return review_id * m_prime + 103)

SyntaxError: unmatched ')' (<ipython-input-13-bf6c47651e68>, line 4)

In [14]:
# Compare each review with each other review:
def compare(review_data, reviewTFIDFMapBroadCast):
    duplicate_review_ids = []
    for review_id in reviewTFIDFMapBroadCast.value.keys():
        if review_data[0] >= review_id:
            continue
        else:
            similarity = cossine(review_data[1], reviewTFIDFMapBroadCast.value[review_id])
            if similarity >= 0.99:
                duplicate_review_ids.append(review_id)
    return review_data[0], duplicate_review_ids

In [15]:
hotelFile = sc.textFile(hotelFilePath)
initialRDD = hotelFile.flatMap(lambda line: line.split('\n')) \
    .map(lambda line: splitWords(line)) \
    .map(lambda data: remove_malformed_data(data)) \
    .filter(lambda data: data[1] == 0) \
    .map(lambda data: data[0]) \
    .map(lambda line: (line[11], [line[3], line[4], line[6], line[9],
                                  line[10], line[12], line[13], line[16], line[18], line[19], line[0], line[14][0:11]]))
print("Initial cleaning saved %s lines of data."
      % (initialRDD.count()))

Initial cleaning saved 10000 lines of data.


In [16]:
# Collect only the review portion of all data and uses distinct to remove duplicate reviews
reviewRDD = initialRDD \
    .map(lambda data: (data[1][10], (data[0], data[1][7], data[1][8], data[1][9]))) \
    .distinct()
total_document_count = reviewRDD.count()
print("Total review count before cleaning has %s reviews." % (total_document_count))

Total review count before cleaning has 9787 reviews.


In [17]:
print(reviewRDD.take(1))

[('AVwcj_OhkufWRAb5wi9T', ("Best Western Carmel's Town House Lodge", '5', 'Not cheap but excellent location. Price is somewhat standard for not hacing reservations. But room was nice and clean. They offer good continental breakfast which is a plus and compensates. Front desk service and personnel where excellent. It is Carmel, no A/C in rooms but they have a fan for air circulation.', 'Very good'))]


In [18]:
# Since we have duplicate review_ID in this dataset, we need to re-assign their ids.
reviewList = reviewRDD.collect()
review_dict = {}
counter = 0
for review in reviewList:
    review_dict[review] = counter
    counter += 1
reviewRDD = reviewRDD.map(reassign_id)

In [19]:
print(reviewRDD.take(1))

[(0, ('AVwcj_OhkufWRAb5wi9T', "Best Western Carmel's Town House Lodge", '5', 'Not cheap but excellent location. Price is somewhat standard for not hacing reservations. But room was nice and clean. They offer good continental breakfast which is a plus and compensates. Front desk service and personnel where excellent. It is Carmel, no A/C in rooms but they have a fan for air circulation.', 'Very good'))]


In [20]:
# We need to remove stopwords to compute the real similarity between reviews. Using set to remove duplicates and 
# for faster comparison
stopwords = set(sc.textFile('../../datasets/stopwords.txt').flatMap(lambda data: data.split('\n')).collect())

In [21]:
# Tokenize every review
reviewTokenized = reviewRDD.map(tokenizeReview)
print(reviewTokenized.take(1))

[(0, ['cheap', 'excellent', 'location', 'price', 'standard', 'hacing', 'reservations', 'room', 'nice', 'clean', 'offer', 'good', 'continental', 'breakfast', 'a', 'compensates', 'front', 'desk', 'service', 'personnel', 'excellent', 'carmel', 'a', 'c', 'rooms', 'a', 'fan', 'air', 'circulation'])]


In [22]:
# Calculate the IDF for each token in the review
idfMap = inverse_document_frequency(reviewTokenized).collectAsMap()
idfMapBroadCast = sc.broadcast(idfMap)
print(idfMap['cheap'])

71.96323529411765


In [23]:
# Perform TFIDF on each review
reviewTFIDF = reviewTokenized.map(lambda review_data: (review_data[0], tf_idf(review_data[1], idfMapBroadCast)))
print(reviewTFIDF.take(1))

[(0, {'cheap': 2.4814908722109537, 'excellent': 0.6736182806800193, 'location': 0.11436216828894939, 'price': 0.3601737018363817, 'standard': 1.6382658185470373, 'hacing': 337.48275862068965, 'reservations': 2.463377800151019, 'room': 0.06444200088231615, 'nice': 0.1201861675999607, 'clean': 0.09382339689204605, 'offer': 0.7211170055997642, 'good': 0.1306047827479449, 'continental': 1.454667063020214, 'breakfast': 0.10872511553501599, 'a': 0.12184959391768793, 'compensates': 337.48275862068965, 'front': 0.21185358356603243, 'desk': 0.21386740090031028, 'service': 0.1669880052551656, 'personnel': 5.192042440318302, 'carmel': 84.37068965517241, 'c': 1.5552200858096297, 'rooms': 0.11649387594776998, 'fan': 2.9865730851388466, 'air': 1.2980106100795754, 'circulation': 168.74137931034483})]


In [24]:
# Collect the review TFIDF as a map:
reviewTFIDFMap = reviewTFIDF.collectAsMap()
reviewTFIDFMapBroadCast = sc.broadcast(reviewTFIDFMap)
print(reviewTFIDFMapBroadCast.value[0])

{'cheap': 2.4814908722109537, 'excellent': 0.6736182806800193, 'location': 0.11436216828894939, 'price': 0.3601737018363817, 'standard': 1.6382658185470373, 'hacing': 337.48275862068965, 'reservations': 2.463377800151019, 'room': 0.06444200088231615, 'nice': 0.1201861675999607, 'clean': 0.09382339689204605, 'offer': 0.7211170055997642, 'good': 0.1306047827479449, 'continental': 1.454667063020214, 'breakfast': 0.10872511553501599, 'a': 0.12184959391768793, 'compensates': 337.48275862068965, 'front': 0.21185358356603243, 'desk': 0.21386740090031028, 'service': 0.1669880052551656, 'personnel': 5.192042440318302, 'carmel': 84.37068965517241, 'c': 1.5552200858096297, 'rooms': 0.11649387594776998, 'fan': 2.9865730851388466, 'air': 1.2980106100795754, 'circulation': 168.74137931034483}


In [25]:
possibleDuplicateReviewMap = reviewTFIDF.map(lambda review_data: compare(review_data, reviewTFIDFMapBroadCast)).collectAsMap()

In [26]:
duplicateCandidates = {}
for review_id in possibleDuplicateReviewMap.keys():
    if len(possibleDuplicateReviewMap[review_id]) > 0:
        duplicateCandidates[review_id] = possibleDuplicateReviewMap[review_id]

In [27]:
reviewMap = reviewRDD.collectAsMap()

In [28]:
print("Cleaned reviews has total of %s reviews before Tf-idf removal" % (len(reviewMap)))

Cleaned reviews has total of 9787 reviews before Tf-idf removal


In [29]:
for duplicateKey in duplicateCandidates.keys():
    for k in duplicateCandidates[duplicateKey]:
        print(str(reviewMap[duplicateKey]) + ", " + str(reviewMap[k]))

('AWCVdownIxWefVJwvUDw', 'Hawthorn Suites By Wyndham Louisville/jeffersontown', '4', 'MoreMore', 'Perfect stay'), ('AVz6h4u2FcQ3k02bDhKp', 'Wyndham Garden Baronne Plaza New Orleans', '3', 'MoreMore', 'June trip to New Orleans')
('AWCVdownIxWefVJwvUDw', 'Hawthorn Suites By Wyndham Louisville/jeffersontown', '4', 'MoreMore', 'Perfect stay'), ('AWEKrVXr3-Khe5l_eyaL', 'Ramada Metairie New Orleans Airport', '1', 'MoreMore', 'Not exceptable...roaches')
('AWCVdownIxWefVJwvUDw', 'Hawthorn Suites By Wyndham Louisville/jeffersontown', '4', 'MoreMore', 'Perfect stay'), ('AV1eUFlyIxWefVJwfdod', 'Days Inn-Ku Lawrence', '1', 'MoreMore', "Don't stay")
('AWCVdownIxWefVJwvUDw', 'Hawthorn Suites By Wyndham Louisville/jeffersontown', '4', 'MoreMore', 'Perfect stay'), ('AVwebW3tIN2L1WUf40LS', 'Wyndham Santa Monica At The Pier', '5', 'MoreMore', 'Excellent Customer Service!')
('AWCVdownIxWefVJwvUDw', 'Hawthorn Suites By Wyndham Louisville/jeffersontown', '4', 'MoreMore', 'Perfect stay'), ('AVwebW3tIN2L1WUf

In [102]:
for duplicateKey in duplicateCandidates.keys():
    for k in duplicateCandidates[duplicateKey]:
        if k in reviewMap.keys():
            del reviewMap[k]

In [104]:
print("Cleaned reviews has total of %s reviews after Tf-idf removal" % (len(reviewMap)))
print(reviewRDD.take(1))

Cleaned reviews has total of 9759 reviews after Tf-idf removal
[(0, ('AVwcj_OhkufWRAb5wi9T', "Best Western Carmel's Town House Lodge", '5', 'Not cheap but excellent location. Price is somewhat standard for not hacing reservations. But room was nice and clean. They offer good continental breakfast which is a plus and compensates. Front desk service and personnel where excellent. It is Carmel, no A/C in rooms but they have a fan for air circulation.', 'Very good'))]


In [105]:
print("Saving review data to file.")
output_file = open("General_Review_Data.json", "w")
counter = 0
output_file.write("{")
output_file.write('"Reviews": {')
for review_id in reviewMap.keys():
    output_file.write('"' + str(review_id) + '": {')
    output_file.write('"Id": ' + str(review_id) + ',')
    output_file.write('"ReviewId": ' + '"' + reviewMap[review_id][0] + '",')
    output_file.write('"ReviewEdHotel": ' + '"' + reviewMap[review_id][1] + '",')
    output_file.write('"ReviewedScore": ' + '"' + reviewMap[review_id][2] + '",')
    output_file.write('"ReviewedText": ' + '"' + reviewMap[review_id][3] + '",')
    output_file.write('"ReviewedTitle": ' + '"' + reviewMap[review_id][4] + '"')
    counter += 1
    if counter != len(reviewMap):
        output_file.write('},')
    else:
        output_file.write('}')
output_file.write('}')
output_file.write('}')
output_file.close()
print("Data saved to General_Review_Data.json")

Saving review data to file.
Data saved to General_Review_Data.json
