In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

from pyspark.ml.clustering import LDA
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.types import StructField, StringType, StructType
from pyspark.ml.feature import StopWordsRemover

labels = ["title","body","FROM_UNIXTIME"]

fields = [StructField(field_name, StringType(), True) for field_name in labels]
schema = StructType(fields)

# Loads data.
data_df = spark.read.csv("/home/marcos/code/data/noticias_small.csv", schema=schema)
#.map(lambda row: row.split("\r\n"))
print(data_df)

tokenizer = Tokenizer(inputCol="body", outputCol="words")
wordsDataFrame = tokenizer.transform(data_df)

stopWords = StopWordsRemover.loadDefaultStopWords("portuguese")
remover = StopWordsRemover(inputCol="words", outputCol="words_filtered", stopWords = stopWords)
wordsFiltered = remover.transform(wordsDataFrame)

cv_tmp = CountVectorizer(inputCol="words_filtered", outputCol="tmp_vectors")
cv_tmp_model = cv_tmp.fit(wordsFiltered)
df_vect = cv_tmp_model.transform(wordsFiltered)

def parseVectors(line):
    return [int(line[2]), line[1]]

sparsevector = df_vect.select("FROM_UNIXTIME", "tmp_vectors")

lda = LDA(k=10, maxIter=5, featuresCol="tmp_vectors")
ldaModel = lda.fit(sparsevector)
topics = ldaModel.topicsMatrix()
# model = LDA.train(sparsevector, k=5, seed=1)

# Describe topics.
topics = ldaModel.describeTopics(3)

DataFrame[title: string, body: string, FROM_UNIXTIME: string]


In [11]:
topics.show(truncate=False)

+-----+---------------+---------------------------------------------------------------------+
|topic|termIndices    |termWeights                                                          |
+-----+---------------+---------------------------------------------------------------------+
|0    |[344, 512, 356]|[0.0015499128539677857, 0.0015376121897656258, 0.001534578266471104] |
|1    |[18, 30, 65]   |[0.0022911604122462727, 0.001960418125507248, 0.0018349997465317437] |
|2    |[786, 488, 233]|[0.0015630183260770532, 0.001526854272085165, 0.0014876709358065464] |
|3    |[694, 472, 278]|[0.0015284687573058132, 0.0014744810916764734, 0.0014590275131129385]|
|4    |[26, 41, 59]   |[0.0021430016431183575, 0.0020186395596801013, 0.0019869963503508]   |
|5    |[299, 52, 79]  |[0.001565406743621223, 0.0015238536866013536, 0.0015172878531468001] |
|6    |[40, 20, 13]   |[0.002642871084424427, 0.002399134704608303, 0.002290245868006309]   |
|7    |[1, 0, 2]      |[0.003685734064449033, 0.003193004308

In [23]:
for x, topic in enumerate(topics):
    print('topic nr: ' + str(x))
    words = topic["termIndices"]
    print(words.row[0])
    weights = topic["termWeights"]

topic nr: 0
Column<b"topic['termIndices']['row'][0]">
topic nr: 1
Column<b"termIndices['termIndices']['row'][0]">
topic nr: 2
Column<b"termWeights['termIndices']['row'][0]">
