In [2]:
import os
import sys

os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client/"
os.environ["PYLIB"]="/usr/hdp/current/spark2-client/python/lib"

sys.path.insert(0,os.environ["PYLIB"]+"/py4j-0.10.6-src.zip")

sys.path.insert(0,os.environ["PYLIB"]+"/pyspark.zip")

In [3]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession

#### Download the Maven package for reading xml file from DataBricks which acts as connector to read xml

In [4]:
Conf = SparkConf().setMaster('local[*]') \
       .setAppName("WIkiTopicModelling") \
       .set('spark.jars.packages','com.databricks:spark-xml_2.11:0.5.0') \
        .set('spark.executor.memory','40g')
        
sc = SparkContext(conf=Conf)

spark = SparkSession(sc)

In [5]:
spark

#### Reading the data from the wiki loaded to the local file system 

In [6]:
data = spark.read.format('xml').option('rowTag',"revision").load('file:///home/mahidharv/simplewiki-20170201-pages-articles-multistream.xml')

In [7]:
data.dtypes

[('comment', 'struct<_VALUE:string,_deleted:string>'),
 ('contributor',
  'struct<_VALUE:string,_deleted:string,id:bigint,ip:string,username:string>'),
 ('format', 'string'),
 ('id', 'bigint'),
 ('minor', 'string'),
 ('model', 'string'),
 ('parentid', 'bigint'),
 ('sha1', 'string'),
 ('text', 'struct<_VALUE:string,_space:string>'),
 ('timestamp', 'string')]

In [8]:
text_data = data.select('text')

In [9]:
text_data.count()

225974

In [10]:
text_data.printSchema()

root
 |-- text: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _space: string (nullable = true)



In [11]:
articledata = text_data.select('text._VALUE')

In [12]:
articledata.show(1,truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
articledata.select('_VALUE').columns

['_VALUE']

In [14]:
articledata = articledata.withColumnRenamed('_VALUE','text')

In [15]:
articledata= articledata.rdd.zipWithIndex().toDF()

In [16]:
articledata=articledata.withColumnRenamed("_1","text").withColumnRenamed("_2","index")

In [17]:
articledata = articledata.withColumn('text',articledata.text.cast('String'))

In [18]:
articledata.dtypes

[('text', 'string'), ('index', 'bigint')]

In [19]:
from pyspark.ml.feature import RegexTokenizer

In [20]:
tokenizer = RegexTokenizer(gaps=False,inputCol="text",outputCol="words",pattern="\\p{L}+")

In [21]:
from nltk.corpus import stopwords
stop_words = list(set(stopwords.words('english')))

In [22]:
len(stop_words)

153

In [23]:
stop_words.extend(map(unicode,"also use make people http www know many call include part find become like mean often different \
               usually take wikt come give well get since type list say change see refer actually iii name nbsp jpg png tiff wikipedia \
               &nbsp aisne kinds pas ask would way something need things want every  ref category title date font user color \
               style br url px span web publisher a b c id p n en la co site cite fc doi dq du aaa h tt s on of r od or".split()))

In [24]:
training_df, validation_df, testing_df = articledata.randomSplit([0.7, 0.3, 0.1], seed=0)

In [25]:
training_df.write.save("file:///home/mahidharv/SparkTopicModelling/traindata")

In [26]:
validation_df.write.save("file:///home/mahidharv/SparkTopicModelling/validationdata")

In [27]:
testing_df.write.save("file:///home/mahidharv/SparkTopicModelling/testdata")

In [28]:
print("No. of rows in Train Data = {}".format(training_df.count()))
print("No. of rows in Validation Data = {}".format(validation_df.count()))
print("No. of rows in Test Data = {}".format(testing_df.count()))


No. of rows in Train Data = 143969
No. of rows in Validation Data = 61559
No. of rows in Test Data = 20446


In [29]:
training_df.cache()

DataFrame[text: string, index: bigint]

In [30]:
from pyspark.ml.feature import StopWordsRemover
stopwordfilter = StopWordsRemover(inputCol="words",outputCol="filtered",stopWords=stop_words)

In [31]:
from pyspark.ml.feature import CountVectorizer,IDF
cv = CountVectorizer(inputCol="filtered", outputCol="raw_features", vocabSize=10000, minDF=5.0)
idf = IDF(inputCol="raw_features", outputCol="features")


In [32]:
from pyspark.ml import Pipeline

In [33]:
stages=[tokenizer,stopwordfilter,cv,idf]

In [34]:
from pyspark.ml.clustering import LDA,LDAModel

In [35]:
num_topics = 50
max_iterations = 100
lda_model = LDA(k=num_topics,
                featuresCol="features",
                maxIter=max_iterations,
                seed=123)

In [36]:
preprocess = Pipeline(stages=stages) 

In [37]:
lda_pipeline = Pipeline(stages=stages+[lda_model])

In [38]:
lda_Pipeline_model = lda_pipeline.fit(training_df)

In [None]:
from pyspark.ml import PipelineModel
from pyspark.ml.clustering import LDA,LDAModel
model = PipelineModel.load("/../../topicmodelling1")

In [None]:
topics =model.stages[-1].topicsMatrix()
vocabArray = model.stages[-3].vocabulary

In [None]:
vocabArray