In [1]:
import os
# Run the PySpark shell start-up script that ships under $SPARK_HOME inside
# this kernel. Per the banner it prints, this leaves a SparkSession available
# under the name `spark`, which the later cells rely on.
spark_home = os.environ["SPARK_HOME"]
shell_script = os.path.join(spark_home, 'python/pyspark/shell.py')
execfile(shell_script)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:05:08)
SparkSession available as 'spark'.


In [2]:
import pickle
import os

# Project root. It is also made the working directory so that the relative
# file names used further down ('italian_cuisine.txt', 'row_rid2dishes.pickle')
# resolve against it.
PROJECT_DIR = '/Users/lakerwayne/Desktop/YelpChallenge'
os.chdir(PROJECT_DIR)

# `Restaurant` is not referenced by name in this cell; presumably the class
# must be importable for the pickle below to deserialize -- TODO confirm.
from process import Restaurant

# NOTE(security): pickle.load can execute arbitrary code embedded in the file;
# only load pickles that were produced by a trusted source.
# The `with` block closes the file handle once the data has been loaded.
with open(os.path.join(PROJECT_DIR, 'data_edinburg_rest.pickle'), 'rb') as pickle_file:
    reviews = pickle.load(pickle_file)

In [3]:
# Build a lookup from the first word of a phrase to its second word.
# Each useful line of the file is split on '_' and only the first two parts
# are kept ("first_second" -> phrases["first"] = "second").
# NOTE: a first word can map to only one second word; if two lines share the
# same first word, the later line overwrites the earlier one.
phrases = {}
with open('italian_cuisine.txt', 'r') as txtfile:
    for line in txtfile:
        words = line.strip().split('_')
        if len(words) < 2:
            # Blank line or line without an underscore: there is no second
            # word to record, so skip it instead of failing on words[1].
            continue
        phrases[words[0]] = words[1]

In [4]:
# Flatten the nested `reviews` mapping into a list of (key, text) pairs.
# For every outer key and every inner key beneath it, pair the inner key with
# the first element of that entry's `.texts` (only texts[0] is used).
rid2reviews = [
    (rid, reviews[group][rid].texts[0])
    for group in reviews
    for rid in reviews[group]
]

In [5]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf

# Turn the (id, review) pairs into a two-column Spark DataFrame.
sentenceDataFrame = spark.createDataFrame(rid2reviews, schema=["id", "review"])

# Tokenizer reads the "review" column and writes its tokens to "words".
tokenizer = Tokenizer().setInputCol("review").setOutputCol("words")
tokenized = tokenizer.transform(sentenceDataFrame)

# Preview the first five rows.
tokenized.show(n=5)

