In [2]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

In [1]:
data = sc.textFile("../data/mllib/sample_lda_data.txt")

In [3]:
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

In [4]:
print(parsedData.collect())

[DenseVector([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0]), DenseVector([1.0, 3.0, 0.0, 1.0, 3.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]), DenseVector([1.0, 4.0, 1.0, 0.0, 0.0, 4.0, 9.0, 0.0, 1.0, 2.0, 0.0]), DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 3.0, 9.0]), DenseVector([3.0, 1.0, 1.0, 9.0, 3.0, 0.0, 2.0, 0.0, 0.0, 1.0, 3.0]), DenseVector([4.0, 2.0, 0.0, 3.0, 4.0, 5.0, 1.0, 1.0, 1.0, 4.0, 0.0]), DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 2.0, 9.0]), DenseVector([1.0, 1.0, 1.0, 9.0, 2.0, 1.0, 2.0, 0.0, 0.0, 1.0, 3.0]), DenseVector([4.0, 4.0, 0.0, 3.0, 4.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0]), DenseVector([2.0, 8.0, 2.0, 0.0, 3.0, 0.0, 2.0, 0.0, 2.0, 7.0, 2.0]), DenseVector([1.0, 1.0, 1.0, 9.0, 0.0, 2.0, 2.0, 0.0, 0.0, 3.0, 3.0]), DenseVector([4.0, 1.0, 0.0, 0.0, 4.0, 5.0, 1.0, 3.0, 0.0, 1.0, 0.0])]


In [5]:
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

In [6]:
print(corpus.collect())

[[0, DenseVector([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0])], [1, DenseVector([1.0, 3.0, 0.0, 1.0, 3.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0])], [2, DenseVector([1.0, 4.0, 1.0, 0.0, 0.0, 4.0, 9.0, 0.0, 1.0, 2.0, 0.0])], [3, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 3.0, 9.0])], [4, DenseVector([3.0, 1.0, 1.0, 9.0, 3.0, 0.0, 2.0, 0.0, 0.0, 1.0, 3.0])], [5, DenseVector([4.0, 2.0, 0.0, 3.0, 4.0, 5.0, 1.0, 1.0, 1.0, 4.0, 0.0])], [6, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 2.0, 9.0])], [7, DenseVector([1.0, 1.0, 1.0, 9.0, 2.0, 1.0, 2.0, 0.0, 0.0, 1.0, 3.0])], [8, DenseVector([4.0, 4.0, 0.0, 3.0, 4.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0])], [9, DenseVector([2.0, 8.0, 2.0, 0.0, 3.0, 0.0, 2.0, 0.0, 2.0, 7.0, 2.0])], [10, DenseVector([1.0, 1.0, 1.0, 9.0, 0.0, 2.0, 2.0, 0.0, 0.0, 3.0, 3.0])], [11, DenseVector([4.0, 1.0, 0.0, 0.0, 4.0, 5.0, 1.0, 3.0, 0.0, 1.0, 0.0])]]


In [7]:
ldaModel = LDA.train(corpus, k=3)

In [9]:
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

Learned topics (as distributions over vocab of 11 words):
Topic 0:
 7.870055947998022
 12.618028607911148
 4.016254009291091
 4.884771999346885
 7.833432285656277
 4.409711976563522
 11.985738386127196
 2.8184864520540898
 3.6756678279119632
 11.173212199037335
 15.621414674862786
Topic 1:
 9.166886620087244
 11.925940099481805
 5.882916223867666
 7.087553401206473
 7.575440144744908
 6.399742439276987
 12.292265165542991
 3.7895270098493707
 2.9718345101033457
 9.074338069725266
 9.809187104692665
Topic 2:
 8.963057431914736
 4.456031292607045
 2.1008297668412417
 28.027674599446645
 9.591127569598815
 11.190545584159493
 6.721996448329814
 3.39198653809654
 1.3524976619846911
 3.7524497312374003
 7.5693982204445485


In [12]:
labelsAndPreds1 = parsedData.map(lambda p: (p.label, sameModel.predict(p.features)))

In [13]:
trainErr1 = labelsAndPreds1.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())

In [14]:
print("Training Error = " + str(trainErr1))

Training Error = 0.36645962732919257


In [23]:
print(parsedData.collect())

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0]), LabeledPoint(0.0, [2.857738033247042,0.0,0.0,2.619965104088255,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0]), LabeledPoint(0.0, [2.857738033247042,0.0,2.061393766919624,0.0,0.0,2.004684436494304,0.0,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0]), LabeledPoint(1.0, [0.0,0.0,2.061393766919624,2.619965104088255,0.0,2.004684436494304,2.000347299268466,0.0,0.0,0.0,0.0,2.055002875864414,0.0,0.0,0.0,0.0]), LabeledPoint(1.0, [2.857738033247042,0.0,2.061393766919624,2.619965104088255,0.0,2.004684436494304,0.0,0.0,0.0,0.0,0.0,2.055002875864414,0.0,0.0,0.0,0.0]), LabeledPoint(0.0, [2.857738033247042,0.0,2.061393766919624,2.619965104088255,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0]), LabeledPoint(1.0, [0.0

In [15]:
print(labelsAndPreds1.collect())

[(1.0, 1), (0.0, 1), (0.0, 0), (1.0, 1), (1.0, 0), (0.0, 1), (1.0, 1), (1.0, 1), (0.0, 0), (0.0, 0), (1.0, 1), (1.0, 0), (1.0, 0), (1.0, 0), (0.0, 0), (0.0, 1), (0.0, 0), (0.0, 1), (0.0, 0), (1.0, 1), (0.0, 1), (0.0, 0), (0.0, 0), (1.0, 0), (1.0, 1), (1.0, 0), (0.0, 1), (1.0, 1), (0.0, 1), (1.0, 1), (1.0, 1), (0.0, 1), (1.0, 0), (0.0, 0), (1.0, 1), (0.0, 1), (1.0, 1), (1.0, 0), (1.0, 1), (0.0, 1), (1.0, 1), (0.0, 0), (0.0, 1), (1.0, 1), (1.0, 1), (0.0, 1), (1.0, 1), (0.0, 0), (1.0, 1), (1.0, 1), (0.0, 0), (0.0, 0), (0.0, 0), (1.0, 0), (1.0, 1), (0.0, 1), (0.0, 1), (0.0, 0), (1.0, 0), (1.0, 1), (0.0, 0), (0.0, 1), (1.0, 1), (1.0, 1), (1.0, 1), (0.0, 0), (0.0, 1), (0.0, 1), (1.0, 0), (1.0, 1), (0.0, 0), (0.0, 0), (0.0, 0), (0.0, 1), (1.0, 1), (1.0, 1), (1.0, 1), (1.0, 1), (1.0, 1), (0.0, 0), (1.0, 1), (0.0, 1), (1.0, 0), (1.0, 0), (1.0, 1), (0.0, 0), (1.0, 1), (1.0, 1), (0.0, 0), (1.0, 1), (1.0, 1), (0.0, 1), (0.0, 0), (1.0, 0), (0.0, 1), (1.0, 0), (0.0, 0), (1.0, 1), (1.0, 1), (1.0, 1),