# Youtube comments analysis project

In this notebook, we have a dataset of user comments for youtube videos related to animals or pets. I will attempt to identify cat or dog owners based on these comments, find out the topics important to them, and then identify video creators with the most viewers that are cat or dog owners.

The dataset provided are comments for videos related to animals and/or pets. The dataset is 240MB compressed (647MB decompressed); please download the file using this google drive link:
https://drive.google.com/file/d/1o3DsS3jN_t2Mw3TsV0i7ySRmh9kyYi1a/view?usp=sharing

 The dataset file is comma separated, with a header line defining the field names, listed here:
- creator_name. Name of the YouTube channel creator.
- userid. Integer identifier for the users commenting on the YouTube channels.
- comment. Text of the comments made by the users.

Keep your code clean and efficient, with
enough documentation easily follow your train of thought. Summarize
the key results from each step. Explain how to execute your code from a command line
interface.

Step 1: Identify Cat And Dog Owners  
Find the users who are cat and/or dog owners.

Step 2: Build And Evaluate Classifiers  
Build classifiers for the cat and dog owners and measure the performance of the classifiers.

Step 3: Classify All The Users  
Apply the cat/dog classifiers to all the users in the dataset. Estimate the fraction of all users
who are cat/dog owners.

Step 4: Extract Insights About Cat And Dog Owners  
Find topics important to cat and dog owners.

Step 5: Identify Creators With Cat And Dog Owners In The Audience  
Find creators with the most cat and/or dog owners. Find creators with the highest statistically
significant percentages of cat and/or dog owners.

In [0]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import array_join, collect_list
from pyspark.ml.feature import RegexTokenizer, Word2Vec
from pyspark.ml import Pipeline

from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

from pyspark.ml.feature import IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml.clustering import LocalLDAModel
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [0]:
%sh
pip install googledrivedownloader

Collecting googledrivedownloader
  Downloading googledrivedownloader-0.4-py2.py3-none-any.whl (3.9 kB)
Installing collected packages: googledrivedownloader
Successfully installed googledrivedownloader-0.4
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.


In [0]:
# link: https://drive.google.com/file/d/1o3DsS3jN_t2Mw3TsV0i7ySRmh9kyYi1a/view?usp=sharing
# need to install googledrivedownloader==0.4 on cluster beforehand

from google_drive_downloader import GoogleDriveDownloader as gdd
gdd.download_file_from_google_drive(file_id='1o3DsS3jN_t2Mw3TsV0i7ySRmh9kyYi1a', dest_path='/FileStore/tables/animal_comments.gz')


Downloading 1o3DsS3jN_t2Mw3TsV0i7ySRmh9kyYi1a into /FileStore/tables/animal_comments.gz... Done.


In [0]:
%sh
gunzip -k /FileStore/tables/animal_comments.gz
ls /FileStore/tables

animal_comments
animal_comments.gz


In [0]:
# move unzipped files from local space to DBFS
dbutils.fs.cp("file:/FileStore/tables/animal_comments", "dbfs:/FileStore/tables/animal_comments.csv")
#display(dbutils.fs.ls("file:/FileStore/tables"))
#display(dbutils.fs.ls("dbfs:/FileStore/tables"))

# tips on bash command in Databricks:
# %sh: access local space by default; if want to access DBFS need to add /dbfs/ in front of address explicitly
# %fs: access DBFS by default; if want to access local space need to add file:/ in front of address explicitly

## 0. Data Exploration and Cleaning

In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/animal_comments"))

path,name,size
dbfs:/FileStore/tables/animal_comments,animal_comments,646719046


In [0]:
# load data into Spark Dataframe
df_clean=spark.read.csv("dbfs:/FileStore/tables/animal_comments.csv",inferSchema=True,header=True)

In [0]:
df_clean.count() 

Out[15]: 5820035

In [0]:
df_clean = df_clean.na.drop(subset=["comment"])
df_clean.count()

Out[8]: 5818984

In [0]:
df_clean.show()

+--------------------+------+-------------------------------------+
|        creator_name|userid|                              comment|
+--------------------+------+-------------------------------------+
|        Doug The Pug|  87.0|                 I shared this to ...|
|        Doug The Pug|  87.0|                   Super cute  😀🐕🐶|
|         bulletproof| 530.0|                 stop saying get e...|
|       Meu Zoológico| 670.0|                 Tenho uma jiboia ...|
|              ojatro|1031.0|                 I wanna see what ...|
|     Tingle Triggers|1212.0|                 Well shit now Im ...|
|Hope For Paws - O...|1806.0|                 when I saw the en...|
|Hope For Paws - O...|2036.0|                 Holy crap. That i...|
|          Life Story|2637.0|武器はクエストで貰えるんじゃないん...|
|       Brian Barczyk|2698.0|                 Call the teddy Larry|
|            The Dodo|2702.0|                   😐🤔😓😢😭😭😭😭😟|
|Hope For Paws - O...|2911.0|                 That mother cat l...|
|Hope For

We can see that after unzipped the original dataset is around 647MB, with 5,818,984 data points after dropping empty comments.  
A look on the data shows that there's a variety of comment contents. Some comments are not alphabetic characters, and some contains only emojis. In this project, I will only focus on alphanumerical comments.

### Label the data & Clarify the goal of training classifier
The most important and challenging part is labeling the dog/cat owners. Correct labels can greatly improve machine learning efficiency and save a lot of troubles in later process. Hereby I would like to first explain the idea of labeling and setting up goal and principles for later data splitting and training process.