+--------------------+--------------------+--------------------+
|                  id|              review|               words|
+--------------------+--------------------+--------------------+
|Nvi9RLcOdrJvSwcw-...|The idea of this ...|[the, idea, of, t...|
|XGMI18dOgrOo_r7Wz...|My wife and I ate...|[my, wife, and, i...|
|0HyuO5rOKeaa08TFz...|This is such a go...|[this, is, such, ...|
|HYDfO3jMTHDywO64A...|From the outside ...|[from, the, outsi...|
|paxRLjJUE2IQ-WyNq...|I'd say this is y...|[i'd, say, this, ...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [10]:
from pyspark.ml.feature import StopWordsRemover

# Remove stop words from the "words" column, writing the result to "filtered".
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# This cell rebinds `tokenized` to its own output. Dropping any "filtered"
# column left over from a previous run keeps the cell re-runnable: the
# transform refuses to write to an output column that already exists.
# DataFrame.drop is a no-op when the column is absent, so the first run is
# unaffected and "filtered" still ends up as the last (4th) column.
tokenized = remover.transform(tokenized.drop("filtered"))

In [11]:
from pyspark.sql.functions import struct
from pyspark.sql.types import *
import re

def cleanup_text(record):
    """Strip every non-alphanumeric character from each token of a record.

    Args:
        record: an indexable row whose element at position 3 is an iterable
            of string tokens.

    Returns:
        A list with one string per input token, in the same order, keeping
        only the characters a-z, A-Z and 0-9. A token made up entirely of
        other characters becomes an empty string and stays in the list.
    """
    tokens = record[3]
    cleaned = []
    for token in tokens:
        cleaned.append(re.sub('[^a-zA-Z0-9]', '', token))
    return cleaned

# Wrap cleanup_text as a Spark UDF that returns an array of strings.
udf_cleantext = udf(cleanup_text, ArrayType(StringType()))

# The UDF receives a single struct holding every column of `tokenized`, in
# column order; cleanup_text reads position 3 of that struct.
row_struct = struct(*[tokenized[name] for name in tokenized.columns])
clean_text = tokenized.withColumn("results", udf_cleantext(row_struct))

In [12]:
# Preview the first five rows, including the new "results" column.
clean_text.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  id|              review|               words|            filtered|             results|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Nvi9RLcOdrJvSwcw-...|The idea of this ...|[the, idea, of, t...|[idea, pub, extre...|[idea, pub, extre...|
|XGMI18dOgrOo_r7Wz...|My wife and I ate...|[my, wife, and, i...|[wife, ate, twice...|[wife, ate, twice...|
|0HyuO5rOKeaa08TFz...|This is such a go...|[this, is, such, ...|[gorgeous, pub,, ...|[gorgeous, pub, c...|
|HYDfO3jMTHDywO64A...|From the outside ...|[from, the, outsi...|[outside, looks, ...|[outside, looks, ...|
|paxRLjJUE2IQ-WyNq...|I'd say this is y...|[i'd, say, this, ...|[i'd, say, best, ...|[id, say, best, b...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [13]:
# Keep only the id and the cleaned token list for the steps that follow.
domain = clean_text.select(['id', 'results'])
domain.show(n=5)

+--------------------+--------------------+
|                  id|             results|
+--------------------+--------------------+
|Nvi9RLcOdrJvSwcw-...|[idea, pub, extre...|
|XGMI18dOgrOo_r7Wz...|[wife, ate, twice...|
|0HyuO5rOKeaa08TFz...|[gorgeous, pub, c...|
|HYDfO3jMTHDywO64A...|[outside, looks, ...|
|paxRLjJUE2IQ-WyNq...|[id, say, best, b...|
+--------------------+--------------------+
only showing top 5 rows



In [14]:
# Bring the data to the driver with ONE collect. Collecting 'results' and 'id'
# separately would run the whole pipeline (including the Python UDF) twice and
# rely on both jobs returning rows in the same order, because the two lists
# are later paired up by position.
from pyspark.sql import Row

collected_rows = domain.collect()

# Re-wrap each value in a single-field Row so `results` and `ids` keep the
# same element type that domain.select(<column>).collect() would produce.
results = [Row(results=row['results']) for row in collected_rows]
ids = [Row(id=row['id']) for row in collected_rows]

In [24]:
# Count, for every collected row, how often each known two-word phrase occurs
# in its token list.
#   rest2frq: maps each element of `ids` to {"first_second": count}.
#   phrases:  maps the first word of a phrase to its second word.
overall_frq = {}  # not filled in by this cell
rest2frq = {}
for rid, row in zip(ids, results):
    # A fresh dict per row: if the same id occurs more than once, the last
    # occurrence replaces the earlier counts.
    dish_counts = {}
    rest2frq[rid] = dish_counts
    word_list = row[0]
    # Walk over adjacent token pairs; the last token has no successor, so it
    # never starts a phrase.
    for first, second in zip(word_list, word_list[1:]):
        if first in phrases and phrases[first] == second:
            dish = first + "_" + second
            # Look the dish up in THIS row's counts. Testing membership in the
            # outer `rest2frq` (keyed by ids) is always true for a dish string,
            # which would reset the counter to 0 before every increment and
            # cap every count at 1.
            if dish not in dish_counts:
                dish_counts[dish] = 0
            dish_counts[dish] += 1

In [25]:
# Dump the whole id -> dish-count mapping (the output is large).
print(rest2frq)

{Row(id=u'oJc1EGIqDNSU09loftUt7g'): {}, Row(id=u'yr0yZvHcj8uOHBCYNrccjg'): {}, Row(id=u'sVpAK9OW_jRlvdqeqmA0zA'): {}, Row(id=u'4UIsN3GpiPsL5vZxUMKsHg'): {u'heated_up': 1}, Row(id=u'yrZSuJc-6an-fk3XHo2lXw'): {u'ice_cream': 1}, Row(id=u'ZMcd7w-yDhFRLSwvJBV1VQ'): {}, Row(id=u'EiYE3T3hfqcpcficZ4zJzQ'): {}, Row(id=u'sePL8mNeCKap_t6_D6Fe1Q'): {}, Row(id=u'9wSHuOEou6PkRd-GbfH47w'): {}, Row(id=u'pYxq2wUXInrugQGrOLq4zw'): {}, Row(id=u'rOXAPMNLSUs5RcKfhdZJNw'): {}, Row(id=u'NwZDjx3sMfPdXaEVw9v4PA'): {}, Row(id=u'TLiCKRliKMdnRYuF7kTS5Q'): {}, Row(id=u't_COIFO2ZdAxQyhObR0ZwA'): {}, Row(id=u'AE7q6_m187u6jhbfRipcJw'): {u'thumbs_up': 1}, Row(id=u'5ww4e1I98Q9EoJncU-VslA'): {}, Row(id=u'7viKgy6DfljrNn0VEluOHw'): {u'ice_cream': 1}, Row(id=u'ZzGB894y2HGKOtqYY4b2YA'): {}, Row(id=u'7_EaA-LxLRZYKDi-ga7Vsw'): {}, Row(id=u'cK2JaWuMM84fYjtrPFfvNQ'): {u'used_to': 1}, Row(id=u'Qe7yHDU5SPTsLVqQ6Cgnsw'): {}, Row(id=u'6RX3vyGBWubB12PtFnBjbQ'): {}, Row(id=u'XFC0k6QbLstWAJ8_ppIVYw'): {}, Row(id=u'kUQhcKnyUjd9GcvxwT3O

In [26]:
# Persist the id -> dish-count mapping; the relative file name resolves
# against the current working directory.
with open("row_rid2dishes.pickle", 'wb') as out_file:
    pickle.dump(rest2frq, out_file)