In [0]:
# Load in one of the tables
df = spark.sql("select * from default.reviews_train").sample(0.01,47)
print((df.count(), len(df.columns)))

(31320, 11)


In [0]:
df.printSchema()

root
 |-- reviewID: integer (nullable = true)
 |-- overall: double (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: integer (nullable = true)
 |-- label: integer (nullable = true)



In [0]:
# Let's look at some quick summary statistics
df.describe().show()

+-------+------------------+-----------------+----------+--------------------+--------------------+------------+--------------------+--------------+--------------------+-------------------+
|summary|          reviewID|          overall|reviewTime|          reviewerID|                asin|reviewerName|          reviewText|       summary|      unixReviewTime|              label|
+-------+------------------+-----------------+----------+--------------------+--------------------+------------+--------------------+--------------+--------------------+-------------------+
|  count|             31320|            31320|     31320|               31320|               31320|       31317|               31320|         31317|               31320|              31320|
|   mean|1568608.4842911877|4.320657726692209|      null|                null| 3.759744299164589E7|         NaN|                 1.0|        1984.0| 1.393091244137931E9|0.17723499361430395|
| stddev|  905932.476062001|1.121994969453793|    

In [0]:
# The count of each overall rating

from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df.groupBy(["overall",'label']).count().orderBy(col('overall').desc()).orderBy(col('label').desc()).withColumn("percentage", F.col("count") / F.sum("count").over(Window.partitionBy("overall"))).show()


+-------+-----+-----+-------------------+
|overall|label|count|         percentage|
+-------+-----+-----+-------------------+
|    1.0|    1|  678| 0.4291139240506329|
|    1.0|    0|  902| 0.5708860759493671|
|    2.0|    1|  441| 0.3174946004319654|
|    2.0|    0|  948| 0.6825053995680346|
|    3.0|    1|  621|0.23389830508474577|
|    3.0|    0| 2034| 0.7661016949152543|
|    4.0|    1| 1022| 0.1864963503649635|
|    4.0|    0| 4458| 0.8135036496350365|
|    5.0|    1| 2789| 0.1379600316580926|
|    5.0|    0|17427| 0.8620399683419074|
+-------+-----+-----+-------------------+



In [0]:
# Drop duplicates

print("Before duplication removal: ", df.count())
df = df.dropDuplicates(['reviewerID', 'asin', 'label'])
print("After duplication removal: ", df.count())

Before duplication removal:  31320
After duplication removal:  31294


In [0]:
# For our intitial modeling efforts, we are not going to use the following features
drop_list = ['reviewerName','reviewTime']
# drop_list = ['overall', 'summary', 'asin', 'reviewID', 'reviewerID', 'summary', 'unixReviewTime','reviewTime', 'image', 'style', 'verified', 'reviewerName']
df = df.select([column for column in df.columns if column not in drop_list])
df.show(5)
print((df.count(), len(df.columns)))

+--------+-------+--------+--------------------+----------+--------------------+--------------------+--------------+-----+
|reviewID|overall|verified|          reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|
+--------+-------+--------+--------------------+----------+--------------------+--------------------+--------------+-----+
|  915322|    5.0|    true|A0014392U7DSQERYR8EC|B0009ZAA7Y|Excellent product...|               *****|    1408060800|    0|
|  757854|    4.0|   false|A01400728E7LZSFKVHDR|0007461100|I don't know much...| A good good album.!|    1502409600|    0|
| 3048792|    5.0|   false|A0277912HT4JSJKVSL3E|B00RKK11O2|love the story , ...|      love the story|    1445904000|    1|
| 1020416|    5.0|    true|A0279230T5X4MD3OD3IH|B000COC69C|I bought my first...|An Old Friend Com...|    1419552000|    0|
| 1579561|    4.0|    true|A02967948XGO2PAHLYNV|0060751967|A little bit of a...|      Slow but good.|    1357084800|    0|
+--------+------

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

df = df.withColumn("length", F.length("reviewText"))
df.show(5)

+--------+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+------+
|reviewID|overall|verified|    reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|length|
+--------+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+------+
| 3025622|    3.0|   false|A100DXY4SLAMPM|B00KTNU40E|If I were a bette...|Okay, But Better ...|    1416873600|    1|   708|
| 2029373|    1.0|   false|A1015LXPAN91MN|B0015M0UAM|The gun in our 14...|Looks great!  Coo...|    1225929600|    1|  3065|
|  515656|    5.0|    true|A108BUPYBC9FMZ|B0026FCARI|favorite for all ...|          Five Stars|    1426032000|    0|    23|
| 2460347|    1.0|   false|A10B4PQQTT28RV|B001G8Y2X6|We are very caref...|Here we go again ...|    1508457600|    0|   754|
| 1242329|    3.0|    true|A10BH1JDT6SNGA|0060187956|Interesting but d...|Good for history ...|    1493769600|    0|    31|
+-------

In [0]:
df=df.withColumn('verified_ind', (F.col('verified').cast('int')))
df=df.drop('verified')
df.show(5)

# df.groupBy(["verified_ind",'label']).count().orderBy(col('verified_ind').desc()).orderBy(col('label').desc()).withColumn("percentage", F.col("count") / F.sum("count").over(Window.partitionBy("verified_ind"))).show(5)

# df.groupBy(["overall",'verified_ind']).count().orderBy(col('overall').desc()).orderBy(col('verified_ind').desc()).withColumn("percentage", F.col("count") / F.sum("count").over(Window.partitionBy("overall"))).show(20)

+--------+-------+--------------+----------+--------------------+--------------------+--------------+-----+------+------------+
|reviewID|overall|    reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|length|verified_ind|
+--------+-------+--------------+----------+--------------------+--------------------+--------------+-----+------+------------+
| 3025622|    3.0|A100DXY4SLAMPM|B00KTNU40E|If I were a bette...|Okay, But Better ...|    1416873600|    1|   708|           0|
| 2029373|    1.0|A1015LXPAN91MN|B0015M0UAM|The gun in our 14...|Looks great!  Coo...|    1225929600|    1|  3065|           0|
|  515656|    5.0|A108BUPYBC9FMZ|B0026FCARI|favorite for all ...|          Five Stars|    1426032000|    0|    23|           1|
| 2460347|    1.0|A10B4PQQTT28RV|B001G8Y2X6|We are very caref...|Here we go again ...|    1508457600|    0|   754|           0|
| 1242329|    3.0|A10BH1JDT6SNGA|0060187956|Interesting but d...|Good for history ...|    1493769600|   

In [0]:
# Convert Unix timestamp to readable date

from pyspark.sql.functions import from_unixtime, to_date
from pyspark.sql.types import *
import datetime
from pyspark.sql.functions import year, month, dayofweek
from pyspark.sql.functions import datediff,col

from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *

df_date = df.withColumn("reviewTime", to_date(from_unixtime(df.unixReviewTime))) \
                                                .drop("unixReviewTime")
# df_date.select(
#     year("reviewTime").alias('year'), 
#     month("reviewTime").alias('month'), 
#     dayofmonth("reviewTime").alias('day')
# ).show(5)

#df_date = df_date.withColumn('Year', year(col('reviewTime')))
# df_date = df_date.withColumn('month', month(col('reviewTime')))
# df_date_transformed = df_date.withColumn('day', dayofweek(col('reviewTime')))
df_date.withColumn("current_date", current_date())
df_date_transformed=df_date.withColumn("diff_in_years", datediff(col("current_date"),col("reviewTime"))/365.25)

df_date_transformed=df_date_transformed.drop('reviewTime')
df_date_transformed.show(5)

df_date_transformed.show(5)

+--------+-------+--------------+----------+--------------------+--------------------+-----+------+------------+------------------+
|reviewID|overall|    reviewerID|      asin|          reviewText|             summary|label|length|verified_ind|     diff_in_years|
+--------+-------+--------------+----------+--------------------+--------------------+-----+------+------------+------------------+
| 3025622|    3.0|A100DXY4SLAMPM|B00KTNU40E|If I were a bette...|Okay, But Better ...|    1|   708|           0| 7.819301848049282|
| 2029373|    1.0|A1015LXPAN91MN|B0015M0UAM|The gun in our 14...|Looks great!  Coo...|    1|  3065|           0|13.869952087611225|
|  515656|    5.0|A108BUPYBC9FMZ|B0026FCARI|favorite for all ...|          Five Stars|    0|    23|           1| 7.529089664613279|
| 2460347|    1.0|A10B4PQQTT28RV|B001G8Y2X6|We are very caref...|Here we go again ...|    0|   754|           0| 4.917180013689254|
| 1242329|    3.0|A10BH1JDT6SNGA|0060187956|Interesting but d...|Good for hi

In [0]:
# The ratio over year

# from pyspark.sql.functions import col
# from pyspark.sql import functions as F
# from pyspark.sql.window import Window

# df_date_transformed.groupBy(["Year",'label']).count().orderBy(col('Year').desc()).orderBy(col('label').desc()).withColumn("percentage", F.col("count") / F.sum("count").over(Window.partitionBy("Year"))).show(5)

# df_date_transformed.groupBy(["month",'label']).count().orderBy(col('month').desc()).orderBy(col('label').desc()).withColumn("percentage", F.col("count") / F.sum("count").over(Window.partitionBy("month"))).show(30)

# df_date_transformed.groupBy(["day",'label']).count().orderBy(col('day').desc()).orderBy(col('label').desc()).withColumn("percentage", F.col("count") / F.sum("count").over(Window.partitionBy("day"))).show(30)



In [0]:
from pyspark.ml.feature import StringIndexer


df_date_transformed=df_date_transformed.withColumn('overall_str', (F.col('overall').cast('string')))
df_date_transformed=df_date_transformed.drop('overall')

indexer = StringIndexer(inputCols=["reviewerID","asin", "overall_str"], 
                        outputCols=["reviewerIDIndex", "asinIndex", "overallIndex"],
                        handleInvalid='keep' 
                       ).fit(df_date_transformed)
indexed_df = indexer.transform(df_date_transformed)

indexed_df.show(5)
print((indexed_df.count(), len(indexed_df.columns)))


+--------+--------------+----------+--------------------+--------------------+-----+------+------------+------------------+-----------+---------------+---------+------------+
|reviewID|    reviewerID|      asin|          reviewText|             summary|label|length|verified_ind|     diff_in_years|overall_str|reviewerIDIndex|asinIndex|overallIndex|
+--------+--------------+----------+--------------------+--------------------+-----+------+------------+------------------+-----------+---------------+---------+------------+
| 3025622|A100DXY4SLAMPM|B00KTNU40E|If I were a bette...|Okay, But Better ...|    1|   708|           0| 7.819301848049282|        3.0|          886.0|   5376.0|         2.0|
| 2029373|A1015LXPAN91MN|B0015M0UAM|The gun in our 14...|Looks great!  Coo...|    1|  3065|           0|13.869952087611225|        1.0|          888.0|  10789.0|         3.0|
|  515656|A108BUPYBC9FMZ|B0026FCARI|favorite for all ...|          Five Stars|    0|    23|           1| 7.529089664613279|  

In [0]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
    inputCols=["reviewerIDIndex","asinIndex", "overallIndex"],  
    outputCols=["reviewerIDVec","asinVec", "overallVec"],
    handleInvalid='keep',
    dropLast=True
).fit(indexed_df)

