***Nguyen Thi Tuong Vy***

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
# import libraries
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

import pandas as pd

# Load Data

In [4]:
spark = SparkSession.builder.appName("Womens_Clothing_E_Commerce_Reviews").getOrCreate()

In [5]:
data = pd.read_excel("Womens_Clothing_E_Commerce_Reviews.xlsx", sheet_name="Reviews", engine="openpyxl", index_col=0)

In [6]:
data = spark.createDataFrame(data.astype(str))

In [7]:
data.show(3)

+-----------+---+--------------------+--------------------+------+---------------+-----------------------+-------------+---------------+----------+
|Clothing ID|Age|               Title|         Review Text|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+-----------+---+--------------------+--------------------+------+---------------+-----------------------+-------------+---------------+----------+
|        767| 33|                 nan|Absolutely wonder...|     4|              1|                      0|    Initmates|       Intimate| Intimates|
|       1080| 34|                 nan|Love this dress! ...|     5|              1|                      4|      General|        Dresses|   Dresses|
|       1077| 60|Some major design...|I had such high h...|     3|              0|                      0|      General|        Dresses|   Dresses|
+-----------+---+--------------------+--------------------+------+---------------+-----------------------+------

In [8]:
for line in data.head(5):
    print(line, '\n')

Row(Clothing ID='767', Age='33', Title='nan', Review Text='Absolutely wonderful - silky and sexy and comfortable', Rating='4', Recommended IND='1', Positive Feedback Count='0', Division Name='Initmates', Department Name='Intimate', Class Name='Intimates') 

Row(Clothing ID='1080', Age='34', Title='nan', Review Text='Love this dress!  it\'s sooo pretty.  i happened to find it in a store, and i\'m glad i did bc i never would have ordered it online bc it\'s petite.  i bought a petite and am 5\'8".  i love the length on me- hits just a little below the knee.  would definitely be a true midi on someone who is truly petite.', Rating='5', Recommended IND='1', Positive Feedback Count='4', Division Name='General', Department Name='Dresses', Class Name='Dresses') 

Row(Clothing ID='1077', Age='60', Title='Some major design flaws', Review Text='I had such high hopes for this dress and really wanted it to work for me. i initially ordered the petite small (my usual size) but i found this to be outr

In [9]:
data.count()

23481

In [10]:
data.printSchema()

root
 |-- Clothing ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Review Text: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Recommended IND: string (nullable = true)
 |-- Positive Feedback Count: string (nullable = true)
 |-- Division Name: string (nullable = true)
 |-- Department Name: string (nullable = true)
 |-- Class Name: string (nullable = true)



In [11]:
data.describe().show()

+-------+------------------+------------------+--------------------+--------------------+------------------+-------------------+-----------------------+-------------+---------------+----------+
|summary|       Clothing ID|               Age|               Title|         Review Text|            Rating|    Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+-------+------------------+------------------+--------------------+--------------------+------------------+-------------------+-----------------------+-------------+---------------+----------+
|  count|             23481|             23481|               23481|               23481|             23481|              23481|                  23481|        23481|          23481|     23481|
|   mean|  918.108641028917| 43.19837315276181|                null|                null| 4.196286359184021| 0.8224521953920191|     2.5353690217622757|         null|           null|      null|
| stddev|203.31464963720921|12

In [12]:
data.columns

['Clothing ID',
 'Age',
 'Title',
 'Review Text',
 'Rating',
 'Recommended IND',
 'Positive Feedback Count',
 'Division Name',
 'Department Name',
 'Class Name']

# Clean and Prepare the Data

In [13]:
from pyspark.sql.functions import *

In [14]:
# Check for NaN, null
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).toPandas().T

Unnamed: 0,0
Clothing ID,0
Age,0
Title,0
Review Text,0
Rating,0
Recommended IND,0
Positive Feedback Count,0
Division Name,0
Department Name,0
Class Name,0


In [15]:
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas().T

