In [1]:
# findspark.init() must run before `import pyspark`: it locates the local Spark
# installation and makes the pyspark package importable.
import findspark
findspark.init()
import pyspark

# getOrCreate() returns the already-running context when there is one, so this
# cell can be re-executed without failing with
# "Cannot run multiple SparkContexts at once". On a fresh kernel it starts a
# new context named "mass2lda", exactly as before.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("mass2lda")
)

In [2]:
# Display the SparkContext as the cell result.
sc

In [3]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load the sample LDA data file as an RDD of text lines (parsing happens in the
# next cell).
# NOTE(review): the path is relative, so it presumably resolves against the
# kernel's working directory - confirm before running from another location.
data = sc.textFile("spark-2.3.1-bin-hadoop2.7/data/mllib/sample_lda_data.txt")


In [4]:
# Turn every non-blank line of whitespace-separated counts into a dense
# term-count vector. split() with no argument tolerates tabs and repeated
# spaces, and blank lines are skipped rather than crashing on float('').
parsedData = (
    data
    .filter(lambda line: line.strip())
    .map(lambda line: Vectors.dense([float(x) for x in line.split()]))
)
# Index documents with unique IDs. zipWithIndex() yields (vector, index) pairs;
# each record is flipped into a two-element list [doc_id, vector], the same
# form the Spark MLlib LDA example passes to LDA.train.
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

In [5]:
# Pull the whole indexed corpus back to the notebook for inspection; this is
# acceptable here because the sample contains only twelve documents.
corpus.collect()

[[0, DenseVector([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0])],
 [1, DenseVector([1.0, 3.0, 0.0, 1.0, 3.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0])],
 [2, DenseVector([1.0, 4.0, 1.0, 0.0, 0.0, 4.0, 9.0, 0.0, 1.0, 2.0, 0.0])],
 [3, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 3.0, 9.0])],
 [4, DenseVector([3.0, 1.0, 1.0, 9.0, 3.0, 0.0, 2.0, 0.0, 0.0, 1.0, 3.0])],
 [5, DenseVector([4.0, 2.0, 0.0, 3.0, 4.0, 5.0, 1.0, 1.0, 1.0, 4.0, 0.0])],
 [6, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 2.0, 9.0])],
 [7, DenseVector([1.0, 1.0, 1.0, 9.0, 2.0, 1.0, 2.0, 0.0, 0.0, 1.0, 3.0])],
 [8, DenseVector([4.0, 4.0, 0.0, 3.0, 4.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0])],
 [9, DenseVector([2.0, 8.0, 2.0, 0.0, 3.0, 0.0, 2.0, 0.0, 2.0, 7.0, 2.0])],
 [10, DenseVector([1.0, 1.0, 1.0, 9.0, 0.0, 2.0, 2.0, 0.0, 0.0, 3.0, 3.0])],
 [11, DenseVector([4.0, 1.0, 0.0, 0.0, 4.0, 5.0, 1.0, 3.0, 0.0, 1.0, 0.0])]]

In [5]:
# Fit a 3-topic LDA model on the indexed corpus. The seed is fixed so that
# re-running the notebook gives comparable topics; the two topic matrices
# displayed in this notebook differ from each other, which is consistent with
# the model having been re-trained without a seed.
ldaModel = LDA.train(corpus, k=3, seed=42)

In [6]:
# Show the topic-term weights: one row per vocabulary term, one column per
# topic.
ldaModel.topicsMatrix()

array([[ 6.80911683,  5.92534803, 13.26553513],
       [ 7.24924337, 17.0773221 ,  4.67343454],
       [ 3.75034495,  2.56833433,  5.68132072],
       [ 6.38941297, 17.14583544, 16.46475159],
       [ 6.07433555,  4.73043233, 14.19523212],
       [ 3.64703307,  2.73534038, 15.61762655],
       [11.64292356, 15.13036523,  4.22671121],
       [ 1.368516  ,  1.05503843,  7.57644557],
       [ 5.22835134,  2.11640909,  0.65523956],
       [11.53902065,  9.73173587,  2.72924348],
       [22.62038117,  7.86545966,  2.51415917]])