encoded_df = encoder.transform(indexed_df)

encoded_df.show(5)
print((encoded_df.count(), len(encoded_df.columns)))

+--------+--------------+----------+--------------------+--------------------+-----+------+------------+------------------+-----------+---------------+---------+------------+-------------------+--------------------+-------------+
|reviewID|    reviewerID|      asin|          reviewText|             summary|label|length|verified_ind|     diff_in_years|overall_str|reviewerIDIndex|asinIndex|overallIndex|      reviewerIDVec|             asinVec|   overallVec|
+--------+--------------+----------+--------------------+--------------------+-----+------+------------+------------------+-----------+---------------+---------+------------+-------------------+--------------------+-------------+
| 3025622|A100DXY4SLAMPM|B00KTNU40E|If I were a bette...|Okay, But Better ...|    1|   708|           0| 7.819301848049282|        3.0|          886.0|   5376.0|         2.0|(30299,[886],[1.0])|(14428,[5376],[1.0])|(6,[2],[1.0])|
| 2029373|A1015LXPAN91MN|B0015M0UAM|The gun in our 14...|Looks great!  Coo...|  

In [0]:
encoded_df_clean=encoded_df.drop('reviewerID','asin', "reviewerIDIndex", "asinIndex", "overallIndex", "reviewID")
encoded_df_clean.show(5)

