## 0. Spark Initialization and data loading

In [1]:
from pyspark.sql import SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local") \
        .appName("Word Count") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()


In [2]:
!java -version

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)


In [3]:
import sys
import os
from datetime import *
from time import *
from pyspark.sql import *
from pyspark import SparkContext
from pyspark import SparkConf
df=spark.read.csv("youtube/animals_comments.csv",
                      inferSchema=True,
                      header=True)
df.show(10)

+--------------------+------+-------------------------------------+
|        creator_name|userid|                              comment|
+--------------------+------+-------------------------------------+
|        Doug The Pug|  87.0|                 I shared this to ...|
|        Doug The Pug|  87.0|                   Super cute  😀🐕🐶|
|         bulletproof| 530.0|                 stop saying get e...|
|       Meu Zoológico| 670.0|                 Tenho uma jiboia ...|
|              ojatro|1031.0|                 I wanna see what ...|
|     Tingle Triggers|1212.0|                 Well shit now Im ...|
|Hope For Paws - O...|1806.0|                 when I saw the en...|
|Hope For Paws - O...|2036.0|                 Holy crap. That i...|
|          Life Story|2637.0|武器はクエストで貰えるんじゃないん...|
|       Brian Barczyk|2698.0|                 Call the teddy Larry|
+--------------------+------+-------------------------------------+
only showing top 10 rows



In [4]:
df.dtypes

[('creator_name', 'string'), ('userid', 'double'), ('comment', 'string')]

In [5]:
df, df_rest= df.randomSplit([0.05, 0.95])

## 1.Data labeling

In [6]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
cond = (df["comment"].like("%my dog%") 
        | df["comment"].like("%I have a dog%") 
        | df["comment"].like("%my dogs%") 
        | df["comment"].like("%I have dog%")
        | df["comment"].like("%my cat%") 
        | df["comment"].like("%my cats%") 
        | df["comment"].like("%I have a cat%") 
        | df["comment"].like("%I have cat%") 
        | df["comment"].like("%my puppy%") 
        | df["comment"].like("%my puppies%") 
        | df["comment"].like("%my kitty%") 
        | df["comment"].like("%my kitties%") 
        | df["comment"].like("%I have a kitty%") 
        | df["comment"].like("%I have kitties%") 
        | df["comment"].like("%I have a puppy%") 
        | df["comment"].like("%I have puppies%"))

df_clean = df.withColumn('dog_cat',  cond)

# find user do not have 
df_clean = df_clean.withColumn('no_pet', ~df_clean["comment"].like("%my%") & ~df_clean["comment"].like("%have%")) 
df_clean = df_clean.withColumn('label', col("dog_cat").cast(IntegerType()).cast('double'))

df_clean.show()

+------------+-------+--------------------+-------+------+-----+
|creator_name| userid|             comment|dog_cat|no_pet|label|
+------------+-------+--------------------+-------+------+-----+
|        null| 9224.0|So many ignorant ...|  false|  true|  0.0|
|        null|14478.0|copter pilot no g...|  false|  true|  0.0|
|        null|28126.0|That guy will los...|  false|  true|  0.0|
|        null|28324.0|           ggvvvvvvv|  false|  true|  0.0|
|        null|33544.0|Nice buy Islamaba...|  false|  true|  0.0|
|        null|33661.0|Дами и Господа пр...|  false|  true|  0.0|
|        null|36985.0|Vert nice eye can...|  false|  true|  0.0|
|        null|37343.0| Alec is my favorite|  false| false|  0.0|
|        null|37407.0|Long time fan her...|  false| false|  0.0|
|        null|40081.0|Stefan is... brok...|  false|  true|  0.0|
|        null|52789.0|HELL YEAH now wer...|  false|  true|  0.0|
|        null|53007.0|bruh he slapet hi...|  false|  true|  0.0|
|        null|55762.0|Lov

In [7]:
for colume in df_clean.columns:
    df_clean=df_clean.filter(df_clean[colume].isNotNull())

