In [0]:
#We will be building a text classifier using various techniques
#As we are using Databricks Community version, we are limited by resouces, and as such, fail to run BERT and ELMO
#We will be utilizing MLFlow to track the various performances of these techniques
#Dataset is from National Data Science Challenge on Kaggle

In [0]:
train_df = spark.read.parquet('dbfs:/user/hive/warehouse/products_train')
display(train_df)

In [0]:
from pyspark.sql.functions import col

display(train_df.groupBy("Category") \
            .count() \
            .orderBy(col("count").desc()))


Category,count
3,81250
18,56598
5,55279
4,42688
25,33922
26,33845
35,30590
32,29754
1,28670
31,27836


In [0]:
display(train_df.groupBy("Main_Category") \
            .count() \
            .orderBy(col("count").desc()))

Main_Category,count
beauty,286583
fashion,219702
mobile,160330


In [0]:
test_df = spark.read.parquet('dbfs:/user/hive/warehouse/products_test')

display(test_df)

In [0]:
display(test_df.groupBy("Main_Category") \
            .count() \
            .orderBy(col("count").desc()))

Main_Category,count
beauty,76545
fashion,55440
mobile,40417


In [0]:
from pyspark.ml import Pipeline

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# use sklearn to evalute the results on test dataset
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score

In [0]:
# Spark NLP Pipeline
# convert text column to spark nlp document
document_assembler = DocumentAssembler()\
  .setInputCol("title")\
  .setOutputCol("document")

# convert document to array of tokens
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")
 
# clean tokens 
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# remove stopwords
stopwords_cleaner = StopWordsCleaner()\
  .setInputCols("normalized")\
  .setOutputCol("cleanTokens")\
  .setCaseSensitive(False)

# stems tokens to bring it to root form
stemmer = Stemmer() \
  .setInputCols(["cleanTokens"]) \
  .setOutputCol("stem")

# Convert custom document structure to array of tokens.
finisher = Finisher() \
  .setInputCols(["stem"]) \
  .setOutputCols(["token_features"]) \
  .setOutputAsArray(True) \
  .setCleanAnnotations(False)

# To generate Term Frequency
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)

# To generate Inverse Document Frequency
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

# convert labels (string) to integers. Easy to process compared to string.
label_stringIdx = StringIndexer(inputCol = "Main_Category", outputCol = "label")

# To convert index(integer) to corresponding class labels
label_to_stringIdx = IndexToString(inputCol="label", outputCol="class")

use = UniversalSentenceEncoder.pretrained()\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")


tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][OK!]


### Prepare Data

In [0]:
pipeline = Pipeline(
  stages = [document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            label_stringIdx])

# fit pipeline to training data
df_fit = pipeline.fit(train_df)
trainData = df_fit.transform(train_df)

# fit pipeline to testing data
df_fit = pipeline.fit(test_df)
testData = df_fit.transform(test_df)

In [0]:
print("Training Dataset Count: " + str(trainData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 666615
Test Dataset Count: 172402


### Train a Logistic Regression Classifier

In [0]:
# define a simple Multinomial logistic regression model. 
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0.0)
lrModel = lr.fit(trainData)
lr_preds = lrModel.transform(testData)

preds_df = lr_preds.select('label', 'prediction').toPandas()

print(classification_report(preds_df.label, preds_df.prediction))
print('Accuracy Score: ' + str(accuracy_score(preds_df.label, preds_df.prediction)))
print('F1 Score: ' + str(f1_score(preds_df.label, preds_df.prediction, average='weighted')))

              precision    recall  f1-score   support

         0.0       0.97      0.99      0.98     76545
         1.0       0.99      0.98      0.99     55440
         2.0       0.98      0.95      0.97     40417

    accuracy                           0.98    172402
   macro avg       0.98      0.98      0.98    172402
weighted avg       0.98      0.98      0.98    172402

Accuracy Score: 0.9791243721070522
F1 Score: 0.9791119267671589


In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
evaluator.evaluate(lr_preds)

Out[11]: 0.9791119267671589

### Naive Bayes

In [0]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
nbModel = nb.fit(trainData)
nb_preds = nbModel.transform(testData)

preds_df = nb_preds.select('label', 'prediction').toPandas()

print(classification_report(preds_df.label, preds_df.prediction))
print('Accuracy Score: ' + str(accuracy_score(preds_df.label, preds_df.prediction)))
print('F1 Score: ' + str(f1_score(preds_df.label, preds_df.prediction, average='weighted')))

              precision    recall  f1-score   support

         0.0       0.98      0.97      0.97     76545
         1.0       0.97      0.99      0.98     55440
         2.0       0.97      0.97      0.97     40417

    accuracy                           0.97    172402
   macro avg       0.97      0.98      0.97    172402
weighted avg       0.97      0.97      0.97    172402

Accuracy Score: 0.9748262781174233
F1 Score: 0.9748130360294088


In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
evaluator.evaluate(nb_preds)

Out[13]: 0.9748130360294089

### Random Forest Classifier

In [0]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)


rfModel = rf.fit(trainData)
rf_preds = rfModel.transform(testData)

preds_df = rf_preds.select('label', 'prediction').toPandas()

print(classification_report(preds_df.label, preds_df.prediction))
print('Accuracy Score: ' + str(accuracy_score(preds_df.label, preds_df.prediction)))
print('F1 Score: ' + str(f1_score(preds_df.label, preds_df.prediction, average='weighted')))

              precision    recall  f1-score   support

         0.0       0.66      1.00      0.80     76545
         1.0       1.00      0.81      0.90     55440
         2.0       1.00      0.28      0.44     40417

    accuracy                           0.77    172402
   macro avg       0.89      0.70      0.71    172402
