In [1]:
import pyspark
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

import findspark
findspark.init()

In [2]:
##First create a spark session 
conf = pyspark.SparkConf().set('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:3.0.1').setMaster('local').setAppName('my app').setAll([("spark.driver.memory","40g"), ("spark.executor.memory","50g")])
sc= SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()


In [3]:
from pyspark.sql.types import *
from pyspark.ml import Pipeline

import sparknlp
from sparknlp import DocumentAssembler, Finisher
nlp_spark = sparknlp.start()

In [4]:
spark

In [5]:
df_tweet = spark.read.format("mongo").option("uri","mongodb://127.0.0.1/tweet_file.tweet_stream_").load()
df_tweet.show(5)
df_tweet.printSchema()

+--------------------+--------------------+------+--------------------+
|                 _id|                data|errors|            includes|
+--------------------+--------------------+------+--------------------+
|{62e97965167d0fcc...|{155909486, {null...|  null|{null, null, [{15...|
|{62e97965167d0fcc...|{1235404915515613...|  null|{null, [{12245393...|
|{62e97965167d0fcc...|{15127829, {null}...|  null|{null, [{89501706...|
|{62e97966167d0fcc...|{235714500, {null...|  null|{null, [{15128293...|
|{62e97966167d0fcc...|{2630269581, {nul...|  null|{null, null, [{26...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- author_id: string (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- place_id: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- referenced_tweets: array (

In [6]:
## query tweets and autor_id
b=df_tweet.select('data.author_id', 'data.text')
b.show(5)

+-------------------+--------------------+
|          author_id|                text|
+-------------------+--------------------+
|          155909486|https://t.co/HTjo...|
|1235404915515613184|RT @ImranKhanPTI:...|
|           15127829|RT @ShibleyTelham...|
|          235714500|@MiguelM34149403 ...|
|         2630269581|Alguém pra beber ...|
+-------------------+--------------------+
only showing top 5 rows



## Processing words

In [7]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from sparknlp.annotator import Stemmer, Lemmatizer, LemmatizerModel



In [8]:
tokenizer = Tokenizer(inputCol="text", outputCol="words_token")
tokenized = tokenizer.transform(b)


In [9]:
tokenized.show(5)

+-------------------+--------------------+--------------------+
|          author_id|                text|         words_token|
+-------------------+--------------------+--------------------+
|          155909486|https://t.co/HTjo...|[https://t.co/htj...|
|1235404915515613184|RT @ImranKhanPTI:...|[rt, @imrankhanpt...|
|           15127829|RT @ShibleyTelham...|[rt, @shibleytelh...|
|          235714500|@MiguelM34149403 ...|[@miguelm34149403...|
|         2630269581|Alguém pra beber ...|[alguém, pra, beb...|
+-------------------+--------------------+--------------------+
only showing top 5 rows



In [10]:
word_token_= tokenized.select("author_id",'words_token')
word_token_.show(5)

+-------------------+--------------------+
|          author_id|         words_token|
+-------------------+--------------------+
|          155909486|[https://t.co/htj...|
|1235404915515613184|[rt, @imrankhanpt...|
|           15127829|[rt, @shibleytelh...|
|          235714500|[@miguelm34149403...|
|         2630269581|[alguém, pra, beb...|
+-------------------+--------------------+
only showing top 5 rows



In [11]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words_token", outputCol="filtered")
removed_stop=remover.transform(word_token_)

In [12]:
removed_stop.show(5)

+-------------------+--------------------+--------------------+
|          author_id|         words_token|            filtered|
+-------------------+--------------------+--------------------+
|          155909486|[https://t.co/htj...|[https://t.co/htj...|
|1235404915515613184|[rt, @imrankhanpt...|[rt, @imrankhanpt...|
|           15127829|[rt, @shibleytelh...|[rt, @shibleytelh...|
|          235714500|[@miguelm34149403...|[@miguelm34149403...|
|         2630269581|[alguém, pra, beb...|[alguém, pra, beb...|
+-------------------+--------------------+--------------------+
only showing top 5 rows



In [13]:
from pyspark.ml.feature import CountVectorizer

# fit a CountVectorizerModel from the corpus.
c_v = CountVectorizer(inputCol="filtered", outputCol="features")
c_v_model = c_v.fit(removed_stop)
result = c_v_model.transform(removed_stop)

In [14]:
result.show(5)

+-------------------+--------------------+--------------------+--------------------+
|          author_id|         words_token|            filtered|            features|
+-------------------+--------------------+--------------------+--------------------+
|          155909486|[https://t.co/htj...|[https://t.co/htj...|(16287,[16148],[1...|
|1235404915515613184|[rt, @imrankhanpt...|[rt, @imrankhanpt...|(16287,[0,12,312,...|
|           15127829|[rt, @shibleytelh...|[rt, @shibleytelh...|(16287,[0,207,431...|
|          235714500|[@miguelm34149403...|[@miguelm34149403...|(16287,[313,369,5...|
|         2630269581|[alguém, pra, beb...|[alguém, pra, beb...|(16287,[35,80,133...|
+-------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [15]:
from pyspark.ml.feature import NGram
ngram = NGram(n=2, inputCol="filtered", outputCol="ngrams")

ngramDataFrame = ngram.transform(result)

In [17]:
ngramDataFrame.show(5)

+-------------------+--------------------+--------------------+--------------------+--------------------+
|          author_id|         words_token|            filtered|            features|              ngrams|
+-------------------+--------------------+--------------------+--------------------+--------------------+
|          155909486|[https://t.co/htj...|[https://t.co/htj...|(16287,[16148],[1...|                  []|
|1235404915515613184|[rt, @imrankhanpt...|[rt, @imrankhanpt...|(16287,[0,12,312,...|[rt @imrankhanpti...|
|           15127829|[rt, @shibleytelh...|[rt, @shibleytelh...|(16287,[0,207,431...|[rt @shibleytelha...|
|          235714500|[@miguelm34149403...|[@miguelm34149403...|(16287,[313,369,5...|[@miguelm34149403...|
|         2630269581|[alguém, pra, beb...|[alguém, pra, beb...|(16287,[35,80,133...|[alguém pra, pra ...|
+-------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



## TFIDF

In [18]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [118]:
hashingTF = HashingTF(inputCol="filtered", outputCol="hashed")
featurizedData = hashingTF.transform(removed_stop)

idf = IDF(inputCol="hashed", outputCol="TFIDF")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


In [119]:
rescaledData.show(5)

+-------------------+--------------------+--------------------+--------------------+--------------------+
|          author_id|         words_token|            filtered|              hashed|               TFIDF|
+-------------------+--------------------+--------------------+--------------------+--------------------+
|          155909486|[https://t.co/htj...|[https://t.co/htj...|(262144,[81354],[...|(262144,[81354],[...|
|1235404915515613184|[rt, @imrankhanpt...|[rt, @imrankhanpt...|(262144,[32869,49...|(262144,[32869,49...|
|           15127829|[rt, @shibleytelh...|[rt, @shibleytelh...|(262144,[8798,101...|(262144,[8798,101...|
|          235714500|[@miguelm34149403...|[@miguelm34149403...|(262144,[6805,152...|(262144,[6805,152...|
|         2630269581|[alguém, pra, beb...|[alguém, pra, beb...|(262144,[4412,352...|(262144,[4412,352...|
+-------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [121]:
word_token_1= rescaledData.select("author_id",'filtered','TFIDF')
word_token_1.show(5)

+-------------------+--------------------+--------------------+
|          author_id|            filtered|               TFIDF|
+-------------------+--------------------+--------------------+
|          155909486|[https://t.co/htj...|(262144,[81354],[...|
|1235404915515613184|[rt, @imrankhanpt...|(262144,[32869,49...|
|           15127829|[rt, @shibleytelh...|(262144,[8798,101...|
|          235714500|[@miguelm34149403...|(262144,[6805,152...|
|         2630269581|[alguém, pra, beb...|(262144,[4412,352...|
+-------------------+--------------------+--------------------+
only showing top 5 rows



In [122]:
word_token_1 = word_token_1.pandas_api()

In [140]:
word_token_1.filtered[4]

['alguém', 'pra', 'beber', 'hoje,', 'eu', 'imploro']

In [123]:
word_token_1.TFIDF[4]

SparseVector(262144, {4412: 5.1573, 35213: 7.0291, 74789: 7.0291, 83106: 6.6236, 161758: 7.0291, 207680: 4.6777})

## Modeling  Tweets Topic

In [125]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA

lda = LDA(
    featuresCol='TFIDF',
    seed=123,
    maxIter=20,
    k=10,
    topicDistributionCol='featurizedData',
)

model = lda.fit(rescaledData)
# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(rescaledData)
transformed.show(truncate=False)

The topics described by their top-weighted terms:
+-----+-----------------------+---------------------------------------------------------------------+
|topic|termIndices            |termWeights                                                          |
+-----+-----------------------+---------------------------------------------------------------------+
|0    |[2903, 217717, 42434]  |[1.734842755698955E-4, 1.6230068023698996E-4, 1.3002519445698781E-4] |
|1    |[38640, 249180, 89833] |[4.185095095737049E-4, 3.42043411015669E-4, 3.128435789258739E-4]    |
|2    |[249180, 151393, 77407]|[0.0016264847548520694, 3.093773686671326E-4, 2.7793488126551057E-4] |
|3    |[43265, 164359, 140762]|[0.0013828416768217988, 0.001121624697245928, 0.0010102484487148969] |
|4    |[224590, 249180, 52426]|[4.70015415792075E-4, 4.2440763493126844E-4, 3.606693013514077E-4]   |
|5    |[249180, 77407, 186480]|[0.0010853978535658821, 3.473550918148462E-4, 2.9464970745558366E-4] |
|6    |[190256, 55905, 221416]|[

In [127]:
transformed.show(5)

+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          author_id|         words_token|            filtered|              hashed|               TFIDF|      featurizedData|
+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          155909486|[https://t.co/htj...|[https://t.co/htj...|(262144,[81354],[...|(262144,[81354],[...|[0.01029995376275...|
|1235404915515613184|[rt, @imrankhanpt...|[rt, @imrankhanpt...|(262144,[32869,49...|(262144,[32869,49...|[8.20601082592988...|
|           15127829|[rt, @shibleytelh...|[rt, @shibleytelh...|(262144,[8798,101...|(262144,[8798,101...|[8.54797569197740...|
|          235714500|[@miguelm34149403...|[@miguelm34149403...|(262144,[6805,152...|(262144,[6805,152...|[0.99279453808100...|
|         2630269581|[alguém, pra, beb...|[alguém, pra, beb...|(262144,[4412,352...|(262144,[4412,352...|[0.002

In [128]:
word_token_= transformed.select("filtered",'TFIDF','featurizedData')
word_token_.show(5)

+--------------------+--------------------+--------------------+
|            filtered|               TFIDF|      featurizedData|
+--------------------+--------------------+--------------------+
|[https://t.co/htj...|(262144,[81354],[...|[0.01029995376275...|
|[rt, @imrankhanpt...|(262144,[32869,49...|[8.20601082592988...|
|[rt, @shibleytelh...|(262144,[8798,101...|[8.54797569197740...|
|[@miguelm34149403...|(262144,[6805,152...|[0.99279453808100...|
|[alguém, pra, beb...|(262144,[4412,352...|[0.00211526444379...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [85]:
word_token_.printSchema()

root
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- TFIDF: vector (nullable = true)
 |-- featurizedData: vector (nullable = true)



In [70]:
import pyspark.pandas as ps



In [129]:
word_processed= word_token_.pandas_api()

In [130]:
word_processed.head()

Unnamed: 0,filtered,TFIDF,featurizedData
0,[https://t.co/htjojyngvv],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.010299953762756752, 0.011495250270449205, 0..."
1,"[rt, @imrankhanpti:, tragic, news, army, aviat...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.000820601082592988, 0.9922530146669469, 0.0..."
2,"[rt, @shibleytelhami:, regardless, whether, am...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.0008547975691977401, 0.0009540117955397931,..."
3,"[@miguelm34149403, @krasnyyskorpion, mucho, bl...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.9927945380810044, 0.0008414785023742882, 0...."
4,"[alguém, pra, beber, hoje,, eu, imploro]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.0021152644437947535, 0.9800300745071279, 0...."


In [133]:
word_processed.filtered[2]

['rt',
 '@shibleytelhami:',
 'regardless',
 'whether',
 'americans',
 'support',
 'boycotts',
 'israel,',
 'two',
 'thirds,',
 'including',
 '52%',
 'republicans',
 '82%',
 'of…']

In [134]:
word_processed.TFIDF[2]

SparseVector(262144, {8798: 6.3359, 10156: 7.0291, 33835: 7.0291, 53326: 7.0291, 77407: 0.6835, 118749: 7.0291, 129819: 7.0291, 138478: 7.0291, 177817: 5.9305, 184774: 6.6236, 190040: 6.6236, 195182: 7.0291, 249598: 6.6236, 251394: 6.6236, 258728: 5.525})

In [139]:
word_processed.featurizedData[2]

DenseVector([0.0009, 0.001, 0.0009, 0.001, 0.0009, 0.0009, 0.0009, 0.0009, 0.0008, 0.9918])

In [131]:
topics_model = topics.pandas_api()

In [132]:
topics_model

Unnamed: 0,topic,termIndices,termWeights
0,0,"[2903, 217717, 42434]","[0.0001734842755698955, 0.00016230068023698996..."
1,1,"[38640, 249180, 89833]","[0.0004185095095737049, 0.000342043411015669, ..."
2,2,"[249180, 151393, 77407]","[0.0016264847548520694, 0.0003093773686671326,..."
3,3,"[43265, 164359, 140762]","[0.0013828416768217988, 0.001121624697245928, ..."
4,4,"[224590, 249180, 52426]","[0.000470015415792075, 0.00042440763493126844,..."
5,5,"[249180, 77407, 186480]","[0.0010853978535658821, 0.0003473550918148462,..."
6,6,"[190256, 55905, 221416]","[0.00022758832365001638, 0.0002221785053223970..."
7,7,"[71432, 249180, 82364]","[0.00046260753900908424, 0.0003069116480373383..."
8,8,"[75577, 250004, 92847]","[0.0001822850295918075, 0.00013752098779432016..."
9,9,"[188835, 32895, 200868]","[0.00017116977047355276, 0.0001265240428499675..."


In [136]:
topics_model.termIndices[2]

[249180, 151393, 77407]

In [199]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors



idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+



In [200]:
test = rescaledData.pandas_api()

In [201]:
test

Unnamed: 0,label,sentence,words,rawFeatures,features
0,0.0,Hi I heard about Spark,"[hi, i, heard, about, spark]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.2876820724517..."
1,0.0,I wish Java could use case classes,"[i, wish, java, could, use, case, classes]","(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, ...","(0.6931471805599453, 0.0, 0.6931471805599453, ..."
2,1.0,Logistic regression models are neat,"[logistic, regression, models, are, neat]","(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.6931471805599453, 0.69314718..."


In [243]:
test.features[2]

SparseVector(20, {3: 0.6931, 4: 0.6931, 6: 0.2877, 11: 0.6931, 19: 0.6931})

In [235]:
hashingTF.indexOf('java')

7

In [241]:
for i in test.words[0]:
    print(i, hashingTF.indexOf(i))

hi 8
i 16
heard 13
about 16
spark 6


In [239]:
for i in test.words[1]:
    print(i, hashingTF.indexOf(i))

i 16
wish 15
java 7
could 0
use 13
case 2
classes 7


In [240]:
for i in test.words[2]:
    print(i, hashingTF.indexOf(i))

logistic 4
regression 19
models 11
are 3
neat 6


In [202]:
test.words[1]

['i', 'wish', 'java', 'could', 'use', 'case', 'classes']

In [211]:
test.features[0]

SparseVector(20, {6: 0.2877, 8: 0.6931, 13: 0.2877, 16: 0.5754})

In [212]:
test.features[1]

SparseVector(20, {0: 0.6931, 2: 0.6931, 7: 1.3863, 13: 0.2877, 15: 0.6931, 16: 0.2877})

In [213]:
test.features[2]

SparseVector(20, {3: 0.6931, 4: 0.6931, 6: 0.2877, 11: 0.6931, 19: 0.6931})

In [207]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA

lda = LDA(
    featuresCol='features',
    seed=123,
    maxIter=20,
    k=3,
    topicDistributionCol='featurizedData',
)

model = lda.fit(rescaledData)
# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(rescaledData)
transformed.show(truncate=False)

The topics described by their top-weighted terms:
+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+---------------------------------------------------------------+
|0    |[7, 2, 15] |[0.06727333317531045, 0.0615793310719479, 0.05931043314007861] |
|1    |[3, 19, 6] |[0.06763450195906132, 0.06077733185902623, 0.0546225125000529] |
|2    |[8, 6, 18] |[0.060804345997195745, 0.0580141250226337, 0.05444104987469735]|
+-----+-----------+---------------------------------------------------------------+

+-----+-----------------------------------+------------------------------------------+-----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------+
|label|sentence          

In [198]:
ll = model.logLikelihood(rescaledData)
lp = model.logPerplexity(rescaledData)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -198541.352159139
The upper bound on perplexity: 18094.331467000426


In [221]:
trans=topics.pandas_api()

In [226]:
trans.termWeights[2]

[0.060804345997195745, 0.0580141250226337, 0.05444104987469735]