In [1]:
from __future__ import print_function

from datetime import datetime
from functools import reduce
from io import BytesIO

import numpy as np
from google.cloud import storage
from nltk.stem import PorterStemmer
from pyspark.context import SparkContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, StopWordsRemover, VectorAssembler, CountVectorizer, IDF, IDFModel, CountVectorizerModel, StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, round, col
from pyspark.sql.types import ArrayType, IntegerType, StringType
from tensorflow.python.lib.io import file_io
from torchtext.vocab import GloVe

In [2]:
# Reuse the running SparkContext if there is one, then wrap it in a session.
sc = SparkContext.getOrCreate()
spark = SparkSession(sparkContext=sc)

In [3]:
# Read the Yelp JSON file from Cloud Storage into a DataFrame
full_data = spark.read.format('json').load('gs://bdl_project/yelp.json')

In [4]:
# Working handle on the loaded frame: later cells rebind raw_data, while
# full_data keeps pointing at the untouched load.
raw_data = full_data

In [5]:
# Cloud Storage bucket name used to build the gs:// paths of the saved
# models loaded in the following cells.
bucket = 'bdl_project'

In [6]:
# Load the saved indexer pipeline, apply it, and discard the raw ID columns.
# NOTE(review): presumably this is the stage that adds business_ind / user_ind
# (both appear in the column listing further down) — confirm against the pipeline.
indexer = PipelineModel.load('gs://' + bucket + '/indexer')
raw_data = indexer.transform(raw_data).drop('business_id', 'user_id')

In [7]:
# Run the saved text pre-processing pipeline and keep only its final output:
# the raw 'text' and intermediate 'tokenized_text' columns are dropped.
pre_process = PipelineModel.load('gs://' + bucket + '/pre_process')
data = pre_process.transform(raw_data).drop('text', 'tokenized_text')

In [8]:
# Stem every token in the 'filtered_text' column with NLTK's Porter stemmer,
# storing the result as 'stemmed_text' and discarding the unstemmed column.
ps = PorterStemmer()
stemmer = udf(lambda tokens: list(map(ps.stem, tokens)), ArrayType(StringType()))
data = data.withColumn('stemmed_text', stemmer('filtered_text')).drop('filtered_text')

In [9]:
# 95% / 5% train-test split; the fixed seed keeps the split repeatable
train, test = data.randomSplit(weights=[0.95, 0.05], seed=5)

In [10]:
# Load the previously fitted CountVectorizer and IDF models and apply them to
# the training split. `load` is a classmethod, so it is called on the class
# directly instead of on a throwaway, unbacked model instance.
cv_model = CountVectorizerModel.load('gs://' + bucket + '/cv_model')
cvData = cv_model.transform(train)
idf_model = IDFModel.load('gs://' + bucket + '/idf_model')
tfidfData = idf_model.transform(cvData)

In [11]:
# List the column names after the count-vectorizer and IDF transforms.
tfidfData.columns

['cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'useful',
 'business_ind',
 'user_ind',
 'stemmed_text',
 'tf',
 'tf_idf']

In [12]:
# Read the saved GloVe matrix (.npy) from Cloud Storage into a NumPy array.
# The in-memory buffer is closed as soon as the array has been parsed so the
# raw bytes are not kept alive next to the decoded matrix.
with BytesIO(file_io.read_file_to_string('gs://nithya1998/glove.npy', binary_mode=True)) as f:
    glove = np.load(f)

In [None]:
#embedding_glove = GloVe(name='6B', dim=100)

In [13]:
# Vocabulary of the fitted CountVectorizer, its size, and an index -> word map
vocab = cv_model.vocabulary
vocab_len = len(vocab)
ind2word = dict(enumerate(vocab))

In [16]:
# Display the vocabulary size.
vocab_len

262144

In [None]:
# Obtain glove stacking for the words in the vocabulary

# for i in range(vocab_len):
#     if i ==0:
#         glove = embedding_glove[vocab[i]].numpy()
#     else:
#         glove = np.vstack((glove, embedding_glove[vocab[i]].numpy()))

