In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark = SparkSession.builder.appName("Facebook_srossg").getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [2]:
from pymongo import MongoClient
client = MongoClient('compute-0-11', 27017)

db = client.FacebookChallenge_akar1
#train data
collection = db.fb_hw
contents = collection.find().limit(1000)

In [3]:
from bson import json_util, ObjectId
import json
rddSan = json.loads(json_util.dumps(contents))


In [4]:
rdd = sc.parallelize(rddSan)


In [5]:
schema = StructType([StructField("Body", StringType(), True), 
                     StructField("Id", IntegerType(), True), 
                     StructField("Tags", StringType(), True),
                     StructField("Title", StringType(), True),
                     StructField("_id", StringType(), True)])

train = sqlContext.createDataFrame(rdd, schema=schema)


In [6]:
train = train.drop("_id")
#eliminate the duplicates
train = train.drop_duplicates(['Body','Title'])

In [7]:
# load the relevant packages
from pyspark.sql import SQLContext, functions as sf
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,Tokenizer, RegexTokenizer,StopWordsRemover, StringIndexer, NGram, HashingTF, IDF,MinHashLSH,MinHashLSHModel,Word2Vec,Normalizer
from pyspark.ml.classification import DecisionTreeClassifier, OneVsRest,LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer,CountVectorizer
from pyspark.ml.clustering import KMeans,GaussianMixture,BisectingKMeans
import numpy as np
import pandas as pd
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors

In [8]:
train.printSchema()

root
 |-- Body: string (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)



In [9]:
import sys
import csv as csv 
from bs4 import BeautifulSoup
import os

In [10]:
#Reference:https://github.com/alexeyza/Kaggle-Facebook3/blob/master/model/pre_process.py
#remove the HTML tags associated with the model
def filter_html_tags(text):
    # the following tags and their content will be removed, for example <a> tag will remove any html links
    tags_to_filter = ['code','a']
    if isinstance(text, unicode):
        text = text.encode('utf8')
    soup = BeautifulSoup(text)
    for tag_to_filter in tags_to_filter:
        text_to_remove = soup.findAll(tag_to_filter)
        [tag.extract() for tag in text_to_remove]
    return soup.get_text()
filter_html_tags_udf = udf(filter_html_tags, StringType())
train2 = train.withColumn("body",filter_html_tags_udf("Body"))

In [11]:
#fitting the transformers
#Regex Tokenizer to tokenize the data based on the pattern
tokenize_body  = RegexTokenizer(inputCol="body",outputCol="body_tokenized", pattern="[^a-zA-Z#+]")
train1 = tokenize_body.transform(train2)
tokenize_Title  = RegexTokenizer(inputCol="Title",outputCol="title_tokenized",pattern="[^a-zA-Z#+]")
train1 = tokenize_Title.transform(train1)
tokenize_tags  = Tokenizer(inputCol="Tags",outputCol="tags_tokenized")
train1 = tokenize_tags.transform(train1)
remover_body_stop = StopWordsRemover(inputCol="body_tokenized",outputCol="body_stopremove")
train1 = remover_body_stop.transform(train1)
remover_title_stop = StopWordsRemover(inputCol="title_tokenized",outputCol="title_stopremove")
train1 = remover_title_stop.transform(train1)

In [12]:
#get the unique tags in the train data
tags =()
for word in train1.select("tags_tokenized").take(train1.count()):
    if word not in tags:
        tags += word
j =1
flat_taglist = []
for sublist in tags:
    for item in sublist:
        flat_taglist.append(item)
        j+=1
unique_tags = list(set(flat_taglist))    

In [13]:
#get the count corresponding to the unique tags
tags_count={}
for i in flat_taglist:
    tags_count[i]=flat_taglist.count(i)


In [14]:
#Average tags per sentence(Based on 1000 rows)
str(j/train1.count())

'2'

In [15]:
#tried stemming the tokens but it does not impact our keywords infact it adds noise to the data with 
#wrong stemming like "consider" becoming'conside'

from nltk.stem.snowball import SnowballStemmer
from nltk.stem import PorterStemmer, WordNetLemmatizer

stemmer = SnowballStemmer("english")
train3 = train1.toPandas()
train3['body_stopremove'] = train3["body_stopremove"].apply(lambda x: [stemmer.stem(y) for y in x])

In [16]:
train1_renamed = train1.withColumnRenamed("tags_tokenized","label")

In [17]:
#Combining body and title as per the probability of 1:2 (Giving more importance to words in Title than body)
from itertools import chain
from pyspark.sql.functions import col, udf
from pyspark.sql.types import *

def concat(type):
    def concat_(*args):
        return list(chain(*args))
    return udf(concat_, ArrayType(type))


concat_string_arrays = concat(StringType())

#training data
train1_concat= train1.select(concat_string_arrays(col("body_stopremove"), col("title_stopremove")),"ID","tags_tokenized","title_stopremove")
train1_concat = train1_concat.withColumnRenamed("concat_(body_stopremove, title_stopremove)", "body_title")
train2_concat= train1_concat.select(concat_string_arrays(col("body_title"), col("title_stopremove")),"ID","tags_tokenized","title_stopremove")
train2_concat = train2_concat.withColumnRenamed("concat_(body_title, title_stopremove)", "body_title")

