#### 0. Data Exploration and Cleaning

In [2]:
df_clean=spark.read.csv("/FileStore/tables/animals_comments.csv",inferSchema=True,header=True)
df_clean.show(10)

In [3]:
df_clean.count() 

In [4]:
df_clean = df_clean.na.drop(subset=["comment"])
df_clean.count()

In [5]:
df_clean.show()

In [6]:
# find user with preference of dog and cat
from pyspark.sql.functions import when
from pyspark.sql.functions import col

# you can user your ways to extract the label

df_clean = df_clean.withColumn("label", \
                           (when(col("comment").like("%my dog%"), 1) \
                           .when(col("comment").like("%I have a dog%"), 1) \
                           .when(col("comment").like("%my cat%"), 1) \
                           .when(col("comment").like("%I have a cat%"), 1) \
                           .when(col("comment").like("%my puppy%"), 1) \
                           .when(col("comment").like("%my pup%"), 1) \
                           .when(col("comment").like("%my kitty%"), 1) \
                           .when(col("comment").like("%my pussy%"), 1) \
                           .otherwise(0)))

In [7]:
df_clean.show()

#### 1. Data preprocessing and Build the classifier

In [9]:
from pyspark.ml.feature import RegexTokenizer, Word2Vec
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="words", pattern="\\W")

word2Vec = Word2Vec(inputCol="words", outputCol="features")

In [10]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, word2Vec])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df_clean)
dataset = pipelineFit.transform(df_clean)

In [11]:
dataset.show()

In [12]:
(lable0_train,lable0_test)=dataset.filter(col('label')==1).randomSplit([0.7, 0.3],seed = 100)
(lable1_train, lable1_ex)=dataset.filter(col('label')==0).randomSplit([0.005, 0.995],seed = 100)
(lable1_test, lable1_ex2)=lable1_ex.randomSplit([0.002, 0.998],seed = 100)

In [13]:
lable1_ex2.count()

In [14]:
trainingData = lable0_train.union(lable1_train)
testData=lable0_test.union(lable1_test)

In [15]:
print("Dataset Count: " + str(dataset.count()))
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

##### LogisticRegression

In [17]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(trainingData)


In [18]:
predictions = lrModel.transform(testData)
predictions.select('label', 'prediction', 'probability','creator_name','userid').show(10)

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

##### Parameter Tuning and K-fold cross-validation

In [21]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [22]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel_lr = cv.fit(trainingData)
# Use test set to measure the accuracy of our model on new data
predictions = cvModel_lr.transform(testData)

In [23]:
predictions_lr = cvModel_lr.transform(testData)
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions)))

In [24]:
selected = predictions_lr.select('label', 'prediction', 'probability','creator_name','userid')
display(selected)

label,prediction,probability,creator_name,userid
1,1.0,"List(1, 2, List(), List(0.27544027080082545, 0.7245597291991744))",Big Cat Rescue,315612
1,1.0,"List(1, 2, List(), List(0.2777782910741955, 0.7222217089258045))",Big Cat Rescue,2477786
1,1.0,"List(1, 2, List(), List(0.07664810768814397, 0.9233518923118561))",Brandon,1500606
1,1.0,"List(1, 2, List(), List(0.29307622070444944, 0.7069237792955506))",Brave Wilderness,346987
1,1.0,"List(1, 2, List(), List(0.4934544006846522, 0.5065455993153478))",Brave Wilderness,929599
1,1.0,"List(1, 2, List(), List(0.1679985697131536, 0.8320014302868464))",Brave Wilderness,1255228
1,1.0,"List(1, 2, List(), List(0.007314693902205931, 0.9926853060977942))",Brave Wilderness,1400234
1,1.0,"List(1, 2, List(), List(0.21193344892556062, 0.7880665510744393))",Brave Wilderness,1767265
1,0.0,"List(1, 2, List(), List(0.8815505753848388, 0.11844942461516116))",Brave Wilderness,2182581
1,1.0,"List(1, 2, List(), List(0.0030220866717503053, 0.9969779133282496))",Brian Barczyk,127220


##### RandomForest

In [26]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")


In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [28]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

In [29]:
cvModel_rf = cv.fit(trainingData)
# Use test set to measure the accuracy of our model on new data
predictions_rf = cvModel_rf.transform(testData)

In [30]:
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions_rf)))

In [31]:
selected = predictions_rf.select('label', 'prediction', 'probability','creator_name','userid')
display(selected)

