In [1]:
import re
from pyspark import SparkConf, SparkContext, SQLContext 
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, udf, lit, size, col
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.clustering import LDA
import pandas as pd

In [2]:
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
sc = SparkContext.getOrCreate()
rdd = sc.wholeTextFiles("gs://mmj2169hw2/hadoop/tmp/bigquery/pyspark_output/lda_file/lda.txt/*")

tweets = rdd.map(lambda x: x[1].split('\n'))
tweet_array = tweets.collect()

In [3]:
df = spark.createDataFrame(map(lambda a: (a[0], ), tweet_array), ["tweets"])
df.printSchema()
df.show(truncate=False)

root
 |-- tweets: string (nullable = true)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tweets                                                                 

In [4]:
# preprocess the dataframe to have an array of words which can be sued for lda in the next stop
# preprocessing by removing links, numbers, punctuations, hashtags etc

def remove_users(tweet):
    tweet = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet

def remove_links(tweet):
    tweet = re.sub(r'http\S+', '', tweet) 
    tweet = re.sub(r'bit.ly/\S+', '', tweet) 
    tweet = tweet.strip('[link]') 
    return tweet

def remove_punctuation(tweet):
    tweet = re.sub('['+punctuation + ']+', ' ', tweet) 
    return tweet

def remove_number(tweet):
    tweet = re.sub('([0-9]+)', '', tweet) 
    return tweet

def remove_hashtag(tweet):
    tweet = re.sub('(#[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet

def remove_non_english(tweet):
    tweet = re.sub(r"[^\x00-\x7F]+", '', tweet)
    return tweet

def remove_empty_words(tweet):
    tweet = [word for word in tweet if len(word)>0]
    return tweet

In [5]:
remove_links=udf(remove_links)
remove_users=udf(remove_users)
remove_punctuation=udf(remove_punctuation)
remove_number=udf(remove_number)
remove_hashtag=udf(remove_hashtag)
remove_non_english=udf(remove_non_english)
remove_empty_words=udf(remove_empty_words)
df = df.withColumn('preprocessed_tweets', remove_links(df['tweets']))
df = df.withColumn('preprocessed_tweets', remove_users(df['preprocessed_tweets']))
df = df.withColumn('preprocessed_tweets', remove_punctuation(df['preprocessed_tweets']))
df = df.withColumn('preprocessed_tweets', remove_number(df['preprocessed_tweets']))
df = df.withColumn('preprocessed_tweets', remove_non_english(df['preprocessed_tweets']))
df = df.withColumn('preprocessed_tweets', split(df['preprocessed_tweets'], ' '))
df = df.withColumn('preprocessed_tweets', remove_empty_words(df['preprocessed_tweets']))

In [8]:
tokenizer = RegexTokenizer().setPattern("[\\W_]+").setMinTokenLength(3).setInputCol("preprocessed_tweets").setOutputCol("words")

In [9]:
transformed_dataframe = tokenizer.transform(df)

In [10]:
transformed_dataframe = transformed_dataframe.drop("tweets", "preprocessed_tweets")
transformed_dataframe.printSchema()
pd.DataFrame(transformed_dataframe.take(10), columns=transformed_dataframe.columns)

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)



Unnamed: 0,words
0,[]
1,"[latam, needs, gether, the, movie]"
2,"[themovielatam, winmetawin, brincando, jupy, m..."
3,"[wasn, the, best, actress, all, time, but, god..."
4,"[themovielatam, please, want, have, the, oppor..."
5,"[latam, needs, gether, the, moviert, before, g..."
6,"[shock, city, battle, squidgames]"
7,"[queria, tanto, tombo, gui, arajo, amanh, prov..."
8,"[disrupt, ala, conta, aiai, vejo, hora, tomar,..."
9,"[bbrightvc, breaking, marvel, studios, reporte..."


#### LDA - Classification on Stream Data

In [11]:
cv = CountVectorizer()
cv.setInputCol("words")
cv.setOutputCol("count_vectors")
cv_model = cv.fit(transformed_dataframe)
cv_model.setInputCol("words")
cv_df = cv_model.transform(transformed_dataframe)

In [12]:
cv_df.printSchema()
pd.DataFrame(cv_df.take(10), columns=cv_df.columns)

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- count_vectors: vector (nullable = true)



