In [2]:
import pandas as pd
import ijson
import json
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')

In [3]:
import glob


def parse_tweet_row(row):
    """Pull the fields this notebook needs out of one tweet record.

    Returns a tuple ``(id, text, city, coordinates, created_at)`` or ``None``
    when the record is not a mapping, lacks one of those fields, has no
    ``place`` mapping, has a non-string ``text``, or has an unhashable ``id``.
    All fields are read before anything is stored so a partially valid record
    can never leave the accumulator lists with different lengths.
    """
    try:
        fields = (
            row['id'],
            row['text'],
            row['place']['name'],
            row['coordinates'],
            row['created_at'],
        )
        # The id is used as a set member for de-duplication, so it must hash.
        hash(fields[0])
    except (KeyError, TypeError):
        return None
    if not isinstance(fields[1], str):
        return None
    return fields


# Parallel accumulators; kept as module-level names because other cells
# refer to them (e.g. the commented-out id dump uses `tweet_id`).
tweet_id = []
time = []
text = []
location = []
city = []
read_files = glob.glob("30day/*.json")

# Set mirror of `tweet_id` so the duplicate check is O(1) instead of a list scan.
seen_ids = set()

for file in read_files:
    # Binary mode: ijson decodes the bytes itself, so parsing does not depend
    # on the platform's default text encoding.
    with open(file, 'rb') as f:
        # Stream the elements of the top-level "results" array one at a time.
        for obj in ijson.items(f, 'results.item'):
            # Each element is expected to be a list of tweet records;
            # anything else yields no rows.
            if not isinstance(obj, list):
                continue
            for row in obj:
                parsed = parse_tweet_row(row)
                if parsed is None:
                    # Skip only this record; the rest of the batch is still read.
                    continue
                row_id, row_text, row_city, row_location, row_time = parsed
                # Drop retweets and tweets that were already collected.
                if row_text.startswith("RT") or row_id in seen_ids:
                    continue
                seen_ids.add(row_id)
                city.append(row_city)
                location.append(row_location)
                time.append(row_time)
                text.append(row_text)
                tweet_id.append(row_id)

data = pd.DataFrame(
    {"text": text, "time": time, "location": location},
    columns=["text", "time", "location"],
)
# Keep only the calendar date of each tweet's creation timestamp.
data['time'] = pd.to_datetime(data['time']).dt.date

In [4]:
# tweet_id = {'id':tweet_id}
# with open('30daytweet_id.json', 'w') as outfile:
#     json.dump(tweet_id, outfile)

In [7]:
from SentimentAnalysis import *

In [8]:
# Obtain the Spark session for this notebook; getOrCreate() reuses a running one.
spark = (
    SparkSession.builder
    .appName('NLP')
    .getOrCreate()
)

In [9]:
# Wrap the text helpers (presumably provided by `from SentimentAnalysis import *`
# -- confirm) as Spark UDFs, declaring the Spark type each one returns.
strip_non_ascii_udf = udf(strip_non_ascii, returnType=StringType())
fix_abbreviation_udf = udf(fix_abbreviation, returnType=StringType())
remove_features_udf = udf(remove_features, returnType=StringType())
sentiment_analysis_udf = udf(sentiment_analysis, returnType=FloatType())
# `condition` is applied to a sentiment score and produces a string label.
sentiment_udf = udf(condition, returnType=StringType())

In [13]:
# Lift the pandas frame into Spark, then add the output of `strip_non_ascii`
# on the raw tweet text as a new column.
df = spark.createDataFrame(data)
df = df.withColumn('text_non_asci', strip_non_ascii_udf(df.text))
df.show()