In [11]:
# Output topics. Each column of the topics matrix holds one topic's weight for
# every vocabulary term (rows follow the positions of the word-count vectors).
# The values are unnormalised weights: a column does not sum to 1.
vocab_size = ldaModel.vocabSize()
print("Learned topics (as distributions over vocab of " + str(vocab_size)
      + " words):")
topics = ldaModel.topicsMatrix()
# Take the number of topics from the matrix itself instead of repeating the k
# passed to LDA.train, so this cell stays correct if k is changed.
for topic in range(topics.shape[1]):
    print("Topic " + str(topic) + ":")
    for word in range(vocab_size):
        print(" " + str(topics[word][topic]))


Learned topics (as distributions over vocab of 11 words):
Topic 0:
 11.220762185889827
 19.929586890740527
 7.38745659266717
 1.5048994357421388
 13.107379966278149
 11.712010008136973
 6.485715422606812
 6.862510383310972
 1.604766069074537
 7.976274243751031
 1.0333503563902324
Topic 1:
 9.494966954989192
 5.02468648800577
 2.2027495310373935
 12.896192308365976
 8.050589711910968
 9.038185946650472
 15.017624024524089
 2.5091200003758347
 2.8786421564258666
 6.884481439572461
 9.593661396705894
Topic 2:
 5.284270859120981
 4.045726621253703
 2.409793876295435
 25.598908255891885
 3.8420303218108836
 1.249804045212554
 9.496660552869095
 0.6283696163131931
 3.5165917744995965
 9.139244316676509
 22.372988246903873


In [13]:
# IPython help lookup for Vectors.sparse (interactive exploration only).
?Vectors.sparse

Signature: Vectors.sparse(size, *args)
Docstring:
Create a sparse vector, using either a dictionary, a list of
(index, value) pairs, or two separate arrays of indices and
values (sorted by index).

:param size: Size of the vector.
:param args: Non-zero entries, as a dictionary, list of tuples,
             or two sorted lists containing indices and values.

>>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
SparseVector(4, {1: 1.0, 3: 5.5})
File:      ~/miniconda3/envs/spark/python/pyspark/mllib/linalg/__init__.py
Type:      function


In [14]:
# Example sparse vector of size 4 with non-zero entries at indices 1 and 3,
# built from parallel (sorted) index and value lists.
s = Vectors.sparse(4, [1, 3], [1.0, 5.5])

In [16]:
# IPython help lookup for LDA.train (interactive exploration only).
?LDA.train

Signature: LDA.train(rdd, k=10, maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer='em')
Docstring:
Train a LDA model.

:param rdd:
  RDD of documents, which are tuples of document IDs and term
  (word) count vectors. The term count vectors are "bags of
  words" with a fixed-size vocabulary (where the vocabulary size
  is the length of the vector). Document IDs must be unique
  and >= 0.
:param k:
  Number of topics to infer, i.e., the number of soft cluster
  centers.
  (default: 10)
:param maxIterations:
  Maximum number of iterat

In [18]:
# Dense term-count vector holding the same eleven values as document 0 of the
# corpus displayed earlier.
v = Vectors.dense([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0])

In [19]:
# IPython help lookup for Vectors.sparse.
# NOTE(review): this repeats an identical lookup made in an earlier cell;
# consider removing it before sharing the notebook.
?Vectors.sparse

Signature: Vectors.sparse(size, *args)
Docstring:
Create a sparse vector, using either a dictionary, a list of
(index, value) pairs, or two separate arrays of indices and
values (sorted by index).

:param size: Size of the vector.
:param args: Non-zero entries, as a dictionary, list of tuples,
             or two sorted lists containing indices and values.

>>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
SparseVector(4, {1: 1.0, 3: 5.5})
File:      ~/miniconda3/envs/spark/python/pyspark/mllib/linalg/__init__.py
Type:      function
