In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover, RegexTokenizer, Bucketizer, StringIndexer, OneHotEncoder, VectorAssembler, ChiSqSelector
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, LogisticRegression, NaiveBayes, OneVsRest, MultilayerPerceptronClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import NGram

**Data extraction**

In [4]:
df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(path="wine1.csv")

In [5]:
(df).show()

+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+
|_c0|  country|         description|         designation|points|price|         province|           region_1|         region_2|       taster_name|taster_twitter_handle|               title|           variety|             winery|
+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+
|  0|    Italy|Aromas include tr...|        Vulkà Bianco|    87| null|Sicily & Sardinia|               Etna|             null|     Kerin O’Keefe|         @kerinokeefe|Nicosia 2013 Vulk...|       White Blend|            Nicosia|
|  1| Portugal|This is ripe and ...|            Avidagos|    87| 15.0|            Douro|

**Data preprocessing**

In [6]:
df_clean_null = df.dropna(subset=('description','price', 'points', 'country')).select('*')
df_clean_null = df_clean_null.selectExpr("_c0","description as description",'price as price','points as points','country as country','taster_name as taster_name', 'variety as variety')

In [7]:
print(df.count(), df_clean_null.count())

129975 120911


In [8]:
(df_clean_null).show()

+---+--------------------+-----+------+---------+------------------+------------------+
|_c0|         description|price|points|  country|       taster_name|           variety|
+---+--------------------+-----+------+---------+------------------+------------------+
|  1|This is ripe and ...| 15.0|    87| Portugal|        Roger Voss|    Portuguese Red|
|  2|Tart and snappy, ...| 14.0|    87|       US|      Paul Gregutt|        Pinot Gris|
|  3|Pineapple rind, l...| 13.0|    87|       US|Alexander Peartree|          Riesling|
|  4|Much like the reg...| 65.0|    87|       US|      Paul Gregutt|        Pinot Noir|
|  5|Blackberry and ra...| 15.0|    87|    Spain| Michael Schachner|Tempranillo-Merlot|
|  6|Here's a bright, ...| 16.0|    87|    Italy|     Kerin O’Keefe|          Frappato|
|  7|This dry and rest...| 24.0|    87|   France|        Roger Voss|    Gewürztraminer|
|  8|Savory dried thym...| 12.0|    87|  Germany|Anna Lee C. Iijima|    Gewürztraminer|
|  9|This has great de...| 27.0|

In [9]:
df_lower = df_clean_null.select('_c0','price', 'points', 'country', 'taster_name', 'variety', lower(df_clean_null['description']).alias('description'))
(df_lower).show()

+---+-----+------+---------+------------------+------------------+--------------------+
|_c0|price|points|  country|       taster_name|           variety|         description|
+---+-----+------+---------+------------------+------------------+--------------------+
|  1| 15.0|    87| Portugal|        Roger Voss|    Portuguese Red|this is ripe and ...|
|  2| 14.0|    87|       US|      Paul Gregutt|        Pinot Gris|tart and snappy, ...|
|  3| 13.0|    87|       US|Alexander Peartree|          Riesling|pineapple rind, l...|
|  4| 65.0|    87|       US|      Paul Gregutt|        Pinot Noir|much like the reg...|
|  5| 15.0|    87|    Spain| Michael Schachner|Tempranillo-Merlot|blackberry and ra...|
|  6| 16.0|    87|    Italy|     Kerin O’Keefe|          Frappato|here's a bright, ...|
|  7| 24.0|    87|   France|        Roger Voss|    Gewürztraminer|this dry and rest...|
|  8| 12.0|    87|  Germany|Anna Lee C. Iijima|    Gewürztraminer|savory dried thym...|
|  9| 27.0|    87|   France|    

In [10]:
df_lower = df_lower.withColumn("points",col("points").cast("int"))
(df_lower.groupBy("country").avg("points").orderBy(F.desc("avg(points)")).take(20))