+--------------------+----------+--------------------+--------------------+
|                text|      time|            location|       text_non_asci|
+--------------------+----------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @Peter...|
|@SciNate @Breakfa...|2021-03-30|                null|@SciNate @Breakfa...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras I...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|
|Time for the peop...|2021-03-30|                null|Time for the peop...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|Just posted a pho...|
|In time for Easte...|2021-03-30|{coordinates -> [...|In time for Easte...|
|@Reuters Why is i...|2021-03-30|                null|@Reuters Why is i...|
|@fairfieldbooks ?...|2021-03-30|                null|@fairfieldbooks  ...|
|@Linsulin_junkie ...|2021-03-30|                null|@Linsulin_junkie ...|
|Shityoushou

In [14]:
# Run `fix_abbreviation` over the ASCII-only text.
df = df.withColumn('fixed_abbrev', fix_abbreviation_udf(df.text_non_asci))

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|      time|            location|       text_non_asci|        fixed_abbrev|
+--------------------+----------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @Peter...|@austrippa @peter...|
|@SciNate @Breakfa...|2021-03-30|                null|@SciNate @Breakfa...|@scinate @breakfa...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras I...|@angelabetheras i...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|@alwayscold11 @50...|
|Time for the peop...|2021-03-30|                null|Time for the peop...|time for the peop...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|Just posted a pho...|just posted a pho...|
|In time for Easte...|2021-03-30|{coordinates -> [...|In time for Easte...|in time for easte...|
|@Reuters Why is i...|2021-03-

In [15]:
# Run `remove_features` over the abbreviation-fixed text.
df = df.withColumn('removed', remove_features_udf(df.fixed_abbrev))

+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|                text|      time|            location|       text_non_asci|        fixed_abbrev|             removed|
+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @Peter...|@austrippa @peter...|true no surprises...|
|@SciNate @Breakfa...|2021-03-30|                null|@SciNate @Breakfa...|@scinate @breakfa...|happy birthday fe...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras I...|@angelabetheras i...|its one of the pl...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|@alwayscold11 @50...|this is me with p...|
|Time for the peop...|2021-03-30|                null|Time for the peop...|time for the peop...|time for the peop...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|Ju

In [16]:
# Score each cleaned tweet with `sentiment_analysis` (float column).
df = df.withColumn("sentiment_score", sentiment_analysis_udf(df.removed))

+--------------------+----------+--------------------+--------------------+--------------------+--------------------+---------------+
|                text|      time|            location|       text_non_asci|        fixed_abbrev|             removed|sentiment_score|
+--------------------+----------+--------------------+--------------------+--------------------+--------------------+---------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @Peter...|@austrippa @peter...|true no surprises...|           0.35|
|@SciNate @Breakfa...|2021-03-30|                null|@SciNate @Breakfa...|@scinate @breakfa...|happy birthday fe...|            0.1|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras I...|@angelabetheras i...|its one of the pl...|            0.0|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|@alwayscold11 @50...|this is me with p...|            0.1|
|Time for the peop...|2021-03-30|                null|Time for

In [12]:
# Turn the numeric score into a string label, then keep only the columns
# needed for reporting.
df = df.withColumn("sentiment", sentiment_udf(df.sentiment_score))
SA_results = df.select(['text', 'time', 'sentiment_score', 'sentiment'])

In [13]:
# SQL predicate selecting tweets whose cleaned text mentions COVID-related terms.
covid_predicate = "removed like '%covid%' or removed like '%coronavirus%' or removed like '%vaccin%'"

# Count the matching tweets per sentiment label and bring the small result to pandas.
contain_covid = (
    df.filter(covid_predicate)
    .select('text', 'time', 'sentiment_score', 'sentiment')
    .groupBy('sentiment')
    .count()
)
contain_covid_pd = contain_covid.toPandas()

In [14]:
# Share of each sentiment label among the COVID-related tweets, in percent.
covid_total = contain_covid_pd['count'].sum()
contain_covid_pd['percentage %'] = contain_covid_pd['count'] / covid_total * 100
contain_covid_pd

Unnamed: 0,sentiment,count,percentage %
0,positive,227,33.830104
1,neutral,323,48.137109
2,negative,121,18.032787


In [15]:
# Sentiment label counts over all collected tweets, with each label's share in percent.
overall = SA_results.groupBy('sentiment').count()
overall_pd = overall.toPandas()
overall_total = overall_pd['count'].sum()
overall_pd['percentage'] = overall_pd['count'] / overall_total * 100
overall_pd

Unnamed: 0,sentiment,count,percentage
0,positive,15598,36.05557
1,neutral,21804,50.401054
2,negative,5859,13.543376


In [16]:
# Display the per-sentiment tweet counts for the COVID-related subset.
contain_covid_pd['count']

0    227
1    323
2    121
Name: count, dtype: int64

# Topic Model: Latent Dirichlet Allocation

In [20]:
from TextAnalytics import *

In [21]:
from pyspark.sql.types import ArrayType
def to_word(text):
    """Split a cleaned tweet string into a list of word tokens.

    Splits on any run of whitespace, so repeated, leading or trailing spaces
    do not produce empty-string tokens (which would otherwise be counted as a
    vocabulary term downstream).

    Args:
        text: The cleaned tweet text, or ``None`` for a null column value.

    Returns:
        A list of non-empty tokens; an empty list for ``None`` or blank input.
    """
    if text is None:
        return []
    return text.split()

In [22]:
# Obtain the Spark session for the topic-model section; getOrCreate() reuses a running one.
spark = (
    SparkSession.builder
    .appName('NLP')
    .getOrCreate()
)

In [23]:
# Wrap the text-cleaning helpers (presumably provided by
# `from TextAnalytics import *` -- confirm) as Spark UDFs that return strings.
strip_non_ascii_udf = udf(strip_non_ascii, returnType=StringType())
check_blanks_udf = udf(check_blanks, returnType=StringType())
check_lang_udf = udf(check_lang, returnType=StringType())
fix_abbreviation_udf = udf(fix_abbreviation, returnType=StringType())
remove_stops_udf = udf(remove_stops, returnType=StringType())
remove_features_udf = udf(remove_features, returnType=StringType())
tag_and_remove_udf = udf(tag_and_remove, returnType=StringType())
lemmatize_udf = udf(lemmatize, returnType=StringType())
# The notebook's own tokenizer returns an array of strings.
to_word_udf = udf(to_word, returnType=ArrayType(StringType()))

In [24]:
# Build the Spark frame for the topic-model pipeline from the pandas frame `data`.
# NOTE(review): a later cell rebinds `data` to a filtered Spark DataFrame, so
# re-running this cell after that point no longer starts from the pandas frame.
rawdata = spark.createDataFrame(data)

In [25]:
# Remember the original column names; the cleaning steps below re-select
# these plus the most recent derived column at each stage.
raw_cols =  rawdata.columns

In [26]:
# Add the output of `strip_non_ascii` on the raw tweet text.
rawdata = rawdata.withColumn('non_asci', strip_non_ascii_udf(rawdata.text))


+--------------------+----------+--------------------+--------------------+
|                text|      time|            location|            non_asci|
+--------------------+----------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @Peter...|
|@SciNate @Breakfa...|2021-03-30|                null|@SciNate @Breakfa...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras I...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|
|Time for the peop...|2021-03-30|                null|Time for the peop...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|Just posted a pho...|
|In time for Easte...|2021-03-30|{coordinates -> [...|In time for Easte...|
|@Reuters Why is i...|2021-03-30|                null|@Reuters Why is i...|
|@fairfieldbooks ?...|2021-03-30|                null|@fairfieldbooks  ...|
|@Linsulin_junkie ...|2021-03-30|                null|@Linsulin_junkie ...|
|Shityoushou

In [27]:
# Keep the original columns plus `non_asci`, then add the output of
# `fix_abbreviation` applied to it.
rawdata = (
    rawdata
    .select(*raw_cols, 'non_asci')
    .withColumn('fixed_abbrev', fix_abbreviation_udf(rawdata.non_asci))
)
rawdata.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|      time|            location|            non_asci|        fixed_abbrev|
+--------------------+----------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @Peter...|@austrippa @peter...|
|@SciNate @Breakfa...|2021-03-30|                null|@SciNate @Breakfa...|@scinate @breakfa...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras I...|@angelabetheras i...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|@alwayscold11 @50...|
|Time for the peop...|2021-03-30|                null|Time for the peop...|time for the peop...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|Just posted a pho...|just posted a pho...|
|In time for Easte...|2021-03-30|{coordinates -> [...|In time for Easte...|in time for easte...|
|@Reuters Why is i...|2021-03-

In [28]:
# Keep the original columns plus `fixed_abbrev`, then add the output of
# `remove_stops` applied to it.
rawdata = (
    rawdata
    .select(*raw_cols, 'fixed_abbrev')
    .withColumn('stop_texts', remove_stops_udf(rawdata.fixed_abbrev))
)
rawdata.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|      time|            location|        fixed_abbrev|          stop_texts|
+--------------------+----------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @peter...|@austrippa @peter...|
|@SciNate @Breakfa...|2021-03-30|                null|@scinate @breakfa...|@scinate @breakfa...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras i...|@angelabetheras o...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|@alwayscold11 @50...|
|Time for the peop...|2021-03-30|                null|time for the peop...|time people work,...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|just posted a pho...|posted photo @ cl...|
|In time for Easte...|2021-03-30|{coordinates -> [...|in time for easte...|time easter dinin...|
|@Reuters Why is i...|2021-03-

In [29]:
# Keep the original columns plus `stop_texts`, then add the output of
# `remove_features` applied to it.
rawdata = (
    rawdata
    .select(*raw_cols, 'stop_texts')
    .withColumn('removed', remove_features_udf(rawdata.stop_texts))
)
rawdata.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|      time|            location|          stop_texts|             removed|
+--------------------+----------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|@austrippa @peter...| true surprises here|
|@SciNate @Breakfa...|2021-03-30|                null|@scinate @breakfa...|happy birthday fe...|
|@angelabetheras I...|2021-03-30|                null|@angelabetheras o...|one places legiti...|
|@alwayscold11 @50...|2021-03-30|                null|@alwayscold11 @50...|podcasts min podc...|
|Time for the peop...|2021-03-30|                null|time people work,...|time people work ...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|posted photo @ cl...|posted photo clyd...|
|In time for Easte...|2021-03-30|{coordinates -> [...|time easter dinin...|time easter dinin...|
|@Reuters Why is i...|2021-03-

In [30]:
# Keep the original columns plus `removed`, then add the output of
# `tag_and_remove` applied to it.
rawdata = (
    rawdata
    .select(*raw_cols, 'removed')
    .withColumn('tagged_text', tag_and_remove_udf(rawdata.removed))
)
rawdata.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|      time|            location|             removed|         tagged_text|
+--------------------+----------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null| true surprises here|     true surprises |
|@SciNate @Breakfa...|2021-03-30|                null|happy birthday fe...| happy birthday f...|
|@angelabetheras I...|2021-03-30|                null|one places legiti...|       places upski |
|@alwayscold11 @50...|2021-03-30|                null|podcasts min podc...| podcasts min pod...|
|Time for the peop...|2021-03-30|                null|time people work ...| time people work...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|posted photo clyd...| posted photo cly...|
|In time for Easte...|2021-03-30|{coordinates -> [...|time easter dinin...| time easter dini...|
|@Reuters Why is i...|2021-03-

In [31]:
# Keep the original columns plus `tagged_text`, then add the output of
# `lemmatize` applied to it.
rawdata = (
    rawdata
    .select(*raw_cols, 'tagged_text')
    .withColumn('lemm_text', lemmatize_udf(rawdata.tagged_text))
)
rawdata.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|                text|      time|            location|         tagged_text|           lemm_text|
+--------------------+----------+--------------------+--------------------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|     true surprises |       true surprise|
|@SciNate @Breakfa...|2021-03-30|                null| happy birthday f...|happy birthday fe...|
|@angelabetheras I...|2021-03-30|                null|       places upski |         place upski|
|@alwayscold11 @50...|2021-03-30|                null| podcasts min pod...|podcasts min podc...|
|Time for the peop...|2021-03-30|                null| time people work...|time people work ...|
|Just posted a pho...|2021-03-30|{coordinates -> [...| posted photo cly...|post photo clyde ...|
|In time for Easte...|2021-03-30|{coordinates -> [...| time easter dini...|time easter din t...|
|@Reuters Why is i...|2021-03-

In [32]:
# Keep the original columns plus `lemm_text`, then flag each row with the
# string result of `check_blanks` on the lemmatized text.
rawdata = (
    rawdata
    .select(*raw_cols, 'lemm_text')
    .withColumn("is_blank", check_blanks_udf(rawdata.lemm_text))
)
rawdata.show()

+--------------------+----------+--------------------+--------------------+--------+
|                text|      time|            location|           lemm_text|is_blank|
+--------------------+----------+--------------------+--------------------+--------+
|@austrippa @Peter...|2021-03-30|                null|       true surprise|   False|
|@SciNate @Breakfa...|2021-03-30|                null|happy birthday fe...|   False|
|@angelabetheras I...|2021-03-30|                null|         place upski|   False|
|@alwayscold11 @50...|2021-03-30|                null|podcasts min podc...|   False|
|Time for the peop...|2021-03-30|                null|time people work ...|   False|
|Just posted a pho...|2021-03-30|{coordinates -> [...|post photo clyde ...|   False|
|In time for Easte...|2021-03-30|{coordinates -> [...|time easter din t...|   False|
|@Reuters Why is i...|2021-03-30|                null|love volcano news...|   False|
|@fairfieldbooks ?...|2021-03-30|                null|        loo

In [33]:
# Keep the original columns, the lemmatized text and the blank flag, then
# tokenize the lemmatized text into an array column named `word`.
rawdata = (
    rawdata
    .select(*raw_cols, 'lemm_text', 'is_blank')
    .withColumn("word", to_word_udf(rawdata.lemm_text))
)

+--------------------+----------+--------------------+--------------------+--------+--------------------+
|                text|      time|            location|           lemm_text|is_blank|                word|
+--------------------+----------+--------------------+--------------------+--------+--------------------+
|@austrippa @Peter...|2021-03-30|                null|       true surprise|   False|    [true, surprise]|
|@SciNate @Breakfa...|2021-03-30|                null|happy birthday fe...|   False|[happy, birthday,...|
|@angelabetheras I...|2021-03-30|                null|         place upski|   False|      [place, upski]|
|@alwayscold11 @50...|2021-03-30|                null|podcasts min podc...|   False|[podcasts, min, p...|
|Time for the peop...|2021-03-30|                null|time people work ...|   False|[time, people, wo...|
|Just posted a pho...|2021-03-30|{coordinates -> [...|post photo clyde ...|   False|[post, photo, cly...|
|In time for Easte...|2021-03-30|{coordinates 

In [30]:
from pyspark.sql.functions import monotonically_increasing_id
# Tag every row with a unique (not necessarily consecutive) id.
rawdata = rawdata.withColumn("uid", monotonically_increasing_id())
# Keep rows whose blank flag is the string "False" (the flag column is a string).
# NOTE(review): this rebinds `data`, which earlier cells use for the pandas
# frame of raw tweets; cells that call spark.createDataFrame(data) will see
# this Spark frame instead if they are re-run afterwards.
not_blank = rawdata.is_blank == "False"
data = rawdata.filter(not_blank)

In [31]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.clustering import LDA
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import CountVectorizer

# Stage objects for a possible ML pipeline: tokenizer -> count vectorizer -> IDF -> LDA.
# NOTE(review): nothing here is fitted or transformed in this cell. `idf` and
# `lda` are re-created in the cells that follow, and `tokenizer` writes to a
# "words" column whereas the CountVectorizer that is actually fitted later
# reads the "word" column, so these objects look like leftovers.
tokenizer = Tokenizer(inputCol="lemm_text", outputCol="words")
#data = tokenizer.transform(data)
vectorizer = CountVectorizer(inputCol= "words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
#idfModel = idf.fit(data)

lda = LDA(k=20, seed=1, optimizer="em")

In [32]:
from pyspark.ml.feature import CountVectorizer

# Term counts over the tokenized tweets, with the vocabulary capped at 1000 terms.
cv = CountVectorizer(
    inputCol="word",
    outputCol="rawFeatures",
    vocabSize=1000,
)
cvmodel = cv.fit(data)
featurizedData = cvmodel.transform(data)

# Re-weight the raw counts by inverse document frequency (TF-IDF).
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [33]:
# "em" = expectation-maximization 
# Fit a 5-topic LDA model on the "features" column with a fixed seed.
# NOTE(review): "features" holds the TF-IDF weights computed above rather than
# raw term counts -- confirm this is the intended input for LDA.
lda = LDA(
    featuresCol="features",
    k=5,
    seed=123,
    optimizer="em",
)
ldamodel = lda.fit(rescaledData)
# Per-topic summary; its "termIndices" column is mapped back to words below.
ldatopics = ldamodel.describeTopics()

In [34]:
def indices_to_terms(vocabulary):
    """Build a Spark UDF that translates term indices into vocabulary words.

    Args:
        vocabulary: Indexable sequence of terms; position ``i`` holds the word
            for term index ``i``.

    Returns:
        A UDF that takes a column of term-index arrays and returns an array
        of the corresponding strings.
    """
    def lookup_terms(term_indices):
        # Indices are cast to int before being used as positions.
        return list(map(lambda idx: vocabulary[int(idx)], term_indices))

    return udf(lookup_terms, ArrayType(StringType()))

In [35]:
# Attach the actual words for each topic by looking its term indices up in
# the fitted CountVectorizer vocabulary.
terms_for_indices = indices_to_terms(cvmodel.vocabulary)
ldatopics = ldatopics.withColumn("topics_words", terms_for_indices("termIndices"))

In [1]:
# Print each topic id with its words. Requires `ldatopics` from the LDA cells
# above; the recorded NameError comes from an execution where it was undefined.
ldatopics.select('topic', 'topics_words').show()

NameError: name 'ldatopics' is not defined

In [37]:
from pyspark.ml.feature import StopWordsRemover,Tokenizer, RegexTokenizer, CountVectorizer, IDF
from pyspark.sql.functions import udf, col, size, explode, regexp_replace, trim, lower, lit
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType, LongType
# from pyspark.ml.clustering import LDA
import pyLDAvis

import numpy as np
def format_data_to_pyldavis(df_filtered, count_vectorizer, transformed, lda_model):
    """Collect the five inputs that pyLDAvis.prepare expects into a dict.

    Args:
        df_filtered: Spark DataFrame with an array column ``word`` of tokens.
        count_vectorizer: Fitted model exposing a ``vocabulary`` list.
        transformed: Spark DataFrame with a ``topicDistribution`` vector column.
        lda_model: Fitted LDA model exposing ``topicsMatrix()``.

    Returns:
        dict with keys ``topic_term_dists``, ``doc_topic_dists``,
        ``doc_lengths``, ``vocab`` and ``term_frequency``.
    """
    vocab = count_vectorizer.vocabulary

    # Corpus-wide occurrence count of every token, then ordered like the vocabulary.
    token_counts = (
        df_filtered
        .select(explode(df_filtered.word).alias("words"))
        .groupby("words")
        .count()
    )
    count_by_word = {}
    for rec in token_counts.collect():
        count_by_word[rec['words']] = rec['count']
    term_frequency = [count_by_word[term] for term in vocab]

    # topicsMatrix() is transposed so that each row corresponds to one topic.
    topic_term = np.array(lda_model.topicsMatrix().toArray()).T

    topic_vectors = transformed.select(["topicDistribution"]).toPandas()['topicDistribution']
    doc_topic = np.array([vec.toArray() for vec in topic_vectors])

    # Number of tokens per document.
    # NOTE(review): lengths and topic distributions come from separate Spark
    # actions; this assumes both return rows in the same order -- confirm.
    length_rows = df_filtered.select(size(df_filtered.word)).collect()
    doc_lengths = [rec[0] for rec in length_rows]

    return {
        'topic_term_dists': topic_term,
        'doc_topic_dists': doc_topic,
        'doc_lengths': doc_lengths,
        'vocab': vocab,
        'term_frequency': term_frequency,
    }

def filter_bad_docs(data):
    """Drop documents whose topic distribution is not a valid distribution.

    A document is kept only when its topic vector contains no NaN and sums to
    1 within floating-point tolerance. An exact ``!= 1`` comparison would also
    discard valid vectors whose sum differs from 1 by rounding error.

    Args:
        data: dict with parallel sequences under ``'doc_topic_dists'`` (one
            numeric vector per document) and ``'doc_lengths'``.

    Returns:
        None. ``data`` is modified in place: both keys are replaced by lists
        containing only the entries of the kept documents.
    """
    kept_dists = []
    kept_lengths = []

    for dist, length in zip(data['doc_topic_dists'], data['doc_lengths']):
        # NaN entries, all-zero vectors and sums away from 1 are all rejected.
        if np.isnan(dist).any() or not np.isclose(np.sum(dist), 1.0):
            continue
        kept_dists.append(dist)
        kept_lengths.append(length)

    data['doc_topic_dists'] = kept_dists
    data['doc_lengths'] = kept_lengths

# Per-document topic distributions from the fitted LDA model.
transformed = ldamodel.transform(rescaledData)

# Assemble the pyLDAvis inputs, then drop documents with unusable topic vectors.
formatted = format_data_to_pyldavis(
    df_filtered=data,
    count_vectorizer=cvmodel,
    transformed=transformed,
    lda_model=ldamodel,
)
filter_bad_docs(formatted)

In [38]:
# Build the interactive topic visualisation from the prepared inputs and render it inline.
py_lda_prepared_data = pyLDAvis.prepare(
    topic_term_dists=formatted['topic_term_dists'],
    doc_topic_dists=formatted['doc_topic_dists'],
    doc_lengths=formatted['doc_lengths'],
    vocab=formatted['vocab'],
    term_frequency=formatted['term_frequency'],
)
pyLDAvis.display(py_lda_prepared_data)

  and should_run_async(code)
