In [55]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.ml.classification import  LogisticRegression, NaiveBayes
from pyspark.ml.classification import LogisticRegressionModel, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover,RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, IndexToString
import shutil
import os

In [56]:
#spark.stop()

In [57]:
#open Spark Session
spark = SparkSession.builder.appName('GSK').getOrCreate()

In [58]:
#read source I remove last 10 entries for predictions 
data=spark.read.csv("Dataset_N.csv", inferSchema=True,sep=';',header=True)

### DATA EXPLORATION

In [59]:
data

DataFrame[V1: int, V2: string, V3: string, V4: string, V5: string]

In [60]:
data.show(3)

+--------+---------------+--------------------+--------------------+-----+
|      V1|             V2|                  V3|                  V4|   V5|
+--------+---------------+--------------------+--------------------+-----+
|26229701|WASHINGMACHINES|           WAQ284E25|      WASCHMASCHINEN|BOSCH|
|16576864|     USB MEMORY|LEEF IBRIDGE MOBI...|PC__1100COMPUTING...| LEEF|
|26155618|     USB MEMORY|SANDISK 32GB ULTR...|               W1370| null|
+--------+---------------+--------------------+--------------------+-----+
only showing top 3 rows



In [61]:
data.printSchema()

root
 |-- V1: integer (nullable = true)
 |-- V2: string (nullable = true)
 |-- V3: string (nullable = true)
 |-- V4: string (nullable = true)
 |-- V5: string (nullable = true)



### DATA PREPARATION

In [62]:
data=data.withColumnRenamed("V1","ID").withColumnRenamed("V2","product_group").withColumnRenamed("V3","main_text")\
    .withColumnRenamed("V4","add_text").withColumnRenamed("V5","manufacturer")

In [63]:
data=data.select('product_group','main_text','add_text','manufacturer') # ID doesnt add any value

In [64]:
data.printSchema()

root
 |-- product_group: string (nullable = true)
 |-- main_text: string (nullable = true)
 |-- add_text: string (nullable = true)
 |-- manufacturer: string (nullable = true)



In [65]:
# Count nulls
data.filter(data.manufacturer.isNull()).count()

1343

In [66]:
data.filter(data.main_text.isNull()).count()

2

In [67]:
# remove NULL because cause a lot of issues
data=data.fillna({'manufacturer':"NO_Manufacturer"})
data=data.fillna({'main_text':"NO_TEXT"})

### DATA TRANSFORMATION TO FEED ESTIMATORS

In [68]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="main_text", outputCol="main_text_t")
stopwordsRemover = StopWordsRemover(inputCol="main_text_t", outputCol="main_text_f")
hashingTF = HashingTF(inputCol="main_text_f", outputCol="rawFeatures1")
idf = IDF(inputCol="rawFeatures1", outputCol="tf_idf")
# regular expression tokenizer
regexTokenizer2 = RegexTokenizer(inputCol="add_text", outputCol="add_text_t")
stopwordsRemover2 = StopWordsRemover(inputCol="add_text_t", outputCol="add_text_f")
hashingTF2 = HashingTF(inputCol="add_text_f", outputCol="rawFeatures2")
idf2 = IDF(inputCol="rawFeatures2", outputCol="tf_idf2")

In [69]:
#convert manufacturers to categories 
manufacturer_Y = StringIndexer(inputCol = "manufacturer", outputCol = "manufacturer_C")

In [70]:
# tranfor categories to vectors, to made it easy for estimators
encoders2 = OneHotEncoder(inputCol=manufacturer_Y.getOutputCol(), outputCol="manufacture_1H") 

In [71]:
#define pipeline for features
data_prep_pipe = Pipeline(stages=[regexTokenizer,stopwordsRemover,hashingTF,idf,\
                                  regexTokenizer2,stopwordsRemover2,hashingTF2,idf2,manufacturer_Y,encoders2])

In [72]:
data_transformer = data_prep_pipe.fit(data)

In [73]:
data = data_transformer.transform(data)

In [74]:
data.show(3)

