In [1]:
import os
import pandas as pd
import pyspark
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import rand, col
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Notebook-wide configuration.
# Random seed; not referenced by the cells visible here (presumably meant for later
# splits/models — confirm).
seed = 100
# Cleaned review CSV that the pandas cell below loads.
filePath = "Organized_dataset/cleaned/review_cleaned.csv"
# Reuse the running context when there is one: a bare SparkContext() raises
# ValueError ("Cannot run multiple SparkContexts at once") if this cell is re-run
# in a live kernel.
sc = SparkContext.getOrCreate()

In [2]:
# Load the cleaned reviews into pandas.
# low_memory=False makes pandas parse each column in a single pass and settle on one
# dtype per column. The warning fragment captured in this cell's output looks like the
# mixed-type DtypeWarning that the chunked default emits — confirm on the next run.
# NOTE(review): index_col=0 turns the first CSV column into the index, and the displayed
# frame shows that column is `funny`, not a saved row index. Confirm this is intended;
# as written, `funny` is not available as a regular column.
dfPandas = pd.read_csv(filePath, index_col=0, low_memory=False)
dfPandas.head()

  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0_level_0,useful,review_id,text,business_id,stars,date,user_id,cool
funny,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,0,GJXCdrto3ASJOqKeVWPi6Q,I *adore* Travis at the Hard Rock's new Kelly ...,NZnhc2sEQy3RmzKTZnqtwQ,5.0,2017-01-14 21:30:33,yXQM5uF2jS6es16SJzNHfg,0.0
0,3,2TzJjDVDEuAW6MR5Vuc1ug,I have to say that this office really has it t...,WTqjgwHlXbSFevF32_DJVw,5.0,2016-11-09 20:09:03,n6-Gk65cPZL6Uz8qRm3NYw,0.0
4,5,G7XHMxG0bx9oBJNECG4IFg,Tracy dessert had a big name in Hong Kong and ...,3fw2X5bZYeW9xCz_zGhOHg,3.0,2016-05-07 01:21:02,jlu4CztcSxrKx56ba1a5AQ,5.0
0,0,svK3nBU7Rk8VfGorlrN52A,You can't really find anything wrong with this...,YvrylyuWgbP90RgMqZQVnQ,5.0,2017-04-07 21:27:49,NJlxGtouq06hhC7sS2ECYw,0.0
0,0,1wVA2-vQIuW_ClmXkDxqMQ,Great lunch today. Staff was very helpful in a...,NyLYY8q1-H3hfsTwuwLPCg,4.0,2015-01-03 22:47:34,86J5DwcFk4f4In1Vxe2TvA,0.0


In [3]:
# (rows, columns) of the loaded review frame.
dfPandas.shape

(3148044, 8)

In [4]:
# Count the reviews whose text is missing.
missingTextCount = dfPandas['text'].isna().sum()
print("NaNs: ", missingTextCount)
# Display every row that has a missing value in at least one column.
rowHasNull = dfPandas.isna().any(axis=1)
dfPandas[rowHasNull]

NaNs:  0


Unnamed: 0_level_0,useful,review_id,text,business_id,stars,date,user_id,cool
funny,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,0,OSdqcxyXqM-XTEgajJNUmw,Working with Tina and Marcia has been such a p...,,,,,
My husband and I had not purchased a home before and we definitely needed some hand holding. They were patient and professional. We got our dream home and the entire experience was awesome! Thank you so much ladies for a job well done!,eO0Dsp8MBjUT16lno8gkmA,5.0,2017-02-22 01:24:23,QRn8ELZmvP8S4evnSRU9bQ,0.0,,,


In [5]:
from sklearn.feature_extraction.text import TfidfTransformer 
from sklearn.feature_extraction.text import CountVectorizer

# Prune the vocabulary by document frequency: a term is dropped when it occurs in
# more than 95% of the reviews or in fewer than 5% of them.
count_vectorizer = CountVectorizer(
    min_df=0.05,
    max_df=0.95,
)
count_vectorizer.fit(dfPandas['text'])

CountVectorizer(analyzer='word', binary=False, decode_error='strict',
                dtype=<class 'numpy.int64'>, encoding='utf-8', input='content',
                lowercase=True, max_df=0.95, max_features=None, min_df=0.05,
                ngram_range=(1, 1), preprocessor=None, stop_words=None,
                strip_accents=None, token_pattern='(?u)\\b\\w\\w+\\b',
                tokenizer=None, vocabulary=None)