[Row(country='England', avg(points)=91.55072463768116),
 Row(country='India', avg(points)=90.22222222222223),
 Row(country='Austria', avg(points)=90.19062947067239),
 Row(country='Germany', avg(points)=89.83632075471698),
 Row(country='Canada', avg(points)=89.37795275590551),
 Row(country='Hungary', avg(points)=89.1655172413793),
 Row(country='China', avg(points)=89.0),
 Row(country='France', avg(points)=88.73496483825598),
 Row(country='Luxembourg', avg(points)=88.66666666666667),
 Row(country='Italy', avg(points)=88.61780011827321),
 Row(country='Australia', avg(points)=88.59546643417612),
 Row(country='Morocco', avg(points)=88.57142857142857),
 Row(country='Switzerland', avg(points)=88.57142857142857),
 Row(country='US', avg(points)=88.56634656148047),
 Row(country='Israel', avg(points)=88.49693251533742),
 Row(country='Portugal', avg(points)=88.31671794871795),
 Row(country='New Zealand', avg(points)=88.30589949016752),
 Row(country='Turkey', avg(points)=88.08888888888889),
 Row(co

In [11]:
df_lower = df_lower.withColumn('country_format',
                                                  F.when(F.col('country') == 'Argentina', 'ARG')\
                                                  .when(F.col('country') == 'Armenia', 'ARM')\
                                                  .when(F.col('country') == 'Australia', 'AUS')\
                                                  .when(F.col('country') == 'Austria', 'AUT')\
                                                  .when(F.col('country') == 'Bosnia and Herzegovina', 'BIH')\
                                                  .when(F.col('country') == 'Brazil', 'BRA')\
                                                  .when(F.col('country') == 'Bulgaria', 'BGR')\
                                                  .when(F.col('country') == 'Canada', 'CAN')\
                                                  .when(F.col('country') == 'Chile', 'CHL')\
                                                  .when(F.col('country') == 'China', 'CHN')\
                                                  .when(F.col('country') == 'Croatia', 'HRV')\
                                                  .when(F.col('country') == 'Cyprus', 'CYP')\
                                                  .when(F.col('country') == 'Czech Republic', 'CZE')\
                                                  .when(F.col('country') == 'Egypt', 'EGY')\
                                                  .when(F.col('country') == 'England', 'GBR')\
                                                  .when(F.col('country') == 'France', 'FRA')\
                                                  .when(F.col('country') == 'Georgia', 'GEO')\
                                                  .when(F.col('country') == 'Germany', 'DEU')\
                                                  .when(F.col('country') == 'Greece', 'GRC')\
                                                  .when(F.col('country') == 'Hungary', 'HUN')\
                                                  .when(F.col('country') == 'India', 'IND')\
                                                  .when(F.col('country') == 'Israel', 'ISR')\
                                                  .when(F.col('country') == 'Italy', 'ITA')\
                                                  .when(F.col('country') == 'Lebanon', 'LBN')\
                                                  .when(F.col('country') == 'Luxembourg', 'LUX')\
                                                  .when(F.col('country') == 'Macedonia', 'MKD')\
                                                  .when(F.col('country') == 'Mexico', 'MEX')\
                                                  .when(F.col('country') == 'Moldova', 'MDA')\
                                                  .when(F.col('country') == 'Morocco', 'MAR')\
                                                  .when(F.col('country') == 'New Zealand', 'NZL')\
                                                  .when(F.col('country') == 'Peru', 'PER')\
                                                  .when(F.col('country') == 'Portugal', 'PRT')\
                                                  .when(F.col('country') == 'Romania', 'ROU')\
                                                  .when(F.col('country') == 'Serbia', 'SRB')\
                                                  .when(F.col('country') == 'Slovakia', 'SVK')\
                                                  .when(F.col('country') == 'Slovenia', 'SVN')\
                                                  .when(F.col('country') == 'South Africa', 'ZAF')\
                                                  .when(F.col('country') == 'Spain', 'ESP')\
                                                  .when(F.col('country') == 'Switzerland', 'CHE')\
                                                  .when(F.col('country') == 'Turkey', 'TUR')\
                                                  .when(F.col('country') == 'US', 'USA')\
                                                  .when(F.col('country') == 'Ukraine', 'UKR')\
                                                  .when(F.col('country') == 'Uruguay', 'URY')\
                                                  .otherwise('') 
                                                  )


In [12]:
(df_lower.groupBy("country_format").count()).show()

+--------------+-----+
|country_format|count|
+--------------+-----+
|           BRA|   47|
|           ARM|    2|
|           FRA|17776|
|           URY|  109|
|           ITA|16911|
|           UKR|   14|
|           HRV|   71|
|           GBR|   69|
|           AUS| 2294|
|           MEX|   70|
|           SVK|    1|
|           HUN|  145|
|           NZL| 1376|
|           BIH|    2|
|           PER|   16|
|           LUX|    6|
|           TUR|   90|
|           AUT| 2799|
|           USA|54264|
|           MAR|   28|
+--------------+-----+
only showing top 20 rows



In [13]:
(df_lower.groupBy("country_format").avg("points")).show()

+--------------+-----------------+
|country_format|      avg(points)|
+--------------+-----------------+
|           BRA|84.65957446808511|
|           ARM|             87.5|
|           FRA|88.73496483825598|
|           URY|86.75229357798165|
|           ITA|88.61780011827321|
|           UKR|84.07142857142857|
|           HRV|87.35211267605634|
|           GBR|91.55072463768116|
|           AUS|88.59546643417612|
|           MEX|85.25714285714285|
|           SVK|             87.0|
|           HUN| 89.1655172413793|
|           NZL|88.30589949016752|
|           BIH|             86.5|
|           PER|          83.5625|
|           LUX|88.66666666666667|
|           TUR|88.08888888888889|
|           AUT|90.19062947067239|
|           USA|88.56634656148047|
|           MAR|88.57142857142857|
+--------------+-----------------+
only showing top 20 rows



In [14]:
(df_lower.groupBy("variety").count().orderBy('count', ascending = False)).show()

+--------------------+-----+
|             variety|count|
+--------------------+-----+
|          Pinot Noir|12784|
|          Chardonnay|11076|
|  Cabernet Sauvignon| 9379|
|           Red Blend| 8466|
|Bordeaux-style Re...| 5339|
|            Riesling| 4966|
|     Sauvignon Blanc| 4778|
|               Syrah| 4086|
|                Rosé| 3260|
|              Merlot| 3061|
|           Zinfandel| 2708|
|              Malbec| 2593|
|          Sangiovese| 2377|
|            Nebbiolo| 2330|
|      Portuguese Red| 2196|
|         White Blend| 2167|
|     Sparkling Blend| 2027|
|         Tempranillo| 1788|
|Rhône-style Red B...| 1404|
|          Pinot Gris| 1388|
+--------------------+-----+
only showing top 20 rows



**Feature extraction**

In [15]:
# regular expression tokenizer, remove punctuations 
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")
wordsData = regexTokenizer.transform(df_lower)
regexTokenizer.transform(df_lower).show()

+---+-----+------+---------+------------------+------------------+--------------------+--------------+--------------------+
|_c0|price|points|  country|       taster_name|           variety|         description|country_format|               words|
+---+-----+------+---------+------------------+------------------+--------------------+--------------+--------------------+
|  1| 15.0|    87| Portugal|        Roger Voss|    Portuguese Red|this is ripe and ...|           PRT|[this, is, ripe, ...|
|  2| 14.0|    87|       US|      Paul Gregutt|        Pinot Gris|tart and snappy, ...|           USA|[tart, and, snapp...|
|  3| 13.0|    87|       US|Alexander Peartree|          Riesling|pineapple rind, l...|           USA|[pineapple, rind,...|
|  4| 65.0|    87|       US|      Paul Gregutt|        Pinot Noir|much like the reg...|           USA|[much, like, the,...|
|  5| 15.0|    87|    Spain| Michael Schachner|Tempranillo-Merlot|blackberry and ra...|           ESP|[blackberry, and,...|
|  6| 16

In [16]:
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
(stopwordsRemover.transform(wordsData)).show()

+---+-----+------+---------+------------------+------------------+--------------------+--------------+--------------------+--------------------+
|_c0|price|points|  country|       taster_name|           variety|         description|country_format|               words|            filtered|
+---+-----+------+---------+------------------+------------------+--------------------+--------------+--------------------+--------------------+
|  1| 15.0|    87| Portugal|        Roger Voss|    Portuguese Red|this is ripe and ...|           PRT|[this, is, ripe, ...|[ripe, fruity, wi...|
|  2| 14.0|    87|       US|      Paul Gregutt|        Pinot Gris|tart and snappy, ...|           USA|[tart, and, snapp...|[tart, snappy, fl...|
|  3| 13.0|    87|       US|Alexander Peartree|          Riesling|pineapple rind, l...|           USA|[pineapple, rind,...|[pineapple, rind,...|
|  4| 65.0|    87|       US|      Paul Gregutt|        Pinot Noir|much like the reg...|           USA|[much, like, the,...|[much, 

In [17]:
#two hyper parameters needed to be tuned
#hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures",numFeatures=5000)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=5000)
idf = IDF(inputCol="rawFeatures", outputCol="tf_idf_features", minDocFreq=0)

In [18]:
word2vec = Word2Vec(vectorSize = 300, minCount = 5, inputCol = 'filtered', outputCol = 'word2vec_features')

In [19]:
stopWords_removed=stopwordsRemover.transform(wordsData)
# Define NGram transformer
ngram = NGram(n=2, inputCol="filtered", outputCol="bigrams")

# Create bigram_df as a transform of unigram_df using NGram tranformer
production_df = ngram.transform(stopWords_removed)
production_df = production_df.where(size(col("bigrams")) >= 2)
word2vec_bigram = Word2Vec(vectorSize = 300, minCount = 5, inputCol = 'bigrams', outputCol = 'bigram_features')

Wine Enthusiast's 100-point wine-scoring scale:
  - 98–100 – Classic
  - 94–97 – Superb
  - 90–93 – Excellent
  - 87–89 – Very good
  - 83–86 – Good
  - 80–82 – Acceptable 

In [20]:
splits_6 = [80,83, 87,90, 94,98,100]
splits_3 = [80,87,94,100]
bucketizer_3 = Bucketizer(splits=splits_3, inputCol="points", outputCol="label")
bucketizer_6 = Bucketizer(splits=splits_6, inputCol="points", outputCol="label_6")

In [21]:
cols = ['country', 'taster_name','variety']

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep")
    for c in cols
]

encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=[
        "{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

# assemble all 4 columns (country, tastername, variety and price)
assembler_num_cat = VectorAssembler(
    inputCols=['country_indexed_encoded','taster_name_indexed_encoded','variety_indexed_encoded','price'],
    outputCol="num_cat_features"
)

assembler_combine = VectorAssembler(
    inputCols=['num_cat_features', 'tf_idf_features'],
    outputCol="combine_features"
)

assembler_combine_w2v=VectorAssembler(
    inputCols=['num_cat_features', 'word2vec_features'],
    outputCol="combine_features_w2v"
)
assembler_combine_bigram=VectorAssembler(
    inputCols=['num_cat_features', 'bigram_features'],
    outputCol="combine_features_bigram"
)

In [22]:
chi_selector = ChiSqSelector(numTopFeatures=1000, featuresCol="combine_features",
                         outputCol="selectedFeatures", labelCol="label")

In [23]:
df_lower = df_lower.na.drop()

In [24]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, bucketizer_3, bucketizer_6]+ indexers+ [encoder, assembler_num_cat, assembler_combine, chi_selector])
df_lower = df_lower.withColumn("points",col("points").cast("int"))
df_lower = df_lower.withColumn("price",col("price").cast("int"))
pipelineFit = pipeline.fit(df_lower)
dataset = pipelineFit.transform(df_lower).select('_c0', 'tf_idf_features',  'num_cat_features', 'combine_features', "selectedFeatures", 'label')

In [25]:
#dataset for regression
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, bucketizer_3, bucketizer_6]+ indexers+ [encoder, assembler_num_cat, assembler_combine, chi_selector])
pipelineFit = pipeline.fit(df_lower)
rdataset=pipelineFit.transform(df_lower).select('_c0', 'tf_idf_features',  'num_cat_features', 'combine_features', "selectedFeatures", "points")

In [26]:
pipeline_w2v = Pipeline(stages=[regexTokenizer, stopwordsRemover, word2vec, bucketizer_3, bucketizer_6]+ indexers+ [encoder, assembler_num_cat, assembler_combine_w2v])
pipelineFit_w2v = pipeline_w2v.fit(df_lower)
dataset_w2v = pipelineFit_w2v.transform(df_lower).select('_c0', 'word2vec_features',  'num_cat_features', 'combine_features_w2v',  'label')

In [27]:
#for regression
pipeline_w2v = Pipeline(stages=[regexTokenizer, stopwordsRemover, word2vec, bucketizer_3, bucketizer_6]+ indexers+ [encoder, assembler_num_cat, assembler_combine_w2v])
pipelineFit_w2v = pipeline_w2v.fit(df_lower)
rdataset_w2v = pipelineFit_w2v.transform(df_lower).select('_c0', 'word2vec_features',  'num_cat_features', 'combine_features_w2v',  'points')

In [28]:
# pipeline_bigram = Pipeline(stages=[regexTokenizer, stopwordsRemover, ngram, word2vec_bigram, bucketizer_3, bucketizer_6]+ indexers+ [encoder, assembler_num_cat, assembler_combine_bigram])
# pipelineFit_bigram = pipeline_bigram.fit(df_lower)
# dataset_bigram = pipelineFit_bigram.transform(df_lower).select('_c0', 'bigram_features',  'num_cat_features', 'combine_features_bigram',  'label')

In [29]:
# #for regression
# pipeline_bigram = Pipeline(stages=[regexTokenizer, stopwordsRemover, ngram, word2vec_bigram, bucketizer_3, bucketizer_6]+ indexers+ [encoder, assembler_num_cat, assembler_combine_bigram])
# pipelineFit_bigram = pipeline_bigram.fit(df_lower)
# rdataset_bigram = pipelineFit_bigram.transform(df_lower).select('_c0', 'bigram_features',  'num_cat_features', 'combine_features_bigram',  'points')

In [30]:
dataset

DataFrame[_c0: string, tf_idf_features: vector, num_cat_features: vector, combine_features: vector, selectedFeatures: vector, label: double]

In [31]:
dataset_w2v.show()

+---+--------------------+--------------------+--------------------+-----+
|_c0|   word2vec_features|    num_cat_features|combine_features_w2v|label|
+---+--------------------+--------------------+--------------------+-----+
|  1|[-0.0203489334893...|(715,[4,42,71,714...|(1015,[4,42,71,71...|  1.0|
|  2|[0.09476183300527...|(715,[0,46,79,714...|(1015,[0,46,79,71...|  1.0|
|  3|[-0.0066880109635...|(715,[0,57,66,714...|(1015,[0,57,66,71...|  1.0|
|  4|[0.04785747185349...|(715,[0,46,61,714...|(1015,[0,46,61,71...|  1.0|
|  5|[0.09469858355199...|(715,[3,43,360,71...|(1015,[3,43,360,7...|  1.0|
|  6|[0.05348750470309...|(715,[2,44,222,71...|(1015,[2,44,222,7...|  1.0|
|  7|[0.06082502434340...|(715,[1,42,86,714...|(1015,[1,42,86,71...|  1.0|
|  8|[0.02169393928831...|(715,[8,50,86,714...|(1015,[8,50,86,71...|  1.0|
|  9|[0.03610841813497...|(715,[1,42,79,714...|(1015,[1,42,79,71...|  1.0|
| 10|[0.02402406614739...|(715,[0,45,64,714...|(1015,[0,45,64,71...|  1.0|
| 11|[0.05984527996689...

In [32]:
# rdataset_bigram.show()

**Model exploration**

In [33]:
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
(rtrainingData, rtestData) = rdataset.randomSplit([0.8, 0.2], seed = 100) #for regression
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 77108
Test Dataset Count: 19296


In [34]:
# Training and testing splitting for word2vec
(trainingData_w2v, testData_w2v) = dataset_w2v.randomSplit([0.8, 0.2], seed = 100)
(rtrainingData_w2v, rtestData_w2v) = rdataset_w2v.randomSplit([0.8, 0.2], seed = 100) #for regression
print("Training Dataset Count: " + str(trainingData_w2v.count()))
print("Test Dataset Count: " + str(testData_w2v.count()))

Training Dataset Count: 77108
Test Dataset Count: 19296


In [35]:
# # Training and testing splitting for bigram
# (trainingData_bigram, testData_bigram) = dataset_bigram.randomSplit([0.8, 0.2], seed = 100)
# (rtrainingData_bigram, rtestData_bigram) = rdataset_bigram.randomSplit([0.8, 0.2], seed = 100)#for regression
# print("Training Dataset Count: " + str(trainingData_bigram.count()))
# print("Test Dataset Count: " + str(testData_bigram.count()))

In [36]:
evaluator_f1 = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='f1')
evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
evaluator_recall = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedRecall')
evaluator_precision = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedPrecision')

In [37]:
lr_tf_idf = LogisticRegression(featuresCol='tf_idf_features')
lr_tf_idf_Model = lr_tf_idf.fit(trainingData)
lr_tf_idf_predictions = lr_tf_idf_Model.transform(testData)

lr_num_cat = LogisticRegression(featuresCol='num_cat_features')
lr_num_cat_Model = lr_num_cat.fit(trainingData)
lr_num_cat_predictions = lr_num_cat_Model.transform(testData)

lr_combine = LogisticRegression(featuresCol='combine_features')
lr_combine_Model = lr_combine.fit(trainingData)
lr_combine_predictions = lr_combine_Model.transform(testData)

lr_selected = LogisticRegression(featuresCol='selectedFeatures')
lr_selected_Model = lr_selected.fit(trainingData)
lr_selected_predictions = lr_selected_Model.transform(testData)


In [38]:
# For w2v model
lr_w2v = LogisticRegression(featuresCol='word2vec_features')
lr_Model_w2v = lr_w2v.fit(trainingData_w2v)
lr_predictions_w2v = lr_Model_w2v.transform(testData_w2v)

lr_num_cat = LogisticRegression(featuresCol='num_cat_features')
lr_num_cat_Model = lr_num_cat.fit(trainingData)
lr_num_cat_predictions = lr_num_cat_Model.transform(testData)

lr_combine_w2v = LogisticRegression(featuresCol='combine_features_w2v')
lr_combine_Model_w2v = lr_combine_w2v.fit(trainingData_w2v)
lr_combine_predictions_w2v = lr_combine_Model_w2v.transform(testData_w2v)


# # For bigram w2v
# lr_bigram = LogisticRegression(featuresCol='bigram_features')
# lr_Model_bigram = lr_bigram.fit(trainingData_bigram)
# lr_predictions_bigram = lr_Model_bigram.transform(testData_bigram)

# lr_num_cat = LogisticRegression(featuresCol='num_cat_features')
# lr_num_cat_Model = lr_num_cat.fit(trainingData)
# lr_num_cat_predictions = lr_num_cat_Model.transform(testData)

# lr_combine_bigram = LogisticRegression(featuresCol='combine_features_bigram')
# lr_combine_Model_bigram = lr_combine_bigram.fit(trainingData_bigram)
# lr_combine_predictions_bigram = lr_combine_Model_bigram.transform(testData_bigram)


In [39]:
def print_eval_tf_idf(metrics,num_cat,combine, selected,name_of_feature):
  print(name_of_feature)
  print('f1: ' + str(evaluator_f1.evaluate(metrics)))
  print('acc: ' + str(evaluator_acc.evaluate(metrics)))
  print('recall: ' + str(evaluator_recall.evaluate(metrics)))
  print('precision' + str(evaluator_precision.evaluate(metrics)))
  print('num_cat_features')
  print('f1: ' + str(evaluator_f1.evaluate(num_cat)))
  print('acc: ' + str(evaluator_acc.evaluate(num_cat)))
  print('recall: ' + str(evaluator_recall.evaluate(num_cat)))
  print('precision: ' + str(evaluator_precision.evaluate(num_cat)))
  print('combine_features')
  print('f1: ' + str(evaluator_f1.evaluate(combine)))
  print('acc: ' + str(evaluator_acc.evaluate(combine)))
  print('recall: ' + str(evaluator_recall.evaluate(combine)))
  print('precision: ' + str(evaluator_precision.evaluate(combine)))
  print('selectedFeatures')
  print('f1: ' + str(evaluator_f1.evaluate(selected)))
  print('acc: ' + str(evaluator_acc.evaluate(selected)))
  print('recall: ' + str(evaluator_recall.evaluate(selected)))
  print('precision: ' + str(evaluator_precision.evaluate(selected)))

In [40]:
def print_eval_w2v(metrics,combine, name_of_feature):
  print(name_of_feature)
  print('f1: ' + str(evaluator_f1.evaluate(metrics)))
  print('acc: ' + str(evaluator_acc.evaluate(metrics)))
  print('recall: ' + str(evaluator_recall.evaluate(metrics)))
  print('precision' + str(evaluator_precision.evaluate(metrics)))
  
  print('combine_features')
  print('f1: ' + str(evaluator_f1.evaluate(combine)))
  print('acc: ' + str(evaluator_acc.evaluate(combine)))
  print('recall: ' + str(evaluator_recall.evaluate(combine)))
  print('precision: ' + str(evaluator_precision.evaluate(combine)))

In [41]:
print_eval_tf_idf(lr_tf_idf_predictions, lr_num_cat_predictions, lr_combine_predictions, lr_selected_predictions,'tf_idf_features')

tf_idf_features
f1: 0.7974992445369351
acc: 0.8007359038142621
recall: 0.800735903814262
precision0.7958951543328943
num_cat_features
f1: 0.7283781073116443
acc: 0.7585509950248757
recall: 0.7585509950248757
precision: 0.7429261662655067
combine_features
f1: 0.8212636124626823
acc: 0.8231239635157546
recall: 0.8231239635157546
precision: 0.820221491307818
selectedFeatures
f1: 0.8244558915800562
acc: 0.8303793532338308
recall: 0.8303793532338308
precision: 0.8240713723420059


In [42]:
print_eval_w2v(lr_predictions_w2v, lr_combine_predictions_w2v,'word2vec_features')

word2vec_features
f1: 0.7717003398004918
acc: 0.7921330845771144
recall: 0.7921330845771144
precision0.7739014741462955
combine_features
f1: 0.8139768381304119
acc: 0.8235903814262023
recall: 0.8235903814262022
precision: 0.8160845639774632


In [43]:
# print_eval_w2v(lr_predictions_bigram, lr_combine_predictions_bigram,'bigram_features')

In [44]:
from pyspark.ml.regression import LinearRegression

#TF-idf
lir_tfidf= LinearRegression(maxIter=5,featuresCol='tf_idf_features',labelCol='points')
lir_tfidf_model= lir_tfidf.fit(rtrainingData)
lir_tfidf_predictions = lir_tfidf_model.transform(rtestData)

lir_num_cat= LinearRegression(maxIter=5,featuresCol='num_cat_features',labelCol='points')
lir_num_cat_model= lir_num_cat.fit(rtrainingData)
lir_num_cat_predictions = lir_num_cat_model.transform(rtestData)

lir_com= LinearRegression(maxIter=5,featuresCol='combine_features',labelCol='points')
lir_com_model= lir_com.fit(rtrainingData)
lir_com_predictions = lir_com_model.transform(rtestData)

lir_selected= LinearRegression(maxIter=5,featuresCol='selectedFeatures',labelCol='points')
lir_selected_model= lir_selected.fit(rtrainingData)
lir_selected_predictions = lir_selected_model.transform(rtestData)

# For w2v model
lir_w2v= LinearRegression(maxIter=5,featuresCol='word2vec_features',labelCol='points')
lir_w2v_model= lir_w2v.fit(rtrainingData_w2v)
lir_w2v_predictions = lir_w2v_model.transform(rtestData_w2v)

lir_w2v_com= LinearRegression(maxIter=5,featuresCol='combine_features_w2v',labelCol='points')
lir_w2v_com_model= lir_w2v_com.fit(rtrainingData_w2v)
lir_w2v_com_predictions = lir_w2v_com_model.transform(rtestData_w2v)

# For bigram 
# lir_bg= LinearRegression(maxIter=5,featuresCol='bigram_features',labelCol='points')
# lir_bg_model= lir_bg.fit(rtrainingData_bigram)
# lir_bg_predictions = lir_bg_model.transform(rtestData_bigram)

# lir_bg_com= LinearRegression(maxIter=5,featuresCol='combine_features_bigram',labelCol='points')
# lir_bg_com_model= lir_bg_com.fit(rtrainingData_bigram)
# lir_bg_com_predictions = lir_bg_com_model.transform(rtestData_bigram)

In [45]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_r2 = RegressionEvaluator(predictionCol="prediction",labelCol='points',metricName="r2")
evaluator_mse = RegressionEvaluator(predictionCol="prediction",labelCol='points',metricName="mse")


print('Linear regression:')
print('using Tf-idf')
print('  using text only')
print('   R2: ' + str(evaluator_r2.evaluate(lir_tfidf_predictions)))
print('   MSE: ' + str(evaluator_mse.evaluate(lir_tfidf_predictions)))

print('  using categorical features')
print('   R2: ' + str(evaluator_r2.evaluate(lir_num_cat_predictions)))
print('   MSE: ' + str(evaluator_mse.evaluate(lir_num_cat_predictions)))

print('  combining Cat and Tfidf')
print('   R2: ' + str(evaluator_r2.evaluate(lir_com_predictions)))
print('   MSE: ' + str(evaluator_mse.evaluate(lir_com_predictions)))

print('  selecting features')
print('   R2: ' + str(evaluator_r2.evaluate(lir_selected_predictions)))
print('   MSE: ' + str(evaluator_mse.evaluate(lir_selected_predictions)))

print('using Word to Vec')
print('  using text only')
print('   R2: ' + str(evaluator_r2.evaluate(lir_w2v_predictions)))
print('   MSE: ' + str(evaluator_mse.evaluate(lir_w2v_predictions)))
print('  using text & categorical')
print('   R2: ' + str(evaluator_r2.evaluate(lir_w2v_com_predictions)))
print('   MSE: ' + str(evaluator_mse.evaluate(lir_w2v_com_predictions)))

# print('using bigram')
# print('  using text only')
# print('   R2: ' + str(evaluator_r2.evaluate(lir_bg_predictions)))
# print('   MSE: ' + str(evaluator_mse.evaluate(lir_bg_predictions)))
# print('  using text & categorical')
# print('   R2: ' + str(evaluator_r2.evaluate(lir_bg_com_predictions)))
# print('   MSE: ' + str(evaluator_mse.evaluate(lir_bg_com_predictions)))

Linear regression:
using Tf-idf
  using text only
   R2: 0.6347089320749673
   MSE: 3.1793604416886003
  using categorical features
   R2: 0.2648441196765777
   MSE: 6.398529089834541
  combining Cat and Tfidf
   R2: 0.6792317153084697
   MSE: 2.791850348516749
  selecting features
   R2: 0.6631829696258731
   MSE: 2.9315327871041603
using Word to Vec
  using text only
   R2: 0.5539752946867337
   MSE: 3.8820366239555564
  using text & categorical
   R2: 0.553963984111975
   MSE: 3.882135067079733


In [46]:
lir_com_predictions.select("points","prediction").show()

+------+-----------------+
|points|       prediction|
+------+-----------------+
|    88|87.10640672336926|
|    91|89.59796949173366|
|    91|88.19343897689214|
|    91| 90.5703323237937|
|    89|86.81542421523608|
|    89|88.37382948604044|
|    89|88.26849715976114|
|    89|87.49705631542761|
|    89|86.99790670088962|
|    88|89.06331634360116|
|    89|90.29457014757975|
|    89|88.16111324469925|
|    89|88.38075224785123|
|    88|90.90109594291435|
|    88| 88.1454424923971|
|    88|  90.368226392022|
|    88| 90.6195321591363|
|    88|87.45976226439613|
|    88|89.06470754156895|
|    86|85.23797117038593|
+------+-----------------+
only showing top 20 rows

