In [1]:
from pyspark.sql import SparkSession

# Create the SparkSession used by every later cell, or reuse one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    # NOTE(review): this key/value looks like the placeholder from the Spark
    # docs example rather than a real setting — confirm and remove if unused.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [2]:
sc = spark.sparkContext
# Read the corpus as an RDD of lines (presumably one pre-cleaned tweet per
# line — confirm against how Clean_Tweets1.txt was produced).
data = sc.textFile("Clean_Tweets1.txt")
# Tokenize on any run of whitespace. `split(" ")` would emit empty-string
# tokens for repeated/leading/trailing spaces and `[""]` for blank lines,
# and those empty tokens would then be counted as vocabulary terms;
# `split()` produces none of them (a blank line becomes an empty token list).
documents = data.map(lambda line: line.split())

In [3]:
# Peek at the first two tokenized documents to sanity-check the split.
documents.take(2)

[['toyota',
  'second',
  'attempt',
  'ship',
  'prius',
  'car',
  'photovoltaic',
  'roof',
  'current',
  'tech',
  'still',
  'far'],
 ['hydrogen', 'solution', 'energy', 'storage']]

In [4]:
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
# Each RDD element is a list of tokens, so the row type is array<string>;
# the resulting DataFrame has a single column, which Spark names `value`.
df2 = spark.createDataFrame(documents, schema=ArrayType(StringType()))

In [5]:
# Preview the token-array DataFrame (first 20 rows, long arrays truncated).
df2.show()

+--------------------+
|               value|
+--------------------+
|[toyota, second, ...|
|[hydrogen, soluti...|
|[new, solar, cell...|
|[bosch, vision, a...|
|[sun, hot, topic,...|
|[see, divine, fem...|
|          [well, do]|
|[paint, road, gre...|
|[bargain, solar, ...|
|[crazy, solar, ec...|
|[copy, consumer, ...|
|[publish, medium,...|
|[wind, solar, ene...|
|[magnetic, fly, s...|
|[high, cost, capi...|
|[household, pay, ...|
|[solar, power, tree]|
|[benefit, locally...|
|[tan, fan, ura, l...|
|[solar, energy, e...|
+--------------------+
only showing top 20 rows



In [6]:
from pyspark.ml.feature import CountVectorizer

In [7]:
# Bag-of-words featurizer: turns each token array into a sparse term-count vector.
cv = CountVectorizer(
    inputCol="value",
    outputCol="rawFeatures",
    vocabSize=1000,  # keep at most 1000 terms; this is also the vector length
    minDF=2.0,       # a term must occur in at least 2 documents to be kept
)

In [8]:
# Learn the vocabulary from the tokenized documents.
model = cv.fit(df2)

In [9]:
# Append the term-count vectors as the `rawFeatures` column.
result = model.transform(df2)
# Show rows without truncation so tokens and vector indices can be compared.
result.show(truncate=False)

+--------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+
|value                                                                                             |rawFeatures                                                                 |
+--------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+
|[toyota, second, attempt, ship, prius, car, photovoltaic, roof, current, tech, still, far]        |(1000,[101,196,306,319,550,714,886],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])          |
|[hydrogen, solution, energy, storage]                                                             |(1000,[1,39,97],[1.0,1.0,1.0])                                              |
|[new, solar, cell, capture, nearly, energy, solar, spectrum]                                      |(1000,[0,1

In [10]:
# Vocabulary learned by the fitted CountVectorizer; indexed by term ID.
vocab = model.vocabulary
# Broadcast it so the term-ID -> word lookup can read it via `.value`.
vocab_broadcast = sc.broadcast(vocab)

In [51]:
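# Unused alternative to CountVectorizer, kept for reference only (hashed
# features carry no vocabulary to map term IDs back to words):
#from pyspark.ml.feature import HashingTF, IDF, Tokenizer
#hashingTF = HashingTF(inputCol="value", outputCol="rawFeatures", numFeatures=20)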

In [62]:
# Unused: would apply the commented-out HashingTF featurizer to df2.
#featurizedData = hashingTF.transform(df2)

In [12]:
from pyspark.ml.feature import IDF

In [13]:
# Inverse-document-frequency rescaler: reads the raw term counts and writes
# the reweighted vectors to `features`.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

In [14]:
# Fit the IDF weights on the term-count vectors.
idfModel = idf.fit(result)

In [15]:
# Add the IDF-weighted vectors as the `features` column.
rescaledData = idfModel.transform(result)

In [16]:
# Preview the IDF-weighted feature vectors.
rescaledData.select("features").show()

+--------------------+
|            features|
+--------------------+
|(1000,[101,196,30...|
|(1000,[1,39,97],[...|
|(1000,[0,1,6,119,...|
|(1000,[176,207,34...|
|(1000,[0,1,37,91,...|
|(1000,[1,60,90,70...|
|(1000,[241,429],[...|
|(1000,[0,1,13,37,...|
|(1000,[0,1,2,9,28...|
|(1000,[0,1,4,350,...|
|(1000,[43,176,242...|
|(1000,[105,258,87...|
|(1000,[0,1,3],[0....|
|(1000,[0,1,10,432...|
|(1000,[0,5,14,25,...|
|(1000,[0,1,98,135...|
|(1000,[0,2],[0.37...|
|(1000,[1,7,9,142]...|
|(1000,[28,905],[3...|
|(1000,[0,1,79,84,...|
+--------------------+
only showing top 20 rows



In [17]:
from pyspark.ml.clustering import LDA

In [18]:
# LDA topic-model estimator.
# NOTE(review): Spark documents `featuresCol` for LDA as word-count vectors,
# but `features` here holds IDF-weighted values — confirm this is intentional
# (the unweighted counts are in `rawFeatures`).
lda = LDA(
    featuresCol="features",
    k=10,            # number of topics
    optimizer="em",
    seed=123,        # fixed seed so repeated runs give the same topics
)

In [19]:
# Train the topic model on the `features` column of the rescaled data.
ldamodel= lda.fit(rescaledData)

In [20]:
# One row per topic: its top term IDs (`termIndices`) and their weights.
ldatopics = ldamodel.describeTopics()
# A limit of 25 is enough to display every row of the 10-topic model.
ldatopics.show(25)

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[78, 77, 111, 13,...|[0.01878740098879...|
|    1|[27, 56, 144, 124...|[0.02234461353753...|
|    2|[76, 4, 136, 34, ...|[0.01740265220412...|
|    3|[145, 1, 159, 31,...|[0.01643813697646...|
|    4|[2, 18, 101, 42, ...|[0.02556384629924...|
|    5|[17, 28, 41, 2, 3...|[0.03401049878239...|
|    6|[30, 66, 68, 67, ...|[0.02371864805385...|
|    7|[57, 85, 40, 79, ...|[0.02531449653623...|
|    8|[48, 6, 63, 105, ...|[0.03642119201522...|
|    9|[12, 46, 110, 108...|[0.02669104210769...|
+-----+--------------------+--------------------+



In [21]:
def map_termID_to_Word(termIndices):
    """Translate vocabulary term IDs into their words.

    Each ID is looked up in the broadcast vocabulary
    (`vocab_broadcast.value`); the words are returned as a list in the same
    order as `termIndices`.
    """
    return [vocab_broadcast.value[term_id] for term_id in termIndices]

In [22]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import udf

# Wrap the term-ID -> word lookup as a Spark SQL UDF returning array<string>.
# Built with the documented `udf()` factory instead of instantiating the
# UserDefinedFunction class directly, whose constructor is not the
# documented way to create a UDF.
udf_map_termID_to_Word = udf(map_termID_to_Word, ArrayType(StringType()))

In [23]:
# Add a `topic_desc` column holding the words behind each topic's term IDs.
ldatopics_mapped = ldatopics.withColumn(
    "topic_desc",
    udf_map_termID_to_Word(ldatopics["termIndices"]),
)

In [24]:
# Score every document: adds a `topicDistribution` column to the features frame.
ldaResults = ldamodel.transform(rescaledData)
ldaResults.show()

+--------------------+--------------------+--------------------+--------------------+
|               value|         rawFeatures|            features|   topicDistribution|
+--------------------+--------------------+--------------------+--------------------+
|[toyota, second, ...|(1000,[101,196,30...|(1000,[101,196,30...|[0.06104347306875...|
|[hydrogen, soluti...|(1000,[1,39,97],[...|(1000,[1,39,97],[...|[0.08994214994102...|
|[new, solar, cell...|(1000,[0,1,6,119,...|(1000,[0,1,6,119,...|[0.07220296451517...|
|[bosch, vision, a...|(1000,[176,207,34...|(1000,[176,207,34...|[0.10500829409018...|
|[sun, hot, topic,...|(1000,[0,1,37,91,...|(1000,[0,1,37,91,...|[0.07961827877802...|
|[see, divine, fem...|(1000,[1,60,90,70...|(1000,[1,60,90,70...|[0.09034843081697...|
|          [well, do]|(1000,[241,429],[...|(1000,[241,429],[...|[0.08576108411485...|
|[paint, road, gre...|(1000,[0,1,13,37,...|(1000,[0,1,13,37,...|[0.09852136300376...|
|[bargain, solar, ...|(1000,[0,1,2,9,28...|(1000,[0,1,

In [25]:
# Display each topic with its top words, without truncating the word lists.
(
    ldatopics_mapped
    .select("topic", "topic_desc")
    .show(n=10, truncate=False)
)

+-----+----------------------------------------------------------------------------+
|topic|topic_desc                                                                  |
+-----+----------------------------------------------------------------------------+
|0    |[find, next, technology, use, home, bill, panel, eclipse, nuclear, energy]  |
|1    |[thanks, daily, school, business, wall, big, australia, energy, make, clean]|
|2    |[time, eclipse, europe, great, year, effort, instal, two, solar, energy]    |
|3    |[move, energy, planet, news, use, right, solar, rise, new, plan]            |
|4    |[power, plant, roof, battery, become, set, gas, wind, electricity, price]   |
|5    |[save, light, money, power, wind, lead, program, americans, amount, live]   |
|6    |[coal, farm, world, read, like, largest, produce, eclipse, moon, u]         |
|7    |[water, tesla, first, even, city, country, company, year, storage, support] |
|8    |[rt, new, fight, post, blog, interest, report, today, chan