In [6]:
# Fit the TF-IDF transformer on the term counts of every review.
# The count matrix is passed inline, so no reference to it is kept after the fit.
tfidf_transformer = TfidfTransformer()
tfidf_transformer.fit(count_vectorizer.transform(dfPandas['text']))

TfidfTransformer(norm='l2', smooth_idf=True, sublinear_tf=False, use_idf=True)

In [7]:
# Show the vocabulary kept by the fitted vectorizer.
# get_feature_names() was deprecated in scikit-learn 1.0 and removed in 1.2, so use
# get_feature_names_out() when the installed version has it and fall back otherwise.
if hasattr(count_vectorizer, "get_feature_names_out"):
    featureNames = list(count_vectorizer.get_feature_names_out())
else:
    featureNames = count_vectorizer.get_feature_names()
featureNames

['10',
 'about',
 'after',
 'again',
 'all',
 'also',
 'always',
 'am',
 'amazing',
 'an',
 'and',
 'another',
 'any',
 'are',
 'area',
 'around',
 'as',
 'asked',
 'at',
 'away',
 'awesome',
 'back',
 'bad',
 'bar',
 'be',
 'because',
 'been',
 'before',
 'being',
 'best',
 'better',
 'big',
 'bit',
 'both',
 'busy',
 'but',
 'by',
 'came',
 'can',
 'check',
 'cheese',
 'chicken',
 'clean',
 'come',
 'coming',
 'could',
 'customer',
 'day',
 'definitely',
 'delicious',
 'did',
 'didn',
 'different',
 'dinner',
 'do',
 'don',
 'done',
 'down',
 'drinks',
 'eat',
 'enough',
 'even',
 'ever',
 'every',
 'everything',
 'excellent',
 'experience',
 'favorite',
 'feel',
 'few',
 'find',
 'first',
 'food',
 'for',
 'found',
 'fresh',
 'friendly',
 'from',
 'get',
 'give',
 'go',
 'going',
 'good',
 'got',
 'great',
 'had',
 'happy',
 'has',
 'have',
 'he',
 'her',
 'here',
 'highly',
 'his',
 'home',
 'hot',
 'how',
 'if',
 'in',
 'into',
 'is',
 'it',
 'just',
 'know',
 'last',
 'like',
 'l

In [8]:
# Re-count the terms of every review and weight them with the fitted TF-IDF transformer.
tfidf = tfidf_transformer.transform(count_vectorizer.transform(dfPandas['text']))

In [9]:
# Display the sparse matrix summary (shape, dtype, number of stored elements).
tfidf

<3148044x234 sparse matrix of type '<class 'numpy.float64'>'
	with 114857449 stored elements in Compressed Sparse Row format>

In [11]:
import pickle

# Persist the fitted vectorizer and TF-IDF transformer to disk.
# Each file is opened in a `with` block so it is flushed and closed as soon as the dump
# finishes; passing a bare open(...) to pickle.dump leaves the handle open until the
# garbage collector happens to reclaim it.
with open('Organized_dataset/cleaned/countVectorizer.pkl', 'wb') as vectorizerFile:
    pickle.dump(count_vectorizer, vectorizerFile)
with open('Organized_dataset/cleaned/tfidfTransformer.pkl', 'wb') as transformerFile:
    pickle.dump(tfidf_transformer, transformerFile)

In [12]:
import scipy.sparse

# Save the TF-IDF matrix in SciPy's .npz sparse format.
# The submodule is imported explicitly: a plain `import scipy` does not load
# scipy.sparse on every SciPy version, so `scipy.sparse.save_npz` would only resolve
# if another library had already imported the submodule.
scipy.sparse.save_npz('Organized_dataset/cleaned/textTransform.npz', tfidf)

In [13]:
# Column labels are the vectorizer's vocabulary. get_feature_names() was deprecated in
# scikit-learn 1.0 and removed in 1.2, so prefer get_feature_names_out() when it exists.
if hasattr(count_vectorizer, "get_feature_names_out"):
    termNames = list(count_vectorizer.get_feature_names_out())
else:
    termNames = count_vectorizer.get_feature_names()

# Dense copy of the TF-IDF matrix, one column per vocabulary term.
# NOTE: toarray() materialises every cell. For the 3148044 x 234 float64 matrix reported
# in the earlier `tfidf` output that is roughly 5.9 GB of RAM.
scoreMatrix = pd.DataFrame(
    data=tfidf.toarray(),
    columns=termNames,
)

In [14]:
# Peek at the first row of the dense TF-IDF table.
scoreMatrix.head(1)

Unnamed: 0,10,about,after,again,all,also,always,am,amazing,an,...,while,who,will,with,work,worth,would,years,you,your
0,0.0,0.0,0.0,0.0,0.051035,0.06043,0.06782,0.0,0.0,0.0,...,0.0,0.0,0.055916,0.115414,0.0,0.082211,0.0,0.0,0.16944,0.0


In [15]:
# Write the dense TF-IDF table to CSV; index=False leaves the row index out of the file.
scoreMatrix.to_csv('Organized_dataset/cleaned/textTransformPandas.csv', index=False)

In [20]:
import os
import pyspark
from pyspark import SparkFiles
from pyspark.sql import SQLContext
from pyspark.sql.functions import rand, col
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [22]:
# seed = 100
# sc = SparkContext()
# Both are already defined in the first cell, so they are not created again here.

In [27]:
# Rebinds filePath (previously the review CSV) to the ratings CSV that Spark loads below.
filePath = "Organized_dataset/cleaned/yelp_ratings.csv"

In [28]:
# Read the ratings CSV into a Spark DataFrame.
# Fail early with a short Python error when the file is missing, instead of a long
# Java stack trace from addFile.
if not os.path.isfile(filePath):
    raise FileNotFoundError("Ratings file not found: " + os.path.abspath(filePath))

# NOTE(review): the recorded run failed inside addFile with FileNotFoundException, and the
# path in that traceback has its spaces percent-encoded ("%20"). If the check above passes
# and addFile still fails, confirm that this Spark version accepts a working directory
# whose path contains spaces.
sc.addFile(filePath)
sqlContext = SQLContext(sc)
# Look the file up under its base name, derived from filePath so the name passed to
# SparkFiles.get cannot drift from the file that was added.
df = sqlContext.read.csv(
    SparkFiles.get(os.path.basename(filePath)),
    header=True,
    inferSchema=True
)

# Register the frame as temp table "df" for Spark SQL, then rebind df to a query over it.
sqlContext.registerDataFrameAsTable(df, "df")
df = sqlContext.sql('''
    SELECT *
    FROM df
''')

Py4JJavaError: An error occurred while calling o22.addFile.
: java.io.FileNotFoundException: File file:/Users/zhouzihui/Desktop/Columbia/2019%20Fall/Personalization/Homework/2_Final%20Project/Organized_dataset/cleaned/yelp_ratings.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1544)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
# Print the first 10 rows of the Spark DataFrame.
df.show(n=10)

