#### 0. Data Exploration and Cleaning

In [2]:
# Load the gzipped comments CSV: the first row is treated as the header and
# column types are inferred from the data.
from pyspark.sql.types import IntegerType

df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("/FileStore/tables/animals_comments_csv-5aaff.gz")
)
# Cast userid to an integer column regardless of what schema inference chose.
df = df.withColumn("userid", df["userid"].cast(IntegerType()))
df.show(10)

In [3]:
# Rule-based labelling of comments that state pet ownership.
# note: these substring rules are a starting point; propose your own approach and rule to label data.
# `like` is a case-sensitive substring match here, so e.g. "My dog" is not caught by "%my dog%".
mentions_dog = df["comment"].like("%my dog%") | df["comment"].like("%I have a dog%")
mentions_cat = df["comment"].like("%my cat%") | df["comment"].like("%I have a cat%")

# True when the comment matches any of the dog or cat ownership phrases.
cond = mentions_dog | mentions_cat

# Column order (dog_cat, has_cat, has_dog, no_pet) is kept stable for downstream cells.
df_clean = (
    df.withColumn('dog_cat', cond)
      .withColumn('has_cat', mentions_cat)
      .withColumn('has_dog', mentions_dog)
)

# A comment is flagged no_pet when it contains neither "my" nor "have" anywhere.
uses_my_or_have = df_clean["comment"].like("%my%") | df_clean["comment"].like("%have%")
df_clean = df_clean.withColumn('no_pet', ~uses_my_or_have)


In [4]:
# Preview the first 10 rows, including the dog_cat / has_cat / has_dog / no_pet flag columns.
df_clean.show(10)

In [5]:
# data preprocessing
from pyspark.ml.feature import RegexTokenizer

# Remove rows without a comment before tokenizing: RegexTokenizer's transform fails
# at execution time when it is handed a null string. Such rows also carry null
# has_cat / has_dog flags, so the later train / test filters never select them.
df_clean = df_clean.where(df_clean["comment"].isNotNull())

# Split every comment on non-word characters into a list of tokens stored in "text".
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="text", pattern="\\W")

# Dropping a pre-existing "text" column (a no-op on the first run) keeps this cell
# re-runnable; otherwise a second execution fails because the output column exists.
df_clean = regexTokenizer.transform(df_clean.drop("text"))
df_clean.show(10)

#### 1. Build the classifier 
To train a model on the comments, use RegexTokenizer to split each comment into a list of words, then use Word2Vec (or another embedding model) to convert that list into a vector. Word2Vec maps each word to a unique fixed-size vector and then represents each document as the average of the vectors of all its words.

In [7]:
from pyspark.ml.feature import Word2Vec

# Number of leading rows used to fit Word2Vec. Fitting on rows beyond 9960 raised an
# error in the original run, so the embedding is trained on this prefix only.
# NOTE(review): the failure is presumably caused by null comments reaching the
# tokenizer -- confirm, then lift this cap and fit on the full data set.
W2V_TRAIN_ROWS = 9960

# Learn a mapping from words to Vectors. referenced at: https://spark.apache.org/docs/2.1.0/ml-features.html#word2vec
# vectorSize=300 is a common choice; minCount=2 ignores words that occur only once.
# The seed is fixed so that re-runs start from the same random initialisation.
word2Vec = Word2Vec(vectorSize=300, minCount=2, seed=42, inputCol="text", outputCol="w2v_result")

# Materialise the training prefix on the driver and turn it back into a DataFrame.
df_word = spark.createDataFrame(df_clean.select("text").take(W2V_TRAIN_ROWS))

model = word2Vec.fit(df_word)  # train the embedding on the prefix
result = model.transform(df_clean)  # apply the trained model to the entire dataset df_clean

# Read the two columns of interest by name instead of unpacking every column by
# position, so adding or removing a column upstream cannot break this preview.
for row in result.select("text", "w2v_result").take(10):
    print("Text: [%s] => \nVector: %s\n" % (", ".join(row["text"]), str(row["w2v_result"])))

In [8]:
from pyspark.ml.classification import LogisticRegression

# Labelled subset: comments that matched a cat or a dog ownership phrase.
df_clean_train = result.where("has_cat=true or has_dog=true")
df_clean_train.show(10)

# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; the view keeps the full set of columns of the labelled subset.
df_clean_train.createOrReplaceTempView('df_clean_train')

# Build the (label, features) table for the model: label is 1 when has_cat is true and
# 0 otherwise, so a comment matching both cat and dog phrases is labelled as cat.
df_clean_train = spark.sql("SELECT CASE when has_cat =true then 1 else 0 end as label, w2v_result  AS features FROM df_clean_train")
df_clean_train.show(10)

# train LR model
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(df_clean_train)
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# Parameters can alternatively be supplied as a Python dictionary (a paramMap);
# these values override the ones set on lr for the fit call they are passed to.
paramMap = {lr.maxIter: 100, lr.regParam: 0.0001, lr.threshold: 0.55}