In the original data there's no provided label, so I need to first **manually identify and label** some of the targets.  
Due to the extremely large dataset and limited manual and time resources, I decided to identify dog/cat owners by **searching explicit patterns** in the comments, such as 'I have a dog/cat'.  
However, it should be kept in mind that a dog/cat owner is unlikely to mention such pattern in every comment they makes, which means for different comments from the same user, the above searching will give different labels based on each *comment*, not based on each *user*!  

Therefore, I first concatenate comments from the same users together to identify and label on **user-based**.  


- An example of the above setting:

| userid | comment | label (comment-based) | 
| -|-|-|
| 7850 | My dog like this! | 1 |
|7850|Wow|0||

- After concatenation:

| userid | concatenated comment | label (user-based) |
| -|-|-|
| 7850 | My dog like this! Wow| 1 |


After such process, we will get label = 1 for **explicit dog/cat owners**, and others are label = 0. However, those with label = 0 include **implicit dog/cat owners** and **non-owners**, i.e. unknown status.  

Therefore, the goal of training classifiers is to **learn characteristic patterns from explicit users' comments, apply to the unknown users' comments and identify implicit dog/cat owners from non-owners**.  

Two things we need to notice for modeling and assumptions: 
1. The classifier is a **comment-based** model, because we don't want to conduct NLP on group concatenated comments, but on individual comments due to contextual concern (i.e. concatenation will make unrelevant comments become a paragraph when they are not). Thus we need to add user-based labels back to original dataset.  
2. There's a risk when there's **no implicit patterns** in comments as I assumed (though the chance is small in my opinion).  
That is, every dog/cat owners are boastful and always leaves exlicit comments, so their other comments don't contain such 'implicit' patterns for us to derive. If that happens, we won't learn what we believe to learn from the training process, and the classifier will identify unexpected type of users.



- An example of the modeling assumption:

| userid | comment | label (user-based) | prediction by classifier (comment-based) |
| -|-|-|
| 7850 | My dog like this! | 1 |1 (*explicit comment*)
|7850|I often buy this brand of dog food|1|1 (*implicit pattern by dog/cat owners we hope to learn*)|
| 0099| I often buy this brand of cat food|0|1 (*implicit pattern learned and applied*)
| 0099| Nice |0|0|

In [0]:
# find user with preference of dog and cat

# 1. concat comments from same users
df_concat = df_clean.groupBy('userid').agg(array_join(collect_list('comment'), delimiter=' ').alias('comment'))
#df_concat.show()

# 2. identify dog/cat owners by searching explicit comments
df_label = df_concat.withColumn("label", \
                           (when(col("comment").like("%my dog%"), 1) \
                           .when(col("comment").like("%I have a dog%"), 1) \
                           .when(col("comment").like("%my cat%"), 1) \
                           .when(col("comment").like("%I have a cat%"), 1) \
                           .when(col("comment").like("%my puppy%"), 1) \
                           .when(col("comment").like("%my pup%"), 1) \
                           .when(col("comment").like("%my kitty%"), 1) \
                           .when(col("comment").like("%my kitten%"), 1) \
                           .when(col("comment").like("%my pussy%"), 1) \
                           .otherwise(0)))


In [0]:
df_label.show()

+------+--------------------------------+-----+
|userid|                         comment|label|
+------+--------------------------------+-----+
|   7.0|            I love your chann...|    0|
|   8.0|            How about Macklam...|    0|
|  67.0|            tooooooooooo cute...|    0|
|  69.0|            me alegra leer to...|    0|
|  70.0|            wow Im surprised ...|    0|
| 108.0|            Its day two and C...|    0|
| 112.0|            😭😭😭😭😭😭 I lo...|    0|
| 124.0|            Damn why Kobe gon...|    0|
| 128.0|            первое и с дедом ...|    0|
| 142.0|            I loved the super...|    0|
| 147.0|            God youre pretty....|    0|
| 154.0|            Me gustó el de dó...|    0|
| 160.0|                        First !!|    0|
| 168.0|            Where is laika no...|    0|
| 169.0|저게 얼굴이 부은거라니..자괴감..|    0|
| 170.0|            That was really f...|    0|
| 180.0|            I also have a hus...|    0|
| 184.0|            Brave wilderness ...|    0|
| 191.0|   

In [0]:
#number of explicit owners
df_label[df_label.label==1].count()

Out[20]: 35866

In [0]:
# add user-based label back to original dataset
df_pre = df_clean.join(df_label, 'userid', 'left').select(df_clean.creator_name, df_clean.userid, df_clean.comment, df_label.label)


In [0]:
# number of comments by explicit owners
df_pre[df_pre.label==1].count()

Out[22]: 330455

In [0]:
# ratio of such comments in total dataset
330455/5818984

Out[1]: 0.05678912332462162

We can see that only 5.7% comments are made by explicit dog/cat owners.

In [0]:
df_pre.show()

+--------------------+------+--------------------+-----+
|        creator_name|userid|             comment|label|
+--------------------+------+--------------------+-----+
|    Brave Wilderness|   7.0|I love your chann...|    0|
|       Brian Barczyk|   8.0|How about Macklam...|    0|
|            The Dodo|  67.0|tooooooooooo cute...|    0|
|    Brave Wilderness|  67.0|Who ever subscrib...|    0|
|    Brave Wilderness|  67.0|Yo come to Trinid...|    0|
|             ViralBe|  67.0|Animals are not r...|    0|
|         Info Marvel|  69.0|me alegra leer to...|    0|
|           Vet Ranch|  70.0|wow Im surprised ...|    0|
|           Vet Ranch|  70.0|and anyone else n...|    0|
|  Taylor Nicole Dean| 108.0|Its day two and C...|    0|
|Hope For Paws - O...| 112.0|😭😭😭😭😭😭 I lo...|    0|
|Hope For Paws - O...| 112.0|            😭😭😭😭|    0|
|              ojatro| 124.0|Damn why Kobe gon...|    0|
|          Real Shock| 128.0|первое и с дедом ...|    0|
|       Brian Barczyk| 142.0|I loved the 