In [17]:
# Feature engineering: represent each review as the TF-IDF-weighted average of
# the GloVe vectors of its vocabulary terms.
#
# The embedding matrix is shipped to the executors once as a broadcast variable
# instead of being serialized into every task closure.
glove_bc = sc.broadcast(glove)


def _with_glove_embedding(row):
    """Return the row's first ten fields plus its weighted GloVe vector.

    The eleventh field (index 10) is the TF-IDF vector. Its dot product with
    the embedding matrix is divided by the sum of the TF-IDF weights. When that
    sum is zero the division is skipped so the row gets the un-normalized
    (all-zero) vector rather than NaNs.
    """
    tfidf = row[10]
    weighted_sum = tfidf.dot(glove_bc.value)
    # Sum of the stored weights; equals the original dot product with a ones
    # vector without allocating a vocab_len-sized array per row.
    total_weight = tfidf.values.sum()
    if total_weight != 0:
        weighted_sum = weighted_sum / total_weight
    return tuple(row[:10]) + (Vectors.dense(weighted_sum),)


glove_cols = tfidfData.rdd.map(_with_glove_embedding).toDF()

In [18]:
# The RDD round-trip left positional column names; restore the original names,
# with the last (vector) column called "glove_emb".
oldColumns = glove_cols.schema.names
newColumns = tfidfData.schema.names[:-1] + ["glove_emb"]

train = glove_cols.toDF(*newColumns)
# NOTE(review): "lemmatized_text" is not among the columns listed for tfidfData,
# so this drop only removes "tf" — confirm whether "stemmed_text" was intended.
train = train.drop("lemmatized_text", "tf")

In [19]:
# Progress marker printed once the cells above have executed.
print('Done')

Done


In [20]:
# Take the current time as the reference timestamp and write it to
# min_date.txt in the bucket.
# NOTE(review): this rebinds `bucket`, which earlier cells use to build model
# paths; re-running those cells afterwards would read from this bucket instead.
bucket = 'nithya1998'
min_date = datetime.now()
client = storage.Client()
bucket_obj = client.get_bucket(bucket)
blob = bucket_obj.blob('min_date.txt')
blob.upload_from_string(str(min_date))


