In [114]:
#Make MongoDB connection
from pymongo import MongoClient
client = MongoClient('compute-0-11', 27017)

db = client.FacebookChallenge_akar1
collection1 = db.fb_hw
collection2  = db.fb_hw_test
contentsTrain = collection1.find().limit(1000)
contentsTest = collection2.find().limit(1000)

In [115]:
#Create spark connector
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark = SparkSession.builder \
        .appName("Facebook_nebaditn") \
        .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [116]:

from bson import json_util, ObjectId
import json
rddSan1 = json.loads(json_util.dumps(contentsTrain))
rddSan2 = json.loads(json_util.dumps(contentsTest))

In [117]:
#Generate RDD
rddTrain = sc.parallelize(rddSan1)
rddTest = sc.parallelize(rddSan2)

In [118]:
#Create schema to generate train and test dataframe
schemaTrain = StructType([StructField("Body", StringType(), True), 
                     StructField("Id", IntegerType(), True), 
                     StructField("Tags", StringType(), True),
                     StructField("Title", StringType(), True),
                     StructField("_id", StringType(), True)])
schemaTest = StructType([StructField("Body", StringType(), True), 
                     StructField("Id", IntegerType(), True), 
                     StructField("Title", StringType(), True),
                     StructField("_id", StringType(), True)])

train = sqlContext.createDataFrame(rddTrain, schema=schemaTrain)
test = sqlContext.createDataFrame(rddTest, schema=schemaTest)

In [119]:
#Print and check the data
train.show()
test.show()

