# Initialise Spark

In [2]:
from pyspark import SparkContext,SQLContext,SparkConf

# Spark configuration for a local run with 4 worker threads.
# NOTE(review): with a local[...] master the executors run inside the driver JVM (per the
# Spark docs), so spark.driver.memory is what bounds the usable heap here;
# spark.executor.memory only matters once the master points at a real cluster.
conf = (
    SparkConf()
    .setMaster('local[4]')
    .set('spark.executor.memory', '8g')
    .set('spark.driver.memory', '14g')
    .set('spark.driver.maxResultSize', '14g')
)
# YARN-only setting, kept for reference; it has no effect with a local master.
#conf.set('spark.yarn.executor.memoryOverhead','800m')

# getOrCreate makes this cell safe to re-run: constructing a second SparkContext while
# one is active raises an error. If a context already exists, its existing
# configuration is kept and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [7]:
# Shut down the SparkContext only when explicitly requested. An unconditional sc.stop()
# at this point would kill the context before the data-loading cells that follow run,
# which breaks Restart Kernel -> Run All.
STOP_SPARK = False
if STOP_SPARK:
    sc.stop()

# Read Wikipedia Text

In [9]:
# Read every file matched by text/*/* (two directory levels below text/) as JSON into one DataFrame.
wiki_json_glob = 'text/*/*'
df = sqlContext.read.json(wiki_json_glob)

In [11]:
# Number of records loaded; the recorded output of this cell is 5463182.
df.count()

5463182

# Process Text

In [10]:
# English stop words removed before counting terms (153 entries, order preserved).
# Written as a whitespace-separated block and split into a list for readability.
stopwords_english = """
    i me my myself we our ours ourselves you your yours yourself yourselves
    he him his himself she her hers herself it its itself they them their theirs themselves
    what which who whom this that these those am is are was were be been being
    have has had having do does did doing a an the and but if or because as until while
    of at by for with about against between into through during before after above below
    to from up down in out on off over under again further then once here there
    when where why how all any both each few more most other some such no nor not only
    own same so than too very s t can will just don should now d ll m o re ve y
    ain aren couldn didn doesn hadn hasn haven isn ma mightn mustn needn shan shouldn
    wasn weren won wouldn
""".split()

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, StopWordsRemover

# Stage 1: emit runs of Unicode letters as tokens. gaps=False makes the regex
# describe the tokens themselves rather than the separators between them.
tokenizer = RegexTokenizer(
    pattern="\\p{L}+",
    gaps=False,
    inputCol="text",
    outputCol="words",
)

# Stage 2: drop the English stop words listed above, ignoring case.
stopwordRemover = StopWordsRemover(
    stopWords=stopwords_english,
    caseSensitive=False,
    inputCol="words",
    outputCol="filtered",
)

# Stage 3: bag-of-words counts over a vocabulary capped at 20000 terms,
# each required to appear in at least 50 documents.
cv = CountVectorizer(
    vocabSize=20000,
    minDF=50,
    inputCol="filtered",
    outputCol="vec",
)

pipeline = Pipeline(stages=[tokenizer, stopwordRemover, cv])

In [12]:
%%time
model = pipeline.fit(df)

Wall time: 9min 30s


In [13]:
# Apply the fitted pipeline, adding the 'words', 'filtered' and 'vec' columns.
vecs = model.transform(dataset=df)

In [14]:
# Discard the raw text and the intermediate token columns; keep the 'vec' counts
# along with whatever other columns the source JSON carried.
vecs2 = vecs.drop('text', 'words', 'filtered')

In [16]:
# Reshuffle into 25 partitions; the parquet write below typically emits one part-file per partition.
vecs2 = vecs2.repartition(numPartitions=25)

# Save Word-Count Vectors & Text Model

In [17]:
%%time
vecs2.write.parquet('wikipedia_vecs2.parquet')

Wall time: 9min 54s


In [29]:
# Persist both the unfitted Pipeline definition and the fitted PipelineModel.
# NOTE(review): 'file:///name' resolves to the filesystem root; confirm these locations are intended and writable.
pipeline.write().save('file:///text_pipeline')
model.write().save('file:///text_model')

# Reload Word-Count Vectors