In [None]:
# Column names paired with the types Spark inferred (the CSV was read with inferSchema=True).
df.dtypes

In [None]:
# Partition the ratings on the TrainTest flag: rows flagged 1 go to dftrain,
# rows flagged 0 go to dftest.
isTrainRow = df["TrainTest"] == 1
isTestRow = df["TrainTest"] == 0
dftrain = df.filter(isTrainRow)
dftest = df.filter(isTestRow)

In [None]:
# Fetch the first 3 training rows to the driver for a quick look.
dftrain.take(3)

In [None]:
# Fetch the first 3 test rows to the driver for a quick look.
dftest.take(3)

In [None]:
import math
import operator

# Baseline model: predict one constant rating for every test row.
# The constant is the mean over the TRAINING rows only. Averaging over all of df would
# include the held-out test ratings in the prediction that is then scored on them.
# NOTE(review): x[2] assumes the rating is the third column of the ratings CSV. The
# schema is not visible in this notebook's outputs, so confirm against df.dtypes.
meanRating = dftrain.rdd.map(lambda x: x[2]).mean()
# RMSE of the constant prediction on the test rows: sqrt(sum of squared errors / row count).
baselineRmse = math.sqrt(
    dftest.rdd.map(lambda x: (meanRating - x[2]) ** 2).reduce(operator.add) / dftest.count()
)
# The data are Yelp ratings, so the label refers to businesses rather than movies.
print("Baseline Model (Rating Average for all users and businesses) Performance on Test Set")
print("baseline performance on test set: ", baselineRmse)