# paramMaps are plain dictionaries and can be merged.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = {**paramMap, **paramMap2}

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(df_clean_train, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

In [9]:
# test data
# NOTE(review): this is exactly the labelled set the model was trained on, so the
# accuracy printed below is a training accuracy and overstates real performance.
# A held-out split (e.g. DataFrame.randomSplit before fitting) would give a fair estimate.
df_clean_valid = result.where("has_cat=true or has_dog=true") # same as train
df_clean_valid.show(10)
# createOrReplaceTempView is the supported replacement for the deprecated registerTempTable.
df_clean_valid.createOrReplaceTempView('df_clean_valid')
df_clean_valid = spark.sql("SELECT userid, CASE when has_cat =true then 1 else 0 end as label, comment, w2v_result AS features FROM df_clean_valid")

# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(df_clean_valid)
valid_result = prediction.select("label","userid", "comment" , "myProbability", "prediction")

# Count rows and correct predictions in a single Spark aggregation instead of
# collecting every row (including the comment text) to the driver.
accuracy_counts = valid_result.selectExpr(
    "count(*) AS total",
    "coalesce(sum(CASE WHEN prediction = label THEN 1 ELSE 0 END), 0) AS correct",
).first()
total, correct = accuracy_counts["total"], accuracy_counts["correct"]

# Guard against an empty labelled set, which would otherwise divide by zero.
if total:
    print("total accuracy: %d/%d: %.2f" %(correct,total, correct* 100./total))
else:
    print("total accuracy: no labelled rows to evaluate")


#### 2. Classify All The Users
We can now apply the cat/dog classifiers to all the other users in the dataset.

In [11]:
# Now expanding the trained model to other users in the dataset, specifically the users that were previously classified as having no cat nor dog.
df_clean_test = result.where("has_cat=false AND has_dog=false")
df_clean_test.show(10)
# createOrReplaceTempView is the supported replacement for the deprecated registerTempTable.
df_clean_test.createOrReplaceTempView('df_clean_test')
# Select from the df_clean_test view registered just above. Reading from the
# df_clean_train view here would re-score the labelled training users instead of
# the unlabelled ones this section is meant to classify.
df_clean_test = spark.sql("SELECT creator_name, userid, comment, w2v_result AS features FROM df_clean_test")
prediction = model2.transform(df_clean_test)
# model2 writes its probabilities to "myProbability" (renamed via lr.probabilityCol when it was fit).
test_result = prediction.select("creator_name","userid", "comment" , "myProbability", "prediction")
#referenced at: https://spark.apache.org/docs/latest/ml-pipeline.html#example-estimator-transformer-and-param

In [12]:
# visualize results
# Register the view only while test_result still holds the raw numeric prediction.
# This cell rebinds test_result to a frame whose prediction column is a string, so
# registering unconditionally on a re-run would make `prediction < 0.5` compare
# against text and silently mislabel every row. The existing view is reused instead.
# createOrReplaceTempView is the supported replacement for the deprecated registerTempTable.
if dict(test_result.dtypes).get("prediction") == "double":
    test_result.createOrReplaceTempView('test_result')

# Map the numeric class to a readable label: below 0.5 is a dog person, otherwise a cat person.
test_result = spark.sql("SELECT creator_name,userid, comment, CASE WHEN prediction < 0.5 then 'dog_person' ELSE 'cat_person' END as prediction FROM test_result")
test_result.show(20)

#### 3. Get Insights on Users

In [14]:
# Legend for the numeric prediction values shown in the table below.
print('if the user mentions "dog", he/she tends to be a dog owner, maked as 0')
print('if the user mentions "cat", he/she tends to be a cat owner,maked as 1')

# Rows per predicted class, read from the test_result temp view.
# COUNT(userid) counts rows with a non-null userid; it is not a distinct-user count.
test_result2 = spark.sql(
    "SELECT COUNT(userid), prediction FROM test_result GROUP BY prediction"
)
test_result2.show()

#### 4. Identify Creators With Cat And Dog Owners In The Audience

In [16]:
# Per-creator audience breakdown. The inner query turns each row of the test_result
# temp view into two 0/1 indicators (dog_count when prediction < 0.5, cat_count when
# prediction >= 0.5); the outer query sums them per creator_name.
# The sums count rows (comments), not distinct users.
print("all users are analysed already from the table")
# `FROM test_result` resolves to the temp view of that name, not to the Python variable;
# this statement rebinds the Python variable test_result to the aggregated frame.
test_result = spark.sql("SELECT creator_name, sum(dog_count), sum(cat_count) FROM \
                        (SELECT creator_name,userid, comment, CASE WHEN prediction < 0.5 then 1 ELSE 0 END as dog_count, \
                                  CASE WHEN prediction >= 0.5 then 1 ELSE 0 END as cat_count FROM test_result) as t \
                                  GROUP BY creator_name")
# Show the first 20 creators of the aggregated result.
test_result.show(20)

#### 5. Analysis and Future work

In [18]:
# try random forest or other machine learning models.