# Project Summary
### Overview : 
This project builds classification models that identify cat and dog owners from text comments. The dataset contains around 6,000,000 comments and is unlabeled. Each row has a user ID, a text comment, and the channel the comment belongs to.

### Dataset : 
The dataset was acquired via GitHub and can be downloaded here: https://drive.google.com/file/d/1dZk9s3qi3WwZMlgcH8GA3sP2QfJWyB8c/view?usp=sharing

The dataset is comma-separated, with a header line defining the attribute names listed here:

  creator_name: Name of the YouTube channel creator;
  userid: Integer identifier for the users commenting on the YouTube channels;
  comment: Text of the comments made by the users.

### Details :
1. Search for specific phrases that a cat or dog owner is likely to use, and label those users as dog/cat owners. Label users whose comments contain none of the trigger terms as not owning pets. This turns the unlabeled dataset into a labeled one and converts the problem from unsupervised to supervised.
2. Use RegexTokenizer to tokenize the text comments.

 RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter “pattern” is used as the delimiter to split the input text. Alternatively, users can set the parameter “gaps” to false, indicating that the regex “pattern” denotes “tokens” rather than splitting gaps, so that all matching occurrences are returned as the tokenization result.

3. Use Word2Vec to represent text features as vectors.
4. Train a logistic regression model and a random forest model to classify the audience. The evaluation metrics are precision, recall, accuracy, and AUC. In my analysis (trained on only a small random portion of the dataset to speed up the process), random forest outperforms the LR model.
5. Use the trained model to predict labels for all users in the dataset and get a statistical sense of the distribution of dog/cat owners. Extract word frequencies to see which topics those owners care about.
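
The two RegexTokenizer modes described in step 2 can be illustrated with Python's standard `re` module. This is only a sketch of the idea, not the Spark implementation: the sample comment is made up, `re.ASCII` is used so that `\W` means `[^a-zA-Z0-9_]`, and empty strings are filtered out by hand.

```python
import re

# Hypothetical sample comment, already lower-cased.
comment = "oh that poor cat!! i have a dog"

# "gaps" mode: the pattern describes the delimiters between tokens.
tokens_by_gaps = [t for t in re.split(r"\W+", comment, flags=re.ASCII) if t]

# "tokens" mode: the pattern describes the tokens themselves.
tokens_by_match = re.findall(r"\w+", comment, flags=re.ASCII)

print(tokens_by_gaps)
print(tokens_by_match)
```

For a simple pattern pair like `\W+` and `\w+`, both modes produce the same token list; they differ once the token pattern is more selective than "everything between delimiters".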

## 0. Data Exploration and Cleaning

In [3]:
df = spark.read.load("/FileStore/tables/animals_comments.csv", format='csv', header = True, inferSchema = True)
df.show(10)

In [4]:
df.dtypes

In [5]:
print("Number of rows in df:", df.count())

In [6]:
# Count null values in each columns 
print('Number of null values in creator_name: ',df.filter(df['creator_name'].isNull()).count())
print('Number of null values in userid: ',df.filter(df['userid'].isNull()).count())
print('Number of null values in comment: ',df.filter(df['comment'].isNull()).count())

In [7]:
#drop out rows with no comments and no userid
def pre_process(df):
  df_drop = df.filter(df['comment'].isNotNull())
  df_drop = df_drop.filter(df_drop['userid'].isNotNull())
  df_drop = df_drop.dropDuplicates()
  
  print('After dropping, we have', df_drop.count(), 'rows in the dataframe')
  return df_drop

df_drop = pre_process(df)
5820035

In [8]:
import pyspark.sql.functions as F
#convert text in comment to lower case.
df_clean = df_drop.withColumn('comment', F.lower(F.col('comment')))

In [9]:
display(df_clean)

creator_name,userid,comment
Brian Barczyk,28322.0,since i watched these snake videos i am not afraid of snakes anymore and now i really want a snake for my birthday
WaysideWaifs,89939.0,why are humans so inhuman...
TobyTurner,236731.0,roses are red this makes no sense i got a science that i havemicro wave
meow meow,260511.0,..omai-wa.*rejectooooooo!!!*
Brave Wilderness,420830.0,good job on getting on trending
꼬부기아빠 My Pet Diary,459543.0,2!!0!!!만명!!!!! 축하드려용!!!
The Dodo,485503.0,who gave thumbs down to this video? must be a person without heart
Hope For Paws - Official Rescue Channel,504270.0,such an intense rescue! i felt i could feel your emotions and it was draining!!god bless lisa on her trip to her new home!!!
Viktor Larkhill,542233.0,oh that poor cat!!
Brave Wilderness,543795.0,you can live your live free by traveling the unknown! lets travel then!


#### This is an unlabeled dataset and we want to train a classifier to identify cat and dog owners, so the first step is to label the comments.
1. Label a comment as positive when the user has dogs or cats.
2. Label a comment as negative when the user doesn't have a dog or cat.
3. Combine 1 and 2 as the training dataset; the rest of the dataset is the data we predict on.
4. Ownership is decided by key phrases (such as 'I have a dog'), because we don't have enough time to check user information one by one.
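
The key-phrase labelling strategy can be sketched in plain Python before scaling it up in Spark. The function name `label_comment` and the three label strings are hypothetical, chosen for illustration; the substring checks are meant to mirror SQL `LIKE '%...%'` matching.

```python
# Phrases assumed to signal dog/cat ownership (illustrative list).
OWNER_PHRASES = (
    "my dog", "my cat", "my puppy", "my kitten",
    "i have a dog", "i have a cat", "i have dogs", "i have cats",
    "i have a puppy", "i have puppies",
)

def label_comment(comment):
    """Return 'owner', 'no_pet', or 'unknown' for a single comment."""
    text = comment.lower()
    if any(phrase in text for phrase in OWNER_PHRASES):
        return "owner"
    # No 'my' and no 'have' anywhere: treated as a confident negative.
    if "my" not in text and "have" not in text:
        return "no_pet"
    # Everything else is left for the classifier to decide.
    return "unknown"

print(label_comment("I have a dog and he loves this channel"))
print(label_comment("oh that poor cat!!"))
print(label_comment("i really want a snake for my birthday"))
```

Because the checks are substring matches, a word such as "myself" or "behave" also moves a comment out of the negative class. That keeps the negative labels conservative at the cost of a larger unlabeled pool.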

In [11]:
# Find users who mention owning a dog or cat.
# Note: '%my dog%' also matches 'my dogs', and '%my cat%' also matches 'my cats'.
owner_patterns = ["%my dog%", "%i have a dog%", "%my cat%", "%i have a cat%",
                  "%i have dogs%", "%i have cats%", "%my puppy%", "%my kitten%",
                  "%i have a puppy%", "%i have puppies%"]
cond = F.lit(False)
for pattern in owner_patterns:
    cond = cond | df_clean["comment"].like(pattern)

df_clean = df_clean.withColumn('dog_cat', cond)

# Find users assumed not to have a pet: comments containing neither 'my' nor 'have'.
# (Excluding '%my%' already excludes '%my dog%' and '%my cat%'.)
df_clean = df_clean.withColumn('no_pet', ~df_clean["comment"].like("%my%") & ~df_clean["comment"].like("%have%"))
display(df_clean)

creator_name,userid,comment,dog_cat,no_pet
Brian Barczyk,28322.0,since i watched these snake videos i am not afraid of snakes anymore and now i really want a snake for my birthday,False,False
WaysideWaifs,89939.0,why are humans so inhuman...,False,True
TobyTurner,236731.0,roses are red this makes no sense i got a science that i havemicro wave,False,False
meow meow,260511.0,..omai-wa.*rejectooooooo!!!*,False,True
Brave Wilderness,420830.0,good job on getting on trending,False,True
꼬부기아빠 My Pet Diary,459543.0,2!!0!!!만명!!!!! 축하드려용!!!,False,True
The Dodo,485503.0,who gave thumbs down to this video? must be a person without heart,False,True
Hope For Paws - Official Rescue Channel,504270.0,such an intense rescue! i felt i could feel your emotions and it was draining!!god bless lisa on her trip to her new home!!!,False,True
Viktor Larkhill,542233.0,oh that poor cat!!,False,True
Brave Wilderness,543795.0,you can live your live free by traveling the unknown! lets travel then!,False,True


### 1. Build the classifier

In [13]:
# data preprocessing 
from pyspark.ml.feature import RegexTokenizer

regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="text", pattern="\\W")
df_clean = regexTokenizer.transform(df_clean)
df_clean.show(10)

# '\W' matches any character that is not a word character. RegexTokenizer uses Java regular expressions, where \W is equivalent to [^a-zA-Z0-9_] by default, so punctuation, whitespace, and non-ASCII characters all act as delimiters.

In [14]:
#Only select 1,000,000 rows for testing
from pyspark.sql.functions import rand 

df_clean.orderBy(rand(seed=0)).createOrReplaceTempView("table1")
df_clean = spark.sql("select * from table1 limit 1000000")

In [15]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Display some of the stop words
stopwords[:10]

In [16]:
remover.setInputCol("text")
remover.setOutputCol("vector_no_stopw")

df_clean = remover.transform(df_clean)
df_clean.show(10)

In [17]:
# use word2vec get text vector feature.
from pyspark.ml.feature import Word2Vec
# Learn a mapping from words to Vectors. (choose higher vectorSize here)
word2Vec = Word2Vec(vectorSize=20, minCount=1, inputCol="vector_no_stopw", outputCol="wordVector")
#word2Vec = Word2Vec(vectorSize=50, minCount=1, inputCol="text", outputCol="wordVector")
model = word2Vec.fit(df_clean)

df_model = model.transform(df_clean)
df_model.show(10)
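
Spark's Word2Vec model turns each comment into a single vector by averaging the vectors of its words. A minimal pure-Python sketch of that averaging step follows; the two-dimensional embeddings are made-up toy values, `document_vector` is a hypothetical helper, and skipping unknown tokens is a simplification chosen here rather than a statement about Spark's behavior.

```python
def document_vector(tokens, embeddings, size):
    """Average the embedding vectors of the tokens found in `embeddings`."""
    known = [embeddings[t] for t in tokens if t in embeddings]
    if not known:
        # No known word: fall back to the zero vector.
        return [0.0] * size
    return [sum(vec[i] for vec in known) / len(known) for i in range(size)]

# Toy embeddings (assumed values, not learned from the dataset).
toy_embeddings = {"poor": [1.0, 0.0], "cat": [0.0, 1.0]}

doc_vec = document_vector(["poor", "cat"], toy_embeddings, size=2)
print(doc_vec)
```

Averaging discards word order, so the downstream classifiers see each comment only as a point in the embedding space.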

#### Get train and test dataset.
We only use the observations whose label we are sure about (dog/cat owner or no-pet owner) to form the train and test datasets.
The remaining observations serve as the dataset for prediction.

In [19]:
df_pets = df_model.filter(F.col('dog_cat') == True) 
df_no_pets = df_model.filter(F.col('no_pet') ==  True)
print("Number of confirmed user who own dogs or cats: ", df_pets.count())
print("Number of confirmed user who don't have pet's: ", df_no_pets.count())

In [20]:
df_pets.show() 

In [21]:
df_no_pets.show(10)

Note that the negative labels outnumber the positive labels by roughly 100 to 1, so we need to downsample the negatives. As a rule of thumb, the gap should be no more than 10 times; here I balance them to a ratio of around 1:2 (1 positive : 2 negative).
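
The downsampling idea can be sketched with the standard library alone. The helper `downsample_negatives`, the fixed seed, and the toy ID lists are assumptions made for illustration.

```python
import random

def downsample_negatives(positives, negatives, ratio=2, seed=0):
    """Randomly keep at most `ratio` negatives per positive example."""
    rng = random.Random(seed)  # fixed seed so the sample is reproducible
    target = min(len(negatives), ratio * len(positives))
    return rng.sample(negatives, target)

# Toy data: 10 positives and 1,000 negatives (a 1:100 imbalance).
positives = list(range(10))
negatives = list(range(100, 1100))

sampled = downsample_negatives(positives, negatives)
print(len(positives), len(sampled))
```

Sampling without replacement keeps every retained negative unique, and the `min` guard covers the case where fewer negatives exist than the target size.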

In [23]:
from pyspark.sql.functions import rand 
df_no_pets.orderBy(rand()).createOrReplaceTempView("no_pets_table")

Num_Pos_Label = df_model.filter(F.col('dog_cat') == True).count() 
Num_Neg_Label = df_model.filter(F.col('no_pet') ==  True).count()

# Pass the target size into the SQL query: keep twice as many negatives as positives.
df_no_pets_down = spark.sql("select * from no_pets_table limit {}".format(Num_Pos_Label*2))


In [24]:
# positive : negative = 1 : 2
print('After balancing the labels, we have')
print('Positive labels: ', Num_Pos_Label)
print('Negative labels: ', df_no_pets_down.count())

In [25]:
#combine the datasets
def get_label(df_pets,df_no_pets_down):
  df_labeled = df_pets.select('dog_cat','wordVector').union(df_no_pets_down.select('dog_cat','wordVector'))
  return df_labeled

df_labeled = get_label(df_pets,df_no_pets_down)
df_labeled.show(10)

In [26]:
# Convert the Boolean label to 1/0 with a built-in cast (no UDF needed).
from pyspark.sql.functions import col

df_labeled = df_labeled.withColumn('label', col('dog_cat').cast('integer'))
df_labeled.show(10)

In [27]:
#see whether agg reasonable (optional)
# from pyspark.sql.functions import col, countDistinct
# df_labeled.agg(countDistinct(col("label")).alias("count")).show()
# df_labeled.select('dog_cat').distinct().count()

### a. Logistic Regression Model

In [29]:
from pyspark.ml.classification import LogisticRegression

train, test = df_labeled.randomSplit([0.8, 0.2], seed=12345)

lr = LogisticRegression(featuresCol="wordVector",labelCol="label" , maxIter=10, regParam=0.1, elasticNetParam=0.8)

# Fit the model on the training split with fixed hyperparameters
# (no parameter search is run here).
lrModel = lr.fit(train)

# Make predictions on the held-out test data.
predictions = lrModel.transform(test)
predictions.show(10)

In [30]:
# Extract the training summary from the fitted LogisticRegressionModel.
trainingSummary = lrModel.summary

# # Obtain the objective per iteration
# objectiveHistory = trainingSummary.objectiveHistory
# print("objectiveHistory:")
# for objective in objectiveHistory:
#     print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
# print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# # Set the model threshold to maximize F-Measure
# fMeasure = trainingSummary.fMeasureByThreshold
# maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
# bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
#     .select('threshold').head()['threshold']
# lr.setThreshold(bestThreshold)

In [31]:
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


def get_evaluation_result(predictions):
  evaluator = BinaryClassificationEvaluator(
      labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
  AUC = evaluator.evaluate(predictions)

  TP = predictions[(predictions["label"] == 1) & (predictions["prediction"] == 1.0)].count()
  FP = predictions[(predictions["label"] == 0) & (predictions["prediction"] == 1.0)].count()
  TN = predictions[(predictions["label"] == 0) & (predictions["prediction"] == 0.0)].count()
  FN = predictions[(predictions["label"] == 1) & (predictions["prediction"] == 0.0)].count()

  accuracy = (TP + TN)*1.0 / (TP + FP + TN + FN)
  precision = TP*1.0 / (TP + FP)
  recall = TP*1.0 / (TP + FN)


  print ("True Positives:", TP)
  print ("False Positives:", FP)
  print ("True Negatives:", TN)
  print ("False Negatives:", FN)
  print ("Test Accuracy:", accuracy)
  print ("Test Precision:", precision)
  print ("Test Recall:", recall)
  print ("Test AUC of ROC:", AUC)

print("Prediction result summary for Logistic Regression Model:  ")
get_evaluation_result(predictions)
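
For reference, the accuracy, precision, and recall formulas used above can be written as a small standalone function. This is a sketch with made-up confusion-matrix counts; the zero-denominator guards are an addition for robustness, and `classification_metrics` is a hypothetical name.

```python
def classification_metrics(tp, fp, tn, fn):
    """Compute accuracy, precision, and recall from confusion-matrix counts."""
    total = tp + fp + tn + fn
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
    }

# Toy counts, not results from the dataset.
metrics = classification_metrics(tp=6, fp=2, tn=10, fn=2)
print(metrics)
```

Without the guards, a model that never predicts the positive class would raise a `ZeroDivisionError` when precision is computed.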

### b. Random Forest Model

In [34]:
from pyspark.ml.classification import RandomForestClassifier

# Define a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="wordVector", numTrees=15)

# Train the model. Note: this rebinds `model`, which previously held the Word2Vec model.
model = rf.fit(train)

# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.show(10)

In [35]:
print("Prediction result summary for Random Forest Model:  ")
get_evaluation_result(predictions)



### Model Performance Evaluation
##### LR:
Test Accuracy: 0.709243998566822
Test Precision: 0.6995073891625616
Test Recall: 0.2282958199356913
Test AUC of ROC: 0.8616747542852341
##### RF:
Test Accuracy: 0.8258688642063776
Test Precision: 0.802027027027027
Test Recall: 0.6361200428724545
Test AUC of ROC: 0.9075520878400731

##### Conclusion: RF performs better than LR on every reported metric, with the largest gap in recall (0.636 vs. 0.228).

### 2. Classify All The Users
We can now apply the cat/dog classifiers to all the other users in the dataset.

In [38]:
# Get the dataset for prediction (excluding comments whose label we already know).
# Unknown comments are those where both dog_cat and no_pet are False.
df_unknow = df_model.filter((F.col('dog_cat') == False) & (F.col('no_pet') == False)) 
df_unknow = df_unknow.withColumn('label',df_unknow.dog_cat.cast('integer'))
print("There are {} comments whose attribute is unclear.".format(df_unknow.count()))
pred_all = model.transform(df_unknow)
pred_all.show(10)

##### Fraction of the users who are cat/dog owners (ML estimate):

(Num of owner labeled + Num of owner predicted) / Total users in our used dataset

In [40]:
#df.select("columnname").distinct().show()

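# number of distinct users in total
total_user = df_model.select('userid').distinct().count()
# number of distinct users labeled as owners
owner_labeled = df_pets.select('userid').distinct().count() 
# number of distinct users predicted as owners, excluding those already labeled
# (subtract() returns distinct rows, so each user is counted once)
owner_pred = pred_all.filter(F.col('prediction') == 1.0).select('userid').subtract(df_pets.select('userid')).count()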

fraction = (owner_labeled+owner_pred)/total_user
print('Fraction of the users who are cat/dog owners (ML estimate): ', round(fraction,3))
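
Because one user can write many comments, the fraction should count each user once. The sketch below shows the difference using Python sets. All user IDs are made-up toy values, and the variable names are hypothetical.

```python
# Toy data: user IDs, not values from the dataset.
labeled_owners = {1, 2, 3}                 # users labeled as owners by key phrases
predicted_owner_comments = [3, 4, 4, 5]    # one user ID per comment predicted as owner
all_users = set(range(1, 11))              # 10 distinct users overall

# Count each user once, whether labeled, predicted, or both.
owners = labeled_owners | set(predicted_owner_comments)
fraction = len(owners) / len(all_users)

# Counting comments instead of users overstates the fraction.
naive_fraction = (len(labeled_owners) + len(predicted_owner_comments)) / len(all_users)

print(fraction, naive_fraction)
```

In this toy case user 3 is both labeled and predicted, and user 4 has two predicted comments, so the naive count treats five owners as seven.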

### 3. Gain insights about what cat/dog owners care about

In [42]:
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA
import pyspark.sql.functions as f

#Combine df_pets (Num of owner labeled) and predicts (Num of owner predicted)
df_all_owner = df_pets.select('vector_no_stopw').union(pred_all.filter(F.col('prediction') == 1.0).select('vector_no_stopw'))

#TF model
count_vec = CountVectorizer(inputCol = 'vector_no_stopw',outputCol = 'rawFeatures',vocabSize = 123241, minDF = 2.0)
count_vec_model = count_vec.fit(df_all_owner)
featurizedData = count_vec_model.transform(df_all_owner)
vocab = count_vec_model.vocabulary

#IDF model
idf = IDF(inputCol = 'rawFeatures',outputCol = 'features',minDocFreq=2)
idfmodel = idf.fit(featurizedData)
rescaledData = idfmodel.transform(featurizedData)
rescaledData.cache()


In [43]:
rescaledData.show(10)

In [44]:
#LDA
lda = LDA(k=10, maxIter=10)
lda_model = lda.fit(rescaledData.select('features'))
topics = lda_model.describeTopics(10)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Shows the result
transformed = lda_model.transform(rescaledData)
transformed.show(truncate=False)


In [45]:
pddf = topics.select('termIndices').toPandas()
group1 = []
t=[]
for num in pddf.loc[1][0]:
  #group1.append(vocab[num])
  t.append(num)
#print("Topic 1 most important words are : {}".format(str(group1)))
print(t)

In [46]:
pddf.info()

In [47]:
for i in range(10):
  # Map each topic's term indices back to vocabulary words.
  group = [vocab[num] for num in pddf.loc[i, 'termIndices']]
  print("Topic {} most important words are: {}".format(i + 1, group))

### 4. Identify Video Creators With Cat And Dog Owners In The Audience

In [49]:
# Get the creators of all comments attributed to cat/dog owners (labeled or predicted)
df_create = df_pets.select('creator_name').union(pred_all.filter(F.col('prediction') == 1.0).select('creator_name'))

df_create.createOrReplaceTempView("create_table")

# Count owner comments per creator (group by already yields one row per creator)
create_count = spark.sql("select creator_name, count(*) as Number\
                          from create_table \
                          group by creator_name \
                          order by Number DESC")

In [50]:
create_count.show()
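
The same group-and-count step can be sketched on a small in-memory list with `collections.Counter`. The creator names are borrowed from the sample rows shown earlier, but the list itself is made up for illustration and does not reflect the real counts.

```python
from collections import Counter

# Hypothetical creator names, one per comment attributed to a cat/dog owner.
owner_comment_creators = [
    "The Dodo", "Brave Wilderness", "The Dodo", "Viktor Larkhill", "The Dodo",
]

# most_common() returns (creator, count) pairs sorted by count, descending.
creator_counts = Counter(owner_comment_creators).most_common()

for creator, count in creator_counts:
    print(creator, count)
```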

### 5. Results and Future work

##### The topics that cat/dog owners are interested in are summarized as follows:

##### Business opportunities:

##### Future Work:
1. Tune hyperparameters and use cross-validation to improve classifier performance.
2. Explore users' background information and the content of the videos to improve labelling accuracy.
3. Split cat and dog owners into separate groups and explore each.
4. Try to identify more user segments.