In [18]:
train2_concat.show()

+--------------------+---+--------------------+--------------------+
|          body_title| ID|      tags_tokenized|    title_stopremove|
+--------------------+---+--------------------+--------------------+
|[aloha, everyone,...| 42|[php, firefox, sa...|[safari, displays...|
|[specified, actio...|155|[asp.net-mvc, asp...|[graceful, degrad...|
|[implement, solut...|736|[asp.net, sql, as...|[working, date, r...|
|[php, ini, file, ...|850|[php, unicode, lo...|[internal, encodi...|
|[friend, running,...|157|[windows-vista, r...|[repair, corupt, ...|
|[im, using, array...|793|   [java, arraylist]|[arraylist, objec...|
|[make, applicatio...|332|[java, android, a...|[make, applicatio...|
|[following, guide...|857|[java, refactorin...|  [smaller, methods]|
|[looking, good, r...|464|[windows, securit...|[good, resource, ...|
|[checked, many, a...|621|[xml, xslt, names...|[add, namespace, ...|
|[deleting, event,...|173|[development, eve...|[cyclic, calling,...|
|[m, running, serv...|820|[mysql, 

In [23]:
#train2_concat
hashingTF = HashingTF(inputCol="body_title", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(train2_concat)
idf = IDF(inputCol="rawFeatures", outputCol="idf_features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
normalizer = Normalizer(inputCol="idf_features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(rescaledData)
assembler = VectorAssembler(
    inputCols=["normFeatures"],
    outputCol="features")

output = assembler.transform(l1NormData)

In [103]:
#Performing Kmeans clustering
kmeans = KMeans().setK(10).setSeed(1)
model = kmeans.fit(output)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(output)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
    
print("Within Set Sum of Squared Errors = " + str(wssse))
transformed = model.transform(output).select("prediction","body_title","tags_tokenized")

Within Set Sum of Squared Errors = 2242342.85745
Cluster Centers: 
[ 0.00640124  0.02984128  0.         ...,  0.          0.04480871
  0.06307919]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.          0.26829518  0.         ...,  0.          0.          0.        ]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.  0.  0. ...,  0.  0.  0.]
[ 0.  0.  0. ...,  0.  0.  0.]
Within Set Sum of Squared Errors = 2242342.85745


In [104]:
summary= model.summary
#Show the summary predictions
print("cluster Sizes:"+str(summary.clusterSizes))



cluster Sizes:[971, 1, 18, 1, 1, 1, 4, 1, 1, 1]


In [24]:
gmm = GaussianMixture().setK(10).setSeed(232)
gmm_cluster_model = gmm.fit(output)


In [26]:
#model parameters
summary_gmm= gmm_cluster_model.summary
print("LogLikelihood of a cluster = " + str(summary_gmm.logLikelihood))
print("Cluster Sizes = " + str(summary_gmm.clusterSizes))
gmm_cluster_model.gaussiansDF.show(2,truncate=False)
#weights for each of the cluster
print ("clustermodel weights:" +str(gmm_cluster_model.weights))

LogLikelihood of a cluster = 32173.2276132
Cluster Sizes = [141, 32, 58, 93, 204, 104, 102, 155, 73, 38]
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [28]:
#View prediction and probability of belonging to each of the cluster
transformed_output= gmm_cluster_model.transform(output).select("prediction","body_title","tags_tokenized","probability")
transformed_output.select("probability","prediction").show(1,truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|probability                                                                                                                                                                                                            |prediction|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|[2.8877506806219266E-12,7.376330270715967E-31,9.356904868196832E-12,0.5526827019482105,2.8340717397983486E-5,3.745006874085722E-30,0.004156699982355203,0.4431184516408654,1.3805698926122665E-5,7.376330270352109E-31]|3         |
+-----------------------------------------------------------------------------------

In [32]:
#Reference:https://stackoverflow.com/questions/42051184/latent-dirichlet-allocation-lda-in-spark
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors

docDF = train2_concat.select("ID","body_title")
Vector = CountVectorizer(inputCol="body_title", outputCol="vectors")
cv = Vector.fit(docDF)
output = cv.transform(docDF)

topics_bag = output.select("ID", "vectors").rdd.map(lambda (x,y): [x,Vectors.fromML(y)]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(topics_bag, k=3,maxIterations=100,optimizer='online')
topics = ldaModel.topicsMatrix()
vocabArray = model.vocabulary

wordNumbers = 10  # number of words per topic
topicIndices = sc.parallelize(ldaModel.describeTopics(maxTermsPerTopic = wordNumbers))

def topic_render(topic):  # specify vector id of words to actual words
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result

topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

for topic in range(len(topics_final)):
    print ("Topic" + str(topic) + ":")
    for term in topics_final[topic]:
        print (term)
    print ('\n')

Topic0:
number
input
n
links
post
authentication
got
product
clone
happens


Topic1:
using
m
like
code
get
use
want
way
file
one


Topic2:
statement
im
windows
wondering
problems
pass
end
instance
needs
strange