+--------------------+---+--------------------+--------------------+--------------------+
|                Body| Id|                Tags|               Title|                 _id|
+--------------------+---+--------------------+--------------------+--------------------+
|<p>I'd like to ch...|  1|php image-process...|How to check if a...|{$oid=5a047973393...|
|<p>In my favorite...|  2|             firefox|How can I prevent...|{$oid=5a047973393...|
|<p>This is probab...|  4|     c# url encoding|How do I replace ...|{$oid=5a047973393...|
|<pre><code>functi...|  5|php api file-get-...|How to modify who...|{$oid=5a047973393...|
|<p>I am using a m...|  6|proxy active-dire...|setting proxy in ...|{$oid=5a047973393...|
|<p>My image is ca...|  7|           core-plot|How to draw barpl...|{$oid=5a047973393...|
|<p>I've decided t...|  8|c# asp.net window...|How to fetch an X...|{$oid=5a047973393...|
|<p>Do you know of...|  9|.net javascript c...|.NET library for ...|{$oid=5a047973393...|
|<p>I'm us

In [120]:
# Remove HTML Tags from the Body text
#Here I have used a library beautiful soup to remove the HTML tags from the body section in train and test
from bs4 import BeautifulSoup
from pyspark.sql.functions import udf
from pyspark.sql.types import *

train = train.rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4], BeautifulSoup(x[0]).text))
train = train.toDF()
test = test.rdd.map(lambda x: (x[0], x[1], x[2], x[3], BeautifulSoup(x[0]).text))
test =  test.toDF()

In [121]:
#Check the new dataframe 
train.show()
test.show()

+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|                  _1| _2|                  _3|                  _4|                  _5|                  _6|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|<p>I'd like to ch...|  1|php image-process...|How to check if a...|{$oid=5a047973393...|I'd like to check...|
|<p>In my favorite...|  2|             firefox|How can I prevent...|{$oid=5a047973393...|In my favorite ed...|
|<p>This is probab...|  4|     c# url encoding|How do I replace ...|{$oid=5a047973393...|This is probably ...|
|<pre><code>functi...|  5|php api file-get-...|How to modify who...|{$oid=5a047973393...|function modify(....|
|<p>I am using a m...|  6|proxy active-dire...|setting proxy in ...|{$oid=5a047973393...|I am using a mach...|
|<p>My image is ca...|  7|           core-plot|How to draw barpl...|{$oid=5a047973393...|My image is canno...|
|

In [122]:
#Filter columns from the data frame
workingTrain = train.selectExpr("_2 as ID", "_6 as Body", "_3 as Tags", "_4 as Title")
workingTest = test.selectExpr("_2 as ID", "_5 as Body", "_3 as Title")

In [123]:
#Again check the data frame
workingTrain.show()
workingTest.show()

+---+--------------------+--------------------+--------------------+
| ID|                Body|                Tags|               Title|
+---+--------------------+--------------------+--------------------+
|  1|I'd like to check...|php image-process...|How to check if a...|
|  2|In my favorite ed...|             firefox|How can I prevent...|
|  4|This is probably ...|     c# url encoding|How do I replace ...|
|  5|function modify(....|php api file-get-...|How to modify who...|
|  6|I am using a mach...|proxy active-dire...|setting proxy in ...|
|  7|My image is canno...|           core-plot|How to draw barpl...|
|  8|I've decided to c...|c# asp.net window...|How to fetch an X...|
|  9|Do you know of a ...|.net javascript c...|.NET library for ...|
| 10|I'm using SQL Ser...|sql variables par...|SQL Server : proc...|
| 11|Some commercial o...|.net obfuscation ...|How do commercial...|
| 13|how can I move In...|postfix migration...|Migrate from Mdae...|
|  3|I am import matla...|r matlab

In [125]:
#Select particular columns
workingTrain = workingTrain.selectExpr("ID","Body", "Title", "Tags")
workingTest = workingTest.selectExpr("ID", "Title", "Body")

In [126]:
#UDF to concatenate two columns
import pyspark.sql.functions as F
from pyspark.sql.types import *
concat_udf = F.udf(lambda cols: "".join([x if x is not None else "*" for x in cols]), StringType())

In [127]:
 #Merge the title and the body tokenized column
    
workingTrain = workingTrain.withColumn("features", concat_udf(F.array("Title", "Body")))


In [128]:
workingTrain.show()

+---+--------------------+--------------------+--------------------+--------------------+
| ID|                Body|               Title|                Tags|            features|
+---+--------------------+--------------------+--------------------+--------------------+
|  1|I'd like to check...|How to check if a...|php image-process...|How to check if a...|
|  2|In my favorite ed...|How can I prevent...|             firefox|How can I prevent...|
|  4|This is probably ...|How do I replace ...|     c# url encoding|How do I replace ...|
|  5|function modify(....|How to modify who...|php api file-get-...|How to modify who...|
|  6|I am using a mach...|setting proxy in ...|proxy active-dire...|setting proxy in ...|
|  7|My image is canno...|How to draw barpl...|           core-plot|How to draw barpl...|
|  8|I've decided to c...|How to fetch an X...|c# asp.net window...|How to fetch an X...|
|  9|Do you know of a ...|.NET library for ...|.net javascript c...|.NET library for ...|
| 10|I'm u

In [129]:
#Remove the stop words from the body
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
#Tokenize the body column
tokenizer = Tokenizer(inputCol = "features" , outputCol = "tokenizedfeatures")
workingTrain = tokenizer.transform(workingTrain) # For the train data


In [130]:
workingTrain.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+
| ID|                Body|               Title|                Tags|            features|   tokenizedfeatures|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+
|  1|I'd like to check...|How to check if a...|php image-process...|How to check if a...|[how, to, check, ...|
|  2|In my favorite ed...|How can I prevent...|             firefox|How can I prevent...|[how, can, i, pre...|
|  4|This is probably ...|How do I replace ...|     c# url encoding|How do I replace ...|[how, do, i, repl...|
|  5|function modify(....|How to modify who...|php api file-get-...|How to modify who...|[how, to, modify,...|
|  6|I am using a mach...|setting proxy in ...|proxy active-dire...|setting proxy in ...|[setting, proxy, ...|
|  7|My image is canno...|How to draw barpl...|           core-plot|How to draw barpl...|[how, to, draw, b...|
|

In [131]:
#Tokenize the Tags
tagTokenizer = Tokenizer(inputCol="Tags", outputCol="tokenizedTags")
workingTrain = tagTokenizer.transform(workingTrain)
#workingTrain.show()

In [132]:
#Remove stop words from the Tokenized title
stopWordRemover = StopWordsRemover(inputCol = "tokenizedfeatures", outputCol = "filteredFeatures")
workingTrain = stopWordRemover.transform(workingTrain) #For Train data

In [133]:
workingTrain = workingTrain.selectExpr("ID","filteredFeatures as features","tokenizedTags")


In [134]:
#In thi step we are going to set count vectorizer the features
from pyspark.ml.feature import CountVectorizer
cvFeatures = CountVectorizer(inputCol = "features", outputCol="vectorizedFeatures" )
cvModel = cvFeatures.fit(workingTrain)
workingTrain = cvModel.transform(workingTrain)

In [135]:
cvTag = CountVectorizer(inputCol = "tokenizedTags", outputCol="vectorizedTags" )
cvModelTag = cvTag.fit(workingTrain)
workingTrain = cvModelTag.transform(workingTrain)

In [137]:
workingTrain = workingTrain.selectExpr("ID", "vectorizedFeatures", "vectorizedTags" )

In [138]:
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors

workingTrain2 = workingTrain.selectExpr("ID","vectorizedFeatures","vectorizedTags")
#workingTrain2.show()
corpus = workingTrain2.select("ID", "vectorizedFeatures").rdd.map(lambda (x,y): [x,Vectors.fromML(y)]).cache()

In [139]:
# Cluster the documents into three topics using LDA
ldaModel1 = LDA.train(corpus, k=10,maxIterations=100,optimizer='online')

In [140]:
topics = ldaModel1.topicsMatrix()
vocabArray = cvModel.vocabulary

In [141]:
wordNumbers = 20  # number of words per topic

In [142]:
topicIndices = sc.parallelize(ldaModel1.describeTopics(maxTermsPerTopic = wordNumbers))

In [143]:
#Ref: https://stackoverflow.com/questions/42051184/latent-dirichlet-allocation-lda-in-spark

def topic_render(topic):  # specify vector id of words to actual words
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result

In [144]:
topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

In [145]:
for topic in range(len(topics_final)):
    print ("Topic" + str(topic) + ":")
    for term in topics_final[topic]:
        print (term)
    print ('\n')

Topic0:
link
tb
english
page
template
signature
{%
%}
~
time:
ob_flush()
bgrp
r-squared:
621
psex
language
lm(formula
2e-16
mydat)
bgrp2


Topic1:
trait
cancer
<video>myvideo.avi</video>
<photo>myphoto.avi</video>
treatment
\int
nvarchar(1000)
myvideo.avi
<video
201204271113
<photo
getenv
setenv
(x86)\microsoft
_putenv
%hello%
dataaccess
distinct
dumb
terminals


Topic2:
values
search
find
points
point
three
take
tables
words
=
like
right
plot
x
question
fields
difference
main02
select
want


Topic3:
*)
daemon
2px
inspect
(nsstring
rails
jquery
<object>
ip
#333333;
background:
javascript
xslt
(nsmutableurlrequest
programmers
works
[ret
kerberos
video'
xhtml


Topic4:
"2.1.*",
ctypes.create_unicode_buffer(1024)
mongomapper
_
window.onload
netmask
[email]
outside_inbound
ami
generate
access-list
often
hoursleft
datepicker
60)
html.actionlink("click
"home",
"controller",
"stayhomefunc()"})%></div>
me",


Topic5:

=
-
{
}
5th
using
0
*
like
new
use
+
get
want
code
//
public
one
file


Topi

In [154]:
#Since the LDA model mllib doesnt have the transform function, I used the function in ml library to get the probabilities of each post falling into a particular cluster.
from pyspark.ml.clustering import LDA
# Trains a LDA model.

workingTrain3 = workingTrain2.selectExpr("ID", "vectorizedFeatures as features")
lda = LDA(k=10, maxIter=100)
ldamodel = lda.fit(workingTrain3)

In [156]:
# Shows the result
transformed = ldamodel.transform(workingTrain3)
transformed.show()

+---+--------------------+--------------------+
| ID|            features|   topicDistribution|
+---+--------------------+--------------------+
|  1|(27621,[4,7,18,19...|[8.39845281648305...|
|  2|(27621,[9,19,108,...|[9.99527316135934...|
|  4|(27621,[0,11,33,3...|[0.00110453103969...|
|  5|(27621,[0,1,2,3,4...|[6.17781095531005...|
|  6|(27621,[4,10,11,3...|[0.00113432213339...|
|  7|(27621,[0,6,7,9,1...|[7.92374833570028...|
|  8|(27621,[0,1,4,7,1...|[5.12403788145105...|
|  9|(27621,[0,5,7,11,...|[7.63600901254004...|
| 10|(27621,[0,4,7,14,...|[9.76323442064911...|
| 11|(27621,[13,19,39,...|[6.56346689174020...|
| 13|(27621,[0,471,121...|[0.00379707879667...|
|  3|(27621,[0,1,2,3,5...|[1.33001211295862...|
| 12|(27621,[0,4,5,7,1...|[5.38655366952220...|
| 14|(27621,[0,8,39,71...|[0.00199545597088...|
| 15|(27621,[0,9,19,70...|[0.00161283112608...|
| 16|(27621,[0,2,3,7,9...|[1.82791834090642...|
| 17|(27621,[0,7,35,40...|[3.36427515168108...|
| 18|(27621,[0,2,3,4,8...|[1.66838620571

In [157]:
ll = ldamodel.logLikelihood(workingTrain3)
lp = ldamodel.logPerplexity(workingTrain3)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -968768.630328
The upper bound on perplexity: 6.18097305203


In [160]:
#Ref: https://spark.apache.org/docs/2.2.0/ml-clustering.html#latent-dirichlet-allocation-lda

#Similarly, we can perform topic modelling using other algorithms like K-means
from pyspark.ml.clustering import KMeans

#Get a fresh copy of dataset
workingTrain4 = workingTrain2.selectExpr("ID", "vectorizedFeatures as features")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
kmeansModel = kmeans.fit(workingTrain4)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = kmeansModel.computeCost(workingTrain4)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Shows the result.
centers = kmeansModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Within Set Sum of Squared Errors = 16168184.4355
Cluster Centers: 
[  3.49036269e+01   1.15336788e+00   6.60103627e-01 ...,   0.00000000e+00
   0.00000000e+00   1.03626943e-03]
[  9.28771429e+02   1.14857143e+01   1.14000000e+01 ...,   2.85714286e-02
   2.85714286e-02   0.00000000e+00]


In [164]:
#COmmented the below model as GMM   experiencing out of Memory issues.

#Similarly, we can perform topic modelling using other algorithms GaussianMixture with random initialization
from pyspark.ml.clustering import GaussianMixture

#Get a fresh copy of dataset
#workingTrain5 = workingTrain2.selectExpr("ID", "vectorizedFeatures as features")

#gmm = GaussianMixture().setK(2).setSeed(538009467)
#gmmModel = gmm.fit(workingTrain5)

#print("Gaussians shown as a DataFrame: ")
#gmmModel.gaussiansDF.show(truncate=False)