## 2.data preprocessing and tokenizing
tokenizing is the process of dividing a string of written language into its component words.
Stop words are words that appear frequently in the document, but do not carry too much meaning. They should not be considered in the algorithm.

In [8]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Display some of the stop words
stopwords[:10]

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your']

In [9]:
from pyspark.ml.feature import RegexTokenizer
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="text", pattern="\\W")
df_clean= regexTokenizer.transform(df_clean)
 

In [10]:
remover.setInputCol("text")
remover.setOutputCol("vector_no_stopw")
df_clean = remover.transform(df_clean)
df_clean.show(10)

+--------------------+---------+--------------------+-------+------+-----+--------------------+--------------------+
|        creator_name|   userid|             comment|dog_cat|no_pet|label|                text|     vector_no_stopw|
+--------------------+---------+--------------------+-------+------+-----+--------------------+--------------------+
|#CameraLord™ • Ko...| 106860.0|Niggas in the sam...|  false|  true|  0.0|[niggas, in, the,...|[niggas, position...|
|#CameraLord™ • Ko...| 254431.0|How u Wuga World ...|  false|  true|  0.0|[how, u, wuga, wo...|[u, wuga, world, ...|
|#CameraLord™ • Ko...| 273476.0|Look up BLACK FAM...|  false|  true|  0.0|[look, up, black,...|[look, black, fam...|
|#CameraLord™ • Ko...| 479125.0|Whole concept dop...|  false|  true|  0.0|[whole, concept, ...|[whole, concept, ...|
|#CameraLord™ • Ko...| 864514.0|Alright I see u B...|  false|  true|  0.0|[alright, i, see,...|[alright, see, u,...|
|#CameraLord™ • Ko...| 895169.0|t-shirt from h&m ...|  false|  t

### The Porter stemming algorithm (or ‘Porter stemmer’) is a process for removing the commoner morphological and inflexional endings from words in English.


In [11]:
from nltk.stem.porter import *
from pyspark.sql.functions import udf

# Instantiate stemmer object
stemmer = PorterStemmer()

# Create stemmer python function
def stem(in_vec):
    out_vec = []
    for t in in_vec:
        t_stem = stemmer.stem(t)
        if len(t_stem) > 2:
            out_vec.append(t_stem)       
    return out_vec

# Create user defined function for stemming with return type Array<String>
from pyspark.sql.types import *
stemmer_udf = udf(lambda x: stem(x), ArrayType(StringType()))

# Create new column with vectors containing the stemmed tokens 
df_clean = df_clean.withColumn("vector_stemmed", stemmer_udf("vector_no_stopw"))

df_clean.show()

+--------------------+---------+--------------------+-------+------+-----+--------------------+--------------------+--------------------+
|        creator_name|   userid|             comment|dog_cat|no_pet|label|                text|     vector_no_stopw|      vector_stemmed|
+--------------------+---------+--------------------+-------+------+-----+--------------------+--------------------+--------------------+
|#CameraLord™ • Ko...| 106860.0|Niggas in the sam...|  false|  true|  0.0|[niggas, in, the,...|[niggas, position...|[nigga, posit, ye...|
|#CameraLord™ • Ko...| 254431.0|How u Wuga World ...|  false|  true|  0.0|[how, u, wuga, wo...|[u, wuga, world, ...|[wuga, world, don...|
|#CameraLord™ • Ko...| 273476.0|Look up BLACK FAM...|  false|  true|  0.0|[look, up, black,...|[look, black, fam...|[look, black, fam...|
|#CameraLord™ • Ko...| 479125.0|Whole concept dop...|  false|  true|  0.0|[whole, concept, ...|[whole, concept, ...|[whole, concept, ...|
|#CameraLord™ • Ko...| 864514.0|Al

In [12]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import col,size,count,when,isnan
from pyspark.sql import *
from functools import reduce

df_clean.na.drop()
#Maps a sequence of terms to their term frequencies using the hashing.
hashingTF = HashingTF(inputCol="vector_stemmed", outputCol="tf", numFeatures=200)
featurizedData = hashingTF.transform(df_clean)
featurizedData.na.drop()

