Task description.


Step 1: Identify Cat And Dog Owners
Find the users who are cat and/or dog owners.
Step 2: Build And Evaluate Classifiers
Build classifiers for the cat and dog owners and measure the performance of the classifiers.
Step 3: Classify All The Users
Apply the cat/dog classifiers to all the users in the dataset. Estimate the fraction of all users
who are cat/dog owners.
Step 4: Extract Insights About Cat And Dog Owners
Find topics important to cat and dog owners.
Step 5: Identify Creators With Cat And Dog Owners In The Audience
Find creators with the most cat and/or dog owners. Find creators with the highest statistically
significant percentages of cat and/or dog owners.


Import necessary packages.


In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, Tokenizer
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import  IDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import desc
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import LDA
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.classification import LogisticRegression
import matplotlib.pyplot as plt
import numpy as np

Hadoop enables distributed, low-cost storage for this growing amount of unstructured data. In this post, I'll show one way to analyze unstructured data using Apache Spark. Spark is advantageous for text analytics because it provides a platform for scalable, distributed computing. 


In [3]:
%time
spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("Text_Classification") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

spark.sparkContext.setLogLevel('WARN')

print(spark.version)

CPU times: user 6 µs, sys: 2 µs, total: 8 µs
Wall time: 15.5 µs


2.3.1


In [4]:
path = '/home/okravchenko/text_classification/data'
data = spark.read.csv([path])
print("Number of documents read in is:", data.count())

Number of documents read in is: 5820036


Data exploration.


In [3]:
data.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [5]:
data.head(50)