+--------------------+--------------------+-----+------+------------+------------------+-----------+-------------------+--------------------+-------------+
|          reviewText|             summary|label|length|verified_ind|     diff_in_years|overall_str|      reviewerIDVec|             asinVec|   overallVec|
+--------------------+--------------------+-----+------+------------+------------------+-----------+-------------------+--------------------+-------------+
|If I were a bette...|Okay, But Better ...|    1|   708|           0| 7.819301848049282|        3.0|(30299,[886],[1.0])|(14428,[5376],[1.0])|(6,[2],[1.0])|
|The gun in our 14...|Looks great!  Coo...|    1|  3065|           0|13.869952087611225|        1.0|(30299,[888],[1.0])|(14428,[10789],[1...|(6,[3],[1.0])|
|favorite for all ...|          Five Stars|    0|    23|           1| 7.529089664613279|        5.0|(30299,[936],[1.0])|(14428,[4948],[1.0])|(6,[0],[1.0])|
|We are very caref...|Here we go again ...|    0|   754|        

In [0]:
print((encoded_df_clean.count(), len(encoded_df_clean.columns)))

(31294, 10)


In [0]:
encoded_df_clean.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- length: integer (nullable = true)
 |-- verified_ind: integer (nullable = true)
 |-- diff_in_years: double (nullable = true)
 |-- overall_str: string (nullable = true)
 |-- reviewerIDVec: vector (nullable = true)
 |-- asinVec: vector (nullable = true)
 |-- overallVec: vector (nullable = true)



In [0]:
df.groupBy("label").count().show()

+-----+-------+
|label|  count|
+-----+-------+
|    1| 528580|
|    0|2379800|
+-----+-------+



In [0]:
# set seed for reproducibility
(trainingData, testingData) = encoded_df_clean.randomSplit([0.8, 0.2], seed = 47)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testingData.count()))
trainingData.show(5)

