In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('big-data').master("local[8]").config("spark.driver.memory", "8g").config('spark.sql.execution.arrow.enabled', 'true').getOrCreate()

First read the generated fake reviews and try to get a sense of how long the average review is.

In [2]:
tokenized_fake_reviews = spark.read.text('fake-reviews/generated.txt')

In [3]:
from pyspark.sql.functions import col, length, mean

tokenized_fake_reviews.agg(mean(length(tokenized_fake_reviews.value))).show()

+------------------+
|avg(length(value))|
+------------------+
|100.48283333333333|
+------------------+



Now read some real reviews

In [4]:
real_reviews = spark.read.json('yelp-dataset/yelp_academic_dataset_review.json').sample(False, 0.05, seed=42)

We want to limit reviews to around 100 characters. This is obviously not 100% exact because the generated reviews are tokenized and the yelp review data set is not.

In [5]:
from pyspark.sql.functions import col, length

real_reviews = real_reviews.where(length(real_reviews.text) <= 100).limit(6000)

Remove new-lines and non-ascii characters

In [6]:
from pyspark.sql.functions import regexp_replace
real_reviews = real_reviews.withColumn('text', regexp_replace(real_reviews.text, "[\\r\\n]", " "))
real_reviews = real_reviews.withColumn('text', regexp_replace(real_reviews.text, "[^\x00-\x7F]+", " ")) 

In [7]:
real_reviews.count()

6000

In [8]:
real_reviews.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|pDoDY-cDLyeKQgDx5...|   0|2012-04-28 09:28:54|    0|iS0ANobtW1ZfBdu36...|  5.0|Great food. Try t...|     0|0MKwtAFrhNslL-h5E...|
|ybbcg01-j7tKJ_oLE...|   0|2015-04-01 15:20:43|    0|Sc0nxPCLfPt7KHdN3...|  2.0|Very average Toro...|     0|GqGlMZegcGp5GKM6N...|
|YzMUZjUMcgI-NSGu4...|   0|2014-07-13 15:06:28|    0|TJjrhzZxMfmMjlGrw...|  4.0|Food was amazon, ...|     0|xvACCLMLVs1p4Q4va...|
|EAs61Wm1O6tLjCs8t...|   0|2015-03-18 21:06:16|    0|LbRonpdNBwxWT4Pou...|  4.0|I recommend there...|     0|pcr9Gj69fZtU5hX6O...|
|nrahyQyopCtajDqUt...|   0|2018-04-04 02:16:24|    0|5gKt71TDpn0LyQHlf...|  5.0|I have eat

Save for tokenization step

In [9]:
real_reviews.select(real_reviews.text).write.format('text').save('fake-reviews/real_reviews.txt')

As noted in the fake_review generation notebook, the regular tokenizers are not good enough, and we have used Stanford's CoreNLP package for tokenization when training our fake review generator model.

In [10]:
!cat fake-reviews/real_reviews.txt/part-*.txt | CLASSPATH=~/dev/stanford-parser-full-2018-10-17/stanford-parser.jar java edu.stanford.nlp.process.PTBTokenizer -lowerCase -preserveLines > fake-reviews/tokenized_real_reviews.txt

PTBTokenizer tokenized 111869 tokens at 741788.36 tokens per second.


In [11]:
tokenized_real_reviews = spark.read.text('fake-reviews/tokenized_real_reviews.txt')

In [12]:
tokenized_real_reviews.count()

6000

In [13]:
from pyspark.sql.functions import lit
tokenized_real_reviews = tokenized_real_reviews.withColumn('label', lit(0))

In [14]:
tokenized_fake_reviews = tokenized_fake_reviews.withColumn('label', lit(1))

In [15]:
reviews = tokenized_fake_reviews.union(tokenized_real_reviews)

In [16]:
train, val = reviews.randomSplit([0.9, 0.1], seed=42)

In [18]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="value", outputCol="tokens")
tf = HashingTF(numFeatures=2**16, inputCol="tokens", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=4)
pipeline = Pipeline(stages=[tokenizer, tf, idf])

idf_model = pipeline.fit(train)
train_df = idf_model.transform(train)
val_df = idf_model.transform(val)

In [19]:
lr = LogisticRegression(maxIter=100, labelCol='label')
lr_model = lr.fit(train_df)

In [20]:
predict_test = lr_model.transform(val_df)

Area under Curve / ROC:

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predict_test)

0.9781689747598836

So we have a good Area under Curve / ROC. This is very dangerous however since in real life fake and real reviews are imbalanced. There are fewer fake reviews than real reviews and we should thus use the f1-score or Area Under Curve / PR instead.

In [24]:
def f1_score(df, p=1, n=0):
    tp = df.where((df.label == p) & (df.prediction == p)).count()
    tn = df.where((df.label == n) & (df.prediction == n)).count()
    fp = df.where((df.label == n) & (df.prediction == p)).count()
    fn = df.where((df.label == p) & (df.prediction == n)).count()
    print('tp {}, fp {}, tn {}, fn {}'.format(tp, fp, tn, fn))
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    print('precision', precision)
    print('recall', recall)
    return 2 * (precision * recall) / (precision + recall)