label,prediction,probability,creator_name,userid
1,1.0,"List(1, 2, List(), List(0.34420877239561476, 0.6557912276043851))",Big Cat Rescue,315612
1,1.0,"List(1, 2, List(), List(0.3037094733943609, 0.6962905266056392))",Big Cat Rescue,2477786
1,0.0,"List(1, 2, List(), List(0.5750667195607656, 0.42493328043923445))",Brandon,1500606
1,0.0,"List(1, 2, List(), List(0.5622846696063767, 0.4377153303936233))",Brave Wilderness,346987
1,0.0,"List(1, 2, List(), List(0.6001523825609952, 0.39984761743900477))",Brave Wilderness,929599
1,1.0,"List(1, 2, List(), List(0.44206563503091434, 0.5579343649690857))",Brave Wilderness,1255228
1,1.0,"List(1, 2, List(), List(0.17571210718932412, 0.8242878928106758))",Brave Wilderness,1400234
1,1.0,"List(1, 2, List(), List(0.35868260360929727, 0.6413173963907027))",Brave Wilderness,1767265
1,0.0,"List(1, 2, List(), List(0.8017677770967515, 0.19823222290324857))",Brave Wilderness,2182581
1,1.0,"List(1, 2, List(), List(0.19247219615548347, 0.8075278038445164))",Brian Barczyk,127220


##### Gradient boosting

In [33]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)


In [34]:
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 20])\
  .build()

In [35]:
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel_gbt = cv.fit(trainingData)
# Use test set to measure the accuracy of our model on new data
predictions_gbt = cvModel_gbt.transform(testData)

In [36]:
predictions_gbt.select('label', 'prediction', 'probability','creator_name','userid').show(10)

In [37]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions)))

#### As Random Forest gives us the best ROC evaludation, we will use Random Forest as our best model to generate predictions on the all the users.

In [39]:
bestModel = cvModel_rf.bestModel

#### 2. Classify All The Users

In [41]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

In [42]:
# Evaluate best model
evaluator.evaluate(finalPredictions)

In [43]:
selected = finalPredictions.select('label', 'prediction', 'probability','creator_name','userid','comment','words')
display(selected)

label,prediction,probability,creator_name,userid,comment,words
0,1.0,"List(1, 2, List(), List(0.1891011195151004, 0.8108988804848996))",Doug The Pug,87,I shared this to my friends and mom the were lol,"List(i, shared, this, to, my, friends, and, mom, the, were, lol)"
0,0.0,"List(1, 2, List(), List(0.957407239583751, 0.04259276041624897))",Doug The Pug,87,Super cute 😀🐕🐶,"List(super, cute)"
0,0.0,"List(1, 2, List(), List(0.5528723497388928, 0.44712765026110723))",bulletproof,530,stop saying get em youre literally dumb . have some common sense or dont own this kind of dog. fucking retarded I swear,"List(stop, saying, get, em, youre, literally, dumb, have, some, common, sense, or, dont, own, this, kind, of, dog, fucking, retarded, i, swear)"
0,0.0,"List(1, 2, List(), List(0.9539685649462936, 0.04603143505370649))",Meu Zoológico,670,Tenho uma jiboia e um largato,"List(tenho, uma, jiboia, e, um, largato)"
0,0.0,"List(1, 2, List(), List(0.7097968958589418, 0.2902031041410582))",ojatro,1031,I wanna see what happened to the pigs after that please,"List(i, wanna, see, what, happened, to, the, pigs, after, that, please)"
0,0.0,"List(1, 2, List(), List(0.7113389885156643, 0.28866101148433554))",Tingle Triggers,1212,Well shit now Im hungry,"List(well, shit, now, im, hungry)"
0,1.0,"List(1, 2, List(), List(0.18987383820734594, 0.8101261617926541))",Hope For Paws - Official Rescue Channel,1806,when I saw the end it said to adopt I saw different animal sites I was mad that they separated the cute little pups after being together for a long time,"List(when, i, saw, the, end, it, said, to, adopt, i, saw, different, animal, sites, i, was, mad, that, they, separated, the, cute, little, pups, after, being, together, for, a, long, time)"
0,0.0,"List(1, 2, List(), List(0.5365549306814937, 0.46344506931850626))",Hope For Paws - Official Rescue Channel,2036,Holy crap. That is quite literally the most adorable pup Ive ever seen.,"List(holy, crap, that, is, quite, literally, the, most, adorable, pup, ive, ever, seen)"
0,0.0,"List(1, 2, List(), List(0.9892600454480907, 0.01073995455190938))",Life Story,2637,武器はクエストで貰えるんじゃないんですか？,List()
0,0.0,"List(1, 2, List(), List(0.9274100934024766, 0.07258990659752333))",Brian Barczyk,2698,Call the teddy Larry,"List(call, the, teddy, larry)"


