# Tubular Data Science Coding Test
In this notebook, we have a dataset of user comments on YouTube videos related to animals or pets. We will attempt to identify cat or dog owners based on these comments, find out which topics are important to them, and then identify the video creators with the most viewers who are cat or dog owners.

## Data Exploration and Cleaning 

In [2]:
from pyspark.sql.types import *

"""
schema = StructType([
    StructField("creator_name", StringType(), True),
    StructField("userid", IntegerType(), True),
    StructField("comment", StringType(), True)])

"""

df = spark.read.load("animals_comments.csv.gz", format='csv', header = True, inferSchema = True)
df.show(5)

+-------------+------+--------------------+
| creator_name|userid|             comment|
+-------------+------+--------------------+
| Doug The Pug|  87.0|I shared this to ...|
| Doug The Pug|  87.0|  Super cute  😀🐕🐶|
|  bulletproof| 530.0|stop saying get e...|
|Meu Zoológico| 670.0|Tenho uma jiboia ...|
|       ojatro|1031.0|I wanna see what ...|
+-------------+------+--------------------+
only showing top 5 rows



In [3]:
df.printSchema()

root
 |-- creator_name: string (nullable = true)
 |-- userid: double (nullable = true)
 |-- comment: string (nullable = true)



Let's change the type of "userid" to integer.

In [4]:
df = df.withColumn('userid', df['userid'].cast('int'))

In [5]:
df.printSchema()

root
 |-- creator_name: string (nullable = true)
 |-- userid: integer (nullable = true)
 |-- comment: string (nullable = true)



In [6]:
df.show(5)

+-------------+------+--------------------+
| creator_name|userid|             comment|
+-------------+------+--------------------+
| Doug The Pug|    87|I shared this to ...|
| Doug The Pug|    87|  Super cute  😀🐕🐶|
|  bulletproof|   530|stop saying get e...|
|Meu Zoológico|   670|Tenho uma jiboia ...|
|       ojatro|  1031|I wanna see what ...|
+-------------+------+--------------------+
only showing top 5 rows



In [16]:
print "There are a total of {} rows.".format(df.count())

There are a total of 5820035 rows.


In [17]:
print "There are {} users.".format(df.select('userid').distinct().count())

There are 2537174 users.


In [18]:
print "There are {} creators.".format(df.select('creator_name').distinct().count())

There are 4241 creators.


Now let's check whether there are any NULLs or NaNs in the data.

In [23]:
from pyspark.sql.functions import isnan, when, count, col

In [24]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------------+------+-------+
|creator_name|userid|comment|
+------------+------+-------+
|       32050|   565|   1051|
+------------+------+-------+



In [25]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+------------+------+-------+
|creator_name|userid|comment|
+------------+------+-------+
|           0|     0|      0|
+------------+------+-------+



We can see that there are indeed quite a lot of NULLs. For "creator_name", I will simply fill in the missing values with "unknown".

In [26]:
df = df.fillna('unknown', subset=['creator_name'])

In [27]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------------+------+-------+
|creator_name|userid|comment|
+------------+------+-------+
|           0|   565|   1051|
+------------+------+-------+



For "comment", on the other hand, we have to remove all the rows with NULLs, since we cannot extract any information from them.

In [28]:
df_clean = df.dropna(subset = ['comment'])

In [29]:
df_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in df_clean.columns]).show()

+------------+------+-------+
|creator_name|userid|comment|
+------------+------+-------+
|           0|     0|      0|
+------------+------+-------+



Fortunately, after removing the rows without comments, there are no NULLs in "userid", either.

## Plan of Attack

If a user does not explicitly say in a comment that she/he has a dog or cat, there is no way for us to tell whether she/he has one. Therefore, strictly speaking, we can only determine whether a user mentions having a dog or cat.
<br>

My plan of attack is as follows. First, I will select (1) a subset of comments in which the users write "my dog", "my cat", "I have a dog", or "I have a cat" (positive examples), and (2) a subset of comments that contain neither "my" nor "have" (negative examples). For comments in the first subset, we are confident that the user has a dog or cat; for comments in the second subset, we are confident that the user does not mention having one. The comments that fall into neither group are the ones I will use a classifier to decide. Some of these comments might say something like "I have 2 cats" (more than one) or "I have a Corgi" (a breed name instead of "dog").
<br>

I will train a classifier on the two selected groups of comments above and then make predictions for the remaining comments. Because a user might mention a breed name instead of explicitly using the word "dog" or "cat", I will vectorize all the comments using the [Word2Vec](https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#pyspark.ml.feature.Word2Vec) model, in order to capture semantic similarity between different words (such as "dog" vs. "Corgi").
<br>

Lastly, note that what I will model and predict are individual comments rather than individual users. I will not group the comments by user, since a user might say completely unrelated things in different comments. As a result, a user will be considered a dog/cat owner as long as at least one of her/his comments mentions it.

## Step 1: Identify Cat or Dog Owners and Users Who Have No Pets

Now I will create a new column "dog_cat" in the data frame. The rows whose values are True in this column are the first subset (labeled positive examples) I defined above.

In [30]:
cond = (df_clean["comment"].like("%my dog%") | df_clean["comment"].like("%I have a dog%")\
        | df_clean["comment"].like("%my cat%") | df_clean["comment"].like("%I have a cat%"))

df_clean = df_clean.withColumn('dog_cat',  cond)

In [31]:
df_clean.show(10)

+--------------------+------+--------------------+-------+
|        creator_name|userid|             comment|dog_cat|
+--------------------+------+--------------------+-------+
|        Doug The Pug|    87|I shared this to ...|  false|
|        Doug The Pug|    87|  Super cute  😀🐕🐶|  false|
|         bulletproof|   530|stop saying get e...|  false|
|       Meu Zoológico|   670|Tenho uma jiboia ...|  false|
|              ojatro|  1031|I wanna see what ...|  false|
|     Tingle Triggers|  1212|Well shit now Im ...|  false|
|Hope For Paws - O...|  1806|when I saw the en...|  false|
|Hope For Paws - O...|  2036|Holy crap. That i...|  false|
|          Life Story|  2637|武器はクエストで貰えるんじゃないん...|  false|
|       Brian Barczyk|  2698|Call the teddy Larry|  false|
+--------------------+------+--------------------+-------+
only showing top 10 rows



Let's create another new column "no_pet". The rows whose values are True in this column are the second subset (labeled negative examples) I defined above.

In [32]:
df_clean = df_clean.withColumn('no_pet', ~df_clean["comment"].like("%my%") & ~df_clean["comment"].like("%have%")) 

In [33]:
df_clean.show(10)

+--------------------+------+--------------------+-------+------+
|        creator_name|userid|             comment|dog_cat|no_pet|
+--------------------+------+--------------------+-------+------+
|        Doug The Pug|    87|I shared this to ...|  false| false|
|        Doug The Pug|    87|  Super cute  😀🐕🐶|  false|  true|
|         bulletproof|   530|stop saying get e...|  false| false|
|       Meu Zoológico|   670|Tenho uma jiboia ...|  false|  true|
|              ojatro|  1031|I wanna see what ...|  false|  true|
|     Tingle Triggers|  1212|Well shit now Im ...|  false|  true|
|Hope For Paws - O...|  1806|when I saw the en...|  false|  true|
|Hope For Paws - O...|  2036|Holy crap. That i...|  false|  true|
|          Life Story|  2637|武器はクエストで貰えるんじゃないん...|  false|  true|
|       Brian Barczyk|  2698|Call the teddy Larry|  false|  true|
+--------------------+------+--------------------+-------+------+
only showing top 10 rows
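Together, the two columns split the comments into three groups. Here is a plain-Python sketch of that rule (`label_comment` is a hypothetical helper, not part of the Spark pipeline). Like the SQL `LIKE '%...%'` patterns above, it uses case-sensitive substring checks, so "myself" counts as containing "my" while "My dog" matches neither pattern. Because every positive pattern contains "my" or "have", a comment can never be both `dog_cat` and `no_pet`.

```python
POSITIVE_PATTERNS = ("my dog", "I have a dog", "my cat", "I have a cat")


def label_comment(comment):
    """Mimic the dog_cat / no_pet columns with case-sensitive substring
    checks, like LIKE '%...%' (hypothetical helper, for illustration)."""
    dog_cat = any(p in comment for p in POSITIVE_PATTERNS)
    no_pet = "my" not in comment and "have" not in comment
    if dog_cat:
        return "positive"
    if no_pet:
        return "negative"
    return "unknown"  # left for the classifier to decide


for c in ["I love my dog!", "Super cute", "I have 2 cats", "My dog is cute"]:
    print("{} -> {}".format(c, label_comment(c)))
```

With these rules, "My dog is cute" lands in the negative set because of the capital "M"; lowercasing the comment first (for example with `pyspark.sql.functions.lower` before applying `like`) would avoid that.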



## Step 2: Build and Evaluate Classifiers

In order to train a model on the comments, I will use [RegexTokenizer](https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.ml.feature.RegexTokenizer) to split each comment into a list of words and then use [Word2Vec](https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#pyspark.ml.feature.Word2Vec) to convert that list into a single vector. Word2Vec learns a fixed-size vector for each word and then represents each document by the average of the vectors of its words. Note that with ``pattern="\\W"``, every character other than an ASCII letter, digit, or underscore acts as a delimiter, so comments written in non-Latin scripts (e.g. Russian or Japanese) end up with an empty word list and a zero vector.
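The tokenize-then-average idea can be approximated in plain Python to see what each piece does. This is a sketch, not Spark's code: `tokenize` mimics RegexTokenizer's lowercasing and ASCII-only ``\W`` splitting, `toy_vectors` are made-up 2-dimensional embeddings rather than learned ones, and dividing by the total number of tokens (so unknown words contribute zeros) is my assumption about how Spark's `Word2VecModel` averages.

```python
import re


def tokenize(comment):
    """Approximate RegexTokenizer(pattern="\\W"): lowercase, split on anything
    that is not an ASCII letter, digit or underscore, and drop empty tokens."""
    return [tok for tok in re.split(r"[^a-z0-9_]+", comment.lower()) if tok]


def average_vector(tokens, word_vectors, dim):
    """Average the word vectors of a document; an empty document maps to zeros.
    Tokens missing from word_vectors add nothing but still count in the denominator."""
    total = [0.0] * dim
    if not tokens:
        return total
    for tok in tokens:
        for i, x in enumerate(word_vectors.get(tok, [0.0] * dim)):
            total[i] += x
    return [x / len(tokens) for x in total]


# Toy 2-dimensional "embeddings" (made up for illustration, not learned)
toy_vectors = {"super": [1.0, 0.0], "cute": [0.0, 1.0]}
tokens = tokenize("Super cute  😀🐕🐶")
print(tokens)
print(average_vector(tokens, toy_vectors, dim=2))
```

The emoji-only tail of the comment disappears during tokenization, which matches the ``[super, cute]`` row in the output below.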

In [39]:
from pyspark.ml.feature import RegexTokenizer

regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="text", pattern="\\W")
df_clean = regexTokenizer.transform(df_clean)
df_clean.show(10)

+--------------------+------+--------------------+-------+------+--------------------+
|        creator_name|userid|             comment|dog_cat|no_pet|                text|
+--------------------+------+--------------------+-------+------+--------------------+
|        Doug The Pug|    87|I shared this to ...|  false| false|[i, shared, this,...|
|        Doug The Pug|    87|  Super cute  😀🐕🐶|  false|  true|       [super, cute]|
|         bulletproof|   530|stop saying get e...|  false| false|[stop, saying, ge...|
|       Meu Zoológico|   670|Tenho uma jiboia ...|  false|  true|[tenho, uma, jibo...|
|              ojatro|  1031|I wanna see what ...|  false|  true|[i, wanna, see, w...|
|     Tingle Triggers|  1212|Well shit now Im ...|  false|  true|[well, shit, now,...|
|Hope For Paws - O...|  1806|when I saw the en...|  false|  true|[when, i, saw, th...|
|Hope For Paws - O...|  2036|Holy crap. That i...|  false|  true|[holy, crap, that...|
|          Life Story|  2637|武器はクエストで貰えるんじゃないん

In [40]:
#from pyspark.sql.functions import split
#df_clean = df_clean.withColumn("text", split("comment", "\s+"))
#df_clean.show(10)

**Here I randomly sample 100,000 comments from the cleaned dataset for all the following steps. The full dataset will be processed with Databricks.**

In [41]:
from pyspark.sql.functions import rand

# Shuffle with a fixed seed and keep the first 100,000 rows
t = df_clean.orderBy(rand(seed=0)).limit(100000)

Now I will use Word2Vec to create a new column called "vector". The default value of ``vectorSize`` is 100, but to reduce computation time, I only use ``vectorSize`` = 10 here. A higher dimension would likely improve model performance.

In [42]:
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=10, minCount=0, inputCol="text", outputCol="vector", seed=0)
model = word2Vec.fit(t)
t = model.transform(t)

In [43]:
t.show(10)

+--------------------+-------+--------------------+-------+------+--------------------+--------------------+
|        creator_name| userid|             comment|dog_cat|no_pet|                text|              vector|
+--------------------+-------+--------------------+-------+------+--------------------+--------------------+
|     Gohan The Husky| 106460|keep HIM! and cal...|  false|  true|[keep, him, and, ...|[-0.0758979980332...|
|          Real Shock|1050336|Я на все правильн...|  false|  true|                  []|          (10,[],[])|
|       Brian Barczyk| 636384| HOGWARTS 🐍🐍🐍🐍🐍|  false|  true|          [hogwarts]|[-0.0186032876372...|
|  Think Like A Horse|1411449|I love how you al...|  false| false|[i, love, how, yo...|[-0.0615040621986...|
|            Mạnh CFM| 879606|      Hai vai a manh|  false|  true| [hai, vai, a, manh]|[-0.5470096990466...|
| Home Aquatics Hobby|1995671|      Imma try this!|  false|  true|   [imma, try, this]|[-0.3004563550154...|
|             Sad Cat|20

Now I will select the confirmed positive examples, which are the rows with ``dog_cat = True``.

In [44]:
t_label_pos = t.filter(t.dog_cat == True)
print "There are {} confirmed positive examples.".format(t_label_pos.count())

There are 615 confirmed positive examples.


The examples we will predict are the rows with ``dog_cat = False`` and ``no_pet = False``.

In [45]:
t_unknown = t.filter((t.dog_cat == False) & (t.no_pet == False))
print "There are {} examples that we are going to predict.".format(t_unknown.count())

There are 14385 examples that we are going to predict.


The confirmed negative examples are the rows with ``no_pet = True``.

In [46]:
print "There are {} confirmed negative examples.".format(t.filter(t.no_pet == True).count())

There are 85000 confirmed negative examples.


This is over 100 times the number of confirmed positive examples. To avoid training on a heavily imbalanced dataset, I randomly sample 1,000 of them.
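Stripped of Spark, this balancing step is a seeded sample without replacement. A minimal plain-Python sketch, assuming the rows fit in memory; the counts 615 and 85,000 come from the cells above, the rows themselves are placeholders, and `downsample` is a hypothetical helper.

```python
import random


def downsample(rows, k, seed=0):
    """Return k rows drawn uniformly at random without replacement
    (or all rows if there are fewer than k), reproducibly via `seed`."""
    if len(rows) <= k:
        return list(rows)
    return random.Random(seed).sample(rows, k)


positives = [("pos", i) for i in range(615)]
negatives = [("neg", i) for i in range(85000)]
balanced = positives + downsample(negatives, 1000)
print(len(balanced))
```

Fixing the seed keeps the sampled negatives identical across reruns, which is the same role ``rand(seed=0)`` plays in the Spark cell.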

In [47]:
# Shuffle the confirmed negatives with a fixed seed and keep 1,000 of them
# (the previous SQL "select * from table where limit 1000" is not valid syntax)
t_label_neg = t.filter(t.no_pet == True).orderBy(rand(seed=0)).limit(1000)

Now we can combine the positive and negative examples.

In [48]:
t_label = t_label_pos.union(t_label_neg).select('dog_cat','vector')
t_label = t_label.withColumn('dog_cat',  t_label.dog_cat.cast('integer'))
t_label.show(10)

+-------+--------------------+
|dog_cat|              vector|
+-------+--------------------+
|      1|[-0.0390798664755...|
|      1|[-0.1118405458370...|
|      1|[-0.0028305472806...|
|      1|[0.02547704771553...|
|      1|[0.03385367389354...|
|      1|[-0.0884501119454...|
|      1|[0.03460733519624...|
|      1|[0.06628606600376...|
|      1|[-0.0940878521806...|
|      1|[-0.0592414590840...|
+-------+--------------------+
only showing top 10 rows



I will split the labeled data into training and test sets.

In [49]:
train, test = t_label.randomSplit([0.7, 0.3], seed = 0)

I choose a random forest classifier for this binary classification problem.

In [50]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

We could use k-fold cross-validation to tune the model's hyperparameters, but again, to reduce computation time, I'll skip this step. I will just use a random forest with 10 trees and leave all the other parameters at their default values.
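For reference, k-fold cross-validation splits the labeled data into k folds, trains on k - 1 of them, and validates on the held-out fold, rotating through all k. The plain-Python sketch below only generates the fold indices (`k_fold_indices` is a hypothetical helper and assumes the rows are already shuffled); Spark's `CrossValidator`, used in the commented-out cell, does its own seeded, randomized fold assignment and defaults to 3 folds.

```python
def k_fold_indices(n, k):
    """Yield (train_idx, val_idx) pairs for k roughly equal, contiguous folds
    over n examples (shuffle the data beforehand if its order is not random)."""
    fold_sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        val_idx = list(range(start, start + size))
        train_idx = list(range(0, start)) + list(range(start + size, n))
        yield train_idx, val_idx
        start += size


for train_idx, val_idx in k_fold_indices(10, 3):
    print("{} train / {} validation".format(len(train_idx), len(val_idx)))
```

Each example appears in exactly one validation fold, so averaging the k validation scores uses every labeled row once for evaluation.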

In [51]:
"""
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

rf = RandomForestClassifier(labelCol="dog_cat", featuresCol="vector", numTrees=10)

grid = ParamGridBuilder().addGrid(rf.maxDepth, [4, 5, 6]).build()

evaluator = BinaryClassificationEvaluator(
    labelCol="dog_cat", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(train)
predictions = cvModel.transform(test)

print evaluator.evaluate(cvModel.transform(train))
print evaluator.evaluate(predictions)
"""


Now we can train the random forest on the training set and make predictions for the test set.

In [68]:
rf = RandomForestClassifier(labelCol="dog_cat", featuresCol="vector", numTrees=10)
model = rf.fit(train)
predictions = model.transform(test)
predictions.show(10)

+-------+--------------------+--------------------+--------------------+----------+
|dog_cat|              vector|       rawPrediction|         probability|prediction|
+-------+--------------------+--------------------+--------------------+----------+
|      1|[-0.3882409960031...|[4.34092039636372...|[0.43409203963637...|       1.0|
|      1|[-0.2765789677699...|[4.68853457157885...|[0.46885345715788...|       1.0|
|      1|[-0.2682929776608...|[9.30002301178203...|[0.93000230117820...|       0.0|
|      1|[-0.2387377321720...|[3.78149485443725...|[0.37814948544372...|       1.0|
|      1|[-0.2311467430912...|[4.73215266872314...|[0.47321526687231...|       1.0|
|      1|[-0.1856541619636...|[4.76012818252756...|[0.47601281825275...|       1.0|
|      1|[-0.1793941780924...|[4.61924106815534...|[0.46192410681553...|       1.0|
|      1|[-0.1785858562216...|[4.74656255906810...|[0.47465625590681...|       1.0|
|      1|[-0.1707417197259...|[2.63424319738626...|[0.26342431973862...|    

In [69]:
evaluator = BinaryClassificationEvaluator(
    labelCol="dog_cat", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
AUC = evaluator.evaluate(predictions)

TP = predictions[(predictions["dog_cat"] == 1) & (predictions["prediction"] == 1.0)].count()
FP = predictions[(predictions["dog_cat"] == 0) & (predictions["prediction"] == 1.0)].count()
TN = predictions[(predictions["dog_cat"] == 0) & (predictions["prediction"] == 0.0)].count()
FN = predictions[(predictions["dog_cat"] == 1) & (predictions["prediction"] == 0.0)].count()

accuracy = (TP + TN)*1.0 / (TP + FP + TN + FN)
precision = TP*1.0 / (TP + FP)
recall = TP*1.0 / (TP + FN)

In [70]:
print "True Positives:", TP
print "False Positives:", FP
print "True Negatives:", TN
print "False Negatives:", FN
print "Test Accuracy:", accuracy
print "Test Precision:", precision
print "Test Recall:", recall
print "Test AUC of ROC:", AUC

True Positives: 130
False Positives: 55
True Negatives: 241
False Negatives: 41
Test Accuracy: 0.79443254818
Test Precision: 0.702702702703
Test Recall: 0.760233918129
Test AUC of ROC: 0.872935435435
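As a worked check, the accuracy, precision, and recall printed above follow directly from the confusion-matrix counts (TP = 130, FP = 55, TN = 241, FN = 41, 467 test examples in total):

$$
\begin{aligned}
\text{accuracy} &= \frac{TP + TN}{TP + FP + TN + FN} = \frac{130 + 241}{467} = \frac{371}{467} \approx 0.794 \\
\text{precision} &= \frac{TP}{TP + FP} = \frac{130}{130 + 55} = \frac{130}{185} \approx 0.703 \\
\text{recall} &= \frac{TP}{TP + FN} = \frac{130}{130 + 41} = \frac{130}{171} \approx 0.760
\end{aligned}
$$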


In [71]:
"""
Test: how the evaluation metrics change when we tune the classification threshold.
We could loop over several thresholds, but what defines the best threshold?
"""
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# "probability" is [P(negative), P(positive)]; predict positive only when
# P(positive) exceeds the new threshold (instead of the default 0.5)
thresh = 0.55
above_thresh = udf(lambda v: int(float(v[1]) > thresh), IntegerType())

# Add the thresholded prediction as a new column "pred". This avoids joining two
# DataFrames on monotonically_increasing_id(), whose values depend on partitioning
# and are not guaranteed to line up between separately computed DataFrames.
predictions = predictions.withColumn('pred', above_thresh('probability'))
predictions.show(10)

+-------+--------------------+--------------------+--------------------+----------+----+
|dog_cat|              vector|       rawPrediction|         probability|prediction|pred|
+-------+--------------------+--------------------+--------------------+----------+----+
|      1|[-0.3882409960031...|[4.34092039636372...|[0.43409203963637...|       1.0|   1|
|      1|[-0.2765789677699...|[4.68853457157885...|[0.46885345715788...|       1.0|   0|
|      1|[-0.2682929776608...|[9.30002301178203...|[0.93000230117820...|       0.0|   0|
|      1|[-0.2387377321720...|[3.78149485443725...|[0.37814948544372...|       1.0|   1|
|      1|[-0.2311467430912...|[4.73215266872314...|[0.47321526687231...|       1.0|   0|
|      1|[-0.1856541619636...|[4.76012818252756...|[0.47601281825275...|       1.0|   0|
|      1|[-0.1793941780924...|[4.61924106815534...|[0.46192410681553...|       1.0|   0|
|      1|[-0.1785858562216...|[4.74656255906810...|[0.47465625590681...|       1.0|   0|
|      1|[-0.17074171

In [72]:
TP = predictions[(predictions["dog_cat"] == 1) & (predictions["pred"] == 1)].count()
FP = predictions[(predictions["dog_cat"] == 0) & (predictions["pred"] == 1)].count()
TN = predictions[(predictions["dog_cat"] == 0) & (predictions["pred"] == 0)].count()
FN = predictions[(predictions["dog_cat"] == 1) & (predictions["pred"] == 0)].count()

accuracy = (TP + TN)*1.0 / (TP + FP + TN + FN)
precision = TP*1.0 / (TP + FP)
recall = TP*1.0 / (TP + FN)

print "True Positives:", TP
print "False Positives:", FP
print "True Negatives:", TN
print "False Negatives:", FN
print "Test Accuracy:", accuracy
print "Test Precision:", precision
print "Test Recall:", recall

True Positives: 118
False Positives: 39
True Negatives: 257
False Negatives: 53
Test Accuracy: 0.802997858672
Test Precision: 0.751592356688
Test Recall: 0.690058479532
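Raising the threshold to 0.55 traded recall for precision, which leaves the question of what defines the best threshold. One common answer is to pick the threshold that maximizes a metric balancing the two, such as F1, on held-out data (ideally a separate validation split rather than the test set). The sketch below sweeps candidate thresholds in plain Python; F1 is my choice of criterion rather than something the notebook specifies, `best_threshold` is a hypothetical helper, and the scores and labels are made up.

```python
def best_threshold(scores, labels, thresholds):
    """Return (threshold, f1) maximizing F1, where an example is predicted
    positive when its score, P(positive), is strictly above the threshold."""
    best = (None, -1.0)
    for thresh in thresholds:
        tp = sum(1 for s, y in zip(scores, labels) if s > thresh and y == 1)
        fp = sum(1 for s, y in zip(scores, labels) if s > thresh and y == 0)
        fn = sum(1 for s, y in zip(scores, labels) if s <= thresh and y == 1)
        precision = tp / float(tp + fp) if tp + fp else 0.0
        recall = tp / float(tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        if f1 > best[1]:
            best = (thresh, f1)
    return best


# Made-up scores and labels, for illustration only
scores = [0.9, 0.8, 0.6, 0.55, 0.4, 0.3]
labels = [1, 1, 0, 1, 0, 0]
print(best_threshold(scores, labels, [0.35, 0.5, 0.58, 0.7]))
```

On real data, the scores would be the second element of the ``probability`` column and the labels the ``dog_cat`` column.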


## Step 3: Classify All The Users

We can now apply the cat/dog classifier to the remaining unlabeled comments in the sample (the rows with ``dog_cat = False`` and ``no_pet = False``).

In [77]:
print "There are {} examples that we are going to predict.".format(t_unknown.count())

There are 14385 examples that we are going to predict.


In [78]:
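# Cast the label to integer to match the training data; withColumn returns a new
# DataFrame, so the result must be assigned back
t_unknown = t_unknown.withColumn('dog_cat', t_unknown.dog_cat.cast('integer'))

# Make predictions with the trained random forest
pred = model.transform(t_unknown)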

In [79]:
n_total = t.select('userid').distinct().count()
print "There are a total of {} users.".format(n_total)

There are a total of 90460 users.


In [80]:
# The number of users with at least one comment that is labeled as positive by us
n_pos = t_label_pos.select('userid').distinct().count()

# The number of users with at least one comment that is predicted to be positive
n_pred = pred.filter(pred.prediction == 1).select('userid').distinct().count()

# Note: a user with both a labeled-positive and a predicted-positive comment is counted
# in both n_pos and n_pred, so n_pos + n_pred is an upper bound on distinct owners
print "There are {} users that are confirmed or predicted to be dog/cat owners.".format(n_pos + n_pred)
print "About {:.2f}% of the users are dog/cat owners ".format((n_pos + n_pred)*1.0/n_total*100)

There are 7101 users that are confirmed or predicted to be dog/cat owners.
About 7.85% of the users are dog/cat owners 
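Because a user counts as an owner if any of her/his comments is positive (labeled or predicted), the user-level count is the size of the union of the two user sets. Adding `n_pos` and `n_pred` double-counts anyone who appears in both, so the figures above are upper bounds. A plain-Python sketch with hypothetical user IDs (`count_owners` is a hypothetical helper); in Spark, the same union could be computed with `t_label_pos.select('userid').union(pred.filter(pred.prediction == 1).select('userid')).distinct().count()`.

```python
def count_owners(labeled_pos_users, predicted_pos_users):
    """Count distinct users with at least one labeled-positive or
    predicted-positive comment (a user is an owner if ANY comment says so)."""
    return len(set(labeled_pos_users) | set(predicted_pos_users))


labeled = [87, 530, 530, 670]   # hypothetical user ids, one per positive comment
predicted = [670, 1031, 1031]
print(count_owners(labeled, predicted))          # distinct owners
print(len(set(labeled)) + len(set(predicted)))   # naive sum counts user 670 twice
```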


**My idea was to use the test precision/recall to estimate the true number of comments that tell us the users are dog/cat owners. But the problems are:**
- **The number of comments may not translate directly into the number of users.**
- **The distributions of our labeled and unlabeled comments are definitely not the same.**
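The estimate described in the first sentence can be written down as follows: if precision and recall carried over from the test set, the correctly predicted positives would be about `n_predicted_pos * precision`, and those would make up a `recall` fraction of all truly positive comments. The caveats listed above are exactly why this is only a rough estimate here. `estimate_true_positives` is a hypothetical helper.

```python
def estimate_true_positives(n_predicted_pos, precision, recall):
    """Estimate how many examples are truly positive, assuming the classifier's
    precision and recall on new data match the test-set values."""
    correct = n_predicted_pos * precision   # estimated true positives among predictions
    return correct / recall                 # scale up by the fraction that was found


# Sanity check on the test set: 185 predicted positives (TP + FP = 130 + 55)
print(estimate_true_positives(185, 130 / 185.0, 130 / 171.0))  # about 171 = TP + FN
```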


## Step 4: Extract Insights About Cat And Dog Owners

Goal: Find topics important to cat and dog owners.

**Possible approach: count the words in the comments from cat/dog owners and show the top-k most frequent ones.**
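A minimal plain-Python sketch of that word-count idea, assuming the owners' comments have already been collected. The stop-word list, the comments, and `top_k_words` are all hypothetical; in Spark, the same could be done on the ``text`` column with `pyspark.ml.feature.StopWordsRemover`, `explode`, and `groupBy(...).count()`.

```python
import re
from collections import Counter

# A tiny, illustrative stop-word list (an assumption; a real analysis would use a
# fuller list, e.g. the defaults of Spark's StopWordsRemover)
STOP_WORDS = {"i", "my", "a", "the", "is", "and", "to", "have", "so", "it", "this"}


def top_k_words(comments, k):
    """Return the k most frequent non-stop-words across the given comments."""
    counts = Counter()
    for comment in comments:
        tokens = re.split(r"[^a-z0-9_]+", comment.lower())
        counts.update(tok for tok in tokens if tok and tok not in STOP_WORDS)
    return counts.most_common(k)


owner_comments = [  # made-up comments from predicted owners
    "My dog loves this toy",
    "I have a cat and she loves treats",
    "my cat is so lazy",
]
print(top_k_words(owner_comments, 2))
```

Without stop-word removal, words like "my" and "have" would dominate the list, since the labeling rules select for them.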

## Step 5: Identify Creators With Cat And Dog Owners In The Audience

Goal: Find creators with the most cat and/or dog owners. Find creators with the highest statistically significant percentages of cat and/or dog owners.
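One way to make "statistically significant percentages" concrete is to rank creators by the lower bound of a confidence interval on their share of owner viewers, so that creators with only a handful of commenters cannot top the list by chance. The sketch below uses the Wilson score interval, which is my choice rather than anything the notebook specifies; the creator names and counts are hypothetical, and in practice they would come from grouping the comments by ``creator_name`` and counting distinct owners and distinct users.

```python
import math


def wilson_lower_bound(owners, total, z=1.96):
    """Lower end of the Wilson score interval for the proportion owners/total
    (z=1.96 gives an approximately 95% interval). Returns 0.0 when total is 0."""
    if total == 0:
        return 0.0
    p = owners / float(total)
    denom = 1 + z * z / total
    centre = p + z * z / (2 * total)
    margin = z * math.sqrt(p * (1 - p) / total + z * z / (4 * total * total))
    return (centre - margin) / denom


# Hypothetical creators: (owner viewers, total commenting viewers)
creators = {"creator_a": (3, 4), "creator_b": (300, 500), "creator_c": (40, 1000)}
ranking = sorted(creators, key=lambda c: wilson_lower_bound(*creators[c]), reverse=True)
print(ranking)
```

Here "creator_a" has the highest raw share (75%) but only 4 viewers, so its lower bound (about 0.30) falls below that of "creator_b" (60% of 500 viewers, about 0.56).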