In [0]:
# save labelled dataset
df_pre.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/tables/df_pre.csv")

In [0]:
# load labelled dataset (this block is just for demonstration)
sparkDF = spark.read.csv("dbfs:/FileStore/tables/df_pre.csv", header="true", inferSchema="true")
sparkDF.show()

+--------------------+------+--------------------+-----+
|        creator_name|userid|             comment|label|
+--------------------+------+--------------------+-----+
|    Brave Wilderness|   7.0|I love your chann...|    0|
|       Brian Barczyk|   8.0|How about Macklam...|    0|
|            The Dodo|  67.0|tooooooooooo cute...|    0|
|    Brave Wilderness|  67.0|Who ever subscrib...|    0|
|    Brave Wilderness|  67.0|Yo come to Trinid...|    0|
|             ViralBe|  67.0|Animals are not r...|    0|
|         Info Marvel|  69.0|me alegra leer to...|    0|
|           Vet Ranch|  70.0|wow Im surprised ...|    0|
|           Vet Ranch|  70.0|and anyone else n...|    0|
|  Taylor Nicole Dean| 108.0|Its day two and C...|    0|
|Hope For Paws - O...| 112.0|😭😭😭😭😭😭 I lo...|    0|
|Hope For Paws - O...| 112.0|            😭😭😭😭|    0|
|              ojatro| 124.0|Damn why Kobe gon...|    0|
|          Real Shock| 128.0|первое и с дедом ...|    0|
|       Brian Barczyk| 142.0|I loved the 

## 1. Data preprocessing

### Tokenization & Word2Vec embedding
For natural language data, we need to apply tokenization and word embedding methods to transform text into numbers. In Spark, the Word2Vec algorithm will generate a weight vector for each word based on its neighbouring words, and then transform the original comments by taking average of all word vectors in the comment. 

Why I didn't use pretrained word2vec features? Because it may contains unnecessary noise from other corpus [as suggested in this tutorial](https://medium.com/@dcameronsteinke/tf-idf-vs-word-embedding-a-comparison-and-code-tutorial-5ba341379ab0).

The first thing I need to consider is whether to split train and test set for fitting Word2Vec vectors.   
The reasons why I decided **fit on full dataset**: 
1. This project use offline data, that is, when we apply classifier on 'future' data, we actually apply on a subset of data with label = 0 from previous labeling steps (See Dataset Split part below for more details). Therefore, it doesn't matter if the Word2Vec here is trained using full data, because it's just a method to transform text into numbers.   
Also, it's very likely that some comments in the future data already appears in the historical data, so such 'information leakage' should not be critical.
2. By its nature, Word2Vec serve as a method of word embedding in NLP, so it should not affect later model training by having information leakage problem. Word2Vec is considered an 'unsupervised' algorithm, so at least during its training, it is not typical to hold back any 'test' data for later evaluation. 

The second thing I need to consider is whether to apply some traditional NLP methods like stemming, lemmatization or stopwords removal before fitting Word2Vec algorithm.  
The reasons why I chose **not to apply such actions**: 
1. According to Kaggle site, to train Word2Vec it is better not to remove stop words because the algorithm relies on the broader context of the sentence in order to produce high-quality word vector.
3. It's said that Word2Vec will automatically reduce the weights on frequent words.
1. Since the dataset is really large, it's not time-efficient to run stemming or lemmatization before Word2Vec. Lemmatization is more complicated since it need to refer to a dictionary. Also, it's said that when data is large enough, it's okay to perform word embedding without stemming.  
4. *Though if have enough computation power and resources, it's suggested to try both ways and compare their performance. NLP is highly dependent on each project's purpose so there's no universal act that is optimal in all situations.*

Therefore, I simply applied **alphanumerical character filter, slice by space and lowercase transformation** before training Word2Vec.

In [0]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="words", pattern="\\W")
#\W matches any non-word characters (short for [^a-zA-Z0-9_]).

word2Vec = Word2Vec(inputCol="words", outputCol="features",seed=0, numPartitions=1000)
# a larger numPartitions will affect accuracy of performance but reduce time for training by parallel computaion

In [0]:
pipeline = Pipeline(stages=[regexTokenizer, word2Vec])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df_pre)
dataset = pipelineFit.transform(df_pre)

#save dataset
dataset.write.mode("overwrite").save("dbfs:/FileStore/tables/dataset")

In [0]:
# reload saved dataset
dataset=spark.read.format('delta').load("dbfs:/FileStore/tables/dataset")
dataset.show()