In [32]:
print('Area under PR', evaluator.evaluate(predict_test, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_test))

Area under PR 0.9748998646699216
tp 564, fp 50, tn 555, fn 28
precision 0.9185667752442996
recall 0.9527027027027027
f1 0.9353233830845772


In [31]:
val.count()

1197

If we create an imbalanced test set, the Area under the curve for precision-recall and the f1-score drops significantly few a 'few fake reviews' / 'a lot of real reviews' scenario.

In [33]:
imbalanced_val = val.where(val.label==1).sample(0.5).union(val.where(val.label==0))
predict_imbalanced = lr_model.transform(idf_model.transform(imbalanced_val))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area under ROC', evaluator.evaluate(predict_imbalanced))
print('Area under PR', evaluator.evaluate(predict_imbalanced, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_imbalanced))

Area under ROC 0.981659460346998
Area under PR 0.9593409499530987
tp 292, fp 50, tn 555, fn 10
precision 0.8538011695906432
recall 0.9668874172185431
f1 0.906832298136646


In [34]:
imbalanced_val = val.where(val.label==0).sample(0.5).union(val.where(val.label==1))
predict_imbalanced = lr_model.transform(idf_model.transform(imbalanced_val))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area under ROC', evaluator.evaluate(predict_imbalanced))
print('Area under PR', evaluator.evaluate(predict_imbalanced, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_imbalanced))

Area under ROC 0.9776888663407435
Area under PR 0.9865839349502402
tp 564, fp 24, tn 269, fn 28
precision 0.9591836734693877
recall 0.9527027027027027
f1 0.9559322033898305


Unfortunately, this test set isn't that large. We can try loading another 10k fake reviews which were created from context inputs the trained model has never seen. We can then try to achieve a 20:80 ratio of fake to real reviews (bear in mind we have no way of knowing whether the Yelp data set does not contain any fake reviews)

In [37]:
more_reviews = spark.read.json('yelp-dataset/yelp_academic_dataset_review.json').sample(False, 0.4, seed=42)
more_reviews = more_reviews.where(length(more_reviews.text) <= 100).limit(40000)
more_reviews = more_reviews.withColumn('text', regexp_replace(more_reviews.text, "[\\r\\n]", " "))
more_reviews = more_reviews.withColumn('text', regexp_replace(more_reviews.text, "[^\x00-\x7F]+", " "))
more_reviews.count()

40000

In [38]:
more_reviews.select(more_reviews.text).write.format('text').save('fake-reviews/real_reviews_40k.txt')
!cat fake-reviews/real_reviews_40k.txt/part-*.txt | CLASSPATH=~/dev/stanford-parser-full-2018-10-17/stanford-parser.jar java edu.stanford.nlp.process.PTBTokenizer -lowerCase -preserveLines > fake-reviews/tokenized_real_reviews_40k.txt

PTBTokenizer tokenized 748407 tokens at 2358930.41 tokens per second.


In [39]:
tokenized_more_real = spark.read.text('fake-reviews/tokenized_real_reviews_40k.txt')

In [40]:
tokenized_more_fake = spark.read.text('fake-reviews/generated_10k.txt')

In [41]:
tokenized_more_real = tokenized_more_real.withColumn('label', lit(0))
tokenized_more_fake = tokenized_more_fake.withColumn('label', lit(1))
test_reviews = tokenized_more_real.union(tokenized_more_fake)

In [42]:
test_reviews.count()

50000

In [48]:
predict_test = lr_model.transform(idf_model.transform(test_reviews))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area under ROC', evaluator.evaluate(predict_test))
print('Area under PR', evaluator.evaluate(predict_test, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_test))

Area under ROC 0.9749909325000512
Area under PR 0.8945515982757802
tp 9378, fp 3099, tn 36901, fn 622
precision 0.7516229862947824
recall 0.9378
f1 0.8344529963963163


So at first glance, the F1-Score isn't all that bad, the precision is not that great however. Looking at the precision and recall, the question is whether we could live with these high misclassification rates in a production environment. What is more important: preventing as many fake reviews as possible, or letting through some fake reviews in order to not annoy users?

We can also look at the macro averaged F1-Score for both classes.

In [47]:
print('avg f1', (f1_score(predict_test, 1, 0) + f1_score(predict_test, 0, 1))/2)

tp 9378, fp 3099, tn 36901, fn 622
precision 0.7516229862947824
recall 0.9378
tp 36901, fp 622, tn 9378, fn 3099
precision 0.983423500253178
recall 0.922525
avg f1 0.8932271689668332


Looking at a naive decision tree with tf-idf as features, it's worse than a logistic regression.

In [50]:
from pyspark.ml.classification import DecisionTreeClassifier

In [51]:
dt = DecisionTreeClassifier(labelCol='label')
dt_model = dt.fit(train_df)

In [52]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predict_dt = dt_model.transform(idf_model.transform(test_reviews))
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
print('Area under ROC', evaluator.evaluate(predict_dt))
print('Area under PR', evaluator.evaluate(predict_dt, {evaluator.metricName: 'areaUnderPR'}))
print('f1', f1_score(predict_dt))