Unnamed: 0,words,count_vectors
0,[],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,"[latam, needs, gether, the, movie]","(1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,"[themovielatam, winmetawin, brincando, jupy, m...","(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ..."
3,"[wasn, the, best, actress, all, time, but, god...","(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ..."
4,"[themovielatam, please, want, have, the, oppor...","(3.0, 1.0, 2.0, 1.0, 2.0, 2.0, 2.0, 1.0, 1.0, ..."
5,"[latam, needs, gether, the, moviert, before, g...","(3.0, 2.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, ..."
6,"[shock, city, battle, squidgames]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,"[queria, tanto, tombo, gui, arajo, amanh, prov...","(0.0, 1.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,"[disrupt, ala, conta, aiai, vejo, hora, tomar,...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
9,"[bbrightvc, breaking, marvel, studios, reporte...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, ..."


#### Train LDA

In [13]:
lda = LDA(featuresCol="count_vectors", k=10)
lda_model = lda.fit(cv_df)

In [14]:
topics = lda_model.describeTopics(10)
topics.printSchema()
pd.DataFrame(topics.take(10), columns=topics.columns) 

root
 |-- topic: integer (nullable = false)
 |-- termIndices: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- termWeights: array (nullable = true)
 |    |-- element: double (containsNull = false)



Unnamed: 0,topic,termIndices,termWeights
0,0,"[11, 2, 170, 1, 177, 113, 57, 216, 166, 138]","[0.007491742205836237, 0.006514935777410096, 0..."
1,1,"[179, 47, 163, 77, 238, 161, 200, 141, 245, 97]","[0.005113887875789274, 0.0050726772149615, 0.0..."
2,2,"[167, 119, 230, 48, 1, 126, 172, 161, 219, 39]","[0.005199508682557248, 0.005172873605263552, 0..."
3,3,"[17, 5, 4, 1, 0, 9, 10, 3, 13, 140]","[0.011594834103265384, 0.011533603747157443, 0..."
4,4,"[92, 240, 135, 126, 239, 85, 128, 56, 45, 205]","[0.00883759827781119, 0.008472574434990018, 0...."
5,5,"[8, 139, 237, 185, 180, 113, 163, 199, 219, 97]","[0.005176307993981117, 0.005082828233701475, 0..."
6,6,"[1, 0, 9, 10, 60, 36, 240, 222, 206, 59]","[0.006073734114041892, 0.006062915008160177, 0..."
7,7,"[175, 7, 183, 192, 23, 40, 152, 16, 49, 43]","[0.007505569913712284, 0.007278665927345907, 0..."
8,8,"[214, 98, 222, 85, 89, 13, 62, 56, 5, 74]","[0.004990908947394858, 0.004894285541201307, 0..."
9,9,"[0, 146, 91, 182, 7, 12, 159, 229, 233, 42]","[0.006518755937754465, 0.006247457472250252, 0..."


#### Topic Rendering

In [15]:
transformed = lda_model.transform(cv_df).select("topicDistribution")  
transformed.show(truncate=False)  

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topicDistribution                                                                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]                                                                                                                                                                              |
|[0.016153501631012014,0.015096157605701883,0.015096242810481601,0.8605464226632783,0.01609209799452398,0.015096228037751205,0.0

In [16]:
ll = lda_model.logLikelihood(cv_df)  
lp = lda_model.logPerplexity(cv_df)
print("ll: ", ll)
print("lp: ", lp)

ll:  -4160.19803443369
lp:  13.551133662650455


In [17]:
print("Learned topics (as distributions over vocab of 250 words):")
topics_matrix = lda_model.topicsMatrix()
print(topics_matrix)

Learned topics (as distributions over vocab of 250 words):
DenseMatrix([[0.67021367, 0.81626506, 0.77396337, ..., 1.36951606, 0.84415586,
              1.2977404 ],
             [1.17242519, 0.62382319, 0.93440961, ..., 1.30128548, 0.73366457,
              0.64164861],
             [1.31198274, 0.8150819 , 0.83428161, ..., 0.80464391, 0.64943342,
              0.83267468],
             ...,
             [0.85542488, 0.76398576, 0.83763683, ..., 0.79912621, 0.74905216,
              0.8248356 ],
             [1.08716536, 0.75273105, 0.79605102, ..., 0.68827195, 0.77213337,
              0.72626644],
             [0.80212443, 0.65958154, 0.79308804, ..., 0.6447466 , 0.71329115,
              0.69264659]])