+--------------------+------+--------------------+-----+--------------------+--------------------+
|        creator_name|userid|             comment|label|               words|            features|
+--------------------+------+--------------------+-----+--------------------+--------------------+
|    Brave Wilderness|   7.0|I love your chann...|    0|[i, love, your, c...|[0.00309866814563...|
|       Brian Barczyk|   8.0|How about Macklam...|    0|[how, about, mack...|[-0.0434638609488...|
|            The Dodo|  67.0|tooooooooooo cute...|    0|[tooooooooooo, cu...|[0.12398493289947...|
|    Brave Wilderness|  67.0|Who ever subscrib...|    0|[who, ever, subsc...|[0.02306891850788...|
|    Brave Wilderness|  67.0|Yo come to Trinid...|    0|[yo, come, to, tr...|[0.01056309863924...|
|             ViralBe|  67.0|Animals are not r...|    0|[animals, are, no...|[-0.0442808867217...|
|         Info Marvel|  69.0|me alegra leer to...|    0|[me, alegra, leer...|[-0.1325184987702...|
|         

In [0]:
# test on training time
#pipelineFit = pipeline.fit(df_pre.limit(800))

In [0]:
#test on transforming time
#dataset = pipelineFit.transform(df_pre.limit(800))

### Dataset split
Based on labels of explicit (label=1) and unknown owners (label=0), the data should be divided into three parts: **train data, test data and apply data**.  
The train and test data should be obtained by **stratified sampling on explicit owners and unknown owners**, and apply data **only need unknown owners**, since we don't need to train or evaluate but simply apply our classifier.  
This is also why we need to first do user-based label before splitting.  
Note: Here I made the *naive assumption* that explicit owners will remain as dog/cat owner (i.e. their status won't change in the future). If consider status changing, the project will be too complicated for now (See Part 3 for status changing concerns).  

When splitting the dataset in this project, several things need to be taken into account:
- Goal of modeling  
Remember, the goal of training classifiers is to learn characteristic patterns from explicit users' comments, apply to the unknown users' comments and identify implicit dog/cat owners from non-owners.  
Therefore, **each comment** should be the unit of observation, instead of user.
- Unit of split  
Based on the goal of modeling and the above principle for splitting, I split the **comments** into train, test, and apply set based on their labels.
- Stratified sampling  
Since there're much less explicit owners than unknown owners, and thus much less comments with label=1 than label=0, this is an imbalanced classification problem and need stratified sampling.  
Below I used **downsampling** method to downsample the comments by unknown owners to achieve a balanced dataset for training and testing.

In [0]:
(lable1_train,lable1_test)=dataset.filter(col('label')==1).randomSplit([0.7, 0.3],seed = 100)
(lable0_train, lable0_rest)=dataset.filter(col('label')==0).randomSplit([0.05, 0.95],seed = 100)
(lable0_test, lable0_apply)=lable0_rest.randomSplit([0.02, 0.98],seed = 100)

In [0]:
trainingData = lable1_train.union(lable0_train)
testData=lable1_test.union(lable0_test)

In [0]:
print("Dataset Count: " + str(dataset.count()))
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Dataset Count: 5818984


In [0]:
# check ratio of positive and negative class in training set
print("Training Dataset positive label: " + str(trainingData[trainingData.label==1].count()))
print("Training Dataset negative label: " + str(trainingData[trainingData.label==0].count()))

Training Dataset positive label: 231071
Training Dataset negative label: 274117


In [0]:
# class ratio in train set
231071/(231071+274117)

Out[14]: 0.45739605849703474

In [0]:
# check ratio of positive and negative class in test set
print("Test Dataset positive label: " + str(testData[testData.label==1].count()))
print("Test Dataset negative label: " + str(testData[testData.label==0].count()))

Test Dataset positive label: 99384
Test Dataset negative label: 103516


In [0]:
# class ratio in test set
99384/(99384+103516)

Out[15]: 0.4898176441596846

After splitting, 505,188 data points in training set, and 202,900 in test set, each has roughly balanced labels.  
The remaining 5,110,896 data points will be applied the classifier built.

## 2. Build the classifier 

In this part, I will train and tune supervised machine learning models: Logistic regression and Random Forest with 3-fold cross validation, and choose the model with best performance on test set for applying to full data.  
For evaluation, I use the default AUC by Spark BinaryClassificationEvaluator. AUC ranges from 0 to 1, the closer to 1 the better.

### LogisticRegression

In [0]:
lr_model = LogisticRegression(featuresCol='features', labelCol='label', maxIter=5)
grid = ParamGridBuilder() \
    .addGrid(lr_model.regParam, [0.1, 0.01]) \
    .addGrid(lr_model.elasticNetParam, [1, 0]) \
    .build()
 
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr_model, estimatorParamMaps=grid, evaluator=evaluator,numFolds=3)

In [0]:
lr_trained_model = cv.fit(trainingData)




In [0]:
lr_BestModel_balanced = lr_trained_model.bestModel

In [0]:
# Prediction and accuracy in training set
lr_preds_train_b = lr_BestModel_balanced.transform(trainingData)
evaluator.evaluate(lr_preds_train_b)

Out[19]: 0.7124774193213037

In [0]:
# Prediction and accuracy in test set
lr_preds_b = lr_BestModel_balanced.transform(testData)
evaluator.evaluate(lr_preds_b)

Out[20]: 0.7126820036534212

In [0]:
best_params = lr_trained_model.getEstimatorParamMaps()[np.argmin(lr_trained_model.avgMetrics)]
print('**Best model**')
for i,j in best_params.items():
  print('  '+i.name+': '+str(j))

**Best ALS model**
  regParam: 0.1
  elasticNetParam: 1.0


In [0]:
#Save Logistic Regression model
lr_BestModel.save("dbfs:/FileStore/nlp/lr_models/")

In [0]:
lr_BestModel_balanced.save("dbfs:/FileStore/nlp/lr_models_b/")

In [0]:
#load logistic regression model
lr_BestModel = LogisticRegressionModel.load("dbfs:/FileStore/nlp/lr_models/")


I only trained and did very simple tuning on 3-fold CV, and it took over half an hour to complete the process.   
The best Logistic Regression model has regularization parameter = 0.1 and L1 penalty. The AUC on train set is 0.712, and 0.713 on test set.

### RandomForest

In [0]:
rf_model = RandomForestClassifier(labelCol="label", featuresCol="features",predictionCol="prediction", probabilityCol="probability",maxDepth=5, impurity="gini")
 
rf_grid = ParamGridBuilder() \
.addGrid(rf_model.numTrees, [10, 15,20]) \
.build()
 
 
rf_cv = CrossValidator(estimator=rf_model, estimatorParamMaps=rf_grid, evaluator=evaluator,numFolds=3)

In [0]:
rf_trained_model = rf_cv.fit(trainingData)

In [0]:
best_params = rf_trained_model.getEstimatorParamMaps()[np.argmin(rf_trained_model.avgMetrics)]
print('**Best model**')
for i,j in best_params.items():
  print('  '+i.name+': '+str(j))

**Best ALS model**
  numTrees: 10


In [0]:
# prediction on training set
rf_BestModel_b = rf_trained_model.bestModel
rf_preds_train_b = rf_BestModel_b.transform(trainingData)
evaluator.evaluate(rf_preds_train_b)

Out[33]: 0.7210195403393079

In [0]:
# prediction on test set
rf_preds_test_b = rf_BestModel_b.transform(testData)
evaluator.evaluate(rf_preds_test_b)


Out[34]: 0.7205561503266785

In [0]:

#Save Random Forest model
rf_BestModel_b.save("dbfs:/FileStore/nlp/random_forest_model_b/")

In [0]:
#load
rf_BestModel_b = RandomForestClassificationModel.load("dbfs:/FileStore/nlp/random_forest_model_b/")

Similarly, I trained a Random Forest model with simple tuning by 3-fold CV, and it took over 40 mins to complete the process.    
The best RF model has numTrees = 10. AUC is 0.721 on training set and test set.   
It's slightly better than best Logistic model, though if fine tuning in the future, the model may be further improved.

### Gradient boosting (not recommended)

Here I include the code for Gradient boosting as a reference, but it took too long to train (in my case, over 6 hours and it was still running).   
It makes sense because gradient boosting is sequentially trained. It may be just not practical to use on such big data.

In [0]:
from pyspark.ml.classification import GBTClassifier
gbt_model = GBTClassifier()
 
gbt_grid = ParamGridBuilder() \
.addGrid(gbt_model.maxIter, [30, 50]) \
.addGrid(gbt_model.maxDepth, [5, 10]) \
.build()
 
gbt_cv = CrossValidator(estimator=gbt_model, estimatorParamMaps=gbt_grid, evaluator=evaluator,numFolds=3)
 


In [0]:
#gbt_trained_model = gbt_cv.fit(trainingData)

In [0]:
#gbt_BestModel = gbt_trained_model.bestModel
#gbt_preds_train = gbt_BestModel.transform(trainingData)
#evaluator.evaluate(lr_preds_train)

In [0]:
#gbt_preds_test = gbt_BestModel.transform(testData)
#evaluator.evaluate(gbt_preds_test)

In [0]:
#gbt_BestModel.save("dbfs:/FileStore/nlp/Best_GBT_model/")

In [0]:
# I choose Random Forest over Logistic Regression
Best_model = rf_BestModel_b

## 3. Classify All The Users

In this part, I applied the best model from the previous part to full dataset and get predicted labels of dog/cat owners based on each comment. 

Since the prediction is comment-based (as in training process), there may be occasions when comments by the same user got different labels. Thus we need to somehow *translate* the *comments-based* results into *user-based* labels, because the goal is to find those dog/cat owners.  

Here's the rule/threshold I apply to classify users:  
Input: `label` (the manual label from part 0), `prediction` (comment based)  
Output: `user_label` 
1. if label = 1 then user_label = 1   
----Explicit owners should still be dog/cat owners 
2. else:  
if avg(predictions) > 0.5 then user_label = 1;  
else: user_label = 0.  
----Classify unknown owners by **majority vote**: if more than 50% of their comments are positive, classify the user to be positive


Concerns of the above method:
1. I only worked on offline dataset in this project, but if we want to apply the classifier to future data (new users or new comments by existing users), it's hard to **update** and evaluate the future performance.  
2. I assume the **owner status is constant** in this offline data for simplicity, but if the owener status change, how will the classifier perform, and how to evaluate?  

Specifically, **three situations** may occur: 
 1. User wasn't dog/cat owner before, but later they become dog/cat owner: does the **classifier correctly identify their comments in each stage**? Does the **rule/threshold on user_label correctly reflect that change** as well?  
 For example, if threshold is still 0.5, even if the user later have more positive predictions after becoming dog/cat owner, it may still take time to reach the 0.5 threshold if they have many negative predictions previously. 
 
 2. User was dog/owner before, but no longer later.   
 Similarly, there's problem of **correctly and timely identify the status change**.  
 
 3. Explicit comments updates in the future   
 If a non-owner user later make an explicit comment indicating they are dog/cat owner, how should the previous predictions be evaluated?   
 Does it means **they change status recently**, or does it means **the model didn't identify them correctly before**?    
 Also, the rule I applied have a naive assumption that explicit users won't change status, by always setting user_label = 1 once they have made an explicit comments. 
 
In short, considering status changing, the modeling and classifying process becomes much more complicated and hard to evaluate **without human raters**.

In [0]:
predictions = Best_model.transform(dataset)
evaluator.evaluate(predictions)

Out[7]: 0.7210898768259922

In [0]:
predictions.show()

+--------------------+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|        creator_name|userid|             comment|label|               words|            features|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|    Brave Wilderness|   7.0|I love your chann...|    0|[i, love, your, c...|[0.00309866814563...|[9.81949479909670...|[0.49097473995483...|       1.0|
|       Brian Barczyk|   8.0|How about Macklam...|    0|[how, about, mack...|[-0.0434638609488...|[11.9686486212221...|[0.59843243106110...|       0.0|
|            The Dodo|  67.0|tooooooooooo cute...|    0|[tooooooooooo, cu...|[0.12398493289947...|[10.2028199007031...|[0.51014099503515...|       0.0|
|    Brave Wilderness|  67.0|Who ever subscrib...|    0|[who, ever, subsc...|[0.02306891

In [0]:
predictions.createOrReplaceTempView("predictions")

In [0]:
dataset_pred = spark.sql("""
with cte as (select userid, avg(prediction) as avg
from predictions
group by 1)

select cte.userid, cte.avg, 
case when p.label = 1 then 1 
when cte.avg > 0.5 then 1
else 0 end as pred
from predictions p 
left join cte on p.userid = cte.userid
""")
dataset_pred.show()

+------+---+----+
|userid|avg|pred|
+------+---+----+
|   7.0|1.0|   1|
|   8.0|0.0|   0|
|  67.0|0.0|   0|
|  67.0|0.0|   0|
|  67.0|0.0|   0|
|  67.0|0.0|   0|
|  69.0|0.0|   0|
|  70.0|0.5|   0|
|  70.0|0.5|   0|
| 108.0|0.0|   0|
| 112.0|0.0|   0|
| 112.0|0.0|   0|
| 124.0|0.0|   0|
| 128.0|0.0|   0|
| 142.0|1.0|   1|
| 147.0|0.0|   0|
| 154.0|0.0|   0|
| 154.0|0.0|   0|
| 154.0|0.0|   0|
| 160.0|0.0|   0|
+------+---+----+
only showing top 20 rows



In [0]:
# estimate the fraction of dog/cat owners
print("Fraction of predicted dog/cat owners: "+ str(dataset_pred[dataset_pred.pred==1].count()/dataset_pred.count()))

Fraction of predicted dog/cat owners: 0.20399952981482677


In [0]:
# save classified dataset
dataset_pred.write.mode("overwrite").save("dbfs:/FileStore/tables/dataset_pred")

In [0]:
# reload saved dataset
dataset_pred=spark.read.format('delta').load("dbfs:/FileStore/tables/dataset_pred")
dataset_pred.show()

+------+---+----+
|userid|avg|pred|
+------+---+----+
|   7.0|1.0|   1|
|   8.0|0.0|   0|
|  67.0|0.0|   0|
|  67.0|0.0|   0|
|  67.0|0.0|   0|
|  67.0|0.0|   0|
|  69.0|0.0|   0|
|  70.0|0.5|   0|
|  70.0|0.5|   0|
| 108.0|0.0|   0|
| 112.0|0.0|   0|
| 112.0|0.0|   0|
| 124.0|0.0|   0|
| 128.0|0.0|   0|
| 142.0|1.0|   1|
| 147.0|0.0|   0|
| 154.0|0.0|   0|
| 154.0|0.0|   0|
| 154.0|0.0|   0|
| 160.0|0.0|   0|
+------+---+----+
only showing top 20 rows



## 4. Get insigts of Users

In this part, I conducted topic modeling on the comments from the predicted dog/cat owners from last part and got insights on their topics of interest.

Methods used：  
- stopwords removal  
Before conducting topic modeling, I included a removal of stop words in the preprocessing part. The reason is that in the topic modeling, we will look at the derived key words from TF-IDF, and we don't want meaningless stopwords to take up the top key words for each cluster.  
Also, since the dataset is really large, I didn't include stemming or lemmatization in preprocessing, but it's recommended to do so if have enough computing power or time to run.

- TF-IDF  
The reason why I fit data in TF-IDF instead of using Word2Vec embedding from previous part is that TF-IDF is more suitable for topic modeling as it takes into account for term frequency and document frequency, and can extract key words for analzying topic clusters. Word2Vec features are more suitable for other tasks like finding similar words or modeling.  
In this part, I use the Spark methods of CounterVectorizer for TF instead of hashingTF, because CounterVectorizer can extract word lists for later analysis, but hashingTF cannot reverse features back to words. 

- LDA  
LDA is a common unsupervised clustering method for topic modeling. Here I use the Spark LDA method to fit the data.  
Notice that I only use randomly selected 50% of data here due to running time concern. It took approximately 6 hours to fit the LDA on half of the data.  

Further improvement:  
- hyperparameters tuning  
For CounterVectorizier, I used the default vocabSize = 262144.   
For LDA, I chose number of cluster = 10 and maxIter = 10 based on the results on small sample data and running time concern.   
Further improvement may involve better tuning on relevant hyperparameters to achieve better results.

- more preprocessing process  
As stated above, I only included stop words removal for preprocessing after tokenization and lowercase transformation.  
There are many more process like stemming, lemmatization, ngrams, etc. that can improved the results.

- more clustering methods   
If interested, other clustering methods like KMeans can also be used and compared to the results from LDA to ensure robustness and/or accuracy.

In [0]:
dataset.createOrReplaceTempView("dataset")
dataset_pred.createOrReplaceTempView("dataset_pred")

In [0]:
# filter dog/cat owners' comments
owners = spark.sql("""
select a.creator_name, a.userid, a.comment, a.words, a.features, b.pred
from dataset a 
left join dataset_pred b on a.userid = b.userid
where b.pred = 1
""")
owners.show()

+--------------------+-------+--------------------+--------------------+--------------------+----+
|        creator_name| userid|             comment|               words|            features|pred|
+--------------------+-------+--------------------+--------------------+--------------------+----+
|          TobyTurner|  299.0|This is so beauti...|[this, is, so, be...|[0.07589078632493...|   1|
|        Paws Channel| 2862.0|Good on him poor ...|[good, on, him, p...|[-0.0531695816045...|   1|
|           MonkeyBoo| 4066.0|Your Monkey is so...|[your, monkey, is...|[0.13709949925541...|   1|
|Worlds Fuzziest V...| 5360.0|The dog who ate t...|[the, dog, who, a...|[-0.0338870728388...|   1|
|Worlds Fuzziest V...| 5360.0|The dog who ate t...|[the, dog, who, a...|[-0.0338870728388...|   1|
|    Cole & Marmalade| 5360.0|I love animals bu...|[i, love, animals...|[-0.0315600565706...|   1|
|    Cole & Marmalade| 5360.0|I love animals bu...|[i, love, animals...|[-0.0315600565706...|   1|
|         

In [0]:
#save owners comments
owners.write.mode("overwrite").save("dbfs:/FileStore/tables/owners")

In [0]:
# reload saved dataset
owners=spark.read.format('delta').load("dbfs:/FileStore/tables/owners")
owners.show()

+--------------------+-------+--------------------+--------------------+--------------------+----+
|        creator_name| userid|             comment|               words|            features|pred|
+--------------------+-------+--------------------+--------------------+--------------------+----+
|          TobyTurner|  299.0|This is so beauti...|[this, is, so, be...|[0.07589078632493...|   1|
|        Paws Channel| 2862.0|Good on him poor ...|[good, on, him, p...|[-0.0531695816045...|   1|
|           MonkeyBoo| 4066.0|Your Monkey is so...|[your, monkey, is...|[0.13709949925541...|   1|
|Worlds Fuzziest V...| 5360.0|The dog who ate t...|[the, dog, who, a...|[-0.0338870728388...|   1|
|Worlds Fuzziest V...| 5360.0|The dog who ate t...|[the, dog, who, a...|[-0.0338870728388...|   1|
|    Cole & Marmalade| 5360.0|I love animals bu...|[i, love, animals...|[-0.0315600565706...|   1|
|    Cole & Marmalade| 5360.0|I love animals bu...|[i, love, animals...|[-0.0315600565706...|   1|
|         

In [0]:
# using only half of the data to fit LDA
(owners, remains) = owners.randomSplit([0.50, 0.50],seed = 100)

In [0]:
# TF-IDF pipeline

remover = StopWordsRemover(inputCol="words", outputCol="stopwords_removed")
tf = CountVectorizer(inputCol='stopwords_removed',
                         outputCol='tf_features')
idf = IDF(inputCol="tf_features", outputCol="tfidf_features")

pipeline = Pipeline(stages=[remover, tf, idf])

model = pipeline.fit(owners)
tfidf_owners = model.transform(owners)
tfidf_owners.show()

+------------+---------+--------------------+--------------------+--------------------+----+--------------------+--------------------+--------------------+
|creator_name|   userid|             comment|               words|            features|pred|   stopwords_removed|         tf_features|      tfidf_features|
+------------+---------+--------------------+--------------------+--------------------+----+--------------------+--------------------+--------------------+
|        null|1281156.0|     15:18... me AF.|    [15, 18, me, af]|[-0.0605082670226...|   1|        [15, 18, af]|(220504,[899,1555...|(220504,[899,1555...|
|        null|1281156.0|     15:18... me AF.|    [15, 18, me, af]|[-0.0605082670226...|   1|        [15, 18, af]|(220504,[899,1555...|(220504,[899,1555...|
|        null|1281156.0|     15:18... me AF.|    [15, 18, me, af]|[-0.0605082670226...|   1|        [15, 18, af]|(220504,[899,1555...|(220504,[899,1555...|
|        null|1281156.0|     15:18... me AF.|    [15, 18, me, af

In [0]:
vocab = model.stages[1].vocabulary

def get_words(token_list):
    return [vocab[token_id] for token_id in token_list]
udf_to_words = udf(get_words, T.ArrayType(T.StringType()))

In [0]:
# fit LDA
num_topics = 10
max_iter = 10
lda = LDA(k=num_topics, 
          maxIter=max_iter, 
          featuresCol='tfidf_features')
lda_model = lda.fit(tfidf_owners)

In [0]:
#save LDA model
local_model_path = "dbfs:/FileStore/NLP" + "/lda_local_model"
lda_model.save(local_model_path)

In [0]:
# reload LDA model
sameLocalModel = LocalLDAModel.load(local_model_path)

In [0]:
# extract the top key words in the cluster
num_top_words = 10
topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=100)

+-----+-----------------------------------------------------------------------+
|topic|                                                             topicWords|
+-----+-----------------------------------------------------------------------+
|    0|     [people, animals, thank, god, like, life, animal, dogs, love, get]|
|    1|      [horse, max, horses, dont, like, one, get, dogs, think, cuteness]|
|    2|[3, adorable, love, little, beautiful, cat, robin, kitty, thank, hello]|
|    3|[cute, happy, great, like, video, looks, love, birthday, funny, really]|
|    4|                 [aww, love, cute, lol, boo, ha, loki, mr, loves, like]|
|    5|            [dont, love, people, get, like, know, make, time, want, im]|
|    6|          [love, like, really, nice, good, cat, day, looks, see, awwww]|
|    7|            [max, thank, sweet, good, love, happy, cats, n, dan, bless]|
|    8|     [val, amazing, kevin, joseph, good, awesome, mom, get, one, great]|
|    9|         [video, brian, im, love,

### Results

Here I present the top 10 words in each of the 10 clusters derived by LDA.  
The words are not easy to derive meaning since I didn't conduct stemming or tune the number of cluster.  

Based on the above keywords, I would like to summarize the 10 most frequent topics for dog/cat owners probably are: horses (topic 1), cat (topic 2), dog (topic 0, 1, 9), animals in life (topic 0, 3, 4, 6), and their interactions with people/creators (topic 5, 7, 8, 9).

## 5. Identify Creators With Cat And Dog Owners In The Audience
Here I found the top 100 creators with the most cat/dog owners count in their audience, as well as top 100 creators from a combined ranking by ranking of count plust ranking of percentage.

In [0]:
# numbers rank top 100
most_creators = spark.sql("""
select creator_name, count, 
dense_rank() over(order by count desc) as count_rank, 
percentage,
dense_rank() over(order by percentage desc) as percentage_rank 
from 
(select creator_name, count(distinct case when pred=1 then a.userid end) as count, round(count(distinct case when pred=1 then a.userid end)/count(distinct a.userid)*100,2) as percentage
from dataset a 
left join dataset_pred b on a.userid = b.userid
group by 1) foo
order by count_rank
limit 100

""")
most_creators.show()

+--------------------+-----+----------+----------+---------------+
|        creator_name|count|count_rank|percentage|percentage_rank|
+--------------------+-----+----------+----------+---------------+
|    Brave Wilderness|51361|         1|      8.08|           1114|
|            The Dodo|49942|         2|     29.94|            416|
|  Taylor Nicole Dean|28710|         3|      21.2|            635|
|Hope For Paws - O...|27983|         4|      30.5|            403|
|           Vet Ranch|19474|         5|     29.44|            431|
|        Robin Seplut|17825|         6|     38.21|            270|
|     Gohan The Husky|17745|         7|     27.47|            473|
|       Brian Barczyk|17558|         8|     12.88|            926|
|            ViralHog|13977|         9|     19.73|            683|
|    Cole & Marmalade|12819|        10|     40.21|            235|
|     Viktor Larkhill|11611|        11|     38.62|            265|
|          stacyvlogs|10751|        12|     40.23|            

In [0]:
# percentage rank 100
most_creators_precentage = spark.sql("""

select creator_name, count, 
dense_rank() over(order by count desc) as count_rank, 
percentage,
dense_rank() over(order by percentage desc) as percentage_rank 
from 
(select creator_name, count(distinct case when pred=1 then a.userid end) as count, round(count(distinct case when pred=1 then a.userid end)/count(distinct a.userid)*100,2) as percentage
from dataset a 
left join dataset_pred b on a.userid = b.userid
group by 1) foo
order by count_rank + percentage_rank
limit 100

""")
most_creators_precentage.show()

+--------------------+-----+----------+----------+---------------+
|        creator_name|count|count_rank|percentage|percentage_rank|
+--------------------+-----+----------+----------+---------------+
|          MaxluvsMya| 1834|        49|     58.04|             47|
|Zak Georges Dog T...| 7012|        16|     52.31|             87|
|         FROSTY Life| 1237|        70|     55.87|             57|
|        Daily Monkey|  521|       119|     65.45|             19|
|           BrookIvy3|  532|       116|     60.94|             34|
|    SlideShow ForFun| 5738|        20|     46.47|            146|
|    Lennon The Bunny| 3059|        32|     47.15|            139|
|      Jackson Galaxy| 1086|        75|     51.67|             99|
|  Think Like A Horse| 6506|        18|     45.46|            158|
|       Cat Man Chris| 3827|        26|     45.65|            157|
|      Floppycats.com|  355|       158|     63.39|             26|
|       Frozen Kitten|  932|        82|      50.3|            

## 6. Analysis and Future work

In this project, I completed the following steps:

Step 1: Identify Cat And Dog Owners  
Find the users who are cat and/or dog owners.  
- Challenges: manually labeling on user-based vs comment-based.
- Improvements: better labeling methods or get labeled data.

Step 2: Build And Evaluate Classifiers  
Build classifiers for the cat and dog owners and measure the performance of the classifiers.  
* Challenges: modeling on comment-based labels; carefully split dataset; stratified sampling; word2vec embedding; classification model selection.
- Improvements: try other classification methods; tuning relevant hyperparameters.

Step 3: Classify All The Users  
Apply the cat/dog classifiers to all the users in the dataset. Estimate the fraction of all users
who are cat/dog owners.  
- Challenges: majority vote to get user-based predictions; assumption and concern on dog/cat ownership status change.
- Improvements: alternative assumptions and taking into accounts of status change.

Step 4: Extract Insights About Cat And Dog Owners  
Find topics important to cat and dog owners.
- Challenges: preprocessing methods; CounterVectorizer + IDF method to do TF-IDF in Spark; LDA fitting.
- Improvements: more preprocessing methods; topic number selection; try other clustering methods; tuning relevant hyperparameters.

Step 5: Identify Creators With Cat And Dog Owners In The Audience  
Find creators with the most cat and/or dog owners. Find creators with the highest statistically
significant percentages of cat and/or dog owners.
- Challenges: set reasonable ranking rule.
- Improvements: more suitable ranking rules for specific use cases; considering statistically significant percentage.