Area under ROC 0.89140748125
Area under PR 0.7149621761245883
tp 9138, fp 4095, tn 35905, fn 862
precision 0.690546361369304
recall 0.9138
f1 0.7866396935393623


A random forest performs roughly as well as a logistic regression

In [53]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='label', numTrees=100, maxDepth=4, maxBins=32)
rf_model = rf.fit(train_df)

In [54]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predict_rf = rf_model.transform(idf_model.transform(test_reviews))
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
print('Area under ROC', evaluator.evaluate(predict_rf))
print('Area under PR', evaluator.evaluate(predict_rf, {evaluator.metricName: 'areaUnderPR'}))
print('f1', f1_score(predict_rf))

Area under ROC 0.9820250950000251
Area under PR 0.9473892458185246
tp 9427, fp 2948, tn 37052, fn 573
precision 0.7617777777777778
recall 0.9427
f1 0.8426368715083798


## Improvements

We can try to improve the logistic regression approach by using stop words

In [56]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, NGram
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

stop_words = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"]
tokenizer2 = Tokenizer(inputCol="value", outputCol="tokens")
stopwords2 = StopWordsRemover(inputCol="tokens", outputCol="filtered").setStopWords(stop_words)
tf2 = HashingTF(numFeatures=2**16, inputCol="filtered", outputCol='tf')
idf2 = IDF(inputCol='tf', outputCol="features", minDocFreq=4)
pipeline2 = Pipeline(stages=[tokenizer2, stopwords2, tf2, idf2])

idf_model2 = pipeline.fit(train)
train_df2 = idf_model.transform(train)
val_df2 = idf_model.transform(val)

In [69]:
lr2 = LogisticRegression(maxIter=100, labelCol='label')
lr_model2 = lr2.fit(train_df2)

In [70]:
predict_test2 = lr_model2.transform(idf_model2.transform(test_reviews))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area under ROC', evaluator.evaluate(predict_test2))
print('Area under PR', evaluator.evaluate(predict_test2, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_test2))

Area under ROC 0.9749909325000512
Area under PR 0.8945515982757802
tp 9378, fp 3099, tn 36901, fn 622
precision 0.7516229862947824
recall 0.9378
f1 0.8344529963963163


Unfortunately this didn't seem to help. Another approach is to use NGrams, let's test a 1-gram, 2-gram and 3-gram input.

In [77]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, NGram, VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

def ngram_pipeline():
    tokenizer = Tokenizer(inputCol="value", outputCol="tokens")
    ngrams1 = NGram(n=1, inputCol="tokens", outputCol="ngrams1")
    ngrams2 = NGram(n=2, inputCol="tokens", outputCol="ngrams2")
    ngrams3 = NGram(n=3, inputCol="tokens", outputCol="ngrams3")
    tf1 = HashingTF(numFeatures=2**16, inputCol="ngrams1", outputCol="tf1")
    tf2 = HashingTF(numFeatures=2**16, inputCol="ngrams2", outputCol="tf2")
    tf3 = HashingTF(numFeatures=2**16, inputCol="ngrams3", outputCol="tf3")
    idf1 = IDF(inputCol="tf2", outputCol="idf1", minDocFreq=4)
    idf2 = IDF(inputCol="tf2", outputCol="idf2", minDocFreq=4)
    idf3 = IDF(inputCol="tf3", outputCol="idf3", minDocFreq=4)
    assembler = VectorAssembler(inputCols=["tf1", "tf2", "tf3"],outputCol="features")
    return Pipeline(stages=[tokenizer, ngrams1, ngrams2, ngrams3, tf1, tf2, tf3, idf1, idf2, idf3, assembler])

In [78]:
ngram_model = ngram_pipeline().fit(train)
train_ngram = ngram_model.transform(train)

In [79]:
lr_ngram = LogisticRegression(maxIter=100, labelCol='label')
lr_ngram_model = lr_ngram.fit(train_ngram)

We see that this has very much improved our f1-score! False positives are significantly reduced in this skewed test set.

In [80]:
predict_ngram = lr_ngram_model.transform(ngram_model.transform(test_reviews))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area under ROC', evaluator.evaluate(predict_ngram))
print('Area under PR', evaluator.evaluate(predict_ngram, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_ngram))

Area under ROC 0.9960579725000438
Area under PR 0.9887280759781427
tp 9841, fp 1619, tn 38381, fn 159
precision 0.8587260034904014
recall 0.9841
f1 0.917148182665424


The model also performs well on our smaller, balanced test set:

In [84]:
predict_ngram_val = lr_ngram_model.transform(ngram_model.transform(val))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area under ROC', evaluator.evaluate(predict_ngram_val))
print('Area under PR', evaluator.evaluate(predict_ngram_val, {evaluator.metricName: "areaUnderPR"}))
print('f1', f1_score(predict_ngram_val))

Area under ROC 0.9959794505249049
Area under PR 0.9961260978487302
tp 582, fp 24, tn 581, fn 10
precision 0.9603960396039604
recall 0.9831081081081081
f1 0.9716193656093489