featurizedData.withColumn('userid', col('userid').cast('float').cast(IntegerType()))

featurizedData.show()

+--------------------+---------+--------------------+-------+------+-----+--------------------+--------------------+--------------------+--------------------+
|        creator_name|   userid|             comment|dog_cat|no_pet|label|                text|     vector_no_stopw|      vector_stemmed|                  tf|
+--------------------+---------+--------------------+-------+------+-----+--------------------+--------------------+--------------------+--------------------+
|#CameraLord™ • Ko...| 106860.0|Niggas in the sam...|  false|  true|  0.0|[niggas, in, the,...|[niggas, position...|[nigga, posit, ye...|(200,[26,55,87,99...|
|#CameraLord™ • Ko...| 254431.0|How u Wuga World ...|  false|  true|  0.0|[how, u, wuga, wo...|[u, wuga, world, ...|[wuga, world, don...|(200,[15,58,104,1...|
|#CameraLord™ • Ko...| 273476.0|Look up BLACK FAM...|  false|  true|  0.0|[look, up, black,...|[look, black, fam...|[look, black, fam...|(200,[18,28,62,65...|
|#CameraLord™ • Ko...| 479125.0|Whole concept 

In [13]:
#IDF is an Estimator which is fit on a dataset and produces an IDFModel. 
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
idf = IDF(inputCol="tf", outputCol="features")
idfModel = idf.fit(featurizedData)