weighted avg       0.85      0.77      0.74    172402

Accuracy Score: 0.7713947634018167
F1 Score: 0.7444652651123748


In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
evaluator.evaluate(rf_preds)

Out[15]: 0.7444652651123747

### Cross-Validation

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainData)

predictions = cvModel.transform(testData)

# Evaluate best model
evaluator.evaluate(predictions)

MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.
Out[16]: 0.9831521089877477

### Train a Deep Learning Model

### DL + WordEmbedding

In [0]:
# bring token to lemma form
lemma = LemmatizerModel.pretrained('lemma_antbnc') \
  .setInputCols(["cleanTokens"]) \
  .setOutputCol("lemma")

word_embeddings = WordEmbeddingsModel().pretrained() \
  .setInputCols(['document','lemma']) \
  .setOutputCol('embeddings') \
  .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
  .setInputCols(['document', 'embeddings']) \
  .setOutputCol('sentence_embeddings') \
  .setPoolingStrategy('Average')

# the classes/labels/categories are in category column
classifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("Main_Category")\
  .setMaxEpochs(5) \
  .setEnableOutputLogs(True)

# classifierdl pipeline
clf_pipeline = Pipeline(
  stages = [document_assembler,
            tokenizer,
            normalizer, 
            stopwords_cleaner,
            lemma, 
            word_embeddings,
            embeddingsSentence,
            classifierdl])

# fit the pipeline on training data
clfModel = clf_pipeline.fit(train_df)

# fit the pipeline on training data
clf_preds = clfModel.transform(test_df)

preds_df = clf_preds.select('Main_Category', 'class.result').toPandas()
preds_df['result'] = preds_df['result'].apply(lambda x: x[0])

print(classification_report(preds_df['Main_Category'], preds_df.result))
print('Accuracy Score: ' + str(accuracy_score(preds_df['Main_Category'], preds_df.result)))
print('F1 Score: ' + str(f1_score(preds_df['Main_Category'], preds_df.result, average='weighted')))

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][OK!]
              precision    recall  f1-score   support

      beauty       0.99      1.00      1.00     76545
     fashion       1.00      1.00      1.00     55440
      mobile       1.00      0.99      0.99     40417

    accuracy                           1.00    172402
   macro avg       1.00      1.00      1.00    172402
weighted avg       1.00      1.00      1.00    172402

Accuracy Score: 0.995655502836394
F1 Score: 0.9956539009284682


### DL + BERT Embedding

In [0]:

 #BERT Embeddings
word_embeddings = BertEmbeddings \
  .pretrained('bert_base_cased', 'en') \
  .setInputCols(['document','cleanTokens']) \
  .setOutputCol('embeddings')

embeddingsSentence = SentenceEmbeddings() \
  .setInputCols(['document', 'embeddings']) \
  .setOutputCol('sentence_embeddings') \
  .setPoolingStrategy('Average')

# the classes/labels/categories are in category column
classifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("Main_Category")\
  .setMaxEpochs(5) \
  .setEnableOutputLogs(True)

bert_pipeline = Pipeline(
  stages = [document_assembler,
            tokenizer,
            normalizer, 
            stopwords_cleaner,
            word_embeddings,
            embeddingsSentence,
            classifierdl])

# fit the pipeline on training data
bertModel = bert_pipeline.fit(train_df)

# fit the pipeline on training data
bert_preds = bertModel.transform(test_df)

preds_df = bert_preds.select('Main_Category', 'class.result').toPandas()
preds_df['result'] = preds_df['result'].apply(lambda x: x[0])

print(classification_report(preds_df['Main_Category'], preds_df.result))
print('Accuracy Score: ' + str(accuracy_score(preds_df['Main_Category'], preds_df.result)))
print('F1 Score: ' + str(f1_score(preds_df['Main_Category'], preds_df.result, average='weighted')))

### DL + Elmo Embeddings

In [0]:
 #elmo Embeddings
word_embeddings = ElmoEmbeddings \
  .pretrained('elmo', 'en') \
  .setInputCols(['document','lemma']) \
  .setOutputCol('embeddings')

elmo_pipeline = Pipeline(
  stages = [document_assembler,
            tokenizer,
            normalizer, 
            stopwords_cleaner,
            lemma, 
            word_embeddings,
            embeddingsSentence,
            classifierdl])

# fit the pipeline on training data
elmoModel = elmo_pipeline.fit(train_df)

# fit the pipeline on training data
elmo_preds = elmoModel.transform(test_df)

preds_df = elmo_preds.select('Main_Category', 'class.result').toPandas()
preds_df['result'] = preds_df['result'].apply(lambda x: x[0])

print(classification_report(preds_df['Main_Category'], preds_df.result))
print('Accuracy Score: ' + str(accuracy_score(preds_df['Main_Category'], preds_df.result)))
print('F1 Score: ' + str(f1_score(preds_df['Main_Category'], preds_df.result, average='weighted')))

### Export Pipeline

In [0]:
clfModel.write().overwrite().save('./clfmodel')

In [0]:
from dbmlModelExport import ModelExport

 remove old model file, if needed.
dbutils.fs.rm( "/tmp/ml_python_model_export/classifierdl_pipeline", recurse=True)

ModelExport.exportModel(clfModel, "/tmp/ml_python_model_export/classifierdl_pipeline")

In [0]:
 define the Logistic Regression nlp pipeline
lr_pipeline = Pipeline(
  stages = [document_assembler, 
            tokenizer,
            normalizer,
           stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            label_stringIdx,
            lr,
            label_to_stringIdx])

# fit the pipeline on training data
lrModel = lr_pipeline.fit(train_df)

# fit the pipeline on training data
lr_preds = lrModel.transform(test_df)