def _days_before_min_date(date_str):
    """Whole days between `min_date` and a 'YYYY-MM-DD HH:MM:SS' string.

    Returns None for a null input so the column stays null instead of the
    whole job failing inside strptime.
    """
    if date_str is None:
        return None
    return (min_date - datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')).days


# Declare the return type: without it Spark treats the UDF result as a string.
delta_days = udf(_days_before_min_date, IntegerType())
train = train.withColumn('delta_days', delta_days('date'))
train = train.drop('date')

In [21]:
from pyspark.sql.types import IntegerType
# Make sure delta_days is stored as an integer column
train = train.withColumn("delta_days", col("delta_days").cast(IntegerType()))

In [22]:
# Inspect column names and types after the date -> delta_days conversion.
train.printSchema()

root
 |-- cool: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- useful: long (nullable = true)
 |-- business_ind: double (nullable = true)
 |-- user_ind: double (nullable = true)
 |-- stemmed_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- glove_emb: vector (nullable = true)
 |-- delta_days: integer (nullable = true)



## Logistic Regression (LR)

In [None]:
# Destination bucket for the trained logistic-regression model.
# TODO: replace the placeholder with a real bucket name before running.
# (Kept as a string so the notebook at least parses.)
bucket_store = '<your-bucket>'

In [None]:
# Assemble the numeric columns and the GloVe vector into one "features" column.
assembler = VectorAssembler(inputCols = ['cool','funny','useful','glove_emb','delta_days','business_ind','user_ind'], outputCol="features")
# Guard: this step replaces `train` with a (features, stars) frame, so running
# it again — or after another cell has already assembled `train` the same way —
# would fail because the input columns no longer exist.
if "features" not in train.columns:
    train = assembler.transform(train).select("features", "stars")

In [None]:
# pyspark.ml estimators take a DataFrame whose feature column holds
# pyspark.ml.linalg vectors and whose label column is called "label" by default.
# Wrapping rows in an mllib LabeledPoint converts the vectors to the mllib
# type, which pyspark.ml's LogisticRegression does not accept, so the frame is
# used directly with "stars" exposed as "label".
lr_train = train.select("features", col("stars").alias("label"))

In [None]:
# Preview the logistic-regression training frame.
lr_train.show()

In [None]:
# Fit an elastic-net regularized logistic regression and persist the model.
# NOTE(review): `model` is rebound by the gradient-boosted-trees section below.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
model = lr.fit(lr_train)
model.write().save('gs://' + bucket_store + '/lr_model')

In [None]:
# Progress marker after the fit-and-save cell.
print('Trained and saved')

In [None]:
# Read the accuracy from the summary object exposed by the fitted model.
trainingSummary = model.summary
accuracy = trainingSummary.accuracy

In [None]:
# Report the accuracy taken from the model summary.
print('Accuracy =', accuracy)

## XGBOOST (gradient-boosted trees via Spark MLlib)

In [23]:
# Assemble the numeric columns and the GloVe vector into one "features" column.
xgbassembler = VectorAssembler(inputCols = ['cool','funny','useful','glove_emb','delta_days','business_ind','user_ind'], outputCol="features")
# Guard: this step replaces `train` with a (features, stars) frame, so running
# it again — or after another cell has already assembled `train` the same way —
# would fail because the input columns no longer exist.
if "features" not in train.columns:
    train = xgbassembler.transform(train).select("features", "stars")

In [24]:
# The tree trainer takes an RDD of LabeledPoint whose feature vectors are
# pyspark.mllib.linalg vectors, so each row's ml vector is converted with fromML.
xgb_train = train.rdd.map(
    lambda row: LabeledPoint(row['stars'], MLLibVectors.fromML(row['features']))
)

In [25]:
# Progress marker printed once the cells above have executed.
print('Done')

Done


In [None]:
#xgb_train.toDF().show()

In [None]:
#df.agg({"your-column": "max"}).collect()[0][0]

In [None]:
# categoricalFeaturesInfo maps the feature-vector index of each categorical
# variable to its number of categories.
# NOTE(review): indices 104 / 105 correspond to business_ind / user_ind only if
# glove_emb has 100 dimensions — confirm. The counts below are hard-coded;
# verify them against the indexed data.
num_bus = 209393
num_users = 1968703
model = GradientBoostedTrees.trainRegressor(
    xgb_train,
    categoricalFeaturesInfo={104: num_bus, 105: num_users},
    numIterations=3,
    # MLlib requires maxBins to be at least the largest category count of any
    # categorical feature; the default (32) is rejected for these features.
    # NOTE(review): bins this large are memory-hungry — consider whether these
    # IDs should really be treated as categorical.
    maxBins=max(num_bus, num_users),
)
print('Model trained')

KeyboardInterrupt: 

In [None]:
# Persist the trained tree ensemble. Unlike pyspark.ml models, the mllib model's
# save() takes the SparkContext as its first argument.
model.save(sc, 'gs://nithya1998/xgb_model')

In [None]:
# Score the training features, pair each prediction with its true star rating,
# and round the regression output to the nearest whole star.
# (`round` here is pyspark.sql.functions.round, not the Python builtin.)
train_predictions = model.predict(xgb_train.map(lambda lp: lp.features))
stars_and_pred = (
    train.rdd.map(lambda row: row.stars)
    .zip(train_predictions)
    .toDF()
    .select(col('_1').alias('stars'), round(col('_2')).alias('pred'))
)

In [None]:
# MulticlassMetrics expects (prediction, label) pairs in that order, so the
# columns are selected as (pred, stars) before converting rows to tuples.
# Accuracy itself is unaffected by the order, but any per-class or
# confusion-matrix metric read from this object would otherwise be transposed.
metrics_multi = MulticlassMetrics(stars_and_pred.select('pred', 'stars').rdd.map(tuple))
acc = metrics_multi.accuracy
print("Training Accuracy =", acc)