+---------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+----------------+
|  product_group|           main_text|            add_text|   manufacturer|         main_text_t|         main_text_f|        rawFeatures1|              tf_idf|          add_text_t|          add_text_f|        rawFeatures2|             tf_idf2|manufacturer_C|  manufacture_1H|
+---------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+----------------+
|WASHINGMACHINES|           WAQ284E25|      WASCHMASCHINEN|          BOSCH|         [waq284e25]|         [waq284e25]|(262144,[4112],[1...|(262144,[4112],[8...|    [waschmas

In [75]:
#only select relevant columns
data=data.select("main_text","add_text","product_group","rawFeatures1","rawFeatures2","manufacture_1H")

In [76]:
# product categories to number
product_group_Y = StringIndexer(inputCol = "product_group", outputCol = "label")

In [77]:
data=product_group_Y.fit(data).transform(data)


In [78]:
#create groupby to reverse categories later
datagroup=data.groupby('product_group').avg('label')

In [79]:
datagroup=datagroup.withColumnRenamed("avg(label)", "label")

In [80]:
datagroup.show()

+---------------+-----+
|  product_group|label|
+---------------+-----+
|     USB MEMORY|  0.0|
|       BICYCLES|  2.0|
| CONTACT LENSES|  3.0|
|WASHINGMACHINES|  1.0|
+---------------+-----+



In [81]:
#create temporal view to access during session
datagroup.createOrReplaceGlobalTempView('Category_label')

In [82]:
#produce feature vector
clean_up = VectorAssembler(inputCols=['rawFeatures1','rawFeatures2','manufacture_1H'],outputCol='features')

In [83]:
dataout=clean_up.transform(data)

In [84]:
dataout.show(1)

+---------+--------------+---------------+--------------------+--------------------+---------------+-----+--------------------+
|main_text|      add_text|  product_group|        rawFeatures1|        rawFeatures2| manufacture_1H|label|            features|
+---------+--------------+---------------+--------------------+--------------------+---------------+-----+--------------------+
|WAQ284E25|WASCHMASCHINEN|WASHINGMACHINES|(262144,[4112],[1...|(262144,[120109],...|(623,[8],[1.0])|  1.0|(524911,[4112,382...|
+---------+--------------+---------------+--------------------+--------------------+---------------+-----+--------------------+
only showing top 1 row



In [85]:
#select relevant columns
datax=dataout.select("main_text","add_text","product_group","features",'label')

In [86]:
#FINAL DATAFRAME TO TRAIN MODELS
datax.show(3)

+--------------------+--------------------+---------------+--------------------+-----+
|           main_text|            add_text|  product_group|            features|label|
+--------------------+--------------------+---------------+--------------------+-----+
|           WAQ284E25|      WASCHMASCHINEN|WASHINGMACHINES|(524911,[4112,382...|  1.0|
|LEEF IBRIDGE MOBI...|PC__1100COMPUTING...|     USB MEMORY|(524911,[22790,45...|  0.0|
|SANDISK 32GB ULTR...|               W1370|     USB MEMORY|(524911,[45908,12...|  0.0|
+--------------------+--------------------+---------------+--------------------+-----+
only showing top 3 rows



### SPLITING FOR TRAINING AND TESTING

In [87]:
# set seed for reproducibility and split data for training and evaluating
(trainingData, testData) = datax.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 5654
Test Dataset Count: 2335


### LOGISTIC REGRESSION

In [88]:
#Logistic Regression training and prediction
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.select("main_text","add_text","product_group","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+-------------+------------------------------+-----+----------+
|                     main_text|                      add_text|product_group|                   probability|label|prediction|
+------------------------------+------------------------------+-------------+------------------------------+-----+----------+
|USB3.0-STICK 32GB INTENSO S...|                    USB-STICKS|   USB MEMORY|[0.998670474339512,2.509527...|  0.0|       0.0|
|7244919 - 7244919 - USB3 FL...|CATEGORY 5__PC PERIPHERALS_...|   USB MEMORY|[0.9967433922962178,0.00110...|  0.0|       0.0|
|USB2.0-STICK 16GB DATATRAVE...|                    USB-STICKS|   USB MEMORY|[0.9951644212654676,0.00145...|  0.0|       0.0|
|USB2.0 STICK, 8GB USB-SPEIC...|061005__COMPUTER-/INFORMATI...|   USB MEMORY|[0.9916429671615415,0.00442...|  0.0|       0.0|
|USB STICK JETFLASH 880 OTG ...|   STORAGE/MEDIA/USBKEY/USBKEY|   USB MEMORY|[0.9905120334855472,0.00345...|  0.0|    

In [89]:
#delete previous model serialized if exists
dirpath = os.path.join("lg_mode_path")
if os.path.exists(dirpath) and os.path.isdir(dirpath):
    shutil.rmtree(dirpath)

In [90]:
#save model to disk
lrModel.save("lg_mode_path")

In [91]:
#load the model from disk
evaluatorlg = LogisticRegressionModel.load("lg_mode_path")

In [92]:
#get metrics of evaluation
evaluatorlg = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
metrics=evaluatorlg.evaluate(predictions)

In [93]:
print("Accuraccy: {}".format(metrics))

Accuraccy: 0.9974304068522484


In [94]:
#read completelly new data
Newdata=spark.read.csv("New_Data.csv", inferSchema=True,sep=';') # Now no Headers be careful

In [95]:
Newdata.show(3)

+--------+--------------+--------------------+--------------------+----+
|     _c0|           _c1|                 _c2|                 _c3| _c4|
+--------+--------------+--------------------+--------------------+----+
|62217839|CONTACT LENSES|ACUVUE OASYS FOR ...|              LINSEN| J&J|
|62224046|CONTACT LENSES|SOFLENS TORIC6 ST...|              LINSEN| B&L|
|67407315|      BICYCLES|227445  CITYRAD D...|112100__FASHION &...|null|
+--------+--------------+--------------------+--------------------+----+
only showing top 3 rows



In [96]:
Newdata=Newdata.withColumnRenamed("_c0","ID").withColumnRenamed("_c1","product_group").withColumnRenamed("_c2","main_text")\
    .withColumnRenamed("_c3","add_text").withColumnRenamed("_c4","manufacturer")

In [97]:
Newdata=Newdata.select('product_group','main_text','add_text','manufacturer') # ID doesnt add any value

In [98]:
# remove NULL because cause a lot of issues
Newdata=Newdata.fillna({'manufacturer':"NO_Manufacturer"})
Newdata=Newdata.fillna({'main_text':"NO_TEXT"})

In [99]:
#transform the imput data with same pipeline to feed the estimator for predictions
data_transformer2 = data_prep_pipe.fit(Newdata)

In [100]:
data_new = data_transformer.transform(Newdata)

In [101]:
data_new.printSchema()

root
 |-- product_group: string (nullable = true)
 |-- main_text: string (nullable = false)
 |-- add_text: string (nullable = true)
 |-- manufacturer: string (nullable = false)
 |-- main_text_t: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- main_text_f: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures1: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- add_text_t: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- add_text_f: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures2: vector (nullable = true)
 |-- tf_idf2: vector (nullable = true)
 |-- manufacturer_C: double (nullable = false)
 |-- manufacture_1H: vector (nullable = true)



In [102]:
# CREATE FEATURE VECTORS
dataout_new=clean_up.transform(data_new)

In [103]:
dataout_new.printSchema()

root
 |-- product_group: string (nullable = true)
 |-- main_text: string (nullable = false)
 |-- add_text: string (nullable = true)
 |-- manufacturer: string (nullable = false)
 |-- main_text_t: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- main_text_f: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures1: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- add_text_t: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- add_text_f: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures2: vector (nullable = true)
 |-- tf_idf2: vector (nullable = true)
 |-- manufacturer_C: double (nullable = false)
 |-- manufacture_1H: vector (nullable = true)
 |-- features: vector (nullable = true)



In [104]:
# PREDICTING MULTICLASS
prediction2=lrModel.transform(dataout_new)

In [105]:
#LISTING PREDICTIONS
prediction2.select("main_text","add_text","product_group","probability","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+---------------+------------------------------+----------+
|                     main_text|                      add_text|  product_group|                   probability|prediction|
+------------------------------+------------------------------+---------------+------------------------------+----------+
|USB 3.0 S25 32G MORADO USB ...|         MEMORIAS USB __ 32 GB|     USB MEMORY|[0.9316143969863324,0.02388...|       0.0|
|CREME ECHO SOLO 16-SPEED WHITE|FAHRRAEDER>>RENNRAEDER>>REN...|       BICYCLES|[0.1076267302129298,0.10542...|       2.0|
|TOPSY KINDERRAD 12 1/2 POLA...|H006W0792344__WERKZEUG_AUTO...|       BICYCLES|[0.08226058786349069,0.1105...|       2.0|
|LAVAMAT 63479 FL A+++ WASCH...|                WASCHMASCHINEN|WASHINGMACHINES|[0.05164731497816811,0.8367...|       1.0|
|    SOFLENS TORIC6 STÜCKUNISEX|                        LINSEN| CONTACT LENSES|[0.0424872114889493,0.04947...|       3.0|
|    SOFLENS TORIC6 STÜC

In [106]:
#CREATE TEMPORARY VIEW FOR USE IN MEMORY
prediction2.createOrReplaceTempView('outcomes') # Category_label df1.join(df2, df1.id == df2.id).select('df1.*')

In [107]:
# JOIN WITH CATEGORIES DATASET
temp1=prediction2.join(datagroup, prediction2.prediction==datagroup.label ,how='left')\
.select(prediction2.product_group,prediction2.probability, datagroup.product_group)

In [108]:
# SHOW PREDICITIONS 100% ACCURACY
temp1.show(truncate=False)

+---------------+-----------------------------------------------------------------------------------+---------------+
|product_group  |probability                                                                        |product_group  |
+---------------+-----------------------------------------------------------------------------------+---------------+
|CONTACT LENSES |[0.036355251929029574,0.04204493244567242,0.03696971819952107,0.8846300974257769]  |CONTACT LENSES |
|CONTACT LENSES |[0.0424872114889493,0.049477380802978706,0.04336559964885834,0.8646698080592137]   |CONTACT LENSES |
|BICYCLES       |[0.027452775561929318,0.019966407270397828,0.9336363990752068,0.018944418092465883]|BICYCLES       |
|CONTACT LENSES |[0.036355251929029574,0.04204493244567242,0.03696971819952107,0.8846300974257769]  |CONTACT LENSES |
|CONTACT LENSES |[0.0424872114889493,0.049477380802978706,0.04336559964885834,0.8646698080592137]   |CONTACT LENSES |
|USB MEMORY     |[0.9316143969863324,0.02388848587956634

In [109]:
out=temp1.collect()

In [117]:
out[0][1]

DenseVector([0.0364, 0.042, 0.037, 0.8846])

In [113]:
max(out[0]['probability'])

0.88463009742577692

### NAIVES BAYES

In [82]:
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.select("main_text","add_text","product_group","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+-------------+------------------------------+-----+----------+
|                     main_text|                      add_text|product_group|                   probability|label|prediction|
+------------------------------+------------------------------+-------------+------------------------------+-----+----------+
|USB  8GB 20/35 PREMIUM LINE...|          3__COMPUTER__STORAGE|   USB MEMORY|[1.0,9.484162951836256E-17,...|  0.0|       0.0|
|Speicher Kingston 8GB USB S...|                           354|   USB MEMORY|[1.0,8.512250015411041E-17,...|  0.0|       0.0|
|VERBATIM 49808 128GB V3 MAX...|PC__1100COMPUTINGMEMORY__11...|   USB MEMORY|[1.0,7.853676720109339E-17,...|  0.0|       0.0|
|*SANDISK 16GB CRUZER EDGE U...|                    USB-STICKS|   USB MEMORY|[1.0,5.1238276931385994E-17...|  0.0|       0.0|
|           VERBATIM MINI METAL|SPEICHER __ DATENTRAEGER/PC...|   USB MEMORY|[1.0,4.8765372743696744E-17...|  0.0|    

In [83]:
#delete previous model serialized if exists
dirpath = os.path.join("nb_mode_path")
if os.path.exists(dirpath) and os.path.isdir(dirpath):
    shutil.rmtree(dirpath)

In [84]:
model.save("nb_mode_path")

In [85]:
evaluatornb = NaiveBayesModel.load("nb_mode_path")

In [88]:
evaluatornb = MulticlassClassificationEvaluator(predictionCol="prediction")
metrics=evaluatornb.evaluate(predictions)

In [89]:
print("F1: {}".format(metrics))

F1: 0.9974307081346534