Unnamed: 0,0
Clothing ID,0
Age,0
Title,0
Review Text,0
Rating,0
Recommended IND,0
Positive Feedback Count,0
Division Name,0
Department Name,0
Class Name,0


==> The dataset doesn't have NaN and Null

In [16]:
# Check for duplicates
n_rows = data.count()
n_dist_rows = data.distinct().count()
dup_rows = n_rows - n_dist_rows

In [17]:
display(n_rows, n_dist_rows, dup_rows)

23481

23460

21

==> The dataset has duplicated rows

In [18]:
# Delete duplicates
data = data.drop_duplicates()

In [19]:
print("There are {} rows after deleting duplicated rows".format(data.count()))

There are 23460 rows after deleting duplicated rows


In [20]:
from pyspark.sql.functions import length

In [21]:
data = data.withColumn("length", length(data["Review Text"]))

In [22]:
data.show(5)

+-----------+---+--------------------+--------------------+------+---------------+-----------------------+--------------+---------------+----------+------+
|Clothing ID|Age|               Title|         Review Text|Rating|Recommended IND|Positive Feedback Count| Division Name|Department Name|Class Name|length|
+-----------+---+--------------------+--------------------+------+---------------+-----------------------+--------------+---------------+----------+------+
|       1081| 59|                 nan|I passed up this ...|     5|              1|                      1|General Petite|        Dresses|   Dresses|   500|
|        895| 63|Tunic doesn't loo...|Nice weight sweat...|     5|              1|                      1|       General|           Tops|Fine gauge|   241|
|        872| 71|                 nan|Great shirt. the ...|     5|              1|                      0|       General|           Tops|     Knits|   274|
|        745| 28|The softest leggings|These leggings ar...|     

In [23]:
data.groupby("Rating").mean().show()

+------+------------------+
|Rating|       avg(length)|
+------+------------------+
|     3|322.97317073170734|
|     5|286.01006864988557|
|     1| 297.4946492271106|
|     4| 312.8264039408867|
|     2| 316.1668797953964|
+------+------------------+



# Feature Transformations

In [24]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer

tokenizer = Tokenizer(inputCol="Review Text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec = CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
to_number = StringIndexer(inputCol="Rating", outputCol="label")

In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [26]:
clean_up = VectorAssembler(inputCols=["tf_idf", "length"], outputCol="features")

# Build The Model

## 1. NaiveBayes

In [27]:
from pyspark.ml.classification import NaiveBayes

In [28]:
nb = NaiveBayes()

Pipeline

In [29]:
from pyspark.ml import Pipeline

In [30]:
data_prep_pipe = Pipeline(stages=[to_number, tokenizer,
                                  stopremove, count_vec,
                                  idf, clean_up])

In [31]:
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)

Training and Evaluating

