In [None]:
#!jupyter nbconvert --to script 'topicModelParameterTuning.ipynb'

In [1]:
from collections import defaultdict
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA
from pyspark.sql import SQLContext
import re
from nltk.corpus import stopwords
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# check if spark context is defined
print(sc.version)
import time

2.3.2


In [2]:
data = sc.wholeTextFiles("file:///data2/sunandan/sparkex/sshrotri/allAbstracts/*.txt").map(lambda x: x[1])

In [3]:
# Tokenize the data
tokens = data.map(lambda document: document.strip().lower()) \
    .map(lambda x: x.split()) \
    .map(lambda word: [x for x in word if x.isalpha()]) \
    .map(lambda word: [x for x in word if len(x) > 2])

In [4]:
tokens.take(5)

[['cytochrome',
  'enzyme',
  'metabolizes',
  'the',
  'anticancer',
  'prodrug',
  'cyclophosphamide',
  'which',
  'decomposes',
  'the',
  'cytotoxic',
  'metabolites',
  'acrolein',
  'and',
  'phosphoramide',
  'have',
  'evaluated',
  'the',
  'bystander',
  'cytotoxicity',
  'cpa',
  'combination',
  'with',
  'enzyme',
  'prodrug',
  'therapy',
  'using',
  'cell',
  'agarose',
  'overlay',
  'this',
  'method',
  'mimics',
  'the',
  'tumor',
  'microenvironment',
  'limiting',
  'the',
  'diffusion',
  'metabolites',
  'and',
  'reducing',
  'the',
  'oxygen',
  'concentration',
  'levels',
  'similar',
  'those',
  'found',
  'solid',
  'under',
  'these',
  'the',
  'cyp',
  'activity',
  'tumor',
  'cells',
  'was',
  'decreased',
  'compared',
  'standard',
  'aerobic',
  'despite',
  'this',
  'decrease',
  'metabolic',
  'potent',
  'bystander',
  'effect',
  'was',
  'resulting',
  'killing',
  'cpa',
  'tumor',
  'cell',
  'population',
  'comprised',
  'only',
  'tu

In [5]:
stop = stopwords.words('english')
# Get vocabulary 
# Flat map to put words into one giant list
# Remove stopwords
# Map and then reduce by key
# Sort by word count
termCounts = tokens.flatMap(lambda document: document) \
    .filter(lambda x: x not in stop) \
    .map(lambda word: (word,1)) \
    .reduceByKey(lambda x,y: x+y) \
    .map(lambda tuple: (tuple[1],tuple[0])) \
    .sortByKey(False)

In [6]:
termCounts.take(10)

[(12096, 'gene'),
 (11168, 'cells'),
 (9428, 'expression'),
 (9028, 'cell'),
 (9028, 'cancer'),
 (7785, 'tumor'),
 (7289, 'patients'),
 (6981, 'genetic'),
 (5893, 'human'),
 (5285, 'associated')]

In [6]:
# Dictionary
vocabulary = termCounts.map(lambda x: x[1]) \
    .zipWithIndex() \
    .collectAsMap()

In [8]:
vocabulary

{'gene': 0,
 'cells': 1,
 'expression': 2,
 'cell': 3,
 'cancer': 4,
 'tumor': 5,
 'patients': 6,
 'genetic': 7,
 'human': 8,
 'associated': 9,
 'clinical': 10,
 'results': 11,
 'genes': 12,
 'using': 13,
 'study': 14,
 'may': 15,
 'also': 16,
 'protein': 17,
 'two': 18,
 'identified': 19,
 'variants': 20,
 'mutations': 21,
 'analysis': 22,
 'data': 23,
 'found': 24,
 'disease': 25,
 'association': 26,
 'increased': 27,
 'growth': 28,
 'showed': 29,
 'significantly': 30,
 'novel': 31,
 'role': 32,
 'dna': 33,
 'treatment': 34,
 'therapy': 35,
 'studies': 36,
 'one': 37,
 'compared': 38,
 'levels': 39,
 'significant': 40,
 'risk': 41,
 'mice': 42,
 'therapeutic': 43,
 'potential': 44,
 'used': 45,
 'mutation': 46,
 'including': 47,
 'breast': 48,
 'could': 49,
 'high': 50,
 'observed': 51,
 'factor': 52,
 'effect': 53,
 'activity': 54,
 'three': 55,
 'sequencing': 56,
 'vector': 57,
 'show': 58,
 'tumors': 59,
 'effects': 60,
 'new': 61,
 'different': 62,
 'suggest': 63,
 'findings': 64

In [9]:
# Convert a given document into a vector of word counts
def document_vector(document):
    id = document[1]
    counts = defaultdict(int)
    for token in document[0]:
        if token in vocabulary:
            token_id = vocabulary[token]
            counts[token_id] += 1
    counts = sorted(counts.items())
    keys = [x[0] for x in counts]
    values = [x[1] for x in counts]
    return (id, SparseVector(len(vocabulary), keys, values))

# Process all the documents into word vector using above function
documents = tokens.zipWithIndex().map(document_vector).map(list)

In [10]:
documents.take(3)

[[0,
  SparseVector(29342, {1: 1.0, 2: 1.0, 3: 3.0, 5: 4.0, 8: 1.0, 13: 1.0, 24: 1.0, 34: 1.0, 35: 1.0, 38: 1.0, 39: 1.0, 51: 1.0, 53: 3.0, 54: 1.0, 59: 1.0, 63: 1.0, 64: 1.0, 92: 1.0, 105: 1.0, 134: 2.0, 151: 1.0, 156: 1.0, 171: 1.0, 179: 1.0, 183: 1.0, 198: 1.0, 222: 1.0, 256: 1.0, 280: 1.0, 330: 1.0, 363: 1.0, 370: 1.0, 391: 1.0, 418: 1.0, 434: 2.0, 448: 1.0, 450: 1.0, 459: 1.0, 505: 1.0, 513: 1.0, 521: 1.0, 529: 2.0, 734: 1.0, 755: 2.0, 790: 1.0, 833: 1.0, 907: 1.0, 965: 1.0, 1100: 1.0, 1109: 4.0, 1140: 2.0, 1180: 1.0, 1226: 1.0, 1263: 1.0, 1390: 1.0, 1439: 1.0, 1645: 1.0, 1820: 1.0, 1971: 1.0, 2341: 1.0, 2413: 1.0, 2533: 1.0, 2580: 1.0, 3121: 2.0, 3150: 1.0, 3349: 3.0, 3396: 1.0, 3804: 1.0, 4344: 1.0, 5124: 1.0, 5159: 1.0, 7235: 3.0, 12074: 1.0, 12668: 1.0, 13944: 2.0, 14360: 1.0, 27411: 1.0, 27412: 1.0})],
 [1,
  SparseVector(29342, {4: 1.0, 5: 1.0, 10: 1.0, 23: 1.0, 25: 1.0, 43: 1.0, 291: 1.0, 298: 1.0, 394: 1.0, 595: 1.0, 1127: 1.0, 1275: 1.0, 1326: 2.0, 1783: 1.0, 1875: 1.0, 3

In [27]:
df = documents.toDF(['doc_id', 'features'])

In [28]:
df.show()

+------+--------------------+
|doc_id|            features|
+------+--------------------+
|     0|(29342,[1,2,3,5,8...|
|     1|(29342,[4,5,10,23...|
|     2|(29342,[0,1,2,3,5...|
|     3|(29342,[0,1,4,5,8...|
|     4|(29342,[0,1,3,4,5...|
|     5|(29342,[5,9,11,15...|
|     6|(29342,[0,1,2,3,4...|
|     7|(29342,[0,1,4,5,8...|
|     8|(29342,[1,2,4,8,1...|
|     9|(29342,[0,1,2,3,4...|
|    10|(29342,[0,5,6,11,...|
|    11|(29342,[2,3,4,5,8...|
|    12|(29342,[0,1,3,4,5...|
|    13|(29342,[0,2,3,4,5...|
|    14|(29342,[1,3,5,8,1...|
|    15|(29342,[0,1,2,15,...|
|    16|(29342,[1,4,6,8,9...|
|    17|(29342,[0,2,15,17...|
|    18|(29342,[0,4,15,25...|
|    19|(29342,[0,2,5,12,...|
+------+--------------------+
only showing top 20 rows



In [31]:
# create spark dataframe
#df = spark.createDataFrame(documents, ["doc_id", "features"])
# split into training and testing data
dfs = df.randomSplit([0.85, 0.15])
dfTrain = dfs[0]
dfTest = dfs[1]

In [32]:
dfTrain.show()

+------+--------------------+
|doc_id|            features|
+------+--------------------+
|     0|(29342,[1,2,3,5,8...|
|     1|(29342,[4,5,10,23...|
|     2|(29342,[0,1,2,3,5...|
|     3|(29342,[0,1,4,5,8...|
|     4|(29342,[0,1,3,4,5...|
|     5|(29342,[5,9,11,15...|
|     6|(29342,[0,1,2,3,4...|
|     7|(29342,[0,1,4,5,8...|
|     8|(29342,[1,2,4,8,1...|
|     9|(29342,[0,1,2,3,4...|
|    10|(29342,[0,5,6,11,...|
|    11|(29342,[2,3,4,5,8...|
|    13|(29342,[0,2,3,4,5...|
|    14|(29342,[1,3,5,8,1...|
|    15|(29342,[0,1,2,15,...|
|    16|(29342,[1,4,6,8,9...|
|    17|(29342,[0,2,15,17...|
|    18|(29342,[0,4,15,25...|
|    19|(29342,[0,2,5,12,...|
|    20|(29342,[0,1,2,7,8...|
+------+--------------------+
only showing top 20 rows



In [33]:
dfTrain.count()

11210

In [34]:
# choose different values of topics
Ks = [k for k in range(2, 21)]
# topic prior, determines how sparse or non-sparse the topics are 
betas = [0.05, 0.1, 2., 5.]
# initialize LDA model 
lda_model = LDA(seed=10, optimizer='online', maxIter=2000)
# number of documents
D = dfTrain.count()

output = []

for K in Ks:
    for beta in betas:
        t=time.time()
        # per document proportion prior, based on Griffiths et. al, 2004
        alpha = [50/K for _ in range(K)]
        # set parameters in model accordingly
        lda_model.setParams(k=K, docConcentration=alpha, topicConcentration=beta)
        # fit model
        df_fit = lda_model.fit(dfTrain)
        # get fit perplexity
        df_perp = df_fit.logPerplexity(dfTest)
        
        print('Perplexity={} for K={}, alpha={}, beta={}'.format(df_perp, K, 50/K, beta))
        output.append(['Perplexity={} for K={}, alpha={}, beta={}'.format(df_perp, K, 50/K, beta)])
        print(time.time()-t)

Perplexity=8.855095541683063 for K=2, alpha=25.0, beta=0.05
1285.8037214279175


KeyboardInterrupt: 

In [17]:
with open('modelTuningResults', 'w') as f:
    for item in output:
        f.write("%s\n" % item)