[Row(_c0='creator_name', _c1='userid', _c2='comment'),
 Row(_c0='Doug The Pug', _c1=' 87', _c2='I shared this to my friends and mom the were lol'),
 Row(_c0='Doug The Pug', _c1=' 87', _c2='Super cute  😀🐕🐶'),
 Row(_c0='bulletproof', _c1=' 530', _c2='stop saying get em youre literally dumb . have some common sense or dont own this kind of dog. fucking retarded I swear'),
 Row(_c0='Meu Zoológico', _c1=' 670', _c2='Tenho uma jiboia e um largato'),
 Row(_c0='ojatro', _c1=' 1031', _c2='I wanna see what happened to the pigs after that  please'),
 Row(_c0='Tingle Triggers', _c1=' 1212', _c2='Well shit now Im hungry'),
 Row(_c0='Hope For Paws - Official Rescue Channel', _c1=' 1806', _c2='when I saw the end it said to adopt  I saw different animal sites I was mad that they separated the cute little pups after being  together for a long time'),
 Row(_c0='Hope For Paws - Official Rescue Channel', _c1=' 2036', _c2='Holy crap. That is quite literally the most adorable pup Ive ever seen.'),
 Row(_c0=

In [6]:
len(data.columns), data.columns

(3, ['_c0', '_c1', '_c2'])

In [7]:
data.describe().show()

+-------+--------------------+-----------------+--------------------+
|summary|                 _c0|              _c1|                 _c2|
+-------+--------------------+-----------------+--------------------+
|  count|             5787986|          5819471|             5818985|
|   mean|1.160987527582250...|1270216.870654716|            Infinity|
| stddev|1.072166106440056...|733809.5444038915|                 NaN|
|    min|      🙌º°˚* Have...|                1|                    |
|    max|     😁😁😁😁😁😁😁"|           userid|🧡🧡💛🧡💛🧡💛🧡?...|
+-------+--------------------+-----------------+--------------------+



In [5]:
%%time
data = data.withColumnRenamed('_c0', 'owner').withColumnRenamed('_c1', 'id_comentator').withColumnRenamed('_c2', 'text')
data.show(5, truncate = False)

+-------------+-------------+-----------------------------------------------------------------------------------------------------------------------+
|owner        |id_comentator|text                                                                                                                   |
+-------------+-------------+-----------------------------------------------------------------------------------------------------------------------+
|creator_name |userid       |comment                                                                                                                |
|Doug The Pug | 87          |I shared this to my friends and mom the were lol                                                                       |
|Doug The Pug | 87          |Super cute  😀🐕🐶                                                                                                     |
|bulletproof  | 530         |stop saying get em youre literally dumb . have some common sense or dont o

In [None]:
#TODO perform language detection (fasttext library can be used).

+-------------+--------------------+
|        owner|                text|
+-------------+--------------------+
| creator_name|             comment|
| Doug The Pug|I shared this to ...|
| Doug The Pug|  Super cute  😀🐕🐶|
|  bulletproof|stop saying get e...|
|Meu Zoológico|Tenho uma jiboia ...|
+-------------+--------------------+
only showing top 5 rows



In [5]:
data.groupBy("owner") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(n=50, truncate=40)

+---------------------------------------+-------+
|                                  owner|  count|
+---------------------------------------+-------+
|                       Brave Wilderness|1048751|
|                          Brian Barczyk| 386324|
|                               The Dodo| 325071|
|                     Taylor Nicole Dean| 235644|
|Hope For Paws - Official Rescue Channel| 127731|
|                            Info Marvel| 121353|
|                           Robin Seplut| 116551|
|                     꼬부기아빠 My Pet Diary| 102594|
|                              Vet Ranch| 102127|
|                        Gohan The Husky|  99580|
|                               ViralHog|  87939|
|                        Viktor Larkhill|  76341|
|                      Talking Kitty Cat|  74315|
|                    Keedes channel LIVE|  72829|
|                              MonkeyBoo|  70173|
|                               Mạnh CFM|  64826|
|                  Gone to the Snow Dogs|  55244|


I will drop commentator_id column as have assumptions that this data is not obligatory for the task.


In [6]:
drop_list = ['id_comentator']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+-------------+--------------------+
|        owner|                text|
+-------------+--------------------+
| creator_name|             comment|
| Doug The Pug|I shared this to ...|
| Doug The Pug|  Super cute  😀🐕🐶|
|  bulletproof|stop saying get e...|
|Meu Zoológico|Tenho uma jiboia ...|
+-------------+--------------------+
only showing top 5 rows



Fill na values.


In [7]:
%%time
ownerMissing = data.where(col("owner").isNull())
data = data.na.drop(subset=["owner"])
data.count()
textMissing = data.where(col("text").isNull())
data = data.na.drop(subset=["text"])
data.count()

CPU times: user 6.44 ms, sys: 1.82 ms, total: 8.26 ms
Wall time: 8.82 s


Data pre-processing stages.
1. Index owner id as numeric variable.
2. Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). Let’s tokenize the messages and create a list of words of each message.
3. CountVectorizer converts the list of tokens above to vectors of token counts.
4. Remove stopwords. Elimninate short messages, digits, special characters and top-50.
5. CountVEctorizer within clean data.
6. TD-IDF. Numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus.

In [8]:
%%time
stringIndexer = StringIndexer(inputCol="owner", outputCol= "owner" + "Index")
model = stringIndexer.fit(data)
data = model.transform(data)

tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsDataFrame = tokenizer.transform(data)

cv_tmp = CountVectorizer(inputCol="words", outputCol="tmp_vectors")
cv_tmp_model = cv_tmp.fit(wordsDataFrame)

top20 = list(cv_tmp_model.vocabulary[0:50])
more_then_3_charachters = [word for word in cv_tmp_model.vocabulary if len(word) <= 3]
contains_digits = [word for word in cv_tmp_model.vocabulary if any(char.isdigit() for char in word)]
stopwords = [] 
stopwords = stopwords + top20 + more_then_3_charachters + contains_digits
remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords = stopwords)
wordsDataFrame = remover.transform(wordsDataFrame)

cv = CountVectorizer(inputCol="filtered", outputCol="raw_features")
cvmodel = cv.fit(wordsDataFrame)
df_vect = cvmodel.transform(wordsDataFrame)

idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(df_vect)
rescaledData = idfModel.transform(df_vect)
rescaledData.select("ownerIndex", "features").show()
print('Finished')