In [14]:
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[26,55,87,99...|
|  0.0|(200,[15,58,104,1...|
|  0.0|(200,[18,28,62,65...|
|  0.0|(200,[40,85,103,1...|
|  0.0|(200,[115,125,141...|
|  0.0|(200,[20,126],[3....|
|  0.0|(200,[4,12,60],[3...|
|  0.0|(200,[47,78,83,10...|
|  0.0|(200,[7,44,60],[3...|
|  0.0|(200,[51,67,73,82...|
|  0.0|(200,[125,149,197...|
|  0.0|(200,[1,13,18,31,...|
|  0.0|(200,[30,42,63,85...|
|  0.0|(200,[141],[3.535...|
|  0.0|(200,[13,40,158],...|
|  0.0|(200,[21,29,42,53...|
|  0.0|(200,[112],[2.710...|
|  0.0|(200,[17,42,50,53...|
|  0.0|(200,[0,9,24,40,9...|
|  0.0|(200,[90,121],[3....|
+-----+--------------------+
only showing top 20 rows



In [15]:
pet = rescaledData.filter("label=1.0")
pet_train, pet_test = pet.randomSplit([0.8, 0.2])
nopet = rescaledData.filter("label=0.0")
sampleRatio = float(pet.count()) / float(nopet.count())
sample_nopet = nopet.sample(False, sampleRatio)
df_sample = pet.unionAll(sample_nopet)
sample_nopet_train, sample_nopet_test = sample_nopet.randomSplit([0.8, 0.2])

df_train = pet_train.unionAll(sample_nopet_train)
df_test = pet_test.unionAll(sample_nopet_test)
print ('training size',df_train.count())
print ('testing size',df_test.count())

training size 3124
testing size 796


In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

lr = LogisticRegression(maxIter=10,featuresCol='features', labelCol='label')

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01,0.1]) \
    .build()

evaluator=BinaryClassificationEvaluator()
crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)


cvModel = crossval.fit(df_train)
best_model = cvModel.bestModel
trainingSummary = best_model.summary

In [18]:
path = "youtube/"

best_model.save(path + 'best_model')

In [19]:
prediction_train = best_model.transform(df_train)
prediction_test = best_model.transform(df_test)
accuracy_train = prediction_train.filter(prediction_train.label == prediction_train.prediction).count()/float(df_train.count())
accuracy_test = prediction_test.filter(prediction_test.label == prediction_test.prediction).count()/float(df_test.count())

print('Training set areaUnderROC: ' + str(evaluator.evaluate(prediction_train)))
print('Testing set areaUnderROC ' + str(evaluator.evaluate(prediction_test)))
print('Training set accuracy: ' + str(accuracy_train))
print('Testing set accuracy ' + str(accuracy_test))

Training set areaUnderROC: 0.9667893698761792
Testing set areaUnderROC 0.9453214396348439
Training set accuracy: 0.9231754161331626
Testing set accuracy 0.8932160804020101


## 3. Classify All The Users
We can now apply the cat/dog classifiers to all the other users in the dataset.

In [20]:
prediction = best_model.transform(rescaledData)

total_pet_owner = prediction.filter("prediction = 1.0").count()
total_population = df.select("userid").distinct().count()
pet_owner_ratio = float(total_pet_owner)/float(total_population)
print('total_pet_owner :',total_pet_owner)
print('total_population :',total_population)
print('pet_owner_ratio :',pet_owner_ratio)

total_pet_owner : 31964
total_population : 240075
pet_owner_ratio : 0.1331417265437884


## 4. Get insigts of Users

In [21]:
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel

pet_owner = prediction.filter("prediction = 1.0").select('userid','vector_stemmed')

cv = CountVectorizer(inputCol="vector_stemmed", outputCol="features",
                     minTF=2, # minium number of times a word must appear in a document
                     minDF=4) # minimun number of documents a word must appear in

countVectorModel = cv.fit(pet_owner)

countVectors = (countVectorModel
                .transform(pet_owner)
                .select("userid", "features").cache())

print(len(countVectorModel.vocabulary))  # how many documents, vocab size

numTopics = 10 # number of topics

lda = LDA(k = numTopics,
          maxIter = 50 # number of iterations
          )

ldaModel = lda.fit(countVectors)


# Print topics and top-weighted terms
topics = ldaModel.describeTopics(maxTermsPerTopic=20)
vocabArray = countVectorModel.vocabulary

ListOfIndexToWords = udf(lambda wl: list([vocabArray[w] for w in wl]))
FormatNumbers = udf(lambda nl: ["{:1.4f}".format(x) for x in nl])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)

7442
+---------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                  |
+---------------------------------------------------------------------------------------------------------------------------------------+
|[win, happi, home, long, adopt, vegan, right, ass, one, shelbi, store, swim, cho, colleg, chi, eagl, chewi, releas, evil, record]      |
|[get, hors, back, guy, tank, littl, day, put, cute, ride, mom, show, water, treat, come, poor, never, way, cut, start]                 |
|[one, video, love, get, make, dont, time, know, dog, see, want, peopl, use, got, like, also, anim, year, work, realli]                 |
|[kitten, hola, comment, half, miss, counter, dan, bulli, box, surgeri, other, race, stitch, nose, dude, view, bitten, het, leaf, fed]  |
|[dog, thank, snake, help, pl

## 5. Identify Creators With Cat And Dog Owners In The Audience

In [22]:
from pyspark.sql.functions import countDistinct
tmp = prediction.filter("prediction = 1.0")
tmp.groupBy('creator_name').agg(countDistinct('userid')).sort('count(DISTINCT userid)',ascending= False).show()

+--------------------+----------------------+
|        creator_name|count(DISTINCT userid)|
+--------------------+----------------------+
|    Brave Wilderness|                  2605|
|            The Dodo|                  2315|
|        Robin Seplut|                  1714|
|       Brian Barczyk|                  1252|
|  Taylor Nicole Dean|                  1202|
|Hope For Paws - O...|                  1175|
|           Vet Ranch|                   940|
|    Cole & Marmalade|                   889|
|     Gohan The Husky|                   773|
|     Viktor Larkhill|                   758|
|Gone to the Snow ...|                   641|
|   Talking Kitty Cat|                   592|
|          stacyvlogs|                   499|
|        Paws Channel|                   461|
|Zak Georges Dog T...|                   386|
|            ViralHog|                   325|
|  Think Like A Horse|                   288|
|         Info Marvel|                   287|
|    SlideShow ForFun|            