Training Dataset Count: 25110
Test Dataset Count: 6184
+--------------------+--------------------+-----+------+------------+------------------+-----------+--------------------+--------------------+-------------+
|          reviewText|             summary|label|length|verified_ind|     diff_in_years|overall_str|       reviewerIDVec|             asinVec|   overallVec|
+--------------------+--------------------+-----+------+------------+------------------+-----------+--------------------+--------------------+-------------+
| Book 1 Against t...|I Still Beg The Q...|    0|  1820|           0| 8.624229979466119|        3.0|(30299,[19350],[1...|(14428,[6397],[1.0])|(6,[2],[1.0])|
|"Crash Bandicoot ...|One giant step fo...|    0|   663|           0|20.416153319644078|        4.0| (30299,[597],[1.0])|(14428,[7362],[1.0])|(6,[1],[1.0])|
|"Sapiens" is a tr...|Everyone Should T...|    0|   463|           1|  5.06776180698152|        5.0|(30299,[21104],[1...|  (14428,[60],[1.0])|(6,[0],[1.0])|
|"S

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import NGram
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Word2Vec

import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, Stemmer, lemmatizer

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, StringIndexer, SQLTransformer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# documentAssembler = DocumentAssembler()\
#     .setInputCol("reviewText")\
#     .setOutputCol("document")

# sentenceDetector = SentenceDetector()\
#     .setInputCols(['document'])\
#     .setOutputCol('sentences')

# tokenizer = Tokenizer() \
#     .setInputCols(["document"]) \
#     .setOutputCol("token")

# normalizer = Normalizer() \
#     .setInputCols(["token"]) \
#     .setOutputCol("normalized")\
#     .setLowercase(True)\
#     .setCleanupPatterns(["[^\w\d\s]"])

# stopwords_cleaner = StopWordsCleaner()\
#     .setInputCols("token")\
#     .setOutputCol("removed_stopwords")\
#     .setCaseSensitive(False)\

# stemmer = Stemmer() \
#     .setInputCols(["token"]) \
#     .setOutputCol("stem")

# convert text column to spark nlp document
document_assembler = DocumentAssembler() \
    .setInputCol("reviewText") \
    .setOutputCol("document")

document_assembler_s = DocumentAssembler() \
    .setInputCol("summary") \
    .setOutputCol("document_s")

# convert document to array of tokens 
tokenizer = Tokenizer() \
  .setInputCol("document") \
  .setOutputCol("token")

tokenizer_s = Tokenizer() \
  .setInputCol("document_s") \
  .setOutputCol("token_s")

# clean tokens 
normalizer = Normalizer() \
    .setInputCol("token") \
    .setOutputCol("normalized")

normalizer_s = Normalizer() \
    .setInputCol("token_s") \
    .setOutputCol("normalized_s")

# remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCol("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stopwords_cleaner_s = StopWordsCleaner()\
      .setInputCol("normalized_s")\
      .setOutputCol("cleanTokens_s")\
      .setCaseSensitive(False)

# stems tokens to bring it to root form
# stemmer = Stemmer() \
#     .setInputCols(["cleanTokens"]) \
#     .setOutputCol("stem")

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCol("cleanTokens") \
     .setOutputCol('lemma')

lemmatizer_s = LemmatizerModel.pretrained() \
     .setInputCol("cleanTokens_s") \
     .setOutputCol("lemma_s")

# Convert custom document structure to array of tokens.
finisher = Finisher() \
    .setInputCol('lemma') \
    .setOutputCol("token_features") \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

finisher_s = Finisher() \
    .setInputCol("lemma_s") \
    .setOutputCol("token_features_s") \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

# lemmatizer = Lemmatizer() \
#     .setInputCols(["token"]) \
#     .setOutputCol("lemma") \
#     .setDictionary("./https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")



#countVectors = CountVectorizer(setInputCols="lemma", setOutputCol="features", vocabSize=10000, minDF=5)
# countVectors = CountVectorizer() \
#     .inputCol('lemma') \
#     .outputCol("features") \
#     .vocabSize=10000 \
#     .minDF=5

# Generate Term Frequency
tf = CountVectorizer(inputCols=["token_features", "token_features_s"], outputCols=["rawFeatures", "rawFeatures_s"], vocabSize=10000, minTF=1, minDF=50, maxDF=0.40)

# Generate Inverse Document Frequency weighting
idf = IDF(inputCol=["rawFeatures", "rawFeatures_s"], outputCol=["idfFeatures","idfFeatures_s"], minDocFreq=5)

# Machine Learning Algorithm
#ml_alg  = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)
# ml_alg  = RandomForestClassifier(numTrees=100, featureSubsetStrategy="auto", impurity='gini', maxDepth=4, maxBins=32)


# stringIndexer = StringIndexer(inputCols=["reviewerID","asin"], outputCols=["reviewerIDIndex","asinIndex"], handleInvalid="keep")

# encoder = OneHotEncoder(inputCols=["reviewerIDIndex","asinIndex"], outputCols=["reviewerIDVec","asinVec"], dropLast=False)
# encoder_test = OneHotEncoder(inputCols=["reviewerIDIndex","asinIndex"], outputCols=["reviewerIDVec","asinVec"], dropLast=True)
# # Combine all features into one final "features" column
assembler = VectorAssembler(inputCols=["idfFeatures", "idfFeatures_s", "reviewerIDVec","asinVec","verified_ind","overallVec", "diff_in_years"], outputCol="features")

# assembler = VectorAssembler(inputCols=["verified", "overall", "idfFeatures"], outputCol="features")

nlpPipeline = Pipeline(stages=[document_assembler,
                               document_assembler_s,
                               #sentenceDetector,
                               tokenizer,
                               tokenizer_s,
                               normalizer,
                               normalizer_s,
                               stopwords_cleaner,
                               stopwords_cleaner_s,
                               #stemmer,
                               lemmatizer,
                               lemmatizer_s,
                               finisher,
                               finisher_s,
#                                stringIndexer,
#                                encoder,
            tf,
            idf,
            

                               assembler
                               ])

# nlpPipeline_test = Pipeline(stages=[document_assembler,
#                                #sentenceDetector,
#                                tokenizer,
#                                normalizer,
#                                stopwords_cleaner,
#                                #stemmer,
#                                lemmatizer,
#                                finisher,
#                                stringIndexer,
#                                encoder_test,
#             tf,
#             idf,
            

#                                assembler
#                                ])

# nlp_pipeline = Pipeline(
#     stages=[document_assembler, 
#             tokenizer,
#             normalizer,
#             stopwords_cleaner, 
#             stemmer, 
#             finisher,
#             tf,
#             idf,
#             assembler,
#             ml_alg])



[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-678707905058868>[0m in [0;36m<cell line: 69>[0;34m()[0m
[1;32m     67[0m [0;34m[0m[0m
[1;32m     68[0m [0;31m# convert document to array of tokens[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 69[0;31m [0mtokenizer[0m [0;34m=[0m [0mTokenizer[0m[0;34m([0m[0;34m)[0m[0;31m [0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     70[0m   [0;34m.[0m[0msetInputCol[0m[0;34m([0m[0;34m"document"[0m[0;34m)[0m[0;31m [0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[1;32m     71[0m   [0;34m.[0m[0msetOutputCol[0m[0;34m([0m[0;34m"token"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mAttributeError[0m: 'Tokenizer' object has no attribute 'setInputCol'

In [0]:
# from pyspark.ml import Pipeline
# from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
# from pyspark.ml.feature import NGram
# from pyspark.ml.feature import HashingTF, IDF, Tokenizer
# from pyspark.ml.feature import Word2Vec

# import pandas as pd
# import numpy as np
# import json
# from pyspark.ml import Pipeline
# from pyspark.sql import SparkSession
# import pyspark.sql.functions as F
# from sparknlp.annotator import *
# from sparknlp.base import *
# import sparknlp
# from sparknlp.pretrained import PretrainedPipeline

# import pandas as pd
# import numpy as np
# import json
# from pyspark.ml import Pipeline
# from pyspark.sql import SparkSession
# import pyspark.sql.functions as F

# from sparknlp.base import DocumentAssembler, Finisher
# from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, Stemmer, lemmatizer

# from pyspark.ml import Pipeline
# from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, StringIndexer, SQLTransformer, IndexToString, VectorAssembler
# from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
# from pyspark.ml.feature import OneHotEncoder, StringIndexer
# # documentAssembler = DocumentAssembler()\
# #     .setInputCol("reviewText")\
# #     .setOutputCol("document")

# # sentenceDetector = SentenceDetector()\
# #     .setInputCols(['document'])\
# #     .setOutputCol('sentences')

# # tokenizer = Tokenizer() \
# #     .setInputCols(["document"]) \
# #     .setOutputCol("token")

# # normalizer = Normalizer() \
# #     .setInputCols(["token"]) \
# #     .setOutputCol("normalized")\
# #     .setLowercase(True)\
# #     .setCleanupPatterns(["[^\w\d\s]"])

# # stopwords_cleaner = StopWordsCleaner()\
# #     .setInputCols("token")\
# #     .setOutputCol("removed_stopwords")\
# #     .setCaseSensitive(False)\

# # stemmer = Stemmer() \
# #     .setInputCols(["token"]) \
# #     .setOutputCol("stem")

# # convert text column to spark nlp document
# document_assembler = DocumentAssembler() \
#     .setInputCol("reviewText") \
#     .setOutputCol("document")\
#     .setCleanupMode("shrink")

# #Document normalizer
# cleanUpPatterns = ["<[^>]*>"]

# documentNormalizer = DocumentNormalizer() \
#     .setInputCols("document") \
#     .setOutputCol("normalizedDocument") \
#     .setAction("clean") \
#     .setPatterns(cleanUpPatterns) \
#     .setReplacement(" ") \
#     .setPolicy("pretty_all") \
#     .setLowercase(True)

# #setence detector
# # sentenceDetector = SentenceDetector()\
# #     .setInputCols(['normalizedDocument'])\
# #     .setOutputCol('sentences')

# #sentence dectector alternative
# sentencerDL = SentenceDetectorDLModel\
#     .pretrained("sentence_detector_dl", "en") \
#     .setInputCols(["normalizedDocument"]) \
#     .setOutputCol("sentences")

# # convert document to array of tokens
# # tokenizer = Tokenizer() \
# #   .setInputCols(["sentences"]) \
# #   .setOutputCol("token")
 
# # tokenizer = Tokenizer() \
# #     .setInputCols(["sentences"]) \
# #     .setOutputCol("token") \
# #     .setSplitChars(['-']) \
# #     .setContextChars(['?', '!']) \
# #     .addException("New York") \

# #Regex
# pattern = "\\s+|(?=[-.:;*+,$&%\\[\\]])|(?<=[-.:;*+,$&%\\[\\]])"
# regexTokenizer = RegexTokenizer() \
#     .setInputCols(["sentences"]) \
#     .setOutputCol("regexToken") \
#     .setPattern(pattern) \
#     .setPositionalMask(False)


# # clean tokens 
# # normalizer = Normalizer() \
# #     .setInputCols(["token"]) \
# #     .setOutputCol("normalized")

# normalizer = Normalizer() \
#     .setInputCols(["regexToken"]) \
#     .setOutputCol("normalized")\
#     .setLowercase(True)\
#     .setCleanupPatterns(["[^\w\d\s]"]) # remove punctuations (keep alphanumeric chars)
#     # if we don't set CleanupPatterns, it will only keep alphabet letters ([^A-Za-z])

# # remove stopwords
# stopwords_cleaner = StopWordsCleaner()\
#       .setInputCols("normalized")\
#       .setOutputCol("cleanTokens")\
#       .setCaseSensitive(False)

# # stems tokens to bring it to root form
# # stemmer = Stemmer() \
# #     .setInputCols(["cleanTokens"]) \
# #     .setOutputCol("stem")

# lemmatizer = LemmatizerModel.pretrained() \
#      .setInputCols(['cleanTokens']) \
#      .setOutputCol('lemma')

# # Convert custom document structure to array of tokens.
# finisher = Finisher() \
#     .setInputCols(["lemma"]) \
#     .setOutputCols(["token_features"]) \
#     .setOutputAsArray(True) \
#     .setCleanAnnotations(False)

# # lemmatizer = Lemmatizer() \
# #     .setInputCols(["token"]) \
# #     .setOutputCol("lemma") \
# #     .setDictionary("./https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")



# #countVectors = CountVectorizer(setInputCols="lemma", setOutputCol="features", vocabSize=10000, minDF=5)
# # countVectors = CountVectorizer() \
# #     .inputCol('lemma') \
# #     .outputCol("features") \
# #     .vocabSize=10000 \
# #     .minDF=5

# # Generate Term Frequency
# tf = CountVectorizer(inputCol="token_features", outputCol="rawFeatures", vocabSize=10000, minTF=1, minDF=50, maxDF=0.40)

# # Generate Inverse Document Frequency weighting
# idf = IDF(inputCol="rawFeatures", outputCol="idfFeatures", minDocFreq=5)

# # Machine Learning Algorithm
# #ml_alg  = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)
# # ml_alg  = RandomForestClassifier(numTrees=100, featureSubsetStrategy="auto", impurity='gini', maxDepth=4, maxBins=32)


# # stringIndexer = StringIndexer(inputCols=["reviewerID","asin"], outputCols=["reviewerIDIndex","asinIndex"], handleInvalid="keep")

# # encoder = OneHotEncoder(inputCols=["reviewerIDIndex","asinIndex"], outputCols=["reviewerIDVec","asinVec"], dropLast=False)
# # encoder_test = OneHotEncoder(inputCols=["reviewerIDIndex","asinIndex"], outputCols=["reviewerIDVec","asinVec"], dropLast=True)
# # # Combine all features into one final "features" column
# assembler = VectorAssembler(inputCols=["idfFeatures", "reviewerIDVec","asinVec","verified_ind","overallVec", "diff_in_years"], outputCol="features")

# # assembler = VectorAssembler(inputCols=["verified", "overall", "idfFeatures"], outputCol="features")
# # paramGrid = ParamGridBuilder() \
# #     .addGrid(ml_alg.regParam, [0.3, 0.5, 0.7]) \
# #     .addGrid(ml_alg.elasticNetParam, [0.0]) \
# #     .addGrid(tf.minTF, [1, 100, 1000]) \
# #     .addGrid(tf.vocabSize, [500, 1000, 2500, 5000]) \
# #     .build()


# nlpPipeline = Pipeline(stages=[document_assembler,
#                                #sentenceDetector,
#                                documentNormalizer,
#                                sentencerDL,
# #                                tokenizer,
#                                regexTokenizer,
#                                normalizer,
#                                stopwords_cleaner,
#                                #stemmer,
#                                lemmatizer,
#                                finisher,
# #                                stringIndexer,
# #                                encoder,
#             tf,
#             idf,
#                                assembler
#                                ])

# # nlpPipeline_test = Pipeline(stages=[document_assembler,
# #                                #sentenceDetector,
# #                                tokenizer,
# #                                normalizer,
# #                                stopwords_cleaner,
# #                                #stemmer,
# #                                lemmatizer,
# #                                finisher,
# #                                stringIndexer,
# #                                encoder_test,
# #             tf,
# #             idf,
            

# #                                assembler
# #                                ])

# # nlp_pipeline = Pipeline(
# #     stages=[document_assembler, 
# #             tokenizer,
# #             normalizer,
# #             stopwords_cleaner, 
# #             stemmer, 
# #             finisher,
# #             tf,
# #             idf,
# #             assembler,
# #             ml_alg])



In [0]:
# Fit the pipeline to training documents.
pipelineFit = nlpPipeline.fit(trainingData)
trainingDataTransformed = pipelineFit.transform(trainingData)
trainingDataTransformed.show(5)
print((trainingDataTransformed.count(), len(trainingDataTransformed.columns)))

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-1968418051050861>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0;31m# Fit the pipeline to training documents.[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mpipelineFit[0m [0;34m=[0m [0mnlpPipeline[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mtrainingData[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      3[0m [0mtrainingDataTransformed[0m [0;34m=[0m [0mpipelineFit[0m[0;34m.[0m[0mtransform[0m[0;34m([0m[0mtrainingData[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0mtrainingDataTransformed[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;36m5[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0mprint[0m[0;34m([0m[0;34m([0m[0mtrainingDataTransformed[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m,[0m 

In [0]:
trainingDataTransformed.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- length: integer (nullable = true)
 |-- verified_ind: integer (nullable = true)
 |-- diff_in_years: double (nullable = true)
 |-- overall_str: string (nullable = true)
 |-- reviewerIDVec: vector (nullable = true)
 |-- asinVec: vector (nullable = true)
 |-- overallVec: vector (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true

In [0]:
from pyspark.ml.classification import LogisticRegression
# from sklearn.linear_model import LogisticRegression
# More classification docs: https://spark.apache.org/docs/latest/ml-classification-regression.html

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingDataTransformed)
# lr = LogisticRegression()
# lr.fit(X_train_tfidf, y_train)

In [0]:
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

print("Training Accuracy:  " + str(trainingSummary.accuracy))
print("Training Precision: " + str(trainingSummary.precisionByLabel))
print("Training Recall:    " + str(trainingSummary.recallByLabel))
print("Training FMeasure:  " + str(trainingSummary.fMeasureByLabel()))
print("Training AUC:       " + str(trainingSummary.areaUnderROC))

Training Accuracy:  0.8877118288942154
Training Precision: [0.8896755318972301, 0.8687868037014008]
Training Recall:    [0.9849273572142815, 0.44967568961804494]
Training FMeasure:  [0.934881489300451, 0.5926180032651559]
Training AUC:       0.9583983007439197


In [0]:
# Performance evaluation with 10-fold cross validation

# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# paramGrid = ParamGridBuilder().build()
# cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=5)
# cvModel = cv.fit(prediction_doc2vec)

# print("Average AUC = %g" % cvModel.avgMetrics[0])

In [0]:
from pyspark.sql.functions import udf 
testingDataTransform = pipelineFit.transform(testingData)
testingDataTransform.show(5)

+--------------------+-----+------+------------+-----------------+-----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          reviewText|label|length|verified_ind|    diff_in_years|overall_str|       reviewerIDVec|             asinVec|   overallVec|            document|               token|          normalized|         cleanTokens|               lemma|      token_features|         rawFeatures|         idfFeatures|            features|
+--------------------+-----+------+------------+-----------------+-----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| Combaterwing 480...|    1|

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = lrModel.transform(testingDataTransform)
#predictions.show(5)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 0.9435316042188884


In [0]:
# Load in the tables
test_df = spark.sql("select * from default.reviews_test")
# drop_list = ['summary','reviewID','unixReviewTime','reviewerName','reviewTime']
# test_df = test_df.select([column for column in df.columns if column not in drop_list])
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

+--------+-------+--------+-----------+--------------+----------+------------+--------------------+--------------------+--------------+
|reviewID|overall|verified| reviewTime|    reviewerID|      asin|reviewerName|          reviewText|             summary|unixReviewTime|
+--------+-------+--------+-----------+--------------+----------+------------+--------------------+--------------------+--------------+
|80000001|    4.0|   false|07 27, 2015|A1JGAP0185YJI6|0700026657|      travis|I played it a whi...|But in spite of t...|    1437955200|
|80000002|    5.0|    true| 03 3, 2014|A1WK5I4874S3O2|0700026657|  WhiteSkull|I bought this gam...|A very good game ...|    1393804800|
|80000003|    5.0|    true|01 12, 2013|A1YDQQJDRHM0FJ|0001713353|       Leila|I am very happy w...|One of our famili...|    1357948800|
|80000004|    5.0|    true|11 20, 2011|A2E6AHFDJ3JBAZ|0681795107|    robosolo|I purchased two o...|Insulated stainle...|    1321747200|
|80000005|    5.0|   false|06 28, 2011|A38NXTZUF

In [0]:
drop_list = ['summary','reviewerName','reviewTime']
# drop_list = ['overall', 'summary', 'asin', 'reviewID', 'reviewerID', 'summary', 'unixReviewTime','reviewTime', 'image', 'style', 'verified', 'reviewerName']
test_df = test_df.select([column for column in test_df.columns if column not in drop_list])
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

+--------+-------+--------+--------------+----------+--------------------+--------------+
|reviewID|overall|verified|    reviewerID|      asin|          reviewText|unixReviewTime|
+--------+-------+--------+--------------+----------+--------------------+--------------+
|80000001|    4.0|   false|A1JGAP0185YJI6|0700026657|I played it a whi...|    1437955200|
|80000002|    5.0|    true|A1WK5I4874S3O2|0700026657|I bought this gam...|    1393804800|
|80000003|    5.0|    true|A1YDQQJDRHM0FJ|0001713353|I am very happy w...|    1357948800|
|80000004|    5.0|    true|A2E6AHFDJ3JBAZ|0681795107|I purchased two o...|    1321747200|
|80000005|    5.0|   false|A38NXTZUFB1O2K|0700099867|I'm not quite fin...|    1309219200|
+--------+-------+--------+--------------+----------+--------------------+--------------+
only showing top 5 rows

(348621, 7)


In [0]:
from pyspark.sql.functions import col
test_df = test_df.withColumn("length", F.length("reviewText"))

test_df=test_df.withColumn('verified_ind', (F.col('verified').cast('int')))
test_df=test_df.drop('verified')
test_df.show(5)

+--------+-------+--------------+----------+--------------------+--------------+------+------------+
|reviewID|overall|    reviewerID|      asin|          reviewText|unixReviewTime|length|verified_ind|
+--------+-------+--------------+----------+--------------------+--------------+------+------------+
|80000001|    4.0|A1JGAP0185YJI6|0700026657|I played it a whi...|    1437955200|   297|           0|
|80000002|    5.0|A1WK5I4874S3O2|0700026657|I bought this gam...|    1393804800|   709|           1|
|80000003|    5.0|A1YDQQJDRHM0FJ|0001713353|I am very happy w...|    1357948800|   121|           1|
|80000004|    5.0|A2E6AHFDJ3JBAZ|0681795107|I purchased two o...|    1321747200|  1267|           1|
|80000005|    5.0|A38NXTZUFB1O2K|0700099867|I'm not quite fin...|    1309219200|  1492|           0|
+--------+-------+--------------+----------+--------------------+--------------+------+------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import from_unixtime, to_date
from pyspark.sql.types import *
import datetime
from pyspark.sql.functions import year, month, dayofweek
from pyspark.sql.functions import datediff,col

from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *

df_date_test = test_df.withColumn("reviewTime", to_date(from_unixtime(test_df.unixReviewTime))) \
                                                .drop("unixReviewTime")
# df_date.select(
#     year("reviewTime").alias('year'), 
#     month("reviewTime").alias('month'), 
#     dayofmonth("reviewTime").alias('day')
# ).show(5)

#df_date = df_date.withColumn('Year', year(col('reviewTime')))
# df_date = df_date.withColumn('month', month(col('reviewTime')))
# df_date_transformed = df_date.withColumn('day', dayofweek(col('reviewTime')))
df_date_test.withColumn("current_date", current_date())
df_date_test_transformed=df_date_test.withColumn("diff_in_years", datediff(col("current_date"),col("reviewTime"))/365.25)

df_date_test_transformed=df_date_test_transformed.drop('reviewTime')
df_date_test_transformed.show(5)

df_date_test_transformed.show(5)


+--------+-------+--------------+----------+--------------------+------+------------+------------------+
|reviewID|overall|    reviewerID|      asin|          reviewText|length|verified_ind|     diff_in_years|
+--------+-------+--------------+----------+--------------------+------+------------+------------------+
|80000001|    4.0|A1JGAP0185YJI6|0700026657|I played it a whi...|   297|           0| 7.151266255989048|
|80000002|    5.0|A1WK5I4874S3O2|0700026657|I bought this gam...|   709|           1| 8.550308008213552|
|80000003|    5.0|A1YDQQJDRHM0FJ|0001713353|I am very happy w...|   121|           1| 9.686516084873375|
|80000004|    5.0|A2E6AHFDJ3JBAZ|0681795107|I purchased two o...|  1267|           1|10.833675564681725|
|80000005|    5.0|A38NXTZUFB1O2K|0700099867|I'm not quite fin...|  1492|           0| 11.23066392881588|
+--------+-------+--------------+----------+--------------------+------+------------+------------------+
only showing top 5 rows

+--------+-------+------------

In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F
df_date_test_transformed=df_date_test_transformed.withColumn('overall_str', (F.col('overall').cast('string')))
df_date_test_transformed=df_date_test_transformed.drop('overall')

#indexer = StringIndexer(inputCols=["reviewerID","asin"], outputCols=["reviewerIDIndex", "asinIndex"]).fit(df)
indexed_df_test = indexer.transform(df_date_test_transformed)
indexed_df_test.show(5)
print((indexed_df_test.count(), len(indexed_df_test.columns)))

+--------+--------------+----------+--------------------+------+------------+------------------+-----------+---------------+---------+------------+
|reviewID|    reviewerID|      asin|          reviewText|length|verified_ind|     diff_in_years|overall_str|reviewerIDIndex|asinIndex|overallIndex|
+--------+--------------+----------+--------------------+------+------------+------------------+-----------+---------------+---------+------------+
|80000001|A1JGAP0185YJI6|0700026657|I played it a whi...|   297|           0| 7.151266255989048|        4.0|         5727.0|  34540.0|         1.0|
|80000002|A1WK5I4874S3O2|0700026657|I bought this gam...|   709|           1| 8.550308008213552|        5.0|       163819.0|  34540.0|         0.0|
|80000003|A1YDQQJDRHM0FJ|0001713353|I am very happy w...|   121|           1| 9.686516084873375|        5.0|      1084151.0|  27239.0|         0.0|
|80000004|A2E6AHFDJ3JBAZ|0681795107|I purchased two o...|  1267|           1|10.833675564681725|        5.0|    

In [0]:
encoded_test_df = encoder.transform(indexed_df_test)

encoded_test_df.show(5)
print((encoded_test_df.count(), len(encoded_test_df.columns)))

+--------+--------------+----------+--------------------+------+------------+------------------+-----------+---------------+---------+------------+--------------------+--------------------+-------------+
|reviewID|    reviewerID|      asin|          reviewText|length|verified_ind|     diff_in_years|overall_str|reviewerIDIndex|asinIndex|overallIndex|       reviewerIDVec|             asinVec|   overallVec|
+--------+--------------+----------+--------------------+------+------------+------------------+-----------+---------------+---------+------------+--------------------+--------------------+-------------+
|80000001|A1JGAP0185YJI6|0700026657|I played it a whi...|   297|           0| 7.151266255989048|        4.0|         5727.0|  34540.0|         1.0|(1084152,[5727],[...|(65206,[34540],[1...|(6,[1],[1.0])|
|80000002|A1WK5I4874S3O2|0700026657|I bought this gam...|   709|           1| 8.550308008213552|        5.0|       163819.0|  34540.0|         0.0|(1084152,[163819]...|(65206,[34540],[

In [0]:
encoded_test_df_clean=encoded_test_df.drop('reviewerID','asin', "reviewerIDIndex", "asinIndex", "overallIndex", "reviewID")
encoded_test_df_clean.show(5)

+--------------------+------+------------+------------------+-----------+--------------------+--------------------+-------------+
|          reviewText|length|verified_ind|     diff_in_years|overall_str|       reviewerIDVec|             asinVec|   overallVec|
+--------------------+------+------------+------------------+-----------+--------------------+--------------------+-------------+
|I played it a whi...|   297|           0| 7.151266255989048|        4.0|(1084152,[5727],[...|(65206,[34540],[1...|(6,[1],[1.0])|
|I bought this gam...|   709|           1| 8.550308008213552|        5.0|(1084152,[163819]...|(65206,[34540],[1...|(6,[0],[1.0])|
|I am very happy w...|   121|           1| 9.686516084873375|        5.0|(1084152,[1084151...|(65206,[27239],[1...|(6,[0],[1.0])|
|I purchased two o...|  1267|           1|10.833675564681725|        5.0|(1084152,[440003]...|(65206,[38565],[1...|(6,[0],[1.0])|
|I'm not quite fin...|  1492|           0| 11.23066392881588|        5.0|(1084152,[47447],

In [0]:
print((encoded_test_df_clean.count(), len(encoded_test_df_clean.columns)))

(348621, 8)


In [0]:
#pipelineFit = nlpPipeline.fit(trainingData)
test_df_Transform = pipelineFit.transform(encoded_test_df_clean)
test_df_Transform.show(5)

+--------------------+------+------------+------------------+-----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          reviewText|length|verified_ind|     diff_in_years|overall_str|       reviewerIDVec|             asinVec|   overallVec|            document|               token|          normalized|         cleanTokens|               lemma|      token_features|         rawFeatures|         idfFeatures|            features|
+--------------------+------+------------+------------------+-----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|I played it a whi...|   297|           0| 

In [0]:
print((test_df_Transform.count(), len(test_df_Transform.columns)))

(348621, 17)


In [0]:
predictions = lrModel.transform(test_df_Transform)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

probelement=udf(lambda v:float(v[1]),FloatType())
submission_data = predictions.select( probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [0]:
display(submission_data.select('label'))

label
0.062373467
0.10774727
0.1096399
0.8152117
0.3407577
0.22600925
0.10105571
0.6798431
0.13132738
0.58538824