+----------+--------------------+
|ownerIndex|            features|
+----------+--------------------+
|    3811.0|(262144,[208],[5....|
|      90.0|(262144,[34,310,2...|
|      90.0|(262144,[14,168],...|
|     835.0|(262144,[0,12,75,...|
|     164.0|(262144,[7115,869...|
|     176.0|(262144,[33,94,24...|
|    1364.0|(262144,[64,232,1...|
|       4.0|(262144,[14,25,30...|
|       4.0|(262144,[63,97,18...|
|     317.0|      (262144,[],[])|
|       1.0|(262144,[188,3127...|
|       2.0|      (262144,[],[])|
|       4.0|(262144,[37,227,4...|
|       4.0|(262144,[6,21,28,...|
|      12.0|(262144,[136,160,...|
|       0.0|(262144,[0,2,7,9,...|
|      56.0|(262144,[36,651,5...|
|      99.0|(262144,[18742],[...|
|       2.0|(262144,[32,230,1...|
|       0.0|(262144,[2,9,36,2...|
+----------+--------------------+
only showing top 20 rows

Finished
CPU times: user 3.53 s, sys: 540 ms, total: 4.07 s
Wall time: 2min 39s


Pipiline approach


In [None]:
#stages = []
#stringIndexer = StringIndexer(inputCol="owner", outputCol= "owner" + "Index")
#encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=["owner" + "classVec"])
#regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
#TODO implement better stopwords solution
#remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
#countVectors = CountVectorizer(inputCol="filtered_words", outputCol="rawFeatures", vocabSize=10000, minDF=5)
#idf = IDF(inputCol="rawFeatures", outputCol="features")
#TODO IDF do not needed for unsupervized task?
#stages += [stringIndexer, encoder, regexTokenizer, remover, countVectors, idf]

#pipeline = Pipeline(stages=stages)
#pipelineModel = pipeline.fit(data)
#data = pipelineModel.transform(data)
#data.show()

LDA


LDA attempts to do so by interpreting topics as unseen, or latent, distributions over all of the possible words (vocabulary) in all of the documents (corpus). This was originally developed for text analysis


In [None]:
lda = LDA(k=2, 
          #seed=123, 
          #optimizer="em", 
          featuresCol="vectors")

model = lda.fit(df_vect)

#Print the topics in the model
topics = model.describeTopics(maxTermsPerTopic = 15)
print('Finished')


In [16]:
print("Learned topics (as distributions over vocab of " + str(model.vocabSize())
      + " words):")

Learned topics (as distributions over vocab of 262144 words):


In [32]:
wordNumbers = 10
topicIndices = model.describeTopics(maxTermsPerTopic = wordNumbers)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:37807)
Traceback (most recent call last):
  File "/home/okravchenko/venv/text_classification/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/okravchenko/venv/text_classification/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:37807)

In [43]:
topicIndices.show()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:37807)
Traceback (most recent call last):
  File "/home/okravchenko/venv/text_classification/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/okravchenko/venv/text_classification/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:37807)

In [None]:
topics = model.topicsMatrix()


for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, model.vocabSize()):
        print(" " + str(topics[word][topic]))

PySpark has internal error on this stage.


K-means modelling



k-means works better for a data distribution possessing hyper-ellipsoidal shape 


In [None]:
k = 2
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(rescaledData)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)


In [None]:
evaluator = ClusteringEvaluator()
predictions = model.transform(rescaledData)
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))


Silhouette coefficients (as these values are referred to as) near +1 indicate that the sample is far away from the neighboring clusters. A value of 0 indicates that the sample is on or very close to the decision boundary between two neighboring clusters and negative values indicate that those samples might have been assigned to the wrong cluster.


In [19]:
predictions.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|               owner|                text|ownerIndex|               words|            filtered|        raw_features|            features|prediction|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|        creator_name|             comment|    3811.0|           [comment]|           [comment]|(262144,[208],[1.0])|(262144,[208],[5....|         0|
|        Doug The Pug|I shared this to ...|      90.0|[i, shared, this,...|[shared, friends,...|(262144,[34,310,2...|(262144,[34,310,2...|         0|
|        Doug The Pug|  Super cute  😀🐕🐶|      90.0|[super, cute, , ?...|[super, cute, 😀?...|(262144,[14,168],...|(262144,[14,168],...|         0|
|         bulletproof|stop saying get e...|     835.0|[stop, saying, ge...|[stop, saying, yo...|(262144,

BisectingKMeans


In [9]:
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(rescaledData)


In [10]:
cost = model.computeCost(rescaledData)
print("Within Set Sum of Squared Errors = " + str(cost))

Within Set Sum of Squared Errors = 5274651215.386315


In [11]:
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)


Cluster Centers: 


[8.53331854e-02 7.88765874e-02 7.71313244e-02 ... 4.99173650e-06
 9.98347300e-06 9.98347300e-06]
[1.34905471e+00 1.41399935e+00 1.28937830e+00 ... 1.44658858e-04
 0.00000000e+00 0.00000000e+00]


In [12]:
predictions_bi = model.transform(rescaledData)

In [14]:
predictions_bi.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|               owner|                text|ownerIndex|               words|            filtered|        raw_features|            features|prediction|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|        creator_name|             comment|    3811.0|           [comment]|           [comment]|(262144,[208],[1.0])|(262144,[208],[5....|         0|
|        Doug The Pug|I shared this to ...|      90.0|[i, shared, this,...|[shared, friends,...|(262144,[34,310,2...|(262144,[34,310,2...|         0|
|        Doug The Pug|  Super cute  😀🐕🐶|      90.0|[super, cute, , ?...|[super, cute, 😀?...|(262144,[14,168],...|(262144,[14,168],...|         0|
|         bulletproof|stop saying get e...|     835.0|[stop, saying, ge...|[stop, saying, yo...|(262144,

In [16]:
predictions_bi.groupBy("prediction").count().show()

+----------+-------+
|prediction|  count|
+----------+-------+
|         1| 191725|
|         0|5595220|
+----------+-------+



In [27]:
p_0 = predictions_bi.filter(predictions_bi.prediction == 0)  
p_0.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|               owner|                text|ownerIndex|               words|            filtered|        raw_features|            features|prediction|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|        creator_name|             comment|    3811.0|           [comment]|           [comment]|(262144,[208],[1.0])|(262144,[208],[5....|         0|
|        Doug The Pug|I shared this to ...|      90.0|[i, shared, this,...|[shared, friends,...|(262144,[34,310,2...|(262144,[34,310,2...|         0|
|        Doug The Pug|  Super cute  😀🐕🐶|      90.0|[super, cute, , ?...|[super, cute, 😀?...|(262144,[14,168],...|(262144,[14,168],...|         0|
|         bulletproof|stop saying get e...|     835.0|[stop, saying, ge...|[stop, saying, yo...|(262144,

In [40]:
p_0.groupBy("filtered") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(n=50, truncate=40)

+-----------------+------+
|         filtered| count|
+-----------------+------+
|               []|193399|
|           [cute]| 23752|
|          [first]| 10022|
|           [nice]|  8356|
|           [cool]|  8039|
|        [awesome]|  5885|
|         [coyote]|  5035|
|          [great]|  4244|
|      [beautiful]|  4103|
|        [amazing]|  3858|
|          [cute!]|  3722|
|           [want]|  3450|
|       [adorable]|  3163|
|         [videos]|  2973|
|           [keep]|  2939|
|          [early]|  2770|
|          [funny]|  2629|
|          [hello]|  2319|
|           [hola]|  2203|
|          [bless]|  2184|
|          [thank]|  2116|
|          [sweet]|  2110|
|           [awww]|  1983|
|           [fake]|  1956|
|    [give, shout]|  1836|
|         [crying]|  1792|
|           [much]|  1785|
|           [best]|  1754|
| [raptor, fossil]|  1733|
|[happy, birthday]|  1696|
|        [comment]|  1673|
|          [crazy]|  1662|
|        [channel]|  1638|
|           [vids]|  1630|
|

In [None]:
#p_0 - dogs chanell

Prediction modelling

In [25]:
(trainingData, testData) = predictions_bi.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|               owner|                text|ownerIndex|               words|            filtered|        raw_features|            features|prediction|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|#CameraLord™ • Ko...|#CameraLord went ...|     238.0|[#cameralord, wen...|[#cameralord, wen...|(262144,[193,248,...|(262144,[193,248,...|         0|
|#CameraLord™ • Ko...|#GANG SHITBIG 140...|     238.0|[#gang, shitbig, ...|[#gang, shitbig, ...|(262144,[5,153,58...|(262144,[5,153,58...|         0|
|#CameraLord™ • Ko...|#MMR #SQUAD Lets ...|     238.0|[#mmr, #squad, le...|[#mmr, #squad, lets]|(262144,[411,2475...|(262144,[411,2475...|         0|
|#CameraLord™ • Ko...|               #Mobb|     238.0|             [#mobb]|             [#mobb]|    

In [26]:
trainingData.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|               owner|                text|ownerIndex|               words|            filtered|        raw_features|            features|prediction|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|#CameraLord™ • Ko...|#CameraLord went ...|     238.0|[#cameralord, wen...|[#cameralord, wen...|(262144,[193,248,...|(262144,[193,248,...|         0|
|#CameraLord™ • Ko...|#GANG SHITBIG 140...|     238.0|[#gang, shitbig, ...|[#gang, shitbig, ...|(262144,[5,153,58...|(262144,[5,153,58...|         0|
|#CameraLord™ • Ko...|#MMR #SQUAD Lets ...|     238.0|[#mmr, #squad, le...|[#mmr, #squad, lets]|(262144,[411,2475...|(262144,[411,2475...|         0|
|#CameraLord™ • Ko...|               #Mobb|     238.0|             [#mobb]|             [#mobb]|    

In [None]:
trainingData = trainingData.withColumnRenamed('prediction', 'label')

In [33]:
trainingData = trainingData.select('features', 'label')
trainingData.show()