In [44]:
from pyspark.sql.functions import col
fraction = selected.where(col('prediction')==1).count()/selected.count()
print("The fraction of all users who have dogs/cats is estimated to be %.2f."%fraction)

#### 3. Get insigts of Users

In [46]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import StopWordsRemover

owners = selected.where(col('prediction')==1)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
owners = remover.transform(owners)


In [47]:
cv = CountVectorizer(inputCol="filtered", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(owners)
featurizedData = cvmodel.transform(owners)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData) 

In [48]:
from pyspark.ml.clustering import LDA
# Generate 25 Data-Driven Topics:
# "em" = expectation-maximization 
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)
ldatopics = ldamodel.describeTopics()
 

In [49]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def indiciestowords(value):
  words = []
  for idx in value:
    words.append(vocab[idx])
  return ','.join(words)
sparkudf = udf(indiciestowords,StringType())

In [50]:
topic_words = ldatopics.withColumn('topics',sparkudf(col('termIndices'))).select('topic','topics')

display(topic_words)
   

topic,topics
0,"like,dog,one,love,im,people,get,dont,cat,know"
1,"like,one,dog,im,get,love,cat,dont,got,know"
2,"like,one,dog,love,im,get,cat,dont,got,know"
3,"like,one,dog,im,love,get,dont,people,know,cat"
4,"hermit,crab,crabs,like,one,dog,get,love,im,dont"
5,"dog,like,wolf,pitbull,one,get,dogs,dont,im,people"
6,"like,one,love,dog,betta,get,im,dont,cat,want"
7,"like,dog,one,get,love,im,dont,woman,know,people"
8,"horse,horses,saddle,like,get,one,ride,dont,dog,im"
9,"dog,like,one,love,n,im,get,cat,dogs,dont"


From the above topics, looks like the topics that dog and cat owners most concerned are: Dog,Cat,like,love,horses,father,mother,pitbull,kitten.

#### 4.  Identify Creators With Cat And Dog Owners In The Audience Find creators with the most cat and/or dog owners. Find creators with the highest statistically significant percentages of cat and/or dog owners.

In [53]:
creator = owners.groupBy('creator_name').count().orderBy('count',ascending=False).collect()
print("The creator with the most cat/dog owners is %s." % creator[0]['creator_name'])


In [54]:
from pyspark.sql.functions import * 
selected.columns
dcowners = selected.groupBy("creator_name").agg({'prediction':'sum'})
totalnumbers = selected.groupBy("creator_name").count()
percentage = totalnumbers.join(dcowners,'creator_name',how='inner').withColumn('percentage',round(dcowners['sum(prediction)']/totalnumbers['count'],2))
percentage.select('creator_name','percentage').orderBy('percentage',ascending=False).show()


#### 5. Analysis and Future work

1. overview of project : 
  
  The goal of the project is by examining the comments and useId of animal related videos and predict if they are dog or cat owners, identify the common important topics that these users are most concerned.
  
2. data clean and modeling:
  
  First, we did data processing, checking counts and drop any null values in the comment column. We labeled some data as dog/cat owners if we find the comments include words like my dog, I have dog. And then we created a pipeline to tokenize and vectorized the words. Next, we split the data into train and test data and ready to feed them into a machine learning algorithm.
3. data analysis 

  We checked the counts and percentage of the dog/cat owners per creator, and select the creators with the highest amount of dog/cat owners and percentage of dog/cat owners.
  We analyzed the common important topics that most dog/cat owners are concerned about.
4. build ml model

  We used spark ML pipeline: Tokenizer -> HashTF -> LogisticRegression to build the classifier to predict dog/cat owners. We also tried ParamGrid to tune hyper parameters and cross vaidation to choose the best model. We also tried Random Forest and Gradient Boosting models and decided that Random Forest gives us the best results.
  We also used LDA model to identify the topics among dog/cat owners.
5. recommendation based on the model results

  From the topics, we can tell what the dog/cat owners think most important to them thus we can recommend videos with these topics to them.