In [3]:
# Reload the count vectors written by the "Save" section. The path must match the one
# used there ('wikipedia_vecs2.parquet'). The previous path,
# 'wikipedia/wikipedia_vecs.parquet', presumably referred to a moved or renamed copy,
# so a fresh Run All would fail or silently load a different dataset.
vecs2 = sqlContext.read.parquet('wikipedia_vecs2.parquet')

In [4]:
# Spread the reloaded data over 25 partitions before the expensive LDA fit.
vecs2 = vecs2.repartition(numPartitions=25)

In [54]:
# 50% sample without replacement, e.g. for quicker experiments. A fixed seed makes the
# sample (and anything fitted on it) reproducible across runs.
# NOTE(review): in the visible cells vecs3 is not used; the LDA fit trains on vecs2.
SAMPLE_SEED = 42
vecs3 = vecs2.sample(withReplacement=False, fraction=0.5, seed=SAMPLE_SEED)

# Topic Modelling

In [4]:
%%time
# Wall time: 27min 54s for 50% data
#Run the topic modelling

from pyspark.ml.clustering import LDA
#inputCol="vec", outputCol="ldaVec", k=3, optimizer="online"

# Works with spark.driver.maxResultSize property set to 3G (5 x m4.16xlarge cluster)
lda = LDA(k=100, maxIter=60, featuresCol="vec")
ldaModel = lda.fit(vecs2)

Wall time: 5h 14min 39s


In [5]:
# Persist the fitted topic model through its explicit ML writer.
ldaModel.write().save('file:///lda_model')

In [8]:
# Score every page with the fitted LDA model, appending its per-page topic-distribution column.
pageTopicVecs = ldaModel.transform(dataset=vecs2)

In [9]:
%%time
pageTopicVecs.drop('vec').write.parquet('wikipedia_topic_vecs.parquet')

Wall time: 1h 2min 45s


# Print Topic Labels (Most Probable Words per Topic)

In [None]:
from pyspark.ml.clustering import LDA, LocalLDAModel
# Reload the LDA model saved by the "Topic Modelling" section. The path must match the
# one passed to ldaModel.save(...) there ('file:///lda_model'); the previous
# 'file:///wikipedia/lda_model' pointed somewhere the notebook never writes.
# NOTE(review): LocalLDAModel assumes the fit used the online optimizer (the LDA default,
# since no optimizer was passed); an EM fit would produce a DistributedLDAModel instead.
ldaModel = LocalLDAModel.load('file:///lda_model')

In [6]:
import pandas as pd
import numpy as np
from pyspark.ml.pipeline import PipelineModel

TOP_N_WORDS = 10  # number of words printed per topic

# Topic-word matrix from the LDA model: one row per vocabulary term, one column per topic.
X = ldaModel.topicsMatrix().toArray()

# Vocabulary of the fitted CountVectorizer (third stage of the text pipeline); its index
# order matches the rows of X.
textmodel = PipelineModel.load('file:///text_model')
vocab = np.array(textmodel.stages[2].vocabulary)

# Print the most heavily weighted words for every topic. The loop is bounded by X.shape[1]
# rather than a hard-coded 100, so it stays correct if k changes.
for topic_idx in range(X.shape[1]):
    top_word_idx = np.argsort(X[:, topic_idx])[::-1][:TOP_N_WORDS]
    print(' '.join(vocab[top_word_idx]))

br plant oil flowers leaves species plants long green indonesia
league baseball season games major game played home first runs
c p r g b n k f e h
fish bay wolf ray reed cook fishing bat shark waters
fire israel bc israeli roman jerusalem city jewish tel syria
city county street center community california state park north area
german germany van dutch berlin von swedish born netherlands sweden
zealand new grand auckland prix wellington sun hamilton nelson te
nuclear kelly allen energy burns atomic reactor newman uranium donald
party election elected elections democratic vote state council president political
island islands coast storm sea bay ocean tropical south north
blue wales white welsh black red wear parker martial worn
new york city ny jersey brooklyn ice connecticut hampshire american
may refer disambiguation also thailand thai following name places ryan
life indian also women one korean film india love korea
century one book also first th greek work published ancient
japan ja