<a href="https://colab.research.google.com/github/smalaboy/projet_ter/blob/main/twitter_topics_modelling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# !wget -O twitter_training2.zip https://drive.google.com/u/0/uc?id=1EPin6POZkj1S4xBhRv_EYHmlCF_fO9tk&export=download

In [2]:
!ls

drive	     spark-3.2.1-bin-hadoop2.7	    training.csv
sample_data  spark-3.2.1-bin-hadoop2.7.tgz


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
!ls

drive	     spark-3.2.1-bin-hadoop2.7	    spark-3.2.1-bin-hadoop2.7.tgz.1
sample_data  spark-3.2.1-bin-hadoop2.7.tgz  training.csv


In [6]:
!unzip /content/drive/MyDrive/datasets/twitter_training.zip

Archive:  /content/drive/MyDrive/datasets/twitter_training.zip
replace training.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: training.csv            


In [3]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType

In [8]:
schema = StructType()\
              .add("No", IntegerType(), True)\
              .add("id", StringType(), True)\
              .add("datetime", StringType(), True)\
              .add("query", StringType(), True)\
              .add("user", StringType(), True)\
              .add("text", StringType(), True)
dataset = spark.read.csv('training.csv', schema=schema)
dataset.printSchema()
dataset.head(5)

root
 |-- No: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