In [32]:
clean_data = clean_data.select(["label", "features"])
clean_data.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(38356,[1,4,8,13,...|
|  0.0|(38356,[3,4,17,24...|
|  0.0|(38356,[7,9,13,15...|
|  0.0|(38356,[3,5,10,23...|
|  0.0|(38356,[0,4,11,15...|
|  0.0|(38356,[133,38355...|
|  4.0|(38356,[19,24,28,...|
|  1.0|(38356,[2,5,10,16...|
|  0.0|(38356,[4,5,10,16...|
|  0.0|(38356,[0,2,6,14,...|
+-----+--------------------+
only showing top 10 rows



In [33]:
(training, testing) = clean_data.randomSplit([0.7, 0.3])

In [34]:
nbModel = nb.fit(training)

In [35]:
data.printSchema()

root
 |-- Clothing ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Review Text: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Recommended IND: string (nullable = true)
 |-- Positive Feedback Count: string (nullable = true)
 |-- Division Name: string (nullable = true)
 |-- Department Name: string (nullable = true)
 |-- Class Name: string (nullable = true)
 |-- length: integer (nullable = true)



In [36]:
test_results = nbModel.transform(testing)

In [37]:
test_results.show(10)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38356,[0,2,6,14,...|[-1821.8495064102...|[0.99998115615575...|       0.0|
|  0.0|(38356,[0,6,25,32...|[-1422.3694682226...|[0.00256508219177...|       1.0|
|  0.0|(38356,[1,4,8,13,...|[-1838.0004288153...|[1.0,5.3858740704...|       0.0|
|  0.0|(38356,[2,5,25,10...|[-1077.9260456315...|[1.0,3.6343103586...|       0.0|
|  0.0|(38356,[2,7,11,30...|[-844.72226047346...|[0.98449100977947...|       0.0|
|  0.0|(38356,[4,5,10,16...|[-2318.2354538445...|[0.99999352960251...|       0.0|
|  0.0|(38356,[4,6,11,18...|[-2685.0123664157...|[2.84880303061502...|       1.0|
|  0.0|(38356,[6,10,17,2...|[-1092.2246379669...|[1.0,3.5790957657...|       0.0|
|  0.0|(38356,[6,10,30,4...|[-1668.0406313214...|[0.99243809566153...|       0.0|
|  0.0|(38356,[7

In [38]:
test_results.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|   90|
|  1.0|       1.0|  536|
|  3.0|       2.0|  192|
|  4.0|       2.0|   68|
|  0.0|       1.0|  781|
|  0.0|       4.0|   83|
|  1.0|       0.0|  533|
|  2.0|       2.0|  342|
|  3.0|       1.0|   83|
|  2.0|       3.0|  166|
|  1.0|       4.0|   29|
|  4.0|       4.0|   43|
|  2.0|       4.0|   55|
|  3.0|       4.0|   42|
|  2.0|       1.0|  227|
|  1.0|       2.0|  283|
|  0.0|       0.0| 2655|
|  1.0|       3.0|  108|
|  4.0|       3.0|   67|
|  0.0|       2.0|  242|
+-----+----------+-----+
only showing top 20 rows



In [39]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [40]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model: {}".format(acc))

Accuracy of model: 0.5484841599625897


- The result is not really good (only ~55%)
- Should try with other models

## 2. LogisticRegression

In [41]:
from pyspark.ml.classification import LogisticRegression

In [42]:
logistic = LogisticRegression(featuresCol="features",
                     labelCol="label",
                     predictionCol="prediction")   

In [43]:
logisticModel = logistic.fit(training)

In [44]:
# Evaluate Model
test_model = logisticModel.transform(testing)
test_model.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|  189|
|  1.0|       1.0|  498|
|  3.0|       2.0|  140|
|  4.0|       2.0|   65|
|  0.0|       1.0|  841|
|  0.0|       4.0|   30|
|  1.0|       0.0|  685|
|  2.0|       2.0|  272|
|  3.0|       1.0|  120|
|  2.0|       3.0|  126|
|  1.0|       4.0|   29|
|  4.0|       4.0|   42|
|  2.0|       4.0|   36|
|  3.0|       4.0|   44|
|  2.0|       1.0|  257|
|  1.0|       2.0|  221|
|  0.0|       0.0| 2777|
|  1.0|       3.0|   56|
|  4.0|       3.0|   37|
|  0.0|       2.0|  214|
+-----+----------+-----+
only showing top 20 rows



In [45]:
acc_eval = MulticlassClassificationEvaluator()
acc_1 = acc_eval.evaluate(test_model)
print("Accuracy of model: {}".format(acc_1))

Accuracy of model: 0.5299158290703699


## 3. Random forest

In [46]:
from pyspark.ml.classification import RandomForestClassifier

In [47]:
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 500, \
                            maxDepth = 5, \
                            maxBins = 64)

In [48]:
rfModel = rf.fit(training)

In [49]:
test_model_rf = rfModel.transform(testing)
test_model_rf.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|  880|
|  1.0|       0.0| 1489|
|  0.0|       0.0| 3913|
|  4.0|       0.0|  227|
|  3.0|       0.0|  470|
+-----+----------+-----+



In [50]:
test_model_rf.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 6979|
+----------+-----+



In [51]:
acc_eval = MulticlassClassificationEvaluator()
acc_2 = acc_eval.evaluate(test_model_rf)
print("Accuracy of model: {}".format(acc_2))

Accuracy of model: 0.4028550948475246


**Conclusion: Choose NaiveBayes Model because Accuracy Score is highest although it is only ~55%**

# Save and Load The Model

In [52]:
# Save model
nbModel.save("nbModel_Womens_Clothing_E_Commerce_Reviews_new")

In [53]:
from pyspark.ml.classification import NaiveBayesModel

In [54]:
# Load Model
nbModel2 = NaiveBayesModel.load("nbModel_Womens_Clothing_E_Commerce_Reviews_new")

# Predict New Data

In [55]:
new_data = pd.read_excel("Data/Womens_Clothing_E_Commerce_Reviews.xlsx", sheet_name="new_reviews", engine="openpyxl", index_col=0)

In [56]:
new_data = spark.createDataFrame(new_data.astype(str))

In [57]:
new_data = new_data.withColumn("length", length(new_data["Review Text"]))

In [58]:
tokenizer_ud = Tokenizer(inputCol="Review Text", outputCol="token_text_ud")
stopremove_ud = StopWordsRemover(inputCol="token_text_ud", outputCol="stop_tokens_ud")
count_vec_ud = CountVectorizer(inputCol="stop_tokens_ud", outputCol="c_vec_ud")
idf_ud = IDF(inputCol="c_vec_ud", outputCol="tf_idf_ud")

In [59]:
clean_up_ud = VectorAssembler(inputCols=["tf_idf_ud", "length"], outputCol="features")

In [60]:
data_prep_pipe_ud = Pipeline(stages=[tokenizer_ud,
                                  stopremove_ud, count_vec_ud,
                                  idf_ud, clean_up_ud])

In [61]:
cleaner_ud = data_prep_pipe_ud.fit(new_data)
clean_data_ud = cleaner.transform(new_data)

In [62]:
unlabeled_data = clean_data_ud.select("features")

In [63]:
predictions = nbModel2.transform(unlabeled_data)

In [64]:
predictions[["features", "probability", "prediction"]].show(5, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------

In [65]:
for pre in predictions[["features", "probability", "prediction"]].head(5):
    print(pre, '\n')

Row(features=SparseVector(38356, {1: 3.0854, 6: 1.7998, 10: 1.9269, 16: 2.1991, 19: 2.2216, 20: 2.1937, 28: 2.4302, 32: 2.488, 50: 2.6785, 54: 2.7347, 85: 6.0751, 138: 3.3848, 175: 3.5923, 201: 3.6881, 238: 3.8667, 290: 4.0792, 342: 4.2341, 348: 4.1938, 430: 4.5179, 586: 4.8267, 1265: 5.7064, 1473: 5.9042, 2075: 6.3495, 2290: 6.5077, 2963: 6.885, 3022: 6.9721, 3586: 7.1727, 3809: 7.2905, 6099: 7.9837, 6612: 8.1172, 38355: 336.0}), probability=DenseVector([0.0, 0.0, 1.0, 0.0, 0.0]), prediction=2.0) 

Row(features=SparseVector(38356, {7: 3.406, 9: 1.9357, 13: 2.0451, 21: 2.265, 24: 2.3462, 33: 2.5289, 35: 2.5321, 42: 2.6124, 54: 2.7347, 71: 2.968, 82: 3.0147, 89: 3.1132, 118: 3.2889, 124: 3.3587, 147: 3.4311, 184: 3.6932, 188: 3.6562, 197: 3.6628, 200: 3.6513, 208: 3.7423, 215: 3.7827, 308: 4.249, 362: 4.24, 457: 4.5024, 506: 4.5908, 637: 4.8591, 682: 4.9571, 684: 5.0197, 704: 4.9755, 705: 5.1003, 713: 5.0005, 814: 5.1286, 1194: 5.6087, 1460: 5.8734, 2951: 6.885, 3548: 7.1727, 3606: 7.17