[Row(No=0, id='1467810369', datetime='Mon Apr 06 22:19:45 PDT 2009', query='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D"),
 Row(No=0, id='1467810672', datetime='Mon Apr 06 22:19:49 PDT 2009', query='NO_QUERY', user='scotthamilton', text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"),
 Row(No=0, id='1467810917', datetime='Mon Apr 06 22:19:53 PDT 2009', query='NO_QUERY', user='mattycus', text='@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds'),
 Row(No=0, id='1467811184', datetime='Mon Apr 06 22:19:57 PDT 2009', query='NO_QUERY', user='ElleCTF', text='my whole body feels itchy and like its on fire '),
 Row(No=0, id='1467811193', datetime='Mon Apr 06 22:19:57 PDT 2009', query='NO_QUERY', user='Karoli', text="@nationwideclass no, it's not behaving at all. i'm mad. why am 

**Preprocessing tweets**

In [4]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_timestamp
import pyspark.sql.types as T


In [10]:
# Removes twitter handles
def remove_users(tweet):
    tweet = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet

In [11]:
# Removes punctuation
punctuation = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~•@â'
def remove_punctuation(tweet):
    tweet = re.sub(f'[{punctuation}]+', ' ', tweet) 
    return tweet

In [12]:
# Removes numbers
def remove_number(tweet):
    tweet = re.sub('([0-9]+)', '', tweet) 
    return tweet

In [13]:
# Removes hastags
def remove_hashtag(tweet):
    tweet = re.sub('(#[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet

In [14]:
def remove_links(tweet):
    tweet = re.sub(r'http\S+', '', tweet) 
    tweet = re.sub(r'bit.ly/\S+', '', tweet) 
    tweet = tweet.strip('[link]') 
    return tweet

In [15]:
# User defined functions registration
remove_users=udf(remove_users)
remove_punctuation=udf(remove_punctuation)
remove_number=udf(remove_number)
remove_hashtag=udf(remove_hashtag)
remove_links=udf(remove_links)

In [16]:
processed_tweets_1 = dataset.withColumn('processed_text', remove_users(dataset.text))
processed_tweets_1 = processed_tweets_1.withColumn('processed_text', remove_punctuation(processed_tweets_1.processed_text))
processed_tweets_1 = processed_tweets_1.withColumn('processed_text', remove_number(processed_tweets_1.processed_text))
processed_tweets_1 = processed_tweets_1.withColumn('processed_text', remove_hashtag(processed_tweets_1.processed_text))
processed_tweets_1 = processed_tweets_1.withColumn('processed_text', remove_links(processed_tweets_1.processed_text))

In [17]:
processed_tweets_1.printSchema()
processed_tweets_1.head(5)

root
 |-- No: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- processed_text: string (nullable = true)



[Row(No=0, id='1467810369', datetime='Mon Apr 06 22:19:45 PDT 2009', query='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", processed_text=' http twitpic com yzl   Awww  that s a bummer   You shoulda got David Carr of Third Day to do it   D'),
 Row(No=0, id='1467810672', datetime='Mon Apr 06 22:19:49 PDT 2009', query='NO_QUERY', user='scotthamilton', text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!", processed_text='s upset that he can t update his Facebook by texting it  and might cry as a result  School today also  Blah '),
 Row(No=0, id='1467810917', datetime='Mon Apr 06 22:19:53 PDT 2009', query='NO_QUERY', user='mattycus', text='@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds', processed_text=' I dived many times for the ball  Managed to save    The rest go out o

In [5]:
from pyspark.ml.feature import RegexTokenizer

In [19]:
# Tokenize and filter out words with len < 3
tokenizer = RegexTokenizer().setPattern("[\\W_]+").setMinTokenLength(3).setInputCol("processed_text").setOutputCol("tokens")

In [20]:
tokenized_tweets = tokenizer.transform(processed_tweets_1)

In [21]:
tokenized_tweets.printSchema()
tokenized_tweets.head(5)

root
 |-- No: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- processed_text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



[Row(No=0, id='1467810369', datetime='Mon Apr 06 22:19:45 PDT 2009', query='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", processed_text=' http twitpic com yzl   Awww  that s a bummer   You shoulda got David Carr of Third Day to do it   D', tokens=['http', 'twitpic', 'com', 'yzl', 'awww', 'that', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'third', 'day']),
 Row(No=0, id='1467810672', datetime='Mon Apr 06 22:19:49 PDT 2009', query='NO_QUERY', user='scotthamilton', text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!", processed_text='s upset that he can t update his Facebook by texting it  and might cry as a result  School today also  Blah ', tokens=['upset', 'that', 'can', 'update', 'his', 'facebook', 'texting', 'and', 'might', 'cry', 'result', 'school', 'today', 'also', 'blah']),
 Row(No=0, id='146781

In [6]:
import nltk
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


In [23]:
# Lemmatization
lemmatizer = WordNetLemmatizer()
def lemmatize(row):
    row = [lemmatizer.lemmatize(word,'v') for word in row]
    return row

lemmatization_udf = udf(lemmatize, T.ArrayType(T.StringType()))

In [24]:
tokenized_tweets=tokenized_tweets.withColumn('tokens_lemma', lemmatization_udf(tokenized_tweets['tokens']))

In [25]:
tokenized_tweets.printSchema()
tokenized_tweets.head(5)

root
 |-- No: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- processed_text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tokens_lemma: array (nullable = true)
 |    |-- element: string (containsNull = true)



[Row(No=0, id='1467810369', datetime='Mon Apr 06 22:19:45 PDT 2009', query='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", processed_text=' http twitpic com yzl   Awww  that s a bummer   You shoulda got David Carr of Third Day to do it   D', tokens=['http', 'twitpic', 'com', 'yzl', 'awww', 'that', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'third', 'day'], tokens_lemma=['http', 'twitpic', 'com', 'yzl', 'awww', 'that', 'bummer', 'you', 'shoulda', 'get', 'david', 'carr', 'third', 'day']),
 Row(No=0, id='1467810672', datetime='Mon Apr 06 22:19:49 PDT 2009', query='NO_QUERY', user='scotthamilton', text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!", processed_text='s upset that he can t update his Facebook by texting it  and might cry as a result  School today also  Blah ', tokens=['upset', 'that', 'can',

**Topic modelling with grid search for the number of topics**

In [7]:
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

In [27]:
countVectorizer = CountVectorizer()
countVectorizer.setInputCol("tokens_lemma")
countVectorizer.setOutputCol("features")
vectorizerModel = countVectorizer.fit(tokenized_tweets)
wordsVector = vectorizerModel.transform(tokenized_tweets)

In [28]:
wordsVector.show(5)

+---+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| No|        id|            datetime|   query|           user|                text|      processed_text|              tokens|        tokens_lemma|            features|
+---+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...| http twitpic com...|[http, twitpic, c...|[http, twitpic, c...|(262144,[2,6,7,17...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|s upset that he c...|[upset, that, can...|[upset, that, can...|(262144,[1,7,13,2...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...| I dived many tim...|[dived, many, tim...|[dive, many, time...|(262144,[0,5,2

In [29]:
# num_topics=range(3,8)
# models=[]
# log_likeli=[]
# log_perp=[]
# for num in num_topics:
#   lda = LDA(k=num, maxIter=50)
#   ldaModel = lda.fit(wordsVector)
#   models.append(ldaModel)
#   ll = ldaModel.logLikelihood(wordsVector)
#   lp = ldaModel.logPerplexity(wordsVector)
#   log_likeli.append(ll)
#   log_perp.append(lp)

In [30]:
# import pandas as pd
# import matplotlib.pyplot as plt
# plot_data=pd.DataFrame(list(zip(num_topics,log_likeli,log_perp)),
#             columns=['topics_num','logLikelihood','logPerplexity'])    

# plot_data.plot(x='topics_num',y='logLikelihood',kind = 'line')
# plt.show()

# plot_data.plot(x='topics_num',y='logPerplexity',kind = 'line')
# plt.show()


In [31]:
lda = LDA(k=6, maxIter=100)
lda_model = lda.fit(wordsVector)

In [32]:
# extracting vocabulary from CountVectorizer
vocabulary = vectorizerModel.vocabulary

# create topics based on LDA
lda_topics = lda_model.describeTopics()
lda_topics.show()


+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[24, 42, 118, 96,...|[0.14688533470031...|
|    1|[2491, 0, 5108, 5...|[0.00429466665852...|
|    2|[0, 1, 4, 3, 2, 5...|[0.03710505284451...|
|    3|[3146, 3991, 0, 4...|[0.00311697690132...|
|    4|[323, 799, 1055, ...|[0.05283965861080...|
|    5|[2, 33, 5, 68, 12...|[0.08312545185625...|
+-----+--------------------+--------------------+





In [33]:
topics_words = lda_topics.rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocabulary[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)
    
transformed = lda_model.transform(wordsVector)
transformed.show(10)

topic: 0
*************************
http
com
twitpic
bite
tinyurl
plurk
blip
the
and
photo
*************************
topic: 1
*************************
othing
the
homee
for
boredd
ting
with
chillen
wakey
som
*************************
topic: 2
*************************
the
and
have
be
you
for
get
that
but
just
*************************
topic: 3
*************************
dia
bom
the
ana
bind
neither
hai
hugh
gnight
boa
*************************
topic: 4
*************************
welcome
yup
congratulations
pleasure
goodmorning
thankyou
que
your
you
spongebob
*************************
topic: 5
*************************
you
thank
for
twitter
follow
miss
be
know
what
hey
*************************


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.2.1-bin-hadoop2.7/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.2.1-bin-hadoop2.7/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [34]:
transformed = lda_model.transform(wordsVector)
transformed.show(10)

+---+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| No|        id|            datetime|   query|           user|                text|      processed_text|              tokens|        tokens_lemma|            features|   topicDistribution|
+---+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...| http twitpic com...|[http, twitpic, c...|[http, twitpic, c...|(262144,[2,6,7,17...|[0.24075470291006...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|s upset that he c...|[upset, that, can...|[upset, that, can...|(262144,[1,7,13,2...|[0.00528633055907...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       ma

In [35]:
lda_model.save("lda_model")

In [39]:
!tar cvf lda_model.tar lda_model

lda_model/
lda_model/data/
lda_model/data/part-00000-0316421a-2920-4def-a82e-d6e8aa3edabd-c000.snappy.parquet
lda_model/data/._SUCCESS.crc
lda_model/data/.part-00000-0316421a-2920-4def-a82e-d6e8aa3edabd-c000.snappy.parquet.crc
lda_model/data/_SUCCESS
lda_model/metadata/
lda_model/metadata/.part-00000.crc
lda_model/metadata/part-00000
lda_model/metadata/._SUCCESS.crc
lda_model/metadata/_SUCCESS


In [40]:
wordsVector.write.json("processed_data")

In [41]:
!tar cvf processed_data.tar processed_data

processed_data/
processed_data/part-00000-b0ac3ea1-1b94-42e4-a463-cd009ba86bdc-c000.json
processed_data/part-00001-b0ac3ea1-1b94-42e4-a463-cd009ba86bdc-c000.json
processed_data/.part-00000-b0ac3ea1-1b94-42e4-a463-cd009ba86bdc-c000.json.crc
processed_data/.part-00001-b0ac3ea1-1b94-42e4-a463-cd009ba86bdc-c000.json.crc
processed_data/._SUCCESS.crc
processed_data/_SUCCESS


In [47]:
!ls

drive		processed_data.tar	       spark-3.2.1-bin-hadoop2.7.tgz.1
lda_model	sample_data		       training.csv
lda_model.tar	spark-3.2.1-bin-hadoop2.7
processed_data	spark-3.2.1-bin-hadoop2.7.tgz


In [10]:
from pyspark.ml.clustering import LocalLDAModel, LDAModel, LDA

In [8]:
!tar xvf lda_model.tar

lda_model/
lda_model/data/
lda_model/data/part-00000-0316421a-2920-4def-a82e-d6e8aa3edabd-c000.snappy.parquet
lda_model/data/._SUCCESS.crc
lda_model/data/.part-00000-0316421a-2920-4def-a82e-d6e8aa3edabd-c000.snappy.parquet.crc
lda_model/data/_SUCCESS
lda_model/metadata/
lda_model/metadata/.part-00000.crc
lda_model/metadata/part-00000
lda_model/metadata/._SUCCESS.crc
lda_model/metadata/_SUCCESS


In [11]:
loaded_model = LocalLDAModel.load("lda_model")

In [50]:
lda_model.transform(wordsVector[0])

TypeError: ignored